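"""End-to-end test script for the AI debate workflow.

Runs the workflow defined in workflow.py against sample queries and checks that
a debate history is produced, transcripts are appended to podcasts.json, and a
podcast script plus audio file are generated. Results are written to a
timestamped test_results_*.json file.
"""
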
import asyncio
from dotenv import load_dotenv
import os
from workflow import create_workflow, run_workflow, TRANSCRIPTS_FILE
import json
from datetime import datetime
import traceback

# Load environment variables
load_dotenv()


def log_step(step: str, details: str = None):
    """Print a formatted step log"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"\n[{timestamp}] 🔄 {step}")
    if details:
        print(f" {details}")


def log_agent(agent: str, message: str):
    """Print a formatted agent message"""
    timestamp = datetime.now().strftime("%H:%M:%S")
    agent_icons = {
        "extractor": "🔍",
        "skeptic": "🤔",
        "believer": "💡",
        "supervisor": "📝",
        "storage": "📦",
        "podcast": "🎙️",
        "error": "❌",
        "step": "➡️"
    }
    icon = agent_icons.get(agent.lower(), "💬")
    print(f"\n[{timestamp}] {icon} {agent}:")
    print(f" {message}")


def check_api_keys():
    """Check if all required API keys are present"""
    required_keys = {
        "OPENAI_API_KEY": os.getenv("OPENAI_API_KEY"),
        "ELEVEN_API_KEY": os.getenv("ELEVEN_API_KEY"),
        "TAVILY_API_KEY": os.getenv("TAVILY_API_KEY")
    }
    missing_keys = [key for key, value in required_keys.items() if not value]
    if missing_keys:
        raise ValueError(f"Missing required API keys: {', '.join(missing_keys)}")
    return required_keys["TAVILY_API_KEY"]


async def test_transcript_saving(workflow, query: str):
    """Test that transcripts are properly saved to podcasts.json"""
    try:
        # Get initial transcript count
        initial_transcripts = []
        if os.path.exists(TRANSCRIPTS_FILE):
            with open(TRANSCRIPTS_FILE, 'r') as f:
                initial_transcripts = json.load(f)
        initial_count = len(initial_transcripts)

        # Run workflow
        result = await run_workflow(workflow, query)

        # Verify transcript was saved
        if not os.path.exists(TRANSCRIPTS_FILE):
            return False, "Transcripts file was not created"

        with open(TRANSCRIPTS_FILE, 'r') as f:
            transcripts = json.load(f)

        if len(transcripts) <= initial_count:
            return False, "No new transcript was added"

        latest_transcript = transcripts[-1]
        if not all(key in latest_transcript for key in ["id", "podcastScript", "topic"]):
            return False, "Transcript is missing required fields"

        if latest_transcript["topic"] != query:
            return False, f"Topic mismatch. Expected: {query}, Got: {latest_transcript['topic']}"

        return True, "Transcript was saved successfully"
    except Exception as e:
        return False, f"Error in transcript test: {str(e)}\n{traceback.format_exc()}"


async def test_single_turn(workflow_graph, query: str):
    """Test a single turn of the workflow"""
    result = await run_workflow(workflow_graph, query)
    return len(result["debate_history"]) > 0


async def test_debate_length(workflow, query):
    """Test that debate history does not exceed 20 messages"""
    result = await run_workflow(workflow, query)
    return len(result["debate_history"]) <= 20


async def test_podcast_generation(workflow, query):
    """Test podcast generation functionality"""
    try:
        result = await run_workflow(workflow, query)

        # Check for podcast data
        if "final_podcast" not in result:
            return False, "No podcast data in result"
        podcast_data = result["final_podcast"]

        # Check for errors in podcast generation
        if "error" in podcast_data:
            return False, f"Podcast generation error: {podcast_data['error']}"

        # Verify script generation
        if not podcast_data.get("content"):
            return False, "No podcast script generated"

        # Verify audio file generation
        if not podcast_data.get("audio_file"):
            return False, "No audio file generated"

        # Check if audio file exists
        audio_path = os.path.join(os.path.dirname(__file__), "audio_storage", podcast_data["audio_file"])
        if not os.path.exists(audio_path):
            return False, f"Audio file not found at {audio_path}"

        # Check file size
        file_size = os.path.getsize(audio_path)
        if file_size == 0:
            return False, "Audio file is empty"

        # Check if transcript was saved
        transcript_success, transcript_message = await test_transcript_saving(workflow, query)
        if not transcript_success:
            return False, f"Transcript saving failed: {transcript_message}"

        return True, f"Podcast generated successfully (file size: {file_size} bytes)"
    except Exception as e:
        return False, f"Error in podcast test: {str(e)}\n{traceback.format_exc()}"


async def run_all_tests():
    log_step("Running all tests")
    try:
        # Check API keys
        tavily_api_key = check_api_keys()

        # Create workflow
        workflow = create_workflow(tavily_api_key)

        # Test queries
        queries = [
            "What are the environmental impacts of electric vehicles?",
            "How does artificial intelligence impact healthcare?",
            "What are the pros and cons of remote work?",
            "Discuss the future of space exploration"
        ]

        results = {}
        for query in queries:
            try:
                log_step("Testing query", query)

                # Test transcript saving
                transcript_success, transcript_message = await test_transcript_saving(workflow, query)
                log_agent("step", f"Transcript test: {transcript_message}")

                result = await run_workflow(workflow, query)
                podcast_success, podcast_message = await test_podcast_generation(workflow, query)

                results[query] = {
                    "success": True,
                    "debate_length": len(result["debate_history"]),
                    "supervisor_notes": len(result["supervisor_notes"]),
                    "transcript_saved": transcript_success,
                    "transcript_status": transcript_message,
                    "podcast_generated": podcast_success,
                    "podcast_status": podcast_message,
                    "timestamp": datetime.now().isoformat()
                }
                log_agent("step", f"Test completed for: {query}")
            except Exception as e:
                results[query] = {
                    "success": False,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                    "timestamp": datetime.now().isoformat()
                }
                log_agent("error", f"Test failed for: {query}\n{str(e)}")

        # Save results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"test_results_{timestamp}.json"
        with open(filename, "w") as f:
            json.dump(results, f, indent=2)

        log_step("Results saved", f"File: {filename}")
        print("\nTest Results:")
        print(json.dumps(results, indent=2))
        return results
    except Exception as e:
        log_agent("error", f"Critical error in tests: {str(e)}\n{traceback.format_exc()}")
        raise
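
# Each entry written to test_results_<timestamp>.json therefore takes one of two
# shapes, keyed by the query string:
#
#   "query text": {"success": true, "debate_length": ..., "supervisor_notes": ...,
#                  "transcript_saved": ..., "transcript_status": ...,
#                  "podcast_generated": ..., "podcast_status": ..., "timestamp": ...}
#   "query text": {"success": false, "error": ..., "traceback": ..., "timestamp": ...}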


async def test_workflow():
    log_step("Starting workflow test")
    try:
        # Check API keys
        tavily_api_key = check_api_keys()

        # Create the workflow
        log_step("Creating workflow graph")
        workflow_graph = create_workflow(tavily_api_key)

        # Test query
        test_query = "Should artificial intelligence be regulated?"
        log_step("Test Query", test_query)

        # Run the workflow
        log_step("Running workflow")
        result = await run_workflow(workflow_graph, test_query)

        # Test transcript saving
        log_step("Testing transcript saving")
        transcript_success, transcript_message = await test_transcript_saving(workflow_graph, test_query)
        log_agent("step", f"Transcript test: {transcript_message}")

        # Print extractor results
        log_step("Information Extraction Phase")
        if "extractor_data" in result:
            log_agent("Extractor", result["extractor_data"].get("content", "No content"))

        # Print debate history
        log_step("Debate Phase")
        print("\nDebate Timeline:")
        for i, entry in enumerate(result["debate_history"], 1):
            log_agent(entry["speaker"], entry["content"])
            if i < len(result["supervisor_notes"]):
                log_agent("Supervisor", f"Analysis of Turn {i}:\n {result['supervisor_notes'][i]}")

        # Print final supervisor analysis
        log_step("Final Supervisor Analysis")
        if result["supervisor_notes"]:
            log_agent("Supervisor", result["supervisor_notes"][-1])

        # Print podcast results
        log_step("Podcast Production Phase")
        if "final_podcast" in result:
            podcast_data = result["final_podcast"]
            if "error" in podcast_data:
                log_agent("Podcast", f"Error: {podcast_data['error']}")
            else:
                log_agent("Podcast", "Script:\n" + podcast_data["content"])
                if podcast_data.get("audio_file"):
                    audio_path = os.path.join(os.path.dirname(__file__), "audio_storage", podcast_data["audio_file"])
                    file_size = os.path.getsize(audio_path) if os.path.exists(audio_path) else 0
                    log_agent("Podcast", f"Audio file saved as: {podcast_data['audio_file']} (size: {file_size} bytes)")

        # Save results
        log_step("Saving Results")
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"test_results_{timestamp}.json"
        with open(filename, "w") as f:
            json.dump({
                "timestamp": datetime.now().isoformat(),
                "query": test_query,
                "workflow_results": result,
                "transcript_saved": transcript_success,
                "transcript_status": transcript_message
            }, f, indent=2)
        log_step("Results Saved", f"File: {filename}")
    except Exception as e:
        log_step("ERROR", f"Workflow execution failed: {str(e)}")
        print("\nFull traceback:")
        print(traceback.format_exc())
        raise
if __name__ == "__main__":
print("\n" + "="*50)
print("π€ Starting AI Debate Workflow Test")
print("="*50)
try:
asyncio.run(test_workflow())
print("\n" + "="*50)
print("β
Test Complete")
print("="*50)
# Run comprehensive tests
print("\nRunning comprehensive tests...")
asyncio.run(run_all_tests())
print("\n" + "="*50)
print("β
All Tests Complete")
print("="*50)
except Exception as e:
print("\n" + "="*50)
print(f"β Tests Failed: {str(e)}")
print("="*50)
raise |