muryshev's picture
update
86c402d
raw
history blame
30.1 kB
#!/usr/bin/env python
"""
Скрипт для поиска по векторизованным документам через API.
Этот скрипт:
1. Считывает все документы из заданной папки с помощью UniversalParser
2. Чанкит каждый документ через Destructurer с fixed_size-стратегией
3. Векторизует поле in_search_text через BGE-модель
4. Поднимает FastAPI с двумя эндпоинтами:
- /search/entities - возвращает найденные сущности списком словарей
- /search/text - возвращает полноценный собранный текст
"""
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional
import numpy as np
import pandas as pd
import torch
import uvicorn
from fastapi import FastAPI, Query
from ntr_fileparser import UniversalParser
from pydantic import BaseModel
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModel, AutoTokenizer
from ntr_text_fragmentation.chunking.specific_strategies.fixed_size_chunking import \
FixedSizeChunkingStrategy
from ntr_text_fragmentation.core.destructurer import Destructurer
from ntr_text_fragmentation.core.entity_repository import \
InMemoryEntityRepository
from ntr_text_fragmentation.core.injection_builder import InjectionBuilder
from ntr_text_fragmentation.models.linker_entity import LinkerEntity
# Константы
DOCS_FOLDER = "../data/docs" # Путь к папке с документами
MODEL_NAME = "BAAI/bge-m3" # Название модели для векторизации
BATCH_SIZE = 16 # Размер батча для векторизации
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu" # Устройство для вычислений
MAX_ENTITIES = 100 # Максимальное количество возвращаемых сущностей
WORDS_PER_CHUNK = 50 # Количество слов в чанке для fixed_size стратегии
OVERLAP_WORDS = 25 # Количество слов перекрытия для fixed_size стратегии
# Пути к кэшированным файлам
CACHE_DIR = "../data/cache" # Путь к папке с кэшированными данными
ENTITIES_CSV = os.path.join(CACHE_DIR, "entities.csv") # Путь к CSV с сущностями
EMBEDDINGS_NPY = os.path.join(CACHE_DIR, "embeddings.npy") # Путь к массиву эмбеддингов
# Инициализация FastAPI
app = FastAPI(title="Документный поиск API",
description="API для поиска по векторизованным документам")
# Глобальные переменные для хранения данных
entities_df = None
entity_embeddings = None
model = None
tokenizer = None
entity_repository = None
injection_builder = None
class EntityResponse(BaseModel):
"""Модель ответа для сущностей."""
id: str
name: str
text: str
type: str
score: float
doc_name: Optional[str] = None
metadata: Optional[Dict] = None
class TextResponse(BaseModel):
"""Модель ответа для собранного текста."""
text: str
entities_count: int
class TextsResponse(BaseModel):
"""Модель ответа для списка текстов."""
texts: List[str]
entities_count: int
def setup_logging() -> None:
"""Настройка логгирования."""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
def load_documents(folder_path: str) -> Dict:
"""
Загружает все документы из указанной папки.
Args:
folder_path: Путь к папке с документами
Returns:
Словарь {имя_файла: parsed_document}
"""
logging.info(f"Чтение документов из {folder_path}...")
parser = UniversalParser()
documents = {}
# Проверка существования папки
if not os.path.exists(folder_path):
logging.error(f"Папка {folder_path} не существует!")
return {}
for file_path in Path(folder_path).glob("**/*.docx"):
try:
doc_name = file_path.stem
logging.info(f"Обработка документа: {doc_name}")
documents[doc_name] = parser.parse_by_path(str(file_path))
except Exception as e:
logging.error(f"Ошибка при чтении файла {file_path}: {e}")
logging.info(f"Загружено {len(documents)} документов.")
return documents
def process_documents(documents: Dict) -> List[LinkerEntity]:
"""
Обрабатывает документы, применяя fixed_size стратегию чанкинга.
Args:
documents: Словарь с распарсенными документами
Returns:
Список сущностей из всех документов
"""
logging.info("Применение fixed_size стратегии чанкинга ко всем документам...")
all_entities = []
for doc_name, document in documents.items():
try:
# Создаем Destructurer с fixed_size стратегией
destructurer = Destructurer(
document,
strategy_name="fixed_size",
words_per_chunk=WORDS_PER_CHUNK,
overlap_words=OVERLAP_WORDS
)
# Получаем сущности
doc_entities = destructurer.destructure()
# Добавляем имя документа в метаданные всех сущностей
for entity in doc_entities:
if not hasattr(entity, 'metadata') or entity.metadata is None:
entity.metadata = {}
entity.metadata['doc_name'] = doc_name
all_entities.extend(doc_entities)
logging.info(f"Документ {doc_name}: получено {len(doc_entities)} сущностей")
except Exception as e:
logging.error(f"Ошибка при обработке документа {doc_name}: {e}")
logging.info(f"Всего получено {len(all_entities)} сущностей из всех документов")
return all_entities
def entities_to_dataframe(entities: List[LinkerEntity]) -> pd.DataFrame:
"""
Преобразует список сущностей в DataFrame для удобной работы.
Args:
entities: Список сущностей
Returns:
DataFrame с данными сущностей
"""
data = []
for entity in entities:
# Получаем имя документа из метаданных
doc_name = entity.metadata.get('doc_name', '') if hasattr(entity, 'metadata') and entity.metadata else ''
# Базовые поля для всех типов сущностей
entity_dict = {
"id": str(entity.id),
"type": entity.type,
"name": entity.name,
"text": entity.text,
"in_search_text": entity.in_search_text,
"doc_name": doc_name,
"source_id": entity.source_id if hasattr(entity, 'source_id') else None,
"target_id": entity.target_id if hasattr(entity, 'target_id') else None,
"metadata": entity.metadata if hasattr(entity, 'metadata') else {},
}
data.append(entity_dict)
df = pd.DataFrame(data)
return df
def setup_model_and_tokenizer():
"""
Инициализирует модель и токенизатор для векторизации.
Returns:
Кортеж (модель, токенизатор)
"""
global model, tokenizer
logging.info(f"Загрузка модели {MODEL_NAME} на устройство {DEVICE}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).to(DEVICE)
model.eval()
return model, tokenizer
def _average_pool(
last_hidden_states: torch.Tensor,
attention_mask: torch.Tensor
) -> torch.Tensor:
"""
Расчёт усредненного эмбеддинга по всем токенам
Args:
last_hidden_states: Матрица эмбеддингов отдельных токенов
attention_mask: Маска, чтобы не учитывать при усреднении пустые токены
Returns:
Усредненный эмбеддинг
"""
last_hidden = last_hidden_states.masked_fill(
~attention_mask[..., None].bool(), 0.0
)
return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]
def get_embeddings(texts: List[str]) -> np.ndarray:
"""
Получает эмбеддинги для списка текстов.
Args:
texts: Список текстов для векторизации
Returns:
Массив эмбеддингов
"""
global model, tokenizer
# Проверяем, что модель и токенизатор инициализированы
if model is None or tokenizer is None:
model, tokenizer = setup_model_and_tokenizer()
all_embeddings = []
for i in range(0, len(texts), BATCH_SIZE):
batch_texts = texts[i:i+BATCH_SIZE]
# Фильтруем None и пустые строки
batch_texts = [text for text in batch_texts if text]
if not batch_texts:
continue
# Токенизация с обрезкой и padding
encoding = tokenizer(
batch_texts,
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
).to(DEVICE)
# Получаем эмбеддинги с average pooling
with torch.no_grad():
outputs = model(**encoding)
embeddings = _average_pool(outputs.last_hidden_state, encoding["attention_mask"])
all_embeddings.append(embeddings.cpu().numpy())
if not all_embeddings:
return np.array([])
return np.vstack(all_embeddings)
def init_entity_repository_and_builder(entities: List[LinkerEntity]):
"""
Инициализирует хранилище сущностей и сборщик инъекций.
Args:
entities: Список сущностей
"""
global entity_repository, injection_builder
# Создаем хранилище сущностей
entity_repository = InMemoryEntityRepository(entities)
# Добавляем метод get_entity_by_id в InMemoryEntityRepository
# Это временное решение, в идеале нужно добавить этот метод в сам класс
def get_entity_by_id(self, entity_id):
"""Получает сущность по ID"""
for entity in self.entities:
if str(entity.id) == entity_id:
return entity
return None
# Добавляем метод в класс
InMemoryEntityRepository.get_entity_by_id = get_entity_by_id
# Создаем сборщик инъекций
injection_builder = InjectionBuilder(repository=entity_repository)
# Регистрируем стратегию
injection_builder.register_strategy("fixed_size", FixedSizeChunkingStrategy)
def search_entities(query: str, top_n: int = MAX_ENTITIES) -> List[Dict]:
"""
Ищет сущности по запросу на основе косинусной близости.
Args:
query: Поисковый запрос
top_n: Максимальное количество возвращаемых сущностей
Returns:
Список найденных сущностей с их скорами
"""
global entities_df, entity_embeddings
# Проверяем наличие данных
if entities_df is None or entity_embeddings is None:
logging.error("Данные не инициализированы. Запустите сначала prepare_data().")
return []
# Векторизуем запрос
query_embedding = get_embeddings([query])
if query_embedding.size == 0:
return []
# Считаем косинусную близость
similarities = cosine_similarity(query_embedding, entity_embeddings)[0]
# Получаем индексы топ-N сущностей
top_indices = np.argsort(similarities)[-top_n:][::-1]
# Фильтруем сущности, которые используются для поиска
search_df = entities_df.copy()
search_df = search_df[search_df['in_search_text'].notna()]
# Если после фильтрации нет данных, возвращаем пустой список
if search_df.empty:
return []
# Получаем топ-N сущностей
results = []
for idx in top_indices:
if idx >= len(search_df):
continue
entity = search_df.iloc[idx]
similarity = similarities[idx]
# Создаем результат
result = {
"id": entity["id"],
"name": entity["name"],
"text": entity["text"],
"type": entity["type"],
"score": float(similarity),
"doc_name": entity["doc_name"],
"metadata": entity["metadata"]
}
results.append(result)
return results
@app.get("/search/entities", response_model=List[EntityResponse])
async def api_search_entities(
query: str = Query(..., description="Поисковый запрос"),
limit: int = Query(MAX_ENTITIES, description="Максимальное количество результатов")
):
"""
Эндпоинт для поиска сущностей по запросу.
Args:
query: Поисковый запрос
limit: Максимальное количество результатов
Returns:
Список найденных сущностей
"""
results = search_entities(query, limit)
return results
@app.get("/search/text", response_model=TextResponse)
async def api_search_text(
query: str = Query(..., description="Поисковый запрос"),
limit: int = Query(MAX_ENTITIES, description="Максимальное количество учитываемых сущностей")
):
"""
Эндпоинт для поиска и сборки полного текста по запросу.
Args:
query: Поисковый запрос
limit: Максимальное количество учитываемых сущностей
Returns:
Собранный текст и количество использованных сущностей
"""
global injection_builder
# Проверяем наличие сборщика инъекций
if injection_builder is None:
logging.error("Сборщик инъекций не инициализирован.")
return {"text": "", "entities_count": 0}
# Получаем найденные сущности
entity_results = search_entities(query, limit)
if not entity_results:
return {"text": "", "entities_count": 0}
# Получаем список ID сущностей
entity_ids = [str(result["id"]) for result in entity_results]
# Собираем текст, используя напрямую ID
try:
assembled_text = injection_builder.build(entity_ids)
print('Всё ок прошло вроде бы')
return {"text": assembled_text, "entities_count": len(entity_ids)}
except ImportError as e:
# Обработка ошибки импорта модулей для работы с изображениями
logging.error(f"Ошибка импорта при сборке текста: {e}")
# Альтернативная сборка текста без использования injection_builder
simple_text = "\n\n".join([result["text"] for result in entity_results if result.get("text")])
return {"text": simple_text, "entities_count": len(entity_ids)}
except Exception as e:
logging.error(f"Ошибка при сборке текста: {e}")
return {"text": "", "entities_count": 0}
@app.get("/search/texts", response_model=TextsResponse)
async def api_search_texts(
query: str = Query(..., description="Поисковый запрос"),
limit: int = Query(MAX_ENTITIES, description="Максимальное количество результатов")
):
"""
Эндпоинт для поиска списка текстов сущностей по запросу.
Args:
query: Поисковый запрос
limit: Максимальное количество результатов
Returns:
Список текстов найденных сущностей и их количество
"""
# Получаем найденные сущности
entity_results = search_entities(query, limit)
if not entity_results:
return {"texts": [], "entities_count": 0}
# Извлекаем тексты из результатов
texts = [result["text"] for result in entity_results if result.get("text")]
return {"texts": texts, "entities_count": len(texts)}
@app.get("/search/text_test", response_model=TextResponse)
async def api_search_text_test(
query: str = Query(..., description="Поисковый запрос"),
limit: int = Query(MAX_ENTITIES, description="Максимальное количество учитываемых сущностей")
):
"""
Тестовый эндпоинт для поиска и сборки текста с использованием подхода из test_chunking_visualization.py.
Args:
query: Поисковый запрос
limit: Максимальное количество учитываемых сущностей
Returns:
Собранный текст и количество использованных сущностей
"""
global entity_repository, injection_builder
# Проверяем наличие репозитория и сборщика инъекций
if entity_repository is None or injection_builder is None:
logging.error("Репозиторий или сборщик инъекций не инициализированы.")
return {"text": "", "entities_count": 0}
# Получаем найденные сущности
entity_results = search_entities(query, limit)
if not entity_results:
return {"text": "", "entities_count": 0}
try:
# Получаем объекты сущностей из репозитория по ID
entity_ids = [result["id"] for result in entity_results]
entities = []
for entity_id in entity_ids:
entity = entity_repository.get_entity_by_id(entity_id)
if entity:
entities.append(entity)
logging.info(f"Найдено {len(entities)} объектов сущностей по ID")
if not entities:
logging.error("Не удалось найти сущности в репозитории")
# Собираем простой текст из результатов поиска
simple_text = "\n\n".join([result["text"] for result in entity_results if result.get("text")])
return {"text": simple_text, "entities_count": len(entity_results)}
# Собираем текст, как в test_chunking_visualization.py
assembled_text = injection_builder.build(entities) # Передаем сами объекты
return {"text": assembled_text, "entities_count": len(entities)}
except Exception as e:
logging.error(f"Ошибка при сборке текста: {e}", exc_info=True)
# Запасной вариант - просто соединяем тексты
fallback_text = "\n\n".join([result["text"] for result in entity_results if result.get("text")])
return {"text": fallback_text, "entities_count": len(entity_results)}
def save_entities_to_csv(entities: List[LinkerEntity], csv_path: str) -> None:
"""
Сохраняет сущности в CSV файл.
Args:
entities: Список сущностей
csv_path: Путь для сохранения CSV файла
"""
logging.info(f"Сохранение {len(entities)} сущностей в {csv_path}")
# Создаем директорию, если она не существует
os.makedirs(os.path.dirname(csv_path), exist_ok=True)
# Преобразуем сущности в DataFrame и сохраняем
df = entities_to_dataframe(entities)
df.to_csv(csv_path, index=False)
logging.info(f"Сохранено {len(entities)} сущностей в {csv_path}")
def load_entities_from_csv(csv_path: str) -> List[LinkerEntity]:
"""
Загружает сущности из CSV файла.
Args:
csv_path: Путь к CSV файлу
Returns:
Список сущностей
"""
logging.info(f"Загрузка сущностей из {csv_path}")
if not os.path.exists(csv_path):
logging.error(f"Файл {csv_path} не найден")
return []
df = pd.read_csv(csv_path)
entities = []
for _, row in df.iterrows():
# Обработка метаданных
metadata = row.get("metadata", {})
if isinstance(metadata, str):
try:
metadata = eval(metadata) if metadata and not pd.isna(metadata) else {}
except:
metadata = {}
# Общие поля для всех типов сущностей
common_args = {
"id": row["id"],
"name": row["name"] if not pd.isna(row.get("name", "")) else "",
"text": row["text"] if not pd.isna(row.get("text", "")) else "",
"metadata": metadata,
"type": row["type"],
}
# Добавляем in_search_text, если он есть
if "in_search_text" in row and not pd.isna(row["in_search_text"]):
common_args["in_search_text"] = row["in_search_text"]
# Добавляем поля связи, если они есть
if "source_id" in row and not pd.isna(row["source_id"]):
common_args["source_id"] = row["source_id"]
common_args["target_id"] = row["target_id"]
if "number_in_relation" in row and not pd.isna(row["number_in_relation"]):
common_args["number_in_relation"] = int(row["number_in_relation"])
entity = LinkerEntity(**common_args)
entities.append(entity)
logging.info(f"Загружено {len(entities)} сущностей из {csv_path}")
return entities
def save_embeddings(embeddings: np.ndarray, file_path: str) -> None:
"""
Сохраняет эмбеддинги в numpy файл.
Args:
embeddings: Массив эмбеддингов
file_path: Путь для сохранения файла
"""
logging.info(f"Сохранение эмбеддингов размером {embeddings.shape} в {file_path}")
# Создаем директорию, если она не существует
os.makedirs(os.path.dirname(file_path), exist_ok=True)
# Сохраняем эмбеддинги
np.save(file_path, embeddings)
logging.info(f"Эмбеддинги сохранены в {file_path}")
def load_embeddings(file_path: str) -> np.ndarray:
"""
Загружает эмбеддинги из numpy файла.
Args:
file_path: Путь к файлу
Returns:
Массив эмбеддингов
"""
logging.info(f"Загрузка эмбеддингов из {file_path}")
if not os.path.exists(file_path):
logging.error(f"Файл {file_path} не найден")
return np.array([])
embeddings = np.load(file_path)
logging.info(f"Загружены эмбеддинги размером {embeddings.shape}")
return embeddings
def prepare_data():
"""
Подготавливает все необходимые данные для API.
"""
global entities_df, entity_embeddings, entity_repository, injection_builder
# Проверяем наличие кэшированных данных
cache_exists = os.path.exists(ENTITIES_CSV) and os.path.exists(EMBEDDINGS_NPY)
if cache_exists:
logging.info("Найдены кэшированные данные, загружаем их")
# Загружаем сущности из CSV
entities = load_entities_from_csv(ENTITIES_CSV)
if not entities:
logging.error("Не удалось загрузить сущности из кэша, генерируем заново")
cache_exists = False
else:
# Преобразуем сущности в DataFrame
entities_df = entities_to_dataframe(entities)
# Загружаем эмбеддинги
entity_embeddings = load_embeddings(EMBEDDINGS_NPY)
if entity_embeddings.size == 0:
logging.error("Не удалось загрузить эмбеддинги из кэша, генерируем заново")
cache_exists = False
else:
# Инициализируем хранилище и сборщик
init_entity_repository_and_builder(entities)
logging.info("Данные успешно загружены из кэша")
# Если кэшированных данных нет или их не удалось загрузить, генерируем заново
if not cache_exists:
logging.info("Кэшированные данные не найдены или не могут быть загружены, обрабатываем документы")
# Загружаем и обрабатываем документы
documents = load_documents(DOCS_FOLDER)
if not documents:
logging.error(f"Не найдено документов в папке {DOCS_FOLDER}")
return
# Получаем сущности из всех документов
all_entities = process_documents(documents)
if not all_entities:
logging.error("Не получено сущностей из документов")
return
# Преобразуем сущности в DataFrame
entities_df = entities_to_dataframe(all_entities)
# Инициализируем хранилище и сборщик
init_entity_repository_and_builder(all_entities)
# Фильтруем только сущности для поиска
search_df = entities_df[entities_df['in_search_text'].notna()]
if search_df.empty:
logging.error("Нет сущностей для поиска с in_search_text")
return
# Векторизуем тексты сущностей
search_texts = search_df['in_search_text'].tolist()
entity_embeddings = get_embeddings(search_texts)
logging.info(f"Подготовлено {len(search_df)} сущностей для поиска")
logging.info(f"Размер эмбеддингов: {entity_embeddings.shape}")
# Сохраняем данные в кэш для последующего использования
save_entities_to_csv(all_entities, ENTITIES_CSV)
save_embeddings(entity_embeddings, EMBEDDINGS_NPY)
logging.info("Данные сохранены в кэш для последующего использования")
# Вывод итоговой информации (независимо от источника данных)
logging.info(f"Подготовка данных завершена. Готово к использованию {entity_embeddings.shape[0]} сущностей")
@app.on_event("startup")
async def startup_event():
"""Запускается при старте приложения."""
setup_logging()
prepare_data()
def main():
"""Основная функция для запуска скрипта вручную."""
setup_logging()
prepare_data()
# Запуск Uvicorn сервера
uvicorn.run(app, host="0.0.0.0", port=8017)
if __name__ == "__main__":
main()