sergey21000's picture
Update utils/auth.py
87030e2 verified
import asyncio
import os
import logging
from dataclasses import dataclass
from pathlib import Path
from telethon import TelegramClient, errors
from telethon.sessions import SQLiteSession, MemorySession
from telethon.sessions.abstract import Session
from utils.validation import Validator
@dataclass
class AuthState:
    """Mutable record of where a Telegram sign-in currently stands.

    The ``set_*`` methods update the step flags and the status ``message``
    together; ``ClientConnector`` calls them while driving the sign-in.
    """

    # Session backend selector: 'sqlite' or 'memory'.
    session_type: str = 'memory'
    # Base name of the SQLite session file created inside ``session_dir``.
    session_name: str = 'telegram_api_session'
    # In-memory session kept so repeated ``get_session`` calls reuse it.
    memory_session: MemorySession | None = None
    # When True, status messages are also sent to ``logging.info``.
    is_logging: bool = False
    # Step flags of the sign-in flow.
    is_auth: bool = False
    need_send_code: bool = False
    need_verify_code: bool = False
    need_verify_2fa: bool = False
    # Latest status text.
    message: str | None = None
    # Client stored by ``set_client`` for the following sign-in steps.
    client: TelegramClient | None = None

    def __post_init__(self):
        """Create the ``sessions`` directory (relative path) if it is missing."""
        self.session_dir = Path('sessions')
        self.session_dir.mkdir(exist_ok=True)

    def check_start_auth_status(self) -> None:
        """Run ``check_is_auth`` to completion on a freshly created event loop.

        The new loop is installed as the current event loop and is not closed
        here.
        """
        # NOTE(review): the loop is never closed; whether later code relies on
        # it staying installed is not visible from this file - confirm before
        # switching to asyncio.run().
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(self.check_is_auth())

    async def check_is_auth(self) -> None:
        """Mark the state as authorized if the env-configured client validates.

        Does nothing when ``Validator.validate_env_vars()`` is not valid.
        """
        if not Validator.validate_env_vars().is_valid:
            return
        client = ClientConnector.get_client(
            self.get_session(), os.getenv('API_ID'), os.getenv('API_HASH'),
        )
        validation_result = await Validator.validate_auth(client)
        if validation_result.is_valid:
            self.set_auth_success()

    def get_session(self) -> Session:
        """Return the session object matching ``session_type``.

        Returns:
            A new ``SQLiteSession`` for 'sqlite', or the cached
            ``MemorySession`` (created on first use) for 'memory'.

        Raises:
            ValueError: If ``session_type`` is neither 'sqlite' nor 'memory'.
        """
        if self.session_type == 'sqlite':
            return SQLiteSession(str(self.session_dir / self.session_name))
        if self.session_type == 'memory':
            if self.memory_session is None:
                self.memory_session = MemorySession()
            return self.memory_session
        # Fail loudly instead of implicitly returning None to the caller.
        raise ValueError(f'Unsupported session_type: {self.session_type!r}')

    def change_session_type(self, session_type: str) -> None:
        """Store ``session_type`` as the backend used by ``get_session``."""
        self.session_type = session_type

    def reset_state(self) -> None:
        """Restore the flow flags, ``message`` and ``client`` to their defaults.

        ``session_type``, ``session_name``, ``memory_session`` and
        ``is_logging`` are left untouched.
        """
        # A dataclass keeps plain field defaults as class attributes, so they
        # can be read from the class without building a throw-away instance
        # (which would re-run __post_init__ and its mkdir).
        defaults = type(self)
        for name in (
            'is_auth',
            'need_send_code',
            'need_verify_code',
            'need_verify_2fa',
            'message',
            'client',
        ):
            setattr(self, name, getattr(defaults, name))

    def _log(self) -> None:
        """Send the current message to ``logging.info`` when logging is on."""
        if self.is_logging and self.message:
            logging.info(self.message)

    def set_auth_failed(self, message: str | None = None) -> None:
        """Record a failure message, when one is given, and log the current message.

        The step flags are not changed.
        """
        if message:
            self.message = message
        self._log()

    def set_start_auth(self) -> None:
        """Reset the flow state and set the 'authentication started' message."""
        self.reset_state()
        self.message = 'Начата процедура аутентификации'

    def set_client(self, client: TelegramClient) -> None:
        """Store ``client``; in 'memory' mode also keep its session for reuse."""
        self.client = client
        if self.session_type == 'memory':
            self.memory_session = client.session

    def set_need_send_code(self) -> None:
        """Flag that a verification code has to be sent next."""
        self.need_send_code = True
        self.message = 'Проверка соединения клиента завершена успешно. Отправка проверочного кода'
        self._log()

    def set_need_verify_code(self) -> None:
        """Flag that the verification code has to be entered next."""
        self.need_verify_code = True
        self.message = 'Код отправлен в Telegram. Введите его в поле Проверочный код'
        self._log()

    def set_need_verify_2fa(self) -> None:
        """Flag that the 2FA password is required and clear the code step."""
        self.need_verify_2fa = True
        self.need_verify_code = False
        self.message = 'Требуется 2FA-пароль. Введите его в поле Облачный пароль'
        self._log()

    def set_auth_success(self, message: str | None = None) -> None:
        """Mark the state as authorized and clear every pending-step flag.

        Args:
            message: Status text to store; a default text is used when None.
        """
        self.is_auth = True
        self.need_send_code = False
        self.need_verify_code = False
        self.need_verify_2fa = False
        self.message = 'Клиент авторизован' if message is None else message
        self._log()

    async def delete_session(self) -> None:
        """Log out the stored client (if any), drop the session and reset state.

        For 'sqlite' the ``<session_name>.session`` file in ``session_dir`` is
        removed when it exists; for 'memory' the cached session is discarded.
        """
        if self.client is not None:
            await ClientConnector.log_out(self.client)
        if self.session_type == 'sqlite':
            session_filepath = self.session_dir / f'{self.session_name}.session'
            if session_filepath.is_file():
                session_filepath.unlink(missing_ok=True)
        elif self.session_type == 'memory':
            self.memory_session = None
        self.reset_state()
        self.message = 'Сессия удалена'
        self._log()
class ClientConnector:
    """Helpers that build a ``TelegramClient`` and drive the sign-in steps.

    Every step method receives an ``AuthState``, updates it in place through
    its ``set_*`` methods and returns that same object.
    """

    @staticmethod
    def get_client(session: Session, api_id: str, api_hash: str) -> TelegramClient:
        """Create a ``TelegramClient`` for the given session and credentials."""
        return TelegramClient(session, api_id, api_hash, system_version='4.16.30-vxCUSTOM')

    @staticmethod
    async def connect(client: TelegramClient) -> None:
        """Connect ``client`` unless it already reports being connected."""
        if not client.is_connected():
            await client.connect()

    @staticmethod
    async def disconnect(client: TelegramClient) -> None:
        """Disconnect ``client`` if it reports being connected."""
        if client.is_connected():
            await client.disconnect()

    @classmethod
    async def log_out(cls, client: TelegramClient) -> None:
        """Connect if needed, log the client out, then disconnect if still connected."""
        await cls.connect(client)
        await client.log_out()
        await cls.disconnect(client)

    @classmethod
    async def start_auth(cls, state: AuthState, api_id: str, api_hash: str) -> AuthState:
        """Begin sign-in: validate credentials and decide the next step.

        Args:
            state: Flow state to update.
            api_id: Telegram API id; must be non-empty.
            api_hash: Telegram API hash; must be non-empty.

        Returns:
            ``state``, marked as failed, as already authorized, or as waiting
            for a verification code to be sent (with the client stored on it).
        """
        if not api_id or not api_hash:
            state.set_auth_failed(message='Не заданы api_id и/или api_hash')
            return state
        state.set_start_auth()
        client = cls.get_client(state.get_session(), api_id, api_hash)
        validation_result = await Validator.validate_auth(client)
        if validation_result.is_valid:
            state.set_auth_success('Клиент авторизован')
        elif validation_result.is_error:
            state.set_auth_failed(message=validation_result.message)
        else:
            # Not valid and no error reported: continue with the code step.
            state.set_client(client)
            state.set_need_send_code()
        return state

    @classmethod
    async def send_code(cls, state: AuthState, phone_number: str) -> AuthState:
        """Request a verification code for ``phone_number``.

        Does nothing unless ``state.need_send_code`` is set. Any exception is
        turned into a failure message on ``state`` instead of propagating.
        """
        if not state.need_send_code:
            return state
        try:
            await cls.connect(state.client)
            await state.client.send_code_request(phone_number)
            state.set_need_verify_code()
        except Exception as ex:
            message = f'Ошибка при отправке кода подтверждения, код ошибки: {ex}'
            state.set_auth_failed(message)
        return state

    @classmethod
    async def verify_code(cls, state: AuthState, phone_number: str, code: str) -> AuthState:
        """Sign in with the received verification code.

        Does nothing unless ``state.need_verify_code`` is set. When the account
        needs a cloud password, the state is switched to the 2FA step; any
        other exception is turned into a failure message on ``state``.
        """
        if not state.need_verify_code:
            return state
        try:
            # Same precaution as send_code: make sure the client is connected
            # before issuing the request (no-op when it already is).
            await cls.connect(state.client)
            await state.client.sign_in(phone=phone_number, code=code)
            await cls.disconnect(state.client)
            state.set_auth_success()
        except errors.SessionPasswordNeededError:
            state.set_need_verify_2fa()
        except Exception as ex:
            message = f'Ошибка при верификации кода подтверждения, код ошибки: {ex}'
            state.set_auth_failed(message)
        return state

    @classmethod
    async def verify_2fa(cls, state: AuthState, password_2fa: str) -> AuthState:
        """Complete sign-in with the 2FA (cloud) password.

        Does nothing unless ``state.need_verify_2fa`` is set. Any exception is
        turned into a failure message on ``state`` instead of propagating.
        """
        if not state.need_verify_2fa:
            return state
        try:
            # Same precaution as send_code: make sure the client is connected
            # before issuing the request (no-op when it already is).
            await cls.connect(state.client)
            await state.client.sign_in(password=password_2fa)
            state.set_auth_success()
            await cls.disconnect(state.client)
        except Exception as ex:
            message = f'Ошибка при верификации облачного пароля, код ошибки: {ex}'
            state.set_auth_failed(message)
        return state