import gradio as gr from sentence_transformers import SentenceTransformer, util import os import time import threading import queue import torch import psycopg2 import zlib import numpy as np from urllib.parse import urlparse import logging # Настройка логирования logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Настройки базы данных PostgreSQL DATABASE_URL = os.environ.get("DATABASE_URL") if DATABASE_URL is None: raise ValueError("DATABASE_URL environment variable not set.") parsed_url = urlparse(DATABASE_URL) db_params = { "host": parsed_url.hostname, "port": parsed_url.port, "database": parsed_url.path.lstrip("/"), "user": parsed_url.username, "password": parsed_url.password, "sslmode": "require" } # Загружаем модель model_name = "BAAI/bge-m3" logging.info(f"Загрузка модели {model_name}...") model = SentenceTransformer(model_name) logging.info("Модель загружена успешно.") # Имена таблиц embeddings_table = "movie_embeddings" query_cache_table = "query_cache" # Максимальный размер таблицы кэша запросов в байтах (50MB) MAX_CACHE_SIZE = 50 * 1024 * 1024 # Загружаем данные из файла movies.json try: import json with open("movies.json", "r", encoding="utf-8") as f: movies_data = json.load(f) logging.info(f"Загружено {len(movies_data)} фильмов из movies.json") except FileNotFoundError: logging.error("Ошибка: Файл movies.json не найден.") movies_data = [] # Очередь для необработанных фильмов movies_queue = queue.Queue() # Флаг, указывающий, что обработка фильмов завершена processing_complete = False # Флаг, указывающий, что выполняется поиск search_in_progress = False # Блокировка для доступа к базе данных db_lock = threading.Lock() # Размер пакета для обработки эмбеддингов batch_size = 32 def get_db_connection(): """Устанавливает соединение с базой данных.""" try: conn = psycopg2.connect(**db_params) return conn except Exception as e: logging.error(f"Ошибка подключения к базе данных: {e}") return None def setup_database(): """Настраивает базу данных: создает расширение, таблицы и индексы.""" conn = get_db_connection() if conn is None: return try: with conn.cursor() as cur: # Создаем расширение pgvector если его нет cur.execute("CREATE EXTENSION IF NOT EXISTS vector;") # Удаляем существующие таблицы если они есть cur.execute(f"DROP TABLE IF EXISTS {embeddings_table}, {query_cache_table};") # Создаем таблицу для хранения эмбеддингов фильмов cur.execute(f""" CREATE TABLE {embeddings_table} ( movie_id INTEGER PRIMARY KEY, embedding_crc32 BIGINT, string_crc32 BIGINT, model_name TEXT, embedding vector(1024) ); CREATE INDEX ON {embeddings_table} (string_crc32); """) # Создаем таблицу для кэширования запросов cur.execute(f""" CREATE TABLE {query_cache_table} ( query_crc32 BIGINT PRIMARY KEY, query TEXT, model_name TEXT, embedding vector(1024), created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX ON {query_cache_table} (query_crc32); CREATE INDEX ON {query_cache_table} (created_at); """) conn.commit() logging.info("База данных успешно настроена.") except Exception as e: logging.error(f"Ошибка при настройке базы данных: {e}") conn.rollback() finally: conn.close() # Настраиваем базу данных при запуске setup_database() def calculate_crc32(text): """Вычисляет CRC32 для строки.""" return zlib.crc32(text.encode('utf-8')) & 0xFFFFFFFF def encode_string(text): """Кодирует строку в эмбеддинг.""" return model.encode(text, convert_to_tensor=True, normalize_embeddings=True) def get_movies_without_embeddings(): """Получает список фильмов, для которых нужно создать эмбеддинги.""" conn = get_db_connection() if conn is None: return [] movies_to_process = [] try: with conn.cursor() as cur: # Получаем список ID фильмов, которые уже есть в базе cur.execute(f"SELECT movie_id FROM {embeddings_table}") existing_ids = {row[0] for row in cur.fetchall()} # Фильтруем только те фильмы, которых нет в базе for movie in movies_data: if movie['id'] not in existing_ids: movies_to_process.append(movie) logging.info(f"Найдено {len(movies_to_process)} фильмов для обработки.") except Exception as e: logging.error(f"Ошибка при получении списка фильмов для обработки: {e}") finally: conn.close() return movies_to_process def get_embedding_from_db(conn, table_name, crc32_column, crc32_value, model_name): """Получает эмбеддинг из базы данных.""" try: with conn.cursor() as cur: cur.execute(f"SELECT embedding FROM {table_name} WHERE {crc32_column} = %s AND model_name = %s", (crc32_value, model_name)) result = cur.fetchone() if result and result[0]: return np.array(result[0]) except Exception as e: logging.error(f"Ошибка при получении эмбеддинга из БД: {e}") return None def insert_embedding(conn, table_name, movie_id, embedding_crc32, string_crc32, embedding): """Вставляет эмбеддинг в базу данных.""" try: with conn.cursor() as cur: cur.execute(f""" INSERT INTO {table_name} (movie_id, embedding_crc32, string_crc32, model_name, embedding) VALUES (%s, %s, %s, %s, %s) ON CONFLICT (movie_id) DO NOTHING """, (movie_id, embedding_crc32, string_crc32, model_name, embedding.tolist())) conn.commit() return True except Exception as e: logging.error(f"Ошибка при вставке эмбеддинга: {e}") conn.rollback() return False def process_movies(): """Обрабатывает фильмы, создавая для них эмбеддинги.""" global processing_complete logging.info("Начало обработки фильмов.") # Получаем список фильмов, которые нужно обработать movies_to_process = get_movies_without_embeddings() if not movies_to_process: logging.info("Все фильмы уже обработаны.") processing_complete = True return # Добавляем фильмы в очередь for movie in movies_to_process: movies_queue.put(movie) conn = get_db_connection() if conn is None: processing_complete = True return try: while not movies_queue.empty(): if search_in_progress: time.sleep(1) continue batch = [] while not movies_queue.empty() and len(batch) < batch_size: try: movie = movies_queue.get_nowait() batch.append(movie) except queue.Empty: break if not batch: break logging.info(f"Обработка пакета из {len(batch)} фильмов...") for movie in batch: embedding_string = f"Название: {movie['name']}\nГод: {movie['year']}\nЖанры: {movie['genresList']}\nОписание: {movie['description']}" string_crc32 = calculate_crc32(embedding_string) # Проверяем существующий эмбеддинг existing_embedding = get_embedding_from_db(conn, embeddings_table, "string_crc32", string_crc32, model_name) if existing_embedding is None: embedding = encode_string(embedding_string) embedding_crc32 = calculate_crc32(str(embedding.tolist())) if insert_embedding(conn, embeddings_table, movie['id'], embedding_crc32, string_crc32, embedding): logging.info(f"Сохранен эмбеддинг для '{movie['name']}'") else: logging.error(f"Ошибка сохранения эмбеддинга для '{movie['name']}'") else: logging.info(f"Эмбеддинг для '{movie['name']}' уже существует") except Exception as e: logging.error(f"Ошибка при обработке фильмов: {e}") finally: conn.close() processing_complete = True logging.info("Обработка фильмов завершена") def get_movie_embeddings(conn): """Загружает все эмбеддинги фильмов из базы данных.""" movie_embeddings = {} try: with conn.cursor() as cur: cur.execute(f"SELECT movie_id, embedding FROM {embeddings_table}") for movie_id, embedding in cur.fetchall(): # Находим название фильма по ID for movie in movies_data: if movie['id'] == movie_id: movie_embeddings[movie['name']] = np.array(embedding) break logging.info(f"Загружено {len(movie_embeddings)} эмбеддингов фильмов.") except Exception as e: logging.error(f"Ошибка при загрузке эмбеддингов фильмов: {e}") return movie_embeddings def search_movies(query, top_k=10): """Выполняет поиск фильмов по запросу.""" global search_in_progress search_in_progress = True start_time = time.time() try: conn = get_db_connection() if conn is None: return "

Ошибка подключения к базе данных

" query_crc32 = calculate_crc32(query) query_embedding = get_embedding_from_db(conn, query_cache_table, "query_crc32", query_crc32, model_name) if query_embedding is None: query_embedding = encode_string(query) try: with conn.cursor() as cur: cur.execute(f""" INSERT INTO {query_cache_table} (query_crc32, query, model_name, embedding) VALUES (%s, %s, %s, %s) ON CONFLICT (query_crc32) DO NOTHING """, (query_crc32, query, model_name, query_embedding.tolist())) conn.commit() logging.info(f"Сохранен новый эмбеддинг запроса: {query}") except Exception as e: logging.error(f"Ошибка при сохранении эмбеддинга запроса: {e}") conn.rollback() # Используем косинусное расстояние для поиска try: with conn.cursor() as cur: cur.execute(f""" WITH query_embedding AS ( SELECT embedding FROM {query_cache_table} WHERE query_crc32 = %s ) SELECT m.movie_id, 1 - (m.embedding <=> (SELECT embedding FROM query_embedding)) as similarity FROM {embeddings_table} m, query_embedding ORDER BY similarity DESC LIMIT %s """, (query_crc32, top_k)) results = cur.fetchall() logging.info(f"Найдено {len(results)} результатов поиска.") except Exception as e: logging.error(f"Ошибка при выполнении поискового запроса: {e}") results = [] results_html = "
    " for movie_id, similarity in results: # Находим название фильма по ID movie_title = None for movie in movies_data: if movie['id'] == movie_id: movie_title = movie['name'] break if movie_title: results_html += f"
  1. {movie_title} (Сходство: {similarity:.4f})
  2. " results_html += "
" search_time = time.time() - start_time logging.info(f"Поиск выполнен за {search_time:.2f} секунд.") return f"

Время поиска: {search_time:.2f} сек

{results_html}" except Exception as e: logging.error(f"Ошибка при выполнении поиска: {e}") return "

Произошла ошибка при выполнении поиска.

" finally: if conn: conn.close() search_in_progress = False # Запускаем обработку фильмов в отдельном потоке processing_thread = threading.Thread(target=process_movies) processing_thread.start() # Создаем интерфейс Gradio iface = gr.Interface( fn=search_movies, inputs=gr.Textbox(lines=2, placeholder="Введите запрос для поиска фильмов..."), outputs=gr.HTML(label="Результаты поиска"), title="Семантический поиск фильмов", description="Введите описание фильма, который вы ищете, и система найдет наиболее похожие фильмы." ) # Запускаем интерфейс iface.launch()