sergey21000 commited on
Commit
87030e2
·
verified ·
1 Parent(s): 8127fac

Update utils/auth.py

Browse files
Files changed (1) hide show
  1. utils/auth.py +205 -205
utils/auth.py CHANGED
@@ -1,205 +1,205 @@
1
- import asyncio
2
- import os
3
- import logging
4
- from dataclasses import dataclass
5
-
6
- from pathlib import Path
7
-
8
- from telethon import TelegramClient, errors
9
- from telethon.sessions import SQLiteSession, MemorySession
10
- from telethon.sessions.abstract import Session
11
-
12
- from utils.validation import Validator
13
-
14
-
15
- @dataclass
16
- class AuthState:
17
- session_type: str = 'sqlite'
18
- session_name: str = 'telegram_api_session'
19
- memory_session: MemorySession | None = None
20
- is_logging: bool = False
21
-
22
- is_auth: bool = False
23
- need_send_code: bool = False
24
- need_verify_code: bool = False
25
- need_verify_2fa: bool = False
26
- message: str | None = None
27
- client: TelegramClient | None = None
28
-
29
- def __post_init__(self):
30
- self.session_dir = Path('sessions')
31
- self.session_dir.mkdir(exist_ok=True)
32
-
33
- def check_start_auth_status(self) -> None:
34
- loop = asyncio.new_event_loop()
35
- asyncio.set_event_loop(loop)
36
- loop.run_until_complete(self.check_is_auth())
37
-
38
- async def check_is_auth(self) -> None:
39
- if Validator.validate_env_vars().is_valid:
40
- client = ClientConnector.get_client(self.get_session(), os.getenv('API_ID'), os.getenv('API_HASH'))
41
- validation_result = await Validator.validate_auth(client)
42
- if validation_result.is_valid:
43
- self.set_auth_success()
44
-
45
- def get_session(self) -> Session:
46
- if self.session_type == 'sqlite':
47
- session_filepath = self.session_dir / self.session_name
48
- return SQLiteSession(str(session_filepath))
49
- elif self.session_type == 'memory':
50
- if self.memory_session is None:
51
- self.memory_session = MemorySession()
52
- return self.memory_session
53
-
54
- def change_session_type(self, session_type):
55
- if session_type != self.session_type:
56
- self.session_type = session_type
57
-
58
- def reset_state(self) -> None:
59
- defaults = self.__class__()
60
- # self.__dict__.update(defaults.__dict__)
61
- self.is_auth = defaults.is_auth
62
- self.need_send_code = defaults.need_send_code
63
- self.need_verify_code = defaults.need_verify_code
64
- self.need_verify_2fa = defaults.need_verify_2fa
65
- self.message = defaults.message
66
- self.client = defaults.client
67
-
68
- def _log(self) -> None:
69
- if self.is_logging and self.message:
70
- logging.info(self.message)
71
-
72
- def set_auth_failed(self, message: str | None = None) -> None:
73
- if message:
74
- self.message = message
75
- self._log()
76
-
77
- def set_start_auth(self) -> None:
78
- self.reset_state()
79
- self.message = 'Начата процедура аутентификации'
80
-
81
- def set_client(self, client: TelegramClient) -> None:
82
- self.client = client
83
- if self.session_type == 'memory':
84
- self.memory_session = client.session
85
-
86
- def set_need_send_code(self) -> None:
87
- self.need_send_code = True
88
- self.message = 'Проверка соединения клиента завершена успешно. Отправка проверочного кода'
89
- self._log()
90
-
91
- def set_need_verify_code(self) -> None:
92
- self.need_verify_code = True
93
- self.message = 'Код отправлен в Telegram. Введите его в поле Проверочный код'
94
- self._log()
95
-
96
- def set_need_verify_2fa(self) -> None:
97
- self.need_verify_2fa = True
98
- self.need_verify_code = False
99
- self.message = 'Требуется 2FA-пароль. Введите его в поле Облачный пароль'
100
- self._log()
101
-
102
- def set_auth_success(self, message: str | None = None) -> None:
103
- self.is_auth = True
104
- self.need_send_code = False
105
- self.need_verify_code = False
106
- self.need_verify_2fa = False
107
- self.message = 'Клиент авторизован' if message is None else message
108
- self._log()
109
-
110
- async def delete_session(self) -> None:
111
- if self.client is not None:
112
- await ClientConnector.log_out(self.client)
113
- if self.session_type == 'sqlite':
114
- session_filepath = self.session_dir / f'{self.session_name}.session'
115
- if session_filepath.is_file():
116
- session_filepath.unlink(missing_ok=True)
117
- elif self.session_type == 'memory':
118
- self.memory_session = None
119
- self.reset_state()
120
- self.message = 'Сессия удалена'
121
- self._log()
122
-
123
-
124
- class ClientConnector:
125
- @staticmethod
126
- def get_client(session: Session, api_id: str, api_hash: str) -> TelegramClient:
127
- client = TelegramClient(session, api_id, api_hash, system_version='4.16.30-vxCUSTOM')
128
- return client
129
-
130
- @staticmethod
131
- async def connect(client: TelegramClient) -> None:
132
- if not client.is_connected():
133
- await client.connect()
134
-
135
- @staticmethod
136
- async def disconnect(client: TelegramClient) -> None:
137
- if client.is_connected():
138
- await client.disconnect()
139
-
140
- @classmethod
141
- async def log_out(cls, client: TelegramClient) -> None:
142
- await cls.connect(client)
143
- await client.log_out()
144
- await cls.disconnect(client)
145
-
146
- @classmethod
147
- async def start_auth(cls, state: AuthState, api_id: str, api_hash: str) -> AuthState:
148
- if not api_id or not api_hash:
149
- message = 'Не заданы api_id и/или api_hash'
150
- state.set_auth_failed(message=message)
151
- return state
152
- state.set_start_auth()
153
- client = cls.get_client(state.get_session(), api_id, api_hash)
154
- validation_result = await Validator.validate_auth(client)
155
- if validation_result.is_valid:
156
- message = 'Клиент авторизован'
157
- state.set_auth_success(message)
158
- elif not validation_result.is_valid and validation_result.is_error:
159
- state.set_auth_failed(message=validation_result.message)
160
- elif not validation_result.is_valid and not validation_result.is_error:
161
- state.set_client(client)
162
- state.set_need_send_code()
163
- return state
164
-
165
- @classmethod
166
- async def send_code(cls, state: AuthState, phone_number: str) -> AuthState:
167
- if not state.need_send_code:
168
- return state
169
- try:
170
- await cls.connect(state.client)
171
- await state.client.send_code_request(phone_number)
172
- state.set_need_verify_code()
173
- except Exception as ex:
174
- message = f'Ошибка при отправке кода подтверждения, код ошибки: {ex}'
175
- state.set_auth_failed(message)
176
- return state
177
-
178
- @classmethod
179
- async def verify_code(cls, state: AuthState, phone_number: str, code: str) -> AuthState:
180
- if not state.need_verify_code:
181
- return state
182
- try:
183
- await state.client.sign_in(phone=phone_number, code=code)
184
- await cls.disconnect(state.client)
185
- state.set_auth_success()
186
- except errors.SessionPasswordNeededError:
187
- state.set_need_verify_2fa()
188
- except Exception as ex:
189
- message = f'Ошибка при верификации кода подтверждения, код ошибки: {ex}'
190
- state.set_auth_failed(message)
191
- return state
192
-
193
- @classmethod
194
- async def verify_2fa(cls, state: AuthState, password_2fa: str) -> AuthState:
195
- if not state.need_verify_2fa:
196
- return state
197
- try:
198
- await state.client.sign_in(password=password_2fa)
199
- state.set_auth_success()
200
- await cls.disconnect(state.client)
201
- except Exception as ex:
202
- message = f'Ошибка при верификации облачного пароля, код ошибки: {ex}'
203
- state.set_auth_failed(message)
204
- return state
205
-
 
1
+ import asyncio
2
+ import os
3
+ import logging
4
+ from dataclasses import dataclass
5
+
6
+ from pathlib import Path
7
+
8
+ from telethon import TelegramClient, errors
9
+ from telethon.sessions import SQLiteSession, MemorySession
10
+ from telethon.sessions.abstract import Session
11
+
12
+ from utils.validation import Validator
13
+
14
+
15
+ @dataclass
16
+ class AuthState:
17
+ session_type: str = 'memory' # sqlite, memory
18
+ session_name: str = 'telegram_api_session'
19
+ memory_session: MemorySession | None = None
20
+ is_logging: bool = False
21
+
22
+ is_auth: bool = False
23
+ need_send_code: bool = False
24
+ need_verify_code: bool = False
25
+ need_verify_2fa: bool = False
26
+ message: str | None = None
27
+ client: TelegramClient | None = None
28
+
29
+ def __post_init__(self):
30
+ self.session_dir = Path('sessions')
31
+ self.session_dir.mkdir(exist_ok=True)
32
+
33
+ def check_start_auth_status(self) -> None:
34
+ loop = asyncio.new_event_loop()
35
+ asyncio.set_event_loop(loop)
36
+ loop.run_until_complete(self.check_is_auth())
37
+
38
+ async def check_is_auth(self) -> None:
39
+ if Validator.validate_env_vars().is_valid:
40
+ client = ClientConnector.get_client(self.get_session(), os.getenv('API_ID'), os.getenv('API_HASH'))
41
+ validation_result = await Validator.validate_auth(client)
42
+ if validation_result.is_valid:
43
+ self.set_auth_success()
44
+
45
+ def get_session(self) -> Session:
46
+ if self.session_type == 'sqlite':
47
+ session_filepath = self.session_dir / self.session_name
48
+ return SQLiteSession(str(session_filepath))
49
+ elif self.session_type == 'memory':
50
+ if self.memory_session is None:
51
+ self.memory_session = MemorySession()
52
+ return self.memory_session
53
+
54
+ def change_session_type(self, session_type):
55
+ if session_type != self.session_type:
56
+ self.session_type = session_type
57
+
58
+ def reset_state(self) -> None:
59
+ defaults = self.__class__()
60
+ # self.__dict__.update(defaults.__dict__)
61
+ self.is_auth = defaults.is_auth
62
+ self.need_send_code = defaults.need_send_code
63
+ self.need_verify_code = defaults.need_verify_code
64
+ self.need_verify_2fa = defaults.need_verify_2fa
65
+ self.message = defaults.message
66
+ self.client = defaults.client
67
+
68
+ def _log(self) -> None:
69
+ if self.is_logging and self.message:
70
+ logging.info(self.message)
71
+
72
+ def set_auth_failed(self, message: str | None = None) -> None:
73
+ if message:
74
+ self.message = message
75
+ self._log()
76
+
77
+ def set_start_auth(self) -> None:
78
+ self.reset_state()
79
+ self.message = 'Начата процедура аутентификации'
80
+
81
+ def set_client(self, client: TelegramClient) -> None:
82
+ self.client = client
83
+ if self.session_type == 'memory':
84
+ self.memory_session = client.session
85
+
86
+ def set_need_send_code(self) -> None:
87
+ self.need_send_code = True
88
+ self.message = 'Проверка соединения клиента завершена успешно. Отправка проверочного кода'
89
+ self._log()
90
+
91
+ def set_need_verify_code(self) -> None:
92
+ self.need_verify_code = True
93
+ self.message = 'Код отправлен в Telegram. Введите его в поле Проверочный код'
94
+ self._log()
95
+
96
+ def set_need_verify_2fa(self) -> None:
97
+ self.need_verify_2fa = True
98
+ self.need_verify_code = False
99
+ self.message = 'Требуется 2FA-пароль. Введите его в поле Облачный пароль'
100
+ self._log()
101
+
102
+ def set_auth_success(self, message: str | None = None) -> None:
103
+ self.is_auth = True
104
+ self.need_send_code = False
105
+ self.need_verify_code = False
106
+ self.need_verify_2fa = False
107
+ self.message = 'Клиент авторизован' if message is None else message
108
+ self._log()
109
+
110
+ async def delete_session(self) -> None:
111
+ if self.client is not None:
112
+ await ClientConnector.log_out(self.client)
113
+ if self.session_type == 'sqlite':
114
+ session_filepath = self.session_dir / f'{self.session_name}.session'
115
+ if session_filepath.is_file():
116
+ session_filepath.unlink(missing_ok=True)
117
+ elif self.session_type == 'memory':
118
+ self.memory_session = None
119
+ self.reset_state()
120
+ self.message = 'Сессия удалена'
121
+ self._log()
122
+
123
+
124
+ class ClientConnector:
125
+ @staticmethod
126
+ def get_client(session: Session, api_id: str, api_hash: str) -> TelegramClient:
127
+ client = TelegramClient(session, api_id, api_hash, system_version='4.16.30-vxCUSTOM')
128
+ return client
129
+
130
+ @staticmethod
131
+ async def connect(client: TelegramClient) -> None:
132
+ if not client.is_connected():
133
+ await client.connect()
134
+
135
+ @staticmethod
136
+ async def disconnect(client: TelegramClient) -> None:
137
+ if client.is_connected():
138
+ await client.disconnect()
139
+
140
+ @classmethod
141
+ async def log_out(cls, client: TelegramClient) -> None:
142
+ await cls.connect(client)
143
+ await client.log_out()
144
+ await cls.disconnect(client)
145
+
146
+ @classmethod
147
+ async def start_auth(cls, state: AuthState, api_id: str, api_hash: str) -> AuthState:
148
+ if not api_id or not api_hash:
149
+ message = 'Не заданы api_id и/или api_hash'
150
+ state.set_auth_failed(message=message)
151
+ return state
152
+ state.set_start_auth()
153
+ client = cls.get_client(state.get_session(), api_id, api_hash)
154
+ validation_result = await Validator.validate_auth(client)
155
+ if validation_result.is_valid:
156
+ message = 'Клиент авторизован'
157
+ state.set_auth_success(message)
158
+ elif not validation_result.is_valid and validation_result.is_error:
159
+ state.set_auth_failed(message=validation_result.message)
160
+ elif not validation_result.is_valid and not validation_result.is_error:
161
+ state.set_client(client)
162
+ state.set_need_send_code()
163
+ return state
164
+
165
+ @classmethod
166
+ async def send_code(cls, state: AuthState, phone_number: str) -> AuthState:
167
+ if not state.need_send_code:
168
+ return state
169
+ try:
170
+ await cls.connect(state.client)
171
+ await state.client.send_code_request(phone_number)
172
+ state.set_need_verify_code()
173
+ except Exception as ex:
174
+ message = f'Ошибка при отправке кода подтверждения, код ошибки: {ex}'
175
+ state.set_auth_failed(message)
176
+ return state
177
+
178
+ @classmethod
179
+ async def verify_code(cls, state: AuthState, phone_number: str, code: str) -> AuthState:
180
+ if not state.need_verify_code:
181
+ return state
182
+ try:
183
+ await state.client.sign_in(phone=phone_number, code=code)
184
+ await cls.disconnect(state.client)
185
+ state.set_auth_success()
186
+ except errors.SessionPasswordNeededError:
187
+ state.set_need_verify_2fa()
188
+ except Exception as ex:
189
+ message = f'Ошибка при верификации кода подтверждения, код ошибки: {ex}'
190
+ state.set_auth_failed(message)
191
+ return state
192
+
193
+ @classmethod
194
+ async def verify_2fa(cls, state: AuthState, password_2fa: str) -> AuthState:
195
+ if not state.need_verify_2fa:
196
+ return state
197
+ try:
198
+ await state.client.sign_in(password=password_2fa)
199
+ state.set_auth_success()
200
+ await cls.disconnect(state.client)
201
+ except Exception as ex:
202
+ message = f'Ошибка при верификации облачного пароля, код ошибки: {ex}'
203
+ state.set_auth_failed(message)
204
+ return state
205
+