File size: 7,890 Bytes
87030e2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import asyncio
import os
import logging
from dataclasses import dataclass

from pathlib import Path

from telethon import TelegramClient, errors
from telethon.sessions import SQLiteSession, MemorySession
from telethon.sessions.abstract import Session

from utils.validation import Validator


@dataclass
class AuthState:
    session_type: str = 'memory'  # sqlite, memory
    session_name: str = 'telegram_api_session'
    memory_session: MemorySession | None = None
    is_logging: bool = False

    is_auth: bool = False
    need_send_code: bool = False
    need_verify_code: bool = False
    need_verify_2fa: bool = False
    message: str | None = None
    client: TelegramClient | None = None

    def __post_init__(self):
        self.session_dir = Path('sessions')
        self.session_dir.mkdir(exist_ok=True)

    def check_start_auth_status(self) -> None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.check_is_auth())

    async def check_is_auth(self) -> None:
        if Validator.validate_env_vars().is_valid:
            client = ClientConnector.get_client(self.get_session(), os.getenv('API_ID'), os.getenv('API_HASH'))
            validation_result = await Validator.validate_auth(client)
            if validation_result.is_valid:
                self.set_auth_success()

    def get_session(self) -> Session:
        if self.session_type == 'sqlite':
            session_filepath = self.session_dir / self.session_name
            return SQLiteSession(str(session_filepath))
        elif self.session_type == 'memory':
            if self.memory_session is None:
                self.memory_session = MemorySession()
            return self.memory_session

    def change_session_type(self, session_type):
        if session_type != self.session_type:
            self.session_type = session_type

    def reset_state(self) -> None:
        defaults = self.__class__()
        # self.__dict__.update(defaults.__dict__)
        self.is_auth = defaults.is_auth
        self.need_send_code = defaults.need_send_code
        self.need_verify_code = defaults.need_verify_code
        self.need_verify_2fa = defaults.need_verify_2fa
        self.message = defaults.message
        self.client = defaults.client

    def _log(self) -> None:
        if self.is_logging and self.message:
            logging.info(self.message)

    def set_auth_failed(self, message: str | None = None) -> None:
        if message:
            self.message = message
        self._log()

    def set_start_auth(self) -> None:
        self.reset_state()
        self.message = 'Начата процедура аутентификации'

    def set_client(self, client: TelegramClient) -> None:
        self.client = client
        if self.session_type == 'memory':
            self.memory_session = client.session

    def set_need_send_code(self) -> None:
        self.need_send_code = True
        self.message = 'Проверка соединения клиента завершена успешно. Отправка проверочного кода'
        self._log()

    def set_need_verify_code(self) -> None:
        self.need_verify_code = True
        self.message = 'Код отправлен в Telegram. Введите его в поле Проверочный код'
        self._log()

    def set_need_verify_2fa(self) -> None:
        self.need_verify_2fa = True
        self.need_verify_code = False
        self.message = 'Требуется 2FA-пароль. Введите его в поле Облачный пароль'
        self._log()

    def set_auth_success(self, message: str | None = None) -> None:
        self.is_auth = True
        self.need_send_code = False
        self.need_verify_code = False
        self.need_verify_2fa = False
        self.message = 'Клиент авторизован' if message is None else message
        self._log()

    async def delete_session(self) -> None:
        if self.client is not None:
            await ClientConnector.log_out(self.client)
        if self.session_type == 'sqlite':
            session_filepath = self.session_dir / f'{self.session_name}.session'
            if session_filepath.is_file():
                session_filepath.unlink(missing_ok=True)
        elif self.session_type == 'memory':
            self.memory_session = None
        self.reset_state()
        self.message = 'Сессия удалена'
        self._log()


class ClientConnector:
    @staticmethod
    def get_client(session: Session, api_id: str, api_hash: str) -> TelegramClient:
        client = TelegramClient(session, api_id, api_hash, system_version='4.16.30-vxCUSTOM')
        return client
    
    @staticmethod
    async def connect(client: TelegramClient) -> None:
        if not client.is_connected():
            await client.connect()

    @staticmethod
    async def disconnect(client: TelegramClient) -> None:
        if client.is_connected():
            await client.disconnect()

    @classmethod
    async def log_out(cls, client: TelegramClient) -> None:
        await cls.connect(client)
        await client.log_out()
        await cls.disconnect(client)

    @classmethod
    async def start_auth(cls, state: AuthState, api_id: str, api_hash: str) -> AuthState:
        if not api_id or not api_hash:
            message = 'Не заданы api_id и/или api_hash'
            state.set_auth_failed(message=message)
            return state
        state.set_start_auth()
        client = cls.get_client(state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if validation_result.is_valid:
            message = 'Клиент авторизован'
            state.set_auth_success(message)
        elif not validation_result.is_valid and validation_result.is_error:
            state.set_auth_failed(message=validation_result.message)
        elif not validation_result.is_valid and not validation_result.is_error:
            state.set_client(client)
            state.set_need_send_code()
        return state

    @classmethod
    async def send_code(cls, state: AuthState, phone_number: str) -> AuthState:
        if not state.need_send_code:
            return state
        try:
            await cls.connect(state.client)
            await state.client.send_code_request(phone_number)
            state.set_need_verify_code()
        except Exception as ex:
            message = f'Ошибка при отправке кода подтверждения, код ошибки: {ex}'
            state.set_auth_failed(message)
        return state

    @classmethod
    async def verify_code(cls, state: AuthState, phone_number: str, code: str) -> AuthState:
        if not state.need_verify_code:
            return state
        try:
            await state.client.sign_in(phone=phone_number, code=code)
            await cls.disconnect(state.client)
            state.set_auth_success()
        except errors.SessionPasswordNeededError:
            state.set_need_verify_2fa()
        except Exception as ex:
            message = f'Ошибка при верификации кода подтверждения, код ошибки: {ex}'
            state.set_auth_failed(message)
        return state

    @classmethod
    async def verify_2fa(cls, state: AuthState, password_2fa: str) -> AuthState:
        if not state.need_verify_2fa:
            return state
        try:
            await state.client.sign_in(password=password_2fa)
            state.set_auth_success()
            await cls.disconnect(state.client)
        except Exception as ex:
            message = f'Ошибка при верификации облачного пароля, код ошибки: {ex}'
            state.set_auth_failed(message)
        return state