|
import asyncio |
|
import os |
|
import logging |
|
from dataclasses import dataclass |
|
|
|
from pathlib import Path |
|
|
|
from telethon import TelegramClient, errors |
|
from telethon.sessions import SQLiteSession, MemorySession |
|
from telethon.sessions.abstract import Session |
|
|
|
from utils.validation import Validator |
|
|
|
|
|
@dataclass
class AuthState:
    """Mutable state of a Telegram client authorization flow.

    The object tracks which step of the login flow comes next (send code,
    verify code, verify 2FA password), the last status message, the client
    being authorized and the session backend used to create clients.
    """

    # Session backend; get_session() supports 'memory' and 'sqlite'.
    session_type: str = 'memory'
    # Base name of the SQLite session file kept inside ``session_dir``.
    session_name: str = 'telegram_api_session'
    # Cached in-memory session so repeated get_session() calls reuse it.
    memory_session: MemorySession | None = None
    # When True, status messages are also emitted through logging.info().
    is_logging: bool = False

    # Flow fields below are restored to their defaults by reset_state().
    is_auth: bool = False
    need_send_code: bool = False
    need_verify_code: bool = False
    need_verify_2fa: bool = False
    message: str | None = None
    client: TelegramClient | None = None

    # Names of the fields that reset_state() restores. Unannotated on
    # purpose: dataclass only turns annotated names into fields.
    _FLOW_FIELDS = (
        'is_auth',
        'need_send_code',
        'need_verify_code',
        'need_verify_2fa',
        'message',
        'client',
    )

    def __post_init__(self):
        """Create the ``sessions`` directory used for SQLite session files."""
        self.session_dir = Path('sessions')
        self.session_dir.mkdir(exist_ok=True)

    def check_start_auth_status(self) -> None:
        """Run check_is_auth() to completion on a freshly created event loop.

        The new loop is installed as the current event loop and is not
        closed afterwards.
        NOTE(review): later code may rely on this loop staying installed;
        confirm before closing it here or switching to asyncio.run().
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.check_is_auth())

    async def check_is_auth(self) -> None:
        """Mark the state authorized if the current session is already valid.

        Does nothing when the environment variables fail validation. The
        client is built from the ``API_ID`` / ``API_HASH`` environment
        variables and is not stored on the state.
        """
        if not Validator.validate_env_vars().is_valid:
            return
        client = ClientConnector.get_client(
            self.get_session(), os.getenv('API_ID'), os.getenv('API_HASH')
        )
        validation_result = await Validator.validate_auth(client)
        if validation_result.is_valid:
            self.set_auth_success()

    def get_session(self) -> Session:
        """Return the session object for the configured ``session_type``.

        Returns:
            A new ``SQLiteSession`` pointing at ``session_dir/session_name``
            for 'sqlite', or the cached ``MemorySession`` (created on first
            use) for 'memory'.

        Raises:
            ValueError: if ``session_type`` is neither 'sqlite' nor 'memory'.
                Previously this case fell through and returned ``None``,
                contradicting the declared return type.
        """
        if self.session_type == 'sqlite':
            return SQLiteSession(str(self.session_dir / self.session_name))
        if self.session_type == 'memory':
            if self.memory_session is None:
                self.memory_session = MemorySession()
            return self.memory_session
        raise ValueError(f'Unsupported session type: {self.session_type!r}')

    def change_session_type(self, session_type):
        """Switch the session backend used by subsequent get_session() calls."""
        if session_type != self.session_type:
            self.session_type = session_type

    def reset_state(self) -> None:
        """Restore the flow fields to their declared defaults.

        Session settings (``session_type``, ``session_name``,
        ``memory_session``, ``is_logging``) are left untouched.
        """
        # Dataclass fields with plain defaults stay available as class
        # attributes, so the defaults can be read without constructing a
        # throwaway instance (which would re-run __post_init__ and mkdir).
        cls = type(self)
        for name in cls._FLOW_FIELDS:
            setattr(self, name, getattr(cls, name))

    def _log(self) -> None:
        """Emit the current message via logging.info() when logging is on."""
        if self.is_logging and self.message:
            logging.info(self.message)

    def set_auth_failed(self, message: str | None = None) -> None:
        """Record a failure message (if given) and log the current message.

        The step flags are not modified.
        """
        if message:
            self.message = message
        self._log()

    def set_start_auth(self) -> None:
        """Reset the flow fields and set the "authentication started" message.

        Unlike the other transitions, this one does not log the message.
        """
        self.reset_state()
        self.message = 'Начата процедура аутентификации'

    def set_client(self, client: TelegramClient) -> None:
        """Store the client; for the memory backend also keep its session."""
        self.client = client
        if self.session_type == 'memory':
            self.memory_session = client.session

    def set_need_send_code(self) -> None:
        """Mark that a confirmation code has to be sent next."""
        self.need_send_code = True
        self.message = 'Проверка соединения клиента завершена успешно. Отправка проверочного кода'
        self._log()

    def set_need_verify_code(self) -> None:
        """Mark that the confirmation code has to be verified next."""
        self.need_verify_code = True
        self.message = 'Код отправлен в Telegram. Введите его в поле Проверочный код'
        self._log()

    def set_need_verify_2fa(self) -> None:
        """Mark that the 2FA password is required instead of the code."""
        self.need_verify_2fa = True
        self.need_verify_code = False
        self.message = 'Требуется 2FA-пароль. Введите его в поле Облачный пароль'
        self._log()

    def set_auth_success(self, message: str | None = None) -> None:
        """Mark the client authorized and clear every pending-step flag.

        Args:
            message: Status text to store; a default "client authorized"
                text is used when it is None.
        """
        self.is_auth = True
        self.need_send_code = False
        self.need_verify_code = False
        self.need_verify_2fa = False
        self.message = 'Клиент авторизован' if message is None else message
        self._log()

    async def delete_session(self) -> None:
        """Log out the stored client (if any) and drop the local session.

        For 'sqlite' the ``<session_name>.session`` file inside
        ``session_dir`` is removed; for 'memory' the cached session is
        discarded. Afterwards the flow fields are reset and a "session
        deleted" message is set and logged.
        """
        if self.client is not None:
            await ClientConnector.log_out(self.client)

        if self.session_type == 'sqlite':
            session_filepath = self.session_dir / f'{self.session_name}.session'
            if session_filepath.is_file():
                # missing_ok covers the file vanishing after the check.
                session_filepath.unlink(missing_ok=True)
        elif self.session_type == 'memory':
            self.memory_session = None

        self.reset_state()
        self.message = 'Сессия удалена'
        self._log()
|
|
|
|
|
class ClientConnector:
    """Stateless helpers that drive a Telegram client through the login flow.

    Every step method receives the shared ``AuthState``, updates it through
    its ``set_*`` methods and returns the same object.
    """

    @staticmethod
    def get_client(session: Session, api_id: str, api_hash: str) -> TelegramClient:
        """Build a ``TelegramClient`` with a fixed ``system_version`` string."""
        return TelegramClient(session, api_id, api_hash, system_version='4.16.30-vxCUSTOM')

    @staticmethod
    async def connect(client: TelegramClient) -> None:
        """Connect the client unless it is already connected."""
        if not client.is_connected():
            await client.connect()

    @staticmethod
    async def disconnect(client: TelegramClient) -> None:
        """Disconnect the client if it is currently connected."""
        if client.is_connected():
            await client.disconnect()

    @classmethod
    async def log_out(cls, client: TelegramClient) -> None:
        """Connect if needed, log the client out, then disconnect it."""
        await cls.connect(client)
        await client.log_out()
        await cls.disconnect(client)

    @classmethod
    async def _finish_sign_in(cls, state: AuthState) -> None:
        """Record a successful sign-in, then release the connection.

        Success is stored before disconnecting, and a failing disconnect is
        only logged, so a cleanup error cannot turn a completed sign-in
        into a reported authorization failure.
        """
        state.set_auth_success()
        try:
            await cls.disconnect(state.client)
        except Exception:
            logging.warning(
                'Не удалось отключить клиента после авторизации', exc_info=True
            )

    @classmethod
    async def start_auth(cls, state: AuthState, api_id: str, api_hash: str) -> AuthState:
        """Start the flow: validate credentials and the current session.

        Outcomes:
            * missing ``api_id`` / ``api_hash``: failure message, no reset;
            * session already authorized: state marked authorized;
            * validation error: failure message from the validator;
            * otherwise: the client is stored and a code must be sent next.
        """
        if not api_id or not api_hash:
            message = 'Не заданы api_id и/или api_hash'
            state.set_auth_failed(message=message)
            return state

        state.set_start_auth()
        client = cls.get_client(state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)

        if validation_result.is_valid:
            message = 'Клиент авторизован'
            state.set_auth_success(message)
        elif validation_result.is_error:
            state.set_auth_failed(message=validation_result.message)
        else:
            # Not authorized and no error: continue with the code request.
            state.set_client(client)
            state.set_need_send_code()
        return state

    @classmethod
    async def send_code(cls, state: AuthState, phone_number: str) -> AuthState:
        """Request a confirmation code for ``phone_number``.

        No-op unless ``state.need_send_code`` is set. Any exception is
        turned into a failure message on the state instead of being raised.
        """
        if not state.need_send_code:
            return state
        try:
            await cls.connect(state.client)
            await state.client.send_code_request(phone_number)
            state.set_need_verify_code()
        except Exception as ex:
            message = f'Ошибка при отправке кода подтверждения, код ошибки: {ex}'
            state.set_auth_failed(message)
        return state

    @classmethod
    async def verify_code(cls, state: AuthState, phone_number: str, code: str) -> AuthState:
        """Sign in with the confirmation code.

        No-op unless ``state.need_verify_code`` is set. When the account
        requires a 2FA password the state is switched to the 2FA step; any
        other exception becomes a failure message on the state.
        """
        if not state.need_verify_code:
            return state
        try:
            # Same guard as send_code(): make sure the client is connected.
            await cls.connect(state.client)
            await state.client.sign_in(phone=phone_number, code=code)
        except errors.SessionPasswordNeededError:
            state.set_need_verify_2fa()
        except Exception as ex:
            message = f'Ошибка при верификации кода подтверждения, код ошибки: {ex}'
            state.set_auth_failed(message)
        else:
            await cls._finish_sign_in(state)
        return state

    @classmethod
    async def verify_2fa(cls, state: AuthState, password_2fa: str) -> AuthState:
        """Sign in with the 2FA (cloud) password.

        No-op unless ``state.need_verify_2fa`` is set. Any exception raised
        while connecting or signing in becomes a failure message on the
        state.
        """
        if not state.need_verify_2fa:
            return state
        try:
            # Same guard as send_code(): make sure the client is connected.
            await cls.connect(state.client)
            await state.client.sign_in(password=password_2fa)
        except Exception as ex:
            message = f'Ошибка при верификации облачного пароля, код ошибки: {ex}'
            state.set_auth_failed(message)
        else:
            await cls._finish_sign_in(state)
        return state
|
|
|
|