import logging
from datetime import datetime
from typing import Any, Dict, Optional

import numpy as np

from models.LexRank import degree_centrality_scores

logger = logging.getLogger(__name__)


class QueryProcessor:
    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> Dict[str, Any]:
        try:
            # Convert string dates to datetime objects
            start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
            end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None

            # Get query embedding
            query_embedding = self.embedding_model.encode(query).tolist()
            logger.debug(f"Generated query embedding for: {query[:50]}...")

            # Extract entities using the NLP model
            entities = self.nlp_model.extract_entities(query)
            logger.debug(f"Extracted entities: {entities}")

            # Semantic search with entities
            articles = await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_dt,
                end_date=end_dt,
                topic=topic,
                entities=[ent[0] for ent in entities],  # Use just the entity texts
            )

            if not articles:
                logger.info("No articles found matching search criteria")
                return {"error": "No articles found matching the criteria"}

            # Split retrieved article contents into sentences
            contents = [article["content"] for article in articles]
            sentences = []
            for content in contents:
                sentences.extend(self.nlp_model.tokenize_sentences(content))

            logger.debug(f"Processing {len(sentences)} sentences for summarization")

            # Generate summary from the most central sentences (LexRank)
            if sentences:
                embeddings = self.embedding_model.encode(sentences)
                similarity_matrix = np.inner(embeddings, embeddings)
                centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
                top_indices = np.argsort(-centrality_scores)[:10]
                key_sentences = [sentences[idx].strip() for idx in top_indices]
                combined_text = ' '.join(key_sentences)
                summary = self.summarization_model.summarize(combined_text)
                logger.debug(f"Generated summary with {len(key_sentences)} key sentences")
            else:
                key_sentences = []
                summary = "No content available for summarization"
                logger.warning("No sentences available for summarization")

            return {
                "summary": summary,
                "articles": articles,
                "entities": entities,  # Include extracted entities in the response
            }
        except Exception as e:
            logger.error(f"Error in QueryProcessor: {str(e)}", exc_info=True)
            return {"error": f"Processing error: {str(e)}"}
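
# --- Minimal usage sketch (illustrative only) ---
# The stub classes below are hypothetical stand-ins for the injected dependencies.
# They assume only the duck-typed interface that process() relies on: encode(),
# extract_entities(), tokenize_sentences(), summarize(), and an async
# semantic_search(). None of these stubs are part of the real application.
if __name__ == "__main__":
    import asyncio

    class StubEmbedder:
        """Stand-in for a sentence-embedding model (e.g. a sentence-transformers model)."""

        def encode(self, text):
            texts = [text] if isinstance(text, str) else list(text)
            # Deterministic fake 2-d embeddings keyed on sentence length.
            vecs = np.array([[float(len(t)), 1.0] for t in texts])
            return vecs[0] if isinstance(text, str) else vecs

    class StubSummarizer:
        def summarize(self, text):
            # Truncation stands in for abstractive summarization.
            return text[:200]

    class StubNLP:
        def extract_entities(self, text):
            # (text, label) pairs, matching what process() indexes with ent[0].
            return [("example", "ORG")]

        def tokenize_sentences(self, text):
            return [s for s in text.split(". ") if s]

    class StubDB:
        async def semantic_search(self, **kwargs):
            return [{"content": "First sentence. Second sentence. Third sentence."}]

    processor = QueryProcessor(StubEmbedder(), StubSummarizer(), StubNLP(), StubDB())
    result = asyncio.run(processor.process("example query", start_date="2024-01-01"))
    print(result["summary"])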