Spaces:
Running
Running
| ''' | |
| Guidelines for Creating and Utilizing Tools in tools.py: | |
| 1. Initial Assessment: | |
| Review Existing Tools: | |
| Before adding new functions, thoroughly read through tools.py to understand the existing tools and their functionalities. | |
| Determine if an existing tool can be adapted or extended to meet the current needs, avoiding redundancy. | |
| 2. Tool Creation and Function Design: | |
| Create Within tools.py: | |
| Add new tools as functions within tools.py. If tools.py doesn't exist, create it. | |
| Ensure each function is self-contained and focused on a single task for modularity. | |
| Design for Importing: | |
| Design tools to be imported and executed via terminal commands. Do not include execution code that runs when tools.py is imported. | |
| Follow Best Practices: | |
| Use clear and descriptive function names that reflect their general purpose. | |
| Include docstrings for each function, detailing the purpose, parameters, and expected outputs. | |
| Adhere to PEP 8 style guidelines for readable and maintainable code. | |
| 3. Generalization: | |
| Broad Input Handling: | |
| Design functions to handle a wide range of inputs, enhancing reusability for future tasks. | |
| Accept parameters that allow the function to be applicable in various scenarios (e.g., any stock ticker, URL, or data file). | |
| Flexible Functionality: | |
| Ensure functions can process different data types and structures when applicable. | |
| Avoid hardcoding values; use parameters and defaults where necessary. | |
| Modularity: | |
| If a task involves multiple distinct operations, split them into separate functions. | |
| This approach enhances clarity and allows for individual functions to be reused independently. | |
| 4. Execution and Script Management: | |
| Import and Run via Terminal: | |
| Do not execute tools.py directly. Instead, import the necessary functions and run them using the terminal. | |
| Use the following bash command: | |
| python -c "from tools import function_name; function_name(args)" | |
| Replace function_name and args with the appropriate function and arguments. | |
| Avoid Additional Scripts: | |
| Do not create extra .py files or scripts for execution purposes. | |
| Keep all tool functions within tools.py and execute them using the import method shown above. | |
| 5. Output: | |
| Console Printing: | |
| Ensure that all tools print their output directly to the console. | |
| Format the output for readability, using clear messages and organizing data in a logical manner. | |
| No Return Statements for Output: | |
| While functions can return values for internal use, the primary results should be displayed using print() statements. | |
| 6. Error Handling: | |
| Input Validation: | |
| Validate all input parameters to catch errors before execution. | |
| Provide informative error messages to guide correct usage. | |
| Exception Management: | |
| Use try-except blocks to handle potential exceptions without crashing the program. | |
| Log errors where appropriate, and ensure they don't expose sensitive information. | |
| Debugging and Testing: | |
| Test functions with various inputs, including edge cases, to ensure robustness. | |
| If errors are found, revise and debug the functions promptly. | |
| 7. Post-Creation: | |
| Execute to Fulfill Requests: | |
| After creating or updating tools, execute them as needed to fulfill user requests unless the request was solely for tool creation. | |
| Documentation: | |
| Update any relevant documentation or comments within tools.py to reflect new additions or changes. | |
| Consider maintaining a usage example within the docstring for complex functions. | |
| 8. Maintenance and Refactoring: | |
| Regular Review: | |
| Periodically review tools.py to identify opportunities for optimization and improvement. | |
| Remove or update deprecated functions that are no longer effective or necessary. | |
| Enhance Generalization: | |
| Refactor functions to improve their general applicability as new requirements emerge. | |
| Stay vigilant for patterns that can be abstracted into more general solutions. | |
| 9. Compliance and Security: | |
| Data Protection: | |
| Ensure that tools handle data securely, especially when dealing with sensitive information. | |
| Avoid hardcoding credentials or exposing private data through outputs. | |
| Licensing and Dependencies: | |
| Verify that any third-party libraries used are properly licensed and documented. | |
| Include installation instructions for dependencies if they are not part of the standard library. | |
| ''' | |
| # Your tools will be defined below this line | |
| import os | |
| from datetime import datetime, timedelta | |
| import PyPDF2 | |
| from pathlib import Path | |
| import json | |
| import re | |
| import requests | |
| from typing import List, Dict, Optional, Union | |
| import subprocess | |
| import sys | |
| from git import Repo | |
def get_current_month_folder(base_path: Optional[str] = None) -> str:
    """Return the path of the folder named after the current month.

    Args:
        base_path: Directory that holds one sub-folder per month. When
            omitted, the previously hard-coded Dropbox location is used,
            so existing callers keep their behaviour.

    Returns:
        ``base_path`` joined with the current month's full name as
        produced by ``strftime("%B")``. The folder is neither created
        nor checked for existence.
    """
    if base_path is None:
        # NOTE(review): the default still pins the year 2024 - confirm
        # whether it should follow the current year instead.
        base_path = r"C:\Users\admin\Dropbox\Current\2024"
    current_month = datetime.now().strftime("%B")  # Full month name
    return os.path.join(base_path, current_month)
def get_pdfs_for_date(target_date: Optional[datetime] = None,
                      folder: Optional[str] = None) -> List[str]:
    """Find all PDFs last modified on a given day below a folder.

    Args:
        target_date: Day to match (a ``datetime``; a plain ``date`` also
            works). If None, today's date is used.
        folder: Root directory to search recursively. If None, the folder
            returned by ``get_current_month_folder()`` is used, which is
            the original behaviour.

    Returns:
        A list of full paths to the matching PDF files. The list is empty
        when the folder does not exist or nothing matches.
    """
    if target_date is None:
        target_date = datetime.now()
    if folder is None:
        folder = get_current_month_folder()

    # Compare calendar days directly instead of formatted strings.
    if isinstance(target_date, datetime):
        wanted_day = target_date.date()
    else:
        wanted_day = target_date

    pdf_files = []
    # os.walk silently yields nothing for a missing folder.
    for root, _, files in os.walk(folder):
        for name in files:
            if not name.lower().endswith('.pdf'):
                continue
            file_path = os.path.join(root, name)
            try:
                mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
            except OSError as exc:
                # The file vanished or is unreadable (e.g. a broken link);
                # skip it rather than aborting the whole scan.
                print(f"Skipping unreadable file {file_path}: {exc}")
                continue
            if mod_time.date() == wanted_day:
                pdf_files.append(file_path)
    return pdf_files
def extract_text_from_pdf(pdf_path: str) -> str:
    """Read a PDF file and return the text of all of its pages.

    The text of every page is followed by a newline. Any failure while
    opening or parsing the file is reported on the console and results
    in an empty string instead of an exception.
    """
    try:
        with open(pdf_path, 'rb') as handle:
            pages = PyPDF2.PdfReader(handle).pages
            # Pages are extracted while the file is still open.
            return "".join(page.extract_text() + "\n" for page in pages)
    except Exception as err:
        print(f"Error processing {pdf_path}: {str(err)}")
        return ""
def summarize_pdfs_for_date(target_date: Optional[datetime] = None,
                            output_dir: str = "summaries"):
    """Process and summarize the PDFs modified on a specific date.

    Prints each summary to the console and saves all of them to
    ``<output_dir>/summaries_<YYYY-MM-DD>.json``.

    Args:
        target_date: datetime object for the target date. If None, uses
            today's date.
        output_dir: Directory (created if needed) that receives the JSON
            file. Defaults to ``"summaries"`` relative to the current
            working directory, as before.
    """
    if target_date is None:
        target_date = datetime.now()
    day_label = target_date.strftime('%Y-%m-%d')

    pdfs = get_pdfs_for_date(target_date)
    if not pdfs:
        print(f"No PDFs found for {day_label}")
        return

    summaries = {}
    for pdf_path in pdfs:
        print(f"Processing: {pdf_path}")
        text = extract_text_from_pdf(pdf_path)
        # Basic summary: first 500 characters of text
        summary = text[:500] + "..." if len(text) > 500 else text
        # Keyed by file name; when two PDFs in different sub-folders share
        # a name, fall back to the full path so neither entry is lost.
        key = os.path.basename(pdf_path)
        if key in summaries:
            key = pdf_path
        summaries[key] = {
            "path": pdf_path,
            "summary": summary,
            "processed_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }

    # Save summaries to a JSON file in the output directory
    os.makedirs(output_dir, exist_ok=True)
    output_file = os.path.join(output_dir, f"summaries_{day_label}.json")
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(summaries, f, indent=4, ensure_ascii=False)

    print(f"\nProcessed {len(pdfs)} PDFs")
    print(f"Summaries saved to: {output_file}")

    # Print a short preview of every summary to the console
    for key, data in summaries.items():
        preview = data['summary']
        # Only add an ellipsis when the preview was actually cut.
        ellipsis = "..." if len(preview) > 200 else ""
        print(f"\n{'='*80}\n{key}")
        print(f"Path: {data['path']}")
        print(f"\nSummary:\n{preview[:200]}{ellipsis}")
class WebSearchTool:
    """
    A tool for performing web searches using the Perplexity API.
    Designed to be the default web search mechanism for context-requiring queries.
    """

    API_URL = "https://api.perplexity.ai/chat/completions"
    # NOTE(review): confirm this model name is still offered by the API.
    DEFAULT_MODEL = "llama-3.1-sonar-huge-128k-online"
    # Seconds to wait for the API; without a timeout requests can block forever.
    DEFAULT_TIMEOUT = 30.0

    def __init__(self, api_key: Optional[str] = None,
                 model: Optional[str] = None,
                 timeout: Optional[float] = None):
        """
        Initialize the WebSearchTool.

        Args:
            api_key: Perplexity API key. If None, will try to get from environment variable.
            model: Model name sent with every request. Defaults to DEFAULT_MODEL.
            timeout: Request timeout in seconds. Defaults to DEFAULT_TIMEOUT.

        Raises:
            ValueError: If no API key is passed and PERPLEXITY_API_KEY is unset.
        """
        self.api_key = api_key or os.getenv("PERPLEXITY_API_KEY")
        if not self.api_key:
            raise ValueError("Perplexity API key must be provided or set in PERPLEXITY_API_KEY environment variable")
        self.model = model or self.DEFAULT_MODEL
        self.timeout = self.DEFAULT_TIMEOUT if timeout is None else timeout
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

    def _error_result(self, query, message: str) -> Dict:
        """Print a search error and build the error-shaped result dictionary."""
        print(f"Error performing search: {message}")
        return {
            "query": query,
            "timestamp": datetime.now().isoformat(),
            "error": message,
            "metadata": {
                "source": "Perplexity API",
                "status": "error"
            }
        }

    def search(self, query: str, max_results: int = 5) -> Dict:
        """
        Perform a web search using Perplexity API.

        Never raises: every failure is printed and returned as a dictionary
        containing an "error" key.

        Args:
            query: The search query
            max_results: Maximum number of results to return. Currently not
                sent to the API; kept for interface compatibility.

        Returns:
            Dictionary containing search results and metadata
        """
        # Reject empty queries before spending a network round-trip.
        if not isinstance(query, str) or not query.strip():
            return self._error_result(query, "Search query must be a non-empty string")
        try:
            response = requests.post(
                self.API_URL,
                headers=self.headers,
                json={
                    "model": self.model,
                    "messages": [
                        {
                            "role": "system",
                            "content": "You are a helpful assistant that provides accurate and concise answers based on web search results."
                        },
                        {
                            "role": "user",
                            "content": query
                        }
                    ]
                },
                timeout=self.timeout
            )
            response.raise_for_status()
            data = response.json()
            # Tolerate a missing or empty "choices" list instead of relying
            # on an IndexError being swallowed below.
            choices = data.get("choices") or [{}]
            message = choices[0].get("message") or {}
            answer = message.get("content") or "No answer found"
            return {
                "query": query,
                "timestamp": datetime.now().isoformat(),
                "answer": answer,
                "references": [],  # References not available in this API version
                "metadata": {
                    "source": "Perplexity API",
                    "model": self.model
                }
            }
        except Exception as e:
            # Top-level boundary: callers expect a result dict, not an exception.
            return self._error_result(query, str(e))

    def format_results(self, results: Dict, format: str = "text") -> str:
        """
        Format search results in the specified format.

        Args:
            results: Search results dictionary
            format: Output format ("text" or "markdown"); any other value
                is treated as "text".

        Returns:
            Formatted string of results
        """
        if "error" in results:
            return f"Error: {results['error']}"

        query = results.get("query", "")
        answer = results.get("answer", "")
        # A missing "references" key is treated like an empty list.
        references = results.get("references") or []

        if format == "markdown":
            parts = [f"# Search Results for: {query}\n\n", f"## Answer\n{answer}\n\n"]
            references_heading = "## References\n"
        else:
            parts = [f"Search Results for: {query}\n\n", f"Answer:\n{answer}\n\n"]
            references_heading = "References:\n"

        if references:
            parts.append(references_heading)
            for i, ref in enumerate(references, 1):
                if isinstance(ref, str):
                    # Plain URL strings are listed as-is.
                    parts.append(f"{i}. {ref}\n")
                else:
                    parts.append(f"{i}. {ref['title']} - {ref['url']}\n")
        return "".join(parts)
class MCPServerManager:
    """
    A tool for installing and managing Model Context Protocol (MCP) servers.

    Installed-server metadata is persisted as JSON in ``<base_dir>/config.json``.
    Handles of server processes started by this instance are kept in memory
    only; they are not JSON-serializable and are never written to disk.
    """

    DEFAULT_CONFIG_LOCATIONS = [
        "mcp.json",
        ".mcp/config.json",
        "config/mcp.json",
        "mcp_config.json",
        ".config/mcp/servers.json"
    ]

    def __init__(self, base_dir: Optional[str] = None):
        """
        Initialize the MCP Server Manager.

        Args:
            base_dir: Base directory for installing servers. If None, uses
                an ``mcp_servers`` folder inside the current directory.
        """
        self.base_dir = Path(base_dir) if base_dir else Path.cwd() / "mcp_servers"
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self.servers_repo_url = "https://github.com/modelcontextprotocol/servers.git"
        self.installed_servers = {}
        # server name -> subprocess.Popen of servers started by this instance.
        self._processes = {}
        self.load_installed_servers()

    def load_installed_servers(self):
        """Load information about installed servers from the config file."""
        config_file = self.base_dir / "config.json"
        if config_file.exists():
            with open(config_file, "r", encoding="utf-8") as f:
                self.installed_servers = json.load(f)

    def save_installed_servers(self):
        """Save information about installed servers to the config file."""
        # Serialize first so a serialization error cannot leave a truncated
        # config file behind.
        payload = json.dumps(self.installed_servers, indent=4)
        config_file = self.base_dir / "config.json"
        with open(config_file, "w", encoding="utf-8") as f:
            f.write(payload)

    def get_featured_servers(self) -> List[Dict]:
        """
        Get list of featured servers from the MCP GitHub repository.

        The repository is cloned into ``<base_dir>/servers_repo`` on first
        use and pulled afterwards.

        Returns:
            List of server information dictionaries; empty on any error.
        """
        try:
            # Clone or update the servers repository
            repo_dir = self.base_dir / "servers_repo"
            if repo_dir.exists():
                Repo(repo_dir).remotes.origin.pull()
            else:
                Repo.clone_from(self.servers_repo_url, repo_dir)

            # First try to read from featured.json
            featured_file = repo_dir / "featured.json"
            if featured_file.exists():
                with open(featured_file, "r", encoding="utf-8") as f:
                    return json.load(f)

            # If featured.json doesn't exist, parse README.md
            readme_file = repo_dir / "README.md"
            if not readme_file.exists():
                return []
            with open(readme_file, "r", encoding="utf-8", errors="ignore") as f:
                content = f.read()
            # Markdown links that point at GitHub repositories
            repo_links = re.findall(r"\[([^\]]+)\]\((https://github.com/[^)]+)\)", content)
            return [
                {"name": name, "repository": url, "config": {}}
                for name, url in repo_links
                if "/modelcontextprotocol/" in url
            ]
        except Exception as e:
            print(f"Error getting featured servers: {str(e)}")
            return []

    def _read_mcp_servers(self, json_file: Path, label) -> Optional[Dict]:
        """Return the "mcpServers" entry of a JSON file, or None if absent or unreadable."""
        try:
            with open(json_file, "r", encoding="utf-8") as f:
                config = json.load(f)
            if "mcpServers" in config:
                return config["mcpServers"]
        except Exception as e:
            print(f"Error reading config from {label}: {str(e)}")
        return None

    def find_server_config(self, server_dir: Path) -> Optional[Dict]:
        """
        Search for MCP server configuration in common locations.

        Checks DEFAULT_CONFIG_LOCATIONS, then package.json, then the
        ``[tool.mcp]`` table of pyproject.toml.

        Args:
            server_dir: Directory to search in
        Returns:
            Server configuration dictionary if found, None otherwise
        """
        # First check common config file locations
        for config_path in self.DEFAULT_CONFIG_LOCATIONS:
            config_file = server_dir / config_path
            if config_file.exists():
                found = self._read_mcp_servers(config_file, config_file)
                if found is not None:
                    return found

        # Check package.json for Node.js projects
        package_json = server_dir / "package.json"
        if package_json.exists():
            found = self._read_mcp_servers(package_json, "package.json")
            if found is not None:
                return found

        # Check pyproject.toml for Python projects
        pyproject_toml = server_dir / "pyproject.toml"
        if pyproject_toml.exists():
            try:
                try:
                    # Standard library on Python 3.11+
                    import tomllib as toml_reader
                except ImportError:
                    import tomli as toml_reader
                with open(pyproject_toml, "rb") as f:
                    config = toml_reader.load(f)
                if "tool" in config and "mcp" in config["tool"]:
                    return {"python": config["tool"]["mcp"]}
            except ImportError:
                print("tomli package not found, skipping pyproject.toml parsing")
            except Exception as e:
                print(f"Error reading config from pyproject.toml: {str(e)}")
        return None

    def _install_dependencies(self, server_dir: Path) -> None:
        """Install Python (requirements.txt) or Node (package.json) dependencies of a server."""
        if (server_dir / "requirements.txt").exists():
            command = [sys.executable, "-m", "pip", "install", "-r", "requirements.txt"]
        elif (server_dir / "package.json").exists():
            command = ["npm", "install"]
        else:
            return
        completed = subprocess.run(command, cwd=server_dir)
        if completed.returncode != 0:
            # Report the failure instead of ignoring it silently.
            print(f"Warning: dependency installation exited with code {completed.returncode}")

    def install_server(self, server_name: str, custom_config: Optional[Dict] = None) -> bool:
        """
        Install a specific MCP server.

        Args:
            server_name: Name of the server to install
            custom_config: Optional custom configuration for the server
        Returns:
            True if installation was successful, False otherwise
        """
        try:
            # Get server information from featured servers
            featured_servers = self.get_featured_servers()
            server_info = next((s for s in featured_servers if s["name"] == server_name), None)
            if not server_info:
                print(f"Server '{server_name}' not found in featured servers")
                return False

            # Create server directory
            server_dir = self.base_dir / server_name
            server_dir.mkdir(exist_ok=True)
            if any(server_dir.iterdir()):
                # git cannot clone into a non-empty directory; say so clearly.
                print(f"Error installing server '{server_name}': {server_dir} already exists and is not empty")
                return False

            # Clone server repository
            repo = Repo.clone_from(server_info["repository"], server_dir)

            # Install dependencies
            self._install_dependencies(server_dir)

            # Find or use custom server configuration
            config = custom_config or self.find_server_config(server_dir) or {}

            # Store server information
            self.installed_servers[server_name] = {
                "path": str(server_dir),
                "version": repo.head.commit.hexsha[:8],
                "install_date": datetime.now().isoformat(),
                "config": config
            }
            self.save_installed_servers()
            print(f"Successfully installed {server_name}")
            if config:
                print(f"Found server configuration: {json.dumps(config, indent=2)}")
            else:
                print("No server configuration found. You may need to configure it manually.")
            return True
        except Exception as e:
            print(f"Error installing server '{server_name}': {str(e)}")
            return False

    def uninstall_server(self, server_name: str) -> bool:
        """
        Uninstall a specific MCP server.

        A server process started by this instance is stopped first.

        Args:
            server_name: Name of the server to uninstall
        Returns:
            True if uninstallation was successful, False otherwise
        """
        # Imported locally: the module's import block does not provide shutil.
        import shutil

        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False

            if server_name in self._processes:
                self.stop_server(server_name)

            # Remove server directory (already-missing directories are fine)
            server_dir = Path(self.installed_servers[server_name]["path"])
            if server_dir.exists():
                shutil.rmtree(server_dir)

            # Remove from installed servers
            del self.installed_servers[server_name]
            self.save_installed_servers()
            print(f"Successfully uninstalled {server_name}")
            return True
        except Exception as e:
            print(f"Error uninstalling server '{server_name}': {str(e)}")
            return False

    def list_installed_servers(self) -> Dict[str, Dict]:
        """
        Get information about installed servers.

        Returns:
            Dictionary of installed server information
        """
        return self.installed_servers

    def update_server(self, server_name: str) -> bool:
        """
        Update a specific MCP server to the latest version.

        Args:
            server_name: Name of the server to update
        Returns:
            True if update was successful, False otherwise
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False
            server_dir = Path(self.installed_servers[server_name]["path"])
            repo = Repo(server_dir)
            old_version = repo.head.commit.hexsha[:8]
            # Pull latest changes
            repo.remotes.origin.pull()
            new_version = repo.head.commit.hexsha[:8]
            # Refresh dependencies the same way install_server does
            self._install_dependencies(server_dir)
            # Update stored information
            self.installed_servers[server_name]["version"] = new_version
            self.save_installed_servers()
            print(f"Updated {server_name} from {old_version} to {new_version}")
            return True
        except Exception as e:
            print(f"Error updating server '{server_name}': {str(e)}")
            return False

    def configure_server(self, server_name: str, config: Dict) -> bool:
        """
        Configure a specific MCP server.

        The configuration is stored in the manager's config file and also
        written to ``<server path>/.mcp/config.json``.

        Args:
            server_name: Name of the server to configure
            config: Configuration dictionary in the format:
                {
                    "command": str,          # Command to run the server
                    "args": List[str],       # Arguments for the command
                    "env": Dict[str, str],   # Optional environment variables
                    "cwd": str,              # Optional working directory
                }
        Returns:
            True if configuration was successful, False otherwise
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False
            server_dir = Path(self.installed_servers[server_name]["path"])

            # Validate configuration
            if "command" not in config:
                print("Error: Server configuration must include 'command'")
                return False

            # Update configuration
            self.installed_servers[server_name]["config"] = config
            self.save_installed_servers()

            # Try to write configuration to a standard location
            config_dir = server_dir / ".mcp"
            config_dir.mkdir(exist_ok=True)
            config_file = config_dir / "config.json"
            with open(config_file, "w", encoding="utf-8") as f:
                json.dump({"mcpServers": {server_name: config}}, f, indent=2)
            print(f"Successfully configured {server_name}")
            print(f"Configuration saved to {config_file}")
            return True
        except Exception as e:
            print(f"Error configuring server '{server_name}': {str(e)}")
            return False

    def start_server(self, server_name: str) -> bool:
        """
        Start a specific MCP server.

        Args:
            server_name: Name of the server to start
        Returns:
            True if server was started successfully, False otherwise
            (including when it is already running in this manager).
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False
            server_info = self.installed_servers[server_name]
            config = server_info.get("config", {})
            if not config:
                print(f"Server '{server_name}' is not configured")
                return False

            running = self._processes.get(server_name)
            if running is not None and running.poll() is None:
                # Starting again would lose the handle of the first process.
                print(f"Server '{server_name}' is already running (PID: {running.pid})")
                return False

            # Prepare command and arguments
            command = config.get("command")
            if not command:
                print(f"Server '{server_name}' has no 'command' configured")
                return False
            args = config.get("args", [])
            env = {**os.environ, **(config.get("env", {}))}
            cwd = config.get("cwd") or server_info["path"]

            # Start the server process.
            # NOTE(review): stdout/stderr are piped but only drained when the
            # server is stopped; a very chatty server may block on a full pipe.
            process = subprocess.Popen(
                [command, *args],
                env=env,
                cwd=cwd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            # Keep the handle in memory only: a Popen object cannot be written
            # to the JSON config file.
            self._processes[server_name] = process
            print(f"Started server '{server_name}' (PID: {process.pid})")
            return True
        except Exception as e:
            print(f"Error starting server '{server_name}': {str(e)}")
            return False

    def stop_server(self, server_name: str) -> bool:
        """
        Stop a specific MCP server.

        Only servers started by this manager instance can be stopped.

        Args:
            server_name: Name of the server to stop
        Returns:
            True if server was stopped successfully, False otherwise
        """
        try:
            if server_name not in self.installed_servers:
                print(f"Server '{server_name}' is not installed")
                return False
            process = self._processes.get(server_name)
            if not process:
                print(f"Server '{server_name}' is not running")
                return False

            # Try to stop the process gracefully; communicate() also drains
            # and closes the output pipes.
            process.terminate()
            try:
                process.communicate(timeout=5)
            except subprocess.TimeoutExpired:
                process.kill()
                process.communicate()

            # Remove process information
            del self._processes[server_name]
            print(f"Stopped server '{server_name}'")
            return True
        except Exception as e:
            print(f"Error stopping server '{server_name}': {str(e)}")
            return False
async def generate_pyflowchart(repo_path: str, output_dir: Optional[str] = None) -> None:
    """
    Generate flowcharts for all Python files in a repository using pyflowchart.
    Creates HTML flowcharts that can be viewed in a browser.

    The coroutine contains no awaits: the pip and pyflowchart subprocesses
    run synchronously. Errors are printed rather than raised.

    Args:
        repo_path: Path to the repository. Must be an existing directory.
        output_dir: Optional directory to save flowcharts. If None, creates a 'flowcharts' directory in the repo.
    """
    import importlib.util

    try:
        # Validate first so a mistyped path is not created by makedirs below.
        if not os.path.isdir(repo_path):
            print(f"Error generating flowcharts: repository path not found: {repo_path}")
            return

        # Install pyflowchart only when it is missing
        if importlib.util.find_spec("pyflowchart") is None:
            subprocess.run([sys.executable, "-m", "pip", "install", "pyflowchart"], check=True)

        # Set up output directory
        if output_dir is None:
            output_dir = os.path.join(repo_path, 'flowcharts')
        os.makedirs(output_dir, exist_ok=True)

        used_names = set()
        for root, dirs, files in os.walk(repo_path):
            dirs.sort()  # deterministic traversal order
            for file in sorted(files):
                if not file.endswith('.py'):
                    continue
                py_file = os.path.join(root, file)

                # Skip files without any content; undecodable bytes are
                # ignored so one odd file cannot abort the whole run.
                try:
                    with open(py_file, 'r', encoding='utf-8', errors='ignore') as f:
                        has_content = bool(f.read().strip())
                except OSError as e:
                    print(f"Skipping unreadable file: {py_file} ({str(e)})")
                    continue
                if not has_content:
                    print(f"Skipping empty file: {py_file}")
                    continue

                # The first file with a given base name keeps the plain name;
                # later ones (e.g. other __init__.py files) get a name derived
                # from their relative path so outputs are not overwritten.
                stem = os.path.splitext(file)[0]
                if stem in used_names:
                    relative = os.path.relpath(py_file, repo_path)
                    stem = os.path.splitext(relative)[0].replace(os.sep, "_")
                unique_stem = stem
                suffix = 2
                while unique_stem in used_names:
                    unique_stem = f"{stem}_{suffix}"
                    suffix += 1
                used_names.add(unique_stem)
                html_file = os.path.join(output_dir, f"{unique_stem}_flowchart.html")

                # Generate flowchart HTML
                print(f"Generating flowchart for {py_file}")
                try:
                    subprocess.run([
                        sys.executable, "-m", "pyflowchart", py_file,
                        "--output", html_file
                    ], check=True)
                    print(f"Saved HTML to {html_file}")
                except subprocess.CalledProcessError as e:
                    # A failure on one file must not stop the others.
                    print(f"Error generating flowchart for {py_file}: {str(e)}")
                    continue
        print(f"\nFlowcharts generated in: {output_dir}")
    except subprocess.CalledProcessError as e:
        print(f"Error running pyflowchart: {str(e)}")
    except Exception as e:
        print(f"Error generating flowcharts: {str(e)}")
def extract_repo_context(repo_url: str, output_file: Optional[str] = None,
                         timeout: float = 30.0) -> None:
    """
    Extract repository context using gitingest.com.

    Downloads the gitingest.com page that corresponds to the GitHub URL and
    writes the response body to a text file. Errors are printed, not raised.

    Args:
        repo_url: GitHub repository URL (http or https)
        output_file: Optional file to save the context. If None, uses the
            repo name followed by "_context.txt".
        timeout: Seconds to wait for gitingest.com before giving up.
    """
    from urllib.parse import urlparse

    # Validate the host instead of searching for "github.com" anywhere in
    # the string, so e.g. https://example.com/github.com/... is rejected.
    try:
        parsed = urlparse(repo_url) if isinstance(repo_url, str) else None
        host = (parsed.hostname or "") if parsed is not None else ""
    except ValueError:
        parsed, host = None, ""
    if (parsed is None or parsed.scheme not in ("http", "https")
            or host not in ("github.com", "www.github.com")):
        print("Error extracting repository context: Only GitHub repositories are supported")
        return

    try:
        # Convert github.com URL to gitingest.com
        ingest_host = host.replace("github.com", "gitingest.com")
        ingest_url = parsed._replace(netloc=ingest_host).geturl()
        print(f"Fetching repository context from: {ingest_url}")

        # Make request to gitingest.com
        response = requests.get(ingest_url, timeout=timeout)
        response.raise_for_status()
        content = response.text

        # Save to file
        if output_file is None:
            # Last path segment, ignoring a trailing slash; only a trailing
            # ".git" is removed so names like "user.github.io" stay intact.
            repo_name = parsed.path.rstrip('/').split('/')[-1]
            if repo_name.endswith('.git'):
                repo_name = repo_name[:-len('.git')]
            output_file = f"{repo_name or 'repository'}_context.txt"
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(content)
        print(f"Repository context saved to: {output_file}")
    except requests.RequestException as e:
        print(f"Error fetching repository context: {str(e)}")
    except Exception as e:
        print(f"Error extracting repository context: {str(e)}")
async def post_clone_actions(repo_url: str) -> None:
    """
    Perform post-clone actions after a git clone operation:
    1. Generate flowcharts for all Python files using pyflowchart
    2. Extract repository context using gitingest.com

    The clone is expected in the current working directory, in a folder
    named after the repository. Errors are printed, not raised.

    Args:
        repo_url: URL of the repository that was just cloned
    """
    try:
        # Get the repository name from the URL. Ignore a trailing slash and
        # strip only a trailing ".git", so names that merely contain ".git"
        # (e.g. "user.github.io") are not mangled.
        repo_name = repo_url.rstrip('/').split('/')[-1]
        if repo_name.endswith('.git'):
            repo_name = repo_name[:-len('.git')]
        repo_path = os.path.join(os.getcwd(), repo_name)

        # Generate flowcharts
        print("\nGenerating flowcharts...")
        await generate_pyflowchart(repo_path)

        # Extract repository context
        print("\nExtracting repository context...")
        extract_repo_context(repo_url)
    except Exception as e:
        print(f"Error in post-clone actions: {str(e)}")
# Demo: run one sample search when the module is executed as a script.
if __name__ == "__main__":
    # No key is passed, so PERPLEXITY_API_KEY must be set in the environment.
    demo_tool = WebSearchTool()
    demo_results = demo_tool.search("Latest developments in AI technology")
    # Render the answer as markdown and show it on the console.
    formatted = demo_tool.format_results(demo_results, "markdown")
    print(formatted)