# main.py
"""Kairos News API: a demo FastAPI service that simulates background jobs.

Jobs live in a module-level dict (``jobs_db``), so they are lost when the
process exits and are not shared between worker processes.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Dict, Optional
import uuid
from datetime import datetime, timedelta
import asyncio
import random

# Job lifecycle states stored under the "status" key of each job record.
STATUS_PROCESSING = "processing"
STATUS_COMPLETED = "completed"
STATUS_FAILED = "failed"

# Simulation knobs (values unchanged from the original hard-coded literals).
MIN_PROCESSING_SECONDS = 3
MAX_PROCESSING_SECONDS = 10
EARLY_POLL_WINDOW = timedelta(seconds=3)
EARLY_POLL_DELAY_SECONDS = 1
RANDOM_FAILURE_PROBABILITY = 0.1

app = FastAPI(title="Kairos News API", version="1.0")

# Enable CORS for every origin, method and header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# In-memory database simulation: job id -> job record with the keys
# "status", "created_at", "completed_at", "request" and "result".
jobs_db: Dict[str, Dict] = {}


class PostRequest(BaseModel):
    """Payload accepted by ``POST /index``."""

    query: str
    topic: str
    date: str  # Format: "YYYY/MM to YYYY/MM" (not validated here)


class JobStatus(BaseModel):
    """Response shape shared by ``POST /index`` and ``GET /loading``."""

    id: str
    status: str  # "processing", "completed", "failed"
    created_at: datetime
    completed_at: Optional[datetime]
    request: PostRequest
    result: Optional[Dict]


@app.post("/index", response_model=JobStatus)
async def create_job(request: PostRequest, background_tasks: BackgroundTasks):
    """Create a new processing job and schedule its simulated processing.

    Args:
        request: The validated request body.
        background_tasks: FastAPI task queue; ``process_job`` is added to it.

    Returns:
        The initial job state: status "processing", no completion time and
        no result.
    """
    job_id = str(uuid.uuid4())
    created_at = datetime.now()

    # Store initial job data. ``.dict()`` is kept (rather than
    # ``model_dump()``) so the code runs on Pydantic v1 as well as v2.
    jobs_db[job_id] = {
        "status": STATUS_PROCESSING,
        "created_at": created_at,
        "completed_at": None,
        "request": request.dict(),
        "result": None,
    }

    # Simulate background processing.
    background_tasks.add_task(process_job, job_id)

    return {
        "id": job_id,
        "status": STATUS_PROCESSING,
        "created_at": created_at,
        "completed_at": None,
        "request": request,
        "result": None,
    }


@app.get("/loading", response_model=JobStatus)
async def get_job_status(id: str):  # noqa: A002 - "id" is the public query parameter name
    """Return the current state of a job, with simulated latency and failures.

    Args:
        id: Job identifier returned by ``POST /index``.

    Returns:
        The job's current status, timestamps, original request and result.

    Raises:
        HTTPException: 404 when no job with this id exists.
    """
    job = jobs_db.get(id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")

    # Polls that arrive shortly after creation get an artificial delay.
    elapsed = datetime.now() - job["created_at"]
    if elapsed < EARLY_POLL_WINDOW:
        await asyncio.sleep(EARLY_POLL_DELAY_SECONDS)

    # 10% chance of failure for demonstration; only a job that is still
    # processing can be failed, so terminal states are never rewritten.
    if (
        job["status"] == STATUS_PROCESSING
        and random.random() < RANDOM_FAILURE_PROBABILITY
    ):
        job["status"] = STATUS_FAILED
        job["result"] = {"error": "Random processing failure"}

    return {
        "id": id,
        "status": job["status"],
        "created_at": job["created_at"],
        "completed_at": job["completed_at"],
        "request": job["request"],
        "result": job["result"],
    }


async def process_job(job_id: str) -> None:
    """Background task that simulates processing and finalises the job.

    Sleeps for a random 3-10 seconds, then marks the job completed and fills
    in a canned result. The job is left untouched when it no longer exists or
    is no longer in the "processing" state, so a job already reported as
    "failed" is not silently flipped to "completed".

    Args:
        job_id: Key of the job record in ``jobs_db``.
    """
    await asyncio.sleep(
        random.uniform(MIN_PROCESSING_SECONDS, MAX_PROCESSING_SECONDS)
    )  # Random processing time

    job = jobs_db.get(job_id)
    if job is None or job["status"] != STATUS_PROCESSING:
        return

    request = job["request"]
    job["status"] = STATUS_COMPLETED
    job["completed_at"] = datetime.now()
    job["result"] = {
        "query": request["query"],
        "topic": request["topic"],
        "date_range": request["date"],
        "analysis": f"Processed results for {request['query']}",
        "sources": ["Source A", "Source B", "Source C"],
        "summary": "This is a generated summary based on your query.",
    }


@app.get("/jobs")
async def list_jobs():
    """Debug endpoint to view all jobs: returns the whole ``jobs_db`` mapping."""
    return jobs_db