# Hugging Face file-viewer header captured with the source (not program code):
#   SoumyaJ's picture
#   Upload 5 files
#   79ff0e6 verified
#   raw
#   history blame
#   1.46 kB
from fastapi import FastAPI,HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import uvicorn
from pydantic import BaseModel, Field
from workflowschema import LibraryModel
from workflow_invocation import WorkflowInvocation
import asyncio
import json
# Application instance that the route decorator below registers against.
app = FastAPI()

# CORS policy: any origin, any method, and any header is accepted, and
# credentialed requests are allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended before exposing the service publicly.
_cors_options = {
    "allow_origins": ["*"],  # Allows all origins
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
@app.post("/api/v1/trigger_email_workflow")
def trigger_workflow(payLoad: LibraryModel):
    """Run the email workflow for the posted payload and report the outcome.

    Args:
        payLoad: Request body validated against ``LibraryModel``. It is dumped
            to a plain dict and handed to ``WorkflowInvocation``.

    Returns:
        A ``JSONResponse`` with status 200 and a success message once
        ``invoke_workflow()`` has completed.

    Raises:
        HTTPException: 400 when the payload is missing/falsy; 500 (with the
            exception text as ``detail``) when the workflow fails. An
            ``HTTPException`` raised by the workflow itself is passed through
            unchanged instead of being rewritten to a 500.
    """
    # Guard clause: previously a falsy payload fell through the ``if`` and the
    # handler implicitly returned None, which would be served as a 200.
    if not payLoad:
        raise HTTPException(status_code=400, detail="payload is required")

    try:
        wf_invocation = WorkflowInvocation(payLoad.model_dump())
        # This handler is a plain ``def``, so the coroutine returned by
        # invoke_workflow() is driven to completion here with asyncio.run.
        asyncio.run(wf_invocation.invoke_workflow())
    except HTTPException:
        # Preserve status codes chosen deliberately further down the stack.
        raise
    except Exception as e:
        # NOTE(review): ``detail`` exposes the raw exception text to the
        # client; kept as-is to preserve the existing response contract.
        raise HTTPException(status_code=500, detail=str(e)) from e

    return JSONResponse(
        content={"message": "workflow executed successfully"},
        status_code=200,
    )
# Script entry point: when run directly, serve ``app`` with uvicorn on all
# network interfaces at port 8000.
if __name__ == "__main__":
    uvicorn.run(app, port=8000, host="0.0.0.0")
# schema_input = {
# "name": "Media",
# "action": "modified",
# "change_type": "MediaStatus",
# "changed_by": "focus",
# "timestamp": "2025-07-23T13:45:00Z",
# "change_id": 1001,
# "change_from": "ON",
# "change_to": "OFF",
# "unique_identifier": "library",
# "title":"OCA"
# }
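# NOTE: trigger_workflow calls payLoad.model_dump(), so pass a LibraryModel
# instance rather than the raw dict (assumes the dict matches LibraryModel's fields).
# result = trigger_workflow(LibraryModel(**schema_input))
# print(json.loads(result.body))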