from workflowschema import LibraryModel import httpx import asyncio import yaml class WorkflowInvocation: def __init__(self, payload): self.config = self.load_config() self.payload = payload def load_config(self): with open("config.yaml","r") as file: config = yaml.safe_load(file) return config async def invoke_workflow(self): try: webHookUrl = self.config['workflow']['webhook_url'] #print(webHookUrl) async with httpx.AsyncClient() as client: response = await client.post(webHookUrl, json=self.payload) response.raise_for_status() try: # Try to parse JSON response json_response = response.json() print("JSON Response:", json_response) return json_response except Exception as e: # Fall back to raw text print("Non-JSON response:", response.text) return response.text #print("Response status code:", response.json()) except httpx.RequestError as e: print(f"Request error occurred: {type(e).__name__}: {str(e)}") import traceback traceback.print_exc() except httpx.HTTPStatusError as e: print(f"HTTP error occurred: {e.response.status_code} - {e.response.text}") except Exception as e: import traceback print("Unexpected error:", str(e)) traceback.print_exc()