"""
Coordinator Agent module for vulnerability intelligence.
This agent is responsible for coordinating the other agents and generating the final report.
"""

import json
import time
import logging
from typing import Dict, List, Any, Optional, Union

from smolagents import tool

from tools import utils
from tools.parsers import CWEParser
from . import cve_agent, nvd_agent, cisa_agent, cwe_agent

logger = utils.setup_logger("coordinator_agent")

# At most this many CWE IDs are looked up per vulnerability, to avoid too many
# requests.
_MAX_CWES_PER_VULNERABILITY = 3

# Pause, in seconds, after every CWE lookup that is actually performed.
_CWE_LOOKUP_DELAY_SECONDS = 1

# The only CVE for which get_vulnerability_details has a (mock) record.
_LOG4SHELL_CVE_ID = "CVE-2021-44228"


def _collect_source_results(software: str, version: str) -> List[Dict[str, Any]]:
    """Query every source agent and return the results that contain vulnerabilities."""
    # The table is built at call time so the agent functions are resolved on
    # each call, exactly as direct `nvd_agent.search_...()` calls would be.
    sources = (
        ("NVD", nvd_agent.search_nvd_for_software),
        ("CVE", cve_agent.search_cve_for_software),
        ("CISA KEV", cisa_agent.search_cisa_kev_for_software),
    )

    results = []
    for label, search in sources:
        logger.info(f"Searching {label}...")
        source_results = search(software, version)
        found = source_results.get("vulnerabilities")
        if found:
            logger.info(f"Found {len(found)} vulnerabilities in {label}")
            results.append(source_results)
    return results


def _attach_related_cwes(vulnerabilities: List[Dict[str, Any]]) -> None:
    """Add a "related_cwe" list to each vulnerability whose description yields CWE details."""
    # Successful lookups are remembered for the duration of this call so that a
    # CWE mentioned by many vulnerabilities is fetched (and waited for) once.
    # Failed lookups are not remembered, so they are retried the next time the
    # same ID appears. Cached details are shared between vulnerabilities.
    # NOTE(review): assumes CWE IDs are hashable (strings) - confirm against
    # CWEParser.extract_cwe_from_cve.
    resolved = {}

    for vuln in vulnerabilities:
        if "description" not in vuln:
            continue

        # Try to extract CWEs from the description
        cwe_ids = CWEParser.extract_cwe_from_cve(vuln["description"])
        if not cwe_ids:
            continue

        cwe_details = []
        for cwe_id in cwe_ids[:_MAX_CWES_PER_VULNERABILITY]:
            if cwe_id in resolved:
                cwe_details.append(resolved[cwe_id])
                continue

            cwe_detail = cwe_agent.get_cwe_details(cwe_id)
            if "error" not in cwe_detail:
                resolved[cwe_id] = cwe_detail
                cwe_details.append(cwe_detail)
            # Add a short delay between CWE lookups
            time.sleep(_CWE_LOOKUP_DELAY_SECONDS)

        if cwe_details:
            vuln["related_cwe"] = cwe_details


def _report_basename(software: str, version: str) -> str:
    """Build the report base name "<software>_<version>" without path separators."""
    basename = f"{software.lower().replace(' ', '_')}_{version}"
    # Both values come from the tool caller. Path separators are neutralised so
    # the name cannot refer to another directory; how utils.save_report turns
    # the name into a path is not visible from this module.
    return basename.replace("/", "_").replace("\\", "_")


# NOTE: the docstrings of the @tool functions below are kept in
# "Args:/Returns:" form because the decorator derives the tool description
# from them.
@tool
def search_vulnerabilities_for_software(software: str, version: str) -> Dict[str, Any]:
    """
    Search for vulnerabilities related to a specific software and version across all sources.

    Args:
        software: Name of the software to search for
        version: Version of the software to search for

    Returns:
        Dictionary with vulnerability information for the software and version from all sources
    """
    logger.info(f"Searching for vulnerabilities for {software} version {version}")

    # Query NVD, CVE and CISA KEV, then merge whatever they returned.
    results = _collect_source_results(software, version)
    merged_results = utils.merge_vulnerability_data(results)

    # Enhance with CWE information
    _attach_related_cwes(merged_results.get("vulnerabilities", []))

    # Generate report (JSON-style and Markdown) only when something was found
    if merged_results.get("vulnerabilities"):
        report_filename = _report_basename(software, version)
        utils.save_report(merged_results, report_filename)
        utils.generate_markdown_report(merged_results, report_filename)

    return merged_results


@tool
def get_vulnerability_details(cve_id: str) -> Dict[str, Any]:
    """
    Get detailed information about a specific vulnerability.

    Args:
        cve_id: CVE ID to get details for

    Returns:
        Dictionary with detailed information about the vulnerability
    """
    logger.info(f"Getting details for {cve_id}")

    # Match on a trimmed, upper-cased ID so that "cve-2021-44228" or an ID with
    # stray whitespace still finds the record. Non-string input is left as-is
    # and falls through to the fallback branch.
    lookup_id = cve_id.strip().upper() if isinstance(cve_id, str) else cve_id

    # Mock response - in a real implementation, this would query actual sources
    if lookup_id == _LOG4SHELL_CVE_ID:  # Log4Shell
        return {
            "id": "CVE-2021-44228",
            "description": "Log4j es vulnerable a la ejecución remota de código (RCE) porque permite la sustitución de búsquedas JNDI, que pueden exponerse a través de campos controlados por el usuario en solicitudes HTTP, encabezados o mensajes de registro.",
            "severity": "CRITICAL",
            "cvss": "10.0",
            "date": "2021-12-10",
            "recommendation": "Actualizar a Log4j 2.15.0 o posterior",
            "affected_versions": "Log4j 2.0 hasta 2.14.1",
            "source": "https://nvd.nist.gov/vuln/detail/CVE-2021-44228",
            "related_cwe": ["CWE-20", "CWE-400", "CWE-502"],
        }

    # Unknown ID: echo the caller's ID back with a pointer to the MITRE entry.
    return {
        "id": cve_id,
        "description": "No se encontraron detalles para este ID de CVE.",
        "source": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve_id,
    }

# ... existing code ...