|
""" |
|
Utility functions for the Vulnerability Intelligence Agent (VIA). |
|
""" |
|
import os |
|
import logging |
|
import json |
|
import datetime |
|
from typing import Dict, List, Any, Optional, Union |
|
|
|
def setup_logger(name: str) -> logging.Logger: |
|
""" |
|
Set up a logger with the specified name. |
|
|
|
Args: |
|
name: Name of the logger |
|
|
|
Returns: |
|
Configured logger instance |
|
""" |
|
logger = logging.getLogger(name) |
|
if not logger.handlers: |
|
handler = logging.StreamHandler() |
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') |
|
handler.setFormatter(formatter) |
|
logger.addHandler(handler) |
|
|
|
return logger |
|
|
|
def merge_vulnerability_data(results: List[Dict[str, Any]]) -> Dict[str, Any]: |
|
""" |
|
Merge vulnerability data from multiple sources. |
|
|
|
Args: |
|
results: List of dictionaries with vulnerability data from different sources |
|
|
|
Returns: |
|
Merged dictionary with all vulnerabilities |
|
""" |
|
|
|
if not results: |
|
return {"software": "", "version": "", "vulnerabilities": []} |
|
|
|
merged = { |
|
"software": results[0].get("software", ""), |
|
"version": results[0].get("version", ""), |
|
"vulnerabilities": [] |
|
} |
|
|
|
|
|
for result in results: |
|
if "vulnerabilities" in result: |
|
merged["vulnerabilities"].extend(result["vulnerabilities"]) |
|
|
|
return merged |
|
|
|
def save_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str: |
|
""" |
|
Save vulnerability data to a JSON file. |
|
|
|
Args: |
|
data: Vulnerability data to save |
|
filename: Base filename (without extension) |
|
report_dir: Directory to save the report in |
|
|
|
Returns: |
|
Path to the saved JSON file |
|
""" |
|
|
|
os.makedirs(report_dir, exist_ok=True) |
|
|
|
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
json_filename = f"{filename}_{timestamp}.json" |
|
json_path = os.path.join(report_dir, json_filename) |
|
|
|
|
|
with open(json_path, 'w') as f: |
|
json.dump(data, f, indent=2) |
|
|
|
return json_path |
|
|
|
def generate_markdown_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str: |
|
""" |
|
Generate a Markdown report from vulnerability data. |
|
|
|
Args: |
|
data: Vulnerability data |
|
filename: Base filename (without extension) |
|
report_dir: Directory to save the report in |
|
|
|
Returns: |
|
Path to the generated Markdown file |
|
""" |
|
|
|
os.makedirs(report_dir, exist_ok=True) |
|
|
|
|
|
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") |
|
md_filename = f"{filename}_{timestamp}.md" |
|
md_path = os.path.join(report_dir, md_filename) |
|
|
|
with open(md_path, 'w') as f: |
|
|
|
f.write(f"# Vulnerability Report: {data['software']} {data['version']}\n\n") |
|
f.write(f"*Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n") |
|
|
|
|
|
f.write("## Summary\n\n") |
|
vuln_count = len(data.get('vulnerabilities', [])) |
|
f.write(f"Found **{vuln_count}** vulnerabilities for {data['software']} {data['version']}.\n\n") |
|
|
|
|
|
if vuln_count > 0: |
|
f.write("## Vulnerabilities\n\n") |
|
|
|
for i, vuln in enumerate(data['vulnerabilities'], 1): |
|
f.write(f"### {i}. {vuln.get('id', 'Unknown ID')}\n\n") |
|
f.write(f"**Severity:** {vuln.get('severity', 'Unknown')}") |
|
if 'cvss' in vuln: |
|
f.write(f" (CVSS: {vuln['cvss']})") |
|
f.write("\n\n") |
|
|
|
f.write(f"**Description:** {vuln.get('description', 'No description available.')}\n\n") |
|
|
|
if 'date' in vuln: |
|
f.write(f"**Published:** {vuln['date']}\n\n") |
|
|
|
if 'recommendation' in vuln: |
|
f.write(f"**Recommendation:** {vuln['recommendation']}\n\n") |
|
|
|
if 'source' in vuln: |
|
f.write(f"**Source:** [{vuln['source']}]({vuln['source']})\n\n") |
|
|
|
f.write("---\n\n") |
|
else: |
|
f.write("## No vulnerabilities found\n\n") |
|
f.write("No known vulnerabilities were found for this software and version.\n\n") |
|
|
|
|
|
f.write("## References\n\n") |
|
f.write("- [CVE (Common Vulnerabilities and Exposures)](https://cve.mitre.org/)\n") |
|
f.write("- [NVD (National Vulnerability Database)](https://nvd.nist.gov/)\n") |
|
f.write("- [CISA Known Exploited Vulnerabilities Catalog](https://www.cisa.gov/known-exploited-vulnerabilities-catalog)\n") |
|
f.write("- [CWE (Common Weakness Enumeration)](https://cwe.mitre.org/)\n") |
|
|
|
return md_path |