|
|
|
import asyncio
import os
import re
import sys
import threading
from typing import AsyncGenerator, Generator, List

import gradio as gr
from crewai import Agent, Task, Crew
from dotenv import load_dotenv
from gradio import ChatMessage
|
|
|
|
|
class OutputParser:
    """Convert chunks of captured crew console output into chat messages.

    Each call to :meth:`parse_output` scans one chunk of text for three
    markers:

    * ``[DEBUG]: == Working Agent: <name>`` - an agent starts working,
    * ``[INFO]: == Starting Task: <text>`` - the task handed to that agent,
    * ``[DEBUG]: == [<name>] Task output: <text>`` - the agent's result.

    Messages are queued per agent and flushed in ``agent_sequence`` order.
    """

    # NOTE(review): the non-ASCII prefixes in the message titles below look
    # like emoji that were mis-decoded at some point. They are kept verbatim
    # here; confirm the intended glyphs before changing them.

    # ANSI colour / erase-line escape sequences emitted by the console logger.
    _ANSI_ESCAPE = re.compile(r'\x1B\[[0-9;]*[mK]')
    _AGENT_START = re.compile(r'\[DEBUG\]: == Working Agent: (.*?)(?=\n|$)')
    _TASK_START = re.compile(
        r'\[INFO\]: == Starting Task: (.*?)(?=\n\n|\n> Entering|$)',
        re.DOTALL,
    )
    _TASK_OUTPUT = re.compile(
        r'\[DEBUG\]: == \[(.*?)\] Task output: (.*?)(?=\[DEBUG\]|$)',
        re.DOTALL,
    )

    def __init__(self):
        # Not read or written by any method of this class.
        self.buffer = ""
        # Name of the agent most recently announced by a "Working Agent" line.
        self.current_agent = None
        # Set once the Editor's output has been queued, so it is sent once.
        self.final_article_sent = False
        # Pending messages per agent, drained by parse_output().
        self.message_queue = {
            "Content Planner": [],
            "Content Writer": [],
            "Editor": []
        }
        # Order in which the per-agent queues are flushed.
        self.agent_sequence = ["Content Planner", "Content Writer", "Editor"]

    def _enqueue(self, agent_name, message):
        """Queue *message* for *agent_name*, creating the queue if needed."""
        # setdefault keeps an unexpected agent name from raising KeyError.
        self.message_queue.setdefault(agent_name, []).append(message)

    def format_output(self, raw_content: str, agent_name: str) -> str:
        """Format the output content based on agent type.

        * ``"Content Planner"``: every line is stripped, a leading ``N.``
          list number and then leading ``#`` heading markers are removed,
          empty lines are dropped and the rest is joined by blank lines.
        * ``"Content Writer"``: heading markers are removed and runs of
          three or more newlines are collapsed to one blank line.
        * any other agent: the text is only stripped.
        """
        if agent_name == "Content Planner":
            formatted_lines = []
            for line in raw_content.split('\n'):
                # Drop list numbering first, then markdown heading markers.
                line = re.sub(r'^\d+\.\s*', '', line.strip())
                line = re.sub(r'^#+\s*', '', line)
                if line:
                    formatted_lines.append(line)
            return '\n\n'.join(formatted_lines)

        if agent_name == "Content Writer":
            content = re.sub(r'^#+\s*(.+)$', r'\1', raw_content, flags=re.MULTILINE)
            content = re.sub(r'\n{3,}', '\n\n', content)
            return content.strip()

        return raw_content.strip()

    def parse_output(self, text: str) -> "List[ChatMessage]":
        """Parse one chunk of console output and return the new messages.

        Only the first occurrence of each marker in *text* is considered.
        The returned list is empty when the chunk contains no marker.
        """
        messages = []
        cleaned_text = self._ANSI_ESCAPE.sub('', text)

        agent_match = self._AGENT_START.search(cleaned_text)
        if agent_match:
            # Strip so a trailing "\r" or space does not create a new agent.
            agent_label = agent_match.group(1).strip()
            if agent_label:
                self.current_agent = agent_label
                self._enqueue(agent_label, ChatMessage(
                    role="assistant",
                    content="Starting work...",
                    metadata={"title": f"π€ {self.current_agent}"}
                ))

        task_match = self._TASK_START.search(cleaned_text)
        if task_match and self.current_agent:
            task_content = task_match.group(1).strip()
            self._enqueue(self.current_agent, ChatMessage(
                role="assistant",
                content=task_content,
                metadata={"title": f"π Task for {self.current_agent}"}
            ))

        output_match = self._TASK_OUTPUT.search(cleaned_text)
        if output_match:
            agent_name = output_match.group(1).strip()
            output_content = output_match.group(2).strip()
            formatted_content = self.format_output(output_content, agent_name)

            if agent_name == "Editor" and not self.final_article_sent:
                # The Editor's result is announced, then sent without a title.
                self._enqueue(agent_name, ChatMessage(
                    role="assistant",
                    content="Final article is ready!",
                    metadata={"title": "π Final Article"}
                ))
                self._enqueue(agent_name, ChatMessage(
                    role="assistant",
                    content=formatted_content
                ))
                self.final_article_sent = True
            elif agent_name != "Editor":
                self._enqueue(agent_name, ChatMessage(
                    role="assistant",
                    content=formatted_content,
                    metadata={"title": f"π‘ Output from {agent_name}"}
                ))

        # Flush the known agents in their fixed order, then any agent that
        # was not anticipated, so its messages are not held back forever.
        flush_order = list(self.agent_sequence)
        flush_order.extend(
            name for name in self.message_queue if name not in self.agent_sequence
        )
        for agent in flush_order:
            pending = self.message_queue.get(agent)
            if pending:
                messages.extend(pending)
                self.message_queue[agent] = []

        return messages
|
|
|
class StreamingCapture:
    """In-memory text sink that can stand in for ``sys.stdout``.

    Everything written is appended to ``buffer``, which callers read
    directly as a plain string.
    """

    def __init__(self):
        # Accumulated text of every write() call, in order.
        self.buffer = ""

    def write(self, text):
        """Append *text* to the buffer and return the number of characters."""
        self.buffer += text
        return len(text)

    def flush(self):
        """Do nothing; writes are already stored in ``buffer``."""
        pass

    def isatty(self):
        """Report that this stream is not a terminal.

        Code that prints to ``sys.stdout`` may probe ``isatty()``; without
        this method such a probe would raise ``AttributeError``.
        """
        return False
|
|
|
class ArticleCrew:
    """Three-agent crew (planner, writer, editor) that writes an article.

    :meth:`process_article` runs the crew in a worker thread and streams
    chat messages parsed from the crew's console output.
    """

    def __init__(self):
        self.planner = Agent(
            role="Content Planner",
            goal="Plan engaging and factually accurate content on {topic}",
            backstory="You're working on planning a blog article about the topic: {topic}. "
                      "You collect information that helps the audience learn something "
                      "and make informed decisions.",
            allow_delegation=False,
            verbose=True
        )

        self.writer = Agent(
            role="Content Writer",
            goal="Write insightful and factually accurate opinion piece about the topic: {topic}",
            backstory="You're working on writing a new opinion piece about the topic: {topic}. "
                      "You base your writing on the work of the Content Planner.",
            allow_delegation=False,
            verbose=True
        )

        self.editor = Agent(
            role="Editor",
            goal="Edit a given blog post to align with the writing style",
            backstory="You are an editor who receives a blog post from the Content Writer.",
            allow_delegation=False,
            verbose=True
        )

        self.output_parser = OutputParser()

    def create_tasks(self, topic: str):
        """Return the plan, write and edit tasks, in execution order.

        Only the planning task mentions *topic* directly in its description.
        """
        plan_task = Task(
            description=(
                f"1. Prioritize the latest trends, key players, and noteworthy news on {topic}.\n"
                "2. Identify the target audience, considering their interests and pain points.\n"
                "3. Develop a detailed content outline including introduction, key points, and call to action.\n"
                "4. Include SEO keywords and relevant data or sources."
            ),
            expected_output="A comprehensive content plan document with an outline, audience analysis, SEO keywords, and resources.",
            agent=self.planner
        )

        write_task = Task(
            description=(
                "1. Use the content plan to craft a compelling blog post.\n"
                "2. Incorporate SEO keywords naturally.\n"
                "3. Sections/Subtitles are properly named in an engaging manner.\n"
                "4. Ensure proper structure with introduction, body, and conclusion.\n"
                "5. Proofread for grammatical errors."
            ),
            expected_output="A well-written blog post in markdown format, ready for publication.",
            agent=self.writer
        )

        edit_task = Task(
            description="Proofread the given blog post for grammatical errors and alignment with the brand's voice.",
            expected_output="A well-written blog post in markdown format, ready for publication.",
            agent=self.editor
        )

        return [plan_task, write_task, edit_task]

    async def process_article(self, topic: str) -> AsyncGenerator[List[ChatMessage], None]:
        """Run the crew for *topic* and yield chat messages as they appear.

        Each yielded value is a one-element list holding a ``ChatMessage``.
        If the crew raises, or produces no result, a final error message is
        yielded instead of propagating the exception.

        ``sys.stdout`` is replaced for the duration of the run and restored
        afterwards. The replacement is process-wide, so overlapping runs
        would see each other's output.
        """
        # Start from a clean parser: a reused one keeps final_article_sent
        # set from the previous run and would drop the next final article.
        parser = OutputParser()
        self.output_parser = parser

        crew = Crew(
            agents=[self.planner, self.writer, self.editor],
            tasks=self.create_tasks(topic),
            verbose=2
        )

        capture = StreamingCapture()
        original_stdout = sys.stdout
        sys.stdout = capture

        try:
            # Holds the crew result, or the exception raised by kickoff().
            result_container = []

            def run_crew():
                try:
                    result = crew.kickoff(inputs={"topic": topic})
                    result_container.append(result)
                except Exception as e:
                    result_container.append(e)

            thread = threading.Thread(target=run_crew)
            thread.start()

            last_processed = 0
            while thread.is_alive() or last_processed < len(capture.buffer):
                # Take one snapshot and derive both the new text and the new
                # offset from it. The worker keeps writing while this
                # generator is suspended in `yield`, so re-reading
                # capture.buffer afterwards would skip that output.
                snapshot = capture.buffer
                if len(snapshot) > last_processed:
                    new_content = snapshot[last_processed:]
                    last_processed = len(snapshot)
                    for msg in parser.parse_output(new_content):
                        yield [msg]
                await asyncio.sleep(0.1)

            outcome = result_container[0] if result_container else None
            if not result_container or isinstance(outcome, Exception):
                if isinstance(outcome, Exception):
                    # stdout is still redirected here; report the cause on
                    # stderr so it is not lost.
                    print(f"Article generation failed: {outcome!r}", file=sys.stderr)
                yield [ChatMessage(
                    role="assistant",
                    content="An error occurred while generating the article.",
                    metadata={"title": "β Error"}
                )]

        finally:
            sys.stdout = original_stdout
|
|
|
def create_demo():
    """Build and return the Gradio Blocks app for the article crew.

    The UI has a chatbot showing the writing process, a topic textbox and
    a button that starts a run. One ``ArticleCrew`` instance is shared by
    all runs started from this demo.
    """
    article_crew = ArticleCrew()

    with gr.Blocks(theme=gr.themes.Soft()) as demo:
        gr.Markdown("# π AI Article Writing Crew")
        gr.Markdown("Watch as our AI crew collaborates to create your article!")

        chatbot = gr.Chatbot(
            label="Writing Process",
            avatar_images=(None, "https://avatars.githubusercontent.com/u/170677839?v=4"),
            height=700,
            type="messages",
            show_label=True
        )

        with gr.Row(equal_height=True):
            topic = gr.Textbox(
                label="Article Topic",
                placeholder="Enter the topic you want an article about...",
                scale=4
            )

        async def process_input(topic_text, history):
            """Stream the chat history while the crew works on the topic."""
            history = list(history or [])
            topic_text = (topic_text or "").strip()

            # Do not start a crew run for an empty topic.
            if not topic_text:
                history.append(ChatMessage(
                    role="assistant",
                    content="Please enter a topic before starting."
                ))
                yield history
                return

            history.append(ChatMessage(role="user", content=f"Write an article about: {topic_text}"))
            yield history

            async for messages in article_crew.process_article(topic_text):
                history.extend(messages)
                yield history

        btn = gr.Button("Write Article", variant="primary", scale=1)

        btn.click(
            process_input,
            inputs=[topic, chatbot],
            outputs=[chatbot]
        )

    return demo
|
|
|
if __name__ == "__main__":
    # load_dotenv is imported by this file but was never called, so a local
    # .env file was never read. Load it before the agents are constructed.
    # NOTE(review): presumably the .env holds the LLM credentials - confirm.
    load_dotenv()

    demo = create_demo()
    # Enable the request queue before launching.
    demo.queue()
    demo.launch(debug=True, share=True)
|
|