|
""" |
|
Coordinator Agent module for vulnerability intelligence. |
|
This agent is responsible for coordinating the other agents and generating the final report. |
|
""" |
|
import json |
|
import time |
|
import logging |
|
from typing import Dict, List, Any, Optional, Union |
|
|
|
from smolagents import tool |
|
from tools import utils |
|
from tools.parsers import CWEParser |
|
from . import cve_agent, nvd_agent, cisa_agent, cwe_agent |
|
|
|
logger = utils.setup_logger("coordinator_agent") |
|
|
|
|
|
@tool |
|
def search_vulnerabilities_for_software(software: str, version: str) -> Dict[str, Any]: |
|
""" |
|
Search for vulnerabilities related to a specific software and version across all sources. |
|
|
|
Args: |
|
software: Name of the software to search for |
|
version: Version of the software to search for |
|
|
|
Returns: |
|
Dictionary with vulnerability information for the software and version from all sources |
|
""" |
|
logger.info(f"Searching for vulnerabilities for {software} version {version}") |
|
|
|
|
|
results = [] |
|
|
|
|
|
logger.info("Searching NVD...") |
|
nvd_results = nvd_agent.search_nvd_for_software(software, version) |
|
if nvd_results.get("vulnerabilities"): |
|
logger.info(f"Found {len(nvd_results['vulnerabilities'])} vulnerabilities in NVD") |
|
results.append(nvd_results) |
|
|
|
|
|
logger.info("Searching CVE...") |
|
cve_results = cve_agent.search_cve_for_software(software, version) |
|
if cve_results.get("vulnerabilities"): |
|
logger.info(f"Found {len(cve_results['vulnerabilities'])} vulnerabilities in CVE") |
|
results.append(cve_results) |
|
|
|
|
|
logger.info("Searching CISA KEV...") |
|
cisa_results = cisa_agent.search_cisa_kev_for_software(software, version) |
|
if cisa_results.get("vulnerabilities"): |
|
logger.info(f"Found {len(cisa_results['vulnerabilities'])} vulnerabilities in CISA KEV") |
|
results.append(cisa_results) |
|
|
|
|
|
merged_results = utils.merge_vulnerability_data(results) |
|
|
|
|
|
for vuln in merged_results.get("vulnerabilities", []): |
|
if "description" in vuln: |
|
|
|
cwe_ids = CWEParser.extract_cwe_from_cve(vuln["description"]) |
|
if cwe_ids: |
|
cwe_details = [] |
|
for cwe_id in cwe_ids[:3]: |
|
cwe_detail = cwe_agent.get_cwe_details(cwe_id) |
|
if "error" not in cwe_detail: |
|
cwe_details.append(cwe_detail) |
|
time.sleep(1) |
|
|
|
if cwe_details: |
|
vuln["related_cwe"] = cwe_details |
|
|
|
|
|
if merged_results.get("vulnerabilities"): |
|
report_filename = f"{software.lower().replace(' ', '_')}_{version}" |
|
utils.save_report(merged_results, report_filename) |
|
utils.generate_markdown_report(merged_results, report_filename) |
|
|
|
return merged_results |
|
|
|
|
|
@tool |
|
def get_vulnerability_details(cve_id: str) -> Dict[str, Any]: |
|
""" |
|
Get detailed information about a specific vulnerability. |
|
|
|
Args: |
|
cve_id: CVE ID to get details for |
|
|
|
Returns: |
|
Dictionary with detailed information about the vulnerability |
|
""" |
|
logger.info(f"Getting details for {cve_id}") |
|
|
|
|
|
if cve_id == "CVE-2021-44228": |
|
return { |
|
"id": "CVE-2021-44228", |
|
"description": "Log4j es vulnerable a la ejecuci贸n remota de c贸digo (RCE) porque permite la sustituci贸n de b煤squedas JNDI, que pueden exponerse a trav茅s de campos controlados por el usuario en solicitudes HTTP, encabezados o mensajes de registro.", |
|
"severity": "CRITICAL", |
|
"cvss": "10.0", |
|
"date": "2021-12-10", |
|
"recommendation": "Actualizar a Log4j 2.15.0 o posterior", |
|
"affected_versions": "Log4j 2.0 hasta 2.14.1", |
|
"source": "https://nvd.nist.gov/vuln/detail/CVE-2021-44228", |
|
"related_cwe": ["CWE-20", "CWE-400", "CWE-502"] |
|
} |
|
else: |
|
return { |
|
"id": cve_id, |
|
"description": "No se encontraron detalles para este ID de CVE.", |
|
"source": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve_id |
|
} |
|
|
|
|