|
import logging
import os
import re
from typing import List, Literal, Optional, Union

from dotenv import load_dotenv
from huggingface_hub import InferenceClient
from openai import OpenAI
from pydantic import BaseModel, ValidationError
from rich.console import Console
from rich.logging import RichHandler
|
|
|
|
|
# Load variables from a local .env file into the process environment at import
# time; example() later reads OPENAI_API_KEY through os.getenv.
load_dotenv()
|
|
|
def initialize_client(api_key=None):
    """Build an OpenAI client for ``api_key``.

    Returns:
        An ``OpenAI`` client when ``api_key`` is truthy, otherwise ``None``.
    """
    # Guard clause: without a key there is nothing to construct.
    if not api_key:
        return None
    return OpenAI(api_key=api_key)
|
|
|
|
|
# Shared Rich console used for the module's user-facing output.
console = Console()

# Send standard-library logging records through a Rich handler bound to the
# same console so log lines and console.print output share one stream.
logging.basicConfig(
    handlers=[RichHandler(console=console)],
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module logger; the name "email_agent" appears in emitted records.
logger = logging.getLogger("email_agent")
|
|
|
# Template for the email-writing request. It is filled with str.format and
# expects three placeholders: {persona}, {campaign} and {sender}. The model is
# asked to answer with a <thoughts> section followed by an <email> section;
# loop_email_workflow later pulls the <email> section out with extract_xml.
EMAIL_GENERATOR_PROMPT = """
Your goal is to write a personalized email for the user based on the provided persona, campaign, and sender details.
If there are feedback points from previous generations, you should reflect on them to improve your solution.

Persona:
{persona}

Campaign Details:
{campaign}

Sender Details:
{sender}

**Output Format Requirement**: The response must strictly adhere to the following format. Ensure that:
1. All opening tags have corresponding closing tags.
2. The content inside each tag is complete and relevant to the provided details.
3. The email is in a format suitable for sending and does not contain any placeholders

```
<thoughts>
[Include your understanding of the persona, campaign, sender details.]
</thoughts>

<email>
[Your email content here,without any placeholders or incomplete references.]
</email>
```
Important: The tags <thoughts> and <email> must always be properly closed.
"""
|
|
|
# Alternative evaluator template (example() uses EMAIL_EVALUATOR_PROMPT, not
# this variant). Filled with str.format; expects {persona}, {campaign},
# {sender} and {generated_content}. Doubled braces are literal braces after
# formatting. In this variant "feedback" is a free-text string.
EMAIL_EVALUATOR_PROMPT1 = """
Evaluate the provided email content using the following criteria:
1. **Personalization Accuracy**: Does the email reflect the persona details and campaign details?
2. **Tone and Style**: Is the tone engaging and appropriate for the persona? Does it align with the persona's characteristics?
3. **Clarity and Readability**: Is the email easy to read, with clear and concise sentences? Does it avoid ambiguity and jargon?

**Instructions:**
- Always output a JSON response in the specified format below.
- Only output "PASS" if all criteria are met with no room for improvement.
- If the email does not meet the criteria, output "NEEDS_IMPROVEMENT" or "FAIL", followed by specific feedback.

**Output Format:**
{{"evaluation": "<PASS | NEEDS_IMPROVEMENT | FAIL>", "feedback": "<Provide specific feedback explaining what needs to be improved and why.>"}}

Persona:
{persona}

Campaign Details:
{campaign}

Sender Details:
{sender}

Email Content:
{generated_content}
"""
|
|
|
# Evaluator template used by example(). Filled with str.format; expects
# {persona}, {campaign}, {sender} and {generated_content}. Doubled braces are
# literal braces after formatting. Here "feedback" is a JSON object carrying
# three 0-10 scores and a list of improvement suggestions.
EMAIL_EVALUATOR_PROMPT = """
Evaluate email against these criteria:
1. Personalization: Match with persona & campaign
2. Tone: Appropriate for persona
3. Clarity: Readable, concise language
4. The email is in a format suitable for sending and does not contain any placeholders

Scoring:
- Personalization (0-10)
- Tone Alignment (0-10)
- Readability (0-10)

**Instructions:**
- Always output a JSON response in the specified format below, without any backticks or additional formatting.
- Only output "PASS" if all criteria are met with no room for improvement.

Output Format:
{{"evaluation": "<PASS | NEEDS_IMPROVEMENT | FAIL>","feedback": {{"personalization_score": 0,"tone_alignment_score": 0,"readability_score": 0,"improvements": ["Suggestion 1", "Suggestion 2"]}}}}

Persona: {persona}
Campaign: {campaign}
Sender: {sender}
Email: {generated_content}
"""
|
|
|
def _strip_code_fences(text: str) -> str:
    """Return ``text`` without a surrounding Markdown code fence, if it has one."""
    stripped = text.strip()
    fenced = re.match(r"^```[a-zA-Z0-9_-]*\s*\n?(.*?)\n?\s*```$", stripped, re.DOTALL)
    return fenced.group(1).strip() if fenced else stripped


def JSON_llm(prompt: str, openai_api_key: str = None, use_huggingface: bool = False, schema: type[BaseModel] = None) -> dict:
    """
    Call the LLM and validate its JSON reply against a pydantic schema.

    Args:
        prompt (str): The input prompt for the LLM.
        openai_api_key (str): Forwarded to ``llm_call`` as ``api_key``.
        use_huggingface (bool): Forwarded to ``llm_call``.
        schema (type[BaseModel]): The pydantic model class used to validate
            the reply. Required despite the ``None`` default.

    Returns:
        dict: The validated reply converted to a plain dictionary.

    Raises:
        ValueError: If ``schema`` is missing, or if the reply does not
            validate against ``schema`` (chained from the pydantic
            ``ValidationError``).
    """
    # Fail before spending an LLM call that could never be validated.
    if schema is None:
        raise ValueError("A pydantic schema is required to validate the LLM response.")

    raw_response = llm_call(prompt, model="gpt-3.5-turbo", api_key=openai_api_key, use_huggingface=use_huggingface)
    console.print("Raw response:", raw_response)

    # Models sometimes wrap JSON in a ``` fence even when told not to.
    payload = _strip_code_fences(raw_response or "")

    # pydantic v2 renamed parse_raw -> model_validate_json and dict -> model_dump;
    # prefer the new names when they exist and fall back to the v1 names.
    parse = getattr(schema, "model_validate_json", None) or schema.parse_raw
    try:
        parsed_response = parse(payload)
    except ValidationError as e:
        logger.error(f"Validation failed: {e}")
        logger.error(f"Raw response: {raw_response}")
        raise ValueError(f"Invalid response format: {raw_response}") from e

    console.print("Parsed response:", parsed_response)
    dump = getattr(parsed_response, "model_dump", None) or parsed_response.dict
    return dump()
|
|
|
def extract_response_content(generated_text: str) -> str:
    """Return everything after the first ``Response:`` marker, stripped.

    Returns an empty string when the marker does not occur in
    ``generated_text``.
    """
    found = re.search(r"Response:\s*(.*)", generated_text, re.DOTALL)
    if found is None:
        return ""
    # DOTALL lets the capture run across newlines to the end of the text.
    return found.group(1).strip()
|
|
|
|
|
|
|
def llm_call(prompt: str, model: str = "gpt-3.5-turbo", api_key: str = None, use_huggingface: bool = False) -> str:
    """
    Send ``prompt`` as a single user message to an LLM and return the reply text.

    Backend selection:
      * ``api_key`` set and ``use_huggingface`` false: OpenAI chat completion
        using ``model``.
      * ``use_huggingface`` true: streamed Hugging Face chat completion with a
        fixed model (``model`` is not used on this path).
      * otherwise: a canned JSON string, so callers can run without credentials.

    Returns:
        str: The reply text; an empty string if the backend returned no content.
    """
    messages = [{"role": "user", "content": prompt}]

    if api_key and not use_huggingface:
        console.print("Using OpenAI model.")
        client = initialize_client(api_key)
        response = client.chat.completions.create(
            model=model,
            messages=messages,
        )
        # content can be None; keep the declared str return type.
        return response.choices[0].message.content or ""

    if use_huggingface:
        console.print("Using Hugging Face model.")
        # Kept in its own name so the caller's `model` argument is not clobbered.
        hf_model = "Qwen/Qwen2.5-72B-Instruct"
        hf_client = InferenceClient(hf_model)
        chunks = []
        for message in hf_client.chat_completion(
            messages,
            max_tokens=900,
            stream=True,
            temperature=0.4,
            top_p=0.95,
        ):
            token = message.choices[0].delta.content
            # A streamed delta may carry no text; skipping it avoids str + None.
            if token:
                chunks.append(token)
        return "".join(chunks)

    console.print("Using default simulated response.")
    return '{"evaluation": "NEEDS_IMPROVEMENT", "feedback": "Simulated fallback response for testing purposes."}'
|
|
|
|
|
# NOTE: this module defines extract_xml a second time further down with the
# same body; that later definition is the one bound to the name after import.
def extract_xml(text: str, tag: str) -> str:
    """
    Return the text between the first ``<tag>`` and its nearest ``</tag>``.

    Args:
        text (str): The text to search.
        tag (str): The tag name, without angle brackets.

    Returns:
        str: The enclosed content exactly as found (not stripped), or an empty
        string when no such tag pair occurs.
    """
    pattern = f'<{tag}>(.*?)</{tag}>'
    found = re.search(pattern, text, re.DOTALL)
    if found is None:
        return ""
    return found.group(1)
|
|
|
def extract_xml(text: str, tag: str) -> str:
    """
    Extract the content of the first occurrence of an XML-style tag.

    Args:
        text (str): The text containing the tag.
        tag (str): The tag name to extract content from (no angle brackets).
            It is matched literally, so regex metacharacters in the name have
            no special meaning.

    Returns:
        str: The content between ``<tag>`` and the nearest ``</tag>``, exactly
        as found (not stripped), or an empty string if the tag is not found.
    """
    # Escape the tag so a name such as "a.b" cannot match "<aXb>".
    name = re.escape(tag)
    match = re.search(f'<{name}>(.*?)</{name}>', text, re.DOTALL)
    return match.group(1) if match else ""
|
|
|
def _format_details(details: dict) -> str:
    """Render a dict as one "Key name: value" line per entry."""
    return "\n".join(
        f"{key.replace('_', ' ').capitalize()}: {value}" for key, value in details.items()
    )


def generate_email(persona: dict, campaign: dict, sender_data: dict, generator_prompt: str, context: str = "", openai_api_key: str = None, use_huggingface: bool = False) -> str:
    """
    Generate a personalized email from persona, campaign and sender details.

    Args:
        persona (dict): Recipient details; rendered into ``{persona}``.
        campaign (dict): Campaign details; rendered into ``{campaign}``.
        sender_data (dict): Sender details; rendered into ``{sender}``.
        generator_prompt (str): ``str.format`` template with the three
            placeholders above.
        context (str): Optional feedback from earlier attempts; appended to
            the prompt on a ``Feedback:`` line when non-empty.
        openai_api_key (str): Forwarded to ``llm_call`` as ``api_key``.
        use_huggingface (bool): Forwarded to ``llm_call``.

    Returns:
        str: The raw LLM reply (a single string, not a tuple).
    """
    persona_text, campaign_text, sender_text = (
        _format_details(details) for details in (persona, campaign, sender_data)
    )
    full_prompt = generator_prompt.format(persona=persona_text, campaign=campaign_text, sender=sender_text)
    if context:
        full_prompt += f"\nFeedback: {context}"

    console.print("Generating email using LLM...")
    console.print(f"Prompt: {full_prompt}")
    response = llm_call(full_prompt, model="gpt-3.5-turbo", api_key=openai_api_key, use_huggingface=use_huggingface)
    console.print("Generated email response.")
    console.print("[bold green]Generated Email Output:[/bold green]")
    console.print(response)
    return response
|
|
|
def evaluate_email(persona: dict, campaign: dict, sender_data: dict, evaluator_prompt: str, generated_content: str, openai_api_key: str = None, use_huggingface: bool = False) -> tuple[str, Union[dict, str, None]]:
    """
    Ask the LLM to judge a generated email.

    Args:
        persona (dict): Recipient details; must be non-empty.
        campaign (dict): Campaign details; must be non-empty.
        sender_data (dict): Sender details; must not be ``None``.
        evaluator_prompt (str): ``str.format`` template with ``{persona}``,
            ``{campaign}``, ``{sender}`` and ``{generated_content}``.
        generated_content (str): The email text to evaluate; must be non-empty.
        openai_api_key (str): Forwarded to ``JSON_llm``.
        use_huggingface (bool): Forwarded to ``JSON_llm``.

    Returns:
        tuple: ``(evaluation, feedback)`` where ``evaluation`` is one of
        ``"PASS"``, ``"NEEDS_IMPROVEMENT"`` or ``"FAIL"`` and ``feedback`` is
        whatever the evaluator returned under ``"feedback"``.

    Raises:
        ValueError: If a required input is missing, or (from ``JSON_llm``) if
            the LLM reply does not match the expected schema.
    """

    def _as_text(details: dict) -> str:
        # One "Key name: value" line per entry.
        return "\n".join(
            f"{key.replace('_', ' ').capitalize()}: {value}" for key, value in details.items()
        )

    try:
        if not persona:
            raise ValueError("Persona is required")
        if not campaign:
            raise ValueError("Campaign is required")
        if not generated_content:
            raise ValueError("Generated content is required")
        if sender_data is None:
            raise ValueError("Sender data is required")

        full_prompt = evaluator_prompt.format(
            persona=_as_text(persona),
            campaign=_as_text(campaign),
            sender=_as_text(sender_data),
            generated_content=generated_content,
        )
    except Exception as e:
        # Log once, with traceback, then let the caller decide what to do.
        logger.exception(f"Error in evaluate_email: {e}")
        raise

    class Evaluation(BaseModel):
        evaluation: Literal["PASS", "NEEDS_IMPROVEMENT", "FAIL"]
        # Accept both feedback shapes used in this module: the scored object
        # requested by EMAIL_EVALUATOR_PROMPT and the free-text string
        # requested by EMAIL_EVALUATOR_PROMPT1 / returned by the simulated
        # fallback in llm_call.
        feedback: Optional[Union[dict, str]] = {
            "personalization_score": 0,
            "tone_alignment_score": 0,
            "readability_score": 0,
            "improvements": [],
        }

    console.print("Evaluating generated email...")
    response = JSON_llm(full_prompt, openai_api_key, use_huggingface, Evaluation)
    evaluation = response["evaluation"]
    feedback = response["feedback"]

    console.print(f"Evaluation result: {evaluation}")
    if feedback:
        console.print("[bold yellow]Evaluation Feedback:[/bold yellow]")
        console.print(feedback)

    return evaluation, feedback
|
|
|
def loop_email_workflow(persona: dict, campaign: dict, sender_data: dict, evaluator_prompt: str, generator_prompt: str, max_tries: int = 5, openai_api_key: str = None, use_huggingface: bool = False) -> dict:
    """
    Generate an email, then evaluate and regenerate until it passes.

    An initial email is generated, then up to ``max_tries`` evaluations are
    run. After each non-passing evaluation a new email is generated with the
    previous attempts and the evaluator feedback appended to the prompt.

    Args:
        persona (dict): Recipient details; must be non-empty.
        campaign (dict): Campaign details; must be non-empty.
        sender_data (dict): Sender details; must be non-empty.
        evaluator_prompt (str): Template passed to ``evaluate_email``.
        generator_prompt (str): Template passed to ``generate_email``.
        max_tries (int): Maximum number of evaluations.
        openai_api_key (str): Forwarded to the generate/evaluate calls.
        use_huggingface (bool): Forwarded to the generate/evaluate calls.

    Returns:
        dict: ``final_email`` (the passing email text, or ``None``),
        ``llm_hits`` (number of generate + evaluate calls made),
        ``tokens_used`` (whitespace-separated word count of replies and
        feedback), ``cost`` (``tokens_used * 0.0001``) and, on failure only,
        ``message`` describing why no email passed.

    Raises:
        ValueError: If persona, campaign or sender data is missing/empty.
    """
    console.print("Starting email generation workflow...")
    if not persona or not campaign or not sender_data:
        raise ValueError("Persona, campaign, and sender data are required for email generation.")

    memory = []
    llm_hits = 0
    tokens_used = 0
    failure_message = "Max attempts reached without a PASS."

    response = generate_email(persona, campaign, sender_data, generator_prompt, openai_api_key=openai_api_key, use_huggingface=use_huggingface)
    llm_hits += 1
    tokens_used += len(response.split())
    memory.append(response)

    for attempt in range(max_tries):
        console.print(f"Attempt {attempt + 1} to generate a successful email.")
        try:
            email_content = extract_xml(response, "email")
            console.print(f"Email content: {email_content}")
            evaluation, feedback = evaluate_email(persona, campaign, sender_data, evaluator_prompt, email_content, openai_api_key=openai_api_key, use_huggingface=use_huggingface)
        except ValueError as e:
            # rich's Console has no .error(); report through the logger.
            logger.error(f"Evaluation failed: {e}")
            failure_message = f"Evaluation failed: {e}"
            break

        llm_hits += 1
        tokens_used += len(str(feedback).split())

        if evaluation == "PASS":
            console.print("Email generation completed successfully.")
            return {
                "final_email": email_content,
                "llm_hits": llm_hits,
                "tokens_used": tokens_used,
                "cost": tokens_used * 0.0001,
            }

        # Regenerate only if another evaluation will follow; an email produced
        # after the last attempt would never be evaluated or returned.
        if attempt + 1 < max_tries:
            context = "\n".join([
                "Previous attempts:",
                *[f"- {m}" for m in memory],
                f"Feedback: {feedback}",
            ])
            response = generate_email(persona, campaign, sender_data, generator_prompt, context, openai_api_key=openai_api_key, use_huggingface=use_huggingface)
            llm_hits += 1
            tokens_used += len(response.split())
            memory.append(response)
    else:
        # Reached only when the loop ran out of attempts (no break above).
        logger.warning("Max attempts reached without generating a successful email.")

    return {
        "final_email": None,
        "llm_hits": llm_hits,
        "tokens_used": tokens_used,
        "cost": tokens_used * 0.0001,
        "message": failure_message,
    }
|
|
|
|
|
|
|
def example():
    """Run the generate/evaluate workflow once on hard-coded sample data.

    Reads the OpenAI key from the ``OPENAI_API_KEY`` environment variable and
    prints either the passing email or the full workflow result.
    """
    persona_data = {
        "name": "Alice Smith",
        "city": "San Francisco",
        "hobbies": "Hiking, Cooking",
        "purchase_history": "Outdoor Gear",
    }

    campaign_data = {
        "subject_line": "Discover Your Next Outdoor Adventure",
        "product": "New Hiking Backpacks",
        "discount": "20% off",
        "validity": "Until January 31st, 2025",
    }

    sender_data = {
        "name": "John Doe",
        # Was the literal text "[email protected]", which is not an address
        # and would be fed into a prompt that forbids placeholders.
        "email": "john.doe@example.com",
    }

    workflow_result = loop_email_workflow(
        persona=persona_data,
        campaign=campaign_data,
        sender_data=sender_data,
        evaluator_prompt=EMAIL_EVALUATOR_PROMPT,
        generator_prompt=EMAIL_GENERATOR_PROMPT,
        max_tries=5,
        openai_api_key=os.getenv("OPENAI_API_KEY"),
        use_huggingface=False,
    )

    if workflow_result["final_email"]:
        console.print("Final Email Generated Successfully:")
        console.print("[bold green]Final Email Content:[/bold green]")
        console.print(workflow_result["final_email"])
    else:
        logger.error("Failed to generate a passing email after maximum attempts.")
        console.print("[bold red]Workflow Result:[/bold red]")
        console.print(workflow_result)
|
|
|
|
|
# Run the sample workflow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    example()
|
|