daqc's picture
Vulnerabilty Intelligence PoC
2e82565
"""
Coordinator Agent module for vulnerability intelligence.
This agent is responsible for coordinating the other agents and generating the final report.
"""
import json
import time
import logging
from typing import Dict, List, Any, Optional, Union
from smolagents import tool
from tools import utils
from tools.parsers import CWEParser
from . import cve_agent, nvd_agent, cisa_agent, cwe_agent
logger = utils.setup_logger("coordinator_agent")
@tool
def search_vulnerabilities_for_software(software: str, version: str) -> Dict[str, Any]:
"""
Search for vulnerabilities related to a specific software and version across all sources.
Args:
software: Name of the software to search for
version: Version of the software to search for
Returns:
Dictionary with vulnerability information for the software and version from all sources
"""
logger.info(f"Searching for vulnerabilities for {software} version {version}")
# Initialize results from each source
results = []
# Search NVD
logger.info("Searching NVD...")
nvd_results = nvd_agent.search_nvd_for_software(software, version)
if nvd_results.get("vulnerabilities"):
logger.info(f"Found {len(nvd_results['vulnerabilities'])} vulnerabilities in NVD")
results.append(nvd_results)
# Search CVE
logger.info("Searching CVE...")
cve_results = cve_agent.search_cve_for_software(software, version)
if cve_results.get("vulnerabilities"):
logger.info(f"Found {len(cve_results['vulnerabilities'])} vulnerabilities in CVE")
results.append(cve_results)
# Search CISA KEV
logger.info("Searching CISA KEV...")
cisa_results = cisa_agent.search_cisa_kev_for_software(software, version)
if cisa_results.get("vulnerabilities"):
logger.info(f"Found {len(cisa_results['vulnerabilities'])} vulnerabilities in CISA KEV")
results.append(cisa_results)
# Merge the results
merged_results = utils.merge_vulnerability_data(results)
# Enhance with CWE information
for vuln in merged_results.get("vulnerabilities", []):
if "description" in vuln:
# Try to extract CWEs from the description
cwe_ids = CWEParser.extract_cwe_from_cve(vuln["description"])
if cwe_ids:
cwe_details = []
for cwe_id in cwe_ids[:3]: # Limit to 3 CWEs to avoid too many requests
cwe_detail = cwe_agent.get_cwe_details(cwe_id)
if "error" not in cwe_detail:
cwe_details.append(cwe_detail)
time.sleep(1) # Add a short delay between CWE lookups
if cwe_details:
vuln["related_cwe"] = cwe_details
# Generate report
if merged_results.get("vulnerabilities"):
report_filename = f"{software.lower().replace(' ', '_')}_{version}"
utils.save_report(merged_results, report_filename)
utils.generate_markdown_report(merged_results, report_filename)
return merged_results
@tool
def get_vulnerability_details(cve_id: str) -> Dict[str, Any]:
"""
Get detailed information about a specific vulnerability.
Args:
cve_id: CVE ID to get details for
Returns:
Dictionary with detailed information about the vulnerability
"""
logger.info(f"Getting details for {cve_id}")
# Mock response - in a real implementation, this would query actual sources
if cve_id == "CVE-2021-44228": # Log4Shell
return {
"id": "CVE-2021-44228",
"description": "Log4j es vulnerable a la ejecuci贸n remota de c贸digo (RCE) porque permite la sustituci贸n de b煤squedas JNDI, que pueden exponerse a trav茅s de campos controlados por el usuario en solicitudes HTTP, encabezados o mensajes de registro.",
"severity": "CRITICAL",
"cvss": "10.0",
"date": "2021-12-10",
"recommendation": "Actualizar a Log4j 2.15.0 o posterior",
"affected_versions": "Log4j 2.0 hasta 2.14.1",
"source": "https://nvd.nist.gov/vuln/detail/CVE-2021-44228",
"related_cwe": ["CWE-20", "CWE-400", "CWE-502"]
}
else:
return {
"id": cve_id,
"description": "No se encontraron detalles para este ID de CVE.",
"source": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve_id
}
# ... existing code ...