sergey21000 committed on
Commit
b541f35
·
verified ·
1 Parent(s): 2a039c9

Update utils/parser.py

Browse files
Files changed (1) hide show
  1. utils/parser.py +241 -240
utils/parser.py CHANGED
@@ -1,240 +1,241 @@
1
- import asyncio
2
- import zipfile
3
- from dataclasses import dataclass
4
- from datetime import datetime
5
- from pathlib import Path
6
- from typing import Collection
7
-
8
- import pandas as pd
9
- import gradio as gr
10
- from telethon import TelegramClient, types, errors
11
-
12
- from utils.auth import AuthState, ClientConnector
13
- from utils.validation import Validator
14
-
15
-
16
- MESSAGE_DICT = dict[str, str | int | datetime | None]
17
- DEFAULT_PARSE_KWARGS = dict(
18
- limit=None,
19
- offset_date=None,
20
- reverse=False,
21
- )
22
-
23
-
24
- @dataclass
25
- class Chat:
26
- chat: types.TLObject
27
- chat_name: str | None
28
- chat_username: str
29
- chat_type: str
30
- chat_id: int
31
-
32
- @classmethod
33
- def from_telethon_chat(cls, chat: types.TLObject, chat_username: str):
34
- chat_id = chat.id
35
- if isinstance(chat, types.User):
36
- chat_type = 'Chat'
37
- chat_name = f'{chat.first_name} {chat.last_name}'
38
- else:
39
- chat_type = 'Channel/Group'
40
- chat_name = chat.title
41
- return cls(chat, chat_name, chat_username, chat_type, chat_id)
42
-
43
- def get_chat_info(self) -> str:
44
- chat_info = f'Chat name: {self.chat_name}, Chat type: {self.chat_type}, Chat ID: {self.chat_id}'
45
- return chat_info
46
-
47
-
48
- class Parser:
49
- parse_results_dir = Path('parse_results_dir')
50
-
51
- @staticmethod
52
- def message_to_dict(message: types.Message) -> MESSAGE_DICT:
53
- text = message.text if message.text else message.message
54
- if not text:
55
- return None
56
-
57
- date = message.date
58
- sender = message.sender
59
- sender_type = type(sender).__name__
60
- chat = message._chat
61
- chat_id = chat.id
62
- chat_type = type(chat).__name__
63
- chat_name = chat.title if isinstance(chat, types.Channel) else f'{chat.first_name} {chat.last_name}'
64
-
65
- if isinstance(sender, types.User):
66
- sender_id = message.sender.id
67
- username = sender.username
68
- first_name = sender.first_name
69
- last_name = sender.last_name
70
- else:
71
- sender_id = message._sender_id
72
- username = getattr(message.sender, 'username', None)
73
- first_name = None
74
- last_name = None
75
-
76
- message_dict = {
77
- 'date': date,
78
- 'chat_type': chat_type,
79
- 'chat_name': chat_name,
80
- 'chat_id': chat_id,
81
- 'sender_type': sender_type,
82
- 'sender_username': username,
83
- 'sender_first_name': first_name,
84
- 'sender_last_name': last_name,
85
- 'sender_id': sender_id,
86
- 'text': text,
87
- }
88
- return message_dict
89
-
90
- @classmethod
91
- async def get_messages_from_chat(
92
- cls,
93
- client: TelegramClient,
94
- chat: types.TLObject,
95
- parse_chats_pb_info: str,
96
- **parse_kwargs,
97
- ) -> list[MESSAGE_DICT]:
98
-
99
- async with client:
100
- progress = gr.Progress()
101
- messages = client.iter_messages(entity=chat, **parse_kwargs)
102
- message_dicts = []
103
- message_count = 0
104
- async for message in messages:
105
- message_count += 1
106
- if message_count % 1000 == 0:
107
- await asyncio.sleep(1)
108
- message_dict = cls.message_to_dict(message)
109
- if message_dict is not None:
110
- message_dicts.append(message_dict)
111
-
112
- if message_count % 1000 == 0:
113
- await asyncio.sleep(1)
114
-
115
- if parse_kwargs['limit'] is not None:
116
- total = parse_kwargs['limit']
117
- progress(message_count / total, desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/{total}')
118
- else:
119
- progress(message_count, desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/?')
120
-
121
- if not parse_kwargs['reverse']:
122
- message_dicts = message_dicts[::-1]
123
- return message_dicts
124
-
125
- @classmethod
126
- async def parse_chats(
127
- cls,
128
- auth_state: AuthState,
129
- chats_list: list[Chat],
130
- api_id: str,
131
- api_hash: str,
132
- *parse_args,
133
- ) -> tuple[str, list[Path]]:
134
-
135
- cvs_paths = []
136
- parse_result = ''
137
-
138
- if len(chats_list) == 0:
139
- return 'Список чатов для парсинга пустой', cvs_paths
140
-
141
- client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
142
- validation_result = await Validator.validate_auth(client)
143
- if not validation_result.is_valid:
144
- return 'Клиент не авторизован', cvs_paths
145
-
146
- parse_kwargs = dict(zip(DEFAULT_PARSE_KWARGS.keys(), parse_args))
147
- progress = gr.Progress()
148
-
149
- for i, chat in enumerate(chats_list, start=1):
150
- try:
151
- parse_chats_pb_info = f'Parsing chats {i}/{len(chats_list)}'
152
- message_dicts = await cls.get_messages_from_chat(client, chat.chat, parse_chats_pb_info, **parse_kwargs)
153
- if len(message_dicts) == 0:
154
- log_msg = f'Из чата {chat.chat_username} не было извлечено ни одного сообщения'
155
- parse_result += log_msg + '\n'
156
- else:
157
- cvs_path = cls.messages_to_csv(message_dicts)
158
- cvs_paths.append(cvs_path)
159
- log_msg = f'Успешный парсинг чата {chat.chat_username}, кол-во сообщений: {len(message_dicts)}'
160
- parse_result += log_msg + '\n'
161
- except Exception as ex:
162
- log_msg = f'Ошибка при парсинге чата {chat.chat_username}, код ошибки: {ex}'
163
- parse_result += log_msg + '\n'
164
-
165
- progress(i / len(chats_list), desc=parse_chats_pb_info)
166
- return parse_result, cvs_paths
167
-
168
- @classmethod
169
- def messages_to_csv(cls, message_dicts: Collection[MESSAGE_DICT]) -> Path:
170
- df = pd.DataFrame.from_dict(message_dicts)
171
- chat_name = message_dicts[0].get('chat_name', '')
172
- cvs_path = cls.parse_results_dir / f'telegram_history_{chat_name}.csv'
173
- df.to_csv(cvs_path, index=False)
174
- return cvs_path
175
-
176
- @classmethod
177
- def zip_files(cls, file_paths: Collection[Path]) -> Path:
178
- zip_filepath = cls.parse_results_dir / 'parse_results_csv.zip'
179
- with zipfile.ZipFile(zip_filepath, 'w') as zipf:
180
- for file_path in file_paths:
181
- zipf.write(file_path, arcname=file_path)
182
- return zip_filepath
183
-
184
- @staticmethod
185
- def get_chats_info(chats_list: list[Chat]) -> str:
186
- chats_info = ''
187
- for i, chat in enumerate(chats_list, start=1):
188
- chats_info += f'{i}: ' + chat.get_chat_info() + '\n'
189
- return chats_info
190
-
191
- @staticmethod
192
- async def get_chat(client: TelegramClient, chat_username: str) -> types.TLObject:
193
- try:
194
- if client.is_connected():
195
- chat = await client.get_entity(chat_username)
196
- else:
197
- async with client:
198
- chat = await client.get_entity(chat_username)
199
- except (errors.UsernameNotOccupiedError, errors.UsernameInvalidError) as ex:
200
- log_msg = f'Чат или канал {chat_username} не найден или введен неверно'
201
- raise errors.UsernameInvalidError(log_msg)
202
- except Exception as ex:
203
- log_msg = f'Ошибка при получении объекта чата, код ошибки: {ex}'
204
- raise Exception(log_msg)
205
- return chat
206
-
207
- @classmethod
208
- async def add_chat_to_chats_list(
209
- cls,
210
- auth_state: AuthState,
211
- chats_usernames,
212
- chats_list: list[Chat],
213
- api_id: str,
214
- api_hash: str,
215
- ) -> str:
216
-
217
- if chats_usernames.strip() == '':
218
- return 'Не заданы адрес/адреса чатов для добавления'
219
-
220
- client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
221
- validation_result = await Validator.validate_auth(client)
222
- if not validation_result.is_valid:
223
- return 'Клиент не авторизован'
224
-
225
- for chat_username in chats_usernames.split():
226
- try:
227
- telethon_chat = await cls.get_chat(client, chat_username.strip())
228
- if not telethon_chat in chats_list:
229
- chat = Chat.from_telethon_chat(telethon_chat, chat_username)
230
- chats_list.append(chat)
231
- else:
232
- log_msg = f'Чат {chat_username} уже есть в списке'
233
- gr.Info(log_msg)
234
- except Exception as ex:
235
- log_msg = str(ex)
236
- gr.Info(log_msg)
237
- return cls.get_chats_info(chats_list)
238
-
239
-
240
- Parser.parse_results_dir.mkdir(exist_ok=True)
 
 
1
+ import asyncio
2
+ import zipfile
3
+ from dataclasses import dataclass
4
+ from datetime import datetime
5
+ from pathlib import Path
6
+ from typing import Collection
7
+
8
+ import pandas as pd
9
+ import gradio as gr
10
+ from telethon import TelegramClient, types, errors
11
+
12
+ from utils.auth import AuthState, ClientConnector
13
+ from utils.validation import Validator
14
+
15
+
16
+ MESSAGE_DICT = dict[str, str | int | datetime | None]
17
+ DEFAULT_PARSE_KWARGS = dict(
18
+ limit=None,
19
+ offset_date=None,
20
+ reverse=False,
21
+ )
22
+
23
+
24
@dataclass
class Chat:
    """A chat queued for parsing, together with the metadata shown to the user."""

    chat: types.TLObject      # entity object obtained from Telethon
    chat_name: str | None     # title, or the user's full name for private chats
    chat_username: str        # identifier the chat was requested by
    chat_type: str            # 'Chat' for a user, 'Channel/Group' otherwise
    chat_id: int              # value of the entity's ``id`` attribute

    @classmethod
    def from_telethon_chat(cls, chat: types.TLObject, chat_username: str) -> 'Chat':
        """Build a ``Chat`` from a Telethon entity.

        Args:
            chat: Entity returned by Telethon. ``types.User`` instances are
                treated as private chats; everything else is expected to
                expose a ``title`` attribute.
            chat_username: Identifier that was used to look the entity up.

        Returns:
            A populated ``Chat`` instance.
        """
        if isinstance(chat, types.User):
            chat_type = 'Chat'
            # Skip missing name parts so a user without a last name is shown
            # as "John" rather than "John None".
            name_parts = [part for part in (chat.first_name, chat.last_name) if part]
            chat_name = ' '.join(name_parts) or None
        else:
            chat_type = 'Channel/Group'
            chat_name = chat.title
        return cls(
            chat=chat,
            chat_name=chat_name,
            chat_username=chat_username,
            chat_type=chat_type,
            chat_id=chat.id,
        )

    def get_chat_info(self) -> str:
        """Return a one-line, human-readable summary of this chat."""
        return f'Chat name: {self.chat_name}, Chat type: {self.chat_type}, Chat ID: {self.chat_id}'
46
+
47
+
48
class Parser:
    """Helpers that download Telegram chat history and export it as CSV files."""

    # Output directory (relative to the working directory) for CSV and ZIP files.
    parse_results_dir = Path('parse_results_dir')

    # Pause for _THROTTLE_SECONDS after every _THROTTLE_EVERY fetched messages.
    _THROTTLE_EVERY = 1000
    _THROTTLE_SECONDS = 1

    # Characters that are path separators, reserved on common file systems,
    # or ASCII control characters; each is replaced by '_' in file names.
    _UNSAFE_FILENAME_CHARS = str.maketrans(
        {char: '_' for char in '\\/:*?"<>|' + ''.join(map(chr, range(32)))}
    )

    @staticmethod
    def message_to_dict(message: types.Message) -> MESSAGE_DICT | None:
        """Convert a Telethon message into a flat row for export.

        Args:
            message: Message yielded by ``client.iter_messages``.

        Returns:
            A dict with date, chat and sender fields plus the text, or
            ``None`` when the message carries no text.
        """
        text = message.text or message.message
        if not text:
            return None

        sender = message.sender
        chat = message._chat

        # Any entity with a title (channels, groups) uses it; only entities
        # without one fall back to the person's name. Missing name parts are
        # skipped so the result never contains the literal "None".
        title = getattr(chat, 'title', None)
        if title is not None:
            chat_name = title
        else:
            name_parts = (getattr(chat, 'first_name', None), getattr(chat, 'last_name', None))
            chat_name = ' '.join(part for part in name_parts if part)

        if isinstance(sender, types.User):
            sender_id = sender.id
            username = sender.username
            first_name = sender.first_name
            last_name = sender.last_name
        else:
            # Non-user senders have no personal name fields.
            sender_id = message._sender_id
            username = getattr(sender, 'username', None)
            first_name = None
            last_name = None

        return {
            'date': message.date,
            'chat_type': type(chat).__name__,
            'chat_name': chat_name,
            'chat_id': chat.id,
            'sender_type': type(sender).__name__,
            'sender_username': username,
            'sender_first_name': first_name,
            'sender_last_name': last_name,
            'sender_id': sender_id,
            'text': text,
        }

    @classmethod
    async def get_messages_from_chat(
        cls,
        client: TelegramClient,
        chat: types.TLObject,
        parse_chats_pb_info: str,
        **parse_kwargs,
    ) -> list[MESSAGE_DICT]:
        """Fetch the text messages of one chat, reporting progress as it goes.

        Args:
            client: Telethon client; it is entered as an async context manager
                for the duration of the download.
            chat: Entity whose history is read.
            parse_chats_pb_info: Prefix for the progress description.
            **parse_kwargs: Forwarded to ``client.iter_messages``. ``limit``
                and ``reverse`` are also read here and may be omitted.

        Returns:
            Message rows; when ``reverse`` is falsy the fetched order is
            flipped before returning.
        """
        limit = parse_kwargs.get('limit')
        reverse = parse_kwargs.get('reverse', False)

        progress = gr.Progress()
        message_dicts = []
        message_count = 0

        async with client:
            async for message in client.iter_messages(entity=chat, **parse_kwargs):
                message_count += 1
                message_dict = cls.message_to_dict(message)
                if message_dict is not None:
                    message_dicts.append(message_dict)

                # Throttle once per batch (the check used to be duplicated,
                # which doubled the pause).
                if message_count % cls._THROTTLE_EVERY == 0:
                    await asyncio.sleep(cls._THROTTLE_SECONDS)

                if limit is not None:
                    progress(
                        message_count / limit,
                        desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/{limit}',
                    )
                else:
                    # NOTE(review): the total is unknown here, so the raw count
                    # is passed as the progress value - confirm how Gradio
                    # renders values above 1.
                    progress(
                        message_count,
                        desc=f'{parse_chats_pb_info}, Parsing messages {message_count}/?',
                    )

        if not reverse:
            message_dicts.reverse()
        return message_dicts

    @classmethod
    async def parse_chats(
        cls,
        auth_state: AuthState,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
        *parse_args,
    ) -> tuple[str, list[Path]]:
        """Parse every chat in ``chats_list`` and write one CSV per chat.

        Args:
            auth_state: Provides the session used to build the client.
            chats_list: Chats to parse.
            api_id: Telegram API id.
            api_hash: Telegram API hash.
            *parse_args: Values for the keys of ``DEFAULT_PARSE_KWARGS``, in
                that order; omitted trailing values keep their defaults.

        Returns:
            A newline-terminated log (one line per chat) and the list of CSV
            paths that were written.
        """
        csv_paths: list[Path] = []

        if not chats_list:
            return 'Список чатов для парсинга пустой', csv_paths

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован', csv_paths

        # Start from the defaults so that a short *parse_args cannot leave
        # 'limit' or 'reverse' undefined.
        parse_kwargs = {**DEFAULT_PARSE_KWARGS, **dict(zip(DEFAULT_PARSE_KWARGS, parse_args))}
        progress = gr.Progress()
        total_chats = len(chats_list)
        log_lines = []

        for i, chat in enumerate(chats_list, start=1):
            parse_chats_pb_info = f'Parsing chats {i}/{total_chats}'
            try:
                message_dicts = await cls.get_messages_from_chat(
                    client, chat.chat, parse_chats_pb_info, **parse_kwargs,
                )
                if not message_dicts:
                    log_lines.append(
                        f'Из чата {chat.chat_username} не было извлечено ни одного сообщения'
                    )
                else:
                    csv_paths.append(cls.messages_to_csv(message_dicts))
                    log_lines.append(
                        f'Успешный парсинг чата {chat.chat_username}, кол-во сообщений: {len(message_dicts)}'
                    )
            except Exception as ex:
                # Best effort: a failure in one chat must not abort the rest.
                log_lines.append(f'Ошибка при парсинге чата {chat.chat_username}, код ошибки: {ex}')

            progress(i / total_chats, desc=parse_chats_pb_info)

        return ''.join(f'{line}\n' for line in log_lines), csv_paths

    @classmethod
    def messages_to_csv(cls, message_dicts: Collection[MESSAGE_DICT]) -> Path:
        """Write message rows to ``telegram_history_<chat name>.csv``.

        The chat name is taken from the first row. Characters that are path
        separators or reserved in file names are replaced with ``_`` so the
        file always lands inside ``parse_results_dir``.

        Args:
            message_dicts: Non-empty collection of rows from ``message_to_dict``.

        Returns:
            Path of the written CSV file.

        Raises:
            ValueError: If ``message_dicts`` is empty.
        """
        rows = list(message_dicts)
        if not rows:
            raise ValueError('message_dicts is empty, nothing to export')

        df = pd.DataFrame(rows)
        # Nullable integer dtype keeps ids as integers even when some are missing.
        df['sender_id'] = df['sender_id'].astype('Int64')

        chat_name = rows[0].get('chat_name', '')
        safe_name = f'{chat_name}'.translate(cls._UNSAFE_FILENAME_CHARS)
        csv_path = cls.parse_results_dir / f'telegram_history_{safe_name}.csv'
        df.to_csv(csv_path, index=False)
        return csv_path

    @classmethod
    def zip_files(cls, file_paths: Collection[Path]) -> Path:
        """Bundle the given files into ``parse_results_csv.zip``.

        Each file is stored under the same path it was given as.

        Returns:
            Path of the written archive inside ``parse_results_dir``.
        """
        zip_filepath = cls.parse_results_dir / 'parse_results_csv.zip'
        with zipfile.ZipFile(zip_filepath, 'w') as zipf:
            for file_path in file_paths:
                zipf.write(file_path, arcname=file_path)
        return zip_filepath

    @staticmethod
    def get_chats_info(chats_list: list[Chat]) -> str:
        """Return a numbered, newline-terminated listing of the chats."""
        return ''.join(
            f'{i}: {chat.get_chat_info()}\n'
            for i, chat in enumerate(chats_list, start=1)
        )

    @staticmethod
    async def get_chat(client: TelegramClient, chat_username: str) -> types.TLObject:
        """Resolve ``chat_username`` to a Telethon entity.

        An already connected client is used as is; otherwise the client is
        entered as an async context manager for the lookup.

        Raises:
            errors.UsernameInvalidError: The username is unknown or malformed.
            Exception: Any other failure, wrapped with a descriptive message.
        """
        try:
            if client.is_connected():
                chat = await client.get_entity(chat_username)
            else:
                async with client:
                    chat = await client.get_entity(chat_username)
        except (errors.UsernameNotOccupiedError, errors.UsernameInvalidError) as ex:
            log_msg = f'Чат или канал {chat_username} не найден или введен неверно'
            # NOTE(review): Telethon's generated error classes may interpret
            # their constructor argument as the request object rather than a
            # message - confirm that this text reaches str(ex).
            raise errors.UsernameInvalidError(log_msg) from ex
        except Exception as ex:
            log_msg = f'Ошибка при получении объекта чата, код ошибки: {ex}'
            raise Exception(log_msg) from ex
        return chat

    @classmethod
    async def add_chat_to_chats_list(
        cls,
        auth_state: AuthState,
        chats_usernames,
        chats_list: list[Chat],
        api_id: str,
        api_hash: str,
    ) -> str:
        """Resolve whitespace-separated usernames and append new chats.

        ``chats_list`` is modified in place. Lookup failures and duplicates
        are reported through ``gr.Info`` and do not stop the loop.

        Args:
            auth_state: Provides the session used to build the client.
            chats_usernames: String with one or more chat identifiers
                separated by whitespace.
            chats_list: Target list of ``Chat`` objects.
            api_id: Telegram API id.
            api_hash: Telegram API hash.

        Returns:
            The listing produced by ``get_chats_info``, or an error message.
        """
        if chats_usernames.strip() == '':
            return 'Не заданы адрес/адреса чатов для добавления'

        client = ClientConnector.get_client(auth_state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if not validation_result.is_valid:
            return 'Клиент не авторизован'

        for chat_username in chats_usernames.split():
            try:
                telethon_chat = await cls.get_chat(client, chat_username)
                # chats_list holds Chat wrappers, so the raw entity can never
                # be "in" it; compare entity id and entity class instead.
                already_listed = any(
                    known.chat_id == telethon_chat.id
                    and type(known.chat) is type(telethon_chat)
                    for known in chats_list
                )
                if already_listed:
                    gr.Info(f'Чат {chat_username} уже есть в списке')
                else:
                    chats_list.append(Chat.from_telethon_chat(telethon_chat, chat_username))
            except Exception as ex:
                gr.Info(str(ex))
        return cls.get_chats_info(chats_list)
239
+
240
+
241
+ Parser.parse_results_dir.mkdir(exist_ok=True)