File size: 10,232 Bytes
efc4793
 
 
 
 
 
28fe915
 
 
 
ab7af96
 
efc4793
28fe915
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ab7af96
28fe915
 
 
 
 
 
 
ab7af96
 
 
 
 
 
 
 
 
 
 
 
 
28fe915
 
ab7af96
 
 
 
 
 
28fe915
ab7af96
 
 
28fe915
 
ab7af96
 
 
 
 
 
 
28fe915
 
 
 
 
 
 
 
 
 
 
 
ab7af96
a4dca14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28fe915
 
 
 
 
 
 
 
 
 
 
 
ab7af96
28fe915
 
ab7af96
28fe915
 
ab7af96
28fe915
 
 
 
 
 
 
ab7af96
28fe915
 
 
 
 
 
 
 
ab7af96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28fe915
 
ab7af96
 
 
 
 
 
 
28fe915
 
ab7af96
 
 
 
 
 
 
 
28fe915
 
 
ab7af96
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28fe915
 
 
 
 
 
 
 
ab7af96
 
28fe915
 
 
 
 
4822fd4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
import gradio as gr
import pandas as pd
import numpy as np
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from textblob import TextBlob
from typing import List, Dict, Tuple
from dataclasses import dataclass
from pathlib import Path
import logging
import re
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class RecommendationWeights:
    visibility: float
    sentiment: float
    popularity: float

class TweetPreprocessor:
    def __init__(self, data_path: Path):
        """Initialize the preprocessor with data path."""
        self.data = self._load_data(data_path)
        
    @staticmethod
    def _load_data(data_path: Path) -> pd.DataFrame:
        """Load and validate the dataset."""
        try:
            data = pd.read_csv(data_path)
            required_columns = {'Text', 'Retweets', 'Likes', 'Timestamp'}  # 添加时间戳列
            if not required_columns.issubset(data.columns):
                raise ValueError(f"Missing required columns: {required_columns - set(data.columns)}")
            return data
        except Exception as e:
            logger.error(f"Error loading data: {e}")
            raise

    def _clean_text(self, text: str) -> str:
        """清理文本内容,移除无意义的内容"""
        if pd.isna(text) or len(str(text).strip()) < 10:  # 排除过短或空的文本
            return ""
        
        # 移除URL
        text = re.sub(r'http\S+|www.\S+', '', str(text))
        # 移除特殊字符
        text = re.sub(r'[^\w\s]', '', text)
        # 移除多余空格
        text = ' '.join(text.split())
        return text

    def calculate_metrics(self) -> pd.DataFrame:
        """Calculate sentiment and popularity metrics."""
        # 清理文本
        self.data['Clean_Text'] = self.data['Text'].apply(self._clean_text)
        # 过滤掉无效的文本
        self.data = self.data[self.data['Clean_Text'].str.len() > 0]
        
        self.data['Sentiment'] = self.data['Clean_Text'].apply(self._get_sentiment)
        self.data['Popularity'] = self._normalize_popularity()
        
        # 添加时间衰减因子
        self.data['Time_Weight'] = self._calculate_time_weight()
        return self.data
    
    def _calculate_time_weight(self) -> pd.Series:
        """计算时间权重,越新的内容权重越高"""
        current_time = datetime.now()
        self.data['Timestamp'] = pd.to_datetime(self.data['Timestamp'])
        time_diff = (current_time - self.data['Timestamp']).dt.total_seconds()
        return np.exp(-time_diff / (7 * 24 * 3600))  # 7天的衰减周期

    @staticmethod
    def _get_sentiment(text: str) -> float:
        """Calculate sentiment polarity for a text."""
        try:
            return TextBlob(str(text)).sentiment.polarity
        except Exception as e:
            logger.warning(f"Error calculating sentiment: {e}")
            return 0.0

    def _normalize_popularity(self) -> pd.Series:
        """Normalize popularity scores using min-max scaling."""
        popularity = self.data['Retweets'] + self.data['Likes']
        return (popularity - popularity.min()) / (popularity.max() - popularity.min() + 1e-6)
class FakeNewsClassifier:
    def __init__(self, model_name: str):
        """Initialize the fake news classifier."""
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model_name = model_name
        self.model, self.tokenizer = self._load_model()
        
    def _load_model(self) -> Tuple[AutoModelForSequenceClassification, AutoTokenizer]:
        """Load the model and tokenizer."""
        try:
            tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            model = AutoModelForSequenceClassification.from_pretrained(self.model_name).to(self.device)
            return model, tokenizer
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            raise

    @torch.no_grad()
    def predict_batch(self, texts: List[str], batch_size: int = 32) -> np.ndarray:
        """Predict fake news probability for a batch of texts."""
        predictions = []
        
        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            inputs = self.tokenizer(
                batch_texts,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=128
            ).to(self.device)
            
            outputs = self.model(**inputs)
            batch_predictions = outputs.logits.argmax(dim=1).cpu().numpy()
            predictions.extend(batch_predictions)
            
        return np.array(predictions)     

class RecommendationSystem:
    def __init__(self, data_path: Path, model_name: str):
        """Initialize the recommendation system."""
        self.preprocessor = TweetPreprocessor(data_path)
        self.classifier = FakeNewsClassifier(model_name)
        self.data = None
        self.setup_system()

    def setup_system(self):
        """Set up the recommendation system."""
        self.data = self.preprocessor.calculate_metrics()
        predictions = self.classifier.predict_batch(self.data['Clean_Text'].tolist())
        self.data['Credibility'] = [1 if pred == 1 else -1 for pred in predictions]

    def get_recommendations(self, weights: RecommendationWeights, num_recommendations: int = 10) -> Dict:
        """Get tweet recommendations based on weights."""
        if not self._validate_weights(weights):
            return {"error": "Invalid weights provided"}

        normalized_weights = self._normalize_weights(weights)
        
        self.data['Final_Score'] = (
            self.data['Credibility'] * normalized_weights.visibility +
            self.data['Sentiment'] * normalized_weights.sentiment +
            self.data['Popularity'] * normalized_weights.popularity
        ) * self.data['Time_Weight']  # 考虑时间因素

        top_recommendations = (
            self.data.nlargest(100, 'Final_Score')
            .sample(num_recommendations)
        )

        return self._format_recommendations(top_recommendations)

    def _format_recommendations(self, recommendations: pd.DataFrame) -> Dict:
        """Format recommendations for display."""
        formatted_results = []
        for _, row in recommendations.iterrows():
            score_details = {
                "总分": f"{row['Final_Score']:.2f}",
                "可信度": "可信" if row['Credibility'] > 0 else "存疑",
                "情感倾向": self._get_sentiment_label(row['Sentiment']),
                "热度": f"{row['Popularity']:.2f}",
                "互动数": f"点赞 {row['Likes']} · 转发 {row['Retweets']}"
            }
            
            formatted_results.append({
                "text": row['Clean_Text'],
                "scores": score_details,
                "timestamp": row['Timestamp'].strftime("%Y-%m-%d %H:%M")
            })
            
        return {
            "recommendations": formatted_results,
            "score_explanation": self._get_score_explanation()
        }

    @staticmethod
    def _get_sentiment_label(sentiment_score: float) -> str:
        """Convert sentiment score to human-readable label."""
        if sentiment_score > 0.3:
            return "积极"
        elif sentiment_score < -0.3:
            return "消极"
        return "中性"

    @staticmethod
    def _get_score_explanation() -> Dict[str, str]:
        """Provide explanation for different score components."""
        return {
            "可信度": "基于机器学习模型对内容可信度的评估",
            "情感倾向": "文本的情感倾向分析结果",
            "热度": "根据点赞和转发数量计算的归一化热度分数",
            "时间权重": "考虑内容时效性的权重因子"
        }

def create_gradio_interface(recommendation_system: RecommendationSystem) -> gr.Interface:
    """Create and configure the Gradio interface."""
    with gr.Blocks(theme=gr.themes.Soft()) as interface:
        gr.Markdown("""
        # 推文推荐系统
        
        这个系统通过多个维度来为您推荐高质量的推文:
        - **可信度**: 评估内容的可靠性
        - **情感倾向**: 分析文本的情感色彩
        - **热度**: 考虑内容的受欢迎程度
        - **时效性**: 优先推荐较新的内容
        """)
        
        with gr.Row():
            with gr.Column(scale=1):
                visibility_weight = gr.Slider(
                    0, 1, 0.5,
                    label="可信度权重",
                    info="调整对内容可信度的重视程度"
                )
                sentiment_weight = gr.Slider(
                    0, 1, 0.3,
                    label="情感倾向权重",
                    info="调整对情感倾向的重视程度"
                )
                popularity_weight = gr.Slider(
                    0, 1, 0.2,
                    label="热度权重",
                    info="调整对内容热度的重视程度"
                )
                submit_btn = gr.Button("获取推荐", variant="primary")

            with gr.Column(scale=2):
                output = gr.JSON(label="推荐结果")
                
        submit_btn.click(
            fn=lambda v, s, p: recommendation_system.get_recommendations(
                RecommendationWeights(v, s, p)
            ),
            inputs=[visibility_weight, sentiment_weight, popularity_weight],
            outputs=output
        )
        
    return interface

def main():
    """Main function to run the application."""
    try:
        recommendation_system = RecommendationSystem(
            data_path=Path('twitter_dataset.csv'),
            model_name="hamzab/roberta-fake-news-classification"
        )
        interface = create_gradio_interface(recommendation_system)
        interface.launch()
    except Exception as e:
        logger.error(f"Application failed to start: {e}")
        raise

if __name__ == "__main__":
    main()