|
from transformers import pipeline |
|
from dataclasses import dataclass, field |
|
from typing import List, Optional, Dict, Any |
|
import re |
|
from datetime import datetime |
|
import logging |
|
import html |
|
from uuid import uuid4 |
|
import torch |
|
import gradio as gr |
|
import emoji |
|
|
|
|
|
# Configure the root logger once at import time: INFO and above, with
# timestamp, logger name and level prepended to every message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger used by every class and function in this file.
logger = logging.getLogger(__name__)
|
|
|
@dataclass
class Comment:
    """A single Instagram comment together with its parsed metadata.

    On construction, ``content`` longer than ``MAX_CONTENT_LENGTH`` characters
    is cut to that length and suffixed with ``"..."``; a warning is logged
    when this happens.
    """

    # Maximum number of content characters kept (the "..." suffix is added
    # on top of this). Deliberately left un-annotated so that ``@dataclass``
    # treats it as a class constant and not as a constructor field;
    # subclasses may override it.
    MAX_CONTENT_LENGTH = 2200

    # Unique identifier; a fresh random UUID4 string per instance.
    id: str = field(default_factory=lambda: str(uuid4()))
    # Author name as captured by the parser.
    username: str = ""
    # Relative timestamp text (e.g. "2 week.").
    time: str = ""
    # Comment body text.
    content: str = ""
    # Number of likes.
    likes: int = 0
    # Nesting depth of the comment (0 for a top-level comment).
    level: int = 0
    # ``id`` of the parent comment, or None when there is no parent.
    parent_id: Optional[str] = None
    # Child comments; each instance gets its own list.
    replies: List['Comment'] = field(default_factory=list)
    # True when the author is flagged as verified.
    is_verified: bool = False
    # "@name" mentions found in the content (without the "@").
    mentions: List[str] = field(default_factory=list)
    # "#tag" hashtags found in the content (without the "#").
    hashtags: List[str] = field(default_factory=list)
    # True for deleted or unparseable (placeholder) comments.
    is_deleted: bool = False
    # Sentiment label assigned by the analyzer, if any.
    sentiment: Optional[str] = None
    # Language of the comment, if known.
    language: Optional[str] = None
    # Emojis found in the content.
    emojis: List[str] = field(default_factory=list)

    def __post_init__(self):
        """Truncate over-long content to ``MAX_CONTENT_LENGTH`` plus "..."."""
        limit = self.MAX_CONTENT_LENGTH
        if len(self.content) > limit:
            logger.warning(
                "Comment content exceeds %d characters for user %s",
                limit,
                self.username,
            )
            self.content = self.content[:limit] + "..."
|
|
|
class InstagramCommentAnalyzer:
    """Parse pasted Instagram comment text into numbered, annotated lines.

    The raw text is split into individual comments, each comment is matched
    against ``COMMENT_PATTERN`` (with looser fallback patterns), enriched
    with sentiment, mentions, hashtags and emojis, and rendered as one
    output line.

    Attributes:
        max_depth: Largest nesting ``level`` accepted by ``process_comment``.
        max_comment_length: Content longer than this is truncated.
        pattern: ``COMMENT_PATTERN`` compiled with ``re.VERBOSE | re.DOTALL``.
        comments: List of ``Comment`` objects; created empty and not filled
            by any method of this class.
        stats: Counter dict, reset at the start of ``process_comments``.
        sentiment_analyzer: The transformers sentiment pipeline.
    """

    # Verbose regex: username, relative time, free-form content, then the
    # optional "likes" counter and optional UI button captions. Because the
    # content group is lazy and everything after it is optional, this pattern
    # must be applied with ``fullmatch`` (see ``_match_comment``).
    COMMENT_PATTERN = r'''
        (?P<username>[\w\u0400-\u04FF.-]+)\s*
        (?P<time>(?:\d+\s+(?:нед|мин|ч|д|мес|год|sec|min|h|d|w|mon|y)\.?))\s*
        (?P<content>.*?)
        (?:(?:Отметки|Likes)\s*"?Нравится"?:\s*(?P<likes>\d+))?
        (?:Ответить|Reply)?(?:Показать\sперевод|Show\stranslation)?(?:Нравится|Like)?
    '''

    # Abbreviated time unit -> English unit name. Covers every unit that
    # COMMENT_PATTERN accepts.
    TIME_MAPPING = {
        'нед': 'week', 'мин': 'minute', 'ч': 'hour',
        'д': 'day', 'мес': 'month', 'год': 'year',
        'w': 'week', 'h': 'hour', 'd': 'day',
        'mon': 'month', 'y': 'year',
        'sec': 'second', 'min': 'minute',
    }

    # Fallback patterns for text where the content precedes the time stamp.
    # Compiled once here instead of on every ``alternative_parse`` call.
    _ALTERNATIVE_PATTERNS = (
        re.compile(
            r'(?P<username>[\w\u0400-\u04FF.-]+)\s*(?P<content>.*?)(?P<time>\d+\s+\w+\.?)(?P<likes>\d+)?',
            re.VERBOSE | re.DOTALL,
        ),
        re.compile(
            r'(?P<username>[\w\u0400-\u04FF.-]+)\s*(?P<content>.*?)(?P<time>\d+\s+\w+)(?:Like)?(?P<likes>\d+)?',
            re.VERBOSE | re.DOTALL,
        ),
    )

    # Marker between two comments: the "reply" caption, an optional
    # "show translation" caption, and the "like" caption, mirroring the tail
    # of COMMENT_PATTERN. Still matches the literal 'ОтветитьНравится'.
    _COMMENT_SEPARATOR = re.compile(
        r'(?:Ответить|Reply)(?:Показать\sперевод|Show\stranslation)?(?:Нравится|Like)'
    )

    # A run of letters (any script); used to pick the unit out of a time string.
    _TIME_UNIT_TOKEN = re.compile(r'[^\W\d_]+')

    def __init__(self, max_depth: int = 10, max_comment_length: int = 2200):
        """Check dependencies, compile the pattern and load the sentiment model.

        Args:
            max_depth: Largest nesting level ``process_comment`` accepts.
            max_comment_length: Maximum content length before truncation.

        Raises:
            ImportError: If a required package is missing.
            Exception: Whatever the model loader raises on failure.
        """
        self.check_dependencies()
        self.max_depth = max_depth
        self.max_comment_length = max_comment_length
        self.pattern = re.compile(self.COMMENT_PATTERN, re.VERBOSE | re.DOTALL)
        self.comments: List['Comment'] = []
        self.stats = self.initialize_stats()
        self.sentiment_analyzer = self.load_sentiment_model()

    def initialize_stats(self) -> Dict[str, int]:
        """Return a fresh statistics dict with every counter set to zero."""
        return {
            'total_comments': 0,
            'deleted_comments': 0,
            'empty_comments': 0,
            'max_depth_reached': 0,
            'truncated_comments': 0,
            'processed_mentions': 0,
            'processed_hashtags': 0,
            'processed_emojis': 0,
            'failed_parses': 0,
        }

    def check_dependencies(self):
        """Verify that torch, transformers and emoji can be imported.

        Raises:
            ImportError: Re-raised (after logging) for the first missing package.
        """
        required_packages = ['torch', 'transformers', 'emoji']
        for package in required_packages:
            try:
                __import__(package)
            except ImportError:
                logger.error("Required package %s is not installed", package)
                raise

    def load_sentiment_model(self):
        """Create the sentiment-analysis pipeline on CUDA if available, else CPU.

        Returns:
            The transformers pipeline object.

        Raises:
            Exception: Any loader error is logged and re-raised.
        """
        # NOTE(review): the model name indicates an English SST-2 checkpoint;
        # labels for non-English comments may be unreliable - confirm.
        try:
            device = "cuda" if torch.cuda.is_available() else "cpu"
            logger.info("Using device: %s", device)
            return pipeline(
                "sentiment-analysis",
                model="distilbert-base-uncased-finetuned-sst-2-english",
                device=device,
            )
        except Exception as e:
            logger.error("Model loading failed: %s", e)
            raise

    def normalize_text(self, text: str) -> str:
        """Unescape HTML entities, drop zero-width characters, collapse whitespace.

        Zero-width characters are removed *before* whitespace is collapsed so
        that removing one that sat between two spaces (or at the very start)
        cannot leave a double or leading space behind.
        """
        text = html.unescape(text)
        text = re.sub(r'[\u200b\ufeff\u200c]', '', text)
        return ' '.join(text.split())

    def extract_emojis(self, text: str) -> List[str]:
        """Return the emojis found in ``text``, in order of appearance.

        Uses ``emoji.emoji_list`` so that emojis made of several code points
        are returned as one item each.
        """
        return [found['emoji'] for found in emoji.emoji_list(text)]

    def normalize_time(self, time_str: str) -> str:
        """Translate the abbreviated unit in a time string via ``TIME_MAPPING``.

        Only the first run of letters is considered, and it is replaced only
        when it is exactly a key of ``TIME_MAPPING``. Whole-token lookup
        avoids rewriting a unit that merely contains a shorter one
        (``'год'`` contains ``'д'``). Everything else, including a trailing
        ".", is kept as is.

        Args:
            time_str: Text such as ``"2 нед."`` or ``"3 h"``.

        Returns:
            The string with the unit translated, or unchanged if unknown.
        """
        mapping = self.TIME_MAPPING
        return self._TIME_UNIT_TOKEN.sub(
            lambda token: mapping.get(token.group(0), token.group(0)),
            time_str,
            count=1,
        )

    def clean_content(self, content: str) -> str:
        """Strip, collapse whitespace and truncate to ``max_comment_length``.

        Truncated content gets a ``"..."`` suffix and increments
        ``stats['truncated_comments']``.
        """
        content = re.sub(r'\s+', ' ', content.strip())
        if len(content) > self.max_comment_length:
            self.stats['truncated_comments'] += 1
            content = content[:self.max_comment_length] + "..."
        return content

    def extract_metadata(self, comment: 'Comment') -> None:
        """Fill mentions, hashtags, emojis and the verified flag on ``comment``.

        Also adds the number of extracted items to the matching ``stats``
        counters. Best effort: any error is logged and swallowed so that a
        metadata failure does not discard the comment.
        """
        try:
            comment.mentions = re.findall(r'@(\w+)', comment.content)
            comment.hashtags = re.findall(r'#(\w+)', comment.content)
            comment.emojis = self.extract_emojis(comment.content)

            self.stats['processed_mentions'] += len(comment.mentions)
            self.stats['processed_hashtags'] += len(comment.hashtags)
            self.stats['processed_emojis'] += len(comment.emojis)

            comment.is_verified = bool(re.search(r'✓|Подтвержденный', comment.username))
        except Exception as e:
            logger.error("Metadata extraction failed: %s", e)

    def analyze_sentiment(self, text: str) -> str:
        """Return the sentiment label for ``text``, or "UNKNOWN" on failure."""
        try:
            # truncation=True lets the tokenizer cut inputs that exceed the
            # model's maximum sequence length instead of raising.
            result = self.sentiment_analyzer(text, truncation=True)
            return result[0]['label']
        except Exception as e:
            logger.error("Sentiment analysis failed: %s", e)
            return "UNKNOWN"

    def _match_comment(self, text: str) -> Optional[re.Match]:
        """Match ``text`` with the main pattern, then with the fallbacks.

        ``fullmatch`` is required for the main pattern: with a prefix match
        the lazy content group would always capture the empty string, since
        every group that follows it is optional.
        """
        match = self.pattern.fullmatch(text)
        if match is None:
            match = self.alternative_parse(text)
        return match

    def process_comment(self, text: str, parent_id: Optional[str] = None, level: int = 0) -> Optional['Comment']:
        """Parse one raw comment into a ``Comment``.

        Args:
            text: Raw text of a single comment.
            parent_id: ``id`` of the parent comment, if any.
            level: Nesting depth of this comment.

        Returns:
            The parsed comment; a "damaged" placeholder when parsing fails;
            or None when the input is invalid or ``level`` exceeds
            ``max_depth``.
        """
        if not self.validate_input(text):
            return None

        if level > self.max_depth:
            logger.warning("Maximum depth %s exceeded", self.max_depth)
            self.stats['max_depth_reached'] += 1
            return None

        try:
            text = self.normalize_text(text)
            match = self._match_comment(text)
            if match is None:
                raise ValueError(f"Could not parse comment: {text[:100]}...")

            data = match.groupdict()
            comment = Comment(
                username=data['username'].strip(),
                time=self.normalize_time(data['time']),
                content=self.clean_content(data['content']),
                # groupdict() yields None for an unmatched optional group.
                likes=self.parse_likes(data.get('likes')),
                level=level,
                parent_id=parent_id,
            )

            comment.sentiment = self.analyze_sentiment(comment.content)
            self.extract_metadata(comment)

            self.stats['total_comments'] += 1
            return comment

        except Exception as e:
            logger.error("Error processing comment: %s", e, exc_info=True)
            self.stats['failed_parses'] += 1
            return self.create_damaged_comment()

    def alternative_parse(self, text: str) -> Optional[re.Match]:
        """Try the fallback patterns in order; return the first match or None."""
        for fallback in self._ALTERNATIVE_PATTERNS:
            match = fallback.match(text)
            if match:
                return match
        return None

    def parse_likes(self, likes_str: Optional[str]) -> int:
        """Return the integer formed by the digits in ``likes_str``.

        Returns 0 for None, for any non-string value, and for strings that
        contain no digits.
        """
        if not isinstance(likes_str, str):
            return 0
        digits = re.sub(r'\D', '', likes_str)
        return int(digits) if digits else 0

    def create_damaged_comment(self) -> 'Comment':
        """Return a placeholder comment marking unparseable input."""
        return Comment(
            username="[damaged]",
            time="unknown",
            content="[Поврежденные данные]",
            is_deleted=True,
        )

    def validate_input(self, text: str) -> bool:
        """Return True when ``text`` is a non-empty str of at most 50000 chars.

        Logs an error describing the problem otherwise.
        """
        if not text or not isinstance(text, str):
            logger.error("Invalid input: text must be non-empty string")
            return False
        if len(text) > 50000:
            logger.error("Input text too large")
            return False
        return True

    def format_comment(self, comment: 'Comment', index: int) -> str:
        """Render ``comment`` as one numbered output line.

        Deleted comments render as a fixed placeholder; any formatting error
        is logged and rendered as an error placeholder.
        """
        try:
            if comment.is_deleted:
                return f'{index}. "[УДАЛЕНО]"'

            emoji_str = ' '.join(comment.emojis or ())
            mentions_str = ', '.join(comment.mentions or ())
            hashtags_str = ', '.join(comment.hashtags or ())

            return (
                f'{index}. "{comment.username}" "{comment.time}" '
                f'"{comment.content}" "Лайки: {comment.likes}" '
                f'"Настроение: {comment.sentiment}" '
                f'"Эмодзи: {emoji_str}" '
                f'"Упоминания: {mentions_str}" '
                f'"Хэштеги: {hashtags_str}"'
            )
        except Exception as e:
            logger.error("Error formatting comment: %s", e)
            return f'{index}. "[ОШИБКА ФОРМАТИРОВАНИЯ]"'

    def process_comments(self, text: str) -> List[str]:
        """Split ``text`` into comments and return one formatted line per comment.

        Resets ``stats`` first. Blank pieces and comments rejected by
        ``process_comment`` are skipped; the remaining lines are numbered
        consecutively from 1. On an unexpected error a single-element list
        with an error placeholder is returned.
        """
        try:
            self.stats = self.initialize_stats()
            text = self.normalize_text(text)
            formatted_comments: List[str] = []

            for raw_comment in self._COMMENT_SEPARATOR.split(text):
                if not raw_comment.strip():
                    continue

                comment = self.process_comment(raw_comment)
                if comment is None:
                    continue

                # Number by output position so skipped pieces leave no gaps.
                formatted_comments.append(
                    self.format_comment(comment, len(formatted_comments) + 1)
                )

            return formatted_comments
        except Exception as e:
            logger.error("Error processing comments: %s", e)
            return ["[ОШИБКА ОБРАБОТКИ КОММЕНТАРИЕВ]"]
|
|
|
def create_interface():
    """Build the Gradio interface around a single shared analyzer instance.

    Returns:
        The configured ``gr.Interface``; the caller is responsible for
        launching it.
    """
    analyzer = InstagramCommentAnalyzer()

    def analyze_text(text: str):
        # One formatted comment per output line.
        return "\n".join(analyzer.process_comments(text))

    # Multi-line box where the user pastes the raw comment text.
    input_box = gr.Textbox(
        lines=10,
        placeholder="Вставьте текст комментариев здесь...",
        label="Входной текст",
    )
    # Larger box that shows the formatted analysis result.
    output_box = gr.Textbox(
        lines=20,
        placeholder="Результаты анализа будут отображены здесь...",
        label="Результаты анализа",
    )

    return gr.Interface(
        fn=analyze_text,
        inputs=input_box,
        outputs=output_box,
        title="Instagram Comment Analyzer",
        description="Анализатор комментариев Instagram с поддержкой эмодзи и мультиязычности",
        theme="default",
        analytics_enabled=False,
    )
|
|
|
def main():
    """Create the interface and serve it on all network interfaces, port 7860.

    Raises:
        Exception: Any start-up failure is logged and then re-raised.
    """
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
        "debug": True,
    }
    try:
        create_interface().launch(**launch_options)
    except Exception as e:
        logger.error(f"Application failed to start: {str(e)}")
        raise


# Start the application only when the file is executed as a script.
if __name__ == "__main__":
    main()