|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import datetime |
|
import json |
|
import re |
|
import base64 |
|
import tempfile |
|
|
|
|
|
from dotenv import load_dotenv |
|
from openai import OpenAI |
|
from google import genai as google_genai |
|
from google.genai import types as google_genai_types |
|
from langchain.agents import create_openai_tools_agent, AgentExecutor |
|
from langchain_openai import ChatOpenAI |
|
from langchain_core.tools import StructuredTool |
|
from langchain_core.messages import HumanMessage, AIMessage |
|
from langchain import hub |
|
from langchain_community.tools import DuckDuckGoSearchRun |
|
|
|
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
llm = ChatOpenAI(model="gpt-4o", temperature=0.0) |
|
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) |
|
|
|
|
|
|
|
genai_client = google_genai.Client(api_key=os.getenv("GOOGLE_API_KEY")) |
|
|
|
|
|
|
|
|
|
|
|
def generate_image(prompt: str) -> dict: |
|
""" |
|
Generates an image based on a text prompt, saves it to 'generated_phishing_image.png' |
|
in the current directory (overwriting previous images), and returns the absolute file path. |
|
""" |
|
|
|
output_filename = "generated_phishing_image.png" |
|
|
|
print(f"INFO: Generating image with prompt: '{prompt}'") |
|
try: |
|
output = genai_client.models.generate_images( |
|
prompt=prompt, |
|
model="imagen-4.0-generate-preview-06-06", |
|
config=google_genai_types.GenerateImagesConfig( |
|
number_of_images=1, |
|
aspect_ratio="16:9", |
|
), |
|
) |
|
generated_img = output.generated_images[0].image |
|
|
|
|
|
generated_img.save(output_filename) |
|
|
|
|
|
absolute_image_path = os.path.abspath(output_filename) |
|
|
|
print(f"INFO: Image saved to: {absolute_image_path}") |
|
return {"status": "success", "image_path": absolute_image_path} |
|
except Exception as e: |
|
print(f"ERROR: Image generation failed: {e}") |
|
return {"status": "error", "message": f"Image generation failed: {e}"} |
|
|
|
|
|
def get_company_info() -> dict: |
|
""" |
|
Retrieves company information (name, logoUrl, departments, etc.) from company_info.json. |
|
""" |
|
print("INFO: Reading company_info.json") |
|
try: |
|
with open('company_info.json', 'r') as f: |
|
data = json.load(f) |
|
return {"status": "success", "data": data} |
|
except FileNotFoundError: |
|
return {"status": "error", "message": "company_info.json not found."} |
|
except json.JSONDecodeError: |
|
return {"status": "error", "message": "Error decoding company_info.json."} |
|
|
|
|
|
def get_user_info() -> dict: |
|
""" |
|
Retrieves the current user's information (name, role, email) from user_info.json. |
|
""" |
|
print("INFO: Reading user_info.json") |
|
try: |
|
with open('user_info.json', 'r') as f: |
|
data = json.load(f) |
|
return {"status": "success", "data": data} |
|
except FileNotFoundError: |
|
return {"status": "error", "message": "user_info.json not found."} |
|
except json.JSONDecodeError: |
|
return {"status": "error", "message": "Error decoding user_info.json."} |
|
|
|
|
|
def create_html_template(html_code: str) -> dict: |
|
""" |
|
Takes a complete HTML string, cleans it (removes newlines), and prepares it for preview. |
|
""" |
|
print("INFO: Formalizing agent-generated HTML template.") |
|
|
|
cleaned_html = html_code.replace("\n", "").replace("\r", "") |
|
return {"status": "success", "template": cleaned_html} |
|
|
|
|
|
def send_test_email(recipient: str, html_body: str) -> dict: |
|
"""Simulates sending a test phishing email to a specified recipient.""" |
|
print(f"INFO: Test email sent to {recipient}") |
|
return {"status": "success", "data": {"recipient": recipient}, "message": f"Test email sent to {recipient}."} |
|
|
|
|
|
def get_or_create_employee_list(action: str, employee_data: list = None) -> dict: |
|
"""Simulates managing employee lists (create, add, use_existing).""" |
|
message = f"Action '{action}' on employee list was successful." |
|
return {"status": "success", "data": {"action": action}, "message": message} |
|
|
|
|
|
def select_target_group(group_type: str, values: list = None) -> dict: |
|
""" |
|
Selects the target group (all, department, individual). Includes error checking |
|
to ensure 'values' are provided when necessary. |
|
""" |
|
if group_type == "all": |
|
message = "The campaign will target all employees." |
|
elif group_type == "department" and values: |
|
message = f"Targeting departments: {', '.join(values)}." |
|
elif group_type == "individual" and values: |
|
message = f"Targeting individuals: {', '.join(values)}." |
|
else: |
|
|
|
message = f"Error: Invalid selection for group type '{group_type}' or missing values." |
|
return {"status": "success", "data": {"group_type": group_type, "targets": values}, "message": message} |
|
|
|
|
|
def schedule_attack(date_time: str) -> dict: |
|
"""Simulates scheduling the phishing campaign.""" |
|
return {"status": "success", "data": {"scheduled_for": date_time}, |
|
"message": f"Campaign scheduled for {date_time}."} |
|
|
|
|
|
|
|
|
|
|
|
tools = [ |
|
StructuredTool.from_function(func=generate_image, name="GenerateImage", |
|
description="Generates an image from a prompt and returns its local file path."), |
|
StructuredTool.from_function(func=get_company_info, name="GetCompanyInfo", |
|
description="Retrieves company information (including logoUrl and departments)."), |
|
StructuredTool.from_function(func=get_user_info, name="GetUserInfo", |
|
description="Retrieves the current user's information (including email)."), |
|
StructuredTool.from_function(func=create_html_template, name="CreateHtmlTemplate", |
|
description="Finalizes the phishing email's HTML code."), |
|
StructuredTool.from_function(func=send_test_email, name="SendTestEmail", |
|
description="Sends a test phishing email for review."), |
|
StructuredTool.from_function(func=get_or_create_employee_list, name="ManageEmployeeList", |
|
description="Manages the employee list for the campaign."), |
|
StructuredTool.from_function(func=select_target_group, name="SelectTargetGroup", |
|
description="Selects the target group for the campaign."), |
|
StructuredTool.from_function(func=schedule_attack, name="ScheduleAttack", |
|
description="Schedules the phishing campaign.") |
|
] |
|
|
|
|
|
prompt = hub.pull("hwchase17/openai-tools-agent") |
|
|
|
|
|
SYSTEM_PROMPT = """ |
|
You are an AI assistant named Cbulwork, designed to set up phishing simulation campaigns. Your goal is to guide the user step-by-step with precision and clarity. The user has already been greeted, so you should start directly with the process. |
|
|
|
**PROCESS:** |
|
|
|
**Step 1: Gather Context & Suggest Scenario** |
|
- Call `GetUserInfo` and `GetCompanyInfo`. |
|
- Greet the user by name. |
|
- If the user has NOT provided a topic, suggest 5 relevant scenarios based on company info. |
|
- Await the user's confirmation of the scenario. |
|
|
|
**Step 2: Choose Template Type** |
|
- Ask the user to choose a template type: Text Only, Text + Photo, or Photo Only. |
|
- Wait for their selection. |
|
|
|
**Step 3: Template Design** |
|
- Write a **highly detailed and convincing**, valid HTML code for the email based on the user's choice. |
|
- **IMAGE & LOGO RULES (CRITICAL):** |
|
- If 'Text + Photo' or 'Photo Only' was chosen: |
|
1. Call `GenerateImage`. The prompt MUST be for a **flyer-style image with simple, bold text** related to the scenario (e.g., "A modern corporate flyer with the text 'Urgent Action Required: Update Your Password'"). |
|
2. Use the exact `image_path` returned by the tool in the `src` attribute of an `<img>` tag. **You MUST prefix the local path with `file:///` for the preview to work.** |
|
- If "Text + Photo" was chosen, also include the `logoUrl` from `GetCompanyInfo` in a separate `<img>` tag. |
|
- **CONTENT RULES:** |
|
- The email body must have at least two convincing paragraphs. |
|
- Generate a professional footer with fake details (address, contact info) for realism. |
|
- Generate a compelling subject, personalized greeting ("{{recipient.name}}"), detailed body, footer, and a clear call-to-action. |
|
- Do NOT include copyright lines. |
|
- After writing the code, you MUST call `CreateHtmlTemplate` with the HTML as a single string. |
|
|
|
**Step 4: Send Test Email** |
|
- After approval, ask to send a test email. If yes, use `SendTestEmail` with the user's email. |
|
|
|
**Step 5: Employee List** |
|
- Ask for the list provision method (upload/manual). If manual, provide an example format (`Name,Email`). Call `ManageEmployeeList`. |
|
|
|
**Step 6: Target Group Selection** |
|
- Ask to target 'all', 'department', or 'individual'. |
|
- If not 'all', ask for the specific names/departments (list available departments from `GetCompanyInfo`). |
|
- Call `SelectTargetGroup` with the correct `group_type` and `values`. |
|
|
|
**Step 7: Schedule Campaign** |
|
- Ask for a future launch date/time (`dd/mm/yyyy` format). Call `ScheduleAttack`. |
|
|
|
**Step 8: Final Summary & Confirmation** |
|
- Provide a complete summary. Ask for final confirmation. After confirmation, ask if there is anything else. |
|
""" |
|
|
|
|
|
prompt.messages[0].prompt.template = SYSTEM_PROMPT |
|
|
|
|
|
agent = create_openai_tools_agent(llm, tools, prompt) |
|
|
|
|
|
agent_executor = AgentExecutor( |
|
agent=agent, |
|
tools=tools, |
|
verbose=True, |
|
handle_parsing_errors=True, |
|
max_iterations=15, |
|
return_intermediate_steps=True |
|
) |
|
|
|
|
|
|
|
|
|
def run_agent_turn(user_input: str, chat_history: list) -> dict: |
|
""" |
|
Processes one turn of the conversation: sends input to the agent, executes tools, |
|
and collects the results (response, HTML, image path, and tool calls). |
|
""" |
|
|
|
langchain_messages = [ |
|
HumanMessage(content=msg["content"]) if msg["role"] == "user" else AIMessage(content=msg["content"]) |
|
for msg in chat_history |
|
] |
|
|
|
|
|
response = agent_executor.invoke({ |
|
"input": user_input, |
|
"chat_history": langchain_messages |
|
}) |
|
|
|
agent_output = response.get("output", "Sorry, an error occurred.") |
|
|
|
|
|
html_to_preview = "" |
|
generated_image_path = None |
|
function_calls = [] |
|
intermediate_steps = response.get("intermediate_steps", []) |
|
|
|
|
|
for action, tool_output in intermediate_steps: |
|
|
|
function_calls.append({ |
|
"tool_name": action.tool, |
|
"tool_args": action.tool_input, |
|
"tool_output": tool_output, |
|
}) |
|
|
|
if action.tool == "CreateHtmlTemplate" and isinstance(tool_output, dict): |
|
html_to_preview = tool_output.get("template", "") |
|
|
|
if action.tool == "GenerateImage" and tool_output.get("status") == "success": |
|
generated_image_path = tool_output.get("image_path") |
|
|
|
|
|
updated_chat_history = chat_history + [ |
|
{"role": "user", "content": user_input}, |
|
{"role": "assistant", "content": agent_output} |
|
] |
|
|
|
|
|
return { |
|
"agent_response": agent_output, |
|
"html_preview": html_to_preview, |
|
"function_calls": function_calls, |
|
"updated_chat_history": updated_chat_history, |
|
"generated_image_preview": generated_image_path |
|
} |
|
|
|
|
|
def process_input_for_gradio(user_input: str, chat_history: list) -> tuple: |
|
""" |
|
Event handler for the Gradio UI. Calls the core agent logic and returns |
|
the outputs in the order expected by the Gradio outputs list. |
|
""" |
|
if not user_input.strip(): |
|
|
|
return chat_history, "", None, None |
|
|
|
|
|
json_output = run_agent_turn(user_input, chat_history) |
|
|
|
|
|
print(f"--- Backend JSON Output ---\n{json.dumps(json_output, indent=2)}\n--------------------------") |
|
|
|
|
|
return ( |
|
json_output["updated_chat_history"], |
|
json_output["html_preview"], |
|
json_output["function_calls"], |
|
json_output["generated_image_preview"] |
|
) |
|
|
|
|
|
|
|
|
|
|
|
with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="sky")) as demo: |
|
gr.Markdown("## Phishing Campaign Setup Assistant") |
|
gr.Markdown("I will guide you step-by-step to create and schedule a new phishing campaign.") |
|
|
|
with gr.Row(): |
|
|
|
with gr.Column(scale=1): |
|
welcome_message = "Hello, I'm your AI phishing assistant. Send a message to get started." |
|
chatbot = gr.Chatbot( |
|
value=[{"role": "assistant", "content": welcome_message}], |
|
label="Conversation", |
|
height=600, |
|
type="messages" |
|
) |
|
user_input = gr.Textbox( |
|
placeholder="Send a message to continue...", |
|
label="Your Message", |
|
scale=12 |
|
) |
|
|
|
with gr.Column(scale=1): |
|
gr.Markdown("### Email Template Preview") |
|
html_block = gr.HTML(label="HTML Preview") |
|
|
|
gr.Markdown("### Generated Image Preview") |
|
|
|
image_preview_box = gr.Image(label="Image Preview", interactive=False) |
|
|
|
gr.Markdown("### Function Call Output (Debugging)") |
|
json_requests_box = gr.JSON(label="Function 'Requests' Output") |
|
|
|
|
|
user_input.submit( |
|
fn=process_input_for_gradio, |
|
inputs=[user_input, chatbot], |
|
|
|
outputs=[chatbot, html_block, json_requests_box, image_preview_box] |
|
) |
|
|
|
user_input.submit(lambda: "", None, user_input) |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
print("Launching Phishing Campaign Setup Assistant UI...") |
|
demo.launch(debug=False) |