File size: 7,389 Bytes
2c5fbe9 60e12de 5be6938 60e12de 5be6938 be283ee 60e12de be283ee 60e12de 55ab780 60e12de 5be6938 60e12de 5be6938 2c5fbe9 5be6938 2c5fbe9 60e12de 5be6938 60e12de 2c5fbe9 60e12de 5be6938 60e12de 5be6938 60e12de 5be6938 60e12de 5be6938 60e12de 2c5fbe9 60e12de 2c5fbe9 60e12de |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 |
from transformers import pipeline
from dataclasses import dataclass, field
from typing import List, Optional, Dict
import re
from datetime import datetime
import logging
import html
from uuid import uuid4
import torch
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
@dataclass
class Comment:
id: str = field(default_factory=lambda: str(uuid4()))
username: str = ""
time: str = ""
content: str = ""
likes: int = 0
level: int = 0
parent_id: Optional[str] = None
replies: List['Comment'] = field(default_factory=list)
is_verified: bool = False
mentions: List[str] = field(default_factory=list)
hashtags: List[str] = field(default_factory=list)
is_deleted: bool = False
sentiment: Optional[str] = None
def __post_init__(self):
if len(self.content) > 2200:
logger.warning(f"Comment content exceeds 2200 characters for user {self.username}")
self.content = self.content[:2200] + "..."
class InstagramCommentAnalyzer:
COMMENT_PATTERN = r'''
(?P<username>[\w.-]+)\s+
(?P<time>\d+\s+нед\.)
(?P<content>.*?)
(?:Отметки\s*"Нравится":\s*(?P<likes>\d+))?
(?:Ответить)?(?:Показать\sперевод)?(?:Нравится)?
'''
def __init__(self, max_depth: int = 10, max_comment_length: int = 2200):
self.check_dependencies()
self.max_depth = max_depth
self.max_comment_length = max_comment_length
self.pattern = re.compile(self.COMMENT_PATTERN, re.VERBOSE | re.DOTALL)
self.comments: List[Comment] = []
self.stats: Dict[str, int] = {
'total_comments': 0,
'deleted_comments': 0,
'empty_comments': 0,
'max_depth_reached': 0,
'truncated_comments': 0,
'processed_mentions': 0,
'processed_hashtags': 0
}
self.sentiment_analyzer = self.load_sentiment_model()
def check_dependencies(self):
required_packages = ['torch', 'transformers', 'numpy']
for package in required_packages:
try:
__import__(package)
except ImportError:
logger.error(f"Required package {package} is not installed")
raise
def load_sentiment_model(self):
try:
device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info(f"Using device: {device}")
return pipeline(
"sentiment-analysis",
model="distilbert-base-uncased-finetuned-sst-2-english",
device=device
)
except Exception as e:
logger.error(f"Model loading failed: {str(e)}")
raise
def analyze_sentiment(self, text: str) -> str:
try:
result = self.sentiment_analyzer(text)
return result[0]['label']
except Exception as e:
logger.error(f"Sentiment analysis failed: {str(e)}")
return "UNKNOWN"
def normalize_text(self, text: str) -> str:
text = html.unescape(text)
text = ' '.join(text.split())
text = re.sub(r'[\u200b\ufeff\u200c]', '', text)
return text
def extract_metadata(self, comment: Comment) -> None:
try:
comment.mentions = re.findall(r'@(\w+)', comment.content)
self.stats['processed_mentions'] += len(comment.mentions)
comment.hashtags = re.findall(r'#(\w+)', comment.content)
self.stats['processed_hashtags'] += len(comment.hashtags)
comment.is_verified = bool(re.search(r'✓|Подтвержденный', comment.username))
except Exception as e:
logger.error(f"Metadata extraction failed: {str(e)}")
def process_comment(self, text: str, parent_id: Optional[str] = None, level: int = 0) -> Optional[Comment]:
if level > self.max_depth:
logger.warning(f"Maximum depth {self.max_depth} exceeded")
self.stats['max_depth_reached'] += 1
return None
if not text.strip():
self.stats['empty_comments'] += 1
return None
try:
match = self.pattern.match(text)
if not match:
raise ValueError(f"Could not parse comment: {text[:100]}...")
data = match.groupdict()
comment = Comment(
username=data['username'],
time=data['time'],
content=data['content'].strip(),
likes=int(data['likes'] or 0),
level=level,
parent_id=parent_id
)
if len(comment.content) > self.max_comment_length:
self.stats['truncated_comments'] += 1
comment.content = comment.content[:self.max_comment_length] + "..."
comment.sentiment = self.analyze_sentiment(comment.content)
self.extract_metadata(comment)
self.stats['total_comments'] += 1
return comment
except Exception as e:
logger.error(f"Error processing comment: {str(e)}")
self.stats['deleted_comments'] += 1
return Comment(
username="[damaged]",
time="",
content="[Поврежденные данные]",
is_deleted=True
)
def format_comment(self, comment: Comment, index: int) -> str:
try:
if comment.is_deleted:
return f'{index}. "[УДАЛЕНО]" "" "" "Нравится 0"'
return (
f'{index}. "{comment.username}" "{comment.time}" '
f'"{comment.content}" "Нравится {comment.likes}" "Настроение {comment.sentiment}"'
)
except Exception as e:
logger.error(f"Error formatting comment: {str(e)}")
return f'{index}. "[ОШИБКА ФОРМАТИРОВАНИЯ]"'
def process_comments(self, text: str) -> List[str]:
try:
self.stats = {key: 0 for key in self.stats}
text = self.normalize_text(text)
raw_comments = text.split('ОтветитьНравится')
formatted_comments = []
for i, raw_comment in enumerate(raw_comments, 1):
if not raw_comment.strip():
continue
comment = self.process_comment(raw_comment)
if comment:
formatted_comments.append(self.format_comment(comment, i))
return formatted_comments
except Exception as e:
logger.error(f"Error processing comments: {str(e)}")
return ["[ОШИБКА ОБРАБОТКИ КОММЕНТАРИЕВ]"]
def main():
example_text = """
user1 2 нед. This is a positive comment! Отметки "Нравится": 25
user2 3 нед. This is a negative comment! Отметки "Нравится": 5
"""
analyzer = InstagramCommentAnalyzer()
formatted_comments = analyzer.process_comments(example_text)
for formatted_comment in formatted_comments:
print(formatted_comment)
if __name__ == "__main__":
main() |