import os import asyncio import gradio as gr import logging from huggingface_hub import InferenceClient import cohere import google.generativeai as genai from anthropic import Anthropic import openai from typing import List, Dict, Any, Optional from datetime import datetime # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # --- Agent Class --- class PolyThinkAgent: def __init__(self, model_name: str, model_path: str, role: str = "solver", api_provider: str = None): self.model_name = model_name self.model_path = model_path self.role = role self.api_provider = api_provider self.clients = {} self.hf_token = None self.inference = None def set_clients(self, clients: Dict[str, Any]): """Set the API clients for this agent""" self.clients = clients if "huggingface" in clients: self.hf_token = clients["huggingface"] if self.hf_token: self.inference = InferenceClient(token=self.hf_token) async def solve_problem(self, problem: str) -> Dict[str, Any]: """Generate a solution to the given problem""" try: if self.api_provider == "cohere" and "cohere" in self.clients: response = self.clients["cohere"].chat( model=self.model_path, message=f""" PROBLEM: {problem} INSTRUCTIONS: - Provide a clear, concise solution in one sentence. - Include brief reasoning in one additional sentence. - Do not repeat the solution or add extraneous text. """ ) solution = response.text.strip() return {"solution": solution, "model_name": self.model_name} elif self.api_provider == "anthropic" and "anthropic" in self.clients: response = self.clients["anthropic"].messages.create( model=self.model_path, messages=[{ "role": "user", "content": f""" PROBLEM: {problem} INSTRUCTIONS: - Provide a clear, concise solution in one sentence. - Include brief reasoning in one additional sentence. - Do not repeat the solution or add extraneous text. """ }] ) solution = response.content[0].text.strip() return {"solution": solution, "model_name": self.model_name} elif self.api_provider == "openai" and "openai" in self.clients: response = self.clients["openai"].chat.completions.create( model=self.model_path, max_tokens=100, messages=[{ "role": "user", "content": f""" PROBLEM: {problem} INSTRUCTIONS: - Provide a clear, concise solution in one sentence. - Include brief reasoning in one additional sentence. - Do not repeat the solution or add extraneous text. - Keep the response under 100 characters. """ }] ) solution = response.choices[0].message.content.strip() return {"solution": solution, "model_name": self.model_name} elif self.api_provider == "huggingface" and self.inference: prompt = f""" PROBLEM: {problem} INSTRUCTIONS: - Provide a clear, concise solution in one sentence. - Include brief reasoning in one additional sentence. - Do not repeat the solution or add extraneous text. - Keep the response under 100 characters. SOLUTION AND REASONING: """ result = self.inference.text_generation( prompt, model=self.model_path, max_new_tokens=5000, temperature=0.5 ) solution = result if isinstance(result, str) else result.generated_text return {"solution": solution.strip(), "model_name": self.model_name} elif self.api_provider == "gemini" and "gemini" in self.clients: model = self.clients["gemini"].GenerativeModel(self.model_path) try: response = model.generate_content( f""" PROBLEM: {problem} INSTRUCTIONS: - Provide a clear, concise solution in one sentence. - Include brief reasoning in one additional sentence. - Do not repeat the solution or add extraneous text. - Keep the response under 100 characters. """, generation_config=genai.types.GenerationConfig( temperature=0.5, ) ) # Check response validity and handle different response structures try: # First try to access text directly if available if hasattr(response, 'text'): solution = response.text.strip() # Otherwise check for candidates elif hasattr(response, 'candidates') and response.candidates: # Make sure we have candidates and parts before accessing if hasattr(response.candidates[0], 'content') and hasattr(response.candidates[0].content, 'parts'): solution = response.candidates[0].content.parts[0].text.strip() else: logger.warning(f"Gemini response has candidates but missing content structure: {response}") solution = "Error parsing API response; incomplete response structure." else: # Fallback for when candidates is empty logger.warning(f"Gemini API returned no candidates: {response}") solution = "No solution generated; API returned empty response." except Exception as e: logger.error(f"Error extracting text from Gemini response: {e}, response: {response}") solution = "Error parsing API response." except Exception as e: logger.error(f"Gemini API call failed: {e}") solution = f"API error: {str(e)}" return {"solution": solution, "model_name": self.model_name} else: return {"solution": f"Error: Missing API configuration for {self.api_provider}", "model_name": self.model_name} except Exception as e: logger.error(f"Error in {self.model_name}: {str(e)}") return {"solution": f"Error: {str(e)}", "model_name": self.model_name} async def evaluate_solutions(self, problem: str, solutions: List[Dict[str, Any]]) -> Dict[str, Any]: """Evaluate solutions from solver agents""" try: prompt = f""" PROBLEM: {problem} SOLUTIONS: 1. {solutions[0]['model_name']}: {solutions[0]['solution']} 2. {solutions[1]['model_name']}: {solutions[1]['solution']} INSTRUCTIONS: - Extract the numerical final answer from each solution (e.g., 68 from '16 + 52 = 68'). - Extract the key reasoning steps from each solution. - Apply strict evaluation criteria: * Numerical answers must match EXACTLY (including units and precision). * Key reasoning steps must align in approach and logic. - Output exactly: 'AGREEMENT: YES' if BOTH the numerical answers AND reasoning align perfectly. - Output 'AGREEMENT: NO' followed by a one-sentence explanation if either the answers or reasoning differ in ANY way. - Be conservative in declaring agreement - when in doubt, declare disagreement. - Do not add scoring, commentary, or extraneous text. EVALUATION: """ if self.api_provider == "gemini" and "gemini" in self.clients: # Instantiate the model for consistency and clarity model = self.clients["gemini"].GenerativeModel(self.model_path) # Use generate_content on the model instance response = model.generate_content( prompt, generation_config=genai.types.GenerationConfig( temperature=0.5, ) ) # Handle potential empty response or missing text attribute try: # First try to access text directly if available if hasattr(response, 'text'): judgment = response.text.strip() # Otherwise check for candidates elif hasattr(response, 'candidates') and response.candidates: # Make sure we have candidates and parts before accessing if hasattr(response.candidates[0], 'content') and hasattr(response.candidates[0].content, 'parts'): judgment = response.candidates[0].content.parts[0].text.strip() else: logger.warning(f"Gemini response has candidates but missing content structure: {response}") judgment = "AGREEMENT: NO - Unable to evaluate due to API response structure issue." else: # Fallback for when candidates is empty logger.warning(f"Empty response from Gemini API: {response}") judgment = "AGREEMENT: NO - Unable to evaluate due to API response issue." except Exception as e: logger.error(f"Error extracting text from Gemini response: {e}") judgment = "AGREEMENT: NO - Unable to evaluate due to API response issue." return {"judgment": judgment, "reprompt_needed": "AGREEMENT: NO" in judgment.upper()} elif self.api_provider == "openai" and "openai" in self.clients: response = self.clients["openai"].chat.completions.create( model=self.model_path, max_tokens=200, messages=[{"role": "user", "content": prompt}] ) judgment = response.choices[0].message.content.strip() return {"judgment": judgment, "reprompt_needed": "AGREEMENT: NO" in judgment.upper()} elif self.api_provider == "huggingface" and self.inference: result = self.inference.text_generation( prompt, model=self.model_path, max_new_tokens=200, temperature=0.5 ) judgment = result if isinstance(result, str) else result.generated_text return {"judgment": judgment.strip(), "reprompt_needed": "AGREEMENT: NO" in judgment.upper()} else: return {"judgment": f"Error: Missing API configuration for {self.api_provider}", "reprompt_needed": False} except Exception as e: logger.error(f"Error in judge: {str(e)}") return {"judgment": f"Error: {str(e)}", "reprompt_needed": False} async def reprompt_with_context(self, problem: str, solutions: List[Dict[str, Any]], judgment: str) -> Dict[str, Any]: """Generate a revised solution based on previous solutions and judgment""" try: prompt = f""" PROBLEM: {problem} PREVIOUS SOLUTIONS: 1. {solutions[0]['model_name']}: {solutions[0]['solution']} 2. {solutions[1]['model_name']}: {solutions[1]['solution']} JUDGE FEEDBACK: {judgment} INSTRUCTIONS: - Provide a revised, concise solution in one sentence. - Include brief reasoning in one additional sentence. - Address the judge's feedback. """ if self.api_provider == "cohere" and "cohere" in self.clients: response = self.clients["cohere"].chat( model=self.model_path, message=prompt ) solution = response.text.strip() return {"solution": solution, "model_name": self.model_name} elif self.api_provider == "anthropic" and "anthropic" in self.clients: response = self.clients["anthropic"].messages.create( model=self.model_path, max_tokens=100, messages=[{"role": "user", "content": prompt}] ) solution = response.content[0].text.strip() return {"solution": solution, "model_name": self.model_name} elif self.api_provider == "openai" and "openai" in self.clients: response = self.clients["openai"].chat.completions.create( model=self.model_path, max_tokens=100, messages=[{"role": "user", "content": prompt}] ) solution = response.choices[0].message.content.strip() return {"solution": solution, "model_name": self.model_name} elif self.api_provider == "huggingface" and self.inference: prompt += "\nREVISED SOLUTION AND REASONING:" result = self.inference.text_generation( prompt, model=self.model_path, max_new_tokens=500, temperature=0.5 ) solution = result if isinstance(result, str) else result.generated_text return {"solution": solution.strip(), "model_name": self.model_name} elif self.api_provider == "gemini" and "gemini" in self.clients: # Instantiate the model for consistency and clarity model = self.clients["gemini"].GenerativeModel(self.model_path) # Use generate_content response = model.generate_content( f""" PROBLEM: {problem} PREVIOUS SOLUTIONS: 1. {solutions[0]['model_name']}: {solutions[0]['solution']} 2. {solutions[1]['model_name']}: {solutions[1]['solution']} JUDGE FEEDBACK: {judgment} INSTRUCTIONS: - Provide a revised, concise solution in one sentence. - Include brief reasoning in one additional sentence. - Address the judge's feedback. """, generation_config=genai.types.GenerationConfig( temperature=0.5, max_output_tokens=100 ) ) # Handle potential empty response or missing text attribute try: # First try to access text directly if available if hasattr(response, 'text'): solution = response.text.strip() # Otherwise check for candidates elif hasattr(response, 'candidates') and response.candidates: # Make sure we have candidates and parts before accessing if hasattr(response.candidates[0], 'content') and hasattr(response.candidates[0].content, 'parts'): solution = response.candidates[0].content.parts[0].text.strip() else: logger.warning(f"Gemini response has candidates but missing content structure: {response}") solution = "Unable to generate a solution due to API response structure issue." else: # Fallback for when candidates is empty logger.warning(f"Empty response from Gemini API: {response}") solution = "Unable to generate a solution due to API response issue." except Exception as e: logger.error(f"Error extracting text from Gemini response: {e}") solution = "Unable to generate a solution due to API response issue." return {"solution": solution, "model_name": self.model_name} else: return {"solution": f"Error: Missing API configuration for {self.api_provider}", "model_name": self.model_name} except Exception as e: logger.error(f"Error in {self.model_name}: {str(e)}") return {"solution": f"Error: {str(e)}", "model_name": self.model_name} # --- Model Registry --- class ModelRegistry: @staticmethod def get_available_models(): """Get the list of available models grouped by provider (original list)""" return { "Anthropic": [ {"name": "Claude 3.5 Sonnet", "id": "claude-3-5-sonnet-20240620", "provider": "anthropic", "type": ["solver"], "icon": "📜"}, {"name": "Claude 3.7 Sonnet", "id": "claude-3-7-sonnet-20250219", "provider": "anthropic", "type": ["solver"], "icon": "📜"}, {"name": "Claude 3 Opus", "id": "claude-3-opus-20240229", "provider": "anthropic", "type": ["solver"], "icon": "📜"}, {"name": "Claude 3 Haiku", "id": "claude-3-haiku-20240307", "provider": "anthropic", "type": ["solver"], "icon": "📜"} ], "OpenAI": [ {"name": "GPT-4o", "id": "gpt-4o", "provider": "openai", "type": ["solver"], "icon": "🤖"}, {"name": "GPT-4 Turbo", "id": "gpt-4-turbo", "provider": "openai", "type": ["solver"], "icon": "🤖"}, {"name": "GPT-4", "id": "gpt-4", "provider": "openai", "type": ["solver"], "icon": "🤖"}, {"name": "GPT-3.5 Turbo", "id": "gpt-3.5-turbo", "provider": "openai", "type": ["solver"], "icon": "🤖"}, {"name": "OpenAI o1", "id": "o1", "provider": "openai", "type": ["solver", "judge"], "icon": "🤖"}, {"name": "OpenAI o3", "id": "o3", "provider": "openai", "type": ["solver", "judge"], "icon": "🤖"} ], "Cohere": [ {"name": "Cohere Command R", "id": "command-r-08-2024", "provider": "cohere", "type": ["solver"], "icon": "💬"}, {"name": "Cohere Command R+", "id": "command-r-plus-08-2024", "provider": "cohere", "type": ["solver"], "icon": "💬"} ], "Google": [ {"name": "Gemini 1.5 Pro", "id": "gemini-1.5-pro", "provider": "gemini", "type": ["solver"], "icon": "🌟"}, {"name": "Gemini 2.0 Flash Thinking Experimental 01-21", "id": "gemini-2.0-flash-thinking-exp-01-21", "provider": "gemini", "type": ["solver", "judge"], "icon": "🌟"}, {"name": "Gemini 2.5 Pro Experimental 03-25", "id": "gemini-2.5-pro-exp-03-25", "provider": "gemini", "type": ["solver", "judge"], "icon": "🌟"} ], "HuggingFace": [ {"name": "Llama 3.3 70B Instruct", "id": "meta-llama/Llama-3.3-70B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Llama 3.2 3B Instruct", "id": "meta-llama/Llama-3.2-3B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Llama 3.1 70B Instruct", "id": "meta-llama/Llama-3.1-70B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Mistral 7B Instruct v0.3", "id": "mistralai/Mistral-7B-Instruct-v0.3", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "DeepSeek R1 Distill Qwen 32B", "id": "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", "provider": "huggingface", "type": ["solver", "judge"], "icon": "🔥"}, {"name": "DeepSeek Coder V2 Instruct", "id": "deepseek-ai/DeepSeek-Coder-V2-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Qwen 2.5 72B Instruct", "id": "Qwen/Qwen2.5-72B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Qwen 2.5 Coder 32B Instruct", "id": "Qwen/Qwen2.5-Coder-32B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Qwen 2.5 Math 1.5B Instruct", "id": "Qwen/Qwen2.5-Math-1.5B-Instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Gemma 3 27B Instruct", "id": "google/gemma-3-27b-it", "provider": "huggingface", "type": ["solver"], "icon": "🔥"}, {"name": "Phi-3 Mini 4K Instruct", "id": "microsoft/Phi-3-mini-4k-instruct", "provider": "huggingface", "type": ["solver"], "icon": "🔥"} ] } @staticmethod def get_solver_models(): """Get models suitable for solver role with provider grouping""" all_models = ModelRegistry.get_available_models() solver_models = {} for provider, models in all_models.items(): provider_models = [] for model in models: if "solver" in model["type"]: provider_models.append({ "name": f"{model['icon']} {model['name']} ({provider})", "id": model["id"], "provider": model["provider"] }) if provider_models: solver_models[provider] = provider_models return solver_models @staticmethod def get_judge_models(): """Get only specific reasoning models suitable for judge role with provider grouping""" all_models = ModelRegistry.get_available_models() judge_models = {} allowed_judge_models = [ "Gemini 2.0 Flash Thinking Experimental 01-21 (Google)", "DeepSeek R1 (HuggingFace)", "Gemini 2.5 Pro Experimental 03-25 (Google)", "OpenAI o1 (OpenAI)", "OpenAI o3 (OpenAI)" ] for provider, models in all_models.items(): provider_models = [] for model in models: full_name = f"{model['name']} ({provider})" if "judge" in model["type"] and full_name in allowed_judge_models: provider_models.append({ "name": f"{model['icon']} {model['name']} ({provider})", "id": model["id"], "provider": model["provider"] }) if provider_models: judge_models[provider] = provider_models return judge_models # --- Orchestrator Class --- class PolyThinkOrchestrator: def __init__(self, solver1_config=None, solver2_config=None, judge_config=None, api_clients=None): self.solvers = [] self.judge = None self.api_clients = api_clients or {} if solver1_config: solver1 = PolyThinkAgent( model_name=solver1_config["name"].split(" ", 1)[1].rsplit(" (", 1)[0] if " " in solver1_config["name"] else solver1_config["name"], model_path=solver1_config["id"], api_provider=solver1_config["provider"] ) solver1.set_clients(self.api_clients) self.solvers.append(solver1) if solver2_config: solver2 = PolyThinkAgent( model_name=solver2_config["name"].split(" ", 1)[1].rsplit(" (", 1)[0] if " " in solver2_config["name"] else solver2_config["name"], model_path=solver2_config["id"], api_provider=solver2_config["provider"] ) solver2.set_clients(self.api_clients) self.solvers.append(solver2) if judge_config: self.judge = PolyThinkAgent( model_name=judge_config["name"].split(" ", 1)[1].rsplit(" (", 1)[0] if " " in judge_config["name"] else judge_config["name"], model_path=judge_config["id"], role="judge", api_provider=judge_config["provider"] ) self.judge.set_clients(self.api_clients) async def get_initial_solutions(self, problem: str) -> List[Dict[str, Any]]: tasks = [solver.solve_problem(problem) for solver in self.solvers] return await asyncio.gather(*tasks) async def get_judgment(self, problem: str, solutions: List[Dict[str, Any]]) -> Dict[str, Any]: if self.judge: return await self.judge.evaluate_solutions(problem, solutions) return {"judgment": "No judge configured", "reprompt_needed": False} async def get_revised_solutions(self, problem: str, solutions: List[Dict[str, Any]], judgment: str) -> List[Dict[str, Any]]: tasks = [solver.reprompt_with_context(problem, solutions, judgment) for solver in self.solvers] return await asyncio.gather(*tasks) def generate_final_report(self, problem: str, history: List[Dict[str, Any]]) -> str: report = f"""

🔍 Final Analysis Report

Problem Statement

{problem}
""" for i, step in enumerate(history, 1): if "solutions" in step and i == 1: report += f"""
1

Initial Solutions

""" for sol in step["solutions"]: # Fix the backslash issue by using a different approach to escape single quotes escaped_solution = sol['solution'].replace("'", "'") report += f"""
{sol['model_name']}
""" report += """
""" elif "judgment" in step: is_agreement = "AGREEMENT: YES" in step["judgment"].upper() judgment_class = "agreement" if is_agreement else "disagreement" judgment_icon = "✅" if is_agreement else "❌" report += f"""
{i}

Evaluation {(i+1)//2}

{judgment_icon}
{step["judgment"]}
""" elif "solutions" in step and i > 1: round_num = (i+1)//2 report += f"""
{i}

Revised Solutions (Round {round_num})

""" for sol in step["solutions"]: # Fix the backslash issue by using a different approach to escape single quotes escaped_solution = sol['solution'].replace("'", "'") report += f"""
{sol['model_name']}
""" report += """
""" last_judgment = next((step.get("judgment", "") for step in reversed(history) if "judgment" in step), "") if "AGREEMENT: YES" in last_judgment.upper(): confidence = "100%" if len(history) == 2 else "80%" # Get the final agreed-upon solution final_solutions = next((step["solutions"] for step in reversed(history) if "solutions" in step), None) if final_solutions: # Extract the numerical answer and reasoning from the first solution solution_parts = final_solutions[0]["solution"].split("\n") answer = solution_parts[0] if solution_parts else final_solutions[0]["solution"] reasoning = solution_parts[1] if len(solution_parts) > 1 else "No additional reasoning provided." report += f"""

Final Agreed Solution

Answer: {answer}

Reasoning: {reasoning}

Confidence level: {confidence}

""" else: report += f"""

Conclusion

Models reached AGREEMENT

Confidence level: {confidence}

""" else: report += f"""

Conclusion

Models could not reach agreement

Review all solutions above for best answer

""" report += """
""" return report # --- Gradio Interface --- def create_polythink_interface(): css = """ /* Reverted to Original Black Theme */ body { background: #000000; color: #ffffff; font-family: 'Arial', sans-serif; } .gradio-container { background: #1a1a1a; border-radius: 10px; box-shadow: 0 4px 15px rgba(0, 0, 0, 0.5); padding: 20px; } .gr-button { background: linear-gradient(45deg, #666666, #999999); color: #ffffff; border: none; padding: 10px 20px; border-radius: 5px; transition: all 0.3s ease; } .gr-button:hover { background: linear-gradient(45deg, #555555, #888888); transform: translateY(-2px); } .gr-textbox { background: #333333; color: #ffffff; border: 1px solid #444444; border-radius: 5px; padding: 10px; } .gr-slider { background: #333333; border-radius: 5px; } .gr-slider .track-fill { background: #cccccc; } .step-section { background: #1a1a1a; border-radius: 8px; padding: 15px; margin-bottom: 20px; box-shadow: 0 2px 10px rgba(0, 0, 0, 0.3); } .step-section h3 { color: #cccccc; margin-top: 0; font-size: 1.5em; } .step-section p { color: #aaaaaa; line-height: 1.6; } .step-section code { background: #333333; padding: 2px 6px; border-radius: 3px; color: #ff6b6b; } .step-section strong { color: #ffffff; } /* Enhanced Status Bar */ .status-bar { background: #1a1a1a; padding: 15px; border-radius: 5px; font-size: 1.1em; margin-bottom: 20px; border-left: 4px solid #666666; position: relative; overflow: hidden; } .status-progress { position: absolute; bottom: 0; left: 0; height: 3px; background: linear-gradient(90deg, #4CAF50, #2196F3); transition: width 0.3s ease; } .status-icon { display: inline-block; margin-right: 10px; font-size: 1.2em; } .status-message { display: inline-block; vertical-align: middle; } .status-stage { display: inline-block; margin-left: 15px; padding: 4px 8px; border-radius: 12px; font-size: 0.9em; background: rgba(255, 255, 255, 0.1); } .status-stage.active { background: rgba(76, 175, 80, 0.2); color: #4CAF50; } .status-stage.completed { background: rgba(33, 150, 243, 0.2); color: #2196F3; } /* Agreement/Disagreement styling */ .agreement { color: #4CAF50 !important; border: 1px solid #4CAF50; background-color: rgba(76, 175, 80, 0.1) !important; padding: 10px; border-radius: 5px; } .disagreement { color: #F44336 !important; border: 1px solid #F44336; background-color: rgba(244, 67, 54, 0.1) !important; padding: 10px; border-radius: 5px; } /* Enhanced Final Report Styling */ .final-report { background: #111111; padding: 0; border-radius: 8px; box-shadow: 0 4px 15px rgba(0, 0, 0, 0.5); margin-top: 20px; overflow: hidden; } .final-report-container { font-family: 'Arial', sans-serif; } .final-report-title { background: linear-gradient(45deg, #333333, #444444); color: #ffffff; padding: 20px; margin: 0; border-bottom: 1px solid #555555; font-size: 24px; text-align: center; } .problem-container { background: #222222; padding: 15px 20px; margin: 0; border-bottom: 1px solid #333333; } .problem-title { color: #bbbbbb; margin: 0 0 10px 0; font-size: 18px; } .problem-content { background: #333333; padding: 15px; border-radius: 5px; font-family: monospace; font-size: 16px; color: #ffffff; } .timeline-container { padding: 20px; } .timeline-item { display: flex; margin-bottom: 25px; position: relative; } .timeline-item:before { content: ''; position: absolute; left: 15px; top: 30px; bottom: -25px; width: 2px; background: #444444; z-index: 0; } .timeline-item:last-child:before { display: none; } .timeline-marker { width: 34px; height: 34px; border-radius: 50%; background: #333333; display: flex; align-items: center; justify-content: center; font-weight: bold; position: relative; z-index: 1; border: 2px solid #555555; margin-right: 15px; } .timeline-content { flex: 1; background: #1d1d1d; border-radius: 5px; padding: 15px; border: 1px solid #333333; } .timeline-content h4 { margin-top: 0; margin-bottom: 15px; color: #cccccc; border-bottom: 1px solid #333333; padding-bottom: 8px; } .solutions-container { display: flex; flex-wrap: wrap; gap: 10px; } .solution-item { flex: 1; min-width: 250px; background: #252525; border-radius: 5px; overflow: hidden; border: 1px solid #383838; transition: all 0.3s ease; } .solution-item:hover { transform: translateY(-2px); box-shadow: 0 4px 15px rgba(0, 0, 0, 0.3); } .solution-header { background: #333333; padding: 8px 12px; font-weight: bold; color: #dddddd; border-bottom: 1px solid #444444; display: flex; justify-content: space-between; align-items: center; } .solution-body { padding: 12px; color: #bbbbbb; } .judgment-container { display: flex; align-items: center; padding: 10px; border-radius: 5px; } .judgment-icon { font-size: 24px; margin-right: 15px; } .conclusion-container { margin-top: 30px; border-radius: 5px; padding: 5px 15px 15px; } .conclusion-content { display: flex; align-items: center; } .conclusion-icon { font-size: 36px; margin-right: 20px; } .conclusion-text { flex: 1; } .conclusion-text p { margin: 5px 0; } /* Enhanced Header styling with animation */ .app-header { background: linear-gradient(-45deg, #222222, #333333, #444444, #333333); background-size: 400% 400%; animation: gradient 15s ease infinite; padding: 20px; border-radius: 10px; margin-bottom: 20px; box-shadow: 0 4px 10px rgba(0, 0, 0, 0.3); border: 1px solid #444444; position: relative; overflow: hidden; } @keyframes gradient { 0% { background-position: 0% 50%; } 50% { background-position: 100% 50%; } 100% { background-position: 0% 50%; } } .app-header::before { content: ''; position: absolute; top: 0; left: 0; right: 0; bottom: 0; background: linear-gradient(45deg, rgba(255,255,255,0.1) 0%, rgba(255,255,255,0) 100%); pointer-events: none; } .app-title { font-size: 28px; margin: 0 0 10px 0; background: -webkit-linear-gradient(45deg, #cccccc, #ffffff); -webkit-background-clip: text; -webkit-text-fill-color: transparent; display: inline-block; position: relative; } .app-subtitle { font-size: 16px; color: #aaaaaa; margin: 0; position: relative; } /* Button style */ .primary-button { background: linear-gradient(45deg, #555555, #777777) !important; border: none !important; color: white !important; padding: 12px 24px !important; font-weight: bold !important; transition: all 0.3s ease !important; box-shadow: 0 4px 10px rgba(0, 0, 0, 0.3) !important; } .primary-button:hover { transform: translateY(-2px) !important; box-shadow: 0 6px 15px rgba(0, 0, 0, 0.4) !important; background: linear-gradient(45deg, #666666, #888888) !important; } /* Mobile Responsiveness */ @media (max-width: 768px) { .solutions-container { flex-direction: column; } .solution-item { min-width: 100%; margin-bottom: 10px; } .app-title { font-size: 24px; } .app-subtitle { font-size: 14px; } .timeline-item { flex-direction: column; } .timeline-marker { margin-bottom: 10px; } } /* Solution Controls */ .solution-controls { display: flex; gap: 8px; } .copy-btn, .toggle-btn { background: none; border: none; color: #bbbbbb; cursor: pointer; padding: 4px 8px; border-radius: 4px; transition: all 0.2s ease; font-size: 1.1em; } .copy-btn:hover, .toggle-btn:hover { background: rgba(255, 255, 255, 0.1); color: #ffffff; } .copy-btn:active, .toggle-btn:active { transform: scale(0.95); } /* Solution Item Enhancements */ .solution-item { position: relative; overflow: hidden; } .solution-item::before { content: ''; position: absolute; top: 0; left: 0; width: 4px; height: 100%; background: linear-gradient(to bottom, #4CAF50, #2196F3); opacity: 0.5; } .solution-body { padding: 15px; background: rgba(0, 0, 0, 0.2); border-top: 1px solid rgba(255, 255, 255, 0.1); transition: all 0.3s ease; } /* Loading States */ .loading-skeleton { background: linear-gradient(90deg, var(--card-bg) 25%, var(--border-color) 50%, var(--card-bg) 75%); background-size: 200% 100%; animation: loading 1.5s infinite; border-radius: 8px; margin: 10px 0; } @keyframes loading { 0% { background-position: 200% 0; } 100% { background-position: -200% 0; } } /* Error States */ .error-message { background: rgba(255, 59, 48, 0.1); border: 1px solid rgba(255, 59, 48, 0.2); border-radius: 8px; padding: 15px; margin: 10px 0; color: #ff3b30; font-size: 14px; line-height: 1.5; } .error-message::before { content: "⚠️"; margin-right: 8px; } /* Light Theme Error States */ body.light-theme .error-message { background: rgba(255, 59, 48, 0.05); border-color: rgba(255, 59, 48, 0.1); } /* Status Bar States */ .status-bar { display: flex; align-items: center; gap: 10px; padding: 10px; background: var(--card-bg); border-radius: 8px; margin: 10px 0; font-size: 14px; } .status-icon { font-size: 16px; } .status-message { flex: 1; color: var(--text-color); } .status-stage { padding: 4px 8px; border-radius: 4px; font-size: 12px; background: var(--border-color); color: var(--text-color); opacity: 0.7; } .status-stage.active { background: var(--primary-color); color: white; opacity: 1; } .status-stage.completed { background: var(--success-color); color: white; opacity: 1; } .status-progress { height: 4px; background: var(--primary-color); border-radius: 2px; transition: width 0.3s ease; } /* Light Theme Status Bar */ body.light-theme .status-bar { background: white; border: 1px solid var(--border-color); } body.light-theme .status-stage { background: #f0f0f0; } /* Mobile Responsive Status Bar */ @media (max-width: 768px) { .status-bar { flex-direction: column; align-items: flex-start; gap: 5px; } .status-stage { font-size: 10px; padding: 2px 4px; } } """ # Hardcoded model configurations solver1_config = { "name": "Cohere Command R", "id": "command-r-08-2024", "provider": "cohere" } solver2_config = { "name": "Llama 3.2 3B Instruct", "id": "meta-llama/Llama-3.2-3B-Instruct", "provider": "huggingface" } judge_config = { "name": "Gemini 2.0 Flash Thinking Experimental 01-21", "id": "gemini-2.0-flash-thinking-exp-01-21", "provider": "gemini" } async def solve_problem(problem: str, max_rounds: int): try: # Get API keys from environment variables api_clients = {} # Cohere client cohere_key = os.getenv("COHERE_API_KEY") if cohere_key: api_clients["cohere"] = cohere.Client(cohere_key) # Hugging Face client hf_key = os.getenv("HF_API_KEY") if hf_key: api_clients["huggingface"] = hf_key # Gemini client gemini_key = os.getenv("GEMINI_API_KEY") if gemini_key: genai.configure(api_key=gemini_key) api_clients["gemini"] = genai # Check if all required API keys are present required_providers = {solver1_config["provider"], solver2_config["provider"], judge_config["provider"]} missing_keys = [p for p in required_providers if p not in api_clients] if missing_keys: yield [ gr.update(value=f"""
Missing API keys for {', '.join(missing_keys)}. Please check your environment variables.
""", visible=True), *[gr.update(visible=False) for _ in range(6)], gr.update(visible=False), gr.update(value=f"""
Missing API keys for {', '.join(missing_keys)}
""", visible=True), gr.update(visible=False) ] return # Show loading state yield [ gr.update(value="""
""", visible=True), *[gr.update(visible=False) for _ in range(6)], gr.update(visible=False), gr.update(value=f"""
Initializing AI team...
""", visible=True), gr.update(visible=False) ] orchestrator = PolyThinkOrchestrator(solver1_config, solver2_config, judge_config, api_clients) # Show loading state for initial solutions yield [ gr.update(value="""
""", visible=True), *[gr.update(visible=False) for _ in range(6)], gr.update(visible=False), gr.update(value=f"""
🤖 Generating initial solutions...
""", visible=True), gr.update(visible=False) ] try: initial_solutions = await orchestrator.get_initial_solutions(problem) except Exception as e: yield [ gr.update(value=f"""
Error generating initial solutions: {str(e)}

This appears to be a server-side issue with the AI APIs. Please try again later.
""", visible=True), *[gr.update(visible=False) for _ in range(6)], gr.update(visible=False), gr.update(value=f"""
Error: Failed to generate initial solutions
""", visible=True), gr.update(visible=False) ] return initial_content = f"## Initial Solutions\n**Problem:** `{problem}`\n\n**Solutions:**\n- **{initial_solutions[0]['model_name']}**: {initial_solutions[0]['solution']}\n- **{initial_solutions[1]['model_name']}**: {initial_solutions[1]['solution']}" # Show initial solutions yield [ gr.update(value=initial_content, visible=True), *[gr.update(visible=False) for _ in range(6)], gr.update(visible=False), gr.update(value=f"""
📋 Initial solutions generated Initial Solutions
""", visible=True), gr.update(visible=True) ] await asyncio.sleep(1) solutions = initial_solutions history = [{"solutions": initial_solutions}] round_outputs = [""] * 6 for round_num in range(int(max_rounds)): # Show loading state for judgment yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(visible=False), gr.update(value=f"""
⚖️ Evaluating solutions (Round {round_num + 1})... Initial Solutions Judgment {round_num + 1}
""", visible=True), gr.update(visible=True) ] try: judgment = await orchestrator.get_judgment(problem, solutions) except Exception as e: yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(visible=False), gr.update(value=f"""
Error: Failed to evaluate solutions
""", visible=True), gr.update(visible=True) ] return history.append({"judgment": judgment["judgment"]}) is_agreement = "AGREEMENT: YES" in judgment["judgment"].upper() agreement_class = "agreement" if is_agreement else "disagreement" agreement_icon = "✅" if is_agreement else "❌" judgment_content = f"## Round {round_num + 1} Judgment\n**Evaluation:**
{agreement_icon} {judgment['judgment']}
" round_outputs[round_num * 2] = judgment_content progress = (round_num * 2 + 1) / (int(max_rounds) * 2) * 100 # Show judgment yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(visible=False), gr.update(value=f"""
🔍 Round {round_num + 1} judgment complete Initial Solutions Judgment {round_num + 1}
""", visible=True), gr.update(visible=True) ] await asyncio.sleep(1) if not judgment["reprompt_needed"]: break # Show loading state for revised solutions yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(visible=False), gr.update(value=f"""
🔄 Generating revised solutions (Round {round_num + 1})... Initial Solutions Judgment {round_num + 1} Revision {round_num + 1}
""", visible=True), gr.update(visible=True) ] try: revised_solutions = await orchestrator.get_revised_solutions(problem, solutions, judgment["judgment"]) except Exception as e: yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(visible=False), gr.update(value=f"""
Error: Failed to generate revised solutions
""", visible=True), gr.update(visible=True) ] return history.append({"solutions": revised_solutions}) revision_content = f"## Round {round_num + 1} Revised Solutions\n**Revised Solutions:**\n- **{revised_solutions[0]['model_name']}**: {revised_solutions[0]['solution']}\n- **{revised_solutions[1]['model_name']}**: {revised_solutions[1]['solution']}" round_outputs[round_num * 2 + 1] = revision_content progress = (round_num * 2 + 2) / (int(max_rounds) * 2) * 100 # Show revised solutions yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(visible=False), gr.update(value=f"""
🔄 Round {round_num + 1} revised solutions generated Initial Solutions Judgment {round_num + 1} Revision {round_num + 1}
""", visible=True), gr.update(visible=True) ] await asyncio.sleep(1) solutions = revised_solutions # Show loading state for final report yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(value="""
""", visible=True), gr.update(value=f"""
📊 Generating final report... Initial Solutions All Judgments All Revisions
""", visible=True), gr.update(visible=True) ] final_report_content = orchestrator.generate_final_report(problem, history) # Show final report yield [ gr.update(value=initial_content, visible=True), *[gr.update(value=output, visible=bool(output)) for output in round_outputs], gr.update(value=final_report_content, visible=True), gr.update(value=f"""
Process complete! Completed {round_num + 1} round(s) Initial Solutions All Judgments All Revisions
""", visible=True), gr.update(visible=True) ] except Exception as e: yield [ gr.update(value=f"""
An unexpected error occurred: {str(e)}

This appears to be a server-side issue with the AI APIs. Please try again later.
""", visible=True), *[gr.update(visible=False) for _ in range(6)], gr.update(visible=False), gr.update(value=f"""
Error: An unexpected error occurred
""", visible=True), gr.update(visible=False) ] def export_report(problem: str, report: str) -> None: """Export the problem-solving report to a text file.""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"polythink_report_{timestamp}.txt" content = f"""PolyThink Problem Solving Report Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} Problem: {problem} Report: {report} """ # Create a download link html = f""" """ return gr.update(value=html, visible=True) def share_results(problem: str, report: str) -> None: """Share the problem-solving results.""" # Create a shareable text share_text = f"""Check out this problem-solving result from PolyThink! Problem: {problem} {report} Generated with PolyThink Alpha """ # Try to use the Web Share API if available html = f""" """ return gr.update(value=html, visible=True) with gr.Blocks(title="PolyThink Alpha", css=css) as demo: with gr.Column(elem_classes=["app-header"]): gr.Markdown("

PolyThink Alpha

", show_label=False) gr.Markdown("

Multi-Agent Problem Solving System

", show_label=False) with gr.Row(elem_classes=["header-controls"]): gr.Button("🌙 Theme", elem_classes=["theme-toggle"], show_label=False) gr.Button("❓ Help", elem_classes=["help-button"], show_label=False) with gr.Row(): with gr.Column(scale=2): gr.Markdown("### Problem Input") problem_input = gr.Textbox( label="Problem", placeholder="Enter your problem here...", lines=5, elem_classes=["problem-input"] ) with gr.Row(): max_rounds = gr.Slider( minimum=1, maximum=5, value=3, step=1, label="Maximum Rounds", elem_classes=["rounds-slider"] ) solve_btn = gr.Button("Solve", variant="primary", elem_classes=["solve-button"]) status_bar = gr.HTML( value="", elem_classes=["status-bar"], visible=False ) with gr.Column(scale=3): with gr.Tabs() as output_tabs: with gr.TabItem("Process", elem_classes=["process-tab"]): initial_solutions = gr.Markdown(visible=False) round_outputs = [gr.Markdown(visible=False) for _ in range(6)] with gr.TabItem("Final Report", elem_classes=["report-tab"]): final_report = gr.Markdown(visible=False) with gr.Row(elem_classes=["action-buttons"], visible=False) as action_buttons: export_btn = gr.Button("Export Report", elem_classes=["action-button"]) share_btn = gr.Button("Share Results", elem_classes=["action-button"]) def toggle_theme(): return """ """ def show_help(): return """ """ solve_btn.click( fn=show_loading_state, outputs=[ initial_solutions, *round_outputs, final_report, status_bar, action_buttons ] ).then( fn=solve_problem, inputs=[problem_input, max_rounds], outputs=[ initial_solutions, *round_outputs, final_report, status_bar, action_buttons ] ) export_btn.click( fn=export_report, inputs=[problem_input, final_report], outputs=None ) share_btn.click( fn=share_results, inputs=[problem_input, final_report], outputs=None ) return demo.queue() if __name__ == "__main__": demo = create_polythink_interface() demo.launch(share=True)