opex792's picture
Update app.py
949a8a8 verified
raw
history blame
15 kB
import gradio as gr
from sentence_transformers import SentenceTransformer, util
import os
import time
import threading
import queue
import torch
import psycopg2
import zlib
from urllib.parse import urlparse
# Настройки базы данных PostgreSQL
DATABASE_URL = os.environ.get("DATABASE_URL")
if DATABASE_URL is None:
raise ValueError("DATABASE_URL environment variable not set.")
parsed_url = urlparse(DATABASE_URL)
db_params = {
"host": parsed_url.hostname,
"port": parsed_url.port,
"database": parsed_url.path.lstrip("/"),
"user": parsed_url.username,
"password": parsed_url.password,
"sslmode": "require"
}
# Загружаем модель
model_name = "BAAI/bge-m3"
model = SentenceTransformer(model_name)
# Имена таблиц
embeddings_table = "movie_embeddings"
query_cache_table = "query_cache"
# Максимальный размер таблицы кэша запросов в байтах (50MB)
MAX_CACHE_SIZE = 50 * 1024 * 1024
# Загружаем данные из файла movies.json
try:
import json
with open("movies.json", "r", encoding="utf-8") as f:
movies_data = json.load(f)
except FileNotFoundError:
print("Ошибка: Файл movies.json не найден.")
movies_data = []
# Очередь для необработанных фильмов
movies_queue = queue.Queue()
for movie in movies_data:
movies_queue.put(movie)
# Флаг, указывающий, что обработка фильмов завершена
processing_complete = False
# Флаг, указывающий, что выполняется поиск
search_in_progress = False
# Блокировка для доступа к базе данных
db_lock = threading.Lock()
# Размер пакета для обработки эмбеддингов
batch_size = 32
def get_db_connection():
"""Устанавливает соединение с базой данных."""
try:
conn = psycopg2.connect(**db_params)
return conn
except Exception as e:
print(f"Ошибка подключения к базе данных: {e}")
return None
def setup_database():
"""Настраивает базу данных: создает расширение, таблицы и триггер."""
conn = get_db_connection()
if conn is None:
return
with conn.cursor() as cur:
# Создаем расширение pgvector
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
# Создаем таблицу для хранения эмбеддингов фильмов
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {embeddings_table} (
movie_id INTEGER,
embedding_crc32 BIGINT PRIMARY KEY,
string_crc32 BIGINT,
model_name TEXT,
embedding vector(1024)
);
""")
# Создаем таблицу для кэширования эмбеддингов запросов
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {query_cache_table} (
query_crc32 BIGINT PRIMARY KEY,
query TEXT,
model_name TEXT,
embedding vector(1024),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_query_crc32 ON {query_cache_table} (query_crc32);
CREATE INDEX IF NOT EXISTS idx_created_at ON {query_cache_table} (created_at);
""")
# Создаем функцию и триггер для автоматического удаления старых записей из таблицы кэша запросов
cur.execute(f"""
CREATE OR REPLACE FUNCTION manage_query_cache_size()
RETURNS TRIGGER AS $$
DECLARE
table_size BIGINT;
row_to_delete RECORD;
BEGIN
SELECT pg_total_relation_size('{query_cache_table}') INTO table_size;
IF table_size > {MAX_CACHE_SIZE} THEN
FOR row_to_delete IN
SELECT query_crc32
FROM {query_cache_table}
ORDER BY created_at ASC
LOOP
DELETE FROM {query_cache_table} WHERE query_crc32 = row_to_delete.query_crc32;
SELECT pg_total_relation_size('{query_cache_table}') INTO table_size;
EXIT WHEN table_size <= {MAX_CACHE_SIZE};
END LOOP;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER trg_manage_query_cache_size
AFTER INSERT ON {query_cache_table}
FOR EACH ROW
EXECUTE PROCEDURE manage_query_cache_size();
""")
conn.commit()
conn.close()
# Настраиваем базу данных при запуске приложения
setup_database()
def calculate_crc32(text):
"""Вычисляет CRC32 для строки."""
return zlib.crc32(text.encode('utf-8')) & 0xFFFFFFFF
def encode_string(text):
"""Кодирует строку в эмбеддинг."""
return model.encode(text, convert_to_tensor=True, normalize_embeddings=True)
def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_name):
"""
Пытается получить эмбеддинг из указанной таблицы по CRC32.
Возвращает эмбеддинг, если найден, иначе None.
"""
with conn.cursor() as cur:
cur.execute(f"SELECT embedding FROM {table_name} WHERE {crc32_column} = %s AND model_name = %s", (crc32_value, model_name))
result = cur.fetchone()
if result:
return torch.tensor(result[0])
else:
return None
def insert_embedding(conn, table_name, crc32_column, crc32_value, other_columns, embedding):
"""Вставляет эмбеддинг в указанную таблицу."""
columns = ', '.join([crc32_column] + list(other_columns.keys()) + ['model_name', 'embedding'])
placeholders = ', '.join(['%s'] * (len(other_columns) + 3))
values = (crc32_value,) + tuple(other_columns.values()) + (model_name, embedding.tolist())
with conn.cursor() as cur:
try:
cur.execute(f"""
INSERT INTO {table_name} ({columns})
VALUES ({placeholders})
ON CONFLICT ({crc32_column}) DO NOTHING;
""", values)
conn.commit()
return True
except Exception as e:
print(f"Ошибка при вставке эмбеддинга в таблицу {table_name}: {e}")
conn.rollback()
return False
def process_movies():
"""
Обрабатывает фильмы из очереди, создавая для них эмбеддинги и сохраняя их в базу данных.
"""
global processing_complete
conn = get_db_connection()
if conn is None:
processing_complete = True
return
while True:
if search_in_progress:
time.sleep(1) # Ждем, пока поиск не завершится
continue
batch = []
while not movies_queue.empty() and len(batch) < batch_size:
try:
movie = movies_queue.get(timeout=1)
batch.append(movie)
except queue.Empty:
break
if not batch:
print("Очередь фильмов пуста.")
processing_complete = True
break
titles = [movie["name"] for movie in batch]
embedding_strings = [
f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genresList']}\nОписание: {movie['description']}"
for movie in batch
]
print(f"Создаются эмбеддинги для фильмов: {', '.join(titles)}...")
with db_lock:
for movie, embedding_string in zip(batch, embedding_strings):
movie_id = movie['id']
embedding_crc32 = calculate_crc32(str(embedding.tolist()))
string_crc32 = calculate_crc32(embedding_string)
# Проверяем, есть ли уже эмбеддинг для этого фильма в базе данных
existing_embedding = get_embedding_from_db(conn, embeddings_table, "embedding_crc32", embedding_crc32, model_name)
if existing_embedding is None:
# Создаем эмбеддинг, только если его нет в базе данных
embedding = encode_string(embedding_string)
if insert_embedding(conn, embeddings_table, "embedding_crc32", embedding_crc32, {"movie_id": movie_id, "string_crc32": string_crc32}, embedding):
print(f"Эмбеддинг для фильма '{movie['name']}' сохранен в базе данных.")
else:
print(f"Ошибка сохранения эмбеддинга для фильма '{movie['name']}'.")
else:
print(f"Эмбеддинг для фильма '{movie['name']}' уже существует в базе данных.")
conn.close()
print("Обработка фильмов завершена.")
def get_movie_embeddings(conn):
"""Загружает все эмбеддинги фильмов из базы данных."""
movie_embeddings = {}
with conn.cursor() as cur:
cur.execute(f"SELECT movie_id, embedding FROM {embeddings_table}")
rows = cur.fetchall()
for row in rows:
movie_id, embedding = row
# Находим название фильма по его ID
for movie in movies_data:
if movie['id'] == movie_id:
title = movie["name"]
movie_embeddings[title] = torch.tensor(embedding)
break
return movie_embeddings
def search_movies(query, top_k=10):
"""
Ищет наиболее похожие фильмы по запросу.
Args:
query: Текстовый запрос.
top_k: Количество возвращаемых результатов.
Returns:
Строку с результатами поиска в формате HTML.
"""
global search_in_progress
search_in_progress = True
start_time = time.time()
print(f"\n\033[1mПоиск по запросу: '{query}'\033[0m")
conn = get_db_connection()
if conn is None:
search_in_progress = False
return "<p>Ошибка подключения к базе данных.</p>"
query_crc32 = calculate_crc32(query)
# Проверяем, есть ли уже эмбеддинг для этого запроса в кэше
print(f"Начало поиска эмбеддинга запроса в кэше: {time.strftime('%Y-%m-%d %H:%M:%S')}")
query_embedding_tensor = get_embedding_from_db(conn, query_cache_table, "query_crc32", query_crc32, model_name)
print(f"Окончание поиска эмбеддинга запроса в кэше: {time.strftime('%Y-%m-%d %H:%M:%S')}")
if query_embedding_tensor is None:
print(f"Начало создания эмбеддинга для запроса: {time.strftime('%Y-%m-%d %H:%M:%S')}")
query_embedding_tensor = encode_string(query)
print(f"Окончание создания эмбеддинга для запроса: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Вставляем эмбеддинг запроса в базу данных
insert_embedding(conn, query_cache_table, "query_crc32", query_crc32, {"query": query}, query_embedding_tensor)
with db_lock:
current_movie_embeddings = get_movie_embeddings(conn)
conn.close()
if not current_movie_embeddings:
search_in_progress = False
return "<p>Пока что нет обработанных фильмов. Попробуйте позже.</p>"
# Преобразуем эмбеддинги фильмов в тензор
movie_titles = list(current_movie_embeddings.keys())
movie_embeddings_tensor = torch.stack(list(current_movie_embeddings.values()))
print(f"Начало поиска похожих фильмов: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Используем util.semantic_search для поиска похожих фильмов
hits = util.semantic_search(query_embedding_tensor, movie_embeddings_tensor, top_k=top_k)[0]
print(f"Окончание поиска похожих фильмов: {time.strftime('%Y-%m-%d %H:%M:%S')}")
results_html = ""
for hit in hits:
title = movie_titles[hit['corpus_id']]
score = hit['score']
# Ищем полное описание фильма в исходных данных
for movie in movies_data:
if movie["name"] == title:
description = movie["description"]
year = movie["year"]
genres = movie["genresList"]
break
results_html += f"<h3><b>{title} ({year})</b></h3>"
results_html += f"<p><b>Жанры:</b> {genres}</p>"
results_html += f"<p><b>Описание:</b> {description}</p>"
results_html += f"<p><b>Сходство:</b> {score:.4f}</p>"
results_html += "<hr>"
end_time = time.time()
execution_time = end_time - start_time
print(f"Поиск завершен за {execution_time:.4f} секунд.")
search_in_progress = False
return results_html
# Поток для обработки фильмов
processing_thread = threading.Thread(target=process_movies)
# Создаем интерфейс Gradio
iface = gr.Interface(
fn=search_movies,
inputs=gr.Textbox(label="Введите запрос:"),
outputs=gr.HTML(label="Результаты поиска:"),
title="Поиск фильмов по описанию",
description="Введите запрос, и система найдет наиболее похожие фильмы по их описаниям.",
examples=[
["Фильм про ограбление"],
["Комедия 2019 года"],
["Фантастика про космос"],
],
)
# Запускаем поток для обработки фильмов
processing_thread.start()
# Запускаем приложение
iface.queue()
iface.launch()