|
import datetime |
|
from typing import List, Dict, Any, Optional |
|
import numpy as np |
|
from models.LexRank import degree_centrality_scores |
|
import logging |
|
|
|
logger = logging.getLogger(__name__)


class QueryProcessor:
    """Answer a free-text query over stored articles.

    Pipeline: embed the query, extract named entities, run a semantic
    search against the article store, rank the retrieved sentences by
    degree centrality (LexRank), and summarize the top sentences.
    """

    # Maximum number of top-centrality sentences fed to the summarizer.
    MAX_KEY_SENTENCES = 10

    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        # embedding_model: exposes encode(text) -> array-like with .tolist()
        # summarization_model: exposes summarize(text) -> str
        # nlp_model: exposes extract_entities(text) and tokenize_sentences(text)
        # db_service: exposes async semantic_search(...) -> list of article dicts
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service

    @staticmethod
    def _parse_date(value: Optional[str]) -> Optional[datetime.datetime]:
        """Parse a "YYYY-MM-DD" string, or return None when absent.

        A malformed string raises ValueError, which process() reports
        through its generic error path.
        """
        # BUG FIX: the file imports the *module* (`import datetime`), so the
        # original `datetime.strptime(...)` raised AttributeError and every
        # date-filtered query fell into the generic error branch.
        return datetime.datetime.strptime(value, "%Y-%m-%d") if value else None

    def _select_key_sentences(self, sentences: List[str]) -> List[str]:
        """Rank sentences by degree centrality and return the top ones, stripped."""
        embeddings = self.embedding_model.encode(sentences)
        # Pairwise inner products of the sentence embeddings serve as the
        # similarity matrix for the centrality computation.
        similarity_matrix = np.inner(embeddings, embeddings)
        centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
        # argsort of the negated scores yields indices in descending score order.
        top_indices = np.argsort(-centrality_scores)[: self.MAX_KEY_SENTENCES]
        return [sentences[idx].strip() for idx in top_indices]

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the full search-and-summarize pipeline for one query.

        Args:
            query: Free-text user query.
            topic: Optional topic filter forwarded to the search backend.
            start_date: Optional lower date bound, "YYYY-MM-DD".
            end_date: Optional upper date bound, "YYYY-MM-DD".

        Returns:
            {"summary": str, "articles": list, "entities": list} on success,
            or {"error": str} when nothing matches or any step fails.
        """
        try:
            start_dt = self._parse_date(start_date)
            end_dt = self._parse_date(end_date)

            query_embedding = self.embedding_model.encode(query).tolist()
            logger.debug("Generated query embedding for: %s...", query[:50])

            entities = self.nlp_model.extract_entities(query)
            logger.debug("Extracted entities: %s", entities)

            # Entities are (text, label) pairs; the backend only needs the text.
            articles = await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_dt,
                end_date=end_dt,
                topic=topic,
                entities=[ent[0] for ent in entities]
            )

            if not articles:
                logger.info("No articles found matching search criteria")
                return {"error": "No articles found matching the criteria"}

            # Flatten every article body into a single sentence list.
            sentences: List[str] = []
            for article in articles:
                sentences.extend(self.nlp_model.tokenize_sentences(article["content"]))

            logger.debug("Processing %d sentences for summarization", len(sentences))

            if sentences:
                key_sentences = self._select_key_sentences(sentences)
                summary = self.summarization_model.summarize(' '.join(key_sentences))
                logger.debug("Generated summary with %d key sentences", len(key_sentences))
            else:
                summary = "No content available for summarization"
                logger.warning("No sentences available for summarization")

            return {
                "summary": summary,
                "articles": articles,
                "entities": entities
            }

        except Exception as e:
            # Top-level boundary: log with traceback, return a structured error.
            logger.error(f"Error in QueryProcessor: {str(e)}", exc_info=True)
            return {"error": f"Processing error: {str(e)}"}