# Hugging Face Spaces status captured with this file: Runtime error
# This works even better
import asyncio
import os
import re
import sys
import threading
from typing import AsyncGenerator, Generator, List

import gradio as gr
from crewai import Agent, Task, Crew
from dotenv import load_dotenv
from gradio import ChatMessage
class OutputParser:
    """Convert captured crew console output into chat messages.

    The parser scans text chunks for three log markers ("Working Agent",
    "Starting Task" and "Task output"), queues one or more ``ChatMessage``
    objects per agent, and returns them grouped by ``agent_sequence``.
    """

    # Compiled once; the patterns themselves are part of the log contract.
    _ANSI_ESCAPE = re.compile(r'\x1B\[[0-9;]*[mK]')
    _AGENT_PATTERN = re.compile(r'\[DEBUG\]: == Working Agent: (.*?)(?=\n|$)')
    _TASK_PATTERN = re.compile(
        r'\[INFO\]: == Starting Task: (.*?)(?=\n\n|\n> Entering|$)', re.DOTALL
    )
    _OUTPUT_PATTERN = re.compile(
        r'\[DEBUG\]: == \[(.*?)\] Task output: (.*?)(?=\[DEBUG\]|$)', re.DOTALL
    )

    def __init__(self):
        # Not read by any method of this class; kept for external users.
        self.buffer = ""
        # Name of the agent announced by the most recent "Working Agent" line.
        self.current_agent = None
        # The Editor's output is only forwarded once per parser instance.
        self.final_article_sent = False
        # Pending messages per agent, drained by parse_output().
        self.message_queue = {
            "Content Planner": [],
            "Content Writer": [],
            "Editor": []
        }
        # Order in which queued messages are returned to the caller.
        self.agent_sequence = ["Content Planner", "Content Writer", "Editor"]

    def format_output(self, raw_content: str, agent_name: str) -> str:
        """Format the output content based on agent type.

        Args:
            raw_content: Text captured after a "Task output:" marker.
            agent_name: Role name of the agent that produced the text.

        Returns:
            For "Content Planner": non-empty lines with leading "N." and
            markdown "#" prefixes removed, joined by blank lines.
            For "Content Writer": markdown header markers removed and runs
            of three or more newlines collapsed to two.
            For any other agent: the text with surrounding whitespace removed.
        """
        if agent_name == "Content Planner":
            formatted_lines = []
            for line in raw_content.split('\n'):
                # Remove number prefixes, then markdown header markers.
                line = re.sub(r'^\d+\.\s*', '', line.strip())
                line = re.sub(r'^#+\s*', '', line)
                if line:
                    formatted_lines.append(line)
            return '\n\n'.join(formatted_lines)
        if agent_name == "Content Writer":
            # Remove markdown headers but keep the header text.
            content = re.sub(r'^#+\s*(.+)$', r'\1', raw_content, flags=re.MULTILINE)
            # Collapse excessive blank lines.
            content = re.sub(r'\n{3,}', '\n\n', content)
            return content.strip()
        return raw_content.strip()

    def _queue_for(self, agent_name: str) -> list:
        """Return the pending-message list for *agent_name*, creating it if needed."""
        # setdefault avoids a KeyError for agents outside the predefined three.
        return self.message_queue.setdefault(agent_name, [])

    def _queue_task_output(self, agent_name: str, output_content: str) -> None:
        """Queue the message(s) for one "Task output" marker."""
        formatted_content = self.format_output(output_content, agent_name)
        if agent_name == "Editor":
            # Repeated Editor outputs are dropped once the article was sent.
            if self.final_article_sent:
                return
            queue = self._queue_for(agent_name)
            queue.append(ChatMessage(
                role="assistant",
                content="Final article is ready!",
                metadata={"title": "π Final Article"}
            ))
            queue.append(ChatMessage(
                role="assistant",
                content=formatted_content
            ))
            self.final_article_sent = True
            return
        self._queue_for(agent_name).append(ChatMessage(
            role="assistant",
            content=formatted_content,
            metadata={"title": f"π‘ Output from {agent_name}"}
        ))

    def parse_output(self, text: str) -> "List[ChatMessage]":
        """Parse one chunk of console output and return the resulting messages.

        Every marker found in the chunk is handled in the order it appears,
        so a chunk containing several agents' markers is not truncated to the
        first match of each kind.

        Args:
            text: Newly captured console text; ANSI colour/erase sequences
                are stripped before matching.

        Returns:
            Messages queued so far, agents in ``agent_sequence`` first and any
            other agents afterwards. Returned queues are emptied.
        """
        # NOTE(review): the leading glyphs in the titles below look like
        # mis-decoded emoji; they are kept unchanged - confirm the originals.
        cleaned_text = self._ANSI_ESCAPE.sub('', text)

        # Gather all markers, then replay them by position in the chunk.
        events = []
        for kind, pattern in (
            ("agent", self._AGENT_PATTERN),
            ("task", self._TASK_PATTERN),
            ("output", self._OUTPUT_PATTERN),
        ):
            events.extend(
                (match.start(), kind, match)
                for match in pattern.finditer(cleaned_text)
            )
        events.sort(key=lambda event: event[0])

        for _, kind, match in events:
            if kind == "agent":
                # strip() guards against a trailing "\r" or spaces in the log line.
                self.current_agent = match.group(1).strip()
                self._queue_for(self.current_agent).append(ChatMessage(
                    role="assistant",
                    content="Starting work...",
                    metadata={"title": f"π€ {self.current_agent}"}
                ))
            elif kind == "task":
                # A task line is only attributable once an agent was announced.
                if self.current_agent:
                    self._queue_for(self.current_agent).append(ChatMessage(
                        role="assistant",
                        content=match.group(1).strip(),
                        metadata={"title": f"π Task for {self.current_agent}"}
                    ))
            else:
                self._queue_task_output(
                    match.group(1).strip(), match.group(2).strip()
                )

        # Return messages in the configured sequence, then any other agents.
        flush_order = list(self.agent_sequence)
        flush_order.extend(
            name for name in self.message_queue if name not in self.agent_sequence
        )
        messages = []
        for agent in flush_order:
            pending = self.message_queue.get(agent)
            if pending:
                messages.extend(pending)
                self.message_queue[agent] = []
        return messages
class StreamingCapture:
    """Minimal text sink that accumulates everything written to it.

    Intended as a stand-in for ``sys.stdout``: written text is appended to
    ``buffer`` so another part of the program can read it incrementally.
    """

    def __init__(self):
        # All text written so far, in write order.
        self.buffer = ""

    def write(self, text):
        """Append *text* to the buffer and return the number of characters written."""
        self.buffer += text
        return len(text)

    def flush(self):
        """Do nothing; data is already available in ``buffer``."""
        pass

    def isatty(self):
        """Report that this sink is not a terminal.

        Code that probes ``sys.stdout.isatty()`` would otherwise fail with
        ``AttributeError`` while this object replaces stdout.
        """
        return False
class ArticleCrew:
    """Planner / writer / editor crew whose console output is streamed as chat messages."""

    def __init__(self):
        # "{topic}" is deliberately left as a literal placeholder in the agent
        # texts; the value is supplied via kickoff(inputs=...) in process_article.
        self.planner = Agent(
            role="Content Planner",
            goal="Plan engaging and factually accurate content on {topic}",
            backstory="You're working on planning a blog article about the topic: {topic}. "
                      "You collect information that helps the audience learn something "
                      "and make informed decisions.",
            allow_delegation=False,
            verbose=True
        )
        self.writer = Agent(
            role="Content Writer",
            goal="Write insightful and factually accurate opinion piece about the topic: {topic}",
            backstory="You're working on writing a new opinion piece about the topic: {topic}. "
                      "You base your writing on the work of the Content Planner.",
            allow_delegation=False,
            verbose=True
        )
        self.editor = Agent(
            role="Editor",
            goal="Edit a given blog post to align with the writing style",
            backstory="You are an editor who receives a blog post from the Content Writer.",
            allow_delegation=False,
            verbose=True
        )
        # Replaced with a fresh instance at the start of every process_article run.
        self.output_parser = OutputParser()

    def create_tasks(self, topic: str):
        """Build the plan, write and edit tasks for *topic*.

        Args:
            topic: Subject of the article; embedded in the planning task text.

        Returns:
            A list ``[plan_task, write_task, edit_task]`` bound to the planner,
            writer and editor agents respectively.
        """
        plan_task = Task(
            description=(
                f"1. Prioritize the latest trends, key players, and noteworthy news on {topic}.\n"
                "2. Identify the target audience, considering their interests and pain points.\n"
                "3. Develop a detailed content outline including introduction, key points, and call to action.\n"
                "4. Include SEO keywords and relevant data or sources."
            ),
            expected_output="A comprehensive content plan document with an outline, audience analysis, SEO keywords, and resources.",
            agent=self.planner
        )
        write_task = Task(
            description=(
                "1. Use the content plan to craft a compelling blog post.\n"
                "2. Incorporate SEO keywords naturally.\n"
                "3. Sections/Subtitles are properly named in an engaging manner.\n"
                "4. Ensure proper structure with introduction, body, and conclusion.\n"
                "5. Proofread for grammatical errors."
            ),
            expected_output="A well-written blog post in markdown format, ready for publication.",
            agent=self.writer
        )
        edit_task = Task(
            description="Proofread the given blog post for grammatical errors and alignment with the brand's voice.",
            expected_output="A well-written blog post in markdown format, ready for publication.",
            agent=self.editor
        )
        return [plan_task, write_task, edit_task]

    async def process_article(self, topic: str) -> AsyncGenerator[List[ChatMessage], None]:
        """Run the crew for *topic* and stream parsed progress messages.

        The crew runs in a worker thread while this coroutine polls the
        captured stdout every 0.1 s and feeds new text to the output parser.

        Args:
            topic: Subject of the article.

        Yields:
            One-element lists, each holding a single ``ChatMessage``. If the
            crew raised, a final error message is yielded.

        Note:
            ``sys.stdout`` is replaced process-wide for the duration of the
            run and restored in ``finally``; overlapping runs would therefore
            share captured output.
        """
        # A parser carries per-run state (current agent, "final article sent"),
        # so reusing one across runs would suppress the next final article.
        self.output_parser = OutputParser()
        parser = self.output_parser

        crew = Crew(
            agents=[self.planner, self.writer, self.editor],
            tasks=self.create_tasks(topic),
            verbose=2
        )
        capture = StreamingCapture()
        original_stdout = sys.stdout
        sys.stdout = capture
        try:
            # Holds either the kickoff result or the exception it raised.
            result_container = []

            def run_crew():
                try:
                    result_container.append(crew.kickoff(inputs={"topic": topic}))
                except Exception as e:
                    result_container.append(e)

            # Run the crew in a separate thread so streaming is not blocked.
            thread = threading.Thread(target=run_crew)
            thread.start()

            last_processed = 0
            while thread.is_alive() or last_processed < len(capture.buffer):
                # Read the buffer once per poll: the worker may append between
                # two reads, and that text must not be skipped.
                snapshot = capture.buffer
                if len(snapshot) > last_processed:
                    new_content = snapshot[last_processed:]
                    last_processed = len(snapshot)
                    for msg in parser.parse_output(new_content):
                        yield [msg]
                await asyncio.sleep(0.1)

            failure = None
            if not result_container:
                failure = RuntimeError("crew finished without a result")
            elif isinstance(result_container[0], Exception):
                failure = result_container[0]

            if failure is not None:
                # stderr is not redirected, so the cause stays visible in logs.
                print(f"Crew run failed: {failure!r}", file=sys.stderr)
                yield [ChatMessage(
                    role="assistant",
                    content="An error occurred while generating the article.",
                    metadata={"title": "β Error"}
                )]
            # On success the final messages were already sent by the parser.
        finally:
            sys.stdout = original_stdout
def create_demo():
    """Build the Gradio UI for the article-writing crew.

    Returns:
        The ``gr.Blocks`` app (not yet queued or launched) containing a
        chatbot, a topic textbox and a button wired to the crew.
    """
    article_crew = ArticleCrew()

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# π AI Article Writing Crew")
        gr.Markdown("Watch as our AI crew collaborates to create your article!")
        chatbot = gr.Chatbot(
            label="Writing Process",
            avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
            height=700,
            type="messages",
            show_label=True
        )
        # NOTE(review): the button is placed in the same row as the textbox
        # because both set `scale`; confirm against the intended layout.
        with gr.Row(equal_height=True):
            topic = gr.Textbox(
                label="Article Topic",
                placeholder="Enter the topic you want an article about...",
                scale=4
            )
            btn = gr.Button("Write Article", variant="primary", scale=1)

        async def process_input(topic_text, history):
            """Stream the crew's messages for *topic_text* into the chat history."""
            # Work on a copy and tolerate a missing history value.
            history = list(history or [])
            cleaned_topic = (topic_text or "").strip()
            if not cleaned_topic:
                # Do not start a crew run for a blank topic.
                history.append(ChatMessage(
                    role="assistant",
                    content="Please enter a topic before starting the crew."
                ))
                yield history
                return
            history.append(ChatMessage(role="user", content=f"Write an article about: {cleaned_topic}"))
            yield history
            async for messages in article_crew.process_article(cleaned_topic):
                history.extend(messages)
                yield history

        btn.click(
            process_input,
            inputs=[topic, chatbot],
            outputs=[chatbot]
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI, enable the request queue, then start
    # the server in debug mode with a public share link requested.
    demo = create_demo()
    demo.queue().launch(debug=True, share=True)