from crewai import Agent, Task, Crew
import gradio as gr
from gradio import ChatMessage
import asyncio
import re
import sys
from typing import AsyncGenerator, List
import os
from dotenv import load_dotenv
import threading

load_dotenv()
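
# OutputParser scans the raw CrewAI console output for known log markers
# ("[DEBUG]: == Working Agent:", "[INFO]: == Starting Task:", "Thought:",
# "Final Answer:") and turns each match into a gradio.ChatMessage so the UI
# can display the crew's progress step by step.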
class OutputParser:
    def __init__(self):
        self.buffer = ""
        self.current_agent = None

    def parse_output(self, text: str) -> List[ChatMessage]:
        messages = []

        # Clean ANSI codes
        cleaned_text = re.sub(r'\x1B\[[0-9;]*[mK]', '', text)

        # Look for working agent declarations
        agent_match = re.search(r'\[DEBUG\]: == Working Agent: (.*?)(?=\n|$)', cleaned_text)
        if agent_match:
            self.current_agent = agent_match.group(1)
            messages.append(ChatMessage(
                role="assistant",
                content="Starting work...",
                metadata={"title": f"🤖 {self.current_agent}"}
            ))

        # Look for task information with full task list
        task_match = re.search(r'\[INFO\]: == Starting Task: (.*?)(?=\n\n|\n> Entering|$)', cleaned_text, re.DOTALL)
        if task_match and self.current_agent:
            task_content = task_match.group(1).strip()
            messages.append(ChatMessage(
                role="assistant",
                content=task_content,
                metadata={"title": f"📋 Task for {self.current_agent}"}
            ))

        # Look for thought processes
        thought_match = re.search(r'Thought: (.*?)(?=\nAction:|$)', cleaned_text, re.DOTALL)
        if thought_match and self.current_agent:
            thought_content = thought_match.group(1).strip()
            messages.append(ChatMessage(
                role="assistant",
                content=thought_content,
                metadata={"title": f"🤔 {self.current_agent}'s Thoughts"}
            ))
        # Look for final answers from non-Editor agents
        if "Final Answer:" in cleaned_text and self.current_agent != "Editor":
            answer_match = re.search(r'Final Answer:\s*(.*?)(?=\n> Finished chain|$)', cleaned_text, re.DOTALL)
            if answer_match:
                answer_content = answer_match.group(1).strip()
                messages.append(ChatMessage(
                    role="assistant",
                    content=answer_content,
                    metadata={"title": f"💡 Output from {self.current_agent}"}
                ))
        # Special handling for Editor's final answer (the final article)
        elif "Final Answer:" in cleaned_text and self.current_agent == "Editor":
            answer_match = re.search(r'Final Answer:\s*(.*?)(?=\n> Finished chain|$)', cleaned_text, re.DOTALL)
            if answer_match:
                answer_content = answer_match.group(1).strip()
                # First send the metadata marker
                messages.append(ChatMessage(
                    role="assistant",
                    content="Final article is ready!",
                    metadata={"title": "📝 Final Article"}
                ))
                # Then send the actual content without metadata
                messages.append(ChatMessage(
                    role="assistant",
                    content=answer_content
                ))

        return messages
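
# StreamingCapture is a minimal stand-in for sys.stdout: it accumulates
# everything the crew prints into a string buffer so the output can be
# parsed incrementally while the run is still in progress.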
class StreamingCapture:
    def __init__(self):
        self.buffer = ""

    def write(self, text):
        self.buffer += text
        return len(text)

    def flush(self):
        pass
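
# ArticleCrew wires up the three CrewAI agents (planner, writer, editor),
# builds their tasks for a given topic, and streams the run's console output
# back to the UI as ChatMessage updates.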
class ArticleCrew:
    def __init__(self):
        # Initialize agents
        self.planner = Agent(
            role="Content Planner",
            goal="Plan engaging and factually accurate content on {topic}",
            backstory="You're working on planning a blog article about the topic: {topic}. "
                      "You collect information that helps the audience learn something "
                      "and make informed decisions.",
            allow_delegation=False,
            verbose=True
        )

        self.writer = Agent(
            role="Content Writer",
            goal="Write insightful and factually accurate opinion piece about the topic: {topic}",
            backstory="You're working on writing a new opinion piece about the topic: {topic}. "
                      "You base your writing on the work of the Content Planner.",
            allow_delegation=False,
            verbose=True
        )

        self.editor = Agent(
            role="Editor",
            goal="Edit a given blog post to align with the writing style",
            backstory="You are an editor who receives a blog post from the Content Writer.",
            allow_delegation=False,
            verbose=True
        )

        self.output_parser = OutputParser()
    def create_tasks(self, topic: str):
        plan_task = Task(
            description=(
                f"1. Prioritize the latest trends, key players, and noteworthy news on {topic}.\n"
                f"2. Identify the target audience, considering their interests and pain points.\n"
                f"3. Develop a detailed content outline including introduction, key points, and call to action.\n"
                f"4. Include SEO keywords and relevant data or sources."
            ),
            expected_output="A comprehensive content plan document with an outline, audience analysis, SEO keywords, and resources.",
            agent=self.planner
        )

        write_task = Task(
            description=(
                "1. Use the content plan to craft a compelling blog post.\n"
                "2. Incorporate SEO keywords naturally.\n"
                "3. Sections/Subtitles are properly named in an engaging manner.\n"
                "4. Ensure proper structure with introduction, body, and conclusion.\n"
                "5. Proofread for grammatical errors."
            ),
            expected_output="A well-written blog post in markdown format, ready for publication.",
            agent=self.writer
        )

        edit_task = Task(
            description="Proofread the given blog post for grammatical errors and alignment with the brand's voice.",
            expected_output="A well-written blog post in markdown format, ready for publication.",
            agent=self.editor
        )

        return [plan_task, write_task, edit_task]
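
    # process_article redirects sys.stdout into a StreamingCapture, kicks off
    # the crew on a background thread, and polls the captured buffer roughly
    # every 100 ms, yielding any ChatMessages the parser extracts so the
    # Gradio chatbot can update while the agents are still working.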
    async def process_article(self, topic: str) -> AsyncGenerator[List[ChatMessage], None]:
        crew = Crew(
            agents=[self.planner, self.writer, self.editor],
            tasks=self.create_tasks(topic),
            verbose=2
        )

        capture = StreamingCapture()
        original_stdout = sys.stdout
        sys.stdout = capture

        try:
            # Run the crew in a separate thread so it doesn't block streaming
            result_container = []

            def run_crew():
                try:
                    result = crew.kickoff(inputs={"topic": topic})
                    result_container.append(result)
                except Exception as e:
                    result_container.append(e)

            thread = threading.Thread(target=run_crew)
            thread.start()

            # Stream output while the crew is working
            last_processed = 0
            while thread.is_alive() or last_processed < len(capture.buffer):
                if len(capture.buffer) > last_processed:
                    new_content = capture.buffer[last_processed:]
                    messages = self.output_parser.parse_output(new_content)
                    if messages:
                        for msg in messages:
                            yield [msg]
                    last_processed = len(capture.buffer)
                await asyncio.sleep(0.1)

            # Check whether we got a result or an error
            if result_container and not isinstance(result_container[0], Exception):
                # Final messages were already sent by the parser
                pass
            else:
                yield [ChatMessage(
                    role="assistant",
                    content="An error occurred while generating the article.",
                    metadata={"title": "❌ Error"}
                )]
        finally:
            sys.stdout = original_stdout
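
# create_demo builds the Gradio Blocks UI: a messages-type Chatbot that shows
# the crew's intermediate steps, a Textbox for the article topic, and a button
# that streams the crew's progress into the chat history.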
def create_demo():
    article_crew = ArticleCrew()

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 📝 AI Article Writing Crew")
        gr.Markdown("Watch as our AI crew collaborates to create your article!")

        chatbot = gr.Chatbot(
            label="Writing Process",
            avatar_images=(None, "🤖"),
            height=700,
            type="messages",
            show_label=True
        )

        topic = gr.Textbox(
            label="Article Topic",
            placeholder="Enter the topic you want an article about...",
            lines=2
        )

        async def process_input(topic, history):
            history.append(ChatMessage(role="user", content=f"Write an article about: {topic}"))
            yield history

            async for messages in article_crew.process_article(topic):
                history.extend(messages)
                yield history

        btn = gr.Button("Write Article", variant="primary")
        btn.click(
            process_input,
            inputs=[topic, chatbot],
            outputs=[chatbot]
        )

    return demo

if __name__ == "__main__":
    demo = create_demo()
    demo.queue()
    demo.launch(debug=True, share=True)