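# NOTE(review): "Spaces: Sleeping" looks like hosting-page status text captured
# during extraction; it is not part of the module's code.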
import asyncio
import re
import zipfile
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Collection

import gradio as gr
import pandas as pd
from telethon import TelegramClient, types, errors

from utils.auth import AuthState, ClientConnector
from utils.validation import Validator
MESSAGE_DICT = dict[str, str | int | datetime | None] | |
DEFAULT_PARSE_KWARGS = dict( | |
limit=None, | |
offset_date=None, | |
reverse=False, | |
) | |
@dataclass
class Chat:
    """A Telegram entity plus the display metadata derived from it.

    Fields are declared in the positional order that ``from_telethon_chat``
    passes to the constructor.
    """

    chat: types.TLObject  # raw Telethon entity
    chat_name: str | None  # title, or the user's full name; None if unavailable
    chat_username: str  # address the user typed to look the chat up
    chat_type: str  # 'Chat' for a user dialog, 'Channel/Group' otherwise
    chat_id: int

    @classmethod
    def from_telethon_chat(cls, chat: types.TLObject, chat_username: str) -> 'Chat':
        """Build a ``Chat`` from a Telethon entity.

        Args:
            chat: Entity returned by Telethon (a user, channel or group).
            chat_username: The address that was used to resolve ``chat``.

        Returns:
            A ``Chat`` whose name/type/id are derived from ``chat``.
        """
        if isinstance(chat, types.User):
            chat_type = 'Chat'
            # Skip missing name parts so the result is never "John None".
            name_parts = (chat.first_name, chat.last_name)
            chat_name = ' '.join(part for part in name_parts if part) or None
        else:
            chat_type = 'Channel/Group'
            chat_name = chat.title
        return cls(chat, chat_name, chat_username, chat_type, chat.id)

    def get_chat_info(self) -> str:
        """Return a one-line, human-readable summary of this chat."""
        return (
            f'Chat name: {self.chat_name}, '
            f'Chat type: {self.chat_type}, '
            f'Chat ID: {self.chat_id}'
        )
class Parser:
    """Download Telegram chat history via Telethon and export it to CSV files.

    The class is used as a namespace: every method is a class or static
    method and shared state is limited to ``parse_results_dir``.
    """

    # Directory that receives the generated CSV files and the zip archive.
    parse_results_dir = Path('parse_results_dir')

    @staticmethod
    def message_to_dict(message: types.Message) -> MESSAGE_DICT | None:
        """Flatten a Telethon message into a plain dict.

        Args:
            message: Message yielded by ``client.iter_messages``.

        Returns:
            The record for the message, or ``None`` when the message carries
            no text.
        """
        text = message.text if message.text else message.message
        if not text:
            return None

        sender = message.sender
        chat = message._chat

        # Channels and groups expose ``title``; user dialogs only have name
        # parts. ``getattr`` keeps basic groups (no ``first_name``) from
        # raising and missing parts from being rendered as "None".
        chat_name = getattr(chat, 'title', None)
        if chat_name is None:
            name_parts = (
                getattr(chat, 'first_name', None),
                getattr(chat, 'last_name', None),
            )
            chat_name = ' '.join(part for part in name_parts if part) or None

        if isinstance(sender, types.User):
            sender_id = sender.id
            username = sender.username
            first_name = sender.first_name
            last_name = sender.last_name
        else:
            # Non-user senders have no personal name fields.
            sender_id = message._sender_id
            username = getattr(sender, 'username', None)
            first_name = None
            last_name = None

        return {
            'date': message.date,
            'chat_type': type(chat).__name__,
            'chat_name': chat_name,
            'chat_id': chat.id,
            'sender_type': type(sender).__name__,
            'sender_username': username,
            'sender_first_name': first_name,
            'sender_last_name': last_name,
            'sender_id': sender_id,
            'text': text,
        }

    @classmethod
    async def get_messages_from_chat(
        cls,
        client: TelegramClient,
        chat: types.TLObject,
        parse_chats_pb_info: str,
        **parse_kwargs,
    ) -> list[MESSAGE_DICT]:
        """Collect the text messages of one chat.

        Args:
            client: Telethon client; it is entered as a context manager here.
            chat: Entity whose history is read.
            parse_chats_pb_info: Prefix shown in the progress description.
            **parse_kwargs: Forwarded to ``client.iter_messages``. ``limit``
                and ``reverse`` are also read here and may be omitted.

        Returns:
            Message records; when ``reverse`` is falsy the collected list is
            reversed before being returned.
        """
        # ``.get`` so that callers may omit the optional keys.
        limit = parse_kwargs.get('limit')
        reverse = parse_kwargs.get('reverse', False)
        pause_every = 1000

        async with client:
            progress = gr.Progress()
            message_dicts = []
            message_count = 0
            async for message in client.iter_messages(entity=chat, **parse_kwargs):
                message_count += 1
                # Pause once (not twice) per ``pause_every`` messages.
                if message_count % pause_every == 0:
                    await asyncio.sleep(1)
                message_dict = cls.message_to_dict(message)
                if message_dict is not None:
                    message_dicts.append(message_dict)
                if limit is not None:
                    progress(
                        message_count / limit,
                        desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/{limit}',
                    )
                else:
                    progress(
                        message_count,
                        desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/?',
                    )

        if not reverse:
            message_dicts.reverse()
        return message_dicts

    @classmethod
    async def parse_chats(
        cls,
        auth_state: AuthState,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
        *parse_args,
    ) -> tuple[str, list[Path]]:
        """Parse every chat in ``chats_list`` and write one CSV per chat.

        Args:
            auth_state: Provides the session used to build the client.
            chats_list: Chats to parse.
            api_id: Telegram API id.
            api_hash: Telegram API hash.
            *parse_args: Values for the keys of ``DEFAULT_PARSE_KWARGS``, in
                that order; missing trailing values fall back to the defaults.

        Returns:
            A newline-terminated log (one line per chat) and the list of CSV
            paths that were written.
        """
        csv_paths: list[Path] = []
        if not chats_list:
            return 'Список чатов для парсинга пустой', csv_paths

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован', csv_paths

        # Start from the defaults so a short ``parse_args`` cannot drop keys.
        parse_kwargs = {**DEFAULT_PARSE_KWARGS, **dict(zip(DEFAULT_PARSE_KWARGS, parse_args))}
        progress = gr.Progress()
        total_chats = len(chats_list)
        log_lines = []

        for i, chat in enumerate(chats_list, start=1):
            parse_chats_pb_info = f'Parsing chats {i}/{total_chats}'
            try:
                message_dicts = await cls.get_messages_from_chat(
                    client, chat.chat, parse_chats_pb_info, **parse_kwargs
                )
                if not message_dicts:
                    log_msg = f'Из чата {chat.chat_username} не было извлечено ни одного сообщения'
                else:
                    csv_paths.append(cls.messages_to_csv(message_dicts))
                    log_msg = (
                        f'Успешный парсинг чата {chat.chat_username}, '
                        f'кол-во сообщений: {len(message_dicts)}'
                    )
            except Exception as ex:
                # Best effort: one failing chat must not abort the others.
                log_msg = f'Ошибка при парсинге чата {chat.chat_username}, код ошибки: {ex}'
            log_lines.append(log_msg)
            progress(i / total_chats, desc=parse_chats_pb_info)

        return ''.join(f'{line}\n' for line in log_lines), csv_paths

    @classmethod
    def messages_to_csv(cls, message_dicts: Collection[MESSAGE_DICT]) -> Path:
        """Write message records to a CSV file named after their chat.

        Args:
            message_dicts: Non-empty collection of records of a single chat.

        Returns:
            Path of the written CSV file.

        Raises:
            IndexError: If ``message_dicts`` is empty.
        """
        rows = list(message_dicts)
        first_row = rows[0]
        # Fall back to the chat id when the chat has no usable name.
        raw_name = first_row.get('chat_name') or first_row.get('chat_id', '')
        # Chat titles are free text; replace anything that is not a word
        # character, dash, dot or space so the name is a valid single
        # path component.
        safe_name = re.sub(r'[^\w\-. ]+', '_', str(raw_name))
        cls.parse_results_dir.mkdir(parents=True, exist_ok=True)
        csv_path = cls.parse_results_dir / f'telegram_history_{safe_name}.csv'
        pd.DataFrame.from_dict(rows).to_csv(csv_path, index=False)
        return csv_path

    @classmethod
    def zip_files(cls, file_paths: Collection[Path]) -> Path:
        """Pack ``file_paths`` into ``parse_results_csv.zip`` and return its path."""
        zip_filepath = cls.parse_results_dir / 'parse_results_csv.zip'
        with zipfile.ZipFile(zip_filepath, 'w') as zipf:
            for file_path in file_paths:
                zipf.write(file_path, arcname=file_path)
        return zip_filepath

    @staticmethod
    def get_chats_info(chats_list: list[Chat]) -> str:
        """Return a numbered, newline-terminated summary line for every chat."""
        return ''.join(
            f'{i}: {chat.get_chat_info()}\n'
            for i, chat in enumerate(chats_list, start=1)
        )

    @staticmethod
    async def get_chat(client: TelegramClient, chat_username: str) -> types.TLObject:
        """Resolve ``chat_username`` to a Telethon entity.

        The client is used as-is when already connected; otherwise it is
        entered as a context manager for the duration of the lookup.

        Raises:
            errors.UsernameInvalidError: The username is unknown or malformed.
            Exception: Any other failure, wrapped with a descriptive message.
        """
        try:
            if client.is_connected():
                chat = await client.get_entity(chat_username)
            else:
                async with client:
                    chat = await client.get_entity(chat_username)
        except (errors.UsernameNotOccupiedError, errors.UsernameInvalidError) as ex:
            log_msg = f'Чат или канал {chat_username} не найден или введен неверно'
            # NOTE(review): Telethon RPC error constructors appear to take the
            # originating request rather than a message - confirm that
            # ``log_msg`` actually ends up in ``str()`` of this exception.
            raise errors.UsernameInvalidError(log_msg) from ex
        except Exception as ex:
            log_msg = f'Ошибка при получении объекта чата, код ошибки: {ex}'
            raise Exception(log_msg) from ex
        return chat

    @classmethod
    async def add_chat_to_chats_list(
        cls,
        auth_state: AuthState,
        chats_usernames,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
    ) -> str:
        """Resolve chat addresses and append the new ones to ``chats_list``.

        Args:
            auth_state: Provides the session used to build the client.
            chats_usernames: Whitespace-separated chat addresses.
            chats_list: List that is extended in place.
            api_id: Telegram API id.
            api_hash: Telegram API hash.

        Returns:
            An error message when nothing could be attempted, otherwise the
            summary of ``chats_list`` after the update.
        """
        if not chats_usernames or not chats_usernames.strip():
            return 'Не заданы адрес/адреса чатов для добавления'

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован'

        # ``chats_list`` holds ``Chat`` wrappers, so a raw entity can never be
        # found in it directly; detect duplicates by entity type and id.
        known = {(type(chat.chat).__name__, chat.chat_id) for chat in chats_list}

        for chat_username in chats_usernames.split():
            try:
                telethon_chat = await cls.get_chat(client, chat_username)
                key = (type(telethon_chat).__name__, telethon_chat.id)
                if key in known:
                    gr.Info(f'Чат {chat_username} уже есть в списке')
                else:
                    chats_list.append(Chat.from_telethon_chat(telethon_chat, chat_username))
                    known.add(key)
            except Exception as ex:
                # Best effort: report the problem and continue with the rest.
                gr.Info(str(ex))

        return cls.get_chats_info(chats_list)
# Create the output directory at import time so later CSV/zip writes succeed.
Parser.parse_results_dir.mkdir(parents=True, exist_ok=True)