|
import asyncio |
|
import zipfile |
|
from dataclasses import dataclass |
|
from datetime import datetime |
|
from pathlib import Path |
|
from typing import Collection |
|
|
|
import pandas as pd |
|
import gradio as gr |
|
from telethon import TelegramClient, types, errors |
|
|
|
from utils.auth import AuthState, ClientConnector |
|
from utils.validation import Validator |
|
|
|
|
|
MESSAGE_DICT = dict[str, str | int | datetime | None] |
|
DEFAULT_PARSE_KWARGS = dict( |
|
limit=None, |
|
offset_date=None, |
|
reverse=False, |
|
) |
|
|
|
|
|
@dataclass
class Chat:
    """A Telegram chat selected for export, plus the metadata shown in the UI.

    ``chat`` holds the Telethon entity (as returned by ``client.get_entity``
    in ``Parser.get_chat``); ``chat_username`` is the address the user typed
    to add it.
    """

    chat: types.TLObject  # Telethon entity: a types.User or another entity (group/channel)
    chat_name: str | None  # display name; None when the entity has no usable name
    chat_username: str  # address as entered by the user
    chat_type: str  # 'Chat' for users, 'Channel/Group' for every other entity
    chat_id: int  # the entity's ``id`` attribute

    @classmethod
    def from_telethon_chat(cls, chat: types.TLObject, chat_username: str) -> 'Chat':
        """Build a ``Chat`` from a Telethon entity.

        Args:
            chat: Telethon entity. A ``types.User`` becomes a ``'Chat'`` named
                after its first/last name; anything else becomes a
                ``'Channel/Group'`` named after its ``title``.
            chat_username: The address the entity was looked up by.

        Returns:
            The populated ``Chat``.
        """
        if isinstance(chat, types.User):
            chat_type = 'Chat'
            # Either name part may be unset; join only the present ones instead
            # of rendering the literal text "None" (e.g. "John None").
            name_parts = (chat.first_name, chat.last_name)
            chat_name = ' '.join(part for part in name_parts if part) or None
        else:
            chat_type = 'Channel/Group'
            chat_name = getattr(chat, 'title', None)
        return cls(chat, chat_name, chat_username, chat_type, chat.id)

    def get_chat_info(self) -> str:
        """Return a one-line, human-readable summary of this chat."""
        return f'Chat name: {self.chat_name}, Chat type: {self.chat_type}, Chat ID: {self.chat_id}'
|
|
|
|
|
class _ChatNotFoundError(errors.UsernameInvalidError):
    """``errors.UsernameInvalidError`` whose ``str()`` is a custom message.

    Telethon's generated RPC error classes (1.x) take the originating
    *request* as their constructor argument and build their own English text,
    so ``errors.UsernameInvalidError('some text')`` does not yield
    ``'some text'`` as its message. Subclassing keeps existing
    ``except errors.UsernameInvalidError`` handlers working, while
    ``str(exc)`` returns the message passed here.
    """

    def __init__(self, message: str):
        # Skip the RPCError initializer on purpose: there is no request to report.
        Exception.__init__(self, message)
        self.request = None


class Parser:
    """Export Telegram chat histories to CSV files (one per chat) and zip them.

    The class has no instance state; all methods are class/static methods.
    """

    # Output directory for the generated CSV files and ZIP archive (relative to the CWD).
    parse_results_dir = Path('parse_results_dir')

    # Path separators and characters Windows forbids in file names.
    _unsafe_filename_chars = frozenset('<>:"/\\|?*')

    @staticmethod
    def _entity_display_name(entity: object) -> str | None:
        """Return a readable name for a chat/user entity, or None if it has none.

        Entities with a ``title`` use it. Otherwise the non-empty parts of
        ``first_name`` / ``last_name`` are joined with a space.
        """
        title = getattr(entity, 'title', None)
        if title is not None:
            return title
        name_parts = (getattr(entity, 'first_name', None), getattr(entity, 'last_name', None))
        return ' '.join(part for part in name_parts if part) or None

    @classmethod
    def _safe_filename_part(cls, value: object, fallback: str = 'chat') -> str:
        """Make *value* safe to embed in a file name.

        Unsafe characters (see ``_unsafe_filename_chars``) and non-printable
        characters become ``_``. Leading/trailing spaces and dots are stripped,
        so names such as ``..`` cannot escape the output directory.
        *fallback* is returned when nothing usable is left.
        """
        text = '' if value is None else str(value)
        cleaned = ''.join(
            '_' if ch in cls._unsafe_filename_chars or not ch.isprintable() else ch
            for ch in text
        ).strip(' .')
        return cleaned or fallback

    @classmethod
    def message_to_dict(cls, message: types.Message) -> MESSAGE_DICT | None:
        """Flatten a Telethon message into one CSV row.

        Args:
            message: The message to convert.

        Returns:
            A dict with date, chat and sender metadata plus the text.
            Returns None when the message has no text (such messages are skipped).
        """
        text = message.text or message.message
        if not text:
            return None

        sender = message.sender
        chat = message.chat

        if isinstance(sender, types.User):
            sender_id = sender.id
            username = sender.username
            first_name = sender.first_name
            last_name = sender.last_name
        else:
            # Non-user sender (or sender not available): only the id and an
            # optional username can be reported.
            sender_id = message.sender_id
            username = getattr(sender, 'username', None)
            first_name = None
            last_name = None

        return {
            'date': message.date,
            'chat_type': type(chat).__name__,
            'chat_name': cls._entity_display_name(chat),
            # getattr: don't fail the whole export if the chat entity is missing.
            'chat_id': getattr(chat, 'id', None),
            'sender_type': type(sender).__name__,
            'sender_username': username,
            'sender_first_name': first_name,
            'sender_last_name': last_name,
            'sender_id': sender_id,
            'text': text,
        }

    @classmethod
    async def get_messages_from_chat(
        cls,
        client: TelegramClient,
        chat: types.TLObject,
        parse_chats_pb_info: str,
        **parse_kwargs,
    ) -> list[MESSAGE_DICT]:
        """Fetch a chat's messages and convert them with ``message_to_dict``.

        Args:
            client: Authorized Telethon client; connected for the duration of the call.
            chat: Entity whose history is fetched.
            parse_chats_pb_info: Prefix for the progress-bar description.
            **parse_kwargs: Forwarded to ``client.iter_messages``. ``limit``
                (progress total) and ``reverse`` are also read here; missing
                keys default to None / False.

        Returns:
            Converted messages (text-less ones dropped). Unless ``reverse`` is
            truthy, the fetched order is inverted before returning
            (presumably to get chronological order).
        """
        limit = parse_kwargs.get('limit')
        reverse = parse_kwargs.get('reverse', False)
        progress = gr.Progress()
        message_dicts: list[MESSAGE_DICT] = []

        async with client:
            message_count = 0
            async for message in client.iter_messages(entity=chat, **parse_kwargs):
                message_count += 1
                # Short pause every 1000 messages (presumably to avoid flood limits).
                if message_count % 1000 == 0:
                    await asyncio.sleep(1)

                message_dict = cls.message_to_dict(message)
                if message_dict is not None:
                    message_dicts.append(message_dict)

                if limit is not None:
                    progress(message_count / limit, desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/{limit}')
                else:
                    # Unknown total: gr.Progress accepts (done, None) instead of a 0..1 fraction.
                    progress((message_count, None), desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/?')

        if not reverse:
            message_dicts.reverse()
        return message_dicts

    @classmethod
    async def parse_chats(
        cls,
        auth_state: AuthState,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
        *parse_args,
    ) -> tuple[str, list[Path]]:
        """Export every chat in *chats_list* to its own CSV file.

        Args:
            auth_state: Holds the session used to build the client.
            chats_list: Chats to export.
            api_id: Telegram API id.
            api_hash: Telegram API hash.
            *parse_args: Positional values for the ``DEFAULT_PARSE_KWARGS`` keys,
                in that order. Missing trailing values keep their defaults.

        Returns:
            A newline-terminated log (one line per chat) and the paths of the
            CSV files written. A chat that fails is logged, not raised.
        """
        csv_paths: list[Path] = []

        if not chats_list:
            return 'Список чатов для парсинга пустой', csv_paths

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован', csv_paths

        parse_kwargs = {**DEFAULT_PARSE_KWARGS, **dict(zip(DEFAULT_PARSE_KWARGS, parse_args))}
        progress = gr.Progress()
        total_chats = len(chats_list)
        log_lines: list[str] = []

        for i, chat in enumerate(chats_list, start=1):
            parse_chats_pb_info = f'Parsing chats {i}/{total_chats}'
            try:
                message_dicts = await cls.get_messages_from_chat(client, chat.chat, parse_chats_pb_info, **parse_kwargs)
                if not message_dicts:
                    log_lines.append(f'Из чата {chat.chat_username} не было извлечено ни одного сообщения')
                else:
                    csv_paths.append(cls.messages_to_csv(message_dicts))
                    log_lines.append(f'Успешный парсинг чата {chat.chat_username}, кол-во сообщений: {len(message_dicts)}')
            except Exception as ex:
                # Best effort: one failing chat must not abort the rest of the batch.
                log_lines.append(f'Ошибка при парсинге чата {chat.chat_username}, код ошибки: {ex}')

            progress(i / total_chats, desc=parse_chats_pb_info)

        parse_result = ''.join(line + '\n' for line in log_lines)
        return parse_result, csv_paths

    @classmethod
    def messages_to_csv(cls, message_dicts: Collection[MESSAGE_DICT]) -> Path:
        """Write message rows to ``telegram_history_<chat name>.csv``.

        The chat name comes from the first row and is sanitized before it is
        used in the file name.

        Raises:
            ValueError: If *message_dicts* is empty.
        """
        records = list(message_dicts)
        if not records:
            raise ValueError('message_dicts must not be empty')

        df = pd.DataFrame(records)
        # Nullable integer dtype keeps ids integral even when some rows lack sender_id.
        df['sender_id'] = df['sender_id'].astype('Int64')

        chat_name = cls._safe_filename_part(records[0].get('chat_name'))
        cls.parse_results_dir.mkdir(parents=True, exist_ok=True)
        csv_path = cls.parse_results_dir / f'telegram_history_{chat_name}.csv'
        df.to_csv(csv_path, index=False)
        return csv_path

    @classmethod
    def zip_files(cls, file_paths: Collection[Path]) -> Path:
        """Pack *file_paths* into ``parse_results_csv.zip``, overwriting any previous archive."""
        cls.parse_results_dir.mkdir(parents=True, exist_ok=True)
        zip_filepath = cls.parse_results_dir / 'parse_results_csv.zip'
        # The ZipFile default (ZIP_STORED) would keep the CSV text uncompressed.
        with zipfile.ZipFile(zip_filepath, 'w', compression=zipfile.ZIP_DEFLATED) as zipf:
            for file_path in file_paths:
                zipf.write(file_path, arcname=file_path)
        return zip_filepath

    @staticmethod
    def get_chats_info(chats_list: list[Chat]) -> str:
        """Return a numbered, newline-terminated summary line per chat."""
        return ''.join(f'{i}: {chat.get_chat_info()}\n' for i, chat in enumerate(chats_list, start=1))

    @staticmethod
    async def get_chat(client: TelegramClient, chat_username: str) -> types.TLObject:
        """Resolve *chat_username* to a Telethon entity.

        Connects for the lookup only if the client is not already connected.

        Raises:
            errors.UsernameInvalidError: The chat does not exist or the address
                is malformed; ``str(exc)`` is a user-facing message.
            Exception: Any other lookup failure, with a user-facing message.
        """
        try:
            if client.is_connected():
                return await client.get_entity(chat_username)
            async with client:
                return await client.get_entity(chat_username)
        except (errors.UsernameNotOccupiedError, errors.UsernameInvalidError) as ex:
            log_msg = f'Чат или канал {chat_username} не найден или введен неверно'
            raise _ChatNotFoundError(log_msg) from ex
        except Exception as ex:
            log_msg = f'Ошибка при получении объекта чата, код ошибки: {ex}'
            raise Exception(log_msg) from ex

    @staticmethod
    def _contains_chat(chats_list: list[Chat], telethon_chat: types.TLObject) -> bool:
        """Return True if *chats_list* already holds an entity of the same type and id."""
        return any(
            type(existing.chat) is type(telethon_chat) and existing.chat_id == telethon_chat.id
            for existing in chats_list
        )

    @classmethod
    async def add_chat_to_chats_list(
        cls,
        auth_state: AuthState,
        chats_usernames,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
    ) -> str:
        """Resolve whitespace-separated chat addresses and append them to *chats_list*.

        *chats_list* is mutated in place. Duplicates and lookup failures are
        reported through ``gr.Info`` rather than raised.

        Returns:
            The updated ``get_chats_info`` summary, or an error message when
            no addresses were given or the client is not authorized.
        """
        if not chats_usernames or not chats_usernames.strip():
            return 'Не заданы адрес/адреса чатов для добавления'

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован'

        for chat_username in chats_usernames.split():
            try:
                telethon_chat = await cls.get_chat(client, chat_username)
                # Compare by entity type + id: a Telethon entity never equals a Chat wrapper.
                if cls._contains_chat(chats_list, telethon_chat):
                    gr.Info(f'Чат {chat_username} уже есть в списке')
                else:
                    chats_list.append(Chat.from_telethon_chat(telethon_chat, chat_username))
            except Exception as ex:
                gr.Info(str(ex))

        return cls.get_chats_info(chats_list)
|
|
|
|
|
# Ensure the directory Parser writes its CSV files and ZIP archive into exists
# as soon as the module is imported (no-op if it is already there).
Path.mkdir(Parser.parse_results_dir, exist_ok=True)
|
|