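# NOTE: the following lines are residue from the web file viewer this source
# was copied from; they are kept as comments so the module remains valid Python.
# wozwize's picture
# increasing performance on AI-mode by implementing singletons
# a2624a3
# raw
# history blame
# 17.1 kB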
import logging
import re
from typing import Dict, Any, List, Optional

import numpy as np
from textblob import TextBlob
from transformers import pipeline

logger = logging.getLogger(__name__)
class SentimentAnalyzer:
    """Classify the sentiment of a text and score how manipulative it reads.

    Two strategies are implemented:

    * AI mode: an emotion classifier, a zero-shot classifier and (when it
      loads) a toxicity classifier, either borrowed from a shared model
      registry or created locally through ``transformers.pipeline``.
    * Traditional mode: TextBlob polarity plus a fixed list of stock phrases.

    ``analyze`` is the public entry point. It uses AI mode when enabled and
    available and otherwise (or when AI mode fails) the traditional mode.
    """

    # Characters per chunk handed to the models. The original author used this
    # as a rough stand-in for a 512-token limit; the ratio is an approximation.
    _CHUNK_SIZE = 2000

    # Candidate labels offered to the zero-shot classifier.
    _MANIPULATION_CATEGORIES = (
        "emotional manipulation",
        "fear mongering",
        "propaganda",
        "factual reporting",
        "balanced perspective",
    )

    # Zero-shot labels that cause a sentence to be flagged, and the minimum
    # top-label score required for flagging.
    _FLAGGED_CATEGORIES = ("emotional manipulation", "fear mongering", "propaganda")
    _FLAG_THRESHOLD = 0.7

    # Emotion labels grouped into the three overall sentiment buckets.
    _POSITIVE_EMOTIONS = ('admiration', 'joy', 'amusement', 'approval')
    _NEGATIVE_EMOTIONS = ('disgust', 'anger', 'disappointment', 'fear')
    _NEUTRAL_EMOTIONS = ('neutral', 'confusion', 'realization')

    # A bucket must exceed this summed score to win over "Neutral".
    _SENTIMENT_THRESHOLD = 0.3

    # Maximum number of flagged phrases reported to the caller.
    _MAX_FLAGGED_PHRASES = 5

    def __init__(self, use_ai: bool = True, model_registry: Optional[Any] = None):
        """
        Initialize sentiment analyzer with both traditional and LLM-based approaches.

        Args:
            use_ai: Boolean indicating whether to use AI-powered analysis (True)
                or traditional analysis (False)
            model_registry: Optional shared model registry for better
                performance. When it is truthy and its ``is_available``
                attribute is truthy, its ``sentiment`` and ``zero_shot``
                attributes (and ``toxicity``, if present) are reused instead
                of loading new pipelines.
        """
        self.use_ai = use_ai
        self.llm_available = False
        self.model_registry = model_registry
        self.toxicity_available = False

        # Always define the pipeline attributes so that reading them never
        # raises AttributeError, whichever initialization path is taken.
        self.sentiment_pipeline = None
        self.zero_shot = None
        self.toxicity_pipeline = None

        # Traditional manipulation patterns
        self.manipulative_patterns = [
            "experts say",
            "sources claim",
            "many believe",
            "some say",
            "everyone knows",
            "clearly",
            "obviously",
            "without doubt",
            "certainly",
        ]

        if not use_ai:
            logger.info("Initializing sentiment analyzer in traditional mode")
            return

        # Any failure while obtaining models is deliberately non-fatal: the
        # analyzer simply stays in traditional mode (llm_available == False).
        try:
            if model_registry and model_registry.is_available:
                # Use shared models
                self.sentiment_pipeline = model_registry.sentiment
                self.zero_shot = model_registry.zero_shot
                self.toxicity_pipeline = getattr(model_registry, 'toxicity', None)
                self.toxicity_available = self.toxicity_pipeline is not None
                self.llm_available = True
                logger.info("Using shared model pipelines for sentiment analysis")
                if self.toxicity_available:
                    logger.info("Toxicity analysis enabled")
                else:
                    logger.info("Toxicity analysis not available")
            else:
                # Initialize own pipelines
                self.sentiment_pipeline = pipeline(
                    "text-classification",
                    model="SamLowe/roberta-base-go_emotions",
                    device=-1,
                    batch_size=16,
                )
                self.zero_shot = pipeline(
                    "zero-shot-classification",
                    model="facebook/bart-large-mnli",
                    device=-1,
                    batch_size=8,
                )
                # Toxicity is optional: losing it must not disable AI mode.
                try:
                    self.toxicity_pipeline = pipeline(
                        "text-classification",
                        model="unitary/toxic-bert",
                        device=-1,
                        batch_size=16,
                    )
                    self.toxicity_available = True
                    logger.info("Toxicity analysis enabled")
                except Exception as tox_error:
                    logger.warning("Toxicity pipeline initialization failed: %s", tox_error)
                    self.toxicity_available = False
                self.llm_available = True
                logger.info("Initialized dedicated model pipelines for sentiment analysis")
        except Exception as e:
            logger.warning("Failed to initialize LLM pipelines: %s", e)
            self.llm_available = False

    @staticmethod
    def _clean_text(text: str) -> str:
        """Strip formatting markers and link/teaser lines before model input.

        Removes every ``$!/$`` marker and every ``#`` character, then drops
        lines that start with ``[`` or with ``More on``.
        """
        # Removing '#' also removes '##', so a single replace is sufficient.
        without_markers = text.replace('$!/$', '').replace('#', '')
        return '\n'.join(
            line
            for line in without_markers.split('\n')
            if not line.startswith(('[', 'More on'))
        )

    @staticmethod
    def _aggregate_label_scores(scores_list: List[Any], score_type: str) -> Dict[Any, float]:
        """Average scores per label across a list of classifier outputs.

        Each item may be either ``{'label': ..., 'score': ...}`` or a direct
        ``{label: score}`` mapping. Items that are not dicts are skipped with
        a warning, and non-numeric scores are ignored.

        Args:
            scores_list: Collected classifier outputs.
            score_type: Human-readable name used only in log messages.

        Returns:
            Mapping of label to mean score; empty when there is nothing to
            aggregate or when aggregation fails.
        """
        try:
            if not scores_list:
                logger.warning("No %s scores to aggregate", score_type)
                return {}

            collected: Dict[Any, List[float]] = {}
            for scores in scores_list:
                if not isinstance(scores, dict):
                    logger.warning("Unexpected score format in %s: %s", score_type, scores)
                    continue

                if 'label' in scores and 'score' in scores:
                    label = scores['label']
                    # Only textual labels are accepted in this format.
                    pairs = [(label, scores['score'])] if isinstance(label, (str, bytes)) else []
                else:
                    # Handle direct label-score mapping
                    pairs = list(scores.items())

                for label, score in pairs:
                    if isinstance(score, (int, float)):
                        collected.setdefault(label, []).append(score)

            return {label: float(np.mean(values)) for label, values in collected.items() if values}
        except Exception as agg_error:
            logger.error("Error aggregating %s scores: %s", score_type, agg_error)
            return {}

    def _analyze_with_llm(self, text: str) -> Optional[Dict[str, Any]]:
        """Perform sentiment analysis using LLM models.

        The text is cleaned, split into fixed-size character chunks, and each
        chunk is run through the emotion, toxicity (if available) and
        zero-shot pipelines. Individual sentences are additionally classified
        to collect flagged phrases. A chunk that raises is logged and skipped.

        Args:
            text: The text to analyze.

        Returns:
            A dict with ``sentiment``, ``manipulation_score``,
            ``flagged_phrases`` and ``detailed_scores``, or ``None`` when the
            analysis fails as a whole (the caller then falls back to the
            traditional analysis).
        """
        try:
            logger.info("Starting LLM sentiment analysis")

            # Clean the text of formatting markers
            cleaned_text = self._clean_text(text)
            logger.info("Text cleaned and prepared for analysis")

            chunk_size = self._CHUNK_SIZE
            chunks = [
                cleaned_text[i:i + chunk_size]
                for i in range(0, len(cleaned_text), chunk_size)
            ]
            logger.info("Text split into %s chunks for processing", len(chunks))

            # Fresh list per call so a pipeline can never mutate class state.
            manipulation_categories = list(self._MANIPULATION_CATEGORIES)

            # Initialize aggregation variables
            sentiment_scores: List[Dict[str, Any]] = []
            raw_toxicity_scores: List[Any] = []
            manipulation_scores: List[Dict[str, float]] = []
            flagged_phrases: List[Dict[str, Any]] = []

            # Process each chunk
            for i, chunk in enumerate(chunks, 1):
                logger.info("Processing chunk %s/%s", i, len(chunks))
                try:
                    # Get emotion scores with detailed logging
                    logger.debug("Analyzing emotions for chunk %s", i)
                    emotions = self.sentiment_pipeline(chunk)
                    logger.debug("Raw emotion response: %s", emotions)

                    # The pipeline may return one result dict or a list of
                    # them; anything without 'label' and 'score' is ignored.
                    candidates = emotions if isinstance(emotions, list) else [emotions]
                    sentiment_scores.extend(
                        item
                        for item in candidates
                        if isinstance(item, dict) and 'label' in item and 'score' in item
                    )
                    logger.debug("Processed emotion scores: %s", sentiment_scores)

                    # Get toxicity scores if available
                    if self.toxicity_available:
                        logger.debug("Analyzing toxicity for chunk %s", i)
                        try:
                            toxicity = self.toxicity_pipeline(chunk)
                            if isinstance(toxicity, list):
                                raw_toxicity_scores.extend(toxicity)
                            else:
                                raw_toxicity_scores.append(toxicity)
                            logger.debug("Processed toxicity scores: %s", raw_toxicity_scores)
                        except Exception as tox_error:
                            # Toxicity is best-effort; keep processing the chunk.
                            logger.warning("Toxicity analysis failed for chunk %s: %s", i, tox_error)

                    # Get manipulation scores
                    logger.debug("Analyzing manipulation for chunk %s", i)
                    manipulation = self.zero_shot(
                        chunk,
                        manipulation_categories,
                        multi_label=True,
                    )
                    if (isinstance(manipulation, dict)
                            and 'labels' in manipulation
                            and 'scores' in manipulation):
                        manipulation_scores.append(
                            dict(zip(manipulation['labels'], manipulation['scores']))
                        )
                    logger.debug("Processed manipulation scores: %s", manipulation_scores)

                    # Analyze sentences for manipulation
                    for sentence in chunk.split('.'):
                        sentence = sentence.strip()
                        # Very short fragments are skipped.
                        if len(sentence) <= 10:
                            continue
                        sent_result = self.zero_shot(
                            sentence,
                            manipulation_categories,
                            multi_label=False,
                        )
                        top_label = sent_result['labels'][0]
                        top_score = sent_result['scores'][0]
                        if top_label in self._FLAGGED_CATEGORIES and top_score > self._FLAG_THRESHOLD:
                            flagged_phrases.append({
                                'text': sentence,
                                'type': top_label,
                                'score': top_score,
                            })
                except Exception as chunk_error:
                    # Results gathered before the failure are kept.
                    logger.error("Error processing chunk %s: %s", i, chunk_error)
                    continue

            logger.info("All chunks processed, aggregating scores")

            emotion_scores = self._aggregate_label_scores(sentiment_scores, "emotion")
            if self.toxicity_available:
                toxicity_scores = self._aggregate_label_scores(raw_toxicity_scores, "toxicity")
            else:
                toxicity_scores = {}
            logger.debug("Aggregated emotion scores: %s", emotion_scores)
            logger.debug("Aggregated toxicity scores: %s", toxicity_scores)

            # Aggregate manipulation scores (mean per category over chunks;
            # a category missing from a chunk counts as 0).
            if manipulation_scores:
                manipulation_agg = {
                    category: float(np.mean([
                        scores.get(category, 0) for scores in manipulation_scores
                    ]))
                    for category in manipulation_categories
                }
            else:
                manipulation_agg = {}
            logger.debug("Aggregated manipulation scores: %s", manipulation_agg)

            # Calculate manipulation score based on multiple factors
            manipulation_indicators = {
                'emotional manipulation': 0.4,
                'fear mongering': 0.3,
                'propaganda': 0.3,
            }
            if self.toxicity_available:
                manipulation_indicators.update({
                    'toxic': 0.2,
                    'severe_toxic': 0.3,
                    'threat': 0.2,
                })

            # Combine toxicity and manipulation scores
            combined_scores = {**toxicity_scores, **manipulation_agg}

            # Calculate manipulation score with fallback
            if combined_scores:
                weighted_sum = sum(
                    combined_scores.get(label, 0) * weight
                    for label, weight in manipulation_indicators.items()
                )
                manipulation_score = min(100, weighted_sum * 100)
            else:
                # Fallback to traditional analysis if no scores available;
                # capped at 100 like the same formula in analyze().
                manipulation_score = min(100, len(self._detect_manipulative_phrases(text)) * 10)
            logger.info("Final manipulation score: %s", manipulation_score)

            # Determine overall sentiment
            pos_score = sum(emotion_scores.get(emotion, 0) for emotion in self._POSITIVE_EMOTIONS)
            neg_score = sum(emotion_scores.get(emotion, 0) for emotion in self._NEGATIVE_EMOTIONS)
            neu_score = sum(emotion_scores.get(emotion, 0) for emotion in self._NEUTRAL_EMOTIONS)
            logger.debug(
                "Sentiment scores - Positive: %s, Negative: %s, Neutral: %s",
                pos_score, neg_score, neu_score,
            )

            # Determine sentiment based on highest score; positive wins ties
            # with negative because it is tested first.
            max_score = max(pos_score, neg_score, neu_score)
            if max_score == pos_score and pos_score > self._SENTIMENT_THRESHOLD:
                sentiment = "Positive"
            elif max_score == neg_score and neg_score > self._SENTIMENT_THRESHOLD:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"
            logger.info("Final sentiment determination: %s", sentiment)

            # Sort and limit flagged phrases by manipulation score
            unique_phrases: List[str] = []
            seen = set()
            for phrase in sorted(flagged_phrases, key=lambda item: item['score'], reverse=True):
                phrase_text = phrase['text'].strip()
                if phrase_text not in seen:
                    unique_phrases.append(phrase_text)
                    seen.add(phrase_text)
                if len(unique_phrases) >= self._MAX_FLAGGED_PHRASES:
                    break

            logger.info("LLM analysis completed successfully")
            return {
                "sentiment": sentiment,
                "manipulation_score": round(manipulation_score, 1),
                "flagged_phrases": unique_phrases,
                "detailed_scores": {
                    "emotions": emotion_scores,
                    "manipulation": manipulation_agg,
                    "toxicity": toxicity_scores,
                },
            }
        except Exception as e:
            logger.error("LLM analysis failed: %s", e, exc_info=True)
            return None

    def analyze(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment using LLM with fallback to traditional methods.

        Args:
            text: The text to analyze

        Returns:
            Dict containing sentiment analysis results. It always has the keys
            ``sentiment``, ``manipulation_score`` and ``flagged_phrases``; the
            LLM path additionally provides ``detailed_scores``. On an
            unexpected error ``sentiment`` is ``"Error"``.
        """
        try:
            # Try LLM analysis if enabled and available
            if self.use_ai and self.llm_available:
                llm_result = self._analyze_with_llm(text)
                if llm_result:
                    return llm_result

            # Use traditional analysis
            logger.info("Using traditional sentiment analysis")
            blob = TextBlob(text)
            sentiment_score = blob.sentiment.polarity

            manipulative_phrases = self._detect_manipulative_phrases(text)
            manipulation_score = len(manipulative_phrases) * 10

            if sentiment_score > 0.2:
                sentiment = "Positive"
            elif sentiment_score < -0.2:
                sentiment = "Negative"
            else:
                sentiment = "Neutral"

            return {
                "sentiment": sentiment,
                "manipulation_score": min(manipulation_score, 100),
                "flagged_phrases": manipulative_phrases[:5],  # Limit to top 5 phrases
            }
        except Exception as e:
            logger.error("Error in sentiment analysis: %s", e)
            return {
                "sentiment": "Error",
                "manipulation_score": 0,
                "flagged_phrases": [],
            }

    def _detect_manipulative_phrases(self, text: str) -> List[str]:
        """Detect potentially manipulative phrases.

        For each entry of ``self.manipulative_patterns`` the first
        case-insensitive occurrence in ``text`` is located and returned with
        up to 20 characters of context on either side (whitespace-stripped).

        The search runs on the original text rather than on ``text.lower()``:
        lowercasing can change the length of a string (for example U+0130
        becomes two code points), which would shift the offsets used to slice
        the context out of the original text.

        Args:
            text: The text to scan.

        Returns:
            One context snippet per pattern that occurs, in pattern order.
        """
        found_phrases = []
        for pattern in self.manipulative_patterns:
            match = re.search(re.escape(pattern), text, re.IGNORECASE)
            if match is None:
                continue
            start, end = match.span()
            context = text[max(0, start - 20):end + 20]
            found_phrases.append(context.strip())
        return found_phrases