Spaces:
Sleeping
Sleeping
from crewai import Agent, Task, Crew | |
import gradio as gr | |
from gradio import ChatMessage | |
import asyncio | |
import re | |
import sys | |
from typing import List, Generator | |
import os | |
from dotenv import load_dotenv | |
import threading | |
from langchain_openai import ChatOpenAI | |
class OutputParser: | |
def __init__(self): | |
self.buffer = "" | |
self.current_agent = None | |
self.final_article_sent = False | |
self.message_queue = { | |
"Content Planner": [], | |
"Content Writer": [], | |
"Editor": [] | |
} | |
self.agent_sequence = ["Content Planner", "Content Writer", "Editor"] | |
def format_output(self, raw_content: str, agent_name: str) -> str: | |
"""Format the output content based on agent type.""" | |
if agent_name == "Content Planner": | |
# Clean up the planner's output to make it more readable | |
lines = raw_content.split('\n') | |
formatted_lines = [] | |
for line in lines: | |
# Remove number prefixes and clean up | |
line = re.sub(r'^\d+\.\s*', '', line.strip()) | |
# Make text size normal by removing markdown formatting | |
line = re.sub(r'^#+\s*', '', line) | |
if line: | |
formatted_lines.append(line) | |
return '\n\n'.join(formatted_lines) | |
elif agent_name == "Content Writer": | |
# Clean up writer's output to make it more readable | |
# Remove markdown headers but keep the text | |
content = re.sub(r'^#+\s*(.+)$', r'\1', raw_content, flags=re.MULTILINE) | |
# Remove multiple newlines | |
content = re.sub(r'\n{3,}', '\n\n', content) | |
return content.strip() | |
return raw_content.strip() | |
def parse_output(self, text: str) -> List[ChatMessage]: | |
messages = [] | |
cleaned_text = re.sub(r'\x1B\[[0-9;]*[mK]', '', text) | |
# Look for working agent declarations | |
agent_match = re.search(r'\[DEBUG\]: == Working Agent: (.*?)(?=\n|$)', cleaned_text) | |
if agent_match: | |
self.current_agent = agent_match.group(1) | |
self.message_queue[self.current_agent].append(ChatMessage( | |
role="assistant", | |
content=f"Starting work...", | |
metadata={"title": f"π€ {self.current_agent}"} | |
)) | |
# Look for task information | |
task_match = re.search(r'\[INFO\]: == Starting Task: (.*?)(?=\n\n|\n> Entering|$)', cleaned_text, re.DOTALL) | |
if task_match and self.current_agent: | |
task_content = task_match.group(1).strip() | |
self.message_queue[self.current_agent].append(ChatMessage( | |
role="assistant", | |
content=task_content, | |
metadata={"title": f"π Task for {self.current_agent}"} | |
)) | |
# Look for agent outputs in debug messages | |
debug_match = re.search(r'\[DEBUG\]: == \[(.*?)\] Task output: (.*?)(?=\[DEBUG\]|$)', cleaned_text, re.DOTALL) | |
if debug_match: | |
agent_name = debug_match.group(1) | |
output_content = debug_match.group(2).strip() | |
# Format the output content | |
formatted_content = self.format_output(output_content, agent_name) | |
if agent_name == "Editor" and not self.final_article_sent: | |
self.message_queue[agent_name].append(ChatMessage( | |
role="assistant", | |
content="Final article is ready!", | |
metadata={"title": "π Final Article"} | |
)) | |
self.message_queue[agent_name].append(ChatMessage( | |
role="assistant", | |
content=formatted_content | |
)) | |
self.final_article_sent = True | |
elif agent_name != "Editor": | |
self.message_queue[agent_name].append(ChatMessage( | |
role="assistant", | |
content=formatted_content, | |
metadata={"title": f"π‘ Output from {agent_name}"} | |
)) | |
# Return messages in the correct sequence | |
for agent in self.agent_sequence: | |
if self.message_queue[agent]: | |
messages.extend(self.message_queue[agent]) | |
self.message_queue[agent] = [] | |
return messages | |
class StreamingCapture: | |
def __init__(self): | |
self.buffer = "" | |
def write(self, text): | |
self.buffer += text | |
return len(text) | |
def flush(self): | |
pass | |
class ArticleCrew: | |
def __init__(self, api_key: str = None): | |
self.api_key = api_key | |
self.initialize_agents() | |
def initialize_agents(self): | |
# Create a ChatOpenAI instance with the API key | |
llm = ChatOpenAI( | |
openai_api_key=self.api_key, | |
temperature=0.7, | |
model="gpt-4" | |
) | |
# Initialize agents with the LLM | |
self.planner = Agent( | |
role="Content Planner", | |
goal="Plan engaging and factually accurate content on {topic}", | |
backstory="You're working on planning a blog article about the topic: {topic}. " | |
"You collect information that helps the audience learn something " | |
"and make informed decisions.", | |
allow_delegation=False, | |
verbose=True, | |
llm=llm | |
) | |
self.writer = Agent( | |
role="Content Writer", | |
goal="Write insightful and factually accurate opinion piece about the topic: {topic}", | |
backstory="You're working on writing a new opinion piece about the topic: {topic}. " | |
"You base your writing on the work of the Content Planner.", | |
allow_delegation=False, | |
verbose=True, | |
llm=llm | |
) | |
self.editor = Agent( | |
role="Editor", | |
goal="Edit a given blog post to align with the writing style", | |
backstory="You are an editor who receives a blog post from the Content Writer.", | |
allow_delegation=False, | |
verbose=True, | |
llm=llm | |
) | |
self.output_parser = OutputParser() | |
def create_tasks(self, topic: str): | |
plan_task = Task( | |
description=( | |
f"1. Prioritize the latest trends, key players, and noteworthy news on {topic}.\n" | |
f"2. Identify the target audience, considering their interests and pain points.\n" | |
f"3. Develop a detailed content outline including introduction, key points, and call to action.\n" | |
f"4. Include SEO keywords and relevant data or sources." | |
), | |
expected_output="A comprehensive content plan document with an outline, audience analysis, SEO keywords, and resources.", | |
agent=self.planner | |
) | |
write_task = Task( | |
description=( | |
"1. Use the content plan to craft a compelling blog post.\n" | |
"2. Incorporate SEO keywords naturally.\n" | |
"3. Sections/Subtitles are properly named in an engaging manner.\n" | |
"4. Ensure proper structure with introduction, body, and conclusion.\n" | |
"5. Proofread for grammatical errors." | |
), | |
expected_output="A well-written blog post in markdown format, ready for publication.", | |
agent=self.writer | |
) | |
edit_task = Task( | |
description="Proofread the given blog post for grammatical errors and alignment with the brand's voice.", | |
expected_output="A well-written blog post in markdown format, ready for publication.", | |
agent=self.editor | |
) | |
return [plan_task, write_task, edit_task] | |
async def process_article(self, topic: str) -> Generator[List[ChatMessage], None, None]: | |
crew = Crew( | |
agents=[self.planner, self.writer, self.editor], | |
tasks=self.create_tasks(topic), | |
verbose=2 | |
) | |
capture = StreamingCapture() | |
original_stdout = sys.stdout | |
sys.stdout = capture | |
try: | |
# Start the crew task in a separate thread to not block streaming | |
result_container = [] | |
def run_crew(): | |
try: | |
result = crew.kickoff(inputs={"topic": topic}) | |
result_container.append(result) | |
except Exception as e: | |
result_container.append(e) | |
thread = threading.Thread(target=run_crew) | |
thread.start() | |
# Stream output while the crew is working | |
last_processed = 0 | |
while thread.is_alive() or last_processed < len(capture.buffer): | |
if len(capture.buffer) > last_processed: | |
new_content = capture.buffer[last_processed:] | |
messages = self.output_parser.parse_output(new_content) | |
if messages: | |
for msg in messages: | |
yield [msg] | |
last_processed = len(capture.buffer) | |
await asyncio.sleep(0.1) | |
# Check if we got a result or an error | |
if result_container and not isinstance(result_container[0], Exception): | |
# Final messages already sent by the parser | |
pass | |
else: | |
yield [ChatMessage( | |
role="assistant", | |
content="An error occurred while generating the article.", | |
metadata={"title": "β Error"} | |
)] | |
finally: | |
sys.stdout = original_stdout | |
def create_demo(): | |
article_crew = None # Initialize as None | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# π AI Article Writing Crew") | |
gr.Markdown("Watch as this AI Crew collaborates to create your article! This application utilizes [CrewAI](https://www.crewai.com/) agents: Content Planner, Content Writer, and Content Editor, to write an article on any topic you choose. To get started, enter your OpenAI API Key below and press Enter!") | |
openai_api_key = gr.Textbox( | |
label='OpenAI API Key', | |
type='password', | |
placeholder='Type your OpenAI API key and press Enter!', | |
interactive=True) | |
chatbot = gr.Chatbot( | |
label="Writing Process", | |
avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"), | |
height=700, | |
type="messages", | |
show_label=True, | |
visible=False | |
) | |
with gr.Row(equal_height=True): | |
topic = gr.Textbox( | |
label="Article Topic", | |
placeholder="Enter the topic you want an article about...", | |
scale=4, | |
visible=False | |
) | |
async def process_input(topic, history, openai_api_key): | |
nonlocal article_crew | |
# Initialize ArticleCrew with the API key if not already initialized | |
if article_crew is None: | |
article_crew = ArticleCrew(api_key=openai_api_key) | |
history.append(ChatMessage(role="user", content=f"Write an article about: {topic}")) | |
yield history | |
async for messages in article_crew.process_article(topic): | |
history.extend(messages) | |
yield history | |
btn = gr.Button("Write Article", variant="primary", scale=1, visible=False) | |
def show_interface(): | |
return { | |
openai_api_key: gr.Textbox(visible=False), | |
chatbot: gr.Chatbot(visible=True), | |
topic: gr.Textbox(visible=True), | |
btn: gr.Button(visible=True) | |
} | |
openai_api_key.submit( | |
show_interface, | |
None, | |
[openai_api_key, chatbot, topic, btn] | |
) | |
btn.click( | |
process_input, | |
inputs=[topic, chatbot, openai_api_key], | |
outputs=[chatbot] | |
) | |
return demo | |
if __name__ == "__main__": | |
demo = create_demo() | |
demo.queue() | |
demo.launch() |