# SoumyaJ's picture
# Upload 5 files
# 79ff0e6 verified
from fastapi import FastAPI,HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from pydantic import BaseModel, Field
from workflowschema import LibraryModel
from workflow_invocation import WorkflowInvocation
import asyncio
import json
# ASGI application object; the route decorator and the uvicorn entry point
# in this module both refer to it.
app = FastAPI()

# CORS options, passed to the middleware as keyword arguments.
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True. FastAPI's CORS documentation advises listing these
# explicitly when credentials are enabled -- confirm this is intentional.
_cors_settings = {
    "allow_origins": ["*"],  # Allows all origins
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_settings)
@app.post("/api/v1/trigger_email_workflow")
def trigger_workflow(payLoad: LibraryModel):
    """Run the email workflow for the submitted payload.

    The payload is converted to a plain dict with ``model_dump()``, wrapped
    in a ``WorkflowInvocation``, and the ``invoke_workflow()`` coroutine is
    driven to completion before the response is returned.

    Args:
        payLoad: Request body, declared as ``LibraryModel``.

    Returns:
        A ``JSONResponse`` with status 200 and a success message.

    Raises:
        HTTPException: 400 when no payload is supplied; 500 carrying the
            error text when the workflow fails. An ``HTTPException`` raised
            while running the workflow is passed through unchanged.
    """
    # Guard clause: a falsy payload used to fall through and return None,
    # which is indistinguishable from a successful (empty) response.
    if not payLoad:
        raise HTTPException(status_code=400, detail="payload is required")

    try:
        wf_invocation = WorkflowInvocation(payLoad.model_dump())
        # asyncio.run() starts a new event loop for the coroutine and blocks
        # until it finishes; it must not be called from a thread that is
        # already running an event loop.
        asyncio.run(wf_invocation.invoke_workflow())
    except HTTPException:
        # Keep a deliberate status code instead of rewriting it as a 500.
        raise
    except Exception as e:
        # Chain explicitly so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=str(e)) from e

    return JSONResponse(
        content={"message": "workflow executed successfully"},
        status_code=200,
    )
# Launch uvicorn only when this file is executed directly, not on import.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
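# Example payload for exercising trigger_workflow by hand.
# NOTE(review): trigger_workflow calls payLoad.model_dump(), so a plain dict
# has to be wrapped in LibraryModel first -- confirm the field names below
# still match the schema.
# schema_input = {
#     "name": "Media",
#     "action": "modified",
#     "change_type": "MediaStatus",
#     "changed_by": "focus",
#     "timestamp": "2025-07-23T13:45:00Z",
#     "change_id": 1001,
#     "change_from": "ON",
#     "change_to": "OFF",
#     "unique_identifier": "library",
#     "title": "OCA",
# }
# result = trigger_workflow(LibraryModel(**schema_input))
# print(json.loads(result.body))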