sachin
user-add
90b38cc
raw
history blame
4.31 kB
import jwt
from datetime import datetime, timedelta
from fastapi.security import OAuth2PasswordBearer
from fastapi import HTTPException, status, Depends
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings
from config.logging_config import logger
from typing import Dict
class Settings(BaseSettings):
api_key_secret: str = Field(..., env="API_KEY_SECRET")
token_expiration_minutes: int = Field(30, env="TOKEN_EXPIRATION_MINUTES")
llm_model_name: str = "google/gemma-3-4b-it"
max_tokens: int = 512
host: str = "0.0.0.0"
port: int = 7860
chat_rate_limit: str = "100/minute"
speech_rate_limit: str = "5/minute"
external_tts_url: str = Field(..., env="EXTERNAL_TTS_URL")
external_asr_url: str = Field(..., env="EXTERNAL_ASR_URL")
external_text_gen_url: str = Field(..., env="EXTERNAL_TEXT_GEN_URL")
external_audio_proc_url: str = Field(..., env="EXTERNAL_AUDIO_PROC_URL")
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()
logger.info(f"Loaded API_KEY_SECRET at startup: {settings.api_key_secret}")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/v1/token")
class TokenPayload(BaseModel):
sub: str
exp: float
class TokenResponse(BaseModel):
access_token: str
token_type: str
# Simple in-memory user store (replace with database in production)
# Format: {username: password}
USERS_DB: Dict[str, str] = {
"testuser": "password123",
"admin": "adminpass"
}
class LoginRequest(BaseModel):
username: str
password: str
async def create_access_token(user_id: str) -> str:
expire = datetime.utcnow() + timedelta(minutes=settings.token_expiration_minutes)
payload = {"sub": user_id, "exp": expire.timestamp()}
logger.info(f"Signing token with API_KEY_SECRET: {settings.api_key_secret}")
token = jwt.encode(payload, settings.api_key_secret, algorithm="HS256")
logger.info(f"Generated access token for user: {user_id}")
return token
async def get_current_user(token: str = Depends(oauth2_scheme)) -> str:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
logger.info(f"Received token: {token}")
logger.info(f"Verifying token with API_KEY_SECRET: {settings.api_key_secret}")
payload = jwt.decode(token, settings.api_key_secret, algorithms=["HS256"], options={"verify_exp": False})
logger.info(f"Decoded payload: {payload}")
token_data = TokenPayload(**payload)
user_id = token_data.sub
if user_id is None or user_id not in USERS_DB:
logger.warning(f"Invalid or unknown user: {user_id}")
raise credentials_exception
current_time = datetime.utcnow().timestamp()
logger.info(f"Current time: {current_time}, Token exp: {token_data.exp}")
if current_time > token_data.exp:
logger.warning(f"Token expired: current_time={current_time}, exp={token_data.exp}")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired",
headers={"WWW-Authenticate": "Bearer"},
)
logger.info(f"Validated token for user: {user_id}")
return user_id
except jwt.InvalidSignatureError as e:
logger.error(f"Invalid signature error: {str(e)}")
raise credentials_exception
except jwt.InvalidTokenError as e:
logger.error(f"Other token error: {str(e)}")
raise credentials_exception
except Exception as e:
logger.error(f"Unexpected token validation error: {str(e)}")
raise credentials_exception
async def login(login_request: LoginRequest) -> TokenResponse:
username = login_request.username
password = login_request.password
if username not in USERS_DB or USERS_DB[username] != password:
logger.warning(f"Login failed for user: {username}")
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password")
token = await create_access_token(user_id=username)
return TokenResponse(access_token=token, token_type="bearer")