from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_community.tools.tavily_search.tool import TavilySearchResults
from langchain_community.utilities.tavily_search import TavilySearchAPIWrapper
from typing import Dict, Any, List, Optional
import os
from dotenv import load_dotenv
import logging
import json
from pydantic import BaseModel, Field

from utils import save_transcript
# Configure logging (create the log directory first so FileHandler doesn't fail)
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/agents.log'),
        logging.StreamHandler()
    ]
)
# Create loggers for each agent
extractor_logger = logging.getLogger('ExtractorAgent')
skeptic_logger = logging.getLogger('SkepticAgent')
believer_logger = logging.getLogger('BelieverAgent')
supervisor_logger = logging.getLogger('SupervisorAgent')
podcast_logger = logging.getLogger('PodcastProducerAgent')
# Load environment variables
load_dotenv()

# Get API keys
openai_api_key = os.getenv("OPENAI_API_KEY")
eleven_api_key = os.getenv("ELEVEN_API_KEY")

if not openai_api_key:
    raise ValueError("OPENAI_API_KEY not found in environment variables")
if not eleven_api_key:
    raise ValueError("ELEVEN_API_KEY not found in environment variables")
# Initialize the base LLM
base_llm = ChatOpenAI(
    model_name="gpt-3.5-turbo",
    temperature=0.7,
    openai_api_key=openai_api_key
)
class ExtractorOutput(BaseModel):
    content: str = Field(description="The extracted and refined query")
    key_points: List[str] = Field(description="Key points extracted from the query")

class AgentResponse(BaseModel):
    content: str = Field(description="The agent's response")
    chunks: Optional[List[Dict[str, str]]] = Field(description="Relevant context chunks used", default=None)

class SupervisorOutput(BaseModel):
    content: str = Field(description="The supervisor's analysis")
    chunks: Dict[str, List[str]] = Field(description="Quadrant-based chunks of the analysis")

class PodcastOutput(BaseModel):
    title: str = Field(description="Title of the podcast episode")
    description: str = Field(description="Description of the episode")
    script: str = Field(description="The podcast script")
    summary: str = Field(description="A brief summary of the episode")
    duration_minutes: int = Field(description="Estimated duration in minutes")
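# These models document the shapes the agents return, but they are not yet
# wired into the chains. One way to enforce them, sketched here as an
# assumption rather than current behavior, is LangChain's PydanticOutputParser:
#
#   from langchain.output_parsers import PydanticOutputParser
#
#   extractor_parser = PydanticOutputParser(pydantic_object=ExtractorOutput)
#   # Inject extractor_parser.get_format_instructions() into the system prompt,
#   # then validate with: extractor_parser.parse(response.content)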
class ExtractorAgent:
    def __init__(self, tavily_api_key: str):
        # TavilySearchResults takes its key via the API wrapper rather than a
        # bare api_key kwarg
        self.search_tool = TavilySearchResults(
            api_wrapper=TavilySearchAPIWrapper(tavily_api_key=tavily_api_key),
            max_results=5
        )
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert information extractor. Your role is to:
            1. Extract relevant information from search results
            2. Organize the information in a clear, structured way
            3. Focus on factual, verifiable information
            4. Cite sources when possible"""),
            ("human", "{input}")
        ])
        self.chain = self.prompt | base_llm
    async def __call__(self, query: str) -> Dict[str, Any]:
        try:
            # Log the incoming query
            extractor_logger.info(f"Processing query: {query}")

            try:
                # Search using Tavily
                search_results = await self.search_tool.ainvoke(query)
                extractor_logger.debug(f"Search results: {json.dumps(search_results, indent=2)}")
            except Exception as e:
                extractor_logger.error(f"Error in Tavily search: {str(e)}", exc_info=True)
                raise Exception(f"Tavily search failed: {str(e)}") from e

            # Format the results
            if isinstance(search_results, list):
                formatted_results = f"Search results for: {query}\n" + "\n".join(
                    [str(result) for result in search_results]
                )
            else:
                formatted_results = f"Search results for: {query}\n{search_results}"

            try:
                # Generate response using the chain
                response = await self.chain.ainvoke({"input": formatted_results})
                extractor_logger.info(f"Generated response: {response.content}")
            except Exception as e:
                extractor_logger.error(f"Error in LLM chain: {str(e)}", exc_info=True)
                raise Exception(f"LLM chain failed: {str(e)}") from e

            return {
                "type": "extractor",
                "content": response.content,
                "raw_results": search_results
            }
        except Exception as e:
            extractor_logger.error(f"Error in ExtractorAgent: {str(e)}", exc_info=True)
            raise Exception(f"Error in extractor: {str(e)}") from e
class SkepticAgent:
    def __init__(self):
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a critical thinker engaging in a thoughtful discussion. While maintaining a balanced perspective, you should:
            - Analyze potential challenges and limitations
            - Consider real-world implications
            - Support arguments with evidence and examples
            - Maintain a respectful and constructive tone
            - Raise important considerations

            If provided with context information, use it to inform your response while maintaining your analytical perspective.
            Focus on examining risks and important questions from the context.
            Keep your responses concise and focused on the topic at hand."""),
            ("human", """Context information:
            {chunks}

            Question/Topic:
            {input}""")
        ])
        self.chain = self.prompt | base_llm

    async def __call__(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        skeptic_logger.info(f"Processing input: {input_data['content']}")
        chunks = input_data.get("chunks", [])
        chunks_text = "\n".join(chunks) if chunks else "No additional context provided."

        response = await self.chain.ainvoke({
            "input": input_data["content"],
            "chunks": chunks_text
        })
        skeptic_logger.info(f"Generated response: {response.content}")
        return {"type": "skeptic", "content": response.content}
class BelieverAgent:
    def __init__(self):
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an optimistic thinker engaging in a thoughtful discussion. While maintaining a balanced perspective, you should:
            - Highlight opportunities and potential benefits
            - Share innovative solutions and possibilities
            - Support arguments with evidence and examples
            - Maintain a constructive and forward-thinking tone
            - Build on existing ideas positively

            If provided with context information, use it to inform your response while maintaining your optimistic perspective.
            Focus on opportunities and solutions from the context.
            Keep your responses concise and focused on the topic at hand."""),
            ("human", """Context information:
            {chunks}

            Question/Topic:
            {input}""")
        ])
        self.chain = self.prompt | base_llm

    async def __call__(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
        believer_logger.info(f"Processing input: {input_data['content']}")
        chunks = input_data.get("chunks", [])
        chunks_text = "\n".join(chunks) if chunks else "No additional context provided."

        response = await self.chain.ainvoke({
            "input": input_data["content"],
            "chunks": chunks_text
        })
        believer_logger.info(f"Generated response: {response.content}")
        return {"type": "believer", "content": response.content}
class SupervisorAgent:
    def __init__(self):
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a balanced supervisor. Your role is to:
            1. Analyze inputs from all agents
            2. Identify key points and insights
            3. Balance different perspectives
            4. Synthesize a comprehensive view
            5. Provide clear, actionable conclusions

            Organize your response into these sections:
            - Opportunities: Key possibilities and positive aspects
            - Risks: Important challenges and concerns
            - Questions: Critical questions to consider
            - Solutions: Potential ways forward

            Focus on creating a balanced, well-reasoned synthesis of all viewpoints.
            Keep your response to 100 words or less."""),
            ("human", "Analyze the following perspectives:\n\nExtractor: {extractor_content}\n\nSkeptic: {skeptic_content}\n\nBeliever: {believer_content}")
        ])
        self.chain = self.prompt | base_llm

    async def __call__(self, agent_responses: Dict[str, Any]) -> Dict[str, Any]:
        supervisor_logger.info("Processing agent responses:")
        supervisor_logger.info(f"Extractor: {agent_responses['extractor']['content']}")
        supervisor_logger.info(f"Skeptic: {agent_responses['skeptic']['content']}")
        supervisor_logger.info(f"Believer: {agent_responses['believer']['content']}")

        # Process supervisor's analysis
        response = await self.chain.ainvoke({
            "extractor_content": agent_responses["extractor"]["content"],
            "skeptic_content": agent_responses["skeptic"]["content"],
            "believer_content": agent_responses["believer"]["content"]
        })
        supervisor_logger.info(f"Generated analysis: {response.content}")

        # Parse the response into sections
        content = response.content
        sections = {
            "opportunities": [],
            "risks": [],
            "questions": [],
            "solutions": []
        }

        # Simple line-based parsing: a section header switches the bucket,
        # every following non-empty line lands in the current bucket
        current_section = None
        for line in content.split('\n'):
            line = line.strip()
            if line.lower().startswith('opportunities:'):
                current_section = "opportunities"
            elif line.lower().startswith('risks:'):
                current_section = "risks"
            elif line.lower().startswith('questions:'):
                current_section = "questions"
            elif line.lower().startswith('solutions:'):
                current_section = "solutions"
            elif line and current_section:
                sections[current_section].append(line)

        return {
            "type": "supervisor",
            "content": response.content,
            "chunks": sections
        }
class PodcastProducerAgent:
    def __init__(self):
        podcast_logger.info("Initializing PodcastProducerAgent")

        # Initialize the agent with a lower temperature for more consistent output
        llm = ChatOpenAI(
            model_name="gpt-3.5-turbo",
            temperature=0.3,
            openai_api_key=openai_api_key
        )

        # Create audio storage directory if it doesn't exist
        self.audio_dir = os.path.join(os.path.dirname(__file__), "audio_storage")
        os.makedirs(self.audio_dir, exist_ok=True)
        podcast_logger.info(f"Audio directory: {self.audio_dir}")

        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert podcast producer. Create a single, cohesive podcast script that:
            1. Introduces the topic clearly
            2. Presents a balanced debate between perspectives
            3. Incorporates key insights from the supervisor's analysis:
               - Opportunities and positive aspects
               - Risks and challenges
               - Key questions to consider
               - Potential solutions
            4. Prioritizes content based on quadrant analysis:
               - Important & Urgent: Address first and emphasize
               - Important & Not Urgent: Cover thoroughly but with less urgency
               - Not Important & Urgent: Mention briefly if relevant
               - Not Important & Not Urgent: Include only if it adds value
            5. Maintains natural conversation flow with clear speaker transitions
            6. Concludes with actionable takeaways

            Keep the tone professional but conversational. Format the script with clear speaker indicators and natural pauses."""),
            ("human", """Create a podcast script from this content:

            Topic: {user_query}

            Debate Content:
            {debate_content}

            Supervisor's Analysis:
            {supervisor_content}

            Quadrant Analysis:
            Important & Urgent:
            {important_urgent}

            Important & Not Urgent:
            {important_not_urgent}

            Not Important & Urgent:
            {not_important_urgent}

            Not Important & Not Urgent:
            {not_important_not_urgent}""")
        ])
        self.chain = self.prompt | llm

        # Metadata prompt for categorization
        self.metadata_prompt = ChatPromptTemplate.from_messages([
            ("system", """Analyze the debate and provide:
            1. A category (single word: technology/science/society/politics/economics/culture)
            2. A short description (3-4 words) of the main topic
            Format: category|short_description"""),
            ("human", "{content}")
        ])
        self.metadata_chain = self.metadata_prompt | llm
    async def __call__(self, debate_history: list, supervisor_notes: list, user_query: str, supervisor_chunks: dict, quadrant_analysis: dict) -> Dict[str, Any]:
        try:
            podcast_logger.info("Starting podcast production")

            # Format the debate content
            debate_content = "\n\n".join([
                f"{entry['speaker']}: {entry['content']}"
                for entry in debate_history
            ])

            # Get the latest supervisor analysis
            supervisor_content = supervisor_notes[-1] if supervisor_notes else ""

            # Format quadrant content
            important_urgent = "\n".join(quadrant_analysis.get("important_urgent", []))
            important_not_urgent = "\n".join(quadrant_analysis.get("important_not_urgent", []))
            not_important_urgent = "\n".join(quadrant_analysis.get("not_important_urgent", []))
            not_important_not_urgent = "\n".join(quadrant_analysis.get("not_important_not_urgent", []))

            # Generate the podcast script
            script_response = await self.chain.ainvoke({
                "user_query": user_query,
                "debate_content": debate_content,
                "supervisor_content": supervisor_content,
                "important_urgent": important_urgent,
                "important_not_urgent": important_not_urgent,
                "not_important_urgent": not_important_urgent,
                "not_important_not_urgent": not_important_not_urgent
            })

            # Get metadata for the podcast
            metadata_response = await self.metadata_chain.ainvoke({
                "content": script_response.content
            })

            # Expect "category|short_description"; don't crash if the model
            # omits the separator or adds extra pipes
            parts = metadata_response.content.strip().split("|", 1)
            if len(parts) == 2:
                category, description = parts
            else:
                category, description = "general", parts[0]

            # Clean up filename components
            clean_query = user_query.lower().replace(" ", "_")[:30]
            clean_description = description.lower().replace(" ", "_")
            clean_category = category.lower().strip()

            try:
                # Create a single filename with hyphens separating main components
                filename = f"{clean_query}-{clean_description}-{clean_category}.mp3"
                filepath = os.path.join(self.audio_dir, filename)

                # Generate audio file
                from gtts import gTTS
                tts = gTTS(text=script_response.content, lang='en')
                tts.save(filepath)
                podcast_logger.info(f"Successfully saved audio file: {filepath}")

                # Save the transcript
                try:
                    save_transcript(script_response.content, user_query)
                    podcast_logger.info("Successfully saved transcript")
                except Exception as e:
                    podcast_logger.error(f"Error saving transcript: {str(e)}")

                return {
                    "type": "podcast",
                    "content": script_response.content,
                    "audio_file": filename,
                    "category": clean_category,
                    "description": description,
                    "title": f"Debate: {description.title()}"
                }
            except Exception as e:
                podcast_logger.error(f"Error in audio generation: {str(e)}", exc_info=True)
                return {
                    "type": "podcast",
                    "content": script_response.content,
                    "error": f"Audio generation failed: {str(e)}"
                }
        except Exception as e:
            podcast_logger.error(f"Error in podcast production: {str(e)}", exc_info=True)
            return {
                "type": "podcast",
                "error": f"Podcast production failed: {str(e)}"
            }
def create_agents(tavily_api_key: str) -> Dict[str, Any]:
    # Initialize all agents
    extractor = ExtractorAgent(tavily_api_key)
    believer = BelieverAgent()
    skeptic = SkepticAgent()
    supervisor = SupervisorAgent()
    podcast_producer = PodcastProducerAgent()

    return {
        "extractor": extractor,
        "believer": believer,
        "skeptic": skeptic,
        "supervisor": supervisor,
        "podcast_producer": podcast_producer
    }
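
# A minimal end-to-end sketch of how these agents compose, based only on the
# call signatures defined above. The orchestration (one debate round, an empty
# quadrant analysis, a sample query) is an assumption for illustration, not the
# app's actual pipeline, and running it spends real OpenAI/Tavily quota.
if __name__ == "__main__":
    import asyncio

    async def demo():
        agents = create_agents(os.getenv("TAVILY_API_KEY", ""))
        query = "Should cities invest in autonomous buses?"  # hypothetical example

        # Extract grounding material, then gather both debate perspectives
        extracted = await agents["extractor"](query)
        skeptic_view = await agents["skeptic"]({"content": extracted["content"]})
        believer_view = await agents["believer"]({"content": extracted["content"]})

        # Synthesize the three viewpoints into a sectioned analysis
        synthesis = await agents["supervisor"]({
            "extractor": extracted,
            "skeptic": skeptic_view,
            "believer": believer_view
        })

        # Produce the podcast script and audio file
        podcast = await agents["podcast_producer"](
            debate_history=[
                {"speaker": "Skeptic", "content": skeptic_view["content"]},
                {"speaker": "Believer", "content": believer_view["content"]}
            ],
            supervisor_notes=[synthesis["content"]],
            user_query=query,
            supervisor_chunks=synthesis["chunks"],
            quadrant_analysis={}  # assumption: real quadrant data comes from upstream
        )
        print(podcast.get("audio_file") or podcast.get("error"))

    asyncio.run(demo())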