Spaces:
Sleeping
Sleeping
from typing import Annotated | |
import numpy as np | |
from fastapi import APIRouter, Depends, HTTPException | |
from sqlalchemy.orm import Session | |
from common import auth | |
import common.dependencies as DI | |
from components.dbo.chunk_repository import ChunkRepository | |
from components.services.entity import EntityService | |
from schemas.entity import (EntityNeighborsRequest, EntityNeighborsResponse, | |
EntitySearchRequest, EntitySearchResponse, | |
EntitySearchWithTextRequest, | |
EntitySearchWithTextResponse, EntityTextRequest, | |
EntityTextResponse) | |
router = APIRouter(prefix="/entity", tags=["Entity"]) | |
async def search_entities( | |
request: EntitySearchRequest, | |
entity_service: Annotated[EntityService, Depends(DI.get_entity_service)], | |
current_user: Annotated[any, Depends(auth.get_current_user)] | |
) -> EntitySearchResponse: | |
""" | |
Поиск похожих сущностей по векторному сходству (только ID). | |
Args: | |
request: Параметры поиска | |
entity_service: Сервис для работы с сущностями | |
Returns: | |
Результаты поиска (ID и оценки), отсортированные по убыванию сходства | |
""" | |
try: | |
_, scores, ids = entity_service.search_similar( | |
request.query, | |
request.dataset_id, | |
) | |
# Проверяем, что scores и ids - корректные numpy массивы | |
if not isinstance(scores, np.ndarray): | |
scores = np.array(scores) | |
if not isinstance(ids, np.ndarray): | |
ids = np.array(ids) | |
# Сортируем результаты по убыванию оценок | |
# Проверим, что массивы не пустые | |
if len(scores) > 0: | |
# Преобразуем индексы в список, чтобы избежать проблем с индексацией | |
sorted_indices = scores.argsort()[::-1].tolist() | |
sorted_scores = [float(scores[i]) for i in sorted_indices] | |
# Преобразуем все ID в строки | |
sorted_ids = [str(ids[i]) for i in sorted_indices] | |
else: | |
sorted_scores = [] | |
sorted_ids = [] | |
return EntitySearchResponse( | |
scores=sorted_scores, | |
entity_ids=sorted_ids, | |
) | |
except Exception as e: | |
raise HTTPException( | |
status_code=500, | |
detail=f"Error during entity search: {str(e)}" | |
) | |
async def search_entities_with_text( | |
request: EntitySearchWithTextRequest, | |
entity_service: Annotated[EntityService, Depends(DI.get_entity_service)], | |
current_user: Annotated[any, Depends(auth.get_current_user)] | |
) -> EntitySearchWithTextResponse: | |
""" | |
Поиск похожих сущностей по векторному сходству с возвратом текстов. | |
Args: | |
request: Параметры поиска | |
entity_service: Сервис для работы с сущностями | |
Returns: | |
Результаты поиска с текстами чанков, отсортированные по убыванию сходства | |
""" | |
try: | |
# Получаем результаты поиска | |
_, scores, entity_ids = entity_service.search_similar( | |
request.query, | |
request.dataset_id | |
) | |
# Проверяем, что scores и entity_ids - корректные numpy массивы | |
if not isinstance(scores, np.ndarray): | |
scores = np.array(scores) | |
if not isinstance(entity_ids, np.ndarray): | |
entity_ids = np.array(entity_ids) | |
# Сортируем результаты по убыванию оценок | |
# Проверим, что массивы не пустые | |
if len(scores) > 0: | |
# Преобразуем индексы в список, чтобы избежать проблем с индексацией | |
sorted_indices = scores.argsort()[::-1].tolist() | |
sorted_scores = [float(scores[i]) for i in sorted_indices] | |
sorted_ids = [str(entity_ids[i]) for i in sorted_indices] # Преобразуем в строки | |
# Получаем тексты чанков | |
chunks = entity_service.chunk_repository.get_chunks_by_ids(sorted_ids) | |
# Формируем ответ | |
return EntitySearchWithTextResponse( | |
chunks=[ | |
{ | |
"id": str(chunk.id), # Преобразуем UUID в строку | |
"text": chunk.text, | |
"score": score | |
} | |
for chunk, score in zip(chunks, sorted_scores) | |
] | |
) | |
else: | |
return EntitySearchWithTextResponse(chunks=[]) | |
except Exception as e: | |
raise HTTPException( | |
status_code=500, | |
detail=f"Error during entity search with text: {str(e)}" | |
) | |
async def build_entity_text( | |
request: EntityTextRequest, | |
entity_service: Annotated[EntityService, Depends(DI.get_entity_service)], | |
current_user: Annotated[any, Depends(auth.get_current_user)] | |
) -> EntityTextResponse: | |
""" | |
Сборка текста из сущностей. | |
Args: | |
request: Параметры сборки текста | |
entity_service: Сервис для работы с сущностями | |
Returns: | |
Собранный текст | |
""" | |
try: | |
# Получаем объекты LinkerEntity по ID | |
entities = entity_service.chunk_repository.get_chunks_by_ids(request.entities) | |
if not entities: | |
raise HTTPException( | |
status_code=404, | |
detail="No entities found with provided IDs" | |
) | |
# Собираем текст | |
text = entity_service.build_text( | |
entities=entities, | |
chunk_scores=request.chunk_scores, | |
include_tables=request.include_tables, | |
max_documents=request.max_documents, | |
) | |
return EntityTextResponse(text=text) | |
except Exception as e: | |
raise HTTPException( | |
status_code=500, | |
detail=f"Error building entity text: {str(e)}" | |
) | |
async def get_neighboring_chunks( | |
request: EntityNeighborsRequest, | |
entity_service: Annotated[EntityService, Depends(DI.get_entity_service)], | |
current_user: Annotated[any, Depends(auth.get_current_user)] | |
) -> EntityNeighborsResponse: | |
""" | |
Получение соседних чанков для заданных сущностей. | |
Args: | |
request: Параметры запроса соседей | |
entity_service: Сервис для работы с сущностями | |
Returns: | |
Список сущностей с соседями | |
""" | |
try: | |
# Получаем объекты LinkerEntity по ID | |
entities = entity_service.chunk_repository.get_chunks_by_ids(request.entities) | |
if not entities: | |
raise HTTPException( | |
status_code=404, | |
detail="No entities found with provided IDs" | |
) | |
# Получаем соседние чанки | |
entities_with_neighbors = entity_service.add_neighboring_chunks( | |
entities, | |
max_distance=request.max_distance, | |
) | |
# Преобразуем LinkerEntity в строки | |
return EntityNeighborsResponse( | |
entities=[str(entity.id) for entity in entities_with_neighbors] | |
) | |
except Exception as e: | |
raise HTTPException( | |
status_code=500, | |
detail=f"Error getting neighboring chunks: {str(e)}" | |
) | |
async def get_entity_info( | |
dataset_id: int, | |
db: Annotated[Session, Depends(DI.get_db)], | |
current_user: Annotated[any, Depends(auth.get_current_user)] | |
) -> dict: | |
""" | |
Получить информацию о сущностях в датасете. | |
Args: | |
dataset_id: ID датасета | |
db: Сессия базы данных | |
config: Конфигурация приложения | |
Returns: | |
dict: Информация о сущностях | |
""" | |
chunk_repository = ChunkRepository(db) | |
entities, embeddings = chunk_repository.get_searching_entities(dataset_id) | |
if not entities: | |
raise HTTPException(status_code=404, detail=f"No entities found for dataset {dataset_id}") | |
# Собираем статистику | |
stats = { | |
"total_entities": len(entities), | |
"entities_with_embeddings": len([e for e in embeddings if e is not None]), | |
"embedding_shapes": [e.shape if e is not None else None for e in embeddings], | |
"unique_embedding_shapes": set(str(e.shape) if e is not None else None for e in embeddings), | |
"entity_types": set(e.type for e in entities), | |
"entities_per_type": { | |
t: len([e for e in entities if e.type == t]) | |
for t in set(e.type for e in entities) | |
} | |
} | |
# Примеры сущностей | |
examples = [ | |
{ | |
"id": str(e.id), # Преобразуем UUID в строку | |
"name": e.name, | |
"type": e.type, | |
"has_embedding": embeddings[i] is not None, | |
"embedding_shape": str(embeddings[i].shape) if embeddings[i] is not None else None, | |
"text_length": len(e.text), | |
"in_search_text_length": len(e.in_search_text) if e.in_search_text else 0 | |
} | |
for i, e in enumerate(entities[:5]) # Берем только первые 5 для примера | |
] | |
return { | |
"stats": stats, | |
"examples": examples | |
} |