# EmailTriggerOnChange / workflow_invocation.py
# SoumyaJ's picture
# Update workflow_invocation.py
# 412b084 verified
from workflowschema import LibraryModel
import httpx
import asyncio
import yaml
class WorkflowInvocation:
    """Send a payload to the webhook URL configured in ``config.yaml``.

    The configuration file is read once, when the instance is created, and
    the stored payload is posted as JSON each time ``invoke_workflow`` is
    awaited.
    """

    def __init__(self, payload):
        """Load the configuration and store the payload to be posted.

        Args:
            payload: Object handed to httpx as the ``json=`` request body.
        """
        self.config = self.load_config()
        self.payload = payload

    def load_config(self):
        """Read and parse ``config.yaml`` from the current working directory.

        Returns:
            Whatever ``yaml.safe_load`` produces for the file contents.

        Raises:
            OSError: If the file cannot be opened.
            yaml.YAMLError: If the contents are not valid YAML.
        """
        # Explicit UTF-8: without it, open() falls back to the locale's
        # preferred encoding, which can mis-decode a UTF-8 YAML file on
        # platforms whose default is not UTF-8.
        with open("config.yaml", "r", encoding="utf-8") as file:
            return yaml.safe_load(file)

    async def invoke_workflow(self):
        """POST the stored payload to the configured webhook.

        The URL is read from ``self.config['workflow']['webhook_url']``.
        Failures are reported on stdout instead of being raised.

        Returns:
            The decoded JSON body when the response parses as JSON, the raw
            response text when it does not, or ``None`` when an error was
            caught and printed.
        """
        # Imported once here rather than separately inside each handler.
        import traceback

        try:
            # Looked up inside the try so a missing key is reported by the
            # generic handler below rather than propagating to the caller.
            webhook_url = self.config['workflow']['webhook_url']
            async with httpx.AsyncClient() as client:
                response = await client.post(webhook_url, json=self.payload)
                response.raise_for_status()
                try:
                    json_response = response.json()
                except ValueError:
                    # JSON and text decoding errors are ValueError
                    # subclasses; anything else is a genuine failure and is
                    # left to the outer handlers.
                    print("Non-JSON response:", response.text)
                    return response.text
                print("JSON Response:", json_response)
                return json_response
        except httpx.RequestError as e:
            print(f"Request error occurred: {type(e).__name__}: {str(e)}")
            traceback.print_exc()
        except httpx.HTTPStatusError as e:
            print(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
        except Exception as e:
            # Top-level boundary: report any remaining failure (for example
            # a malformed config) without raising.
            print("Unexpected error:", str(e))
            traceback.print_exc()
        return None