from datetime import datetime
from typing import Dict, Any, Optional

import numpy as np

from models.LexRank import degree_centrality_scores

import logging

logger = logging.getLogger(__name__)


class QueryProcessor:
    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
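        """Search stored articles semantically and summarize the best matches.

        Dates are expected as ISO "YYYY-MM-DD" strings. Returns a dict with
        "summary", "articles", and "entities" keys, or an "error" key when no
        articles match or processing fails.
        """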
        try:
            # Convert string dates to datetime objects
            start_dt = datetime.strptime(start_date, "%Y-%m-%d") if start_date else None
            end_dt = datetime.strptime(end_date, "%Y-%m-%d") if end_date else None

            # Get query embedding
            query_embedding = self.embedding_model.encode(query).tolist()
            logger.debug(f"Generated query embedding for: {query[:50]}...")

            # Extract named entities from the query via the NLP model
            entities = self.nlp_model.extract_entities(query)
            logger.debug(f"Extracted entities: {entities}")

            # Semantic search constrained by date range, topic, and entities
            articles = await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_dt,
                end_date=end_dt,
                topic=topic,
                entities=[ent[0] for ent in entities]  # entity texts only, labels dropped
            )
            if not articles:
                logger.info("No articles found matching search criteria")
                return {"error": "No articles found matching the criteria"}

            # Split the matched articles into sentences
            contents = [article["content"] for article in articles]
            sentences = []
            for content in contents:
                sentences.extend(self.nlp_model.tokenize_sentences(content))
            logger.debug(f"Processing {len(sentences)} sentences for summarization")

            # Generate summary: pick central sentences with LexRank, then condense them
            if sentences:
                embeddings = self.embedding_model.encode(sentences)
                # Pairwise inner products; equal to cosine similarity when the
                # embedding model returns normalized vectors
                similarity_matrix = np.inner(embeddings, embeddings)
                # LexRank degree centrality: sentences most similar to the rest
                # of the corpus score highest
                centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
                # Keep the ten most central sentences as summarizer input
                top_indices = np.argsort(-centrality_scores)[:10]
                key_sentences = [sentences[idx].strip() for idx in top_indices]
                combined_text = ' '.join(key_sentences)
                summary = self.summarization_model.summarize(combined_text)
                logger.debug(f"Generated summary with {len(key_sentences)} key sentences")
            else:
                key_sentences = []
                summary = "No content available for summarization"
                logger.warning("No sentences available for summarization")

            return {
                "summary": summary,
                "articles": articles,
                "entities": entities  # Include extracted entities in the response
            }
        except Exception as e:
            logger.error(f"Error in QueryProcessor: {str(e)}", exc_info=True)
            return {"error": f"Processing error: {str(e)}"}