daqc committed on
Commit
2e82565
·
1 Parent(s): 060c1c8

Vulnerability Intelligence PoC

Browse files
.gitignore ADDED
@@ -0,0 +1,63 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Python
2
+ __pycache__/
3
+ *.py[cod]
4
+ *$py.class
5
+ *.so
6
+ .Python
7
+ build/
8
+ develop-eggs/
9
+ dist/
10
+ downloads/
11
+ eggs/
12
+ .eggs/
13
+ lib/
14
+ lib64/
15
+ parts/
16
+ sdist/
17
+ var/
18
+ wheels/
19
+ *.egg-info/
20
+ .installed.cfg
21
+ *.egg
22
+ MANIFEST
23
+
24
+ # Virtual Environment
25
+ venv/
26
+ env/
27
+ ENV/
28
+
29
+ # Reports generated by the agent
30
+ reports/
31
+ *.json
32
+ *.md
33
+ !README.md
34
+ !USAGE.md
35
+ !example_input.json
36
+
37
+ # IDE files
38
+ .idea/
39
+ .vscode/
40
+ *.swp
41
+ *.swo
42
+ .project
43
+ .pydevproject
44
+ .settings/
45
+
46
+ # Environment variables
47
+ .env
48
+
49
+ # Jupyter Notebook
50
+ .ipynb_checkpoints
51
+
52
+ # Logs and databases
53
+ *.log
54
+ *.sqlite3
55
+
56
+ # OS specific files
57
+ .DS_Store
58
+ .DS_Store?
59
+ ._*
60
+ .Spotlight-V100
61
+ .Trashes
62
+ ehthumbs.db
63
+ Thumbs.db
README.md CHANGED
@@ -16,4 +16,6 @@ tags:
16
  - hacking
17
  ---
18
 
19
- Check out the configuration reference at https://huggingface.co/docs/hub/spaces-config-reference
 
 
 
16
  - hacking
17
  ---
18
 
19
+ # Vulnerability Intelligence Agent (VIA)
20
+
21
+ Vulnerability Intelligence Agent (VIA) es un agente inteligente diseñado para buscar y analizar vulnerabilidades en software.
USAGE.md ADDED
@@ -0,0 +1,101 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Instrucciones de Uso - Vulnerability Intelligence Agent (VIA)
2
+
3
+ ## Requisitos previos
4
+
5
+ Asegúrate de tener instaladas todas las dependencias necesarias:
6
+
7
+ ```bash
8
+ pip install -r requirements.txt
9
+ ```
10
+
11
+ ## Uso Básico
12
+
13
+ ### Comprobar vulnerabilidades para un software específico
14
+
15
+ Para buscar vulnerabilidades en un software y versión específicos:
16
+
17
+ ```bash
18
+ python main.py --software "OpenSSL" --version "1.1.1k"
19
+ ```
20
+
21
+ ### Procesar un archivo de entrada con múltiples software
22
+
23
+ Para procesar un archivo JSON que contiene una lista de software:
24
+
25
+ ```bash
26
+ python main.py --input example_input.json
27
+ ```
28
+
29
+ El archivo de entrada debe tener el siguiente formato:
30
+
31
+ ```json
32
+ [
33
+ {
34
+ "name": "OpenSSL",
35
+ "version": "1.1.1k"
36
+ },
37
+ {
38
+ "name": "Apache",
39
+ "version": "2.4.54"
40
+ }
41
+ ]
42
+ ```
43
+
44
+ ### Especificar directorio de salida para los reportes
45
+
46
+ Por defecto, los reportes se guardan en el directorio `reports`. Puedes especificar un directorio diferente:
47
+
48
+ ```bash
49
+ python main.py --input example_input.json --output-dir my_reports
50
+ ```
51
+
52
+ ### Habilitar modo verboso
53
+
54
+ Para obtener más información sobre lo que está haciendo el agente:
55
+
56
+ ```bash
57
+ python main.py --software "OpenSSL" --version "1.1.1k" --verbose
58
+ ```
59
+
60
+ ### Especificar un modelo diferente
61
+
62
+ Por defecto, el agente utiliza el modelo "Qwen/Qwen2.5-Coder-32B-Instruct". Puedes especificar un modelo diferente:
63
+
64
+ ```bash
65
+ python main.py --software "OpenSSL" --version "1.1.1k" --model "otra-id-de-modelo"
66
+ ```
67
+
68
+ ## Formato de Salida
69
+
70
+ El agente genera reportes en dos formatos:
71
+
72
+ 1. **JSON**: Contiene todos los datos estructurados de las vulnerabilidades encontradas.
73
+ 2. **Markdown**: Un reporte legible con información formateada sobre las vulnerabilidades.
74
+
75
+ Los reportes se guardan en el directorio especificado (por defecto, `reports`) con un nombre de archivo basado en el software y la versión, más una marca de tiempo.
76
+
77
+ ## Ejemplos
78
+
79
+ ### Ejemplo 1: Buscar vulnerabilidades en OpenSSL 1.1.1k
80
+
81
+ ```bash
82
+ python main.py --software "OpenSSL" --version "1.1.1k"
83
+ ```
84
+
85
+ ### Ejemplo 2: Procesar varios software desde un archivo
86
+
87
+ ```bash
88
+ python main.py --input example_input.json --verbose
89
+ ```
90
+
91
+ ### Ejemplo 3: Guardar reportes en un directorio específico
92
+
93
+ ```bash
94
+ python main.py --input example_input.json --output-dir vulnerability_reports
95
+ ```
96
+
97
+ ## Notas
98
+
99
+ - El agente limita las solicitudes a las bases de datos de vulnerabilidades para evitar problemas de limitación de tasa.
100
+ - Para consultas de software con muchas vulnerabilidades conocidas, el proceso puede tardar varios minutos.
101
+ - Si el agente encuentra muchas vulnerabilidades, solo mostrará las más críticas en la salida de la consola, pero todas se incluirán en los reportes generados.
agents/__init__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ """
2
+ Vulnerability Intelligence Agent (VIA) - Agents Package.
3
+ This package contains specialized agents for querying different vulnerability databases.
4
+ """
agents/cisa_agent.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Agent for searching the CISA Known Exploited Vulnerabilities (KEV) catalog.
3
+ """
4
+ from typing import Dict, List, Any, Optional
5
+
6
+ def search_cisa_kev_for_software(software: str, version: str) -> Dict[str, Any]:
7
+ """
8
+ Search for CISA KEV entries related to a specific software and version.
9
+
10
+ Args:
11
+ software: Name of the software to search for
12
+ version: Version of the software to search for
13
+
14
+ Returns:
15
+ Dictionary with CISA KEV information for the software and version
16
+ """
17
+ # Simplified mock implementation
18
+ return {
19
+ "software": software,
20
+ "version": version,
21
+ "vulnerabilities": []
22
+ }
agents/coordinator_agent.py ADDED
@@ -0,0 +1,117 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Coordinator Agent module for vulnerability intelligence.
3
+ This agent is responsible for coordinating the other agents and generating the final report.
4
+ """
5
+ import json
6
+ import time
7
+ import logging
8
+ from typing import Dict, List, Any, Optional, Union
9
+
10
+ from smolagents import tool
11
+ from tools import utils
12
+ from tools.parsers import CWEParser
13
+ from . import cve_agent, nvd_agent, cisa_agent, cwe_agent
14
+
15
+ logger = utils.setup_logger("coordinator_agent")
16
+
17
+
18
+ @tool
19
+ def search_vulnerabilities_for_software(software: str, version: str) -> Dict[str, Any]:
20
+ """
21
+ Search for vulnerabilities related to a specific software and version across all sources.
22
+
23
+ Args:
24
+ software: Name of the software to search for
25
+ version: Version of the software to search for
26
+
27
+ Returns:
28
+ Dictionary with vulnerability information for the software and version from all sources
29
+ """
30
+ logger.info(f"Searching for vulnerabilities for {software} version {version}")
31
+
32
+ # Initialize results from each source
33
+ results = []
34
+
35
+ # Search NVD
36
+ logger.info("Searching NVD...")
37
+ nvd_results = nvd_agent.search_nvd_for_software(software, version)
38
+ if nvd_results.get("vulnerabilities"):
39
+ logger.info(f"Found {len(nvd_results['vulnerabilities'])} vulnerabilities in NVD")
40
+ results.append(nvd_results)
41
+
42
+ # Search CVE
43
+ logger.info("Searching CVE...")
44
+ cve_results = cve_agent.search_cve_for_software(software, version)
45
+ if cve_results.get("vulnerabilities"):
46
+ logger.info(f"Found {len(cve_results['vulnerabilities'])} vulnerabilities in CVE")
47
+ results.append(cve_results)
48
+
49
+ # Search CISA KEV
50
+ logger.info("Searching CISA KEV...")
51
+ cisa_results = cisa_agent.search_cisa_kev_for_software(software, version)
52
+ if cisa_results.get("vulnerabilities"):
53
+ logger.info(f"Found {len(cisa_results['vulnerabilities'])} vulnerabilities in CISA KEV")
54
+ results.append(cisa_results)
55
+
56
+ # Merge the results
57
+ merged_results = utils.merge_vulnerability_data(results)
58
+
59
+ # Enhance with CWE information
60
+ for vuln in merged_results.get("vulnerabilities", []):
61
+ if "description" in vuln:
62
+ # Try to extract CWEs from the description
63
+ cwe_ids = CWEParser.extract_cwe_from_cve(vuln["description"])
64
+ if cwe_ids:
65
+ cwe_details = []
66
+ for cwe_id in cwe_ids[:3]: # Limit to 3 CWEs to avoid too many requests
67
+ cwe_detail = cwe_agent.get_cwe_details(cwe_id)
68
+ if "error" not in cwe_detail:
69
+ cwe_details.append(cwe_detail)
70
+ time.sleep(1) # Add a short delay between CWE lookups
71
+
72
+ if cwe_details:
73
+ vuln["related_cwe"] = cwe_details
74
+
75
+ # Generate report
76
+ if merged_results.get("vulnerabilities"):
77
+ report_filename = f"{software.lower().replace(' ', '_')}_{version}"
78
+ utils.save_report(merged_results, report_filename)
79
+ utils.generate_markdown_report(merged_results, report_filename)
80
+
81
+ return merged_results
82
+
83
+
84
+ @tool
85
+ def get_vulnerability_details(cve_id: str) -> Dict[str, Any]:
86
+ """
87
+ Get detailed information about a specific vulnerability.
88
+
89
+ Args:
90
+ cve_id: CVE ID to get details for
91
+
92
+ Returns:
93
+ Dictionary with detailed information about the vulnerability
94
+ """
95
+ logger.info(f"Getting details for {cve_id}")
96
+
97
+ # Mock response - in a real implementation, this would query actual sources
98
+ if cve_id == "CVE-2021-44228": # Log4Shell
99
+ return {
100
+ "id": "CVE-2021-44228",
101
+ "description": "Log4j es vulnerable a la ejecución remota de código (RCE) porque permite la sustitución de búsquedas JNDI, que pueden exponerse a través de campos controlados por el usuario en solicitudes HTTP, encabezados o mensajes de registro.",
102
+ "severity": "CRITICAL",
103
+ "cvss": "10.0",
104
+ "date": "2021-12-10",
105
+ "recommendation": "Actualizar a Log4j 2.15.0 o posterior",
106
+ "affected_versions": "Log4j 2.0 hasta 2.14.1",
107
+ "source": "https://nvd.nist.gov/vuln/detail/CVE-2021-44228",
108
+ "related_cwe": ["CWE-20", "CWE-400", "CWE-502"]
109
+ }
110
+ else:
111
+ return {
112
+ "id": cve_id,
113
+ "description": "No se encontraron detalles para este ID de CVE.",
114
+ "source": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=" + cve_id
115
+ }
116
+
117
+ # ... existing code ...
agents/cve_agent.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Agent for searching the Common Vulnerabilities and Exposures (CVE) database.
3
+ """
4
+ from typing import Dict, List, Any, Optional
5
+
6
+ def search_cve_for_software(software: str, version: str) -> Dict[str, Any]:
7
+ """
8
+ Search for CVE entries related to a specific software and version.
9
+
10
+ Args:
11
+ software: Name of the software to search for
12
+ version: Version of the software to search for
13
+
14
+ Returns:
15
+ Dictionary with CVE information for the software and version
16
+ """
17
+ # Simplified mock implementation
18
+ return {
19
+ "software": software,
20
+ "version": version,
21
+ "vulnerabilities": []
22
+ }
agents/cwe_agent.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Agent for retrieving Common Weakness Enumeration (CWE) details.
3
+ """
4
+ from typing import Dict, List, Any, Optional
5
+
6
+ def get_cwe_details(cwe_id: str) -> Dict[str, Any]:
7
+ """
8
+ Get details about a specific CWE.
9
+
10
+ Args:
11
+ cwe_id: CWE ID to get details for (e.g., 'CWE-79')
12
+
13
+ Returns:
14
+ Dictionary with CWE details
15
+ """
16
+ # Simplified mock implementation
17
+ return {
18
+ "id": cwe_id,
19
+ "name": "Generic Weakness",
20
+ "description": "This is a placeholder for CWE details.",
21
+ "source": f"https://cwe.mitre.org/data/definitions/{cwe_id.replace('CWE-', '')}.html"
22
+ }
agents/nvd_agent.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Agent for searching the National Vulnerability Database (NVD).
3
+ """
4
+ from typing import Dict, List, Any, Optional
5
+
6
+ def search_nvd_for_software(software: str, version: str) -> Dict[str, Any]:
7
+ """
8
+ Search for NVD entries related to a specific software and version.
9
+
10
+ Args:
11
+ software: Name of the software to search for
12
+ version: Version of the software to search for
13
+
14
+ Returns:
15
+ Dictionary with NVD information for the software and version
16
+ """
17
+ # Simplified mock implementation
18
+ return {
19
+ "software": software,
20
+ "version": version,
21
+ "vulnerabilities": []
22
+ }
app.py CHANGED
@@ -1,65 +1,113 @@
1
- from smolagents import CodeAgent,DuckDuckGoSearchTool, HfApiModel,load_tool,tool
2
- import datetime
3
- import requests
4
- import pytz
5
- import yaml
6
- from tools.final_answer import FinalAnswerTool
 
 
 
 
7
 
8
- from Gradio_UI import GradioUI
9
- # Hola prueba
10
- # Below is an example of a tool that does nothing. Amaze us with your creativity !
11
- @tool
12
- def my_cutom_tool(arg1:str, arg2:int)-> str: #it's import to specify the return type
13
- #Keep this format for the description / args / args description but feel free to modify the tool
14
- """A tool that does nothing yet
15
- Args:
16
- arg1: the first argument
17
- arg2: the second argument
18
- """
19
- return "What magic will you build ?"
20
 
21
- @tool
22
- def get_current_time_in_timezone(timezone: str) -> str:
23
- """A tool that fetches the current local time in a specified timezone.
24
- Args:
25
- timezone: A string representing a valid timezone (e.g., 'America/New_York').
26
- """
27
- try:
28
- # Create timezone object
29
- tz = pytz.timezone(timezone)
30
- # Get current time in that timezone
31
- local_time = datetime.datetime.now(tz).strftime("%Y-%m-%d %H:%M:%S")
32
- return f"The current local time in {timezone} is: {local_time}"
33
- except Exception as e:
34
- return f"Error fetching time for timezone '{timezone}': {str(e)}"
35
 
 
 
36
 
37
- final_answer = FinalAnswerTool()
38
- model = HfApiModel(
39
- max_tokens=2096,
40
- temperature=0.5,
41
- model_id='Qwen/Qwen2.5-Coder-32B-Instruct',
42
- custom_role_conversions=None,
43
  )
 
44
 
 
 
45
 
46
- # Import tool from Hub
47
- image_generation_tool = load_tool("agents-course/text-to-image", trust_remote_code=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
 
49
- with open("prompts.yaml", 'r') as stream:
50
- prompt_templates = yaml.safe_load(stream)
 
 
 
 
 
 
 
51
 
52
- agent = CodeAgent(
53
- model=model,
54
- tools=[final_answer], ## add your tools here (don't remove final answer)
55
- max_steps=6,
56
- verbosity_level=1,
57
- grammar=None,
58
- planning_interval=None,
59
- name=None,
60
- description=None,
61
- prompt_templates=prompt_templates
62
- )
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
 
65
- GradioUI(agent).launch()
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Gradio UI for the Vulnerability Intelligence Agent (VIA).
4
+ This provides a chat interface to interact with the VIA using natural language.
5
+ """
6
+ import os
7
+ import sys
8
+ import argparse
9
+ import logging
10
+ from typing import Dict, List, Any, Optional
11
 
12
+ import gradio as gr
13
+ from smolagents import CodeAgent, HfApiModel, GradioUI
14
+ from smolagents.tools import load_tool, tool
 
 
 
 
 
 
 
 
 
15
 
16
+ # Asegurarse de que el directorio actual esté en sys.path para que los imports funcionen
17
+ sys.path.append(os.path.dirname(os.path.abspath(__file__)))
 
 
 
 
 
 
 
 
 
 
 
 
18
 
19
+ from agents.coordinator_agent import search_vulnerabilities_for_software, get_vulnerability_details
20
+ from tools import utils
21
 
22
+ # Configure logging
23
+ logging.basicConfig(
24
+ level=logging.INFO,
25
+ format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
 
 
26
  )
27
+ logger = utils.setup_logger("gradio_ui")
28
 
29
+ # Cargar las herramientas básicas usando las que ya existen en smolagents
30
+ final_answer = load_tool("smolagents/final_answer", trust_remote_code=True)
31
 
32
+ def get_agent_description():
33
+ """
34
+ Get the description for the agent.
35
+ """
36
+ return """
37
+ # 🔐 Vulnerability Intelligence Agent (VIA)
38
+
39
+ I am an intelligent agent designed to help you find vulnerabilities in software and systems.
40
+
41
+ ## What I can do:
42
+ - Search for known vulnerabilities in software by name and version
43
+ - Provide detailed information about specific vulnerabilities (CVE, CWE, etc.)
44
+ - Generate reports about vulnerabilities
45
+
46
+ ## How to use me:
47
+ - Ask about vulnerabilities in specific software, e.g., "Find vulnerabilities in OpenSSL 1.1.1k"
48
+ - Ask about a specific vulnerability, e.g., "Tell me about CVE-2021-44228"
49
+ - Use natural language to describe what you're looking for
50
+
51
+ ## Examples:
52
+ - "What vulnerabilities exist in Apache 2.4.54?"
53
+ - "Are there any critical vulnerabilities in log4j 2.14.1?"
54
+ - "Give me details about CVE-2021-44228"
55
+ - "What security issues should I be aware of in OpenSSL 1.1.1k?"
56
+ """
57
 
58
+ def create_parser():
59
+ """Create command line argument parser."""
60
+ parser = argparse.ArgumentParser(description="Vulnerability Intelligence Agent (VIA) UI")
61
+ parser.add_argument("--port", type=int, default=7860, help="Port to run the Gradio app on")
62
+ parser.add_argument("--host", type=str, default="127.0.0.1", help="Host to run the Gradio app on")
63
+ parser.add_argument("--model", type=str, default="Qwen/Qwen2.5-Coder-32B-Instruct",
64
+ help="HuggingFace model ID to use")
65
+ parser.add_argument("--share", action="store_true", help="Create a public link")
66
+ parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
67
 
68
+ return parser
 
 
 
 
 
 
 
 
 
 
69
 
70
+ def main():
71
+ """Main entry point for the Gradio UI."""
72
+ args = create_parser().parse_args()
73
+
74
+ # Configure logging level
75
+ log_level = logging.DEBUG if args.verbose else logging.INFO
76
+ logging.basicConfig(level=log_level)
77
+
78
+ # Initialize the model
79
+ model = HfApiModel(
80
+ max_tokens=2096,
81
+ temperature=0.5,
82
+ model_id=args.model,
83
+ custom_role_conversions=None,
84
+ )
85
+
86
+ # Initialize the agent with the already existing tools and the ones we have created
87
+ agent = CodeAgent(
88
+ model=model,
89
+ tools=[search_vulnerabilities_for_software, get_vulnerability_details, final_answer],
90
+ max_steps=10,
91
+ verbosity_level=2 if args.verbose else 1,
92
+ )
93
+
94
+ # Create Gradio UI
95
+ ui = GradioUI(agent)
96
+
97
+ # Launch the UI
98
+ ui.launch(
99
+ share=args.share,
100
+ server_name=args.host,
101
+ server_port=args.port,
102
+ show_api=False,
103
+ favicon_path=None,
104
+ allowed_paths=[],
105
+ app_kwargs={
106
+ "title": "🔐 Vulnerability Intelligence Agent (VIA)",
107
+ "description": get_agent_description(),
108
+ "theme": gr.themes.Base(),
109
+ },
110
+ )
111
 
112
+ if __name__ == "__main__":
113
+ main()
coordinator_agent.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Coordinator Agent module for vulnerability intelligence.
3
+ This agent is responsible for coordinating the other agents and generating the final report.
4
+ """
5
+ import json
6
+ import time
7
+ import argparse
8
+ from typing import Dict, List, Any, Optional, Union
9
+
10
+ from smolagents import tool
11
+ from ..tools import utils
12
+ from ..tools.parsers import CWEParser
13
+ from . import cve_agent, nvd_agent, cisa_agent, cwe_agent
14
+
15
+ logger = utils.setup_logger("coordinator_agent")
16
+
17
+ # Enhance with CWE information
18
+ for vuln in merged_results.get("vulnerabilities", []):
19
+ if "description" in vuln:
20
+ # Try to extract CWEs from the description
21
+ cwe_ids = CWEParser.extract_cwe_from_cve(vuln["description"])
22
+ if cwe_ids:
23
+ # ... rest of the code ...
prompts.yaml CHANGED
@@ -9,139 +9,149 @@
9
  These print outputs will then appear in the 'Observation:' field, which will be available as input for the next step.
10
  In the end you have to return a final answer using the `final_answer` tool.
11
 
12
- Here are a few examples using notional tools:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  ---
14
- Task: "Generate an image of the oldest person in this document."
15
 
16
- Thought: I will proceed step by step and use the following tools: `document_qa` to find the oldest person in the document, then `image_generator` to generate an image according to the answer.
17
  Code:
18
  ```py
19
- answer = document_qa(document=document, question="Who is the oldest person mentioned?")
20
- print(answer)
21
  ```<end_code>
22
- Observation: "The oldest person in the document is John Doe, a 55 year old lumberjack living in Newfoundland."
23
 
24
- Thought: I will now generate an image showcasing the oldest person.
25
  Code:
26
  ```py
27
- image = image_generator("A portrait of John Doe, a 55-year-old man living in Canada.")
28
- final_answer(image)
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  ```<end_code>
30
 
31
  ---
32
- Task: "What is the result of the following operation: 5 + 3 + 1294.678?"
33
 
34
- Thought: I will use python code to compute the result of the operation and then return the final answer using the `final_answer` tool
35
  Code:
36
  ```py
37
- result = 5 + 3 + 1294.678
38
- final_answer(result)
39
  ```<end_code>
 
40
 
41
- ---
42
- Task:
43
- "Answer the question in the variable `question` about the image stored in the variable `image`. The question is in French.
44
- You have been provided with these additional arguments, that you can access using the keys as variables in your python code:
45
- {'question': 'Quel est l'animal sur l'image?', 'image': 'path/to/image.jpg'}"
46
-
47
- Thought: I will use the following tools: `translator` to translate the question into English and then `image_qa` to answer the question on the input image.
48
  Code:
49
  ```py
50
- translated_question = translator(question=question, src_lang="French", tgt_lang="English")
51
- print(f"The translated question is {translated_question}.")
52
- answer = image_qa(image=image, question=translated_question)
53
- final_answer(f"The answer is {answer}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
54
  ```<end_code>
55
 
56
  ---
57
- Task:
58
- In a 1979 interview, Stanislaus Ulam discusses with Martin Sherwin about other great physicists of his time, including Oppenheimer.
59
- What does he say was the consequence of Einstein learning too much math on his creativity, in one word?
60
 
61
- Thought: I need to find and read the 1979 interview of Stanislaus Ulam with Martin Sherwin.
62
  Code:
63
  ```py
64
- pages = search(query="1979 interview Stanislaus Ulam Martin Sherwin physicists Einstein")
65
- print(pages)
 
 
 
 
 
 
 
 
66
  ```<end_code>
67
- Observation:
68
- No result found for query "1979 interview Stanislaus Ulam Martin Sherwin physicists Einstein".
69
 
70
- Thought: The query was maybe too restrictive and did not find any results. Let's try again with a broader query.
71
  Code:
72
  ```py
73
- pages = search(query="1979 interview Stanislaus Ulam")
74
- print(pages)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  ```<end_code>
76
- Observation:
77
- Found 6 pages:
78
- [Stanislaus Ulam 1979 interview](https://ahf.nuclearmuseum.org/voices/oral-histories/stanislaus-ulams-interview-1979/)
79
-
80
- [Ulam discusses Manhattan Project](https://ahf.nuclearmuseum.org/manhattan-project/ulam-manhattan-project/)
81
 
82
- (truncated)
83
-
84
- Thought: I will read the first 2 pages to know more.
85
- Code:
86
- ```py
87
- for url in ["https://ahf.nuclearmuseum.org/voices/oral-histories/stanislaus-ulams-interview-1979/", "https://ahf.nuclearmuseum.org/manhattan-project/ulam-manhattan-project/"]:
88
- whole_page = visit_webpage(url)
89
- print(whole_page)
90
- print("\n" + "="*80 + "\n") # Print separator between pages
91
- ```<end_code>
92
- Observation:
93
- Manhattan Project Locations:
94
- Los Alamos, NM
95
- Stanislaus Ulam was a Polish-American mathematician. He worked on the Manhattan Project at Los Alamos and later helped design the hydrogen bomb. In this interview, he discusses his work at
96
- (truncated)
97
-
98
- Thought: I now have the final answer: from the webpages visited, Stanislaus Ulam says of Einstein: "He learned too much mathematics and sort of diminished, it seems to me personally, it seems to me his purely physics creativity." Let's answer in one word.
99
- Code:
100
- ```py
101
- final_answer("diminished")
102
- ```<end_code>
103
-
104
- ---
105
- Task: "Which city has the highest population: Guangzhou or Shanghai?"
106
-
107
- Thought: I need to get the populations for both cities and compare them: I will use the tool `search` to get the population of both cities.
108
- Code:
109
- ```py
110
- for city in ["Guangzhou", "Shanghai"]:
111
- print(f"Population {city}:", search(f"{city} population")
112
- ```<end_code>
113
- Observation:
114
- Population Guangzhou: ['Guangzhou has a population of 15 million inhabitants as of 2021.']
115
- Population Shanghai: '26 million (2019)'
116
-
117
- Thought: Now I know that Shanghai has the highest population.
118
- Code:
119
- ```py
120
- final_answer("Shanghai")
121
- ```<end_code>
122
-
123
- ---
124
- Task: "What is the current age of the pope, raised to the power 0.36?"
125
-
126
- Thought: I will use the tool `wiki` to get the age of the pope, and confirm that with a web search.
127
- Code:
128
- ```py
129
- pope_age_wiki = wiki(query="current pope age")
130
- print("Pope age as per wikipedia:", pope_age_wiki)
131
- pope_age_search = web_search(query="current pope age")
132
- print("Pope age as per google search:", pope_age_search)
133
- ```<end_code>
134
- Observation:
135
- Pope age: "The pope Francis is currently 88 years old."
136
-
137
- Thought: I know that the pope is 88 years old. Let's compute the result using python code.
138
- Code:
139
- ```py
140
- pope_current_age = 88 ** 0.36
141
- final_answer(pope_current_age)
142
- ```<end_code>
143
-
144
- Above example were using notional tools that might not exist for you. On top of performing computations in the Python code snippets that you create, you only have access to these tools:
145
  {%- for tool in tools.values() %}
146
  - {{ tool.name }}: {{ tool.description }}
147
  Takes inputs: {{tool.inputs}}
@@ -172,6 +182,7 @@
172
  10. Don't give up! You're in charge of solving the task, not providing directions to solve it.
173
 
174
  Now Begin! If you solve the task correctly, you will receive a reward of $1,000,000.
 
175
  "planning":
176
  "initial_facts": |-
177
  Below I will present you a task.
 
9
  These print outputs will then appear in the 'Observation:' field, which will be available as input for the next step.
10
  In the end you have to return a final answer using the `final_answer` tool.
11
 
12
+ You are the Vulnerability Intelligence Agent (VIA), a specialized AI designed to find and analyze software vulnerabilities.
13
+
14
+ Your goal is to provide accurate information about vulnerabilities in software systems by searching across multiple vulnerability databases.
15
+
16
+ When a user asks about vulnerabilities in a specific software or version, you should:
17
+
18
+ 1. Extract the software name and version from the query
19
+ 2. Use the search_vulnerabilities_for_software() function to find vulnerabilities
20
+ 3. Present the results in a clear, structured format
21
+ 4. For critical vulnerabilities, provide more detailed information
22
+
23
+ If the user asks about a specific CVE or vulnerability identifier, use get_vulnerability_details() to provide comprehensive information.
24
+
25
+ Always cite your sources and be clear about the limitations of your knowledge. If you're uncertain about something, acknowledge it.
26
+
27
+ Remember that security is critical - don't spread misinformation about vulnerabilities.
28
+
29
+ NEVER make up CVE identifiers, vulnerability details, or other security information.
30
+
31
+ Here are a few examples using vulnerability search tools:
32
  ---
33
+ Task: "Find vulnerabilities in OpenSSL 1.1.1k"
34
 
35
+ Thought: I need to search for vulnerabilities related to OpenSSL version 1.1.1k. I'll use the search_vulnerabilities_for_software function with the appropriate parameters.
36
  Code:
37
  ```py
38
+ results = search_vulnerabilities_for_software(software="OpenSSL", version="1.1.1k")
39
+ print(json.dumps(results, indent=2))
40
  ```<end_code>
41
+ Observation: {"software": "OpenSSL", "version": "1.1.1k", "vulnerabilities": [{"id": "CVE-2021-3450", "description": "The certificate X509 with CA extension enabled is not properly validated", "severity": "HIGH", "cvss": "7.4", "date": "2021-03-25", "source": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-3450"}]}
42
 
43
+ Thought: I found a high-severity vulnerability (CVSS 7.4) in OpenSSL 1.1.1k. I'll now create a comprehensive and formatted answer for the user.
44
  Code:
45
  ```py
46
+ final_answer(f"""
47
+ # Vulnerabilities in OpenSSL 1.1.1k
48
+
49
+ I've found **1 high severity vulnerability** in OpenSSL 1.1.1k:
50
+
51
+ ## CVE-2021-3450
52
+ - **Severity**: HIGH (CVSS: 7.4)
53
+ - **Published**: March 25, 2021
54
+ - **Description**: The certificate X509 with CA extension enabled is not properly validated, which could allow attackers to issue a malicious certificate that might be accepted as valid.
55
+ - **Recommendation**: Update to OpenSSL 1.1.1l or newer
56
+ - **Source**: [CVE-2021-3450](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-3450)
57
+
58
+ ## Recommendation
59
+ It is highly recommended to upgrade your OpenSSL installation to version 1.1.1l or newer to address this vulnerability.
60
+ """)
61
  ```<end_code>
62
 
63
  ---
64
+ Task: "Tell me about CVE-2021-44228"
65
 
66
+ Thought: I need to get details about a specific CVE identifier, CVE-2021-44228, which is commonly known as Log4Shell. I'll use the get_vulnerability_details function to retrieve information about this vulnerability.
67
  Code:
68
  ```py
69
+ details = get_vulnerability_details(vulnerability_id="CVE-2021-44228")
70
+ print(json.dumps(details, indent=2))
71
  ```<end_code>
72
+ Observation: {"id": "CVE-2021-44228", "description": "Log4j is vulnerable to remote code execution (RCE) by allowing JNDI lookups, which can be exposed through user-controlled fields in HTTP requests, headers, or log messages.", "severity": "CRITICAL", "cvss": "10.0", "date": "2021-12-10", "recommendation": "Upgrade to Log4j 2.15.0 or later", "affected_versions": "Log4j 2.0 through 2.14.1", "source": "https://nvd.nist.gov/vuln/detail/CVE-2021-44228", "related_cwe": ["CWE-20", "CWE-400", "CWE-502"]}
73
 
74
+ Thought: I have detailed information about CVE-2021-44228 (Log4Shell). I'll format this information in a clear and comprehensive way for the user.
 
 
 
 
 
 
75
  Code:
76
  ```py
77
+ final_answer(f"""
78
+ # CVE-2021-44228 (Log4Shell)
79
+
80
+ ## Overview
81
+ - **Severity**: CRITICAL
82
+ - **CVSS Score**: 10.0 (maximum severity)
83
+ - **Published**: December 10, 2021
84
+ - **Affected Versions**: Log4j 2.0 through 2.14.1
85
+
86
+ ## Description
87
+ Log4j is vulnerable to remote code execution (RCE) by allowing JNDI lookups, which can be exposed through user-controlled fields in HTTP requests, headers, or log messages. This vulnerability is extremely serious as it allows attackers to execute arbitrary code on affected systems remotely with minimal effort.
88
+
89
+ ## Related Weaknesses
90
+ - CWE-20: Improper Input Validation
91
+ - CWE-400: Uncontrolled Resource Consumption
92
+ - CWE-502: Deserialization of Untrusted Data
93
+
94
+ ## Recommendation
95
+ Upgrade to Log4j 2.15.0 or later versions. If upgrading is not immediately possible, mitigations include:
96
+ - Setting the system property `-Dlog4j2.formatMsgNoLookups=true`
97
+ - Removing the JndiLookup class from the classpath
98
+
99
+ ## Source
100
+ [NVD Database Entry](https://nvd.nist.gov/vuln/detail/CVE-2021-44228)
101
+ """)
102
  ```<end_code>
103
 
104
  ---
105
+ Task: "Are there any vulnerabilities in log4j 2.14.1?"
 
 
106
 
107
+ Thought: I need to extract the software name and version from the query, then search for vulnerabilities related to Log4j version 2.14.1.
108
  Code:
109
  ```py
110
+ import re
111
+ query = "Are there any vulnerabilities in log4j 2.14.1?"
112
+ # Extract software name and version using regex
113
+ match = re.search(r"in\s+([a-zA-Z0-9_\-\.]+)\s+([0-9\.]+)", query)
114
+ if match:
115
+ software_name = match.group(1)
116
+ software_version = match.group(2)
117
+ print(f"Software: {software_name}, Version: {software_version}")
118
+ results = search_vulnerabilities_for_software(software_name, software_version)
119
+ print(json.dumps(results, indent=2))
120
  ```<end_code>
121
+ Observation: Software: log4j, Version: 2.14.1
122
+ {"software": "log4j", "version": "2.14.1", "vulnerabilities": [{"id": "CVE-2021-44228", "description": "Log4j is vulnerable to remote code execution (RCE) by allowing JNDI lookups", "severity": "CRITICAL", "cvss": "10.0", "date": "2021-12-10", "source": "https://nvd.nist.gov/vuln/detail/CVE-2021-44228"}]}
123
 
124
+ Thought: I've found a critical vulnerability in Log4j 2.14.1, which is the infamous Log4Shell vulnerability (CVE-2021-44228). I'll provide a detailed answer about this severe security issue.
125
  Code:
126
  ```py
127
+ final_answer(f"""
128
+ # Vulnerabilities in Log4j 2.14.1
129
+
130
+ ## Critical Findings
131
+
132
+ Yes, Log4j 2.14.1 has a **CRITICAL vulnerability** known as **Log4Shell** (CVE-2021-44228), which has a maximum CVSS score of 10.0.
133
+
134
+ ### CVE-2021-44228 (Log4Shell)
135
+ - **Severity**: CRITICAL
136
+ - **CVSS**: 10.0/10.0
137
+ - **Published**: December 10, 2021
138
+ - **Description**: This vulnerability allows attackers to execute arbitrary code on systems using Log4j by sending specially crafted requests that contain malicious JNDI lookups.
139
+ - **Impact**: Extremely high - allows remote code execution without authentication
140
+
141
+ ## Recommendation
142
+
143
+ It is **STRONGLY RECOMMENDED** to upgrade Log4j to version 2.15.0 or later immediately. This vulnerability is actively exploited in the wild and represents a serious security risk for any system using the affected versions.
144
+
145
+ If immediate upgrading is not possible, apply these mitigations:
146
+ - Set system property `-Dlog4j2.formatMsgNoLookups=true`
147
+ - Remove the JndiLookup class from the classpath
148
+
149
+ ## Source
150
+ [NVD Database Entry](https://nvd.nist.gov/vuln/detail/CVE-2021-44228)
151
+ """)
152
  ```<end_code>
 
 
 
 
 
153
 
154
+ Above examples were using notional tools that might not exist for you. On top of performing computations in the Python code snippets that you create, you only have access to these tools:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
155
  {%- for tool in tools.values() %}
156
  - {{ tool.name }}: {{ tool.description }}
157
  Takes inputs: {{tool.inputs}}
 
182
  10. Don't give up! You're in charge of solving the task, not providing directions to solve it.
183
 
184
  Now Begin! If you solve the task correctly, you will receive a reward of $1,000,000.
185
+
186
  "planning":
187
  "initial_facts": |-
188
  Below I will present you a task.
requirements.txt CHANGED
@@ -1,5 +1,11 @@
1
  markdownify
2
  smolagents
3
  requests
4
- duckduckgo_search
 
 
 
 
 
5
  pandas
 
 
1
  markdownify
2
  smolagents
3
  requests
4
+ beautifulsoup4
5
+ httpx
6
+ python-dotenv
7
+ rich
8
+ pyyaml
9
+ gradio
10
  pandas
11
+ duckduckgo_search
tools/__init__.py ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerability Intelligence Agent (VIA) - Tools Package.
3
+ This package contains utility tools for HTTP requests, parsing, and general utilities.
4
+ """
5
+
6
+ from . import utils
7
+ from . import final_answer
tools/parsers.py ADDED
@@ -0,0 +1,89 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Parsers for Vulnerability Intelligence Agent.
3
+ This module contains parsers for different vulnerability data formats.
4
+ """
5
+ import re
6
+ from typing import Dict, List, Any, Optional
7
+
8
+ class CWEParser:
9
+ """Parser for Common Weakness Enumeration (CWE) entries."""
10
+
11
+ @staticmethod
12
+ def extract_cwe_from_cve(description: str) -> List[str]:
13
+ """
14
+ Extract CWE IDs from a CVE description.
15
+
16
+ Args:
17
+ description: CVE description text
18
+
19
+ Returns:
20
+ List of CWE IDs found in the description
21
+ """
22
+ # Pattern to match CWE IDs (e.g., CWE-79, CWE-89)
23
+ pattern = r"CWE-(\d+)"
24
+ matches = re.findall(pattern, description)
25
+
26
+ # Convert matches to full CWE IDs
27
+ cwe_ids = [f"CWE-{match}" for match in matches]
28
+
29
+ return cwe_ids
30
+
31
+
32
+ class NVDParser:
33
+ """Parser for National Vulnerability Database entries."""
34
+
35
+ @staticmethod
36
+ def parse_nvd_api_response(response_json: Dict[str, Any], software: str, version: str) -> List[Dict[str, Any]]:
37
+ """
38
+ Parse a response from the NVD API.
39
+
40
+ Args:
41
+ response_json: JSON response from NVD API
42
+ software: Software name being searched
43
+ version: Software version being searched
44
+
45
+ Returns:
46
+ List of parsed vulnerabilities
47
+ """
48
+ # Simplified implementation
49
+ return []
50
+
51
+
52
+ class CVEParser:
53
+ """Parser for Common Vulnerabilities and Exposures (CVE) entries."""
54
+
55
+ @staticmethod
56
+ def parse_cve_data(html_content: str, software: str, version: str) -> List[Dict[str, Any]]:
57
+ """
58
+ Parse CVE data from HTML content.
59
+
60
+ Args:
61
+ html_content: HTML content from the CVE website
62
+ software: Software name being searched
63
+ version: Software version being searched
64
+
65
+ Returns:
66
+ List of parsed vulnerabilities
67
+ """
68
+ # Simplified implementation
69
+ return []
70
+
71
+
72
+ class CISAParser:
73
+ """Parser for CISA Known Exploited Vulnerabilities (KEV) catalog entries."""
74
+
75
+ @staticmethod
76
+ def parse_kev_data(json_data: Dict[str, Any], software: str, version: str) -> List[Dict[str, Any]]:
77
+ """
78
+ Parse data from the CISA KEV catalog.
79
+
80
+ Args:
81
+ json_data: JSON data from the CISA KEV catalog
82
+ software: Software name being searched
83
+ version: Software version being searched
84
+
85
+ Returns:
86
+ List of parsed vulnerabilities
87
+ """
88
+ # Simplified implementation
89
+ return []
tools/utils.py ADDED
@@ -0,0 +1,146 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility functions for the Vulnerability Intelligence Agent (VIA).
3
+ """
4
+ import os
5
+ import logging
6
+ import json
7
+ import datetime
8
+ from typing import Dict, List, Any, Optional, Union
9
+
10
+ def setup_logger(name: str) -> logging.Logger:
11
+ """
12
+ Set up a logger with the specified name.
13
+
14
+ Args:
15
+ name: Name of the logger
16
+
17
+ Returns:
18
+ Configured logger instance
19
+ """
20
+ logger = logging.getLogger(name)
21
+ if not logger.handlers:
22
+ handler = logging.StreamHandler()
23
+ formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
24
+ handler.setFormatter(formatter)
25
+ logger.addHandler(handler)
26
+
27
+ return logger
28
+
29
+ def merge_vulnerability_data(results: List[Dict[str, Any]]) -> Dict[str, Any]:
30
+ """
31
+ Merge vulnerability data from multiple sources.
32
+
33
+ Args:
34
+ results: List of dictionaries with vulnerability data from different sources
35
+
36
+ Returns:
37
+ Merged dictionary with all vulnerabilities
38
+ """
39
+ # Simple implementation - in a real system, this would be more sophisticated
40
+ if not results:
41
+ return {"software": "", "version": "", "vulnerabilities": []}
42
+
43
+ merged = {
44
+ "software": results[0].get("software", ""),
45
+ "version": results[0].get("version", ""),
46
+ "vulnerabilities": []
47
+ }
48
+
49
+ # Simple merge - just combine all vulnerabilities
50
+ for result in results:
51
+ if "vulnerabilities" in result:
52
+ merged["vulnerabilities"].extend(result["vulnerabilities"])
53
+
54
+ return merged
55
+
56
+ def save_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str:
57
+ """
58
+ Save vulnerability data to a JSON file.
59
+
60
+ Args:
61
+ data: Vulnerability data to save
62
+ filename: Base filename (without extension)
63
+ report_dir: Directory to save the report in
64
+
65
+ Returns:
66
+ Path to the saved JSON file
67
+ """
68
+ # Ensure the reports directory exists
69
+ os.makedirs(report_dir, exist_ok=True)
70
+
71
+ # Add timestamp to filename to avoid overwriting
72
+ timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
73
+ json_filename = f"{filename}_{timestamp}.json"
74
+ json_path = os.path.join(report_dir, json_filename)
75
+
76
+ # Save the data to a JSON file
77
+ with open(json_path, 'w') as f:
78
+ json.dump(data, f, indent=2)
79
+
80
+ return json_path
81
+
82
+ def generate_markdown_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str:
83
+ """
84
+ Generate a Markdown report from vulnerability data.
85
+
86
+ Args:
87
+ data: Vulnerability data
88
+ filename: Base filename (without extension)
89
+ report_dir: Directory to save the report in
90
+
91
+ Returns:
92
+ Path to the generated Markdown file
93
+ """
94
+ # Ensure the reports directory exists
95
+ os.makedirs(report_dir, exist_ok=True)
96
+
97
+ # Add timestamp to filename to avoid overwriting
98
+ timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
99
+ md_filename = f"{filename}_{timestamp}.md"
100
+ md_path = os.path.join(report_dir, md_filename)
101
+
102
+ with open(md_path, 'w') as f:
103
+ # Write title
104
+ f.write(f"# Vulnerability Report: {data['software']} {data['version']}\n\n")
105
+ f.write(f"*Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n")
106
+
107
+ # Write summary
108
+ f.write("## Summary\n\n")
109
+ vuln_count = len(data.get('vulnerabilities', []))
110
+ f.write(f"Found **{vuln_count}** vulnerabilities for {data['software']} {data['version']}.\n\n")
111
+
112
+ # Write vulnerabilities
113
+ if vuln_count > 0:
114
+ f.write("## Vulnerabilities\n\n")
115
+
116
+ for i, vuln in enumerate(data['vulnerabilities'], 1):
117
+ f.write(f"### {i}. {vuln.get('id', 'Unknown ID')}\n\n")
118
+ f.write(f"**Severity:** {vuln.get('severity', 'Unknown')}")
119
+ if 'cvss' in vuln:
120
+ f.write(f" (CVSS: {vuln['cvss']})")
121
+ f.write("\n\n")
122
+
123
+ f.write(f"**Description:** {vuln.get('description', 'No description available.')}\n\n")
124
+
125
+ if 'date' in vuln:
126
+ f.write(f"**Published:** {vuln['date']}\n\n")
127
+
128
+ if 'recommendation' in vuln:
129
+ f.write(f"**Recommendation:** {vuln['recommendation']}\n\n")
130
+
131
+ if 'source' in vuln:
132
+ f.write(f"**Source:** [{vuln['source']}]({vuln['source']})\n\n")
133
+
134
+ f.write("---\n\n")
135
+ else:
136
+ f.write("## No vulnerabilities found\n\n")
137
+ f.write("No known vulnerabilities were found for this software and version.\n\n")
138
+
139
+ # Write footer
140
+ f.write("## References\n\n")
141
+ f.write("- [CVE (Common Vulnerabilities and Exposures)](https://cve.mitre.org/)\n")
142
+ f.write("- [NVD (National Vulnerability Database)](https://nvd.nist.gov/)\n")
143
+ f.write("- [CISA Known Exploited Vulnerabilities Catalog](https://www.cisa.gov/known-exploited-vulnerabilities-catalog)\n")
144
+ f.write("- [CWE (Common Weakness Enumeration)](https://cwe.mitre.org/)\n")
145
+
146
+ return md_path
vulnerability_intelligence_agent/README.md ADDED
@@ -0,0 +1,90 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Vulnerability Intelligence Agent (VIA)
2
+
3
+ Vulnerability Intelligence Agent (VIA) es un agente inteligente y modular basado en smolagents, capaz de buscar y reportar vulnerabilidades de software y sistemas operativos desde fuentes oficiales, mediante scraping y análisis web. Modular, extensible y diseñado para integrarse a pipelines de seguridad y análisis.
4
+
5
+ ## Características
6
+
7
+ - Búsqueda de vulnerabilidades en múltiples fuentes oficiales mediante web scraping/parsing
8
+ - Arquitectura modular: un agente por fuente
9
+ - Sistema de coordinación eficiente de agentes
10
+ - Generación de reportes automáticos legibles y exportables (JSON/Markdown)
11
+ - Diseño extensible para futuras integraciones con APIs
12
+
13
+ ## Fuentes soportadas
14
+
15
+ - CVE (Common Vulnerabilities and Exposures)
16
+ - CISA (Cybersecurity & Infrastructure Security Agency)
17
+ - CWE (Common Weakness Enumeration)
18
+ - NVD (National Vulnerability Database)
19
+
20
+ ## Instalación
21
+
22
+ ```bash
23
+ git clone <repository-url>
24
+ cd vulnerability_intelligence_agent
25
+ pip install -r requirements.txt
26
+ ```
27
+
28
+ ## Uso
29
+
30
+ ```bash
31
+ python main.py --input input.json
32
+ ```
33
+
34
+ Ejemplo de archivo input.json:
35
+ ```json
36
+ [
37
+ { "name": "OpenSSL", "version": "1.1.1k" },
38
+ { "name": "Apache", "version": "2.4.54" }
39
+ ]
40
+ ```
41
+
42
+ ## Formato de salida
43
+
44
+ El sistema genera reportes en formato JSON y Markdown con información detallada sobre las vulnerabilidades encontradas:
45
+
46
+ ```json
47
+ {
48
+ "software": "OpenSSL",
49
+ "version": "1.1.1k",
50
+ "vulnerabilities": [
51
+ {
52
+ "id": "CVE-2021-3450",
53
+ "description": "Improper Certificate Validation vulnerability...",
54
+ "severity": "HIGH",
55
+ "cvss": "7.4",
56
+ "source": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-3450",
57
+ "date": "2021-03-25",
58
+ "recommendation": "Update to version 1.1.1l"
59
+ }
60
+ ]
61
+ }
62
+ ```
63
+
64
+ ## Estructura del proyecto
65
+
66
+ ```
67
+ vulnerability_intelligence_agent/
68
+ ├── agents/ # Subagentes que buscan en cada fuente específica
69
+ │ ├── cve_agent.py
70
+ │ ├── cisa_agent.py
71
+ │ ├── cwe_agent.py
72
+ │ ├── nvd_agent.py
73
+ │ └── coordinator_agent.py # Agente principal que coordina a los demás
74
+ ├── tools/ # Herramientas genéricas para parsing, http, utils
75
+ │ ├── http_client.py
76
+ │ ├── parsers.py
77
+ │ └── utils.py
78
+ ├── reports/ # Carpeta para almacenar reportes generados
79
+ ├── main.py # Ejecución principal del agente
80
+ ├── README.md # Documentación inicial
81
+ └── requirements.txt # Librerías necesarias
82
+ ```
83
+
84
+ ## Licencia
85
+
86
+ MIT
87
+
88
+ ## Contribuciones
89
+
90
+ Las contribuciones son bienvenidas. Por favor, abra un issue o un pull request para sugerencias y mejoras.
vulnerability_intelligence_agent/__init__.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerability Intelligence Agent (VIA).
3
+ An intelligent and modular agent for searching and reporting software vulnerabilities from official sources.
4
+ """
5
+
6
+ __version__ = "0.1.0"
vulnerability_intelligence_agent/agents/__init__.py ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerability Intelligence Agent (VIA) - Agents Package.
3
+ This package contains the agent implementations for different vulnerability sources.
4
+ """
5
+
6
+ from . import cve_agent
7
+ from . import nvd_agent
8
+ from . import cisa_agent
9
+ from . import cwe_agent
10
+ from . import coordinator_agent
vulnerability_intelligence_agent/agents/cisa_agent.py ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ CISA Agent module for vulnerability intelligence.
3
+ This agent is responsible for querying the CISA Known Exploited Vulnerabilities (KEV) Catalog.
4
+ """
5
+ import json
6
+ import time
7
+ from typing import Dict, List, Any, Optional
8
+
9
+ from smolagents import tool
10
+ from ..tools.http_client import HTTPClient
11
+ from ..tools.parsers import CISAParser
12
+ from ..tools import utils
13
+
14
+ logger = utils.setup_logger("cisa_agent")
15
+
16
+
17
+ @tool
18
+ def search_cisa_kev_for_software(software: str, version: str) -> Dict[str, Any]:
19
+ """
20
+ Search the CISA Known Exploited Vulnerabilities (KEV) Catalog for vulnerabilities related to a specific software and version.
21
+
22
+ Args:
23
+ software: Name of the software to search for
24
+ version: Version of the software to search for
25
+
26
+ Returns:
27
+ Dictionary with vulnerability information for the software and version
28
+ """
29
+ logger.info(f"Searching CISA KEV for {software} version {version}")
30
+
31
+ result = {
32
+ "software": software,
33
+ "version": version,
34
+ "vulnerabilities": []
35
+ }
36
+
37
+ http_client = HTTPClient()
38
+
39
+ try:
40
+ # CISA provides the KEV catalog as a JSON file
41
+ kev_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
42
+
43
+ # Get the KEV catalog
44
+ response = http_client.get(kev_url)
45
+ kev_data = response.json()
46
+
47
+ # Parse the KEV data
48
+ vulnerabilities = CISAParser.parse_cisa_kev_data(kev_data, software, version)
49
+
50
+ # Add the vulnerabilities to the result
51
+ result["vulnerabilities"] = vulnerabilities
52
+
53
+ logger.info(f"Found {len(vulnerabilities)} CISA KEV vulnerabilities for {software} {version}")
54
+ return result
55
+
56
+ except Exception as e:
57
+ logger.error(f"Error searching CISA KEV for {software} {version}: {str(e)}")
58
+ return {
59
+ "software": software,
60
+ "version": version,
61
+ "vulnerabilities": [],
62
+ "error": str(e)
63
+ }
64
+
65
+
66
+ @tool
67
+ def get_all_cisa_kev_vulnerabilities() -> Dict[str, Any]:
68
+ """
69
+ Get all vulnerabilities from the CISA Known Exploited Vulnerabilities (KEV) Catalog.
70
+
71
+ Returns:
72
+ Dictionary with all vulnerabilities from the KEV catalog
73
+ """
74
+ logger.info("Getting all CISA KEV vulnerabilities")
75
+
76
+ http_client = HTTPClient()
77
+
78
+ try:
79
+ # CISA provides the KEV catalog as a JSON file
80
+ kev_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
81
+
82
+ # Get the KEV catalog
83
+ response = http_client.get(kev_url)
84
+ kev_data = response.json()
85
+
86
+ # Extract catalog metadata
87
+ result = {
88
+ "title": kev_data.get("title", "CISA Known Exploited Vulnerabilities Catalog"),
89
+ "catalogVersion": kev_data.get("catalogVersion", ""),
90
+ "dateReleased": kev_data.get("dateReleased", ""),
91
+ "count": len(kev_data.get("vulnerabilities", [])),
92
+ "vulnerabilities": kev_data.get("vulnerabilities", [])
93
+ }
94
+
95
+ logger.info(f"Found {result['count']} total CISA KEV vulnerabilities")
96
+ return result
97
+
98
+ except Exception as e:
99
+ logger.error(f"Error getting all CISA KEV vulnerabilities: {str(e)}")
100
+ return {
101
+ "error": str(e),
102
+ "vulnerabilities": []
103
+ }
104
+
105
+
106
+ @tool
107
+ def get_cisa_kev_vulnerability(cve_id: str) -> Dict[str, Any]:
108
+ """
109
+ Get details about a specific vulnerability from the CISA KEV Catalog by CVE ID.
110
+
111
+ Args:
112
+ cve_id: CVE ID to look up (e.g., "CVE-2021-44228")
113
+
114
+ Returns:
115
+ Dictionary with vulnerability details if found
116
+ """
117
+ logger.info(f"Looking up CISA KEV vulnerability for {cve_id}")
118
+
119
+ http_client = HTTPClient()
120
+
121
+ try:
122
+ # CISA provides the KEV catalog as a JSON file
123
+ kev_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
124
+
125
+ # Get the KEV catalog
126
+ response = http_client.get(kev_url)
127
+ kev_data = response.json()
128
+
129
+ # Find the specific vulnerability by CVE ID
130
+ vulnerabilities = kev_data.get("vulnerabilities", [])
131
+ for vuln in vulnerabilities:
132
+ if vuln.get("cveID") == cve_id:
133
+ # Enhance the vulnerability data with a source URL and severity level
134
+ vuln["source"] = "https://www.cisa.gov/known-exploited-vulnerabilities-catalog"
135
+ vuln["severity"] = "CRITICAL" # All KEV items are considered critical
136
+
137
+ # Add a standardized recommendation
138
+ vuln["recommendation"] = f"URGENT: Update immediately as this vulnerability is being actively exploited in the wild"
139
+
140
+ return vuln
141
+
142
+ # If we get here, the vulnerability wasn't found
143
+ logger.warning(f"CVE {cve_id} not found in CISA KEV catalog")
144
+ return {
145
+ "id": cve_id,
146
+ "error": f"CVE {cve_id} not found in CISA KEV catalog"
147
+ }
148
+
149
+ except Exception as e:
150
+ logger.error(f"Error looking up CISA KEV vulnerability for {cve_id}: {str(e)}")
151
+ return {
152
+ "id": cve_id,
153
+ "error": str(e)
154
+ }
vulnerability_intelligence_agent/agents/coordinator_agent.py ADDED
@@ -0,0 +1,191 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Coordinator Agent module for vulnerability intelligence.
3
+ This agent is responsible for coordinating the other agents and generating the final report.
4
+ """
5
+ import json
6
+ import time
7
+ import argparse
8
+ from typing import Dict, List, Any, Optional, Union
9
+
10
+ from smolagents import tool
11
+ from ..tools import utils
12
+ from . import cve_agent, nvd_agent, cisa_agent, cwe_agent
13
+
14
+ logger = utils.setup_logger("coordinator_agent")
15
+
16
+
17
+ @tool
18
+ def search_vulnerabilities_for_software(software: str, version: str) -> Dict[str, Any]:
19
+ """
20
+ Search for vulnerabilities related to a specific software and version across all sources.
21
+
22
+ Args:
23
+ software: Name of the software to search for
24
+ version: Version of the software to search for
25
+
26
+ Returns:
27
+ Dictionary with vulnerability information for the software and version from all sources
28
+ """
29
+ logger.info(f"Searching for vulnerabilities for {software} version {version}")
30
+
31
+ # Initialize results from each source
32
+ results = []
33
+
34
+ # Search NVD
35
+ logger.info("Searching NVD...")
36
+ nvd_results = nvd_agent.search_nvd_for_software(software, version)
37
+ if nvd_results.get("vulnerabilities"):
38
+ logger.info(f"Found {len(nvd_results['vulnerabilities'])} vulnerabilities in NVD")
39
+ results.append(nvd_results)
40
+
41
+ # Search CVE
42
+ logger.info("Searching CVE...")
43
+ cve_results = cve_agent.search_cve_for_software(software, version)
44
+ if cve_results.get("vulnerabilities"):
45
+ logger.info(f"Found {len(cve_results['vulnerabilities'])} vulnerabilities in CVE")
46
+ results.append(cve_results)
47
+
48
+ # Search CISA KEV
49
+ logger.info("Searching CISA KEV...")
50
+ cisa_results = cisa_agent.search_cisa_kev_for_software(software, version)
51
+ if cisa_results.get("vulnerabilities"):
52
+ logger.info(f"Found {len(cisa_results['vulnerabilities'])} vulnerabilities in CISA KEV")
53
+ results.append(cisa_results)
54
+
55
+ # Merge the results
56
+ merged_results = utils.merge_vulnerability_data(results)
57
+
58
+ # Enhance with CWE information
59
+ for vuln in merged_results.get("vulnerabilities", []):
60
+ if "description" in vuln:
61
+ # Try to extract CWEs from the description
62
+ cwe_ids = cwe_agent.CWEParser.extract_cwe_from_cve(vuln["description"])
63
+ if cwe_ids:
64
+ cwe_details = []
65
+ for cwe_id in cwe_ids[:3]: # Limit to 3 CWEs to avoid too many requests
66
+ cwe_detail = cwe_agent.get_cwe_details(cwe_id)
67
+ if "error" not in cwe_detail:
68
+ cwe_details.append(cwe_detail)
69
+ time.sleep(1) # Add a short delay between CWE lookups
70
+
71
+ if cwe_details:
72
+ vuln["related_cwe"] = cwe_details
73
+
74
+ # Generate report
75
+ if merged_results.get("vulnerabilities"):
76
+ report_filename = f"{software.lower().replace(' ', '_')}_{version}"
77
+ utils.save_report(merged_results, report_filename)
78
+ utils.generate_markdown_report(merged_results, report_filename)
79
+
80
+ return merged_results
81
+
82
+
83
+ @tool
84
+ def search_vulnerabilities_for_multiple_software(software_list: List[Dict[str, str]]) -> List[Dict[str, Any]]:
85
+ """
86
+ Search for vulnerabilities for multiple software and versions.
87
+
88
+ Args:
89
+ software_list: List of dictionaries, each with 'name' and 'version' keys
90
+
91
+ Returns:
92
+ List of dictionaries with vulnerability information for each software
93
+ """
94
+ logger.info(f"Searching vulnerabilities for {len(software_list)} software items")
95
+
96
+ results = []
97
+
98
+ for item in software_list:
99
+ software = item.get("name")
100
+ version = item.get("version")
101
+
102
+ if not software or not version:
103
+ logger.warning(f"Skipping invalid software item: {item}")
104
+ continue
105
+
106
+ logger.info(f"Processing {software} {version}")
107
+
108
+ # Search for vulnerabilities
109
+ result = search_vulnerabilities_for_software(software, version)
110
+ results.append(result)
111
+
112
+ # Add a short delay between software items to avoid hitting rate limits
113
+ if item != software_list[-1]: # Skip delay for the last item
114
+ time.sleep(2)
115
+
116
+ return results
117
+
118
+
119
+ @tool
120
+ def get_vulnerability_details(vulnerability_id: str) -> Dict[str, Any]:
121
+ """
122
+ Get detailed information about a specific vulnerability by ID (CVE or CWE).
123
+
124
+ Args:
125
+ vulnerability_id: ID of the vulnerability (e.g., CVE-2021-44228, CWE-79)
126
+
127
+ Returns:
128
+ Dictionary with detailed information about the vulnerability
129
+ """
130
+ logger.info(f"Getting details for vulnerability: {vulnerability_id}")
131
+
132
+ if vulnerability_id.startswith("CVE-"):
133
+ # Try to get info from NVD first
134
+ nvd_details = nvd_agent.get_nvd_cve_details(vulnerability_id)
135
+ if "error" not in nvd_details:
136
+ # Enrich with CISA KEV information if available
137
+ cisa_details = cisa_agent.get_cisa_kev_vulnerability(vulnerability_id)
138
+ if "error" not in cisa_details:
139
+ nvd_details["cisa_kev"] = True
140
+ nvd_details["cisa_required_action"] = cisa_details.get("requiredAction")
141
+ nvd_details["cisa_due_date"] = cisa_details.get("dueDate")
142
+ nvd_details["severity"] = "CRITICAL" # Override severity for KEV vulnerabilities
143
+ nvd_details["recommendation"] = "URGENT: Update immediately as this vulnerability is being actively exploited in the wild"
144
+
145
+ # Try to extract CWEs from the description
146
+ if "description" in nvd_details:
147
+ cwe_details = cwe_agent.extract_cwes_from_cve(nvd_details["description"])
148
+ if cwe_details:
149
+ nvd_details["related_cwe"] = cwe_details
150
+
151
+ return nvd_details
152
+
153
+ # Fallback to CVE database
154
+ return cve_agent.get_cve_details(vulnerability_id)
155
+
156
+ elif vulnerability_id.startswith("CWE-") or vulnerability_id.isdigit():
157
+ return cwe_agent.get_cwe_details(vulnerability_id)
158
+
159
+ else:
160
+ return {
161
+ "id": vulnerability_id,
162
+ "error": "Unknown vulnerability ID format. Should start with CVE- or CWE-."
163
+ }
164
+
165
+
166
+ @tool
167
+ def process_input_file(input_file: str) -> List[Dict[str, Any]]:
168
+ """
169
+ Process an input file containing a list of software to check for vulnerabilities.
170
+
171
+ Args:
172
+ input_file: Path to the input file (JSON format)
173
+
174
+ Returns:
175
+ List of dictionaries with vulnerability information for each software
176
+ """
177
+ logger.info(f"Processing input file: {input_file}")
178
+
179
+ try:
180
+ with open(input_file, 'r') as f:
181
+ software_list = json.load(f)
182
+
183
+ if not isinstance(software_list, list):
184
+ raise ValueError("Input file should contain a JSON array of software items")
185
+
186
+ # Process each software item
187
+ return search_vulnerabilities_for_multiple_software(software_list)
188
+
189
+ except Exception as e:
190
+ logger.error(f"Error processing input file {input_file}: {str(e)}")
191
+ return [{"error": str(e)}]
vulnerability_intelligence_agent/agents/cve_agent.py ADDED
@@ -0,0 +1,170 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ CVE Agent module for vulnerability intelligence.
3
+ This agent is responsible for querying the CVE database.
4
+ """
5
+ import re
6
+ import time
7
+ from typing import Dict, List, Any, Optional
8
+
9
+ from smolagents import tool
10
+ from ..tools.http_client import HTTPClient
11
+ from ..tools.parsers import CVEParser
12
+ from ..tools import utils
13
+
14
+ logger = utils.setup_logger("cve_agent")
15
+
16
+
17
def _enrich_with_cve_details(http_client, vulnerabilities, software, version):
    """Merge MITRE detail-page data into each vulnerability dict, in place."""
    last_index = len(vulnerabilities) - 1

    for i, vuln in enumerate(vulnerabilities):
        cve_id = vuln["id"]

        # Fetch and parse the CVE detail page
        detail_url = f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
        detail_soup = http_client.get_soup(detail_url)
        detailed_vuln = CVEParser.parse_cve_page(detail_soup, cve_id)

        # Copy over every parsed field except the ID, which stays as found
        for key, value in detailed_vuln.items():
            if key != "id":
                vuln[key] = value

        # Add a recommendation based on severity if none is present yet
        if "recommendation" not in vuln:
            severity = vuln.get("severity", "UNKNOWN")
            if severity in ("CRITICAL", "HIGH"):
                vuln["recommendation"] = f"Update {software} to a version newer than {version} immediately"
            elif severity == "MEDIUM":
                vuln["recommendation"] = f"Plan to update {software} to a version newer than {version}"
            else:
                vuln["recommendation"] = f"Consider updating {software} when convenient"

        # Short pause between detail requests (not after the last one)
        if i < last_index:
            time.sleep(1)


@tool
def search_cve_for_software(software: str, version: str) -> Dict[str, Any]:
    """
    Search for CVEs related to a specific software and version.

    Args:
        software: Name of the software to search for
        version: Version of the software to search for

    Returns:
        Dictionary with vulnerability information for the software and version.
        It always has the keys "software", "version" and "vulnerabilities";
        an "error" key is added when the search failed.
    """
    logger.info(f"Searching CVE for {software} version {version}")

    result = {
        "software": software,
        "version": version,
        "vulnerabilities": []
    }

    http_client = HTTPClient()

    try:
        search_url = "https://cve.mitre.org/cgi-bin/cvekey.cgi"

        # First pass: keyword search on "<software> <version>"
        soup = http_client.get_soup(search_url, params={"keyword": f"{software} {version}"})
        vulnerabilities = CVEParser.parse_cve_search_results(soup, software, version)

        if vulnerabilities:
            _enrich_with_cve_details(http_client, vulnerabilities, software, version)
            result["vulnerabilities"] = vulnerabilities

        # Second pass: software name only, to catch more generic entries
        if len(vulnerabilities) < 5:
            broader_soup = http_client.get_soup(search_url, params={"keyword": software})
            broader_vulnerabilities = CVEParser.parse_cve_search_results(broader_soup, software, version)

            # Keep only entries the first pass did not already return
            existing_ids = {v["id"] for v in vulnerabilities}
            unique_broader = [v for v in broader_vulnerabilities if v["id"] not in existing_ids]

            _enrich_with_cve_details(http_client, unique_broader, software, version)
            result["vulnerabilities"].extend(unique_broader)

        logger.info(f"Found {len(result['vulnerabilities'])} CVE vulnerabilities for {software} {version}")
        return result

    except Exception as e:
        # Tool boundary: report the failure as data instead of raising
        logger.error(f"Error searching CVE for {software} {version}: {str(e)}")
        return {
            "software": software,
            "version": version,
            "vulnerabilities": [],
            "error": str(e)
        }
131
+
132
+
133
@tool
def get_cve_details(cve_id: str) -> Dict[str, Any]:
    """
    Get detailed information about a specific CVE.

    Args:
        cve_id: The CVE ID to look up

    Returns:
        Dictionary with detailed information about the CVE, or a dictionary
        with "id" and "error" keys when the ID is malformed or the lookup fails
    """
    logger.info(f"Getting details for {cve_id}")

    try:
        # The whole string must be a CVE ID: it is interpolated into the URL
        # below, so trailing characters must not slip through validation.
        if not re.fullmatch(r"CVE-\d{4}-\d{4,}", cve_id):
            return {
                "id": cve_id,
                "error": "Invalid CVE ID format. Should be CVE-YYYY-NNNN..."
            }

        # Only build the HTTP client once the input is known to be valid
        http_client = HTTPClient()

        # Get the CVE detail page
        detail_url = f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
        detail_soup = http_client.get_soup(detail_url)

        # Parse the detail page
        return CVEParser.parse_cve_page(detail_soup, cve_id)

    except Exception as e:
        # Tool boundary: report the failure as data instead of raising
        logger.error(f"Error getting details for {cve_id}: {str(e)}")
        return {
            "id": cve_id,
            "error": str(e)
        }
vulnerability_intelligence_agent/agents/cwe_agent.py ADDED
@@ -0,0 +1,171 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ CWE Agent module for vulnerability intelligence.
3
+ This agent is responsible for querying the Common Weakness Enumeration (CWE) database.
4
+ """
5
+ import re
6
+ import time
7
+ from typing import Dict, List, Any, Optional
8
+
9
+ from smolagents import tool
10
+ from ..tools.http_client import HTTPClient
11
+ from ..tools.parsers import CWEParser
12
+ from ..tools import utils
13
+
14
+ logger = utils.setup_logger("cwe_agent")
15
+
16
+
17
@tool
def get_cwe_details(cwe_id: str) -> Dict[str, Any]:
    """
    Get detailed information about a specific CWE.

    Args:
        cwe_id: The CWE ID to look up (format: CWE-NUM or just NUM)

    Returns:
        Dictionary with detailed information about the CWE, or a dictionary
        with "id" and "error" keys when the ID is malformed or the lookup fails
    """
    logger.info(f"Getting details for {cwe_id}")

    try:
        # Normalize the ID: tolerate surrounding whitespace and any letter
        # case in the "CWE-" prefix, then keep only the numeric part.
        cwe_num = cwe_id.strip()
        if cwe_num.upper().startswith("CWE-"):
            cwe_num = cwe_num[4:]
        cwe_id = f"CWE-{cwe_num}"

        # The numeric part is interpolated into the URL below, so the whole
        # string must be digits (fullmatch also rejects a trailing newline).
        if not re.fullmatch(r"\d+", cwe_num):
            return {
                "id": cwe_id,
                "error": "Invalid CWE ID format. Should be numeric or CWE-NUM."
            }

        # Only build the HTTP client once the input is known to be valid
        http_client = HTTPClient()

        # Get the CWE detail page
        detail_url = f"https://cwe.mitre.org/data/definitions/{cwe_num}.html"
        detail_soup = http_client.get_soup(detail_url)

        # Parse the detail page
        return CWEParser.parse_cwe_page(detail_soup, cwe_id)

    except Exception as e:
        # Tool boundary: report the failure as data instead of raising
        logger.error(f"Error getting details for {cwe_id}: {str(e)}")
        return {
            "id": cwe_id,
            "error": str(e)
        }
62
+
63
+
64
@tool
def extract_cwes_from_cve(cve_description: str) -> List[Dict[str, Any]]:
    """
    Extract CWE IDs from a CVE description and get details for each.

    Args:
        cve_description: CVE description text to extract CWEs from

    Returns:
        List of CWE details dictionaries (one per distinct CWE ID that could
        be resolved); an empty list when none are found or on error
    """
    logger.info("Extracting CWEs from CVE description")

    try:
        # Extract CWE IDs, dropping repeats while keeping first-seen order so
        # the same CWE page is never fetched twice.
        cwe_ids = list(dict.fromkeys(CWEParser.extract_cwe_from_cve(cve_description)))

        if not cwe_ids:
            logger.info("No CWE IDs found in the CVE description")
            return []

        logger.info(f"Found {len(cwe_ids)} CWE IDs: {', '.join(cwe_ids)}")

        # Get details for each CWE
        cwe_details_list = []
        last_index = len(cwe_ids) - 1
        for index, cwe_id in enumerate(cwe_ids):
            cwe_details = get_cwe_details(cwe_id)

            # Only add if we got valid details (no error)
            if "error" not in cwe_details:
                cwe_details_list.append(cwe_details)

            # Short pause between lookups; decided by position, not by value,
            # so it is skipped only after the final item.
            if index < last_index:
                time.sleep(1)

        return cwe_details_list

    except Exception as e:
        # Best-effort enrichment: a failure here yields "no CWEs", not a crash
        logger.error(f"Error extracting CWEs from CVE description: {str(e)}")
        return []
106
+
107
+
108
@tool
def search_cwe_weaknesses(keyword: str) -> List[Dict[str, Any]]:
    """
    Search for CWE weaknesses by keyword.

    Args:
        keyword: Keyword to search for

    Returns:
        List of matching CWE weakness dictionaries with the keys "id", "name"
        and "source"; an empty list when nothing matches or on error
    """
    # Local import keeps the block self-contained
    from urllib.parse import urljoin

    logger.info(f"Searching CWE for keyword: {keyword}")

    http_client = HTTPClient()

    try:
        search_url = "https://cwe.mitre.org/find/index.html"

        # Get the search results page
        soup = http_client.get_soup(search_url, params={"query": keyword})

        # Look for the table of matching items
        result_table = soup.find("table", {"class": "detail"})
        if not result_table:
            logger.warning(f"No results found for keyword: {keyword}")
            return []

        results = []

        # Extract information from each row, skipping the header row
        for row in result_table.find_all("tr")[1:]:
            cells = row.find_all("td")
            if len(cells) < 2:
                continue

            # First cell holds the linked CWE ID, second cell the name
            cwe_link = cells[0].find("a")
            if not cwe_link:
                continue

            # Resolve the link against the page URL so that root-relative,
            # page-relative and absolute hrefs all become valid absolute URLs.
            cwe_url = cwe_link.get("href")
            if cwe_url:
                cwe_url = urljoin(search_url, cwe_url)

            results.append({
                "id": cwe_link.get_text(strip=True),
                "name": cells[1].get_text(strip=True),
                "source": cwe_url
            })

        logger.info(f"Found {len(results)} CWE weaknesses for keyword: {keyword}")
        return results

    except Exception as e:
        # Tool boundary: a failed search is reported as "no results"
        logger.error(f"Error searching CWE for keyword {keyword}: {str(e)}")
        return []
vulnerability_intelligence_agent/agents/nvd_agent.py ADDED
@@ -0,0 +1,227 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ NVD Agent module for vulnerability intelligence.
3
+ This agent is responsible for querying the National Vulnerability Database.
4
+ """
5
+ import json
6
+ import time
7
+ from typing import Dict, List, Any, Optional
8
+ import urllib.parse
9
+
10
+ from smolagents import tool
11
+ from ..tools.http_client import HTTPClient
12
+ from ..tools.parsers import NVDParser
13
+ from ..tools import utils
14
+
15
+ logger = utils.setup_logger("nvd_agent")
16
+
17
+
18
@tool
def search_nvd_for_software(software: str, version: str, max_results: int = 20) -> Dict[str, Any]:
    """
    Search the National Vulnerability Database for vulnerabilities related to a specific software and version.

    Up to three queries are issued, each one only if fewer than five results
    have been collected so far: a CPE match on name + version, a CPE match on
    the name alone, and finally a free-text keyword search. A failure of one
    query does not prevent the later ones from running.

    Args:
        software: Name of the software to search for
        version: Version of the software to search for
        max_results: Maximum number of results to return (default: 20)

    Returns:
        Dictionary with vulnerability information for the software and version.
        It always has the keys "software", "version" and "vulnerabilities";
        an "error" key is added when nothing was found and a query failed.
    """
    logger.info(f"Searching NVD for {software} version {version}")

    result = {
        "software": software,
        "version": version,
        "vulnerabilities": []
    }

    http_client = HTTPClient()

    try:
        # NVD API endpoint
        # Note: This uses the public API without an API key, which has rate limits
        # For production use, consider registering for an API key: https://nvd.nist.gov/developers/request-an-api-key
        api_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"

        # CPE components are lower case with underscores instead of spaces.
        # No manual URL-quoting here: the HTTP client encodes query parameters
        # itself, so quoting first would double-encode the value.
        cpe_name = software.strip().lower().replace(" ", "_")

        # A CPE 2.3 string has 13 colon-separated components
        # (cpe:2.3:part:vendor:product:version + 7 more). Wildcard patterns
        # are match strings, so they go through virtualMatchString.
        # NOTE(review): the name is kept in the vendor slot as in the original
        # query -- confirm whether the product slot is the better target.
        queries = [
            ("CPE name and version", {
                "virtualMatchString": f"cpe:2.3:*:{cpe_name}:*:{version}:*:*:*:*:*:*:*",
                "resultsPerPage": max_results
            }),
            ("CPE name only", {
                "virtualMatchString": f"cpe:2.3:*:{cpe_name}:*:*:*:*:*:*:*:*:*",
                "resultsPerPage": max_results
            }),
            ("keyword", {
                "keywordSearch": f"{software} {version}",
                "resultsPerPage": max_results
            }),
        ]

        vulnerabilities = []
        errors = []

        for index, (label, params) in enumerate(queries):
            if index > 0:
                # Later queries are fallbacks: stop once enough were found
                if len(vulnerabilities) >= 5:
                    break
                # Add a delay to respect rate limits
                time.sleep(2)

            try:
                response = http_client.get(api_url, params=params)
                found = NVDParser.parse_nvd_api_response(response.json(), software, version)
            except Exception as e:
                # One failing query must not abort the remaining fallbacks
                logger.warning(f"NVD {label} query failed for {software} {version}: {str(e)}")
                errors.append(str(e))
                continue

            # Filter out duplicates already collected by an earlier query
            existing_ids = {v["id"] for v in vulnerabilities}
            vulnerabilities.extend(v for v in found if v["id"] not in existing_ids)

        # Set the vulnerabilities in the result
        result["vulnerabilities"] = vulnerabilities

        # Surface query failures only when they left us with nothing to show
        if errors and not vulnerabilities:
            result["error"] = "; ".join(errors)

        logger.info(f"Found {len(vulnerabilities)} NVD vulnerabilities for {software} {version}")
        return result

    except Exception as e:
        # Tool boundary: report the failure as data instead of raising
        logger.error(f"Error searching NVD for {software} {version}: {str(e)}")
        return {
            "software": software,
            "version": version,
            "vulnerabilities": [],
            "error": str(e)
        }
122
+
123
+
124
@tool
def get_nvd_cve_details(cve_id: str) -> Dict[str, Any]:
    """
    Get detailed information about a specific CVE from the NVD database.

    Args:
        cve_id: The CVE ID to look up

    Returns:
        Dictionary with detailed information about the CVE from NVD. It has
        "id" and "source" plus, when available, "description", "cvss",
        "severity", "date", "references" and "recommendation". On failure it
        has only "id" and "error".
    """
    logger.info(f"Getting NVD details for {cve_id}")

    http_client = HTTPClient()

    try:
        # NVD API endpoint; the CVE ID is sent as a query parameter so the
        # HTTP client takes care of encoding it.
        api_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
        response = http_client.get(api_url, params={"cveId": cve_id})
        response_json = response.json()

        # Check if we got a valid response with vulnerabilities
        if response_json.get("totalResults", 0) == 0 or not response_json.get("vulnerabilities"):
            return {
                "id": cve_id,
                "error": "CVE not found in NVD"
            }

        # Extract the vulnerability data
        vuln_data = response_json["vulnerabilities"][0]["cve"]

        result = {
            "id": vuln_data.get("id", cve_id),
            "source": f"https://nvd.nist.gov/vuln/detail/{cve_id}"
        }

        # Extract the English description, if any
        for desc in vuln_data.get("descriptions", []):
            if desc.get("lang") == "en":
                result["description"] = desc.get("value", "")
                break

        # Extract metrics (severity and CVSS score). Prefer v3.1, then fall
        # back to the other metric sets that carry baseSeverity in cvssData.
        metrics = vuln_data.get("metrics", {})
        cvss_modern = next(
            (metrics[key] for key in ("cvssMetricV31", "cvssMetricV30", "cvssMetricV40") if metrics.get(key)),
            []
        )
        cvss_v2 = metrics.get("cvssMetricV2", [])

        if cvss_modern:
            base_metric = cvss_modern[0].get("cvssData", {})
            result["cvss"] = str(base_metric.get("baseScore", ""))
            result["severity"] = base_metric.get("baseSeverity", "UNKNOWN").upper()
        elif cvss_v2:
            base_metric = cvss_v2[0].get("cvssData", {})
            score = base_metric.get("baseScore")
            result["cvss"] = str(score) if score is not None else ""

            # Map CVSS v2 score to severity
            if score is None:
                result["severity"] = "UNKNOWN"
            elif score >= 9.0:
                result["severity"] = "CRITICAL"
            elif score >= 7.0:
                result["severity"] = "HIGH"
            elif score >= 4.0:
                result["severity"] = "MEDIUM"
            else:
                result["severity"] = "LOW"

        # Extract published date, keeping only the part before the "T"
        if "published" in vuln_data:
            date_str = vuln_data["published"].replace("Z", "+00:00")
            result["date"] = date_str.split("T")[0]

        # Extract references
        references = vuln_data.get("references", [])
        if references:
            result["references"] = [ref.get("url") for ref in references if "url" in ref]

        # Add recommendation based on severity (only when a severity is known)
        if "severity" in result:
            severity = result["severity"]
            if severity in ["CRITICAL", "HIGH"]:
                result["recommendation"] = "Update affected software immediately"
            elif severity == "MEDIUM":
                result["recommendation"] = "Plan to update affected software soon"
            else:
                result["recommendation"] = "Consider updating affected software when convenient"

        return result

    except Exception as e:
        # Tool boundary: report the failure as data instead of raising
        logger.error(f"Error getting NVD details for {cve_id}: {str(e)}")
        return {
            "id": cve_id,
            "error": str(e)
        }
vulnerability_intelligence_agent/example_input.json ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ [
2
+ {
3
+ "name": "OpenSSL",
4
+ "version": "1.1.1k"
5
+ },
6
+ {
7
+ "name": "Apache",
8
+ "version": "2.4.54"
9
+ },
10
+ {
11
+ "name": "log4j",
12
+ "version": "2.14.1"
13
+ }
14
+ ]
vulnerability_intelligence_agent/main.py ADDED
@@ -0,0 +1,136 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ #!/usr/bin/env python3
2
+ """
3
+ Main script for the Vulnerability Intelligence Agent (VIA).
4
+ """
5
+ import os
6
+ import sys
7
+ import json
8
+ import argparse
9
+ import logging
10
+ from typing import List, Dict, Any
11
+
12
+ from smolagents import CodeAgent, HfApiModel
13
+
14
+ from agents.coordinator_agent import process_input_file, search_vulnerabilities_for_software, search_vulnerabilities_for_multiple_software
15
+ from tools import utils
16
+
17
+ logger = utils.setup_logger("main")
18
+
19
+
20
def _build_parser():
    """Create the command line parser shared by parse_args() and main()."""
    parser = argparse.ArgumentParser(description="Vulnerability Intelligence Agent (VIA)")
    parser.add_argument("--input", "-i", type=str, help="Path to input JSON file containing software to check")
    parser.add_argument("--software", "-s", type=str, help="Name of software to check")
    parser.add_argument("--version", "-v", type=str, help="Version of software to check")
    parser.add_argument("--output-dir", "-o", type=str, default="reports", help="Directory to save reports")
    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
    parser.add_argument("--model", type=str, default="Qwen/Qwen2.5-Coder-32B-Instruct", help="HuggingFace model ID to use")
    return parser


def parse_args():
    """Parse command line arguments."""
    return _build_parser().parse_args()


def main():
    """
    Main entry point for the script.

    Returns:
        Process exit code: 0 on success, 1 when no usable input was given or
        an error occurred while processing.
    """
    args = parse_args()

    # Configure logging
    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    logger.info("Starting Vulnerability Intelligence Agent (VIA)")

    # Set output directory
    if args.output_dir:
        os.makedirs(args.output_dir, exist_ok=True)

    # Initialize the model
    model = HfApiModel(
        max_tokens=2096,
        temperature=0.5,
        model_id=args.model,
        custom_role_conversions=None,
    )

    # Initialize the agent
    # NOTE(review): the agent is built but never run below -- the tools are
    # called directly. Kept as in the original; confirm whether it is needed.
    agent = CodeAgent(
        model=model,
        tools=[process_input_file, search_vulnerabilities_for_software, search_vulnerabilities_for_multiple_software],
        max_steps=10,
        verbosity_level=2 if args.verbose else 1,
    )

    # Process input
    try:
        if args.input:
            # Process input file
            logger.info(f"Processing input file: {args.input}")

            # Use the process_input_file tool directly
            result = process_input_file(args.input)

            # Display summary
            for software_result in result:
                software_name = software_result.get("software", "Unknown")
                software_version = software_result.get("version", "Unknown")
                found = software_result.get("vulnerabilities", [])

                print(f"\n{software_name} {software_version}: {len(found)} vulnerabilities found")

                # Show top 3 critical/high vulnerabilities if any
                high_vulns = [v for v in found if v.get("severity") in ["CRITICAL", "HIGH"]]

                if high_vulns:
                    print("\nTop Critical/High Vulnerabilities:")
                    for i, vuln in enumerate(high_vulns[:3], 1):
                        print(f"{i}. {vuln.get('id')} - {vuln.get('severity')} - {vuln.get('source')}")
                        description = vuln.get("description", "")
                        if len(description) > 100:
                            description = description[:100] + "..."
                        print(f"   {description}")

        elif args.software and args.version:
            # Process single software
            logger.info(f"Checking vulnerabilities for {args.software} {args.version}")

            # Use the search_vulnerabilities_for_software tool directly
            result = search_vulnerabilities_for_software(args.software, args.version)

            # Display summary
            found = result.get("vulnerabilities", [])
            print(f"\n{args.software} {args.version}: {len(found)} vulnerabilities found")

            if found:
                # Show all vulnerabilities
                print("\nVulnerabilities:")
                for i, vuln in enumerate(found, 1):
                    print(f"{i}. {vuln.get('id')} - {vuln.get('severity')}")
                    description = vuln.get("description", "")
                    if len(description) > 100:
                        description = description[:100] + "..."
                    print(f"   {description}")
                    print(f"   Source: {vuln.get('source')}")
                    if vuln.get("recommendation"):
                        print(f"   Recommendation: {vuln.get('recommendation')}")
                    print()

        else:
            # No input provided: explain and show usage. The parser is local
            # to parse_args(), so a fresh one is built for the help text.
            print("Error: No input provided. Use --input to specify an input file or --software and --version to check a specific software.")
            _build_parser().print_help()
            return 1

    except Exception as e:
        # Top-level boundary: log and signal failure through the exit code
        logger.error(f"Error: {str(e)}")
        return 1

    logger.info("Vulnerability Intelligence Agent completed successfully")
    return 0


if __name__ == "__main__":
    sys.exit(main())
vulnerability_intelligence_agent/requirements.txt ADDED
@@ -0,0 +1,7 @@
 
 
 
 
 
 
 
 
1
+ smolagents>=1.9.2
2
+ requests>=2.32.3
3
+ beautifulsoup4>=4.13.3
4
+ httpx>=0.28.1
5
+ python-dotenv>=1.0.1
6
+ rich>=13.9.4
7
+ pyyaml>=6.0.2
vulnerability_intelligence_agent/tools/__init__.py ADDED
@@ -0,0 +1,8 @@
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Vulnerability Intelligence Agent (VIA) - Tools Package.
3
+ This package contains utility tools for HTTP requests, parsing, and general utilities.
4
+ """
5
+
6
+ from . import http_client
7
+ from . import parsers
8
+ from . import utils
vulnerability_intelligence_agent/tools/http_client.py ADDED
@@ -0,0 +1,214 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ HTTP Client module for VIA.
3
+ Provides a unified interface for making HTTP requests to vulnerability databases.
4
+ """
5
+ import time
6
+ import random
7
+ import asyncio
8
+ from typing import Dict, Optional, Any, Union
9
+ import httpx
10
+ import requests
11
+ from bs4 import BeautifulSoup
12
+
13
+
14
class HTTPClient:
    """
    A client for making HTTP requests to vulnerability databases.
    Supports both synchronous and asynchronous requests.

    The synchronous methods use a ``requests.Session``; the asynchronous
    methods use an ``httpx.AsyncClient``. Both are created in ``__init__``
    and released by ``close()`` (or, best-effort, when the object is
    garbage-collected).
    """

    DEFAULT_HEADERS = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.5",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    DEFAULT_TIMEOUT = 30.0  # seconds
    DEFAULT_RETRIES = 3
    DEFAULT_RETRY_DELAY = 2.0  # seconds

    def __init__(
        self,
        headers: Optional[Dict[str, str]] = None,
        timeout: float = DEFAULT_TIMEOUT,
        max_retries: int = DEFAULT_RETRIES,
        retry_delay: float = DEFAULT_RETRY_DELAY,
    ):
        """
        Initialize the HTTP client with custom headers and settings.

        Args:
            headers: Optional custom headers to use for requests
            timeout: Request timeout in seconds
            max_retries: Maximum number of retry attempts for failed requests
            retry_delay: Base delay between retries in seconds
        """
        self.headers = headers or self.DEFAULT_HEADERS.copy()
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        # Initialize clients
        self.sync_client = requests.Session()
        self.sync_client.headers.update(self.headers)

        self.async_client = httpx.AsyncClient(
            headers=self.headers,
            timeout=self.timeout,
            follow_redirects=True,
        )

    def get(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[float] = None,
    ) -> requests.Response:
        """
        Make a synchronous GET request with retries.

        Args:
            url: The URL to request
            params: Optional URL parameters
            headers: Optional headers to add or override
            timeout: Optional timeout override

        Returns:
            Response object from requests

        Raises:
            requests.RequestException: If the last attempt fails, including
                non-2xx responses (via ``raise_for_status``).
        """
        merged_headers = self.headers.copy()
        if headers:
            merged_headers.update(headers)

        timeout = timeout or self.timeout

        for attempt in range(self.max_retries):
            try:
                response = self.sync_client.get(
                    url,
                    params=params,
                    headers=merged_headers,
                    timeout=timeout,
                )
                response.raise_for_status()
                return response
            except requests.RequestException:
                # The requests session only raises requests exceptions.
                if attempt == self.max_retries - 1:
                    # Bare raise keeps the original traceback intact
                    raise

                # Apply exponential backoff with jitter
                delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                time.sleep(delay)

        # Only reachable when max_retries is 0 or negative (loop never runs)
        raise RuntimeError("Failed to complete request after all retries")

    async def get_async(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        timeout: Optional[float] = None,
    ) -> httpx.Response:
        """
        Make an asynchronous GET request with retries.

        Args:
            url: The URL to request
            params: Optional URL parameters
            headers: Optional headers to add or override
            timeout: Optional timeout override

        Returns:
            Response object from httpx

        Raises:
            httpx.HTTPError: If the last attempt fails, including non-2xx
                responses (via ``raise_for_status``).
        """
        merged_headers = self.headers.copy()
        if headers:
            merged_headers.update(headers)

        timeout_val = timeout or self.timeout

        for attempt in range(self.max_retries):
            try:
                response = await self.async_client.get(
                    url,
                    params=params,
                    headers=merged_headers,
                    timeout=timeout_val,
                )
                response.raise_for_status()
                return response
            except httpx.HTTPError:
                if attempt == self.max_retries - 1:
                    # Bare raise keeps the original traceback intact
                    raise

                # Apply exponential backoff with jitter
                delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
                await asyncio.sleep(delay)

        # Only reachable when max_retries is 0 or negative (loop never runs)
        raise RuntimeError("Failed to complete request after all retries")

    def get_soup(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        parser: str = "html.parser",
    ) -> BeautifulSoup:
        """
        Make a GET request and return a BeautifulSoup object.

        Args:
            url: The URL to request
            params: Optional URL parameters
            headers: Optional headers to override
            parser: BeautifulSoup parser to use

        Returns:
            BeautifulSoup object for the response
        """
        response = self.get(url, params=params, headers=headers)
        return BeautifulSoup(response.text, parser)

    async def get_soup_async(
        self,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        parser: str = "html.parser",
    ) -> BeautifulSoup:
        """
        Make an async GET request and return a BeautifulSoup object.

        Args:
            url: The URL to request
            params: Optional URL parameters
            headers: Optional headers to override
            parser: BeautifulSoup parser to use

        Returns:
            BeautifulSoup object for the response
        """
        response = await self.get_async(url, params=params, headers=headers)
        return BeautifulSoup(response.text, parser)

    async def close(self):
        """Close both the synchronous session and the async client."""
        # The session holds pooled connections too; release them here
        self.sync_client.close()
        await self.async_client.aclose()

    def __del__(self):
        """Release the clients on garbage collection (best effort)."""
        try:
            # Attributes may be missing if __init__ failed part-way through
            sync_client = getattr(self, "sync_client", None)
            if sync_client is not None:
                sync_client.close()

            async_client = getattr(self, "async_client", None)
            # Skip the async client if close() already released it
            if async_client is not None and not async_client.is_closed:
                loop = asyncio.get_event_loop()
                if loop.is_running():
                    loop.create_task(async_client.aclose())
                else:
                    loop.run_until_complete(async_client.aclose())
        except Exception:
            # A finalizer must never raise (no usable event loop, interpreter
            # shutting down, ...); cleanup here is deliberately best-effort.
            pass
vulnerability_intelligence_agent/tools/parsers.py ADDED
@@ -0,0 +1,456 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Parsers for vulnerability databases.
3
+ """
4
+ import re
5
+ import json
6
+ from typing import Dict, List, Any, Optional, Tuple
7
+ from datetime import datetime
8
+ from bs4 import BeautifulSoup, Tag
9
+ from . import utils
10
+
11
# Module-level logger used by every parser below for warnings and errors.
logger = utils.setup_logger("parsers")
12
+
13
+
14
class CVEParser:
    """Parser for CVE database entries."""

    @staticmethod
    def parse_cve_page(soup: BeautifulSoup, cve_id: str) -> Dict[str, Any]:
        """
        Extract the fields of interest from a cve.mitre.org detail page.

        Args:
            soup: BeautifulSoup object of the CVE page
            cve_id: CVE ID being parsed

        Returns:
            Dictionary with ``id``, ``source`` and ``severity`` plus, when
            found on the page, ``description`` and ``date``. If anything
            raises while parsing, a fallback dictionary with a fixed error
            description is returned instead.
        """
        source_url = f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
        result = {"id": cve_id, "source": source_url}

        try:
            # Description block
            details = soup.find("div", {"class": "cvedetails"})
            if details:
                result["description"] = details.get_text(strip=True)

            # Publication date: the <td> following the "Published" header cell
            published_header = soup.find("th", text=re.compile("Published"))
            date_cell = published_header.find_next_sibling("td") if published_header else None
            if date_cell:
                raw_date = date_cell.get_text(strip=True)
                try:
                    result["date"] = datetime.strptime(raw_date, "%m/%d/%Y").strftime("%Y-%m-%d")
                except ValueError:
                    # Unexpected format: keep the text exactly as scraped
                    result["date"] = raw_date

            # The page has no severity field, so look for a severity phrase
            # in the description; the first matching pattern wins.
            severity_patterns = [
                (r'high severity', 'HIGH'),
                (r'medium severity', 'MEDIUM'),
                (r'low severity', 'LOW'),
                (r'critical severity', 'CRITICAL')
            ]
            description = result.get("description")
            severity = "UNKNOWN"
            if description:
                for pattern, label in severity_patterns:
                    if re.search(pattern, description, re.IGNORECASE):
                        severity = label
                        break
            result["severity"] = severity

            return result

        except Exception as e:
            logger.error(f"Error parsing CVE page for {cve_id}: {str(e)}")
            return {
                "id": cve_id,
                "description": "Error parsing CVE information",
                "severity": "UNKNOWN",
                "source": source_url
            }

    @staticmethod
    def parse_cve_search_results(soup: BeautifulSoup, software: str, version: str) -> List[Dict[str, Any]]:
        """
        Collect the CVE search result rows whose description mentions the version.

        Args:
            soup: BeautifulSoup object of the search results page
            software: Software name being searched (used in log messages)
            version: Software version; a row is kept when this text occurs,
                case-insensitively, in the row description

        Returns:
            List of vulnerability dictionaries; empty when the results table
            is missing or parsing raises
        """
        try:
            results_table = soup.find("table", {"id": "cves"})
            if not results_table:
                logger.warning(f"No CVE table found for {software} {version}")
                return []

            found = []
            # The first <tr> is the header row
            for row in results_table.find_all("tr")[1:]:
                cells = row.find_all("td")
                if len(cells) < 2:
                    continue

                cve_id = cells[0].get_text(strip=True)
                description = cells[1].get_text(strip=True)

                if version.lower() not in description.lower():
                    continue

                found.append({
                    "id": cve_id,
                    "description": description,
                    "severity": "UNKNOWN",  # Not available in the search results
                    "source": f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
                })

            return found

        except Exception as e:
            logger.error(f"Error parsing CVE search results for {software} {version}: {str(e)}")
            return []
125
+
126
+
127
class NVDParser:
    """Parser for National Vulnerability Database entries."""

    @staticmethod
    def _compare_versions(left: str, right: str) -> int:
        """Return -1, 0 or 1 comparing two versions after zero-padding them."""
        left_parts = list(utils.extract_version_parts(left))
        right_parts = list(utils.extract_version_parts(right))
        # Pad to equal length: plain list comparison would otherwise rank
        # [1, 2] below [1, 2, 0] although "1.2" and "1.2.0" are the same.
        width = max(len(left_parts), len(right_parts))
        left_parts += [0] * (width - len(left_parts))
        right_parts += [0] * (width - len(right_parts))
        return (left_parts > right_parts) - (left_parts < right_parts)

    @staticmethod
    def _cpe_match_applies(cpe_match: Dict[str, Any], software: str, version: str) -> bool:
        """Tell whether one ``cpeMatch`` entry covers the software and version."""
        criteria = cpe_match.get("criteria", "").lower()
        if software.lower() not in criteria:
            return False

        # Direct match: compare whole CPE components, never substrings, so
        # that "1.2" does not match ":1.20.0" and "" does not match anything.
        wanted = version.strip().lower()
        fields = criteria.split(":")
        if wanted:
            if fields[:2] == ["cpe", "2.3"]:
                # cpe:2.3:<part>:<vendor>:<product>:<version>:...
                if len(fields) > 5 and fields[5] == wanted:
                    return True
            elif wanted in fields[1:]:
                return True

        # Range match on the optional version bounds
        start_inclusive = cpe_match.get("versionStartIncluding", "")
        start_exclusive = cpe_match.get("versionStartExcluding", "")
        end_inclusive = cpe_match.get("versionEndIncluding", "")
        end_exclusive = cpe_match.get("versionEndExcluding", "")
        if not any((start_inclusive, start_exclusive, end_inclusive, end_exclusive)):
            return False

        compare = NVDParser._compare_versions
        if start_inclusive and compare(version, start_inclusive) < 0:
            return False
        if start_exclusive and compare(version, start_exclusive) <= 0:
            return False
        if end_inclusive and compare(version, end_inclusive) > 0:
            return False
        if end_exclusive and compare(version, end_exclusive) >= 0:
            return False
        return True

    @staticmethod
    def _matches_configurations(configurations: List[Dict[str, Any]], software: str, version: str) -> bool:
        """Tell whether any ``cpeMatch`` in any configuration node applies."""
        return any(
            NVDParser._cpe_match_applies(cpe_match, software, version)
            for config in configurations
            for node in config.get("nodes", [])
            for cpe_match in node.get("cpeMatch", [])
        )

    @staticmethod
    def _severity_and_score(metrics: Dict[str, Any]) -> Tuple[str, Optional[float]]:
        """Return ``(severity, base score)`` preferring CVSS v3.1, v3.0, then v2."""
        cvss_v3 = metrics.get("cvssMetricV31") or metrics.get("cvssMetricV30") or []
        cvss_v2 = metrics.get("cvssMetricV2", [])

        severity = "UNKNOWN"
        cvss_score = None

        if cvss_v3:
            base_metric = cvss_v3[0].get("cvssData", {})
            cvss_score = base_metric.get("baseScore")
            severity_raw = base_metric.get("baseSeverity", "").upper()
            if severity_raw:
                severity = severity_raw
        elif cvss_v2:
            base_metric = cvss_v2[0].get("cvssData", {})
            cvss_score = base_metric.get("baseScore")

            # Map the CVSS v2 score onto the severity labels used by this tool
            if cvss_score is not None:
                if cvss_score >= 9.0:
                    severity = "CRITICAL"
                elif cvss_score >= 7.0:
                    severity = "HIGH"
                elif cvss_score >= 4.0:
                    severity = "MEDIUM"
                else:
                    severity = "LOW"

        return severity, cvss_score

    @staticmethod
    def parse_nvd_api_response(response_json: Dict[str, Any], software: str, version: str) -> List[Dict[str, Any]]:
        """
        Parse a response from the NVD API.

        Only CVEs whose CPE configuration matches ``software`` and ``version``
        are returned. A CPE matches when the software name occurs in its
        criteria string and either the CPE version component equals
        ``version`` exactly or ``version`` lies inside the declared
        versionStart*/versionEnd* bounds.

        Args:
            response_json: JSON response from NVD API
            software: Software name being searched
            version: Software version being searched

        Returns:
            List of parsed vulnerabilities; empty when nothing matches or
            when parsing raises (the error is logged)
        """
        vulnerabilities = []

        try:
            for item in response_json.get("vulnerabilities", []):
                cve = item.get("cve", {})

                # Only include vulnerabilities that match the software and version
                if not NVDParser._matches_configurations(cve.get("configurations", []), software, version):
                    continue

                cve_id = cve.get("id", "")

                # First English description, if any
                description = ""
                for desc in cve.get("descriptions", []):
                    if desc.get("lang") == "en":
                        description = desc.get("value", "")
                        break

                severity, cvss_score = NVDParser._severity_and_score(cve.get("metrics", {}))

                # Reduce the ISO timestamp to a plain date; keep the raw
                # value when it cannot be parsed.
                published_date = cve.get("published", "")
                if published_date:
                    try:
                        date_obj = datetime.fromisoformat(published_date.replace("Z", "+00:00"))
                        published_date = date_obj.strftime("%Y-%m-%d")
                    except ValueError:
                        pass

                vulnerability = {
                    "id": cve_id,
                    "description": description,
                    "severity": severity,
                    "source": f"https://nvd.nist.gov/vuln/detail/{cve_id}"
                }

                if cvss_score is not None:
                    vulnerability["cvss"] = str(cvss_score)

                if published_date:
                    vulnerability["date"] = published_date

                # Add recommendation based on severity
                if severity in ["CRITICAL", "HIGH"]:
                    vulnerability["recommendation"] = f"Update {software} to the latest version immediately"
                elif severity == "MEDIUM":
                    vulnerability["recommendation"] = f"Plan to update {software} to the latest version"
                else:
                    vulnerability["recommendation"] = f"Consider updating {software} when convenient"

                vulnerabilities.append(vulnerability)

            return vulnerabilities

        except Exception as e:
            logger.error(f"Error parsing NVD API response for {software} {version}: {str(e)}")
            return []
294
+
295
+
296
class CISAParser:
    """Parser for CISA Known Exploited Vulnerabilities Catalog."""

    @staticmethod
    def parse_cisa_kev_data(kev_data: Dict[str, Any], software: str, version: str) -> List[Dict[str, Any]]:
        """
        Filter the CISA KEV catalog down to entries for one software version.

        An entry is kept when the normalized software name occurs in the
        normalized ``product`` field and the version text occurs in either
        the ``vendorProject`` or the ``product`` field.

        Args:
            kev_data: KEV catalog data as JSON
            software: Software name to filter for
            version: Software version to filter for

        Returns:
            List of parsed vulnerabilities; empty when ``kev_data`` is not a
            dictionary or when parsing raises (both cases are logged)
        """
        matched = []

        try:
            if not isinstance(kev_data, dict):
                logger.error(f"Invalid KEV data format: {type(kev_data)}")
                return []

            for entry in kev_data.get("vulnerabilities", []):
                product = entry.get("product", "").lower()

                # Software filter on normalized names
                wanted_name = utils.normalize_software_name(software)
                if wanted_name not in utils.normalize_software_name(product):
                    continue

                # Version filter: the version text may appear in either field
                vendor = entry.get("vendorProject", "").lower()
                wanted_version = version.lower()
                if not (wanted_version in vendor or wanted_version in product):
                    continue

                cve_id = entry.get("cveID", "")
                added_on = entry.get("dateAdded", "")

                # Re-format the date when it parses; otherwise keep it as given
                date_value = ""
                if added_on:
                    try:
                        date_value = datetime.strptime(added_on, "%Y-%m-%d").strftime("%Y-%m-%d")
                    except ValueError:
                        date_value = added_on

                record = {
                    "id": cve_id,
                    "description": entry.get("vulnerabilityName", ""),
                    "severity": "CRITICAL",  # Every KEV entry is reported as critical
                    "source": "https://www.cisa.gov/known-exploited-vulnerabilities-catalog",
                    "cisa_required_action": entry.get("requiredAction", ""),
                    "cisa_due_date": entry.get("dueDate", "")
                }

                if date_value:
                    record["date"] = date_value

                record["recommendation"] = f"URGENT: Update {software} immediately as this vulnerability is being actively exploited in the wild"

                matched.append(record)

            return matched

        except Exception as e:
            logger.error(f"Error parsing CISA KEV data for {software} {version}: {str(e)}")
            return []
368
+
369
+
370
class CWEParser:
    """Parser for Common Weakness Enumeration (CWE) data."""

    @staticmethod
    def parse_cwe_page(soup: BeautifulSoup, cwe_id: str) -> Dict[str, Any]:
        """
        Extract title, description, likelihood and mitigation from a CWE page.

        Args:
            soup: BeautifulSoup object of the CWE page
            cwe_id: CWE ID being parsed

        Returns:
            Dictionary with ``id``, ``source`` and ``severity`` plus whichever
            of ``title``, ``description``, ``likelihood``, ``mitigation`` and
            ``recommendation`` were found. If anything raises while parsing,
            a fallback dictionary with a fixed error description is returned.
        """
        source_url = f"https://cwe.mitre.org/data/definitions/{cwe_id}.html"
        result = {"id": cwe_id, "source": source_url}

        def detail_text(section_id):
            # Text of the "detail" div nested in the section div, or None
            section = soup.find("div", {"id": section_id})
            if not section:
                return None
            detail = section.find("div", {"class": "detail"})
            if not detail:
                return None
            return detail.get_text(strip=True)

        try:
            # Title, with the "<cwe_id>: " prefix removed
            title_div = soup.find("div", {"id": "title"})
            if title_div:
                result["title"] = title_div.get_text(strip=True).replace(f"{cwe_id}: ", "")

            description = detail_text("description")
            if description is not None:
                result["description"] = description

            likelihood = detail_text("likelihood")
            if likelihood is not None:
                result["likelihood"] = likelihood

            # Severity follows the first likelihood keyword that matches
            severity = "UNKNOWN"
            if "likelihood" in result:
                lowered = result["likelihood"].lower()
                for keyword, label in (("high", "HIGH"), ("medium", "MEDIUM"), ("low", "LOW")):
                    if keyword in lowered:
                        severity = label
                        break
            result["severity"] = severity

            # Mitigation text doubles as the recommendation
            mitigation = detail_text("mitigations")
            if mitigation is not None:
                result["mitigation"] = mitigation
                result["recommendation"] = result["mitigation"]

            return result

        except Exception as e:
            logger.error(f"Error parsing CWE page for {cwe_id}: {str(e)}")
            return {
                "id": cwe_id,
                "description": "Error parsing CWE information",
                "severity": "UNKNOWN",
                "source": source_url
            }

    @staticmethod
    def extract_cwe_from_cve(cve_description: str) -> List[str]:
        """
        Find every CWE reference (for example ``CWE-79``) in a CVE description.

        Args:
            cve_description: CVE description text

        Returns:
            List of CWE IDs in order of appearance, duplicates included
        """
        pattern = r'CWE-(\d+)'
        return [hit.group(0) for hit in re.finditer(pattern, cve_description)]
vulnerability_intelligence_agent/tools/utils.py ADDED
@@ -0,0 +1,301 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility functions for the Vulnerability Intelligence Agent.
3
+ """
4
+ import json
5
+ import os
6
+ import re
7
+ import logging
8
+ import datetime
9
+ from typing import Dict, List, Any, Optional, Union
10
+
11
# Configure logging
# basicConfig sets up the root logger at import time: INFO level, with
# timestamp, logger name, level and message in every record.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
# Package logger; setup_logger() below returns loggers named "via.<name>".
logger = logging.getLogger("via")
17
+
18
+
19
def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """
    Return the ``via.<name>`` logger with its level set.

    Args:
        name: Suffix appended to the ``via.`` logger namespace
        level: Logging level to apply

    Returns:
        Configured logger instance
    """
    named_logger = logging.getLogger(f"via.{name}")
    named_logger.setLevel(level)
    return named_logger
33
+
34
+
35
def normalize_software_name(name: str) -> str:
    """
    Reduce a software name to lowercase ASCII letters and digits.

    Args:
        name: Software name to normalize

    Returns:
        The lowercased name with every character outside a-z and 0-9 removed
    """
    lowered = name.lower()
    return re.sub(r"[^a-z0-9]", "", lowered)
48
+
49
+
50
def normalize_version(version: str) -> str:
    """
    Normalize a version string to improve matching across databases.

    Surrounding whitespace and a leading ``v`` are removed, underscores
    become dots, and everything from the first remaining letter onwards
    (for example ``beta`` or ``rc1``) is dropped together with the
    separator that preceded it.

    Args:
        version: Version string to normalize

    Returns:
        Normalized version string, e.g. ``"1.0.0"`` for ``"v1.0.0-beta"``
    """
    # Strip first so that leading whitespace cannot hide the "v" prefix
    cleaned = version.strip()

    # Remove leading 'v' if present
    if cleaned.lower().startswith("v"):
        cleaned = cleaned[1:]

    # Replace underscores with dots
    cleaned = cleaned.replace("_", ".")

    # Remove any alphabetic parts (like beta, alpha, etc.)
    cleaned = re.sub(r"[a-zA-Z].*$", "", cleaned)

    # Dropping a suffix such as "-beta" leaves its separator behind
    # ("1.0.0-"); remove it so the result equals the plain release string.
    return cleaned.strip().rstrip(" .-+")
71
+
72
+
73
def save_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str:
    """
    Write report data as an indented JSON file with a timestamped name.

    Args:
        data: Report data to save
        filename: Base filename (without extension)
        report_dir: Directory to save the report in; created when missing

    Returns:
        Path to the saved report file
    """
    os.makedirs(report_dir, exist_ok=True)

    # The timestamp keeps successive reports from overwriting each other
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    json_path = os.path.join(report_dir, f"{filename}_{stamp}.json")

    with open(json_path, 'w') as handle:
        json.dump(data, handle, indent=2)

    logger.info(f"Report saved to {json_path}")
    return json_path
99
+
100
+
101
def generate_markdown_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str:
    """
    Generate a Markdown report from vulnerability data.

    The document is rendered in memory and written in one call as UTF-8,
    so non-ASCII text in descriptions cannot fail on a platform default
    encoding, and no empty file is left behind if rendering raises.

    Args:
        data: Vulnerability data; must contain ``software`` and ``version``,
            and may contain a ``vulnerabilities`` list
        filename: Base filename (without extension)
        report_dir: Directory to save the report in; created when missing

    Returns:
        Path to the generated Markdown file

    Raises:
        KeyError: If ``data`` lacks ``software`` or ``version``
    """
    # Ensure the reports directory exists
    os.makedirs(report_dir, exist_ok=True)

    # Add timestamp to filename to avoid overwriting
    now = datetime.datetime.now()
    md_filename = f"{filename}_{now.strftime('%Y%m%d_%H%M%S')}.md"
    md_path = os.path.join(report_dir, md_filename)

    # A missing or None "vulnerabilities" value is treated as an empty list
    vulns = data.get('vulnerabilities') or []
    vuln_count = len(vulns)

    chunks = [
        f"# Vulnerability Report: {data['software']} {data['version']}\n\n",
        f"*Generated on: {now.strftime('%Y-%m-%d %H:%M:%S')}*\n\n",
        "## Summary\n\n",
        f"Found **{vuln_count}** vulnerabilities for {data['software']} {data['version']}.\n\n",
    ]

    if vuln_count > 0:
        chunks.append("## Vulnerabilities\n\n")

        for i, vuln in enumerate(vulns, 1):
            chunks.append(f"### {i}. {vuln.get('id', 'Unknown ID')}\n\n")

            severity_line = f"**Severity:** {vuln.get('severity', 'Unknown')}"
            if 'cvss' in vuln:
                severity_line += f" (CVSS: {vuln['cvss']})"
            chunks.append(severity_line + "\n\n")

            chunks.append(f"**Description:** {vuln.get('description', 'No description available.')}\n\n")

            if 'date' in vuln:
                chunks.append(f"**Published:** {vuln['date']}\n\n")

            if 'recommendation' in vuln:
                chunks.append(f"**Recommendation:** {vuln['recommendation']}\n\n")

            if 'source' in vuln:
                chunks.append(f"**Source:** [{vuln['source']}]({vuln['source']})\n\n")

            chunks.append("---\n\n")
    else:
        chunks.append("## No vulnerabilities found\n\n")
        chunks.append("No known vulnerabilities were found for this software and version.\n\n")

    # Footer with the data sources
    chunks.extend([
        "## References\n\n",
        "- [CVE (Common Vulnerabilities and Exposures)](https://cve.mitre.org/)\n",
        "- [NVD (National Vulnerability Database)](https://nvd.nist.gov/)\n",
        "- [CISA Known Exploited Vulnerabilities Catalog](https://www.cisa.gov/known-exploited-vulnerabilities-catalog)\n",
        "- [CWE (Common Weakness Enumeration)](https://cwe.mitre.org/)\n",
    ])

    with open(md_path, 'w', encoding="utf-8") as f:
        f.write("".join(chunks))

    logger.info(f"Markdown report saved to {md_path}")
    return md_path
167
+
168
+
169
def merge_vulnerability_data(data_list: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Merge vulnerability data from multiple sources, removing duplicates.

    The result is a shallow copy of the first dictionary whose
    ``vulnerabilities`` list holds the entries of every source. Entries
    sharing an ``id`` are kept once (first occurrence wins); entries without
    an ``id`` cannot be compared and are all kept. The list is sorted by
    severity, most severe first, with unrecognised severities last.

    Args:
        data_list: List of vulnerability data dictionaries

    Returns:
        Merged vulnerability data, or an empty dictionary for empty input
    """
    if not data_list:
        return {}

    # Copy so the caller's first dictionary is not modified
    result = data_list[0].copy()

    merged = []
    seen_ids = set()
    for data in data_list:
        # "or []" tolerates a missing key as well as an explicit None
        for vuln in data.get('vulnerabilities') or []:
            vuln_id = vuln.get('id')
            if vuln_id:
                if vuln_id in seen_ids:
                    continue
                seen_ids.add(vuln_id)
            merged.append(vuln)

    # Sort vulnerabilities by severity (if available)
    severity_order = {
        "CRITICAL": 0,
        "HIGH": 1,
        "MEDIUM": 2,
        "LOW": 3,
        "UNKNOWN": 4,
    }

    # str(... or '') keeps the sort from failing on a None severity
    merged.sort(
        key=lambda v: severity_order.get(str(v.get('severity') or '').upper(), 999)
    )

    result['vulnerabilities'] = merged
    return result
212
+
213
+
214
def extract_version_parts(version: str) -> List[int]:
    """
    Turn a version string into its numeric components.

    Args:
        version: Version string (e.g., "1.2.3")

    Returns:
        The digit groups of the normalized version as integers, in order
    """
    digit_groups = re.findall(r'\d+', normalize_version(version))
    return list(map(int, digit_groups))
227
+
228
+
229
def is_version_in_range(version: str, min_version: str, max_version: str) -> bool:
    """
    Tell whether a version lies between two bounds, both inclusive.

    Args:
        version: Version to check
        min_version: Minimum version (inclusive)
        max_version: Maximum version (inclusive)

    Returns:
        True if version is in range, False otherwise
    """
    candidate = extract_version_parts(version)
    lower = extract_version_parts(min_version)
    upper = extract_version_parts(max_version)

    # Zero-pad to a common length so "1.2" and "1.2.0" compare as equal
    width = max(len(candidate), len(lower), len(upper))

    def padded(parts):
        return parts + [0] * (width - len(parts))

    return padded(lower) <= padded(candidate) <= padded(upper)
253
+
254
+
255
def is_version_affected(version: str, affected_versions: str) -> bool:
    """
    Check if a version is affected by a vulnerability based on version string.

    Supported forms of ``affected_versions`` are ``<= X``, ``>= X``, ``< X``,
    ``> X``, a range ``X - Y`` (both ends inclusive) and a single version.
    Versions are compared numerically after zero-padding both sides to the
    same length, so ``1.2`` and ``1.2.0`` are treated as the same version.

    Args:
        version: Version to check
        affected_versions: Description of affected versions (e.g., "< 1.2.3", ">= 2.0")

    Returns:
        True if version is affected, False otherwise
    """
    def compare_to(bound: str) -> int:
        # -1, 0 or 1 for version below, equal to or above the bound. Both
        # sides are padded: padding only the bound made ">= 1.2.0" reject
        # "1.2" and "< 1.2.0" accept it.
        own = list(extract_version_parts(version))
        other = list(extract_version_parts(bound))
        width = max(len(own), len(other))
        own += [0] * (width - len(own))
        other += [0] * (width - len(other))
        return (own > other) - (own < other)

    # Two-character operators must be tested before "<" and ">"
    checks = (
        ("<=", lambda result: result <= 0),
        (">=", lambda result: result >= 0),
        ("<", lambda result: result < 0),
        (">", lambda result: result > 0),
    )
    for operator, accepts in checks:
        if operator in affected_versions:
            bound = affected_versions.split(operator)[1].strip()
            return accepts(compare_to(bound))

    if "-" in affected_versions:
        # Handle range: "1.0.0 - 2.0.0"
        parts = affected_versions.split("-")
        min_version = parts[0].strip()
        max_version = parts[1].strip()
        return is_version_in_range(version, min_version, max_version)

    # Direct comparison. Without any digits in the specification there is
    # nothing numeric to compare, so fall back to normalized string equality.
    if not extract_version_parts(affected_versions):
        return normalize_version(version) == normalize_version(affected_versions)
    return compare_to(affected_versions) == 0