from typing import Annotated, Optional from fastapi import APIRouter, Body, Depends, Form, HTTPException from datetime import timedelta import common.auth as auth router = APIRouter(prefix="/auth", tags=["Auth"]) def authenticate_user(username: str, password: str): """Проверяет, существует ли пользователь и правильный ли пароль.""" user = next((u for u in auth.USERS if u.username == username), None) if not user or user.password != password: raise HTTPException(status_code=401, detail="Неверный логин или пароль") return user def generate_access_token(username: str): """Генерирует токен для аутентифицированного пользователя.""" access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES) return auth.create_access_token(data={"sub": username}, expires_delta=access_token_expires) async def login_common(username: str, password: str): """Общий метод аутентификации.""" user = authenticate_user(username, password) access_token = generate_access_token(user.username) return {"access_token": access_token, "token_type": "bearer"} @router.post("/login", summary="Авторизация через JSON") async def login_json(request: auth.LoginRequest = Body(...)): """Принимает JSON-запросы.""" return await login_common(request.username, request.password) @router.post("/login/token", summary="Авторизация через Form-Data") async def login_form(username: str = Form(...), password: str = Form(...)): """Принимает Form-Data (x-www-form-urlencoded).""" return await login_common(username, password) @router.post("/checktoken", summary="Проверяет, аутентифицирован ли пользователь") async def check_token(current_user: Annotated[auth.User, Depends(auth.get_current_user)]): return {"current_user": current_user.username}