import jwt from datetime import datetime, timedelta from pydantic import BaseModel, Field, field_validator from fastapi.security import OAuth2PasswordBearer from fastapi import HTTPException, status, Depends from pydantic import BaseModel from pydantic_settings import BaseSettings from config.logging_config import logger # Assuming this is available from typing import Optional # Centralized Settings class (can be moved to a separate config file later) class Settings(BaseSettings): api_key_secret: str = Field(..., env="API_KEY_SECRET") # Secret key for signing JWTs token_expiration_minutes: int = Field(30, env="TOKEN_EXPIRATION_MINUTES") # Default to 30 minutes llm_model_name: str = "google/gemma-3-4b-it" max_tokens: int = 512 host: str = "0.0.0.0" port: int = 7860 chat_rate_limit: str = "100/minute" speech_rate_limit: str = "5/minute" class Config: env_file = ".env" env_file_encoding = "utf-8" settings = Settings() logger.info(f"Loaded API_KEY_SECRET at startup: {settings.api_key_secret}") # Add this line # OAuth2 scheme with Bearer token oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/v1/token") # Model for token payload class TokenPayload(BaseModel): sub: str # Subject (user identifier) exp: int # Expiration timestamp # Model for token response class TokenResponse(BaseModel): access_token: str token_type: str async def create_access_token(user_id: str) -> str: """ Create a JWT access token for a given user. """ expire = datetime.utcnow() + timedelta(minutes=settings.token_expiration_minutes) payload = {"sub": user_id, "exp": expire.timestamp()} logger.info(f"Signing token with API_KEY_SECRET: {settings.api_key_secret}") # Add this line token = jwt.encode(payload, settings.api_key_secret, algorithm="HS256") logger.info(f"Generated access token for user: {user_id}") return token async def get_current_user(token: str = Depends(oauth2_scheme)) -> str: """ Validate the Bearer token and return the user ID. """ credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: logger.info(f"Verifying token with API_KEY_SECRET: {settings.api_key_secret}") # Add this line payload = jwt.decode(token, settings.api_key_secret, algorithms=["HS256"]) token_data = TokenPayload(**payload) user_id = token_data.sub if user_id is None: raise credentials_exception if datetime.utcnow().timestamp() > token_data.exp: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Token has expired", headers={"WWW-Authenticate": "Bearer"}, ) logger.info(f"Validated token for user: {user_id}") return user_id except jwt.InvalidTokenError: logger.warning(f"Invalid token attempt: {token[:10]}...") raise credentials_exception except Exception as e: logger.error(f"Token validation error: {str(e)}") raise credentials_exception # For demonstration purposes, a simple login function # In production, replace with proper user authentication (e.g., database lookup) async def login(user_id: str) -> TokenResponse: """ Generate a token for a user. In production, validate credentials here. """ # Placeholder: Assume user_id is valid; in reality, check against a database token = await create_access_token(user_id=user_id) return TokenResponse(access_token=token, token_type="bearer")