# File size: 6,076 Bytes
# 2c5fbe9 60e12de be283ee 60e12de be283ee 60e12de 55ab780 60e12de 55ab780 2c5fbe9 60e12de 2c5fbe9 60e12de 2c5fbe9 60e12de 2c5fbe9 60e12de 2c5fbe9 60e12de
from transformers import pipeline
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
import re
from datetime import datetime
import logging
import html
from uuid import uuid4
# Logging setup: INFO level, records rendered as "time - logger - level - message".
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
@dataclass
class Comment:
    """A single parsed comment together with its extracted metadata.

    Every field has a default, so ``Comment()`` is valid; ``id`` defaults to
    a fresh random UUID string and the list fields default to new empty
    lists per instance.

    On construction, ``content`` longer than ``MAX_CONTENT_LENGTH`` characters
    is cut to that many characters and suffixed with ``"..."``; a warning is
    logged when this happens.
    """

    # Deliberately left without a type annotation: an unannotated class
    # attribute is not turned into a dataclass field, so the constructor
    # signature stays exactly as before.
    MAX_CONTENT_LENGTH = 2200

    id: str = field(default_factory=lambda: str(uuid4()))
    username: str = ""
    time: str = ""
    content: str = ""
    likes: int = 0
    level: int = 0
    parent_id: Optional[str] = None
    replies: List['Comment'] = field(default_factory=list)
    is_verified: bool = False
    mentions: List[str] = field(default_factory=list)
    hashtags: List[str] = field(default_factory=list)
    is_deleted: bool = False
    sentiment: Optional[str] = None

    def __post_init__(self):
        """Truncate over-long content and log a warning when doing so."""
        limit = self.MAX_CONTENT_LENGTH
        if len(self.content) > limit:
            # Lazy %-formatting: the message is only built if the record is emitted.
            logger.warning(
                "Comment content exceeds %d characters for user %s",
                limit,
                self.username,
            )
            self.content = self.content[:limit] + "..."
class InstagramCommentAnalyzer:
    """Parse pasted comment text into ``Comment`` objects and formatted lines.

    The input is one string in which comments are separated by the
    "Ответить…Нравится" button captions. Each comment is expected to look
    like ``<username> <N> нед. <content> [Отметки "Нравится": <likes>]``.
    Every parsed comment is tagged with a sentiment label and has its
    @mentions and #hashtags extracted; counters are kept in ``stats``.
    """

    # Compiled with re.VERBOSE | re.DOTALL. The lazy ``content`` group is
    # followed only by optional groups, so the pattern must be anchored at
    # the end of the text (``\Z``); without the anchor the lazy group always
    # matches the empty string and ``likes`` is never captured.
    # NOTE(review): only the "нед." time unit is recognised — confirm whether
    # other units can occur in the input.
    COMMENT_PATTERN = r'''
        (?P<username>[\w.-]+)\s+
        (?P<time>\d+\s+нед\.)
        (?P<content>.*?)
        (?:Отметки\s*"Нравится":\s*(?P<likes>\d+))?
        (?:\s*Ответить)?(?:\s*Показать\sперевод)?(?:\s*Нравится)?
        \s*\Z
    '''

    # Boundary between two comments: "Ответить" + optional "Показать перевод"
    # + "Нравится" (a superset of the plain "ОтветитьНравится" marker).
    _SEPARATOR = re.compile(r'Ответить\s*(?:Показать\sперевод\s*)?Нравится')

    # Zero-width characters removed during normalisation.
    _ZERO_WIDTH = re.compile(r'[\u200b\ufeff\u200c]')

    def __init__(self, max_depth: int = 10, max_comment_length: int = 2200):
        """Create an analyzer and load the sentiment model.

        Args:
            max_depth: Highest nesting ``level`` accepted by ``process_comment``.
            max_comment_length: Content longer than this is cut and suffixed
                with ``"..."``.
        """
        self.max_depth = max_depth
        self.max_comment_length = max_comment_length
        self.pattern = re.compile(self.COMMENT_PATTERN, re.VERBOSE | re.DOTALL)
        # Comment objects produced by the most recent process_comments() call.
        self.comments: List["Comment"] = []
        self.stats: Dict[str, int] = {
            'total_comments': 0,
            'deleted_comments': 0,
            'empty_comments': 0,
            'max_depth_reached': 0,
            'truncated_comments': 0,
            'processed_mentions': 0,
            'processed_hashtags': 0
        }
        # The model is named explicitly instead of relying on the pipeline default.
        # NOTE(review): the model name says "english" while the parsed markers
        # are Russian — confirm this is the intended model.
        self.sentiment_analyzer = pipeline(
            "sentiment-analysis",
            model="distilbert-base-uncased-finetuned-sst-2-english"
        )

    def analyze_sentiment(self, text: str) -> str:
        """Return the label of the first result produced by the sentiment pipeline."""
        # truncation=True lets the tokenizer cut inputs that exceed the model's
        # maximum sequence length instead of failing on long comments.
        result = self.sentiment_analyzer(text, truncation=True)
        return result[0]['label']

    def normalize_text(self, text: str) -> str:
        """Unescape HTML entities, drop zero-width characters, collapse whitespace."""
        text = html.unescape(text)
        # Remove zero-width characters first: str.split() does not treat them
        # as whitespace, so removing them afterwards would leave double spaces.
        text = self._ZERO_WIDTH.sub('', text)
        return ' '.join(text.split())

    def extract_metadata(self, comment: "Comment") -> None:
        """Fill ``mentions``, ``hashtags`` and ``is_verified`` on ``comment``.

        Also adds the number of mentions and hashtags found to ``stats``.
        """
        # Dots are kept inside a mention (``@john.doe``) but a trailing dot is
        # treated as sentence punctuation.
        comment.mentions = re.findall(r'@(\w+(?:\.\w+)*)', comment.content)
        self.stats['processed_mentions'] += len(comment.mentions)
        comment.hashtags = re.findall(r'#(\w+)', comment.content)
        self.stats['processed_hashtags'] += len(comment.hashtags)
        comment.is_verified = bool(re.search(r'✓|Подтвержденный', comment.username))

    def process_comment(self, text: str, parent_id: Optional[str] = None, level: int = 0) -> Optional["Comment"]:
        """Parse one raw comment chunk.

        Args:
            text: Raw text of a single comment.
            parent_id: Stored on the resulting comment as ``parent_id``.
            level: Nesting level; values above ``max_depth`` are rejected.

        Returns:
            The parsed comment; ``None`` when ``level`` exceeds ``max_depth``
            or ``text`` is blank; a placeholder comment flagged
            ``is_deleted`` when parsing or sentiment analysis fails.
        """
        if level > self.max_depth:
            logger.warning("Maximum depth %s exceeded", self.max_depth)
            self.stats['max_depth_reached'] += 1
            return None

        # Chunks produced by splitting usually start with whitespace, and the
        # pattern is matched from position 0, so strip before matching.
        text = text.strip()
        if not text:
            self.stats['empty_comments'] += 1
            return None

        try:
            match = self.pattern.match(text)
            if not match:
                raise ValueError(f"Could not parse comment: {text[:100]}...")
            data = match.groupdict()

            content = data['content'].strip()
            if len(content) > self.max_comment_length:
                self.stats['truncated_comments'] += 1
                content = content[:self.max_comment_length] + "..."

            comment = Comment(
                username=data['username'],
                time=data['time'],
                content=content,
                likes=int(data['likes'] or 0),
                level=level,
                parent_id=parent_id
            )
            comment.sentiment = self.analyze_sentiment(comment.content)
            self.extract_metadata(comment)
            self.stats['total_comments'] += 1
            return comment
        except Exception as e:
            # Deliberately broad: one bad comment (parse error or model failure)
            # must not abort the whole batch; it is reported as a placeholder.
            logger.error("Error processing comment: %s", e)
            self.stats['deleted_comments'] += 1
            return Comment(
                username="[damaged]",
                time="",
                content="[Поврежденные данные]",
                is_deleted=True
            )

    def format_comment(self, comment: "Comment", index: int) -> str:
        """Render ``comment`` as one numbered line of quoted fields."""
        if comment.is_deleted:
            return f'{index}. "[УДАЛЕНО]" "" "" "Нравится 0"'
        return (
            f'{index}. "{comment.username}" "{comment.time}" '
            f'"{comment.content}" "Нравится {comment.likes}" "Настроение {comment.sentiment}"'
        )

    def _split_raw_comments(self, text: str) -> List[str]:
        """Split normalised text into per-comment chunks at the separator markers."""
        return self._SEPARATOR.split(text)

    def process_comments(self, text: str) -> List[str]:
        """Parse every comment in ``text`` and return the formatted lines.

        Resets ``stats`` and ``comments`` first. Blank chunks are skipped and
        the returned lines are numbered consecutively from 1.
        """
        self.stats = dict.fromkeys(self.stats, 0)
        self.comments = []
        formatted_comments: List[str] = []
        for raw_comment in self._split_raw_comments(self.normalize_text(text)):
            if not raw_comment.strip():
                continue
            comment = self.process_comment(raw_comment)
            if comment is None:
                continue
            self.comments.append(comment)
            # Number by emitted position so skipped chunks leave no gaps.
            formatted_comments.append(
                self.format_comment(comment, len(formatted_comments) + 1)
            )
        return formatted_comments
def main():
    """Run the analyzer on a small built-in sample and print each formatted line."""
    # Each sample comment ends with "ОтветитьНравится": that marker is what
    # process_comments() splits on, so without it the two comments would be
    # handed to the parser as a single chunk.
    example_text = """
    user1 2 нед. This is a positive comment! Отметки "Нравится": 25ОтветитьНравится
    user2 3 нед. This is a negative comment! Отметки "Нравится": 5ОтветитьНравится
    """
    analyzer = InstagramCommentAnalyzer()
    for formatted_comment in analyzer.process_comments(example_text):
        print(formatted_comment)
# Run the demo only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()