import gradio as gr
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from textblob import TextBlob
from typing import List, Dict, Tuple
from dataclasses import dataclass
from pathlib import Path
import logging
import re
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RecommendationWeights:
    visibility: float
    sentiment: float
    popularity: float

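
# Example: the UI sliders further below default to
#   RecommendationWeights(visibility=0.5, sentiment=0.3, popularity=0.2)
# Any non-negative mix works, since the weights are rescaled to sum to 1
# before scoring (see _normalize_weights below).
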
class TweetPreprocessor:
    def __init__(self, data_path: Path):
        """Initialize the preprocessor with the dataset path."""
        self.data = self._load_data(data_path)

    @staticmethod
    def _load_data(data_path: Path) -> pd.DataFrame:
        """Load and validate the dataset."""
        try:
            data = pd.read_csv(data_path)
            required_columns = {'Text', 'Retweets', 'Likes', 'Timestamp'}
            if not required_columns.issubset(data.columns):
                raise ValueError(f"Missing required columns: {required_columns - set(data.columns)}")
            return data
        except Exception as e:
            logger.error(f"Error loading data: {e}")
            raise

    def _clean_text(self, text: str) -> str:
        """Clean the text content, discarding entries too short to be meaningful."""
        if pd.isna(text) or len(str(text).strip()) < 10:
            return ""

        # Remove URLs.
        text = re.sub(r'http\S+|www\.\S+', '', str(text))
        # Remove punctuation, keeping word characters and whitespace.
        text = re.sub(r'[^\w\s]', '', text)
        # Collapse runs of whitespace.
        text = ' '.join(text.split())
        return text

    def calculate_metrics(self) -> pd.DataFrame:
        """Calculate sentiment and popularity metrics."""
        self.data['Clean_Text'] = self.data['Text'].apply(self._clean_text)
        # Drop rows whose text was emptied out by cleaning.
        self.data = self.data[self.data['Clean_Text'].str.len() > 0]

        self.data['Sentiment'] = self.data['Clean_Text'].apply(self._get_sentiment)
        self.data['Popularity'] = self._normalize_popularity()
        self.data['Time_Weight'] = self._calculate_time_weight()
        return self.data

    def _calculate_time_weight(self) -> pd.Series:
        """Compute a recency weight; the newer the content, the higher the weight."""
        current_time = datetime.now()
        self.data['Timestamp'] = pd.to_datetime(self.data['Timestamp'])
        time_diff = (current_time - self.data['Timestamp']).dt.total_seconds()
        # Exponential decay with a time constant of one week, in seconds.
        return np.exp(-time_diff / (7 * 24 * 3600))

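    # Worked example of the decay above (illustrative numbers, not from the
    # original source): a tweet exactly one week old has
    # time_diff = 7 * 24 * 3600, so its weight is exp(-1) ≈ 0.37; a brand-new
    # tweet keeps the full weight of exp(0) = 1.0.
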
    @staticmethod
    def _get_sentiment(text: str) -> float:
        """Calculate sentiment polarity for a text."""
        try:
            return TextBlob(str(text)).sentiment.polarity
        except Exception as e:
            logger.warning(f"Error calculating sentiment: {e}")
            return 0.0

    def _normalize_popularity(self) -> pd.Series:
        """Normalize popularity scores using min-max scaling."""
        popularity = self.data['Retweets'] + self.data['Likes']
        # The small epsilon avoids division by zero when every row ties.
        return (popularity - popularity.min()) / (popularity.max() - popularity.min() + 1e-6)

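# Minimal usage sketch for TweetPreprocessor above (the CSV name is
# hypothetical; any file with Text/Retweets/Likes/Timestamp columns works):
#
#   pre = TweetPreprocessor(Path("tweets.csv"))
#   df = pre.calculate_metrics()  # adds Clean_Text, Sentiment, Popularity, Time_Weight
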

class FakeNewsClassifier:
    def __init__(self, model_name: str):
        """Initialize the fake news classifier."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model_name = model_name
        self.model, self.tokenizer = self._load_model()

    def _load_model(self) -> Tuple[AutoModelForSequenceClassification, AutoTokenizer]:
        """Load the model and tokenizer."""
        try:
            tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(self.device)
            return model, tokenizer
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise

    @torch.no_grad()
    def predict_batch(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """Predict the fake-news class index for a batch of texts."""
        predictions = []
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            inputs = self.tokenizer(
                batch_texts,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=128
            ).to(self.device)

            outputs = self.model(**inputs)
            # argmax over the logits yields a hard class index per text.
            batch_predictions = outputs.logits.argmax(dim=1).cpu().numpy()
            predictions.extend(batch_predictions)

        return np.array(predictions)

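# Sketch of standalone use for the classifier above (checkpoint name taken from
# main() below; the meaning of each class index is defined by the model card):
#
#   clf = FakeNewsClassifier("hamzab/roberta-fake-news-classification")
#   preds = clf.predict_batch(["headline one", "headline two"])
#   # preds is an np.ndarray of class indices, one per input text
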

class RecommendationSystem:
    def __init__(self, data_path: Path, model_name: str):
        """Initialize the recommendation system."""
        self.preprocessor = TweetPreprocessor(data_path)
        self.classifier = FakeNewsClassifier(model_name)
        self.data = None
        self.setup_system()

    def setup_system(self):
        """Set up the recommendation system."""
        self.data = self.preprocessor.calculate_metrics()
        predictions = self.classifier.predict_batch(self.data['Clean_Text'].tolist())
        # Map the predicted class to a signed credibility score: +1 for class 1
        # (treated here as credible), -1 otherwise. Whether class 1 really means
        # "real" depends on the label order of the checkpoint in use.
        self.data['Credibility'] = [1 if pred == 1 else -1 for pred in predictions]

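    # Minimal assumed implementations of the two weight helpers that
    # get_recommendations() relies on: reject invalid weights, then rescale
    # valid ones so they sum to 1.
    @staticmethod
    def _validate_weights(weights: RecommendationWeights) -> bool:
        """Accept only non-negative weights with a positive sum."""
        values = (weights.visibility, weights.sentiment, weights.popularity)
        return all(v >= 0 for v in values) and sum(values) > 0

    @staticmethod
    def _normalize_weights(weights: RecommendationWeights) -> RecommendationWeights:
        """Rescale the weights so they sum to 1."""
        total = weights.visibility + weights.sentiment + weights.popularity
        return RecommendationWeights(
            visibility=weights.visibility / total,
            sentiment=weights.sentiment / total,
            popularity=weights.popularity / total,
        )
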
    def get_recommendations(self, weights: RecommendationWeights, num_recommendations: int = 10) -> Dict:
        """Get tweet recommendations based on the given weights."""
        if not self._validate_weights(weights):
            return {"error": "Invalid weights provided"}

        normalized_weights = self._normalize_weights(weights)

        self.data['Final_Score'] = (
            self.data['Credibility'] * normalized_weights.visibility +
            self.data['Sentiment'] * normalized_weights.sentiment +
            self.data['Popularity'] * normalized_weights.popularity
        ) * self.data['Time_Weight']

        # Sample from the top 100 so repeated queries surface varied tweets;
        # cap the sample size in case fewer rows than requested survive cleaning.
        top_pool = self.data.nlargest(100, 'Final_Score')
        top_recommendations = top_pool.sample(min(num_recommendations, len(top_pool)))

        return self._format_recommendations(top_recommendations)

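    # Worked example of the blend above (illustrative numbers only): with
    # normalized weights (0.5, 0.3, 0.2), a credible tweet (Credibility = 1)
    # with Sentiment = 0.4 and Popularity = 0.6 scores
    #   1 * 0.5 + 0.4 * 0.3 + 0.6 * 0.2 = 0.74
    # before the recency factor; if it were a week old, that would be scaled
    # by roughly 0.37.
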
    def _format_recommendations(self, recommendations: pd.DataFrame) -> Dict:
        """Format recommendations for display."""
        formatted_results = []
        for _, row in recommendations.iterrows():
            score_details = {
                "total_score": f"{row['Final_Score']:.2f}",
                "credibility": "credible" if row['Credibility'] > 0 else "questionable",
                "sentiment": self._get_sentiment_label(row['Sentiment']),
                "popularity": f"{row['Popularity']:.2f}",
                "engagement": f"{row['Likes']} likes · {row['Retweets']} retweets"
            }

            formatted_results.append({
                "text": row['Clean_Text'],
                "scores": score_details,
                "timestamp": row['Timestamp'].strftime("%Y-%m-%d %H:%M")
            })

        return {
            "recommendations": formatted_results,
            "score_explanation": self._get_score_explanation()
        }

    @staticmethod
    def _get_sentiment_label(sentiment_score: float) -> str:
        """Convert a sentiment score to a human-readable label."""
        if sentiment_score > 0.3:
            return "positive"
        elif sentiment_score < -0.3:
            return "negative"
        return "neutral"

    @staticmethod
    def _get_score_explanation() -> Dict[str, str]:
        """Provide an explanation for each score component."""
        return {
            "credibility": "Machine-learning assessment of how trustworthy the content is",
            "sentiment": "Result of the sentiment analysis of the text",
            "popularity": "Normalized popularity score computed from likes and retweets",
            "time_weight": "Weighting factor that favors more recent content"
        }


def create_gradio_interface(recommendation_system: RecommendationSystem) -> gr.Blocks:
    """Create and configure the Gradio interface."""
    with gr.Blocks(theme=gr.themes.Soft()) as interface:
        gr.Markdown("""
        # Tweet Recommendation System

        This system recommends high-quality tweets along several dimensions:
        - **Credibility**: assesses how reliable the content is
        - **Sentiment**: analyzes the emotional tone of the text
        - **Popularity**: accounts for how well-received the content is
        - **Recency**: favors newer content
        """)

        with gr.Row():
            with gr.Column(scale=1):
                visibility_weight = gr.Slider(
                    0, 1, 0.5,
                    label="Credibility weight",
                    info="How much to emphasize content credibility"
                )
                sentiment_weight = gr.Slider(
                    0, 1, 0.3,
                    label="Sentiment weight",
                    info="How much to emphasize sentiment"
                )
                popularity_weight = gr.Slider(
                    0, 1, 0.2,
                    label="Popularity weight",
                    info="How much to emphasize content popularity"
                )
                submit_btn = gr.Button("Get recommendations", variant="primary")

            with gr.Column(scale=2):
                output = gr.JSON(label="Recommendations")

        submit_btn.click(
            fn=lambda v, s, p: recommendation_system.get_recommendations(
                RecommendationWeights(v, s, p)
            ),
            inputs=[visibility_weight, sentiment_weight, popularity_weight],
            outputs=output
        )

    return interface


def main():
    """Main function to run the application."""
    try:
        recommendation_system = RecommendationSystem(
            data_path=Path('twitter_dataset.csv'),
            model_name="hamzab/roberta-fake-news-classification"
        )
        interface = create_gradio_interface(recommendation_system)
        interface.launch()
    except Exception as e:
        logger.error(f"Application failed to start: {e}")
        raise


if __name__ == "__main__":
    main()