import logging
from typing import Dict, Any, List, Optional

from textblob import TextBlob
from transformers import pipeline
import numpy as np

logger = logging.getLogger(__name__)


class SentimentAnalyzer:
    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize sentiment analyzer with both traditional and LLM-based approaches.

        Args:
            use_ai: Whether to use AI-powered analysis (True) or traditional analysis (False)
            model_registry: Optional shared model registry for better performance
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry
        self.toxicity_available = False

        # Traditional manipulation patterns
        self.manipulative_patterns = [
            "experts say", "sources claim", "many believe", "some say",
            "everyone knows", "clearly", "obviously", "without doubt", "certainly"
        ]

        if use_ai:
            try:
                if model_registry and model_registry.is_available:
                    # Use shared models
                    self.sentiment_pipeline = model_registry.sentiment
                    self.zero_shot = model_registry.zero_shot
                    self.toxicity_pipeline = getattr(model_registry, 'toxicity', None)
                    self.toxicity_available = self.toxicity_pipeline is not None
                    self.llm_available = True
                    logger.info("Using shared model pipelines for sentiment analysis")
                    if self.toxicity_available:
                        logger.info("Toxicity analysis enabled")
                    else:
                        logger.info("Toxicity analysis not available")
                else:
                    # Initialize own pipelines
                    self.sentiment_pipeline = pipeline(
                        "text-classification",
                        model="SamLowe/roberta-base-go_emotions",
                        device=-1,
                        batch_size=16
                    )
                    self.zero_shot = pipeline(
                        "zero-shot-classification",
                        model="facebook/bart-large-mnli",
                        device=-1,
                        batch_size=8
                    )
                    try:
                        self.toxicity_pipeline = pipeline(
                            "text-classification",
                            model="unitary/toxic-bert",
                            device=-1,
                            batch_size=16
                        )
                        self.toxicity_available = True
                        logger.info("Toxicity analysis enabled")
                    except Exception as tox_error:
                        logger.warning(f"Toxicity pipeline initialization failed: {str(tox_error)}")
                        self.toxicity_available = False
                    self.llm_available = True
                    logger.info("Initialized dedicated model pipelines for sentiment analysis")
            except Exception as e:
                logger.warning(f"Failed to initialize LLM pipelines: {str(e)}")
                self.llm_available = False
        else:
            logger.info("Initializing sentiment analyzer in traditional mode")

    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Perform sentiment analysis using LLM models. Returns None if analysis fails."""
        try:
            logger.info("Starting LLM sentiment analysis")

            # Clean the text of formatting markers
            cleaned_text = text.replace('$!/$', '').replace('##', '').replace('#', '')
            cleaned_text = '\n'.join(
                line for line in cleaned_text.split('\n')
                if not line.startswith('[') and not line.startswith('More on')
            )
            logger.info("Text cleaned and prepared for analysis")

            # Split text into ~2000-character chunks as a rough proxy for the
            # models' 512-token input limit
            chunks = [cleaned_text[i:i + 2000] for i in range(0, len(cleaned_text), 2000)]
            logger.info(f"Text split into {len(chunks)} chunks for processing")

            # Initialize aggregation variables
            sentiment_scores = []
            toxicity_scores = []
            manipulation_scores = []
            flagged_phrases = []
            manipulation_categories = [
                "emotional manipulation", "fear mongering", "propaganda",
                "factual reporting", "balanced perspective"
            ]

            # Process each chunk
            for i, chunk in enumerate(chunks, 1):
                logger.info(f"Processing chunk {i}/{len(chunks)}")
                try:
                    # Get emotion scores with detailed logging
                    logger.debug(f"Analyzing emotions for chunk {i}")
                    emotions = self.sentiment_pipeline(chunk)
                    logger.debug(f"Raw emotion response: {emotions}")

                    # Handle different response formats
                    if isinstance(emotions, list):
                        # Multiple results format
                        for emotion in emotions:
                            if isinstance(emotion, dict) and 'label' in emotion and 'score' in emotion:
                                sentiment_scores.append(emotion)
                    elif isinstance(emotions, dict) and 'label' in emotions and 'score' in emotions:
                        # Single result format
                        sentiment_scores.append(emotions)
                    logger.debug(f"Processed emotion scores: {sentiment_scores}")

                    # Get toxicity scores if available
                    if self.toxicity_available:
                        logger.debug(f"Analyzing toxicity for chunk {i}")
                        try:
                            toxicity = self.toxicity_pipeline(chunk)
                            if isinstance(toxicity, list):
                                toxicity_scores.extend(toxicity)
                            else:
                                toxicity_scores.append(toxicity)
                            logger.debug(f"Processed toxicity scores: {toxicity_scores}")
                        except Exception as tox_error:
                            logger.warning(f"Toxicity analysis failed for chunk {i}: {str(tox_error)}")

                    # Get manipulation scores
                    logger.debug(f"Analyzing manipulation for chunk {i}")
                    manipulation = self.zero_shot(
                        chunk,
                        manipulation_categories,
                        multi_label=True
                    )
                    if isinstance(manipulation, dict) and 'labels' in manipulation and 'scores' in manipulation:
                        manipulation_scores.append({
                            label: score
                            for label, score in zip(manipulation['labels'], manipulation['scores'])
                        })
                    logger.debug(f"Processed manipulation scores: {manipulation_scores}")

                    # Analyze sentences for manipulation
                    sentences = chunk.split('.')
                    for sentence in sentences:
                        if len(sentence.strip()) > 10:
                            sent_result = self.zero_shot(
                                sentence.strip(),
                                manipulation_categories,
                                multi_label=False
                            )
                            if (sent_result['labels'][0] in ["emotional manipulation", "fear mongering", "propaganda"]
                                    and sent_result['scores'][0] > 0.7):
                                flagged_phrases.append({
                                    'text': sentence.strip(),
                                    'type': sent_result['labels'][0],
                                    'score': sent_result['scores'][0]
                                })
                except Exception as chunk_error:
                    logger.error(f"Error processing chunk {i}: {str(chunk_error)}")
                    continue

            logger.info("All chunks processed, aggregating scores")

            # Aggregate scores with error handling
            def aggregate_scores(scores_list, score_type: str):
                try:
                    if not scores_list:
                        logger.warning(f"No {score_type} scores to aggregate")
                        return {}
                    all_scores = {}
                    for scores in scores_list:
                        if isinstance(scores, dict):
                            if 'label' in scores and 'score' in scores:
                                label = scores['label']
                                score = scores['score']
                            else:
                                # Handle direct label-score mapping
                                for label, score in scores.items():
                                    if label not in all_scores:
                                        all_scores[label] = []
                                    if isinstance(score, (int, float)):
                                        all_scores[label].append(score)
                                continue
                        else:
                            logger.warning(f"Unexpected score format in {score_type}: {scores}")
                            continue
                        if isinstance(label, (str, bytes)):
                            if label not in all_scores:
                                all_scores[label] = []
                            if isinstance(score, (int, float)):
                                all_scores[label].append(score)
                    return {k: float(np.mean(v)) for k, v in all_scores.items() if v}
                except Exception as agg_error:
                    logger.error(f"Error aggregating {score_type} scores: {str(agg_error)}")
                    return {}

            emotion_scores = aggregate_scores(sentiment_scores, "emotion")
            toxicity_scores = aggregate_scores(toxicity_scores, "toxicity") if self.toxicity_available else {}
            logger.debug(f"Aggregated emotion scores: {emotion_scores}")
            logger.debug(f"Aggregated toxicity scores: {toxicity_scores}")

            # Aggregate manipulation scores
            manipulation_agg = {
                category: float(np.mean([
                    scores.get(category, 0) for scores in manipulation_scores
                ]))
                for category in manipulation_categories
                if manipulation_scores  # Only process if we have scores
            }
            logger.debug(f"Aggregated manipulation scores: {manipulation_agg}")

            # Calculate manipulation score based on multiple factors
            manipulation_indicators = {
                'emotional manipulation': 0.4,
                'fear mongering': 0.3,
                'propaganda': 0.3
            }
            if self.toxicity_available:
                manipulation_indicators.update({
                    'toxic': 0.2,
                    'severe_toxic': 0.3,
                    'threat': 0.2
                })

            # Combine toxicity and manipulation scores
            combined_scores = {**toxicity_scores, **manipulation_agg}

            # Calculate manipulation score with fallback
            if combined_scores:
                manipulation_score = min(100, sum(
                    combined_scores.get(k, 0) * weight
                    for k, weight in manipulation_indicators.items()
                ) * 100)
            else:
                # Fallback to traditional analysis if no scores available
                manipulation_score = len(self._detect_manipulative_phrases(text)) * 10
            logger.info(f"Final manipulation score: {manipulation_score}")

            # Determine overall sentiment
            positive_emotions = ['admiration', 'joy', 'amusement', 'approval']
            negative_emotions = ['disgust', 'anger', 'disappointment', 'fear']
            neutral_emotions = ['neutral', 'confusion', 'realization']

            pos_score = sum(emotion_scores.get(emotion, 0) for emotion in positive_emotions)
            neg_score = sum(emotion_scores.get(emotion, 0) for emotion in negative_emotions)
            neu_score = sum(emotion_scores.get(emotion, 0) for emotion in neutral_emotions)
            logger.debug(f"Sentiment scores - Positive: {pos_score}, Negative: {neg_score}, Neutral: {neu_score}")

            # Determine sentiment based on highest score
            max_score = max(pos_score, neg_score, neu_score)
            if max_score == pos_score and pos_score > 0.3:
                sentiment = "Positive"
            elif max_score == neg_score and neg_score > 0.3:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"
            logger.info(f"Final sentiment determination: {sentiment}")

            # Sort and limit flagged phrases by manipulation score
            sorted_phrases = sorted(flagged_phrases, key=lambda x: x['score'], reverse=True)
            unique_phrases = []
            seen = set()
            for phrase in sorted_phrases:
                clean_text = phrase['text'].strip()
                if clean_text not in seen:
                    unique_phrases.append(clean_text)
                    seen.add(clean_text)
                if len(unique_phrases) >= 5:
                    break

            logger.info("LLM analysis completed successfully")
            return {
                "sentiment": sentiment,
                "manipulation_score": round(manipulation_score, 1),
                "flagged_phrases": unique_phrases,
                "detailed_scores": {
                    "emotions": emotion_scores,
                    "manipulation": manipulation_agg,
                    "toxicity": toxicity_scores
                }
            }
        except Exception as e:
            logger.error(f"LLM analysis failed: {str(e)}", exc_info=True)
            return None

    def analyze(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment using LLM with fallback to traditional methods.

        Args:
            text: The text to analyze

        Returns:
            Dict containing sentiment analysis results
        """
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional sentiment analysis")
            blob = TextBlob(text)
            sentiment_score = blob.sentiment.polarity

            manipulative_phrases = self._detect_manipulative_phrases(text)
            manipulation_score = len(manipulative_phrases) * 10

            if sentiment_score > 0.2:
                sentiment = "Positive"
            elif sentiment_score < -0.2:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"

            return {
                "sentiment": sentiment,
                "manipulation_score": min(manipulation_score, 100),
                "flagged_phrases": manipulative_phrases[:5]  # Limit to top 5 phrases
            }
        except Exception as e:
            logger.error(f"Error in sentiment analysis: {str(e)}")
            return {
                "sentiment": "Error",
                "manipulation_score": 0,
                "flagged_phrases": []
            }

    def _detect_manipulative_phrases(self, text: str) -> List[str]:
        """Detect potentially manipulative phrases and return them with surrounding context."""
        found_phrases = []
        text_lower = text.lower()
        for pattern in self.manipulative_patterns:
            if pattern in text_lower:
                start = text_lower.find(pattern)
                context = text[max(0, start - 20):min(len(text), start + len(pattern) + 20)]
                found_phrases.append(context.strip())
        return found_phrases
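

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not part of the original module).
# Traditional mode only needs textblob; AI mode additionally downloads the
# transformer models on first run, so it may be slow or unavailable offline.
# The sample text below is hypothetical.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    sample = (
        "Experts say the new policy will obviously fail. "
        "Many believe the consequences could be devastating."
    )

    # Traditional mode: TextBlob polarity plus manipulative-phrase matching.
    traditional = SentimentAnalyzer(use_ai=False)
    print(traditional.analyze(sample))

    # AI mode: uses the emotion, zero-shot, and toxicity pipelines when they
    # initialize successfully, otherwise falls back to the traditional path.
    ai_based = SentimentAnalyzer(use_ai=True)
    print(ai_based.analyze(sample))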