opex792's picture
Update app.py
29b1a76 verified
raw
history blame
14.7 kB
import gradio as gr
from sentence_transformers import SentenceTransformer, util
import os
import time
import threading
import queue
import torch
import psycopg2
import zlib
from urllib.parse import urlparse
# Настройки базы данных PostgreSQL
DATABASE_URL = os.environ.get("DATABASE_URL")
if DATABASE_URL is None:
raise ValueError("DATABASE_URL environment variable not set.")
parsed_url = urlparse(DATABASE_URL)
db_params = {
"host": parsed_url.hostname,
"port": parsed_url.port,
"database": parsed_url.path.lstrip("/"),
"user": parsed_url.username,
"password": parsed_url.password,
"sslmode": "require"
}
# Загружаем модель
model_name = "BAAI/bge-m3"
model = SentenceTransformer(model_name)
# Имена таблиц
embeddings_table = "movie_embeddings"
query_cache_table = "query_cache"
# Максимальный размер таблицы кэша запросов в байтах (50MB)
MAX_CACHE_SIZE = 50 * 1024 * 1024
# Загружаем данные из файла movies.json
try:
import json
with open("movies.json", "r", encoding="utf-8") as f:
movies_data = json.load(f)
except FileNotFoundError:
print("Ошибка: Файл movies.json не найден.")
movies_data = []
# Очередь для необработанных фильмов
movies_queue = queue.Queue()
for movie in movies_data:
movies_queue.put(movie)
# Флаг, указывающий, что обработка фильмов завершена
processing_complete = False
# Флаг, указывающий, что выполняется поиск
search_in_progress = False
# Блокировка для доступа к базе данных
db_lock = threading.Lock()
# Размер пакета для обработки эмбеддингов
batch_size = 32
def get_db_connection():
"""Устанавливает соединение с базой данных."""
try:
conn = psycopg2.connect(**db_params)
return conn
except Exception as e:
print(f"Ошибка подключения к базе данных: {e}")
return None
def setup_database():
"""Настраивает базу данных: создает расширение, таблицы и триггер."""
conn = get_db_connection()
if conn is None:
return
with conn.cursor() as cur:
# Создаем расширение pgvector
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
# Создаем таблицу для хранения эмбеддингов фильмов
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {embeddings_table} (
movie_id INTEGER,
embedding_crc32 BIGINT PRIMARY KEY,
string_crc32 BIGINT,
model_name TEXT,
embedding vector(1024)
);
""")
# Создаем таблицу для кэширования эмбеддингов запросов
cur.execute(f"""
CREATE TABLE IF NOT EXISTS {query_cache_table} (
query_crc32 BIGINT PRIMARY KEY,
query TEXT,
model_name TEXT,
embedding vector(1024),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_query_crc32 ON {query_cache_table} (query_crc32);
CREATE INDEX IF NOT EXISTS idx_created_at ON {query_cache_table} (created_at);
""")
# Создаем функцию и триггер для автоматического удаления старых записей из таблицы кэша запросов
cur.execute(f"""
CREATE OR REPLACE FUNCTION manage_query_cache_size()
RETURNS TRIGGER AS $$
DECLARE
table_size BIGINT;
row_to_delete RECORD;
BEGIN
SELECT pg_total_relation_size('{query_cache_table}') INTO table_size;
IF table_size > {MAX_CACHE_SIZE} THEN
FOR row_to_delete IN SELECT query_crc32 FROM {query_cache_table} ORDER BY created_at ASC LOOP
DELETE FROM {query_cache_table} WHERE query_crc32 = row_to_delete.query_crc32;
SELECT pg_total_relation_size('{query_cache_table}') INTO table_size;
EXIT WHEN table_size <= {MAX_CACHE_SIZE};
END LOOP;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER trg_manage_query_cache_size
AFTER INSERT ON {query_cache_table}
FOR EACH ROW
EXECUTE PROCEDURE manage_query_cache_size();
""")
conn.commit()
conn.close()
# Настраиваем базу данных при запуске приложения
setup_database()
def calculate_crc32(text):
"""Вычисляет CRC32 для строки."""
return zlib.crc32(text.encode('utf-8')) & 0xFFFFFFFF
def encode_string(text):
"""Кодирует строку в эмбеддинг."""
return model.encode(text, convert_to_tensor=True, normalize_embeddings=True)
def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_name):
"""
Пытается получить эмбеддинг из указанной таблицы по CRC32.
Возвращает эмбеддинг, если найден, иначе None.
"""
with conn.cursor() as cur:
cur.execute(f"SELECT embedding FROM {table_name} WHERE {crc32_column} = %s AND model_name = %s", (crc32_value, model_name))
result = cur.fetchone()
if result:
return torch.tensor(result[0])
else:
return None
def insert_embedding(conn, table_name, crc32_column, crc32_value, other_columns, embedding):
"""Вставляет эмбеддинг в указанную таблицу."""
columns = ', '.join([crc32_column] + list(other_columns.keys()) + ['model_name', 'embedding'])
placeholders = ', '.join(['%s'] * (len(other_columns) + 3))
values = (crc32_value,) + tuple(other_columns.values()) + (model_name, embedding.tolist())
with conn.cursor() as cur:
try:
cur.execute(f"""
INSERT INTO {table_name} ({columns})
VALUES ({placeholders})
ON CONFLICT ({crc32_column}) DO NOTHING;
""", values)
conn.commit()
return True
except Exception as e:
print(f"Ошибка при вставке эмбеддинга в таблицу {table_name}: {e}")
conn.rollback()
return False
def process_movies():
"""
Обрабатывает фильмы из очереди, создавая для них эмбеддинги и сохраняя их в базу данных.
"""
global processing_complete
conn = get_db_connection()
if conn is None:
processing_complete = True
return
while True:
if search_in_progress:
time.sleep(1) # Ждем, пока поиск не завершится
continue
batch = []
while not movies_queue.empty() and len(batch) < batch_size:
try:
movie = movies_queue.get(timeout=1)
batch.append(movie)
except queue.Empty:
break
if not batch:
print("Очередь фильмов пуста.")
processing_complete = True
break
titles = [movie["name"] for movie in batch]
embedding_strings = [
f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genresList']}\nОписание: {movie['description']}"
for movie in batch
]
print(f"Создаются эмбеддинги для фильмов: {', '.join(titles)}...")
with db_lock:
for movie, embedding_string in zip(batch, embedding_strings):
movie_id = movie['id']
string_crc32 = calculate_crc32(embedding_string)
# Проверяем, есть ли уже эмбеддинг для этого фильма в базе данных
existing_embedding = get_embedding_from_db(conn, embeddings_table, "string_crc32", string_crc32, model_name)
if existing_embedding is None:
# Создаем эмбеддинг, только если его нет в базе данных
embedding = encode_string(embedding_string)
embedding_crc32 = calculate_crc32(embedding.cpu().numpy().tobytes())
if insert_embedding(conn, embeddings_table, "embedding_crc32", embedding_crc32, {"movie_id": movie_id, "string_crc32": string_crc32}, embedding):
print(f"Эмбеддинг для фильма '{movie['name']}' сохранен в базе данных.")
else:
print(f"Ошибка сохранения эмбеддинга для фильма '{movie['name']}'.")
else:
print(f"Эмбеддинг для фильма '{movie['name']}' уже существует в базе данных.")
conn.close()
print("Обработка фильмов завершена.")
def get_movie_embeddings(conn):
"""Загружает все эмбеддинги фильмов из базы данных."""
movie_embeddings = {}
with conn.cursor() as cur:
cur.execute(f"SELECT movie_id, embedding FROM {embeddings_table}")
rows = cur.fetchall()
for row in rows:
movie_id, embedding = row
# Находим название фильма по его ID
for movie in movies_data:
if movie['id'] == movie_id:
title = movie["name"]
movie_embeddings[title] = torch.tensor(embedding)
break
return movie_embeddings
def search_movies(query, top_k=10):
"""
Ищет наиболее похожие фильмы по запросу.
Args:
query: Текстовый запрос.
top_k: Количество возвращаемых результатов.
Returns:
Строку с результатами поиска в формате HTML.
"""
global search_in_progress
search_in_progress = True
start_time = time.time()
print(f"\n\033[1mПоиск по запросу: '{query}'\033[0m")
conn = get_db_connection()
if conn is None:
search_in_progress = False
return "<p>Ошибка подключения к базе данных.</p>"
query_crc32 = calculate_crc32(query)
# Проверяем, есть ли уже эмбеддинг для этого запроса в кэше
print(f"Начало поиска эмбеддинга запроса в кэше: {time.strftime('%Y-%m-%d %H:%M:%S')}")
query_embedding_tensor = get_embedding_from_db(conn, query_cache_table, "query_crc32", query_crc32, model_name)
print(f"Окончание поиска эмбеддинга запроса в кэше: {time.strftime('%Y-%m-%d %H:%M:%S')}")
if query_embedding_tensor is None:
# Если эмбеддинга нет в кэше, создаем новый
print(f"Начало создания эмбеддинга запроса: {time.strftime('%Y-%m-%d %H:%M:%S')}")
query_embedding_tensor = encode_string(query)
print(f"Окончание создания эмбеддинга запроса: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Сохраняем эмбеддинг запроса в кэш
insert_embedding(conn, query_cache_table, "query_crc32", query_crc32, {"query": query}, query_embedding_tensor)
else:
print("Эмбеддинг запроса найден в кэше.")
# Загружаем эмбеддинги фильмов
print(f"Начало загрузки эмбеддингов фильмов: {time.strftime('%Y-%m-%d %H:%M:%S')}")
movie_embeddings = get_movie_embeddings(conn)
print(f"Окончание загрузки эмбеддингов фильмов: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Вычисляем косинусное сходство
print(f"Начало вычисления косинусного сходства: {time.strftime('%Y-%m-%d %H:%M:%S')}")
similarities = []
for title, movie_embedding in movie_embeddings.items():
similarity = util.pytorch_cos_sim(query_embedding_tensor, movie_embedding).item()
similarities.append((title, similarity))
# Сортируем результаты
similarities.sort(key=lambda x: x[1], reverse=True)
top_results = similarities[:top_k]
print(f"Окончание вычисления косинусного сходства: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# Формируем HTML-строку с результатами
results_html = "<ol>"
for title, score in top_results:
results_html += f"<li><strong>{title}</strong> (Сходство: {score:.4f})</li>"
results_html += "</ol>"
search_in_progress = False
end_time = time.time()
search_time = end_time - start_time
print(f"\033[1mПоиск завершен за {search_time:.2f} секунд.\033[0m")
return f"<p>Время поиска: {search_time:.2f} секунд</p>" + results_html
# Запускаем обработку фильмов в отдельном потоке
processing_thread = threading.Thread(target=process_movies)
processing_thread.start()
# Создаем интерфейс Gradio
iface = gr.Interface(
fn=search_movies,
inputs=gr.Textbox(lines=2, placeholder="Введите запрос для поиска фильмов..."),
outputs=gr.HTML(label="Результаты поиска"),
title="Семантический поиск фильмов",
description="Введите описание фильма, который вы ищете, и система найдет наиболее похожие фильмы."
)
# Запускаем интерфейс
iface.launch()