import jwt from datetime import datetime, timedelta from fastapi.security import OAuth2PasswordBearer from fastapi import HTTPException, status, Depends from pydantic import BaseModel, Field from pydantic_settings import BaseSettings from config.logging_config import logger from typing import Dict class Settings(BaseSettings): api_key_secret: str = Field(..., env="API_KEY_SECRET") token_expiration_minutes: int = Field(30, env="TOKEN_EXPIRATION_MINUTES") llm_model_name: str = "google/gemma-3-4b-it" max_tokens: int = 512 host: str = "0.0.0.0" port: int = 7860 chat_rate_limit: str = "100/minute" speech_rate_limit: str = "5/minute" external_tts_url: str = Field(..., env="EXTERNAL_TTS_URL") external_asr_url: str = Field(..., env="EXTERNAL_ASR_URL") external_text_gen_url: str = Field(..., env="EXTERNAL_TEXT_GEN_URL") external_audio_proc_url: str = Field(..., env="EXTERNAL_AUDIO_PROC_URL") class Config: env_file = ".env" env_file_encoding = "utf-8" settings = Settings() logger.info(f"Loaded API_KEY_SECRET at startup: {settings.api_key_secret}") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/v1/token") class TokenPayload(BaseModel): sub: str exp: float class TokenResponse(BaseModel): access_token: str token_type: str # Simple in-memory user store (replace with database in production) # Format: {username: password} USERS_DB: Dict[str, str] = { "testuser": "password123", "admin": "adminpass" } class LoginRequest(BaseModel): username: str password: str async def create_access_token(user_id: str) -> str: expire = datetime.utcnow() + timedelta(minutes=settings.token_expiration_minutes) payload = {"sub": user_id, "exp": expire.timestamp()} logger.info(f"Signing token with API_KEY_SECRET: {settings.api_key_secret}") token = jwt.encode(payload, settings.api_key_secret, algorithm="HS256") logger.info(f"Generated access token for user: {user_id}") return token async def get_current_user(token: str = Depends(oauth2_scheme)) -> str: credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: logger.info(f"Received token: {token}") logger.info(f"Verifying token with API_KEY_SECRET: {settings.api_key_secret}") payload = jwt.decode(token, settings.api_key_secret, algorithms=["HS256"], options={"verify_exp": False}) logger.info(f"Decoded payload: {payload}") token_data = TokenPayload(**payload) user_id = token_data.sub if user_id is None or user_id not in USERS_DB: logger.warning(f"Invalid or unknown user: {user_id}") raise credentials_exception current_time = datetime.utcnow().timestamp() logger.info(f"Current time: {current_time}, Token exp: {token_data.exp}") if current_time > token_data.exp: logger.warning(f"Token expired: current_time={current_time}, exp={token_data.exp}") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", headers={"WWW-Authenticate": "Bearer"}, ) logger.info(f"Validated token for user: {user_id}") return user_id except jwt.InvalidSignatureError as e: logger.error(f"Invalid signature error: {str(e)}") raise credentials_exception except jwt.InvalidTokenError as e: logger.error(f"Other token error: {str(e)}") raise credentials_exception except Exception as e: logger.error(f"Unexpected token validation error: {str(e)}") raise credentials_exception async def login(login_request: LoginRequest) -> TokenResponse: username = login_request.username password = login_request.password if username not in USERS_DB or USERS_DB[username] != password: logger.warning(f"Login failed for user: {username}") raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password") token = await create_access_token(user_id=username) return TokenResponse(access_token=token, token_type="bearer")