Spaces:
Sleeping
Sleeping
from workflowschema import LibraryModel | |
import httpx | |
import asyncio | |
import yaml | |
class WorkflowInvocation:
    """Post a payload to the workflow webhook configured in ``config.yaml``.

    The configuration file is read once, when the instance is created. The
    webhook URL is looked up under ``config['workflow']['webhook_url']`` each
    time ``invoke_workflow`` is awaited.
    """

    def __init__(self, payload):
        """Load the configuration and remember the payload to send.

        Args:
            payload: Value passed as ``json=`` to the POST request
                (presumably a JSON-serializable dict -- confirm with callers).
        """
        self.config = self.load_config()
        self.payload = payload

    def load_config(self):
        """Parse ``config.yaml`` from the current working directory.

        Returns:
            Whatever ``yaml.safe_load`` produces for the file's content.

        Raises:
            OSError: If the file cannot be opened.
        """
        # Explicit UTF-8 so the file decodes the same way on every platform
        # instead of depending on the locale's default encoding.
        with open("config.yaml", "r", encoding="utf-8") as file:
            return yaml.safe_load(file)

    async def invoke_workflow(self):
        """POST ``self.payload`` to the configured webhook and return the reply.

        Failures are reported on stdout/stderr rather than raised, so callers
        must treat a ``None`` result as "the invocation failed".

        Returns:
            The decoded JSON body when the response is valid JSON, the raw
            response text when it is not, or ``None`` when the request failed,
            the server answered with an error status, or the configuration
            lacks the webhook URL.
        """
        # Imported here (once) so the class does not depend on a module-level
        # import that may not exist in the rest of the file.
        import traceback

        try:
            webhook_url = self.config['workflow']['webhook_url']
            async with httpx.AsyncClient() as client:
                response = await client.post(webhook_url, json=self.payload)
                response.raise_for_status()
                try:
                    json_response = response.json()
                except ValueError:
                    # json.JSONDecodeError and UnicodeDecodeError are both
                    # ValueError subclasses: the body is not JSON, so fall
                    # back to the raw text.
                    print("Non-JSON response:", response.text)
                    return response.text
                print("JSON Response:", json_response)
                return json_response
        except httpx.RequestError as e:
            # Transport-level failure (connection, timeout, ...).
            print(f"Request error occurred: {type(e).__name__}: {str(e)}")
            traceback.print_exc()
        except httpx.HTTPStatusError as e:
            # Raised by raise_for_status() for 4xx/5xx responses.
            print(f"HTTP error occurred: {e.response.status_code} - {e.response.text}")
        except Exception as e:
            # Deliberate best-effort boundary: e.g. a missing config key.
            print("Unexpected error:", str(e))
            traceback.print_exc()
        return None