YixuanWang's picture
Update app.py
a4dca14 verified
raw
history blame
10.2 kB
import gradio as gr
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from textblob import TextBlob
from typing import List, Dict, Tuple
from dataclasses import dataclass
from pathlib import Path
import logging
import re
from datetime import datetime
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class RecommendationWeights:
visibility: float
sentiment: float
popularity: float
class TweetPreprocessor:
def __init__(self, data_path: Path):
"""Initialize the preprocessor with data path."""
self.data = self._load_data(data_path)
@staticmethod
def _load_data(data_path: Path) -> pd.DataFrame:
"""Load and validate the dataset."""
try:
data = pd.read_csv(data_path)
required_columns = {'Text', 'Retweets', 'Likes', 'Timestamp'} # 添加时间戳列
if not required_columns.issubset(data.columns):
raise ValueError(f"Missing required columns: {required_columns - set(data.columns)}")
return data
except Exception as e:
logger.error(f"Error loading data: {e}")
raise
def _clean_text(self, text: str) -> str:
"""清理文本内容,移除无意义的内容"""
if pd.isna(text) or len(str(text).strip()) < 10: # 排除过短或空的文本
return ""
# 移除URL
text = re.sub(r'http\S+|www.\S+', '', str(text))
# 移除特殊字符
text = re.sub(r'[^\w\s]', '', text)
# 移除多余空格
text = ' '.join(text.split())
return text
def calculate_metrics(self) -> pd.DataFrame:
"""Calculate sentiment and popularity metrics."""
# 清理文本
self.data['Clean_Text'] = self.data['Text'].apply(self._clean_text)
# 过滤掉无效的文本
self.data = self.data[self.data['Clean_Text'].str.len() > 0]
self.data['Sentiment'] = self.data['Clean_Text'].apply(self._get_sentiment)
self.data['Popularity'] = self._normalize_popularity()
# 添加时间衰减因子
self.data['Time_Weight'] = self._calculate_time_weight()
return self.data
def _calculate_time_weight(self) -> pd.Series:
"""计算时间权重,越新的内容权重越高"""
current_time = datetime.now()
self.data['Timestamp'] = pd.to_datetime(self.data['Timestamp'])
time_diff = (current_time - self.data['Timestamp']).dt.total_seconds()
return np.exp(-time_diff / (7 * 24 * 3600)) # 7天的衰减周期
@staticmethod
def _get_sentiment(text: str) -> float:
"""Calculate sentiment polarity for a text."""
try:
return TextBlob(str(text)).sentiment.polarity
except Exception as e:
logger.warning(f"Error calculating sentiment: {e}")
return 0.0
def _normalize_popularity(self) -> pd.Series:
"""Normalize popularity scores using min-max scaling."""
popularity = self.data['Retweets'] + self.data['Likes']
return (popularity - popularity.min()) / (popularity.max() - popularity.min() + 1e-6)
class FakeNewsClassifier:
def __init__(self, model_name: str):
"""Initialize the fake news classifier."""
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model_name = model_name
self.model, self.tokenizer = self._load_model()
def _load_model(self) -> Tuple[AutoModelForSequenceClassification, AutoTokenizer]:
"""Load the model and tokenizer."""
try:
tokenizer = AutoTokenizer.from_pretrained(self.model_name)
model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(self.device)
return model, tokenizer
except Exception as e:
logger.error(f"Error loading model: {e}")
raise
@torch.no_grad()
def predict_batch(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
"""Predict fake news probability for a batch of texts."""
predictions = []
for i in range(0, len(texts), batch_size):
batch_texts = texts[i:i + batch_size]
inputs = self.tokenizer(
batch_texts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=128
).to(self.device)
outputs = self.model(**inputs)
batch_predictions = outputs.logits.argmax(dim=1).cpu().numpy()
predictions.extend(batch_predictions)
return np.array(predictions)
class RecommendationSystem:
def __init__(self, data_path: Path, model_name: str):
"""Initialize the recommendation system."""
self.preprocessor = TweetPreprocessor(data_path)
self.classifier = FakeNewsClassifier(model_name)
self.data = None
self.setup_system()
def setup_system(self):
"""Set up the recommendation system."""
self.data = self.preprocessor.calculate_metrics()
predictions = self.classifier.predict_batch(self.data['Clean_Text'].tolist())
self.data['Credibility'] = [1 if pred == 1 else -1 for pred in predictions]
def get_recommendations(self, weights: RecommendationWeights, num_recommendations: int = 10) -> Dict:
"""Get tweet recommendations based on weights."""
if not self._validate_weights(weights):
return {"error": "Invalid weights provided"}
normalized_weights = self._normalize_weights(weights)
self.data['Final_Score'] = (
self.data['Credibility'] * normalized_weights.visibility +
self.data['Sentiment'] * normalized_weights.sentiment +
self.data['Popularity'] * normalized_weights.popularity
) * self.data['Time_Weight'] # 考虑时间因素
top_recommendations = (
self.data.nlargest(100, 'Final_Score')
.sample(num_recommendations)
)
return self._format_recommendations(top_recommendations)
def _format_recommendations(self, recommendations: pd.DataFrame) -> Dict:
"""Format recommendations for display."""
formatted_results = []
for _, row in recommendations.iterrows():
score_details = {
"总分": f"{row['Final_Score']:.2f}",
"可信度": "可信" if row['Credibility'] > 0 else "存疑",
"情感倾向": self._get_sentiment_label(row['Sentiment']),
"热度": f"{row['Popularity']:.2f}",
"互动数": f"点赞 {row['Likes']} · 转发 {row['Retweets']}"
}
formatted_results.append({
"text": row['Clean_Text'],
"scores": score_details,
"timestamp": row['Timestamp'].strftime("%Y-%m-%d %H:%M")
})
return {
"recommendations": formatted_results,
"score_explanation": self._get_score_explanation()
}
@staticmethod
def _get_sentiment_label(sentiment_score: float) -> str:
"""Convert sentiment score to human-readable label."""
if sentiment_score > 0.3:
return "积极"
elif sentiment_score < -0.3:
return "消极"
return "中性"
@staticmethod
def _get_score_explanation() -> Dict[str, str]:
"""Provide explanation for different score components."""
return {
"可信度": "基于机器学习模型对内容可信度的评估",
"情感倾向": "文本的情感倾向分析结果",
"热度": "根据点赞和转发数量计算的归一化热度分数",
"时间权重": "考虑内容时效性的权重因子"
}
def create_gradio_interface(recommendation_system: RecommendationSystem) -> gr.Interface:
"""Create and configure the Gradio interface."""
with gr.Blocks(theme=gr.themes.Soft()) as interface:
gr.Markdown("""
# 推文推荐系统
这个系统通过多个维度来为您推荐高质量的推文:
- **可信度**: 评估内容的可靠性
- **情感倾向**: 分析文本的情感色彩
- **热度**: 考虑内容的受欢迎程度
- **时效性**: 优先推荐较新的内容
""")
with gr.Row():
with gr.Column(scale=1):
visibility_weight = gr.Slider(
0, 1, 0.5,
label="可信度权重",
info="调整对内容可信度的重视程度"
)
sentiment_weight = gr.Slider(
0, 1, 0.3,
label="情感倾向权重",
info="调整对情感倾向的重视程度"
)
popularity_weight = gr.Slider(
0, 1, 0.2,
label="热度权重",
info="调整对内容热度的重视程度"
)
submit_btn = gr.Button("获取推荐", variant="primary")
with gr.Column(scale=2):
output = gr.JSON(label="推荐结果")
submit_btn.click(
fn=lambda v, s, p: recommendation_system.get_recommendations(
RecommendationWeights(v, s, p)
),
inputs=[visibility_weight, sentiment_weight, popularity_weight],
outputs=output
)
return interface
def main():
"""Main function to run the application."""
try:
recommendation_system = RecommendationSystem(
data_path=Path('twitter_dataset.csv'),
model_name="hamzab/roberta-fake-news-classification"
)
interface = create_gradio_interface(recommendation_system)
interface.launch()
except Exception as e:
logger.error(f"Application failed to start: {e}")
raise
if __name__ == "__main__":
main()