"""Debate-to-podcast workflow built on a langgraph ``Graph``.

The graph has three nodes: ``extractor`` -> ``debate`` (looping) -> ``podcast``.
Finished podcast scripts are appended to ``transcripts/podcasts.json`` next to
this module.
"""

from typing import Dict, Any, List, Annotated, TypedDict, Union, Optional
import json
import os
import uuid

from dotenv import load_dotenv
from langgraph.graph import Graph, END

from agents import create_agents

# Load environment variables
load_dotenv()

# Create transcripts directory if it doesn't exist
TRANSCRIPTS_DIR = os.path.join(os.path.dirname(__file__), "transcripts")
os.makedirs(TRANSCRIPTS_DIR, exist_ok=True)
TRANSCRIPTS_FILE = os.path.join(TRANSCRIPTS_DIR, "podcasts.json")


def save_transcript(podcast_script: str, user_query: str) -> None:
    """Append a podcast transcript entry to the JSON transcripts file.

    Saving is best-effort: any failure (unreadable or malformed file, write
    error) is printed and swallowed so the calling workflow is not aborted.

    Args:
        podcast_script: The generated podcast script text.
        user_query: The topic / query the podcast was produced for.
    """
    # Create new transcript entry
    transcript = {
        "id": str(uuid.uuid4()),
        "podcastScript": podcast_script,
        "topic": user_query,
    }

    try:
        # Load existing transcripts. An explicit encoding keeps the file
        # readable regardless of the platform's default locale encoding.
        if os.path.exists(TRANSCRIPTS_FILE):
            with open(TRANSCRIPTS_FILE, "r", encoding="utf-8") as f:
                transcripts = json.load(f)
        else:
            transcripts = []

        # Append new transcript
        transcripts.append(transcript)

        # Save updated transcripts
        with open(TRANSCRIPTS_FILE, "w", encoding="utf-8") as f:
            json.dump(transcripts, f, indent=2)
    except Exception as e:
        # Deliberately broad: transcript persistence must never break the run.
        print(f"Error saving transcript: {str(e)}")


class AgentState(TypedDict):
    """Mutable state dictionary passed between the workflow nodes."""

    # Conversation messages; the last entry's "content" is used as the query.
    messages: List[Dict[str, Any]]
    # Name of the phase the workflow is in ("extractor", "debate", "podcast", END).
    current_agent: str
    # Number of debate turns completed so far.
    debate_turns: int
    # Response returned by the extractor agent.
    extractor_data: Dict[str, Any]
    # Ordered debate entries of the form {"speaker": ..., "content": ...}.
    debate_history: List[Dict[str, Any]]
    # "content" of each supervisor analysis, in order.
    supervisor_notes: List[str]
    # "chunks" of each supervisor analysis, in order.
    supervisor_chunks: List[Dict[str, List[str]]]
    # Result returned by the podcast producer agent.
    final_podcast: Dict[str, Any]
    # Forces the speaker on follow-up turns when "believer" or "skeptic".
    agent_type: str
    # Optional extra context; its "agent_chunks" entry is forwarded to agents.
    context: Optional[Dict[str, Any]]


def _latest_content(
    history: List[Dict[str, Any]],
    speaker: str,
    default: str = "Not started",
) -> str:
    """Return the most recent debate content by *speaker*, or *default*."""
    for entry in reversed(history):
        if entry["speaker"] == speaker:
            return entry["content"]
    return default


def _build_agent_input(state: AgentState, content: Any) -> Dict[str, Any]:
    """Bundle *content* with the optional ``agent_chunks`` from the context."""
    context = state.get("context", {})
    agent_chunks = context.get("agent_chunks", []) if context else []
    return {"content": content, "chunks": agent_chunks}


def create_workflow(tavily_api_key: str, max_debate_turns: int = 2):
    """Build and compile the extractor -> debate -> podcast graph.

    Args:
        tavily_api_key: API key forwarded to ``create_agents``.
        max_debate_turns: Number of debate turns to run before moving on to
            podcast production. At least one turn always runs, because the
            limit is only checked after a turn completes.

    Returns:
        The compiled langgraph graph, ready for ``ainvoke``.
    """
    # Initialize all agents
    agents = create_agents(tavily_api_key)

    # Create the graph
    workflow = Graph()

    async def run_extractor(state: AgentState) -> Dict[str, Any]:
        """Run the extractor on the latest message and record a first analysis."""
        query = state["messages"][-1]["content"]
        print(f"Extractor processing query: {query}")

        try:
            response = await agents["extractor"](query)
            print(f"Extractor response: {response}")

            # Update state
            state["extractor_data"] = response

            # Get initial supervisor analysis
            supervisor_analysis = await agents["supervisor"]({
                "extractor": response,
                "skeptic": {"content": "Not started"},
                "believer": {"content": "Not started"},
            })
            print(f"Initial supervisor analysis: {supervisor_analysis}")

            state["supervisor_notes"].append(supervisor_analysis["content"])
            state["supervisor_chunks"].append(supervisor_analysis.get("chunks", {}))

            # Move to debate phase
            state["current_agent"] = "debate"
            return state
        except Exception as e:
            print(f"Error in extractor: {str(e)}")
            # Chain the original exception so the root traceback is preserved.
            raise Exception(f"Error in extractor: {str(e)}") from e

    async def run_debate(state: AgentState) -> Dict[str, Any]:
        """Run one debate turn and append the supervisor's analysis of it."""
        print(f"Debate turn {state['debate_turns']}")

        try:
            if state["debate_turns"] == 0:
                # First turn: both agents respond to extractor
                print("Starting first debate turn")

                context_input = _build_agent_input(
                    state, state["extractor_data"]["content"]
                )

                skeptic_response = await agents["skeptic"](context_input)
                believer_response = await agents["believer"](context_input)

                state["debate_history"].extend([
                    {"speaker": "skeptic", "content": skeptic_response["content"]},
                    {"speaker": "believer", "content": believer_response["content"]},
                ])
                print(f"First turn responses added: {state['debate_history'][-2:]}")
            else:
                # Alternating responses based on agent type if specified
                if state["agent_type"] in ["believer", "skeptic"]:
                    current_speaker = state["agent_type"]
                else:
                    # Default alternating behavior
                    last_speaker = state["debate_history"][-1]["speaker"]
                    current_speaker = (
                        "believer" if last_speaker == "skeptic" else "skeptic"
                    )

                print(f"Processing response for {current_speaker}")

                # The speaker replies to the most recent debate entry.
                context_input = _build_agent_input(
                    state, state["debate_history"][-1]["content"]
                )

                response = await agents[current_speaker](context_input)

                state["debate_history"].append({
                    "speaker": current_speaker,
                    "content": response["content"],
                })
                print(f"Added response: {state['debate_history'][-1]}")

            # Add supervisor note and chunks. Look each side up by speaker
            # rather than by position: the first turn appends skeptic then
            # believer, so positional indexing would swap the two labels.
            history = state["debate_history"]
            supervisor_analysis = await agents["supervisor"]({
                "extractor": state["extractor_data"],
                "skeptic": {"content": _latest_content(history, "skeptic")},
                "believer": {"content": _latest_content(history, "believer")},
            })
            print(f"Supervisor analysis: {supervisor_analysis}")

            state["supervisor_notes"].append(supervisor_analysis["content"])
            state["supervisor_chunks"].append(supervisor_analysis.get("chunks", {}))

            state["debate_turns"] += 1
            print(f"Debate turn {state['debate_turns']} completed")

            # End the debate once the configured number of turns has run
            if state["debate_turns"] >= max_debate_turns:
                state["current_agent"] = "podcast"
                print("Moving to podcast production")

            return state
        except Exception as e:
            print(f"Error in debate: {str(e)}")
            raise Exception(f"Error in debate: {str(e)}") from e

    async def run_podcast_producer(state: AgentState) -> Dict[str, Any]:
        """Produce the podcast from the debate and persist its transcript."""
        print("Starting podcast production")

        try:
            user_query = state["messages"][-1]["content"]

            # Create podcast from debate
            podcast_result = await agents["podcast_producer"](
                state["debate_history"],
                state["supervisor_notes"],
                user_query,  # Pass the original user query
                state["supervisor_chunks"],
                {},  # Empty quadrant analysis since we removed storage manager
            )
            print(f"Podcast production result: {podcast_result}")

            # Save transcript to JSON file
            save_transcript(
                podcast_script=podcast_result["content"],
                user_query=user_query,
            )

            # Store the result
            state["final_podcast"] = podcast_result

            # End the workflow
            state["current_agent"] = END
            return state
        except Exception as e:
            print(f"Error in podcast production: {str(e)}")
            raise Exception(f"Error in podcast production: {str(e)}") from e

    def route_after_debate(state: AgentState) -> str:
        """Loop on the debate node until the turn limit is reached."""
        return "podcast" if state["debate_turns"] >= max_debate_turns else "debate"

    # Add nodes to the graph
    workflow.add_node("extractor", run_extractor)
    workflow.add_node("debate", run_debate)
    workflow.add_node("podcast", run_podcast_producer)

    # Set the entry point
    workflow.set_entry_point("extractor")

    # Add edges
    workflow.add_edge("extractor", "debate")

    # Add conditional edges for debate
    workflow.add_conditional_edges("debate", route_after_debate)

    # Add edge from podcast to end
    workflow.add_edge("podcast", END)

    # Compile the graph
    return workflow.compile()


async def run_workflow(
    graph: Graph,
    query: str,
    agent_type: str = "believer",
    context: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Run the workflow with a given query.

    Args:
        graph: The graph returned by ``create_workflow``.
        query: The user query; stored as the single initial user message.
        agent_type: Speaker used for follow-up debate turns when it is
            "believer" or "skeptic"; any other value alternates speakers.
        context: Optional dict whose "agent_chunks" entry is forwarded to the
            debating agents.

    Returns:
        A dict with the "debate_history", "supervisor_notes",
        "supervisor_chunks", "extractor_data" and "final_podcast" entries of
        the final state.
    """
    # Initialize the state
    initial_state = {
        "messages": [{"role": "user", "content": query}],
        "current_agent": "extractor",
        "debate_turns": 0,
        "extractor_data": {},
        "debate_history": [],
        "supervisor_notes": [],
        "supervisor_chunks": [],
        "final_podcast": {},
        "agent_type": agent_type,
        "context": context,
    }

    # Run the graph
    result = await graph.ainvoke(initial_state)

    # Expose only the caller-facing parts of the final state.
    exported_keys = (
        "debate_history",
        "supervisor_notes",
        "supervisor_chunks",
        "extractor_data",
        "final_podcast",
    )
    return {key: result[key] for key in exported_keys}