|
|
|
import csv
import logging
import re
import statistics
from collections import Counter
from io import StringIO
from typing import Dict, List, Tuple, Optional

import emoji
import gradio as gr
|
|
|
# Configure root logging once at import time; module-level logger is used by
# the extraction/analysis error handlers below.
logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)
|
|
|
class TextAnalyzer:
    """Small, stateless helpers for basic text analysis."""

    @staticmethod
    def clean_text(text: str) -> str:
        """Collapse whitespace runs to single spaces and trim the ends."""
        collapsed = re.sub(r'\s+', ' ', text)
        return collapsed.strip()

    @staticmethod
    def count_emojis(text: str) -> int:
        """Count the characters that the emoji package recognizes as emoji."""
        return sum(1 for char in text if char in emoji.EMOJI_DATA)

    @staticmethod
    def extract_mentions(text: str) -> List[str]:
        """Return every @mention (word characters and dots after the '@')."""
        return re.findall(r'@[\w\.]+', text)

    @staticmethod
    def get_words(text: str) -> List[str]:
        """Return lowercase word tokens longer than two characters."""
        tokens = re.findall(r'\w+', text.lower())
        return [token for token in tokens if len(token) > 2]
|
|
|
class SentimentAnalyzer:
    """Rule-based sentiment classification via substring indicator matching."""

    POSITIVE_INDICATORS = {
        'emoji': ['🔥', '❤️', '👍', '😊', '💪', '👏', '🎉', '♥️', '😍', '🙏'],
        'words': ['круто', 'супер', 'класс', 'огонь', 'пушка', 'отлично', 'здорово',
                  'прекрасно', 'молодец', 'красота', 'спасибо', 'топ', 'лучший',
                  'amazing', 'wonderful', 'great', 'perfect', 'love', 'beautiful']
    }

    NEGATIVE_INDICATORS = {
        'emoji': ['👎', '😢', '😞', '😠', '😡', '💔', '😕', '😑'],
        'words': ['плохо', 'ужас', 'отстой', 'фу', 'жесть', 'ужасно',
                  'разочарован', 'печаль', 'грустно', 'bad', 'worst',
                  'terrible', 'awful', 'sad', 'disappointed']
    }

    @classmethod
    def analyze(cls, text: str) -> str:
        """Classify text as 'positive', 'negative' or 'neutral'.

        Each indicator counts at most once, matched as a substring of the
        lowercased text.
        """
        lowered = text.lower()

        def indicator_hits(indicators: dict) -> int:
            # bool sums as 0/1, so each present indicator contributes once.
            return sum(ind in lowered for ind in indicators['emoji'] + indicators['words'])

        positive = indicator_hits(cls.POSITIVE_INDICATORS)
        negative = indicator_hits(cls.NEGATIVE_INDICATORS)

        # NOTE(review): the boost is only added to whichever side already
        # leads, so it can never flip or break the comparison below — it is
        # effectively a no-op kept for behavioral compatibility.
        boost = text.count('!') * 0.5
        if positive > negative:
            positive += boost
        elif negative > positive:
            negative += boost

        if positive > negative:
            return 'positive'
        if negative > positive:
            return 'negative'
        return 'neutral'
|
|
|
class CommentExtractor:
    """Extracts structured fields (username, text, likes, age) from one raw comment block."""

    # Alternative regexes for each field, tried in order; the first match wins.
    PATTERNS = {
        'username': [
            r"Фото профиля ([^\n]+)",
            r"^([^\s]+)\s+",
            r"@([^\s]+)\s+"
        ],
        'time': [
            r"(\d+)\s*(?:ч|нед)\.",
            r"(\d+)\s*(?:h|w)",
            r"(\d+)\s*(?:час|hour|week)"
        ],
        'likes': [
            r"(\d+) отметк[аи] \"Нравится\"",
            r"Нравится: (\d+)",
            r"(\d+) отметка \"Нравится\"",  # NOTE(review): subsumed by the first pattern; kept for safety
            r"\"Нравится\": (\d+)",
            r"likes?: (\d+)"
        ],
        'metadata': [
            r"Фото профиля [^\n]+\n",
            r"\d+\s*(?:ч|нед|h|w|час|hour|week)\.",
            r"Нравится:?\s*\d+",
            r"\d+ отметк[аи] \"Нравится\"",
            r"Ответить",
            r"Показать перевод",
            r"Скрыть все ответы",
            r"Смотреть все ответы \(\d+\)"
        ]
    }

    @classmethod
    def extract_data(cls, comment_text: str) -> Tuple[Optional[str], Optional[str], int, float]:
        """Parse one comment block.

        Returns:
            (username, cleaned_comment, likes, age_in_weeks);
            (None, None, 0, 0) when no username is found or parsing fails.
        """
        try:
            # Username: first matching pattern wins.
            username = None
            for pattern in cls.PATTERNS['username']:
                if match := re.search(pattern, comment_text):
                    username = match.group(1).strip()
                    break

            if not username:
                return None, None, 0, 0

            # Strip UI metadata and the username itself, leaving only the body.
            # BUGFIX: escape the username before using it as a regex — dots are
            # common in Instagram handles and previously acted as wildcards,
            # deleting unrelated substrings from the comment text.
            comment = comment_text
            for pattern in cls.PATTERNS['metadata'] + [re.escape(username)]:
                comment = re.sub(pattern, '', comment)
            comment = TextAnalyzer.clean_text(comment)

            # Age: units containing 'нед'/'w'/'week' are already weeks;
            # otherwise the value is treated as hours (168 hours per week).
            weeks = 0
            for pattern in cls.PATTERNS['time']:
                if match := re.search(pattern, comment_text):
                    time_value = int(match.group(1))
                    if any(unit in comment_text.lower() for unit in ['нед', 'w', 'week']):
                        weeks = time_value
                    else:
                        weeks = time_value / (24 * 7)
                    break

            # Like count: first matching pattern wins; defaults to 0.
            likes = 0
            for pattern in cls.PATTERNS['likes']:
                if match := re.search(pattern, comment_text):
                    likes = int(match.group(1))
                    break

            return username, comment, likes, weeks

        except Exception as e:
            # Boundary guard: a malformed block is logged and skipped upstream.
            logger.error(f"Error extracting comment data: {e}")
            return None, None, 0, 0
|
|
|
class StatsCalculator:
    """Aggregates per-period engagement statistics over a comment timeline."""

    @staticmethod
    def calculate_period_stats(weeks: List[float], likes: List[str], sentiments: List[str]) -> Dict:
        """Split comments into early/middle/late thirds by age and summarize each.

        `weeks` holds "weeks ago" values, so larger numbers mean older
        comments; `likes` entries are numeric strings. Returns {} when there
        is nothing to aggregate.
        """
        if not weeks:
            return {}

        oldest = max(weeks)   # largest "weeks ago" == earliest comment
        newest = min(weeks)
        span = oldest - newest
        third = span / 3 if span > 0 else 1

        buckets = {'early': [], 'middle': [], 'late': []}
        for idx, age in enumerate(weeks):
            if age >= oldest - third:
                buckets['early'].append(idx)
            elif age >= oldest - 2 * third:
                buckets['middle'].append(idx)
            else:
                buckets['late'].append(idx)

        summary = {}
        for name, idxs in buckets.items():
            if idxs:
                avg_likes = sum(int(likes[i]) for i in idxs) / len(idxs)
                positive_share = sum(1 for i in idxs if sentiments[i] == 'positive') / len(idxs)
            else:
                avg_likes = 0
                positive_share = 0
            summary[name] = {
                'comments': len(idxs),
                'avg_likes': avg_likes,
                'sentiment_ratio': positive_share
            }
        return summary
|
|
|
def analyze_post(content_type: str, link_to_post: str, post_likes: int, post_date: str,
                 description: str, comment_count: int, all_comments: str) -> Tuple[str, str, str, str, str]:
    """Top-level pipeline: split the raw comment dump, extract per-comment data,
    compute statistics, and render CSV + text reports.

    Returns a 5-tuple of strings: (summary, usernames, comments, likes
    chronology, total likes). On any failure the first element carries the
    error message and the rest are empty/zero.

    NOTE(review): post_date, description and comment_count are accepted from
    the UI but not used in this function.
    """
    try:

        # Comment boundaries: lookaheads keep the delimiter text inside each
        # split block. NOTE(review): '^' in the third alternative is not
        # MULTILINE here, so it only fires at the very start of the dump —
        # confirm that is intended.
        comment_patterns = '|'.join([
            r"(?=Фото профиля)",
            r"(?=\n\s*[a-zA-Z0-9._]+\s+[^\n]+\n)",
            r"(?=^[a-zA-Z0-9._]+\s+[^\n]+\n)",
            r"(?=@[a-zA-Z0-9._]+\s+[^\n]+\n)"
        ])
        # Drop empty blocks and Instagram's "hidden by algorithms" placeholder.
        comments_blocks = [block.strip() for block in re.split(comment_patterns, all_comments)
                           if block and block.strip() and 'Скрыто алгоритмами Instagram' not in block]

        # Parse each block; keep only rows with both a username and a body.
        data = [CommentExtractor.extract_data(block) for block in comments_blocks]
        valid_data = [(u, c, l, w) for u, c, l, w in data if all((u, c))]

        if not valid_data:
            return "No comments found", "", "", "", "0"

        # Transpose rows into parallel columns; likes kept as strings for the
        # joined chronology output below.
        usernames, comments, likes, weeks = zip(*valid_data)
        likes = [str(l) for l in likes]

        # Per-comment derived features.
        comment_stats = {
            'lengths': [len(c) for c in comments],
            'words': [len(TextAnalyzer.get_words(c)) for c in comments],
            'emojis': sum(TextAnalyzer.count_emojis(c) for c in comments),
            'mentions': [m for c in comments for m in TextAnalyzer.extract_mentions(c)],
            'sentiments': [SentimentAnalyzer.analyze(c) for c in comments]
        }

        # Aggregate statistics across all comments.
        basic_stats = {
            'total_comments': len(comments),
            'avg_length': statistics.mean(comment_stats['lengths']),
            'median_length': statistics.median(comment_stats['lengths']),
            'avg_words': statistics.mean(comment_stats['words']),
            'total_likes': sum(map(int, likes)),
            'avg_likes': statistics.mean(map(int, likes))
        }

        # Engagement broken into early/middle/late thirds of the timeline.
        period_stats = StatsCalculator.calculate_period_stats(weeks, likes, comment_stats['sentiments'])

        # Render both report formats; the CSV is embedded in the text summary.
        csv_data = create_csv_report(content_type, link_to_post, post_likes, basic_stats,
                                     comment_stats, period_stats, usernames, comment_stats['mentions'])

        analytics_summary = create_text_report(basic_stats, comment_stats, period_stats, csv_data)

        return (
            analytics_summary,
            "\n".join(usernames),
            "\n".join(comments),
            "\n".join(likes),
            str(basic_stats['total_likes'])
        )

    except Exception as e:
        # Boundary guard for the UI: log with traceback, surface the message.
        logger.error(f"Error in analyze_post: {e}", exc_info=True)
        return f"Error: {str(e)}", "", "", "", "0"
|
|
|
def create_csv_report(content_type, link, post_likes, basic_stats, comment_stats, period_stats, usernames, mentions):
    """Serialize post analytics into a sectioned CSV string.

    Each section is a title row, then one key/value row per entry, then a
    blank row. Nested dict values are written via their str() form.
    """
    sections = {
        'metadata': {
            'content_type': content_type,
            'link': link,
            'post_likes': post_likes
        },
        'basic_stats': basic_stats,
        'sentiment_stats': dict(Counter(comment_stats['sentiments'])),
        'period_analysis': period_stats,
        'top_users': dict(Counter(usernames).most_common(5)),
        'top_mentioned': dict(Counter(mentions).most_common(5))
    }

    buffer = StringIO()
    writer = csv.writer(buffer)
    for title, rows in sections.items():
        writer.writerow([title])
        writer.writerows([key, value] for key, value in rows.items())
        writer.writerow([])
    return buffer.getvalue()
|
|
|
def create_text_report(basic_stats, comment_stats, period_stats, csv_data):
    """Render the human-readable (Russian) analytics summary.

    NOTE(review): period_stats is accepted but not used here — the parameter
    is kept for interface compatibility with analyze_post.
    """
    counts = Counter(comment_stats['sentiments'])
    lines = [
        "CSV DATA:",
        f"{csv_data}",
        "",
        "СТАТИСТИКА:",
        f"- Всего комментариев: {basic_stats['total_comments']}",
        f"- Среднее лайков: {basic_stats['avg_likes']:.1f}",
        "АНАЛИЗ КОНТЕНТА:",
        f"- Средняя длина: {basic_stats['avg_length']:.1f}",
        f"- Медиана длины: {basic_stats['median_length']}",
        f"- Среднее слов: {basic_stats['avg_words']:.1f}",
        f"- Эмодзи: {comment_stats['emojis']}",
        "ТОНАЛЬНОСТЬ:",
        f"- Позитив: {counts['positive']}",
        f"- Нейтрально: {counts['neutral']}",
        f"- Негатив: {counts['negative']}",
    ]
    return "\n".join(lines) + "\n"
|
|
|
|
|
# Gradio UI wiring: inputs mirror analyze_post's parameter order, outputs its
# 5-tuple of strings. The `import gradio as gr` previously here was moved to
# the top-of-file import block per PEP 8 (imports belong at the top).
iface = gr.Interface(
    fn=analyze_post,
    inputs=[
        gr.Radio(choices=["Photo", "Video"], label="Content Type", value="Photo"),
        gr.Textbox(label="Link to Post"),
        gr.Number(label="Likes", value=0),
        gr.Textbox(label="Post Date"),
        gr.Textbox(label="Description", lines=3),
        gr.Number(label="Total Comment Count", value=0),
        gr.Textbox(label="All Comments", lines=10)
    ],
    outputs=[
        gr.Textbox(label="Analytics Summary", lines=20),
        gr.Textbox(label="Usernames"),
        gr.Textbox(label="Comments"),
        gr.Textbox(label="Likes Chronology"),
        gr.Textbox(label="Total Likes on Comments")
    ],
    title="Enhanced Instagram Comment Analyzer",
    description="Анализатор комментариев Instagram с расширенной аналитикой"
)

# Launch the web UI only when run as a script, not when imported.
if __name__ == "__main__":
    iface.launch()