import os
import asyncio
import gradio as gr
import logging
from huggingface_hub import InferenceClient
import cohere
import google.generativeai as genai
from anthropic import Anthropic
import openai
from typing import List, Dict, Any, Optional
from dotenv import load_dotenv
# Load environment variables from a .env file if one exists
load_dotenv()
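# Provider keys read later in create_polythink_interface() (any subset may be
# set; each one enables the matching provider):
#   COHERE_API_KEY, HF_API_KEY, GEMINI_API_KEY, ANTHROPIC_API_KEY, OPENAI_API_KEY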
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Agent Class ---
class PolyThinkAgent:
    def __init__(self, model_name: str, model_path: str, role: str = "solver", api_provider: str = None):
        self.model_name = model_name
        self.model_path = model_path
        self.role = role
        self.api_provider = api_provider
        self.clients = {}
        self.hf_token = None
        self.inference = None

    def set_clients(self, clients: Dict[str, Any]):
        """Set the API clients for this agent."""
        self.clients = clients
        if "huggingface" in clients:
            self.hf_token = clients["huggingface"]
            if self.hf_token:
                self.inference = InferenceClient(token=self.hf_token)
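
    # Note: the Hugging Face entry in `clients` is a raw token rather than a
    # client object, so the InferenceClient is built here; every other provider
    # entry is already a ready-to-use client (see solve_problem() in the
    # Gradio section below).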

    async def solve_problem(self, problem: str) -> Dict[str, Any]:
        """Generate a solution to the given problem."""
        try:
            if self.api_provider == "cohere" and "cohere" in self.clients:
                response = self.clients["cohere"].chat(
                    model=self.model_path,
                    message=f"""
                    PROBLEM: {problem}
                    INSTRUCTIONS:
                    - Provide a clear, concise solution in one sentence.
                    - Include brief reasoning in one additional sentence.
                    - Do not repeat the solution or add extraneous text.
                    """
                )
                solution = response.text.strip()
                return {"solution": solution, "model_name": self.model_name}
elif self.api_provider == "anthropic" and "anthropic" in self.clients: | |
response = self.clients["anthropic"].messages.create( | |
model=self.model_path, | |
messages=[{ | |
"role": "user", | |
"content": f""" | |
PROBLEM: {problem} | |
INSTRUCTIONS: | |
- Provide a clear, concise solution in one sentence. | |
- Include brief reasoning in one additional sentence. | |
- Do not repeat the solution or add extraneous text. | |
""" | |
}] | |
) | |
solution = response.content[0].text.strip() | |
return {"solution": solution, "model_name": self.model_name} | |
elif self.api_provider == "openai" and "openai" in self.clients: | |
response = self.clients["openai"].chat.completions.create( | |
model=self.model_path, | |
messages=[{ | |
"role": "user", | |
"content": f""" | |
PROBLEM: {problem} | |
INSTRUCTIONS: | |
- Provide a clear, concise solution. | |
- Include detailed reasoning. | |
- Do not repeat the solution or add extraneous text. | |
""" | |
}] | |
) | |
solution = response.choices[0].message.content.strip() | |
return {"solution": solution, "model_name": self.model_name} | |
elif self.api_provider == "huggingface" and self.inference: | |
prompt = f""" | |
PROBLEM: {problem} | |
INSTRUCTIONS: | |
- Provide a clear, concise solution. | |
- Include detailed reasoning. | |
- Do not repeat the solution or add extraneous text. | |
SOLUTION AND REASONING: | |
""" | |
result = self.inference.text_generation( | |
prompt, model=self.model_path, max_new_tokens=5000, temperature=0.5 | |
) | |
solution = result if isinstance(result, str) else result.generated_text | |
return {"solution": solution.strip(), "model_name": self.model_name} | |
elif self.api_provider == "gemini" and "gemini" in self.clients: | |
model = self.clients["gemini"].GenerativeModel(self.model_path) | |
try: | |
response = model.generate_content( | |
f""" | |
PROBLEM: {problem} | |
INSTRUCTIONS: | |
- Provide a clear, concise solution. | |
- Include detailed reasoning. | |
- Do not repeat the solution or add extraneous text. | |
""", | |
generation_config=genai.types.GenerationConfig( | |
temperature=0.5, | |
) | |
) | |
# Check response validity and handle different response structures | |
try: | |
# First try to access text directly if available | |
if hasattr(response, 'text'): | |
solution = response.text.strip() | |
# Otherwise check for candidates | |
elif hasattr(response, 'candidates') and response.candidates: | |
# Make sure we have candidates and parts before accessing | |
if hasattr(response.candidates[0], 'content') and hasattr(response.candidates[0].content, 'parts'): | |
solution = response.candidates[0].content.parts[0].text.strip() | |
else: | |
logger.warning(f"Gemini response has candidates but missing content structure: {response}") | |
solution = "Error parsing API response; incomplete response structure." | |
else: | |
# Fallback for when candidates is empty | |
logger.warning(f"Gemini API returned no candidates: {response}") | |
solution = "No solution generated; API returned empty response." | |
except Exception as e: | |
logger.error(f"Error extracting text from Gemini response: {e}, response: {response}") | |
solution = "Error parsing API response." | |
except Exception as e: | |
logger.error(f"Gemini API call failed: {e}") | |
solution = f"API error: {str(e)}" | |
return {"solution": solution, "model_name": self.model_name} | |
else: | |
return {"solution": f"Error: Missing API configuration for {self.api_provider}", "model_name": self.model_name} | |
except Exception as e: | |
logger.error(f"Error in {self.model_name}: {str(e)}") | |
return {"solution": f"Error: {str(e)}", "model_name": self.model_name} | |

    async def evaluate_solutions(self, problem: str, solutions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Evaluate solutions from the solver agents."""
        try:
            prompt = f"""
            PROBLEM: {problem}
            SOLUTIONS:
            1. {solutions[0]['model_name']}: {solutions[0]['solution']}
            2. {solutions[1]['model_name']}: {solutions[1]['solution']}
            INSTRUCTIONS:
            - Extract the numerical final answer from each solution (e.g., 68 from '16 + 52 = 68').
            - Extract the key reasoning steps from each solution.
            - Apply strict evaluation criteria:
              * Numerical answers must match EXACTLY (including units and precision).
              * Key reasoning steps must align in approach and logic.
            - Output exactly: 'AGREEMENT: YES' if BOTH the numerical answers AND reasoning align perfectly.
            - Output 'AGREEMENT: NO' followed by a one-sentence explanation if either the answers or reasoning differ in ANY way.
            - Be conservative in declaring agreement - when in doubt, declare disagreement.
            - Do not add scoring, commentary, or extraneous text.
            EVALUATION:
            """
if self.api_provider == "gemini" and "gemini" in self.clients: | |
# Instantiate the model for consistency and clarity | |
model = self.clients["gemini"].GenerativeModel(self.model_path) | |
# Use generate_content on the model instance | |
response = model.generate_content( | |
prompt, | |
generation_config=genai.types.GenerationConfig( | |
temperature=0.5, | |
) | |
) | |
# Handle potential empty response or missing text attribute | |
try: | |
# First try to access text directly if available | |
if hasattr(response, 'text'): | |
judgment = response.text.strip() | |
# Otherwise check for candidates | |
elif hasattr(response, 'candidates') and response.candidates: | |
# Make sure we have candidates and parts before accessing | |
if hasattr(response.candidates[0], 'content') and hasattr(response.candidates[0].content, 'parts'): | |
judgment = response.candidates[0].content.parts[0].text.strip() | |
else: | |
logger.warning(f"Gemini response has candidates but missing content structure: {response}") | |
judgment = "AGREEMENT: NO - Unable to evaluate due to API response structure issue." | |
else: | |
# Fallback for when candidates is empty | |
logger.warning(f"Empty response from Gemini API: {response}") | |
judgment = "AGREEMENT: NO - Unable to evaluate due to API response issue." | |
except Exception as e: | |
logger.error(f"Error extracting text from Gemini response: {e}") | |
judgment = "AGREEMENT: NO - Unable to evaluate due to API response issue." | |
return {"judgment": judgment, "reprompt_needed": "AGREEMENT: NO" in judgment.upper()} | |
elif self.api_provider == "openai" and "openai" in self.clients: | |
response = self.clients["openai"].chat.completions.create( | |
model=self.model_path, | |
max_tokens=200, | |
messages=[{"role": "user", "content": prompt}] | |
) | |
judgment = response.choices[0].message.content.strip() | |
return {"judgment": judgment, "reprompt_needed": "AGREEMENT: NO" in judgment.upper()} | |
elif self.api_provider == "huggingface" and self.inference: | |
result = self.inference.text_generation( | |
prompt, model=self.model_path, max_new_tokens=200, temperature=0.5 | |
) | |
judgment = result if isinstance(result, str) else result.generated_text | |
return {"judgment": judgment.strip(), "reprompt_needed": "AGREEMENT: NO" in judgment.upper()} | |
else: | |
return {"judgment": f"Error: Missing API configuration for {self.api_provider}", "reprompt_needed": False} | |
except Exception as e: | |
logger.error(f"Error in judge: {str(e)}") | |
return {"judgment": f"Error: {str(e)}", "reprompt_needed": False} | |

    async def reprompt_with_context(self, problem: str, solutions: List[Dict[str, Any]], judgment: str) -> Dict[str, Any]:
        """Generate a revised solution based on the previous solutions and the judge's feedback."""
        try:
            prompt = f"""
            PROBLEM: {problem}
            PREVIOUS SOLUTIONS:
            1. {solutions[0]['model_name']}: {solutions[0]['solution']}
            2. {solutions[1]['model_name']}: {solutions[1]['solution']}
            JUDGE FEEDBACK: {judgment}
            INSTRUCTIONS:
            - Provide a revised, concise solution in one sentence.
            - Include brief reasoning in one additional sentence.
            - Address the judge's feedback.
            """
if self.api_provider == "cohere" and "cohere" in self.clients: | |
response = self.clients["cohere"].chat( | |
model=self.model_path, | |
message=prompt | |
) | |
solution = response.text.strip() | |
return {"solution": solution, "model_name": self.model_name} | |
elif self.api_provider == "anthropic" and "anthropic" in self.clients: | |
response = self.clients["anthropic"].messages.create( | |
model=self.model_path, | |
max_tokens=100, | |
messages=[{"role": "user", "content": prompt}] | |
) | |
solution = response.content[0].text.strip() | |
return {"solution": solution, "model_name": self.model_name} | |
elif self.api_provider == "openai" and "openai" in self.clients: | |
response = self.clients["openai"].chat.completions.create( | |
model=self.model_path, | |
max_tokens=100, | |
messages=[{"role": "user", "content": prompt}] | |
) | |
solution = response.choices[0].message.content.strip() | |
return {"solution": solution, "model_name": self.model_name} | |
elif self.api_provider == "huggingface" and self.inference: | |
prompt += "\nREVISED SOLUTION AND REASONING:" | |
result = self.inference.text_generation( | |
prompt, model=self.model_path, max_new_tokens=500, temperature=0.5 | |
) | |
solution = result if isinstance(result, str) else result.generated_text | |
return {"solution": solution.strip(), "model_name": self.model_name} | |
elif self.api_provider == "gemini" and "gemini" in self.clients: | |
# Instantiate the model for consistency and clarity | |
model = self.clients["gemini"].GenerativeModel(self.model_path) | |
# Use generate_content | |
response = model.generate_content( | |
f""" | |
PROBLEM: {problem} | |
PREVIOUS SOLUTIONS: | |
1. {solutions[0]['model_name']}: {solutions[0]['solution']} | |
2. {solutions[1]['model_name']}: {solutions[1]['solution']} | |
JUDGE FEEDBACK: {judgment} | |
INSTRUCTIONS: | |
- Provide a revised, concise solution in one sentence. | |
- Include brief reasoning in one additional sentence. | |
- Address the judge's feedback. | |
""", | |
generation_config=genai.types.GenerationConfig( | |
temperature=0.5, | |
max_output_tokens=100 | |
) | |
) | |
# Handle potential empty response or missing text attribute | |
try: | |
# First try to access text directly if available | |
if hasattr(response, 'text'): | |
solution = response.text.strip() | |
# Otherwise check for candidates | |
elif hasattr(response, 'candidates') and response.candidates: | |
# Make sure we have candidates and parts before accessing | |
if hasattr(response.candidates[0], 'content') and hasattr(response.candidates[0].content, 'parts'): | |
solution = response.candidates[0].content.parts[0].text.strip() | |
else: | |
logger.warning(f"Gemini response has candidates but missing content structure: {response}") | |
solution = "Unable to generate a solution due to API response structure issue." | |
else: | |
# Fallback for when candidates is empty | |
logger.warning(f"Empty response from Gemini API: {response}") | |
solution = "Unable to generate a solution due to API response issue." | |
except Exception as e: | |
logger.error(f"Error extracting text from Gemini response: {e}") | |
solution = "Unable to generate a solution due to API response issue." | |
return {"solution": solution, "model_name": self.model_name} | |
else: | |
return {"solution": f"Error: Missing API configuration for {self.api_provider}", "model_name": self.model_name} | |
except Exception as e: | |
logger.error(f"Error in {self.model_name}: {str(e)}") | |
return {"solution": f"Error: {str(e)}", "model_name": self.model_name} | |

# --- Model Registry ---
class ModelRegistry:
    @staticmethod
    def get_available_models():
        """Get the list of available models, grouped by provider."""
        return {
            "Anthropic": [
                {"name": "Claude 3.5 Sonnet", "id": "claude-3-5-sonnet-20240620", "provider": "anthropic", "type": ["solver"], "icon": "📚"},
                {"name": "Claude 3.7 Sonnet", "id": "claude-3-7-sonnet-20250219", "provider": "anthropic", "type": ["solver"], "icon": "📚"},
                {"name": "Claude 3 Opus", "id": "claude-3-opus-20240229", "provider": "anthropic", "type": ["solver"], "icon": "📚"},
                {"name": "Claude 3 Haiku", "id": "claude-3-haiku-20240307", "provider": "anthropic", "type": ["solver"], "icon": "📚"}
            ],
            "OpenAI": [
                {"name": "GPT-4o", "id": "gpt-4o", "provider": "openai", "type": ["solver"], "icon": "🤖"},
                {"name": "GPT-4 Turbo", "id": "gpt-4-turbo", "provider": "openai", "type": ["solver"], "icon": "🤖"},
                {"name": "GPT-4", "id": "gpt-4", "provider": "openai", "type": ["solver"], "icon": "🤖"},
                {"name": "GPT-3.5 Turbo", "id": "gpt-3.5-turbo", "provider": "openai", "type": ["solver"], "icon": "🤖"},
                {"name": "OpenAI o1", "id": "o1", "provider": "openai", "type": ["solver", "judge"], "icon": "🤖"},
                {"name": "OpenAI o3", "id": "o3", "provider": "openai", "type": ["solver", "judge"], "icon": "🤖"}
            ],
            "Cohere": [
                {"name": "Cohere Command R", "id": "command-r-08-2024", "provider": "cohere", "type": ["solver"], "icon": "💬"},
                {"name": "Cohere Command R+", "id": "command-r-plus-08-2024", "provider": "cohere", "type": ["solver"], "icon": "💬"}
            ],
            "Google": [
                {"name": "Gemini 1.5 Pro", "id": "gemini-1.5-pro", "provider": "gemini", "type": ["solver"], "icon": "🌐"},
                {"name": "Gemini 2.0 Flash Thinking Experimental 01-21", "id": "gemini-2.0-flash-thinking-exp-01-21", "provider": "gemini", "type": ["solver", "judge"], "icon": "🌐"},
                {"name": "Gemini 2.5 Pro Experimental 03-25", "id": "gemini-2.5-pro-exp-03-25", "provider": "gemini", "type": ["solver", "judge"], "icon": "🌐"}
            ],
            "HuggingFace": [
                {"name": "Llama 3.3 70B Instruct", "id": "meta-llama/Llama-3.3-70B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Llama 3.2 3B Instruct", "id": "meta-llama/Llama-3.2-3B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Llama 3.1 70B Instruct", "id": "meta-llama/Llama-3.1-70B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Mistral 7B Instruct v0.3", "id": "mistralai/Mistral-7B-Instruct-v0.3", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "DeepSeek R1 Distill Qwen 32B", "id": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", "provider": "huggingface", "type": ["solver", "judge"], "icon": "🔥"},
                {"name": "DeepSeek Coder V2 Instruct", "id": "deepseek-ai/DeepSeek-Coder-V2-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Qwen 2.5 72B Instruct", "id": "Qwen/Qwen2.5-72B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Qwen 2.5 Coder 32B Instruct", "id": "Qwen/Qwen2.5-Coder-32B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Qwen 2.5 Math 1.5B Instruct", "id": "Qwen/Qwen2.5-Math-1.5B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Gemma 3 27B Instruct", "id": "google/gemma-3-27b-it", "provider": "huggingface", "type": ["solver"], "icon": "🔥"},
                {"name": "Phi-3 Mini 4K Instruct", "id": "microsoft/Phi-3-mini-4k-instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}
            ]
        }

    @staticmethod
    def get_solver_models():
        """Get models suitable for the solver role, grouped by provider."""
        all_models = ModelRegistry.get_available_models()
        solver_models = {}
        for provider, models in all_models.items():
            provider_models = []
            for model in models:
                if "solver" in model["type"]:
                    provider_models.append({
                        "name": f"{model['icon']} {model['name']} ({provider})",
                        "id": model["id"],
                        "provider": model["provider"]
                    })
            if provider_models:
                solver_models[provider] = provider_models
        return solver_models

    @staticmethod
    def get_judge_models():
        """Get only the specific reasoning models suitable for the judge role, grouped by provider."""
        all_models = ModelRegistry.get_available_models()
        judge_models = {}
        # These entries must match the registry's "{name} ({provider})" form exactly
        allowed_judge_models = [
            "Gemini 2.0 Flash Thinking Experimental 01-21 (Google)",
            "DeepSeek R1 Distill Qwen 32B (HuggingFace)",
            "Gemini 2.5 Pro Experimental 03-25 (Google)",
            "OpenAI o1 (OpenAI)",
            "OpenAI o3 (OpenAI)"
        ]
        for provider, models in all_models.items():
            provider_models = []
            for model in models:
                full_name = f"{model['name']} ({provider})"
                if "judge" in model["type"] and full_name in allowed_judge_models:
                    provider_models.append({
                        "name": f"{model['icon']} {model['name']} ({provider})",
                        "id": model["id"],
                        "provider": model["provider"]
                    })
            if provider_models:
                judge_models[provider] = provider_models
        return judge_models
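
# Example (sketch): flattening the grouped registry into a single list of
# dropdown labels, e.g. for a gr.Dropdown of solvers.
#
#   solver_names = [
#       m["name"]
#       for models in ModelRegistry.get_solver_models().values()
#       for m in models
#   ]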

# --- Orchestrator Class ---
class PolyThinkOrchestrator:
    def __init__(self, solver1_config=None, solver2_config=None, judge_config=None, api_clients=None):
        self.solvers = []
        self.judge = None
        self.api_clients = api_clients or {}

        def _display_to_model_name(display_name: str) -> str:
            """Recover the plain model name from a display name such as
            '💬 Cohere Command R (Cohere)'; plain names pass through unchanged."""
            name = display_name.rsplit(" (", 1)[0] if display_name.endswith(")") else display_name
            first, _, rest = name.partition(" ")
            # Only drop the first token when it is an emoji icon, not a real word
            return rest if rest and not first.isascii() else name

        if solver1_config:
            solver1 = PolyThinkAgent(
                model_name=_display_to_model_name(solver1_config["name"]),
                model_path=solver1_config["id"],
                api_provider=solver1_config["provider"]
            )
            solver1.set_clients(self.api_clients)
            self.solvers.append(solver1)
        if solver2_config:
            solver2 = PolyThinkAgent(
                model_name=_display_to_model_name(solver2_config["name"]),
                model_path=solver2_config["id"],
                api_provider=solver2_config["provider"]
            )
            solver2.set_clients(self.api_clients)
            self.solvers.append(solver2)
        if judge_config:
            self.judge = PolyThinkAgent(
                model_name=_display_to_model_name(judge_config["name"]),
                model_path=judge_config["id"],
                role="judge",
                api_provider=judge_config["provider"]
            )
            self.judge.set_clients(self.api_clients)

    async def get_initial_solutions(self, problem: str) -> List[Dict[str, Any]]:
        tasks = [solver.solve_problem(problem) for solver in self.solvers]
        return await asyncio.gather(*tasks)

    async def get_judgment(self, problem: str, solutions: List[Dict[str, Any]]) -> Dict[str, Any]:
        if self.judge:
            return await self.judge.evaluate_solutions(problem, solutions)
        return {"judgment": "No judge configured", "reprompt_needed": False}

    async def get_revised_solutions(self, problem: str, solutions: List[Dict[str, Any]], judgment: str) -> List[Dict[str, Any]]:
        tasks = [solver.reprompt_with_context(problem, solutions, judgment) for solver in self.solvers]
        return await asyncio.gather(*tasks)
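
    # Example (sketch): one solve/judge round outside Gradio, assuming
    # `api_clients` is built the same way as in solve_problem() below and the
    # three config dicts are defined as in create_polythink_interface().
    #
    #   orch = PolyThinkOrchestrator(solver1_config, solver2_config, judge_config, api_clients)
    #   sols = asyncio.run(orch.get_initial_solutions("What is 16 + 52?"))
    #   verdict = asyncio.run(orch.get_judgment("What is 16 + 52?", sols))
    #   print(verdict["judgment"], verdict["reprompt_needed"])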

    def generate_final_report(self, problem: str, history: List[Dict[str, Any]]) -> str:
        report = f"""
        <div class="final-report-container">
            <h2 class="final-report-title">📊 Final Analysis Report</h2>
            <div class="problem-container">
                <h3 class="problem-title">Problem Statement</h3>
                <div class="problem-content">{problem}</div>
            </div>
        """
        # Add a "best answer" section if the models reached agreement
        last_judgment = next((step.get("judgment", "") for step in reversed(history) if "judgment" in step), "")
        if "AGREEMENT: YES" in last_judgment.upper():
            # Use the last solutions submitted before the agreement
            last_solutions = next((step["solutions"] for step in reversed(history) if "solutions" in step), None)
            if last_solutions:
                report += f"""
                <div class="best-answer-container agreement">
                    <h3>Best Answer</h3>
                    <div class="best-answer-content">
                        <div class="best-answer-icon">✨</div>
                        <div class="best-answer-text">
                            <p><strong>Agreed Solution:</strong> {last_solutions[0]['solution']}</p>
                            <p><strong>Models:</strong> {last_solutions[0]['model_name']} & {last_solutions[1]['model_name']}</p>
                        </div>
                    </div>
                </div>
                """
        report += """
        <div class="timeline-container">
        """
        for i, step in enumerate(history, 1):
            if "solutions" in step and i == 1:
                report += """
                <div class="timeline-item">
                    <div class="timeline-marker">1</div>
                    <div class="timeline-content">
                        <h4>Initial Solutions</h4>
                        <div class="solutions-container">
                """
                for sol in step["solutions"]:
                    report += f"""
                    <div class="solution-item">
                        <div class="solution-header">{sol['model_name']}</div>
                        <div class="solution-body">{sol['solution']}</div>
                    </div>
                    """
                report += """
                        </div>
                    </div>
                </div>
                """
            elif "judgment" in step:
                is_agreement = "AGREEMENT: YES" in step["judgment"].upper()
                judgment_class = "agreement" if is_agreement else "disagreement"
                judgment_icon = "✅" if is_agreement else "❌"
                report += f"""
                <div class="timeline-item">
                    <div class="timeline-marker">{i}</div>
                    <div class="timeline-content">
                        <h4>Evaluation {(i + 1) // 2}</h4>
                        <div class="judgment-container {judgment_class}">
                            <div class="judgment-icon">{judgment_icon}</div>
                            <div class="judgment-text">{step["judgment"]}</div>
                        </div>
                    </div>
                </div>
                """
            elif "solutions" in step and i > 1:
                round_num = (i + 1) // 2
                report += f"""
                <div class="timeline-item">
                    <div class="timeline-marker">{i}</div>
                    <div class="timeline-content">
                        <h4>Revised Solutions (Round {round_num})</h4>
                        <div class="solutions-container">
                """
                for sol in step["solutions"]:
                    report += f"""
                    <div class="solution-item">
                        <div class="solution-header">{sol['model_name']}</div>
                        <div class="solution-body">{sol['solution']}</div>
                    </div>
                    """
                report += """
                        </div>
                    </div>
                </div>
                """
        # `last_judgment` was computed above; reuse it for the conclusion block
        if "AGREEMENT: YES" in last_judgment.upper():
            # Heuristic: agreement on the very first judgment (history holds just
            # the initial solutions plus one judgment) is reported as 100%
            # confidence; agreement after revisions as 80%
            confidence = "100%" if len(history) == 2 else "80%"
            report += f"""
            <div class="conclusion-container agreement">
                <h3>Conclusion</h3>
                <div class="conclusion-content">
                    <div class="conclusion-icon">✅</div>
                    <div class="conclusion-text">
                        <p>Models reached <strong>AGREEMENT</strong></p>
                        <p>Confidence level: <strong>{confidence}</strong></p>
                    </div>
                </div>
            </div>
            """
        else:
            report += """
            <div class="conclusion-container disagreement">
                <h3>Conclusion</h3>
                <div class="conclusion-content">
                    <div class="conclusion-icon">❌</div>
                    <div class="conclusion-text">
                        <p>Models could not reach agreement</p>
                        <p>Review all solutions above for the best answer</p>
                    </div>
                </div>
            </div>
            """
        report += """
            </div>
        </div>
        """
        return report
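
    # The returned string is raw HTML; it is rendered by the gr.HTML component
    # (`final_report`) in the interface below and styled by the CSS rules in
    # create_polythink_interface().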

# --- Gradio Interface ---
def create_polythink_interface():
custom_css = """ | |
/* Solid black background */ | |
body { | |
background: #000000; | |
color: #ffffff; | |
font-family: 'Arial', sans-serif; | |
margin: 0; | |
padding: 0; | |
min-height: 100vh; | |
} | |
footer {visibility: hidden} | |
/* Enhanced heading without the green light */ | |
.polythink-header { | |
text-align: center; | |
margin-bottom: 30px; | |
padding: 20px; | |
} | |
.polythink-title { | |
font-size: 48px; | |
font-weight: 800; | |
margin: 0; | |
padding: 0; | |
background: linear-gradient(90deg, #ffffff, #aaaaaa, #ffffff); | |
-webkit-background-clip: text; | |
background-clip: text; | |
-webkit-text-fill-color: transparent; | |
animation: shine 3s linear infinite; | |
text-transform: uppercase; | |
letter-spacing: 3px; | |
text-shadow: 0 0 10px rgba(255, 255, 255, 0.3); | |
} | |
@keyframes shine { | |
0% {background-position: 0%;} | |
100% {background-position: 200%;} | |
} | |
.polythink-subtitle { | |
font-size: 18px; | |
color: #cccccc; | |
margin: 10px 0 0 0; | |
text-transform: uppercase; | |
letter-spacing: 2px; | |
font-weight: 300; | |
} | |
/* Original Final Report Styling */ | |
.final-report-container { | |
font-family: 'Arial', sans-serif; | |
} | |
.final-report-title { | |
background: linear-gradient(45deg, #333333, #444444); | |
color: #ffffff; | |
padding: 20px; | |
margin: 0; | |
border-bottom: 1px solid #555555; | |
font-size: 24px; | |
text-align: center; | |
} | |
.problem-container { | |
background: #222222; | |
padding: 15px 20px; | |
margin: 0; | |
border-bottom: 1px solid #333333; | |
} | |
.problem-title { | |
color: #bbbbbb; | |
margin: 0 0 10px 0; | |
font-size: 18px; | |
} | |
.problem-content { | |
background: #333333; | |
padding: 15px; | |
border-radius: 5px; | |
font-family: monospace; | |
font-size: 16px; | |
color: #ffffff; | |
} | |
.timeline-container { | |
padding: 20px; | |
} | |
.timeline-item { | |
display: flex; | |
margin-bottom: 25px; | |
position: relative; | |
} | |
.timeline-item:before { | |
content: ''; | |
position: absolute; | |
left: 15px; | |
top: 30px; | |
bottom: -25px; | |
width: 2px; | |
background: #444444; | |
z-index: 0; | |
} | |
.timeline-item:last-child:before { | |
display: none; | |
} | |
.timeline-marker { | |
width: 34px; | |
height: 34px; | |
border-radius: 50%; | |
background: #333333; | |
display: flex; | |
align-items: center; | |
justify-content: center; | |
font-weight: bold; | |
position: relative; | |
z-index: 1; | |
border: 2px solid #555555; | |
margin-right: 15px; | |
} | |
.timeline-content { | |
flex: 1; | |
background: #1d1d1d; | |
border-radius: 5px; | |
padding: 15px; | |
border: 1px solid #333333; | |
} | |
.timeline-content h4 { | |
margin-top: 0; | |
margin-bottom: 15px; | |
color: #cccccc; | |
border-bottom: 1px solid #333333; | |
padding-bottom: 8px; | |
} | |
.solutions-container { | |
display: flex; | |
flex-wrap: wrap; | |
gap: 10px; | |
} | |
.solution-item { | |
flex: 1; | |
min-width: 250px; | |
background: #252525; | |
border-radius: 5px; | |
overflow: hidden; | |
border: 1px solid #383838; | |
} | |
.solution-header { | |
background: #333333; | |
padding: 8px 12px; | |
font-weight: bold; | |
color: #dddddd; | |
border-bottom: 1px solid #444444; | |
} | |
.solution-body { | |
padding: 12px; | |
color: #bbbbbb; | |
} | |
/* Keep green highlights for agreement/disagreement */ | |
.agreement { | |
color: #4CAF50 !important; | |
border: 1px solid #4CAF50; | |
background-color: rgba(76, 175, 80, 0.1) !important; | |
padding: 10px; | |
border-radius: 5px; | |
} | |
.disagreement { | |
color: #F44336 !important; | |
border: 1px solid #F44336; | |
background-color: rgba(244, 67, 54, 0.1) !important; | |
padding: 10px; | |
border-radius: 5px; | |
} | |
.judgment-container { | |
display: flex; | |
align-items: center; | |
padding: 10px; | |
border-radius: 5px; | |
} | |
.judgment-icon { | |
font-size: 24px; | |
margin-right: 15px; | |
} | |
.conclusion-container { | |
margin-top: 30px; | |
border-radius: 5px; | |
padding: 5px 15px 15px; | |
} | |
.conclusion-content { | |
display: flex; | |
align-items: center; | |
} | |
.conclusion-icon { | |
font-size: 36px; | |
margin-right: 20px; | |
} | |
.conclusion-text { | |
flex: 1; | |
} | |
.conclusion-text p { | |
margin: 5px 0; | |
} | |
/* Best Answer styling */ | |
.best-answer-container { | |
background: #1a1a1a; | |
border-radius: 8px; | |
padding: 20px; | |
margin: 20px 0; | |
box-shadow: 0 4px 15px rgba(0, 0, 0, 0.5); | |
border: 1px solid #4CAF50; | |
} | |
.best-answer-container h3 { | |
color: #4CAF50; | |
margin-top: 0; | |
margin-bottom: 15px; | |
font-size: 1.5em; | |
} | |
.best-answer-content { | |
display: flex; | |
align-items: flex-start; | |
gap: 15px; | |
} | |
.best-answer-icon { | |
font-size: 24px; | |
color: #4CAF50; | |
} | |
.best-answer-text { | |
flex: 1; | |
} | |
.best-answer-text p { | |
margin: 5px 0; | |
color: #ffffff; | |
} | |
.best-answer-text strong { | |
color: #4CAF50; | |
} | |
/* Gradio container */ | |
.gradio-container { | |
background: #1a1a1a; | |
border-radius: 10px; | |
box-shadow: 0 4px 15px rgba(0, 0, 0, 0.5); | |
padding: 20px; | |
} | |
/* Status bar styling */ | |
.status-bar { | |
background: #1a1a1a; | |
padding: 10px; | |
border-radius: 5px; | |
font-size: 1.1em; | |
margin-bottom: 20px; | |
border-left: 4px solid #666666; | |
} | |
/* Step section styling */ | |
.step-section { | |
background: #1a1a1a; | |
border-radius: 8px; | |
padding: 15px; | |
margin-bottom: 20px; | |
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.3); | |
} | |
/* Primary button */ | |
.primary-button { | |
background: linear-gradient(45deg, #555555, #777777) !important; | |
border: none !important; | |
color: white !important; | |
padding: 12px 24px !important; | |
font-weight: bold !important; | |
transition: all 0.3s ease !important; | |
box-shadow: 0 4px 10px rgba(0, 0, 0, 0.3) !important; | |
} | |
.primary-button:hover { | |
transform: translateY(-2px) !important; | |
box-shadow: 0 6px 15px rgba(0, 0, 0, 0.4) !important; | |
background: linear-gradient(45deg, #666666, #888888) !important; | |
} | |
""" | |

    # Hardcoded model configurations (plain names here, no icon prefix)
    solver1_config = {
        "name": "Cohere Command R",
        "id": "command-r-08-2024",
        "provider": "cohere"
    }
    # Gemma 3 27B is called through the Gemini API here; note the bare
    # "gemma-3-27b-it" id, unlike the "google/gemma-3-27b-it" Hugging Face id
    # in the registry above
    solver2_config = {
        "name": "Gemma 3 27B",
        "id": "gemma-3-27b-it",
        "provider": "gemini"
    }
    judge_config = {
        "name": "Gemini 2.0 Flash Thinking Experimental 01-21",
        "id": "gemini-2.0-flash-thinking-exp-01-21",
        "provider": "gemini"
    }

    async def solve_problem(problem: str, max_rounds: int):
        # Build API clients from environment variables (or Hugging Face Space secrets)
        api_clients = {}
        # Cohere client
        cohere_key = os.getenv("COHERE_API_KEY")
        if cohere_key:
            api_clients["cohere"] = cohere.Client(cohere_key)
        # Hugging Face: store the raw token; agents build their own InferenceClient
        hf_key = os.getenv("HF_API_KEY")
        if hf_key:
            api_clients["huggingface"] = hf_key
        # Gemini client (the configured module object is shared)
        gemini_key = os.getenv("GEMINI_API_KEY")
        if gemini_key:
            genai.configure(api_key=gemini_key)
            api_clients["gemini"] = genai
        # Anthropic client
        anthropic_key = os.getenv("ANTHROPIC_API_KEY")
        if anthropic_key:
            api_clients["anthropic"] = Anthropic(api_key=anthropic_key)
        # OpenAI client
        openai_key = os.getenv("OPENAI_API_KEY")
        if openai_key:
            api_clients["openai"] = openai.OpenAI(api_key=openai_key)
        # Check that every provider the hardcoded configs rely on has a client
        required_providers = {solver1_config["provider"], solver2_config["provider"], judge_config["provider"]}
        missing_providers = [p for p in required_providers if p not in api_clients]
        if missing_providers:
            yield [
                gr.update(value=f"Error: Missing API keys for {', '.join(missing_providers)}", visible=True),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(visible=False),
                gr.update(value=f"### Status: ❌ Missing API keys for {', '.join(missing_providers)}", visible=True)
            ]
            return
        orchestrator = PolyThinkOrchestrator(solver1_config, solver2_config, judge_config, api_clients)
        initial_solutions = await orchestrator.get_initial_solutions(problem)
        initial_content = f"## Initial Solutions\n**Problem:** `{problem}`\n\n**Solutions:**\n- **{initial_solutions[0]['model_name']}**: {initial_solutions[0]['solution']}\n- **{initial_solutions[1]['model_name']}**: {initial_solutions[1]['solution']}"
        yield [
            gr.update(value=initial_content, visible=True),
            gr.update(value="", visible=False),
            gr.update(value="", visible=False),
            gr.update(value="", visible=False),
            gr.update(value="", visible=False),
            gr.update(value="", visible=False),
            gr.update(value="", visible=False),
            gr.update(value="", visible=False),
            gr.update(value="### Status: 🔄 Initial solutions generated", visible=True)
        ]
        await asyncio.sleep(1)
        solutions = initial_solutions
        history = [{"solutions": initial_solutions}]
        max_outputs = max(int(max_rounds) * 2, 6)
        round_outputs = [""] * max_outputs
        for round_num in range(int(max_rounds)):
            judgment = await orchestrator.get_judgment(problem, solutions)
            history.append({"judgment": judgment["judgment"]})
            is_agreement = "AGREEMENT: YES" in judgment["judgment"].upper()
            agreement_class = "agreement" if is_agreement else "disagreement"
            agreement_icon = "✅" if is_agreement else "❌"
            judgment_content = f"## Round {round_num + 1} Judgment\n**Evaluation:** <div class='{agreement_class}'>{agreement_icon} {judgment['judgment']}</div>"
            round_outputs[round_num * 2] = judgment_content
            yield [
                gr.update(value=initial_content, visible=True),
                gr.update(value=round_outputs[0], visible=bool(round_outputs[0])),
                gr.update(value=round_outputs[1], visible=bool(round_outputs[1])),
                gr.update(value=round_outputs[2], visible=bool(round_outputs[2])),
                gr.update(value=round_outputs[3], visible=bool(round_outputs[3])),
                gr.update(value=round_outputs[4], visible=bool(round_outputs[4])),
                gr.update(value=round_outputs[5], visible=bool(round_outputs[5])),
                gr.update(value="", visible=False),
                gr.update(value=f"### Status: 🔄 Round {round_num + 1} judgment complete", visible=True)
            ]
            await asyncio.sleep(1)
            if not judgment["reprompt_needed"]:
                break
            revised_solutions = await orchestrator.get_revised_solutions(problem, solutions, judgment["judgment"])
            history.append({"solutions": revised_solutions})
            revision_content = f"## Round {round_num + 1} Revised Solutions\n**Revised Solutions:**\n- **{revised_solutions[0]['model_name']}**: {revised_solutions[0]['solution']}\n- **{revised_solutions[1]['model_name']}**: {revised_solutions[1]['solution']}"
            round_outputs[round_num * 2 + 1] = revision_content
            yield [
                gr.update(value=initial_content, visible=True),
                gr.update(value=round_outputs[0], visible=bool(round_outputs[0])),
                gr.update(value=round_outputs[1], visible=bool(round_outputs[1])),
                gr.update(value=round_outputs[2], visible=bool(round_outputs[2])),
                gr.update(value=round_outputs[3], visible=bool(round_outputs[3])),
                gr.update(value=round_outputs[4], visible=bool(round_outputs[4])),
                gr.update(value=round_outputs[5], visible=bool(round_outputs[5])),
                gr.update(value="", visible=False),
                gr.update(value=f"### Status: 🔄 Round {round_num + 1} revised solutions generated", visible=True)
            ]
            await asyncio.sleep(1)
            solutions = revised_solutions
        final_report_content = orchestrator.generate_final_report(problem, history)
        yield [
            gr.update(value=initial_content, visible=True),
            gr.update(value=round_outputs[0], visible=True),
            gr.update(value=round_outputs[1], visible=bool(round_outputs[1])),
            gr.update(value=round_outputs[2], visible=bool(round_outputs[2])),
            gr.update(value=round_outputs[3], visible=bool(round_outputs[3])),
            gr.update(value=round_outputs[4], visible=bool(round_outputs[4])),
            gr.update(value=round_outputs[5], visible=bool(round_outputs[5])),
            gr.update(value=final_report_content, visible=True),
            gr.update(value=f"### Status: ✨ Process complete! Completed {round_num + 1} round(s)", visible=True)
        ]
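        # Each yield above emits exactly nine updates, matching the order of the
        # `outputs` list wired to solve_button.click() below: initial solutions,
        # six alternating judgment/revision slots (up to three rounds), the
        # final report, and the status bar.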

    # Apply the Monochrome theme and customize it
    theme = gr.themes.Monochrome(
        primary_hue="slate",
        secondary_hue="gray",
        neutral_hue="neutral",
    ).set(
        body_text_color="white",
        background_fill_secondary="rgba(26, 26, 26, 0.9)",
        background_fill_primary="rgba(0, 0, 0, 0)",
    )

    with gr.Blocks(title="PolyThink Alpha", css=custom_css, theme=theme) as demo:
        # Enhanced header section
        with gr.Column(elem_classes=["polythink-header"]):
            gr.HTML("""
                <h1 class='polythink-title'>PolyThink</h1>
                <p class='polythink-subtitle'>Multi-Agent Problem Solving System</p>
            """, show_label=False)
        with gr.Row():
            with gr.Column(scale=2):
                gr.Markdown("### Problem Input")
                problem_input = gr.Textbox(
                    label="Problem",
                    placeholder="Enter your problem or question here...",
                    lines=10,
                    max_lines=20
                )
                rounds_slider = gr.Slider(2, 6, value=2, step=1, label="Maximum Rounds")
                solve_button = gr.Button("Solve Problem", elem_classes=["primary-button"])
                status_text = gr.Markdown("### Status: Ready", elem_classes=["status-bar"], visible=True)
            with gr.Column():
                initial_solutions = gr.Markdown(elem_classes=["step-section"], visible=False)
                round_judgment_1 = gr.Markdown(elem_classes=["step-section"], visible=False)
                revised_solutions_1 = gr.Markdown(elem_classes=["step-section"], visible=False)
                round_judgment_2 = gr.Markdown(elem_classes=["step-section"], visible=False)
                revised_solutions_2 = gr.Markdown(elem_classes=["step-section"], visible=False)
                round_judgment_3 = gr.Markdown(elem_classes=["step-section"], visible=False)
                revised_solutions_3 = gr.Markdown(elem_classes=["step-section"], visible=False)
                final_report = gr.HTML(elem_classes=["final-report"], visible=False)
        solve_button.click(
            fn=solve_problem,
            inputs=[
                problem_input,
                rounds_slider
            ],
            outputs=[
                initial_solutions,
                round_judgment_1,
                revised_solutions_1,
                round_judgment_2,
                revised_solutions_2,
                round_judgment_3,
                revised_solutions_3,
                final_report,
                status_text
            ]
        )
    return demo.queue()
if __name__ == "__main__": | |
demo = create_polythink_interface() | |
demo.launch(share=True) |