Spaces:
Sleeping
Sleeping
from fastapi import FastAPI,HTTPException | |
from fastapi.middleware.cors import CORSMiddleware | |
from fastapi.responses import JSONResponse | |
import uvicorn | |
from pydantic import BaseModel, Field | |
from workflowschema import LibraryModel | |
from workflow_invocation import WorkflowInvocation | |
import asyncio | |
import json | |
app = FastAPI() | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], # Allows all origins | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
def trigger_workflow(payLoad: LibraryModel): | |
try: | |
if payLoad: | |
wf_invocation = WorkflowInvocation(payLoad.model_dump()) | |
asyncio.run(wf_invocation.invoke_workflow()) | |
return JSONResponse(content={"message": "workflow executed successfully"}, status_code=200) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=str(e)) | |
if __name__ == "__main__": | |
uvicorn.run(app, host = "0.0.0.0", port = 8000) | |
# schema_input = { | |
# "name": "Media", | |
# "action": "modified", | |
# "change_type": "MediaStatus", | |
# "changed_by": "focus", | |
# "timestamp": "2025-07-23T13:45:00Z", | |
# "change_id": 1001, | |
# "change_from": "ON", | |
# "change_to": "OFF", | |
# "unique_identifier": "library", | |
# "title":"OCA" | |
# } | |
# result = trigger_workflow(schema_input) | |
# print(json.loads(result.body)) | |