Spaces:
Running
Running
| import os | |
| import asyncio | |
| import gradio as gr | |
| import logging | |
| from huggingface_hub import InferenceClient | |
| import cohere | |
| import google.generativeai as genai | |
| from anthropic import Anthropic | |
| import openai | |
| from typing import List, Dict, Any, Optional | |
| from dotenv import load_dotenv | |
| # Load variables from a .env file, if one exists, before any os.getenv() lookup of the provider API keys | |
| load_dotenv() | |
| # Configure root logging at INFO level and create the module-level logger shared by the classes in this file | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # --- Agent Class --- | |
class PolyThinkAgent:
    """One LLM participant (solver or judge) bound to a single API provider.

    The agent is configured with a display name, the provider-side model id
    and the provider key ("cohere", "anthropic", "openai", "huggingface" or
    "gemini").  API clients are injected afterwards via ``set_clients``.

    Every public coroutine returns a plain dict and never raises for provider
    failures: errors are logged and reported in the returned text instead.
    The provider SDK calls are synchronous, so they are run in a worker thread
    (``asyncio.to_thread``, Python 3.9+) to keep the event loop responsive and
    to let ``asyncio.gather`` actually overlap several agents.
    """

    # The Anthropic Messages API rejects requests without max_tokens; this is
    # the budget used for initial solutions (revisions keep their own limit).
    _ANTHROPIC_SOLVE_MAX_TOKENS = 1024

    def __init__(self, model_name: str, model_path: str, role: str = "solver",
                 api_provider: Optional[str] = None):
        """Store the agent configuration; no client is created here.

        Args:
            model_name: Human-readable name echoed back in every result.
            model_path: Provider-side model identifier sent with each request.
            role: Free-form role label ("solver" by default).
            api_provider: Key selecting which client/branch the agent uses.
        """
        self.model_name = model_name
        self.model_path = model_path
        self.role = role
        self.api_provider = api_provider
        self.clients = {}
        self.hf_token = None
        self.inference = None

    def set_clients(self, clients: Dict[str, Any]):
        """Attach the provider clients shared by all agents.

        When a truthy "huggingface" entry (the API token) is present, a
        dedicated ``InferenceClient`` is created for this agent.
        """
        self.clients = clients
        if "huggingface" in clients:
            self.hf_token = clients["huggingface"]
            if self.hf_token:
                self.inference = InferenceClient(token=self.hf_token)

    # ------------------------------------------------------------------
    # Prompt builders
    # ------------------------------------------------------------------
    @staticmethod
    def _solve_prompt(problem: str, detailed: bool) -> str:
        """Build the initial-solution prompt (one-sentence or detailed variant)."""
        if detailed:
            bullets = (
                "- Provide a clear, concise solution.\n"
                "- Include detailed reasoning.\n"
            )
        else:
            bullets = (
                "- Provide a clear, concise solution in one sentence.\n"
                "- Include brief reasoning in one additional sentence.\n"
            )
        return (
            f"\nPROBLEM: {problem}\n"
            "INSTRUCTIONS:\n"
            f"{bullets}"
            "- Do not repeat the solution or add extraneous text.\n"
        )

    @staticmethod
    def _numbered_solutions(solutions: List[Dict[str, Any]]) -> str:
        """Render solutions as '1. <model>: <text>' lines, one per solution."""
        return "\n".join(
            f"{position}. {entry['model_name']}: {entry['solution']}"
            for position, entry in enumerate(solutions, 1)
        )

    # ------------------------------------------------------------------
    # Blocking provider calls (always invoked through asyncio.to_thread)
    # ------------------------------------------------------------------
    def _has_client(self, provider: str) -> bool:
        """Return True when this agent targets ``provider`` and a client exists."""
        return self.api_provider == provider and provider in self.clients

    def _ask_cohere(self, prompt: str) -> str:
        """Send ``prompt`` through the Cohere chat endpoint and return the text."""
        response = self.clients["cohere"].chat(model=self.model_path, message=prompt)
        return response.text.strip()

    def _ask_anthropic(self, prompt: str, max_tokens: int) -> str:
        """Send ``prompt`` as a single user message to Anthropic."""
        response = self.clients["anthropic"].messages.create(
            model=self.model_path,
            max_tokens=max_tokens,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.content[0].text.strip()

    def _ask_openai(self, prompt: str, max_tokens: Optional[int] = None) -> str:
        """Send ``prompt`` as a single user message to OpenAI chat completions."""
        request = {
            "model": self.model_path,
            "messages": [{"role": "user", "content": prompt}],
        }
        # Only send max_tokens when a limit was requested, so unlimited calls
        # keep the exact request shape they always had.
        if max_tokens is not None:
            request["max_tokens"] = max_tokens
        response = self.clients["openai"].chat.completions.create(**request)
        return response.choices[0].message.content.strip()

    def _ask_huggingface(self, prompt: str, max_new_tokens: int) -> str:
        """Run text generation on the Hugging Face inference client."""
        result = self.inference.text_generation(
            prompt, model=self.model_path, max_new_tokens=max_new_tokens, temperature=0.5
        )
        text = result if isinstance(result, str) else result.generated_text
        return text.strip()

    @staticmethod
    def _gemini_config(max_output_tokens: Optional[int] = None):
        """Build the Gemini generation config (temperature 0.5, optional cap)."""
        options = {"temperature": 0.5}
        if max_output_tokens is not None:
            options["max_output_tokens"] = max_output_tokens
        return genai.types.GenerationConfig(**options)

    @staticmethod
    def _extract_gemini_text(response: Any, *, malformed: str, empty: str, failed: str,
                             empty_log: str, log_response_on_failure: bool = False) -> str:
        """Pull the text out of a Gemini response, falling back to fixed messages.

        Args:
            response: Object returned by ``generate_content``.
            malformed: Text returned when candidates lack content/parts.
            empty: Text returned when there are no candidates at all.
            failed: Text returned when reading the response raises.
            empty_log: Log prefix used for the "no candidates" warning.
            log_response_on_failure: Also log the response object on failure.
        """
        try:
            # Reading .text may itself raise (anything but AttributeError escapes
            # hasattr); that case is handled by the except below.
            if hasattr(response, "text"):
                return response.text.strip()
            candidates = getattr(response, "candidates", None)
            if not candidates:
                logger.warning("%s: %s", empty_log, response)
                return empty
            first = candidates[0]
            if hasattr(first, "content") and hasattr(first.content, "parts"):
                return first.content.parts[0].text.strip()
            logger.warning(
                "Gemini response has candidates but missing content structure: %s", response
            )
            return malformed
        except Exception as e:
            if log_response_on_failure:
                logger.error(
                    "Error extracting text from Gemini response: %s, response: %s", e, response
                )
            else:
                logger.error("Error extracting text from Gemini response: %s", e)
            return failed

    # ------------------------------------------------------------------
    # Public coroutines
    # ------------------------------------------------------------------
    async def solve_problem(self, problem: str) -> Dict[str, Any]:
        """Generate a solution to the given problem.

        Returns:
            ``{"solution": <text>, "model_name": <agent name>}``.  On any
            failure the solution text starts with "Error:" / "API error:"
            instead of an exception being raised.
        """
        try:
            if self._has_client("cohere"):
                text = await asyncio.to_thread(
                    self._ask_cohere, self._solve_prompt(problem, detailed=False)
                )
            elif self._has_client("anthropic"):
                text = await asyncio.to_thread(
                    self._ask_anthropic,
                    self._solve_prompt(problem, detailed=False),
                    self._ANTHROPIC_SOLVE_MAX_TOKENS,
                )
            elif self._has_client("openai"):
                text = await asyncio.to_thread(
                    self._ask_openai, self._solve_prompt(problem, detailed=True)
                )
            elif self.api_provider == "huggingface" and self.inference:
                # Plain text-generation models need an explicit completion cue.
                prompt = self._solve_prompt(problem, detailed=True) + "SOLUTION AND REASONING:\n"
                text = await asyncio.to_thread(self._ask_huggingface, prompt, 5000)
            elif self._has_client("gemini"):
                model = self.clients["gemini"].GenerativeModel(self.model_path)
                try:
                    response = await asyncio.to_thread(
                        model.generate_content,
                        self._solve_prompt(problem, detailed=True),
                        generation_config=self._gemini_config(),
                    )
                    text = self._extract_gemini_text(
                        response,
                        malformed="Error parsing API response; incomplete response structure.",
                        empty="No solution generated; API returned empty response.",
                        failed="Error parsing API response.",
                        empty_log="Gemini API returned no candidates",
                        log_response_on_failure=True,
                    )
                except Exception as e:
                    logger.error("Gemini API call failed: %s", e)
                    text = f"API error: {e}"
            else:
                text = f"Error: Missing API configuration for {self.api_provider}"
        except Exception as e:
            logger.error("Error in %s: %s", self.model_name, e)
            text = f"Error: {e}"
        return {"solution": text, "model_name": self.model_name}

    async def evaluate_solutions(self, problem: str, solutions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Evaluate solutions from solver agents.

        All supplied solutions are listed in the prompt.  Only the gemini,
        openai and huggingface providers can act as judge.

        Returns:
            ``{"judgment": <text>, "reprompt_needed": <bool>}`` where
            ``reprompt_needed`` is True when the judge answered
            "AGREEMENT: NO".  Errors yield a judgment starting with "Error:"
            and ``reprompt_needed`` False.
        """
        try:
            if len(solutions) < 2:
                # Agreement is undefined for fewer than two answers; say so
                # explicitly rather than surfacing an IndexError message.
                return {
                    "judgment": "Error: at least two solutions are required for evaluation",
                    "reprompt_needed": False,
                }
            prompt = (
                f"\nPROBLEM: {problem}\n"
                "SOLUTIONS:\n"
                f"{self._numbered_solutions(solutions)}\n"
                "INSTRUCTIONS:\n"
                "- Extract the numerical final answer from each solution (e.g., 68 from '16 + 52 = 68').\n"
                "- Extract the key reasoning steps from each solution.\n"
                "- Apply strict evaluation criteria:\n"
                "* Numerical answers must match EXACTLY (including units and precision).\n"
                "* Key reasoning steps must align in approach and logic.\n"
                "- Output exactly: 'AGREEMENT: YES' if BOTH the numerical answers AND reasoning align perfectly.\n"
                "- Output 'AGREEMENT: NO' followed by a one-sentence explanation if either the answers or reasoning differ in ANY way.\n"
                "- Be conservative in declaring agreement - when in doubt, declare disagreement.\n"
                "- Do not add scoring, commentary, or extraneous text.\n"
                "EVALUATION:\n"
            )
            if self._has_client("gemini"):
                model = self.clients["gemini"].GenerativeModel(self.model_path)
                response = await asyncio.to_thread(
                    model.generate_content, prompt, generation_config=self._gemini_config()
                )
                judgment = self._extract_gemini_text(
                    response,
                    malformed="AGREEMENT: NO - Unable to evaluate due to API response structure issue.",
                    empty="AGREEMENT: NO - Unable to evaluate due to API response issue.",
                    failed="AGREEMENT: NO - Unable to evaluate due to API response issue.",
                    empty_log="Empty response from Gemini API",
                )
            elif self._has_client("openai"):
                judgment = await asyncio.to_thread(self._ask_openai, prompt, 200)
            elif self.api_provider == "huggingface" and self.inference:
                judgment = await asyncio.to_thread(self._ask_huggingface, prompt, 200)
            else:
                return {
                    "judgment": f"Error: Missing API configuration for {self.api_provider}",
                    "reprompt_needed": False,
                }
            return {"judgment": judgment, "reprompt_needed": "AGREEMENT: NO" in judgment.upper()}
        except Exception as e:
            logger.error("Error in judge: %s", e)
            return {"judgment": f"Error: {e}", "reprompt_needed": False}

    async def reprompt_with_context(self, problem: str, solutions: List[Dict[str, Any]], judgment: str) -> Dict[str, Any]:
        """Generate a revised solution based on previous solutions and judgment.

        Returns:
            ``{"solution": <text>, "model_name": <agent name>}``; failures are
            reported in the solution text rather than raised.
        """
        try:
            prompt = (
                f"\nPROBLEM: {problem}\n"
                "PREVIOUS SOLUTIONS:\n"
                f"{self._numbered_solutions(solutions)}\n"
                f"JUDGE FEEDBACK: {judgment}\n"
                "INSTRUCTIONS:\n"
                "- Provide a revised, concise solution in one sentence.\n"
                "- Include brief reasoning in one additional sentence.\n"
                "- Address the judge's feedback.\n"
            )
            if self._has_client("cohere"):
                text = await asyncio.to_thread(self._ask_cohere, prompt)
            elif self._has_client("anthropic"):
                text = await asyncio.to_thread(self._ask_anthropic, prompt, 100)
            elif self._has_client("openai"):
                text = await asyncio.to_thread(self._ask_openai, prompt, 100)
            elif self.api_provider == "huggingface" and self.inference:
                text = await asyncio.to_thread(
                    self._ask_huggingface, prompt + "\nREVISED SOLUTION AND REASONING:", 500
                )
            elif self._has_client("gemini"):
                model = self.clients["gemini"].GenerativeModel(self.model_path)
                response = await asyncio.to_thread(
                    model.generate_content,
                    prompt,
                    generation_config=self._gemini_config(max_output_tokens=100),
                )
                text = self._extract_gemini_text(
                    response,
                    malformed="Unable to generate a solution due to API response structure issue.",
                    empty="Unable to generate a solution due to API response issue.",
                    failed="Unable to generate a solution due to API response issue.",
                    empty_log="Empty response from Gemini API",
                )
            else:
                text = f"Error: Missing API configuration for {self.api_provider}"
        except Exception as e:
            logger.error("Error in %s: %s", self.model_name, e)
            text = f"Error: {e}"
        return {"solution": text, "model_name": self.model_name}
| # --- Model Registry --- | |
class ModelRegistry:
    """Static catalogue of the models the app can offer, grouped by provider.

    All methods are static: they can be called on the class (as the rest of
    the file does) or on an instance.
    """

    @staticmethod
    def get_available_models():
        """Get the list of available models grouped by provider (original list).

        Returns:
            A freshly built dict mapping a provider display name to a list of
            model records with the keys "name", "id", "provider", "type"
            (roles: "solver" and optionally "judge") and "icon".
        """
        # NOTE(review): the icon literals are reproduced exactly as they appear
        # in this file and look mis-encoded - confirm the intended glyphs.
        anthropic_icon = "π"
        openai_icon = "π€"
        cohere_icon = "π¬"
        google_icon = "π"
        hf_icon = "π₯"

        def record(name, model_id, provider, icon, judge=False):
            # Every model can solve; only reasoning models are tagged as judges.
            roles = ["solver", "judge"] if judge else ["solver"]
            return {"name": name, "id": model_id, "provider": provider, "type": roles, "icon": icon}

        return {
            "Anthropic": [
                record("Claude 3.5 Sonnet", "claude-3-5-sonnet-20240620", "anthropic", anthropic_icon),
                record("Claude 3.7 Sonnet", "claude-3-7-sonnet-20250219", "anthropic", anthropic_icon),
                record("Claude 3 Opus", "claude-3-opus-20240229", "anthropic", anthropic_icon),
                record("Claude 3 Haiku", "claude-3-haiku-20240307", "anthropic", anthropic_icon),
            ],
            "OpenAI": [
                # GPT-4o previously carried a corrupted placeholder icon; it now
                # shares the icon used by every other OpenAI entry.
                record("GPT-4o", "gpt-4o", "openai", openai_icon),
                record("GPT-4 Turbo", "gpt-4-turbo", "openai", openai_icon),
                record("GPT-4", "gpt-4", "openai", openai_icon),
                record("GPT-3.5 Turbo", "gpt-3.5-turbo", "openai", openai_icon),
                record("OpenAI o1", "o1", "openai", openai_icon, judge=True),
                record("OpenAI o3", "o3", "openai", openai_icon, judge=True),
            ],
            "Cohere": [
                record("Cohere Command R", "command-r-08-2024", "cohere", cohere_icon),
                record("Cohere Command R+", "command-r-plus-08-2024", "cohere", cohere_icon),
            ],
            "Google": [
                record("Gemini 1.5 Pro", "gemini-1.5-pro", "gemini", google_icon),
                record("Gemini 2.0 Flash Thinking Experimental 01-21", "gemini-2.0-flash-thinking-exp-01-21", "gemini", google_icon, judge=True),
                record("Gemini 2.5 Pro Experimental 03-25", "gemini-2.5-pro-exp-03-25", "gemini", google_icon, judge=True),
            ],
            "HuggingFace": [
                record("Llama 3.3 70B Instruct", "meta-llama/Llama-3.3-70B-Instruct", "huggingface", hf_icon),
                record("Llama 3.2 3B Instruct", "meta-llama/Llama-3.2-3B-Instruct", "huggingface", hf_icon),
                record("Llama 3.1 70B Instruct", "meta-llama/Llama-3.1-70B-Instruct", "huggingface", hf_icon),
                record("Mistral 7B Instruct v0.3", "mistralai/Mistral-7B-Instruct-v0.3", "huggingface", hf_icon),
                record("DeepSeek R1 Distill Qwen 32B", "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", "huggingface", hf_icon, judge=True),
                record("DeepSeek Coder V2 Instruct", "deepseek-ai/DeepSeek-Coder-V2-Instruct", "huggingface", hf_icon),
                record("Qwen 2.5 72B Instruct", "Qwen/Qwen2.5-72B-Instruct", "huggingface", hf_icon),
                record("Qwen 2.5 Coder 32B Instruct", "Qwen/Qwen2.5-Coder-32B-Instruct", "huggingface", hf_icon),
                record("Qwen 2.5 Math 1.5B Instruct", "Qwen/Qwen2.5-Math-1.5B-Instruct", "huggingface", hf_icon),
                record("Gemma 3 27B Instruct", "google/gemma-3-27b-it", "huggingface", hf_icon),
                record("Phi-3 Mini 4K Instruct", "microsoft/Phi-3-mini-4k-instruct", "huggingface", hf_icon),
            ],
        }

    @staticmethod
    def _grouped_choices(accepts):
        """Return display records per provider for models where ``accepts`` is true.

        ``accepts(provider, model)`` decides inclusion.  Each display record has
        "name" ("<icon> <name> (<provider>)"), "id" and "provider"; providers
        with no accepted model are omitted.
        """
        grouped = {}
        for provider, models in ModelRegistry.get_available_models().items():
            choices = [
                {
                    "name": f"{model['icon']} {model['name']} ({provider})",
                    "id": model["id"],
                    "provider": model["provider"],
                }
                for model in models
                if accepts(provider, model)
            ]
            if choices:
                grouped[provider] = choices
        return grouped

    @staticmethod
    def get_solver_models():
        """Get models suitable for solver role with provider grouping."""
        return ModelRegistry._grouped_choices(lambda provider, model: "solver" in model["type"])

    @staticmethod
    def get_judge_models():
        """Get only specific reasoning models suitable for judge role with provider grouping.

        A model qualifies when it is tagged "judge" AND its
        "<name> (<provider>)" label is on the allow-list below.
        """
        allowed_judge_models = frozenset({
            "Gemini 2.0 Flash Thinking Experimental 01-21 (Google)",
            # Must match the registry name exactly; the former label
            # "DeepSeek R1 (HuggingFace)" never matched, hiding this judge.
            "DeepSeek R1 Distill Qwen 32B (HuggingFace)",
            "Gemini 2.5 Pro Experimental 03-25 (Google)",
            "OpenAI o1 (OpenAI)",
            "OpenAI o3 (OpenAI)",
        })
        return ModelRegistry._grouped_choices(
            lambda provider, model: "judge" in model["type"]
            and f"{model['name']} ({provider})" in allowed_judge_models
        )
| # --- Orchestrator Class --- | |
class PolyThinkOrchestrator:
    """Coordinate up to two solver agents and an optional judge agent.

    Each ``*_config`` is a dict with the keys "name", "id" and "provider".
    The "name" may be a registry display label ("<icon> <name> (<provider>)")
    or a plain model name; both are reduced to the bare model name.
    """

    def __init__(self, solver1_config=None, solver2_config=None, judge_config=None, api_clients=None):
        """Create the configured agents and hand them the shared API clients."""
        self.api_clients = api_clients or {}
        self.solvers = [
            self._build_agent(config)
            for config in (solver1_config, solver2_config)
            if config
        ]
        self.judge = self._build_agent(judge_config, role="judge") if judge_config else None

    @staticmethod
    def _model_name_from_display(display_name: str) -> str:
        """Strip an optional leading icon token and trailing " (Provider)" suffix.

        The first token is treated as an icon only when it contains no ASCII
        letter or digit, so plain names such as "Llama 3.2 3B Instruct" keep
        their first word.
        """
        name = display_name
        head, separator, remainder = name.partition(" ")
        looks_like_icon = not any(ch.isascii() and ch.isalnum() for ch in head)
        if separator and remainder and looks_like_icon:
            name = remainder
        if name.endswith(")") and " (" in name:
            name = name.rsplit(" (", 1)[0]
        return name

    def _build_agent(self, config, role="solver"):
        """Instantiate one agent from its config dict and attach the clients."""
        agent = PolyThinkAgent(
            model_name=self._model_name_from_display(config["name"]),
            model_path=config["id"],
            role=role,
            api_provider=config["provider"],
        )
        agent.set_clients(self.api_clients)
        return agent

    async def get_initial_solutions(self, problem: str) -> List[Dict[str, Any]]:
        """Ask every solver for a first solution; results follow solver order."""
        return await asyncio.gather(*(solver.solve_problem(problem) for solver in self.solvers))

    async def get_judgment(self, problem: str, solutions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Have the judge evaluate the solutions, or report that none is configured."""
        if not self.judge:
            return {"judgment": "No judge configured", "reprompt_needed": False}
        return await self.judge.evaluate_solutions(problem, solutions)

    async def get_revised_solutions(self, problem: str, solutions: List[Dict[str, Any]], judgment: str) -> List[Dict[str, Any]]:
        """Ask every solver to revise its answer in light of the judge feedback."""
        return await asyncio.gather(
            *(solver.reprompt_with_context(problem, solutions, judgment) for solver in self.solvers)
        )

    def generate_final_report(self, problem: str, history: List[Dict[str, Any]]) -> str:
        """Render the whole solving session as an HTML fragment.

        Args:
            problem: The problem statement entered by the user.
            history: Ordered steps; a step holds either "solutions" (a list of
                ``{"model_name", "solution"}`` dicts) or "judgment" (text).

        Returns:
            HTML containing the problem, an optional best-answer box (when the
            last judgment says "AGREEMENT: YES"), a timeline of all steps and
            a conclusion box.  Problem text, model names, solutions and
            judgments are HTML-escaped because they come from the user or
            from model output.
        """
        # Function-scope import: html is not in the file's import block.
        import html

        # NOTE(review): the icon literals in the markup below are reproduced as
        # they appear in this file and look mis-encoded - confirm the glyphs.

        def esc(value: Any) -> str:
            return html.escape(str(value))

        def solutions_item(marker: Any, title: str, solutions: List[Dict[str, Any]]) -> str:
            cards = "".join(
                f"""
                        <div class="solution-item">
                            <div class="solution-header">{esc(sol['model_name'])}</div>
                            <div class="solution-body">{esc(sol['solution'])}</div>
                        </div>
                """
                for sol in solutions
            )
            return f"""
            <div class="timeline-item">
                <div class="timeline-marker">{marker}</div>
                <div class="timeline-content">
                    <h4>{title}</h4>
                    <div class="solutions-container">
                    {cards}
                    </div>
                </div>
            </div>
            """

        last_judgment = next(
            (step.get("judgment", "") for step in reversed(history) if "judgment" in step), ""
        )
        agreed = "AGREEMENT: YES" in last_judgment.upper()

        parts = [f"""
        <div class="final-report-container">
            <h2 class="final-report-title">π Final Analysis Report</h2>
            <div class="problem-container">
                <h3 class="problem-title">Problem Statement</h3>
                <div class="problem-content">{esc(problem)}</div>
            </div>
        """]

        # Best-answer box: only shown when the judge's last verdict is agreement.
        if agreed:
            last_solutions = next(
                (step["solutions"] for step in reversed(history) if "solutions" in step), None
            )
            if last_solutions:
                model_names = " & ".join(esc(sol["model_name"]) for sol in last_solutions)
                parts.append(f"""
            <div class="best-answer-container agreement">
                <h3>Best Answer</h3>
                <div class="best-answer-content">
                    <div class="best-answer-icon">β¨</div>
                    <div class="best-answer-text">
                        <p><strong>Agreed Solution:</strong> {esc(last_solutions[0]['solution'])}</p>
                        <p><strong>Models:</strong> {model_names}</p>
                    </div>
                </div>
            </div>
                """)

        parts.append("""
        <div class="timeline-container">
        """)

        for i, step in enumerate(history, 1):
            if "solutions" in step and i == 1:
                parts.append(solutions_item(1, "Initial Solutions", step["solutions"]))
            elif "judgment" in step:
                is_agreement = "AGREEMENT: YES" in step["judgment"].upper()
                judgment_class = "agreement" if is_agreement else "disagreement"
                judgment_icon = "β " if is_agreement else "β"
                parts.append(f"""
            <div class="timeline-item">
                <div class="timeline-marker">{i}</div>
                <div class="timeline-content">
                    <h4>Evaluation {(i+1)//2}</h4>
                    <div class="judgment-container {judgment_class}">
                        <div class="judgment-icon">{judgment_icon}</div>
                        <div class="judgment-text">{esc(step["judgment"])}</div>
                    </div>
                </div>
            </div>
                """)
            elif "solutions" in step:
                # Steps alternate solutions/judgment, so step i belongs to round (i+1)//2.
                round_num = (i + 1) // 2
                parts.append(
                    solutions_item(i, f"Revised Solutions (Round {round_num})", step["solutions"])
                )

        if agreed:
            # Hard-coded heuristic: agreement on the first evaluation counts as 100%.
            confidence = "100%" if len(history) == 2 else "80%"
            parts.append(f"""
            <div class="conclusion-container agreement">
                <h3>Conclusion</h3>
                <div class="conclusion-content">
                    <div class="conclusion-icon">β </div>
                    <div class="conclusion-text">
                        <p>Models reached <strong>AGREEMENT</strong></p>
                        <p>Confidence level: <strong>{confidence}</strong></p>
                    </div>
                </div>
            </div>
            """)
        else:
            parts.append("""
            <div class="conclusion-container disagreement">
                <h3>Conclusion</h3>
                <div class="conclusion-content">
                    <div class="conclusion-icon">β</div>
                    <div class="conclusion-text">
                        <p>Models could not reach agreement</p>
                        <p>Review all solutions above for best answer</p>
                    </div>
                </div>
            </div>
            """)

        # Close the timeline container and the outer report container.
        parts.append("""
        </div>
        </div>
        """)
        return "".join(parts)
| # --- Gradio Interface --- | |
| def create_polythink_interface(): | |
| custom_css = """ | |
| /* Reverted to Original Black Theme */ | |
| body { | |
| background: #000000; | |
| color: #ffffff; | |
| font-family: 'Arial', sans-serif; | |
| } | |
| .gradio-container { | |
| background: #1a1a1a; | |
| border-radius: 10px; | |
| box-shadow: 0 4px 15px rgba(0, 0, 0, 0.5); | |
| padding: 20px; | |
| } | |
| .gr-button { | |
| background: linear-gradient(45deg, #666666, #999999); | |
| color: #ffffff; | |
| border: none; | |
| padding: 10px 20px; | |
| border-radius: 5px; | |
| transition: all 0.3s ease; | |
| } | |
| .gr-button:hover { | |
| background: linear-gradient(45deg, #555555, #888888); | |
| transform: translateY(-2px); | |
| } | |
| .gr-textbox { | |
| background: #333333; | |
| color: #ffffff; | |
| border: 1px solid #444444; | |
| border-radius: 5px; | |
| padding: 10px; | |
| } | |
| .gr-slider { | |
| background: #333333; | |
| border-radius: 5px; | |
| } | |
| .gr-slider .track-fill { | |
| background: #cccccc; | |
| } | |
| .step-section { | |
| background: #1a1a1a; | |
| border-radius: 8px; | |
| padding: 15px; | |
| margin-bottom: 20px; | |
| box-shadow: 0 2px 10px rgba(0, 0, 0, 0.3); | |
| } | |
| .step-section h3 { | |
| color: #cccccc; | |
| margin-top: 0; | |
| font-size: 1.5em; | |
| } | |
| .step-section p { | |
| color: #aaaaaa; | |
| line-height: 1.6; | |
| } | |
| .step-section code { | |
| background: #333333; | |
| padding: 2px 6px; | |
| border-radius: 3px; | |
| color: #ff6b6b; | |
| } | |
| .step-section strong { | |
| color: #ffffff; | |
| } | |
| .status-bar { | |
| background: #1a1a1a; | |
| padding: 10px; | |
| border-radius: 5px; | |
| font-size: 1.1em; | |
| margin-bottom: 20px; | |
| border-left: 4px solid #666666; | |
| } | |
| /* Agreement/Disagreement styling */ | |
| .agreement { | |
| color: #4CAF50 !important; | |
| border: 1px solid #4CAF50; | |
| background-color: rgba(76, 175, 80, 0.1) !important; | |
| padding: 10px; | |
| border-radius: 5px; | |
| } | |
| .disagreement { | |
| color: #F44336 !important; | |
| border: 1px solid #F44336; | |
| background-color: rgba(244, 67, 54, 0.1) !important; | |
| padding: 10px; | |
| border-radius: 5px; | |
| } | |
| /* Enhanced Final Report Styling */ | |
| .final-report { | |
| background: #111111; | |
| padding: 0; | |
| border-radius: 8px; | |
| box-shadow: 0 4px 15px rgba(0, 0, 0, 0.5); | |
| margin-top: 20px; | |
| overflow: hidden; | |
| } | |
| .final-report-container { | |
| font-family: 'Arial', sans-serif; | |
| } | |
| .final-report-title { | |
| background: linear-gradient(45deg, #333333, #444444); | |
| color: #ffffff; | |
| padding: 20px; | |
| margin: 0; | |
| border-bottom: 1px solid #555555; | |
| font-size: 24px; | |
| text-align: center; | |
| } | |
| .problem-container { | |
| background: #222222; | |
| padding: 15px 20px; | |
| margin: 0; | |
| border-bottom: 1px solid #333333; | |
| } | |
| .problem-title { | |
| color: #bbbbbb; | |
| margin: 0 0 10px 0; | |
| font-size: 18px; | |
| } | |
| .problem-content { | |
| background: #333333; | |
| padding: 15px; | |
| border-radius: 5px; | |
| font-family: monospace; | |
| font-size: 16px; | |
| color: #ffffff; | |
| } | |
| .timeline-container { | |
| padding: 20px; | |
| } | |
| .timeline-item { | |
| display: flex; | |
| margin-bottom: 25px; | |
| position: relative; | |
| } | |
| .timeline-item:before { | |
| content: ''; | |
| position: absolute; | |
| left: 15px; | |
| top: 30px; | |
| bottom: -25px; | |
| width: 2px; | |
| background: #444444; | |
| z-index: 0; | |
| } | |
| .timeline-item:last-child:before { | |
| display: none; | |
| } | |
| .timeline-marker { | |
| width: 34px; | |
| height: 34px; | |
| border-radius: 50%; | |
| background: #333333; | |
| display: flex; | |
| align-items: center; | |
| justify-content: center; | |
| font-weight: bold; | |
| position: relative; | |
| z-index: 1; | |
| border: 2px solid #555555; | |
| margin-right: 15px; | |
| } | |
| .timeline-content { | |
| flex: 1; | |
| background: #1d1d1d; | |
| border-radius: 5px; | |
| padding: 15px; | |
| border: 1px solid #333333; | |
| } | |
| .timeline-content h4 { | |
| margin-top: 0; | |
| margin-bottom: 15px; | |
| color: #cccccc; | |
| border-bottom: 1px solid #333333; | |
| padding-bottom: 8px; | |
| } | |
| .solutions-container { | |
| display: flex; | |
| flex-wrap: wrap; | |
| gap: 10px; | |
| } | |
| .solution-item { | |
| flex: 1; | |
| min-width: 250px; | |
| background: #252525; | |
| border-radius: 5px; | |
| overflow: hidden; | |
| border: 1px solid #383838; | |
| } | |
| .solution-header { | |
| background: #333333; | |
| padding: 8px 12px; | |
| font-weight: bold; | |
| color: #dddddd; | |
| border-bottom: 1px solid #444444; | |
| } | |
| .solution-body { | |
| padding: 12px; | |
| color: #bbbbbb; | |
| } | |
| .judgment-container { | |
| display: flex; | |
| align-items: center; | |
| padding: 10px; | |
| border-radius: 5px; | |
| } | |
| .judgment-icon { | |
| font-size: 24px; | |
| margin-right: 15px; | |
| } | |
| .conclusion-container { | |
| margin-top: 30px; | |
| border-radius: 5px; | |
| padding: 5px 15px 15px; | |
| } | |
| .conclusion-content { | |
| display: flex; | |
| align-items: center; | |
| } | |
| .conclusion-icon { | |
| font-size: 36px; | |
| margin-right: 20px; | |
| } | |
| .conclusion-text { | |
| flex: 1; | |
| } | |
| .conclusion-text p { | |
| margin: 5px 0; | |
| } | |
| /* Header styling */ | |
| .app-header { | |
| background: linear-gradient(45deg, #222222, #333333); | |
| padding: 20px; | |
| border-radius: 10px; | |
| margin-bottom: 20px; | |
| box-shadow: 0 4px 10px rgba(0, 0, 0, 0.3); | |
| border: 1px solid #444444; | |
| } | |
| .app-title { | |
| font-size: 28px; | |
| margin: 0 0 10px 0; | |
| background: -webkit-linear-gradient(45deg, #cccccc, #ffffff); | |
| -webkit-background-clip: text; | |
| -webkit-text-fill-color: transparent; | |
| display: inline-block; | |
| } | |
| .app-subtitle { | |
| font-size: 16px; | |
| color: #aaaaaa; | |
| margin: 0; | |
| } | |
| /* Button style */ | |
| .primary-button { | |
| background: linear-gradient(45deg, #555555, #777777) !important; | |
| border: none !important; | |
| color: white !important; | |
| padding: 12px 24px !important; | |
| font-weight: bold !important; | |
| transition: all 0.3s ease !important; | |
| box-shadow: 0 4px 10px rgba(0, 0, 0, 0.3) !important; | |
| } | |
| .primary-button:hover { | |
| transform: translateY(-2px) !important; | |
| box-shadow: 0 6px 15px rgba(0, 0, 0, 0.4) !important; | |
| background: linear-gradient(45deg, #666666, #888888) !important; | |
| } | |
| /* Best Answer styling */ | |
| .best-answer-container { | |
| background: #1a1a1a; | |
| border-radius: 8px; | |
| padding: 20px; | |
| margin: 20px 0; | |
| box-shadow: 0 4px 15px rgba(0, 0, 0, 0.5); | |
| border: 1px solid #4CAF50; | |
| } | |
| .best-answer-container h3 { | |
| color: #4CAF50; | |
| margin-top: 0; | |
| margin-bottom: 15px; | |
| font-size: 1.5em; | |
| } | |
| .best-answer-content { | |
| display: flex; | |
| align-items: flex-start; | |
| gap: 15px; | |
| } | |
| .best-answer-icon { | |
| font-size: 24px; | |
| color: #4CAF50; | |
| } | |
| .best-answer-text { | |
| flex: 1; | |
| } | |
| .best-answer-text p { | |
| margin: 5px 0; | |
| color: #ffffff; | |
| } | |
| .best-answer-text strong { | |
| color: #4CAF50; | |
| } | |
| """ | |
# Hardcoded model configurations. Each entry carries a display name, the
# model identifier string, and the provider key that is matched against the
# available API clients before a run starts.
solver1_config = dict(
    name="Cohere Command R",
    id="command-r-08-2024",
    provider="cohere",
)
solver2_config = dict(
    name="Llama 3.2 3B Instruct",
    id="meta-llama/Llama-3.2-3B-Instruct",
    provider="huggingface",
)
judge_config = dict(
    name="Gemini 2.0 Flash Thinking Experimental 01-21",
    id="gemini-2.0-flash-thinking-exp-01-21",
    provider="gemini",
)
# The click handler is wired to exactly six per-round panels: three judgment
# panels interleaved with three revised-solution panels.
_ROUND_PANEL_COUNT = 6


def _build_api_clients() -> Dict[str, Any]:
    """Create a client for every provider whose API key is set in the environment."""
    clients: Dict[str, Any] = {}

    cohere_key = os.getenv("COHERE_API_KEY")
    if cohere_key:
        clients["cohere"] = cohere.Client(cohere_key)

    hf_key = os.getenv("HF_API_KEY")
    if hf_key:
        # The Hugging Face entry is the raw token, not a client object;
        # PolyThinkAgent.set_clients builds the InferenceClient from it.
        clients["huggingface"] = hf_key

    gemini_key = os.getenv("GEMINI_API_KEY")
    if gemini_key:
        # genai is configured at module level, so the module itself is stored.
        genai.configure(api_key=gemini_key)
        clients["gemini"] = genai

    anthropic_key = os.getenv("ANTHROPIC_API_KEY")
    if anthropic_key:
        clients["anthropic"] = Anthropic(api_key=anthropic_key)

    openai_key = os.getenv("OPENAI_API_KEY")
    if openai_key:
        clients["openai"] = openai.OpenAI(api_key=openai_key)

    return clients


def _format_solutions(solutions: List[Dict[str, Any]]) -> str:
    """Render each solver's answer as one Markdown bullet line."""
    return "\n".join(
        f"- **{entry['model_name']}**: {entry['solution']}" for entry in solutions
    )


def _fold_round_panels(round_outputs: List[str]) -> List[str]:
    """Fit the per-round texts into the fixed number of round panels.

    Texts beyond the last panel are appended to it in chronological order, so
    rounds past the third stay visible instead of being silently dropped.
    """
    panels = list(round_outputs[:_ROUND_PANEL_COUNT])
    panels.extend([""] * (_ROUND_PANEL_COUNT - len(panels)))
    overflow = [text for text in round_outputs[_ROUND_PANEL_COUNT:] if text]
    if overflow:
        panels[-1] = "\n\n".join(text for text in [panels[-1], *overflow] if text)
    return panels


def _render_updates(
    initial_content: str,
    round_outputs: List[str],
    final_content: Optional[str],
    status: str,
) -> List[Any]:
    """Build the nine component updates in the order of the click handler's outputs.

    Order: initial solutions, six round panels, final report, status line.
    A round panel is visible only when it has text; the final report is
    visible only when ``final_content`` is not ``None``.
    """
    updates = [gr.update(value=initial_content, visible=True)]
    updates.extend(
        gr.update(value=text, visible=bool(text))
        for text in _fold_round_panels(round_outputs)
    )
    if final_content is None:
        updates.append(gr.update(value="", visible=False))
    else:
        updates.append(gr.update(value=final_content, visible=True))
    updates.append(gr.update(value=status, visible=True))
    return updates


async def solve_problem(problem: str, max_rounds: int):
    """Run the solve -> judge -> revise loop, streaming UI updates after each step.

    This is an async generator used as the button's click handler. Every
    yielded list holds nine ``gr.update`` objects, one per output component.

    Args:
        problem: The problem text entered by the user.
        max_rounds: Upper bound on judge rounds; coerced with ``int()``.

    Yields:
        The component updates after the initial solutions, after each
        judgment, after each revision, and once more with the final report.
        A single error update is yielded instead when the problem is empty
        or a required API key is missing.
    """
    # NOTE(review): the non-ASCII glyphs in the status/icon strings below look
    # like mis-decoded emoji; confirm the intended characters before changing
    # them. They are reproduced unchanged here.

    # Reject empty input before spending any API calls on it.
    if not problem or not problem.strip():
        yield _render_updates(
            "Error: Please enter a problem to solve",
            [],
            None,
            "### Status: Please enter a problem to solve",
        )
        return

    api_clients = _build_api_clients()

    # Only the providers named by the three hardcoded configs are mandatory.
    required_providers = {
        solver1_config["provider"],
        solver2_config["provider"],
        judge_config["provider"],
    }
    # Sorted so the message is stable from run to run (set order is not).
    missing_keys = sorted(p for p in required_providers if p not in api_clients)
    if missing_keys:
        missing = ", ".join(missing_keys)
        yield _render_updates(
            f"Error: Missing API keys for {missing}",
            [],
            None,
            f"### Status: β Missing API keys for {missing}",
        )
        return

    orchestrator = PolyThinkOrchestrator(solver1_config, solver2_config, judge_config, api_clients)

    initial_solutions = await orchestrator.get_initial_solutions(problem)
    initial_content = (
        f"## Initial Solutions\n**Problem:** `{problem}`\n\n**Solutions:**\n"
        + _format_solutions(initial_solutions)
    )
    yield _render_updates(
        initial_content, [], None, "### Status: π Initial solutions generated"
    )
    await asyncio.sleep(1)

    total_rounds = max(int(max_rounds), 0)
    solutions = initial_solutions
    history = [{"solutions": initial_solutions}]
    # Two slots per round: even index = judgment, odd index = revision.
    round_outputs = [""] * (total_rounds * 2)
    # Tracked explicitly so the final status works even if no round runs.
    rounds_completed = 0

    for round_num in range(total_rounds):
        round_label = round_num + 1
        rounds_completed = round_label

        judgment = await orchestrator.get_judgment(problem, solutions)
        verdict_text = judgment["judgment"]
        history.append({"judgment": verdict_text})

        is_agreement = "AGREEMENT: YES" in verdict_text.upper()
        agreement_class = "agreement" if is_agreement else "disagreement"
        agreement_icon = "β " if is_agreement else "β"
        round_outputs[round_num * 2] = (
            f"## Round {round_label} Judgment\n**Evaluation:** "
            f"<div class='{agreement_class}'>{agreement_icon} {verdict_text}</div>"
        )
        yield _render_updates(
            initial_content,
            round_outputs,
            None,
            f"### Status: π Round {round_label} judgment complete",
        )
        await asyncio.sleep(1)

        # The judge decides whether another revision pass is worthwhile.
        if not judgment["reprompt_needed"]:
            break

        revised_solutions = await orchestrator.get_revised_solutions(
            problem, solutions, verdict_text
        )
        history.append({"solutions": revised_solutions})
        round_outputs[round_num * 2 + 1] = (
            f"## Round {round_label} Revised Solutions\n**Revised Solutions:**\n"
            + _format_solutions(revised_solutions)
        )
        yield _render_updates(
            initial_content,
            round_outputs,
            None,
            f"### Status: π Round {round_label} revised solutions generated",
        )
        await asyncio.sleep(1)
        solutions = revised_solutions

    final_report_content = orchestrator.generate_final_report(problem, history)
    yield _render_updates(
        initial_content,
        round_outputs,
        final_report_content,
        f"### Status: β¨ Process complete! Completed {rounds_completed} round(s)",
    )
| with gr.Blocks(title="PolyThink Alpha", css=custom_css) as demo: | |
| with gr.Column(elem_classes=["app-header"]): | |
| gr.Markdown("<h1 class='app-title'>PolyThink Alpha</h1>", show_label=False) | |
| gr.Markdown("<p class='app-subtitle'>Multi-Agent Problem Solving System</p>", show_label=False) | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| gr.Markdown("### Problem Input") | |
| problem_input = gr.Textbox( | |
| label="Problem", | |
| placeholder="Enter your problem or question here...", | |
| lines=10, | |
| max_lines=20 | |
| ) | |
| rounds_slider = gr.Slider(2, 6, value=2, step=1, label="Maximum Rounds") | |
| solve_button = gr.Button("Solve Problem", elem_classes=["primary-button"]) | |
| status_text = gr.Markdown("### Status: Ready", elem_classes=["status-bar"], visible=True) | |
| with gr.Column(): | |
| initial_solutions = gr.Markdown(elem_classes=["step-section"], visible=False) | |
| round_judgment_1 = gr.Markdown(elem_classes=["step-section"], visible=False) | |
| revised_solutions_1 = gr.Markdown(elem_classes=["step-section"], visible=False) | |
| round_judgment_2 = gr.Markdown(elem_classes=["step-section"], visible=False) | |
| revised_solutions_2 = gr.Markdown(elem_classes=["step-section"], visible=False) | |
| round_judgment_3 = gr.Markdown(elem_classes=["step-section"], visible=False) | |
| revised_solutions_3 = gr.Markdown(elem_classes=["step-section"], visible=False) | |
| final_report = gr.HTML(elem_classes=["final-report"], visible=False) | |
| solve_button.click( | |
| fn=solve_problem, | |
| inputs=[ | |
| problem_input, | |
| rounds_slider | |
| ], | |
| outputs=[ | |
| initial_solutions, | |
| round_judgment_1, | |
| revised_solutions_1, | |
| round_judgment_2, | |
| revised_solutions_2, | |
| round_judgment_3, | |
| revised_solutions_3, | |
| final_report, | |
| status_text | |
| ] | |
| ) | |
| return demo.queue() | |
if __name__ == "__main__":
    # Script entry point: build the interface, then launch it with the same
    # share setting as before.
    launch_options = {"share": True}
    demo = create_polythink_interface()
    demo.launch(**launch_options)