daqc commited on
Commit
a28b847
·
1 Parent(s): 39c669b

Update Config

Browse files
coordinator_agent.py DELETED
@@ -1,23 +0,0 @@
1
- """
2
- Coordinator Agent module for vulnerability intelligence.
3
- This agent is responsible for coordinating the other agents and generating the final report.
4
- """
5
- import json
6
- import time
7
- import argparse
8
- from typing import Dict, List, Any, Optional, Union
9
-
10
- from smolagents import tool
11
- from ..tools import utils
12
- from ..tools.parsers import CWEParser
13
- from . import cve_agent, nvd_agent, cisa_agent, cwe_agent
14
-
15
- logger = utils.setup_logger("coordinator_agent")
16
-
17
- # Enhance with CWE information
18
- for vuln in merged_results.get("vulnerabilities", []):
19
- if "description" in vuln:
20
- # Try to extract CWEs from the description
21
- cwe_ids = CWEParser.extract_cwe_from_cve(vuln["description"])
22
- if cwe_ids:
23
- # ... rest of the code ...
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
requirements.txt CHANGED
@@ -8,4 +8,5 @@ rich
8
  pyyaml
9
  gradio
10
  pandas
11
- duckduckgo_search
 
 
8
  pyyaml
9
  gradio
10
  pandas
11
+ duckduckgo_search
12
+ pymisp
tools/__pycache__/final_answer.cpython-310.pyc CHANGED
Binary files a/tools/__pycache__/final_answer.cpython-310.pyc and b/tools/__pycache__/final_answer.cpython-310.pyc differ
 
tools/__pycache__/visit_webpage.cpython-310.pyc CHANGED
Binary files a/tools/__pycache__/visit_webpage.cpython-310.pyc and b/tools/__pycache__/visit_webpage.cpython-310.pyc differ
 
tools/__pycache__/vuln_search.cpython-310.pyc CHANGED
Binary files a/tools/__pycache__/vuln_search.cpython-310.pyc and b/tools/__pycache__/vuln_search.cpython-310.pyc differ
 
tools/__pycache__/web_search.cpython-310.pyc CHANGED
Binary files a/tools/__pycache__/web_search.cpython-310.pyc and b/tools/__pycache__/web_search.cpython-310.pyc differ
 
tools/vuln_search.py CHANGED
@@ -3,22 +3,44 @@ from typing import Dict, Any, Optional, List
3
  import nvdlib
4
  from smolagents.tools import Tool
5
 
 
 
 
 
 
 
 
6
  class VulnerabilitySearchTool(Tool):
7
  name = "vuln_search"
8
- description = "Search for vulnerabilities in NVD (National Vulnerability Database)"
9
  inputs = {
10
  'query': {'type': 'str', 'description': 'Search term or CVE ID'},
11
- 'max_results': {'type': 'int', 'description': 'Maximum number of results', 'default': 5}
 
12
  }
13
  output_type = Dict[str, Any]
14
 
15
  def __init__(self):
16
- """Initialize NVD API connection"""
 
17
  self.nvd_api_key = os.getenv('NVD_API_KEY')
18
-
19
- # Configure NVD
20
  if self.nvd_api_key:
21
  nvdlib.set_api_key(self.nvd_api_key)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
22
 
23
  def search_nvd(self, query: str, max_results: int) -> List[Dict[str, Any]]:
24
  """Search vulnerabilities in NVD"""
@@ -51,12 +73,56 @@ class VulnerabilitySearchTool(Tool):
51
  except Exception as e:
52
  return [{'error': f"Error in NVD search: {str(e)}"}]
53
 
54
- def forward(self, query: str, max_results: int = 5) -> Dict[str, Any]:
55
- """Process search in NVD"""
56
- results = self.search_nvd(query, max_results)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
57
 
58
  return {
59
  'query': query,
60
- 'source': 'nvd',
 
61
  'results': results
62
  }
 
3
  import nvdlib
4
  from smolagents.tools import Tool
5
 
6
+ # Optional MISP import
7
+ try:
8
+ from pymisp import PyMISP
9
+ MISP_AVAILABLE = True
10
+ except ImportError:
11
+ MISP_AVAILABLE = False
12
+
13
  class VulnerabilitySearchTool(Tool):
14
  name = "vuln_search"
15
+ description = "Search for vulnerabilities in security databases (NVD, MISP if configured)"
16
  inputs = {
17
  'query': {'type': 'str', 'description': 'Search term or CVE ID'},
18
+ 'sources': {'type': 'list', 'description': 'List of sources to query (nvd, misp)', 'default': ['nvd']},
19
+ 'max_results': {'type': 'int', 'description': 'Maximum number of results per source', 'default': 5}
20
  }
21
  output_type = Dict[str, Any]
22
 
23
  def __init__(self):
24
+ """Initialize API connections"""
25
+ # NVD Configuration
26
  self.nvd_api_key = os.getenv('NVD_API_KEY')
 
 
27
  if self.nvd_api_key:
28
  nvdlib.set_api_key(self.nvd_api_key)
29
+
30
+ # MISP Configuration (optional)
31
+ self.misp = None
32
+ self.misp_available = False
33
+
34
+ if MISP_AVAILABLE:
35
+ misp_url = os.getenv('MISP_URL')
36
+ misp_key = os.getenv('MISP_KEY')
37
+
38
+ if misp_url and misp_key:
39
+ try:
40
+ self.misp = PyMISP(misp_url, misp_key, ssl=False)
41
+ self.misp_available = True
42
+ except Exception as e:
43
+ print(f"MISP initialization failed: {e}")
44
 
45
  def search_nvd(self, query: str, max_results: int) -> List[Dict[str, Any]]:
46
  """Search vulnerabilities in NVD"""
 
73
  except Exception as e:
74
  return [{'error': f"Error in NVD search: {str(e)}"}]
75
 
76
+ def search_misp(self, query: str, max_results: int) -> List[Dict[str, Any]]:
77
+ """Search vulnerabilities in MISP (if available)"""
78
+ if not self.misp_available:
79
+ return [{'error': 'MISP is not configured or not available'}]
80
+
81
+ try:
82
+ # Search related events
83
+ results = self.misp.search(
84
+ controller='events',
85
+ value=query,
86
+ limit=max_results,
87
+ pythonify=True
88
+ )
89
+
90
+ return [{
91
+ 'id': event.uuid,
92
+ 'info': event.info,
93
+ 'analysis': event.analysis,
94
+ 'threat_level': event.threat_level_id,
95
+ 'date': event.date,
96
+ 'attributes': [
97
+ {'type': attr.type, 'value': attr.value}
98
+ for attr in event.attributes
99
+ ]
100
+ } for event in results]
101
+
102
+ except Exception as e:
103
+ return [{'error': f"Error in MISP search: {str(e)}"}]
104
+
105
+ def forward(self, query: str, sources: List[str] = ['nvd'], max_results: int = 5) -> Dict[str, Any]:
106
+ """Process search across available sources"""
107
+ results = {}
108
+ available_sources = ['nvd']
109
+
110
+ # Add MISP to available sources if configured
111
+ if self.misp_available:
112
+ available_sources.append('misp')
113
+
114
+ # Filter requested sources to only use available ones
115
+ active_sources = [s for s in sources if s in available_sources]
116
+
117
+ if 'nvd' in active_sources:
118
+ results['nvd'] = self.search_nvd(query, max_results)
119
+
120
+ if 'misp' in active_sources and self.misp_available:
121
+ results['misp'] = self.search_misp(query, max_results)
122
 
123
  return {
124
  'query': query,
125
+ 'sources': active_sources,
126
+ 'available_sources': available_sources,
127
  'results': results
128
  }
vulnerability_intelligence_agent/README.md DELETED
@@ -1,90 +0,0 @@
1
- # Vulnerability Intelligence Agent (VIA)
2
-
3
- Vulnerability Intelligence Agent (VIA) es un agente inteligente y modular basado en smolagents, capaz de buscar y reportar vulnerabilidades de software y sistemas operativos desde fuentes oficiales, mediante scraping y análisis web. Modular, extensible y diseñado para integrarse a pipelines de seguridad y análisis.
4
-
5
- ## Características
6
-
7
- - Búsqueda de vulnerabilidades en múltiples fuentes oficiales mediante web scraping/parsing
8
- - Arquitectura modular: un agente por fuente
9
- - Sistema de coordinación eficiente de agentes
10
- - Generación de reportes automáticos legibles y exportables (JSON/Markdown)
11
- - Diseño extensible para futuras integraciones con APIs
12
-
13
- ## Fuentes soportadas
14
-
15
- - CVE (Common Vulnerabilities and Exposures)
16
- - CISA (Cybersecurity & Infrastructure Security Agency)
17
- - CWE (Common Weakness Enumeration)
18
- - NVD (National Vulnerability Database)
19
-
20
- ## Instalación
21
-
22
- ```bash
23
- git clone <repository-url>
24
- cd vulnerability_intelligence_agent
25
- pip install -r requirements.txt
26
- ```
27
-
28
- ## Uso
29
-
30
- ```bash
31
- python main.py --input input.json
32
- ```
33
-
34
- Ejemplo de archivo input.json:
35
- ```json
36
- [
37
- { "name": "OpenSSL", "version": "1.1.1k" },
38
- { "name": "Apache", "version": "2.4.54" }
39
- ]
40
- ```
41
-
42
- ## Formato de salida
43
-
44
- El sistema genera reportes en formato JSON y Markdown con información detallada sobre las vulnerabilidades encontradas:
45
-
46
- ```json
47
- {
48
- "software": "OpenSSL",
49
- "version": "1.1.1k",
50
- "vulnerabilities": [
51
- {
52
- "id": "CVE-2021-3450",
53
- "description": "Improper Certificate Validation vulnerability...",
54
- "severity": "HIGH",
55
- "cvss": "7.4",
56
- "source": "https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-3450",
57
- "date": "2021-03-25",
58
- "recommendation": "Update to version 1.1.1l"
59
- }
60
- ]
61
- }
62
- ```
63
-
64
- ## Estructura del proyecto
65
-
66
- ```
67
- vulnerability_intelligence_agent/
68
- ├── agents/ # Subagentes que buscan en cada fuente específica
69
- │ ├── cve_agent.py
70
- │ ├── cisa_agent.py
71
- │ ├── cwe_agent.py
72
- │ ├── nvd_agent.py
73
- │ └── coordinator_agent.py # Agente principal que coordina a los demás
74
- ├── tools/ # Herramientas genéricas para parsing, http, utils
75
- │ ├── http_client.py
76
- │ ├── parsers.py
77
- │ └── utils.py
78
- ├── reports/ # Carpeta para almacenar reportes generados
79
- ├── main.py # Ejecución principal del agente
80
- ├── README.md # Documentación inicial
81
- └── requirements.txt # Librerías necesarias
82
- ```
83
-
84
- ## Licencia
85
-
86
- MIT
87
-
88
- ## Contribuciones
89
-
90
- Las contribuciones son bienvenidas. Por favor, abra un issue o un pull request para sugerencias y mejoras.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/__init__.py DELETED
@@ -1,6 +0,0 @@
1
- """
2
- Vulnerability Intelligence Agent (VIA).
3
- An intelligent and modular agent for searching and reporting software vulnerabilities from official sources.
4
- """
5
-
6
- __version__ = "0.1.0"
 
 
 
 
 
 
 
vulnerability_intelligence_agent/agents/__init__.py DELETED
@@ -1,10 +0,0 @@
1
- """
2
- Vulnerability Intelligence Agent (VIA) - Agents Package.
3
- This package contains the agent implementations for different vulnerability sources.
4
- """
5
-
6
- from . import cve_agent
7
- from . import nvd_agent
8
- from . import cisa_agent
9
- from . import cwe_agent
10
- from . import coordinator_agent
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/agents/cisa_agent.py DELETED
@@ -1,154 +0,0 @@
1
- """
2
- CISA Agent module for vulnerability intelligence.
3
- This agent is responsible for querying the CISA Known Exploited Vulnerabilities (KEV) Catalog.
4
- """
5
- import json
6
- import time
7
- from typing import Dict, List, Any, Optional
8
-
9
- from smolagents import tool
10
- from ..tools.http_client import HTTPClient
11
- from ..tools.parsers import CISAParser
12
- from ..tools import utils
13
-
14
- logger = utils.setup_logger("cisa_agent")
15
-
16
-
17
- @tool
18
- def search_cisa_kev_for_software(software: str, version: str) -> Dict[str, Any]:
19
- """
20
- Search the CISA Known Exploited Vulnerabilities (KEV) Catalog for vulnerabilities related to a specific software and version.
21
-
22
- Args:
23
- software: Name of the software to search for
24
- version: Version of the software to search for
25
-
26
- Returns:
27
- Dictionary with vulnerability information for the software and version
28
- """
29
- logger.info(f"Searching CISA KEV for {software} version {version}")
30
-
31
- result = {
32
- "software": software,
33
- "version": version,
34
- "vulnerabilities": []
35
- }
36
-
37
- http_client = HTTPClient()
38
-
39
- try:
40
- # CISA provides the KEV catalog as a JSON file
41
- kev_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
42
-
43
- # Get the KEV catalog
44
- response = http_client.get(kev_url)
45
- kev_data = response.json()
46
-
47
- # Parse the KEV data
48
- vulnerabilities = CISAParser.parse_cisa_kev_data(kev_data, software, version)
49
-
50
- # Add the vulnerabilities to the result
51
- result["vulnerabilities"] = vulnerabilities
52
-
53
- logger.info(f"Found {len(vulnerabilities)} CISA KEV vulnerabilities for {software} {version}")
54
- return result
55
-
56
- except Exception as e:
57
- logger.error(f"Error searching CISA KEV for {software} {version}: {str(e)}")
58
- return {
59
- "software": software,
60
- "version": version,
61
- "vulnerabilities": [],
62
- "error": str(e)
63
- }
64
-
65
-
66
- @tool
67
- def get_all_cisa_kev_vulnerabilities() -> Dict[str, Any]:
68
- """
69
- Get all vulnerabilities from the CISA Known Exploited Vulnerabilities (KEV) Catalog.
70
-
71
- Returns:
72
- Dictionary with all vulnerabilities from the KEV catalog
73
- """
74
- logger.info("Getting all CISA KEV vulnerabilities")
75
-
76
- http_client = HTTPClient()
77
-
78
- try:
79
- # CISA provides the KEV catalog as a JSON file
80
- kev_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
81
-
82
- # Get the KEV catalog
83
- response = http_client.get(kev_url)
84
- kev_data = response.json()
85
-
86
- # Extract catalog metadata
87
- result = {
88
- "title": kev_data.get("title", "CISA Known Exploited Vulnerabilities Catalog"),
89
- "catalogVersion": kev_data.get("catalogVersion", ""),
90
- "dateReleased": kev_data.get("dateReleased", ""),
91
- "count": len(kev_data.get("vulnerabilities", [])),
92
- "vulnerabilities": kev_data.get("vulnerabilities", [])
93
- }
94
-
95
- logger.info(f"Found {result['count']} total CISA KEV vulnerabilities")
96
- return result
97
-
98
- except Exception as e:
99
- logger.error(f"Error getting all CISA KEV vulnerabilities: {str(e)}")
100
- return {
101
- "error": str(e),
102
- "vulnerabilities": []
103
- }
104
-
105
-
106
- @tool
107
- def get_cisa_kev_vulnerability(cve_id: str) -> Dict[str, Any]:
108
- """
109
- Get details about a specific vulnerability from the CISA KEV Catalog by CVE ID.
110
-
111
- Args:
112
- cve_id: CVE ID to look up (e.g., "CVE-2021-44228")
113
-
114
- Returns:
115
- Dictionary with vulnerability details if found
116
- """
117
- logger.info(f"Looking up CISA KEV vulnerability for {cve_id}")
118
-
119
- http_client = HTTPClient()
120
-
121
- try:
122
- # CISA provides the KEV catalog as a JSON file
123
- kev_url = "https://www.cisa.gov/sites/default/files/feeds/known_exploited_vulnerabilities.json"
124
-
125
- # Get the KEV catalog
126
- response = http_client.get(kev_url)
127
- kev_data = response.json()
128
-
129
- # Find the specific vulnerability by CVE ID
130
- vulnerabilities = kev_data.get("vulnerabilities", [])
131
- for vuln in vulnerabilities:
132
- if vuln.get("cveID") == cve_id:
133
- # Enhance the vulnerability data with a source URL and severity level
134
- vuln["source"] = "https://www.cisa.gov/known-exploited-vulnerabilities-catalog"
135
- vuln["severity"] = "CRITICAL" # All KEV items are considered critical
136
-
137
- # Add a standardized recommendation
138
- vuln["recommendation"] = f"URGENT: Update immediately as this vulnerability is being actively exploited in the wild"
139
-
140
- return vuln
141
-
142
- # If we get here, the vulnerability wasn't found
143
- logger.warning(f"CVE {cve_id} not found in CISA KEV catalog")
144
- return {
145
- "id": cve_id,
146
- "error": f"CVE {cve_id} not found in CISA KEV catalog"
147
- }
148
-
149
- except Exception as e:
150
- logger.error(f"Error looking up CISA KEV vulnerability for {cve_id}: {str(e)}")
151
- return {
152
- "id": cve_id,
153
- "error": str(e)
154
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/agents/coordinator_agent.py DELETED
@@ -1,191 +0,0 @@
1
- """
2
- Coordinator Agent module for vulnerability intelligence.
3
- This agent is responsible for coordinating the other agents and generating the final report.
4
- """
5
- import json
6
- import time
7
- import argparse
8
- from typing import Dict, List, Any, Optional, Union
9
-
10
- from smolagents import tool
11
- from ..tools import utils
12
- from . import cve_agent, nvd_agent, cisa_agent, cwe_agent
13
-
14
- logger = utils.setup_logger("coordinator_agent")
15
-
16
-
17
- @tool
18
- def search_vulnerabilities_for_software(software: str, version: str) -> Dict[str, Any]:
19
- """
20
- Search for vulnerabilities related to a specific software and version across all sources.
21
-
22
- Args:
23
- software: Name of the software to search for
24
- version: Version of the software to search for
25
-
26
- Returns:
27
- Dictionary with vulnerability information for the software and version from all sources
28
- """
29
- logger.info(f"Searching for vulnerabilities for {software} version {version}")
30
-
31
- # Initialize results from each source
32
- results = []
33
-
34
- # Search NVD
35
- logger.info("Searching NVD...")
36
- nvd_results = nvd_agent.search_nvd_for_software(software, version)
37
- if nvd_results.get("vulnerabilities"):
38
- logger.info(f"Found {len(nvd_results['vulnerabilities'])} vulnerabilities in NVD")
39
- results.append(nvd_results)
40
-
41
- # Search CVE
42
- logger.info("Searching CVE...")
43
- cve_results = cve_agent.search_cve_for_software(software, version)
44
- if cve_results.get("vulnerabilities"):
45
- logger.info(f"Found {len(cve_results['vulnerabilities'])} vulnerabilities in CVE")
46
- results.append(cve_results)
47
-
48
- # Search CISA KEV
49
- logger.info("Searching CISA KEV...")
50
- cisa_results = cisa_agent.search_cisa_kev_for_software(software, version)
51
- if cisa_results.get("vulnerabilities"):
52
- logger.info(f"Found {len(cisa_results['vulnerabilities'])} vulnerabilities in CISA KEV")
53
- results.append(cisa_results)
54
-
55
- # Merge the results
56
- merged_results = utils.merge_vulnerability_data(results)
57
-
58
- # Enhance with CWE information
59
- for vuln in merged_results.get("vulnerabilities", []):
60
- if "description" in vuln:
61
- # Try to extract CWEs from the description
62
- cwe_ids = cwe_agent.CWEParser.extract_cwe_from_cve(vuln["description"])
63
- if cwe_ids:
64
- cwe_details = []
65
- for cwe_id in cwe_ids[:3]: # Limit to 3 CWEs to avoid too many requests
66
- cwe_detail = cwe_agent.get_cwe_details(cwe_id)
67
- if "error" not in cwe_detail:
68
- cwe_details.append(cwe_detail)
69
- time.sleep(1) # Add a short delay between CWE lookups
70
-
71
- if cwe_details:
72
- vuln["related_cwe"] = cwe_details
73
-
74
- # Generate report
75
- if merged_results.get("vulnerabilities"):
76
- report_filename = f"{software.lower().replace(' ', '_')}_{version}"
77
- utils.save_report(merged_results, report_filename)
78
- utils.generate_markdown_report(merged_results, report_filename)
79
-
80
- return merged_results
81
-
82
-
83
- @tool
84
- def search_vulnerabilities_for_multiple_software(software_list: List[Dict[str, str]]) -> List[Dict[str, Any]]:
85
- """
86
- Search for vulnerabilities for multiple software and versions.
87
-
88
- Args:
89
- software_list: List of dictionaries, each with 'name' and 'version' keys
90
-
91
- Returns:
92
- List of dictionaries with vulnerability information for each software
93
- """
94
- logger.info(f"Searching vulnerabilities for {len(software_list)} software items")
95
-
96
- results = []
97
-
98
- for item in software_list:
99
- software = item.get("name")
100
- version = item.get("version")
101
-
102
- if not software or not version:
103
- logger.warning(f"Skipping invalid software item: {item}")
104
- continue
105
-
106
- logger.info(f"Processing {software} {version}")
107
-
108
- # Search for vulnerabilities
109
- result = search_vulnerabilities_for_software(software, version)
110
- results.append(result)
111
-
112
- # Add a short delay between software items to avoid hitting rate limits
113
- if item != software_list[-1]: # Skip delay for the last item
114
- time.sleep(2)
115
-
116
- return results
117
-
118
-
119
- @tool
120
- def get_vulnerability_details(vulnerability_id: str) -> Dict[str, Any]:
121
- """
122
- Get detailed information about a specific vulnerability by ID (CVE or CWE).
123
-
124
- Args:
125
- vulnerability_id: ID of the vulnerability (e.g., CVE-2021-44228, CWE-79)
126
-
127
- Returns:
128
- Dictionary with detailed information about the vulnerability
129
- """
130
- logger.info(f"Getting details for vulnerability: {vulnerability_id}")
131
-
132
- if vulnerability_id.startswith("CVE-"):
133
- # Try to get info from NVD first
134
- nvd_details = nvd_agent.get_nvd_cve_details(vulnerability_id)
135
- if "error" not in nvd_details:
136
- # Enrich with CISA KEV information if available
137
- cisa_details = cisa_agent.get_cisa_kev_vulnerability(vulnerability_id)
138
- if "error" not in cisa_details:
139
- nvd_details["cisa_kev"] = True
140
- nvd_details["cisa_required_action"] = cisa_details.get("requiredAction")
141
- nvd_details["cisa_due_date"] = cisa_details.get("dueDate")
142
- nvd_details["severity"] = "CRITICAL" # Override severity for KEV vulnerabilities
143
- nvd_details["recommendation"] = "URGENT: Update immediately as this vulnerability is being actively exploited in the wild"
144
-
145
- # Try to extract CWEs from the description
146
- if "description" in nvd_details:
147
- cwe_details = cwe_agent.extract_cwes_from_cve(nvd_details["description"])
148
- if cwe_details:
149
- nvd_details["related_cwe"] = cwe_details
150
-
151
- return nvd_details
152
-
153
- # Fallback to CVE database
154
- return cve_agent.get_cve_details(vulnerability_id)
155
-
156
- elif vulnerability_id.startswith("CWE-") or vulnerability_id.isdigit():
157
- return cwe_agent.get_cwe_details(vulnerability_id)
158
-
159
- else:
160
- return {
161
- "id": vulnerability_id,
162
- "error": "Unknown vulnerability ID format. Should start with CVE- or CWE-."
163
- }
164
-
165
-
166
- @tool
167
- def process_input_file(input_file: str) -> List[Dict[str, Any]]:
168
- """
169
- Process an input file containing a list of software to check for vulnerabilities.
170
-
171
- Args:
172
- input_file: Path to the input file (JSON format)
173
-
174
- Returns:
175
- List of dictionaries with vulnerability information for each software
176
- """
177
- logger.info(f"Processing input file: {input_file}")
178
-
179
- try:
180
- with open(input_file, 'r') as f:
181
- software_list = json.load(f)
182
-
183
- if not isinstance(software_list, list):
184
- raise ValueError("Input file should contain a JSON array of software items")
185
-
186
- # Process each software item
187
- return search_vulnerabilities_for_multiple_software(software_list)
188
-
189
- except Exception as e:
190
- logger.error(f"Error processing input file {input_file}: {str(e)}")
191
- return [{"error": str(e)}]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/agents/cve_agent.py DELETED
@@ -1,170 +0,0 @@
1
- """
2
- CVE Agent module for vulnerability intelligence.
3
- This agent is responsible for querying the CVE database.
4
- """
5
- import re
6
- import time
7
- from typing import Dict, List, Any, Optional
8
-
9
- from smolagents import tool
10
- from ..tools.http_client import HTTPClient
11
- from ..tools.parsers import CVEParser
12
- from ..tools import utils
13
-
14
- logger = utils.setup_logger("cve_agent")
15
-
16
-
17
- @tool
18
- def search_cve_for_software(software: str, version: str) -> Dict[str, Any]:
19
- """
20
- Search for CVEs related to a specific software and version.
21
-
22
- Args:
23
- software: Name of the software to search for
24
- version: Version of the software to search for
25
-
26
- Returns:
27
- Dictionary with vulnerability information for the software and version
28
- """
29
- logger.info(f"Searching CVE for {software} version {version}")
30
-
31
- result = {
32
- "software": software,
33
- "version": version,
34
- "vulnerabilities": []
35
- }
36
-
37
- http_client = HTTPClient()
38
-
39
- try:
40
- # First, search for CVEs by software name and version
41
- search_url = "https://cve.mitre.org/cgi-bin/cvekey.cgi"
42
- search_term = f"{software} {version}"
43
-
44
- # Get the search results page
45
- soup = http_client.get_soup(search_url, params={"keyword": search_term})
46
-
47
- # Parse the search results to get a list of relevant CVEs
48
- vulnerabilities = CVEParser.parse_cve_search_results(soup, software, version)
49
-
50
- # If we find any vulnerabilities, get more details for each one
51
- if vulnerabilities:
52
- for i, vuln in enumerate(vulnerabilities):
53
- cve_id = vuln["id"]
54
- # Get the CVE detail page
55
- detail_url = f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
56
- detail_soup = http_client.get_soup(detail_url)
57
-
58
- # Parse the detail page to get more information
59
- detailed_vuln = CVEParser.parse_cve_page(detail_soup, cve_id)
60
-
61
- # Update with any new details
62
- for key, value in detailed_vuln.items():
63
- if key != "id": # Keep the original ID
64
- vuln[key] = value
65
-
66
- # Add recommendation based on severity if not already present
67
- if "recommendation" not in vuln:
68
- severity = vuln.get("severity", "UNKNOWN")
69
- if severity == "CRITICAL" or severity == "HIGH":
70
- vuln["recommendation"] = f"Update {software} to a version newer than {version} immediately"
71
- elif severity == "MEDIUM":
72
- vuln["recommendation"] = f"Plan to update {software} to a version newer than {version}"
73
- else:
74
- vuln["recommendation"] = f"Consider updating {software} when convenient"
75
-
76
- # Add a short delay to avoid hitting rate limits
77
- if i < len(vulnerabilities) - 1:
78
- time.sleep(1)
79
-
80
- result["vulnerabilities"] = vulnerabilities
81
-
82
- # Additionally, try searching with the software name only to catch more generic vulnerabilities
83
- if len(vulnerabilities) < 5:
84
- broader_soup = http_client.get_soup(search_url, params={"keyword": software})
85
- broader_vulnerabilities = CVEParser.parse_cve_search_results(broader_soup, software, version)
86
-
87
- # Filter out any duplicates by ID
88
- existing_ids = {v["id"] for v in vulnerabilities}
89
- unique_broader = [v for v in broader_vulnerabilities if v["id"] not in existing_ids]
90
-
91
- # Get details for each new vulnerability
92
- for i, vuln in enumerate(unique_broader):
93
- cve_id = vuln["id"]
94
- detail_url = f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
95
- detail_soup = http_client.get_soup(detail_url)
96
-
97
- detailed_vuln = CVEParser.parse_cve_page(detail_soup, cve_id)
98
-
99
- for key, value in detailed_vuln.items():
100
- if key != "id":
101
- vuln[key] = value
102
-
103
- # Add recommendation if not already present
104
- if "recommendation" not in vuln:
105
- severity = vuln.get("severity", "UNKNOWN")
106
- if severity == "CRITICAL" or severity == "HIGH":
107
- vuln["recommendation"] = f"Update {software} to a version newer than {version} immediately"
108
- elif severity == "MEDIUM":
109
- vuln["recommendation"] = f"Plan to update {software} to a version newer than {version}"
110
- else:
111
- vuln["recommendation"] = f"Consider updating {software} when convenient"
112
-
113
- # Add a short delay to avoid hitting rate limits
114
- if i < len(unique_broader) - 1:
115
- time.sleep(1)
116
-
117
- # Add the unique broader vulnerabilities to the result
118
- result["vulnerabilities"].extend(unique_broader)
119
-
120
- logger.info(f"Found {len(result['vulnerabilities'])} CVE vulnerabilities for {software} {version}")
121
- return result
122
-
123
- except Exception as e:
124
- logger.error(f"Error searching CVE for {software} {version}: {str(e)}")
125
- return {
126
- "software": software,
127
- "version": version,
128
- "vulnerabilities": [],
129
- "error": str(e)
130
- }
131
-
132
-
133
- @tool
134
- def get_cve_details(cve_id: str) -> Dict[str, Any]:
135
- """
136
- Get detailed information about a specific CVE.
137
-
138
- Args:
139
- cve_id: The CVE ID to look up
140
-
141
- Returns:
142
- Dictionary with detailed information about the CVE
143
- """
144
- logger.info(f"Getting details for {cve_id}")
145
-
146
- http_client = HTTPClient()
147
-
148
- try:
149
- # Ensure the CVE ID is properly formatted
150
- if not re.match(r"CVE-\d{4}-\d{4,}", cve_id):
151
- return {
152
- "id": cve_id,
153
- "error": "Invalid CVE ID format. Should be CVE-YYYY-NNNN..."
154
- }
155
-
156
- # Get the CVE detail page
157
- detail_url = f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
158
- detail_soup = http_client.get_soup(detail_url)
159
-
160
- # Parse the detail page
161
- vuln_details = CVEParser.parse_cve_page(detail_soup, cve_id)
162
-
163
- return vuln_details
164
-
165
- except Exception as e:
166
- logger.error(f"Error getting details for {cve_id}: {str(e)}")
167
- return {
168
- "id": cve_id,
169
- "error": str(e)
170
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/agents/cwe_agent.py DELETED
@@ -1,171 +0,0 @@
1
- """
2
- CWE Agent module for vulnerability intelligence.
3
- This agent is responsible for querying the Common Weakness Enumeration (CWE) database.
4
- """
5
- import re
6
- import time
7
- from typing import Dict, List, Any, Optional
8
-
9
- from smolagents import tool
10
- from ..tools.http_client import HTTPClient
11
- from ..tools.parsers import CWEParser
12
- from ..tools import utils
13
-
14
- logger = utils.setup_logger("cwe_agent")
15
-
16
-
17
- @tool
18
- def get_cwe_details(cwe_id: str) -> Dict[str, Any]:
19
- """
20
- Get detailed information about a specific CWE.
21
-
22
- Args:
23
- cwe_id: The CWE ID to look up (format: CWE-NUM or just NUM)
24
-
25
- Returns:
26
- Dictionary with detailed information about the CWE
27
- """
28
- logger.info(f"Getting details for {cwe_id}")
29
-
30
- http_client = HTTPClient()
31
-
32
- try:
33
- # Normalize the CWE ID format
34
- if cwe_id.startswith("CWE-"):
35
- cwe_num = cwe_id[4:]
36
- else:
37
- cwe_num = cwe_id
38
- cwe_id = f"CWE-{cwe_id}"
39
-
40
- # Ensure the CWE ID is valid
41
- if not re.match(r"^\d+$", cwe_num):
42
- return {
43
- "id": cwe_id,
44
- "error": "Invalid CWE ID format. Should be numeric or CWE-NUM."
45
- }
46
-
47
- # Get the CWE detail page
48
- detail_url = f"https://cwe.mitre.org/data/definitions/{cwe_num}.html"
49
- detail_soup = http_client.get_soup(detail_url)
50
-
51
- # Parse the detail page
52
- cwe_details = CWEParser.parse_cwe_page(detail_soup, cwe_id)
53
-
54
- return cwe_details
55
-
56
- except Exception as e:
57
- logger.error(f"Error getting details for {cwe_id}: {str(e)}")
58
- return {
59
- "id": cwe_id,
60
- "error": str(e)
61
- }
62
-
63
-
64
- @tool
65
- def extract_cwes_from_cve(cve_description: str) -> List[Dict[str, Any]]:
66
- """
67
- Extract CWE IDs from a CVE description and get details for each.
68
-
69
- Args:
70
- cve_description: CVE description text to extract CWEs from
71
-
72
- Returns:
73
- List of CWE details dictionaries
74
- """
75
- logger.info("Extracting CWEs from CVE description")
76
-
77
- try:
78
- # Extract CWE IDs
79
- cwe_ids = CWEParser.extract_cwe_from_cve(cve_description)
80
-
81
- if not cwe_ids:
82
- logger.info("No CWE IDs found in the CVE description")
83
- return []
84
-
85
- logger.info(f"Found {len(cwe_ids)} CWE IDs: {', '.join(cwe_ids)}")
86
-
87
- # Get details for each CWE
88
- cwe_details_list = []
89
- for cwe_id in cwe_ids:
90
- # Get details for this CWE
91
- cwe_details = get_cwe_details(cwe_id)
92
-
93
- # Only add if we got valid details (no error)
94
- if "error" not in cwe_details:
95
- cwe_details_list.append(cwe_details)
96
-
97
- # Add a short delay to avoid hitting rate limits
98
- if cwe_id != cwe_ids[-1]: # Skip delay for the last item
99
- time.sleep(1)
100
-
101
- return cwe_details_list
102
-
103
- except Exception as e:
104
- logger.error(f"Error extracting CWEs from CVE description: {str(e)}")
105
- return []
106
-
107
-
108
- @tool
109
- def search_cwe_weaknesses(keyword: str) -> List[Dict[str, Any]]:
110
- """
111
- Search for CWE weaknesses by keyword.
112
-
113
- Args:
114
- keyword: Keyword to search for
115
-
116
- Returns:
117
- List of matching CWE weakness dictionaries
118
- """
119
- logger.info(f"Searching CWE for keyword: {keyword}")
120
-
121
- http_client = HTTPClient()
122
-
123
- try:
124
- # Search URL
125
- search_url = "https://cwe.mitre.org/find/index.html"
126
-
127
- # Get the search results page
128
- soup = http_client.get_soup(search_url, params={"query": keyword})
129
-
130
- # Parse the search results
131
- results = []
132
-
133
- # Look for the table of matching items
134
- result_table = soup.find("table", {"class": "detail"})
135
- if not result_table:
136
- logger.warning(f"No results found for keyword: {keyword}")
137
- return []
138
-
139
- # Extract information from each row
140
- rows = result_table.find_all("tr")[1:] # Skip header row
141
- for row in rows:
142
- cells = row.find_all("td")
143
- if len(cells) >= 2:
144
- # Extract CWE ID and name
145
- id_cell = cells[0]
146
- name_cell = cells[1]
147
-
148
- cwe_link = id_cell.find("a")
149
- if cwe_link:
150
- cwe_id = cwe_link.get_text(strip=True)
151
- cwe_name = name_cell.get_text(strip=True)
152
-
153
- # Get the URL from the link
154
- cwe_url = cwe_link.get("href")
155
- if cwe_url and not cwe_url.startswith("http"):
156
- cwe_url = f"https://cwe.mitre.org{cwe_url}"
157
-
158
- result = {
159
- "id": cwe_id,
160
- "name": cwe_name,
161
- "source": cwe_url
162
- }
163
-
164
- results.append(result)
165
-
166
- logger.info(f"Found {len(results)} CWE weaknesses for keyword: {keyword}")
167
- return results
168
-
169
- except Exception as e:
170
- logger.error(f"Error searching CWE for keyword {keyword}: {str(e)}")
171
- return []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/agents/nvd_agent.py DELETED
@@ -1,227 +0,0 @@
1
- """
2
- NVD Agent module for vulnerability intelligence.
3
- This agent is responsible for querying the National Vulnerability Database.
4
- """
5
- import json
6
- import time
7
- from typing import Dict, List, Any, Optional
8
- import urllib.parse
9
-
10
- from smolagents import tool
11
- from ..tools.http_client import HTTPClient
12
- from ..tools.parsers import NVDParser
13
- from ..tools import utils
14
-
15
- logger = utils.setup_logger("nvd_agent")
16
-
17
-
18
- @tool
19
- def search_nvd_for_software(software: str, version: str, max_results: int = 20) -> Dict[str, Any]:
20
- """
21
- Search the National Vulnerability Database for vulnerabilities related to a specific software and version.
22
-
23
- Args:
24
- software: Name of the software to search for
25
- version: Version of the software to search for
26
- max_results: Maximum number of results to return (default: 20)
27
-
28
- Returns:
29
- Dictionary with vulnerability information for the software and version
30
- """
31
- logger.info(f"Searching NVD for {software} version {version}")
32
-
33
- result = {
34
- "software": software,
35
- "version": version,
36
- "vulnerabilities": []
37
- }
38
-
39
- http_client = HTTPClient()
40
-
41
- try:
42
- # NVD API endpoint
43
- # Note: This uses the public API without an API key, which has rate limits
44
- # For production use, consider registering for an API key: https://nvd.nist.gov/developers/request-an-api-key
45
- api_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
46
-
47
- # Construct the query for the specific software
48
- # Format the search to look for CPE matches containing the software name
49
- encoded_software = urllib.parse.quote(software.lower())
50
-
51
- # First query: search for exact software + version
52
- params = {
53
- "cpeName": f"cpe:2.3:*:{encoded_software}:*:{version}:*:*:*:*:*:*",
54
- "resultsPerPage": max_results
55
- }
56
-
57
- # Make the request
58
- response = http_client.get(api_url, params=params)
59
- response_json = response.json()
60
-
61
- # Parse the response
62
- vulnerabilities = NVDParser.parse_nvd_api_response(response_json, software, version)
63
-
64
- # If we didn't find enough results, try a broader search without specifying the version
65
- if len(vulnerabilities) < 5:
66
- # Add a delay to respect rate limits
67
- time.sleep(2)
68
-
69
- # Second query: search for software name only
70
- broader_params = {
71
- "cpeName": f"cpe:2.3:*:{encoded_software}:*:*:*:*:*:*:*:*",
72
- "resultsPerPage": max_results
73
- }
74
-
75
- broader_response = http_client.get(api_url, params=broader_params)
76
- broader_json = broader_response.json()
77
-
78
- broader_vulns = NVDParser.parse_nvd_api_response(broader_json, software, version)
79
-
80
- # Filter out duplicates
81
- existing_ids = {v["id"] for v in vulnerabilities}
82
- unique_broader = [v for v in broader_vulns if v["id"] not in existing_ids]
83
-
84
- vulnerabilities.extend(unique_broader)
85
-
86
- # Try a keyword search as a fallback
87
- if len(vulnerabilities) < 5:
88
- # Add a delay to respect rate limits
89
- time.sleep(2)
90
-
91
- # Third query: keyword search
92
- keyword_params = {
93
- "keywordSearch": f"{software} {version}",
94
- "resultsPerPage": max_results
95
- }
96
-
97
- keyword_response = http_client.get(api_url, params=keyword_params)
98
- keyword_json = keyword_response.json()
99
-
100
- keyword_vulns = NVDParser.parse_nvd_api_response(keyword_json, software, version)
101
-
102
- # Filter out duplicates
103
- existing_ids = {v["id"] for v in vulnerabilities}
104
- unique_keyword = [v for v in keyword_vulns if v["id"] not in existing_ids]
105
-
106
- vulnerabilities.extend(unique_keyword)
107
-
108
- # Set the vulnerabilities in the result
109
- result["vulnerabilities"] = vulnerabilities
110
-
111
- logger.info(f"Found {len(vulnerabilities)} NVD vulnerabilities for {software} {version}")
112
- return result
113
-
114
- except Exception as e:
115
- logger.error(f"Error searching NVD for {software} {version}: {str(e)}")
116
- return {
117
- "software": software,
118
- "version": version,
119
- "vulnerabilities": [],
120
- "error": str(e)
121
- }
122
-
123
-
124
- @tool
125
- def get_nvd_cve_details(cve_id: str) -> Dict[str, Any]:
126
- """
127
- Get detailed information about a specific CVE from the NVD database.
128
-
129
- Args:
130
- cve_id: The CVE ID to look up
131
-
132
- Returns:
133
- Dictionary with detailed information about the CVE from NVD
134
- """
135
- logger.info(f"Getting NVD details for {cve_id}")
136
-
137
- http_client = HTTPClient()
138
-
139
- try:
140
- # NVD API endpoint for a specific CVE
141
- api_url = f"https://services.nvd.nist.gov/rest/json/cves/2.0?cveId={cve_id}"
142
-
143
- # Make the request
144
- response = http_client.get(api_url)
145
- response_json = response.json()
146
-
147
- # Check if we got a valid response with vulnerabilities
148
- if response_json.get("totalResults", 0) == 0 or not response_json.get("vulnerabilities"):
149
- return {
150
- "id": cve_id,
151
- "error": "CVE not found in NVD"
152
- }
153
-
154
- # Extract the vulnerability data
155
- vuln_data = response_json["vulnerabilities"][0]["cve"]
156
-
157
- # Extract key information
158
- result = {
159
- "id": vuln_data.get("id", cve_id),
160
- "source": f"https://nvd.nist.gov/vuln/detail/{cve_id}"
161
- }
162
-
163
- # Extract description
164
- descriptions = vuln_data.get("descriptions", [])
165
- for desc in descriptions:
166
- if desc.get("lang") == "en":
167
- result["description"] = desc.get("value", "")
168
- break
169
-
170
- # Extract metrics (severity and CVSS score)
171
- metrics = vuln_data.get("metrics", {})
172
- cvss_v3 = metrics.get("cvssMetricV31", [])
173
- cvss_v2 = metrics.get("cvssMetricV2", [])
174
-
175
- if cvss_v3:
176
- base_metric = cvss_v3[0].get("cvssData", {})
177
- result["cvss"] = str(base_metric.get("baseScore", ""))
178
- result["severity"] = base_metric.get("baseSeverity", "UNKNOWN").upper()
179
- elif cvss_v2:
180
- base_metric = cvss_v2[0].get("cvssData", {})
181
- score = base_metric.get("baseScore")
182
- result["cvss"] = str(score) if score is not None else ""
183
-
184
- # Map CVSS v2 score to severity
185
- if score is not None:
186
- if score >= 9.0:
187
- result["severity"] = "CRITICAL"
188
- elif score >= 7.0:
189
- result["severity"] = "HIGH"
190
- elif score >= 4.0:
191
- result["severity"] = "MEDIUM"
192
- else:
193
- result["severity"] = "LOW"
194
- else:
195
- result["severity"] = "UNKNOWN"
196
-
197
- # Extract published date
198
- if "published" in vuln_data:
199
- try:
200
- date_str = vuln_data["published"].replace("Z", "+00:00")
201
- result["date"] = date_str.split("T")[0] # Just keep the date part
202
- except (ValueError, IndexError):
203
- result["date"] = vuln_data["published"]
204
-
205
- # Extract references
206
- references = vuln_data.get("references", [])
207
- if references:
208
- result["references"] = [ref.get("url") for ref in references if "url" in ref]
209
-
210
- # Add recommendation based on severity
211
- if "severity" in result:
212
- severity = result["severity"]
213
- if severity in ["CRITICAL", "HIGH"]:
214
- result["recommendation"] = "Update affected software immediately"
215
- elif severity == "MEDIUM":
216
- result["recommendation"] = "Plan to update affected software soon"
217
- else:
218
- result["recommendation"] = "Consider updating affected software when convenient"
219
-
220
- return result
221
-
222
- except Exception as e:
223
- logger.error(f"Error getting NVD details for {cve_id}: {str(e)}")
224
- return {
225
- "id": cve_id,
226
- "error": str(e)
227
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/example_input.json DELETED
@@ -1,14 +0,0 @@
1
- [
2
- {
3
- "name": "OpenSSL",
4
- "version": "1.1.1k"
5
- },
6
- {
7
- "name": "Apache",
8
- "version": "2.4.54"
9
- },
10
- {
11
- "name": "log4j",
12
- "version": "2.14.1"
13
- }
14
- ]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/main.py DELETED
@@ -1,136 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Main script for the Vulnerability Intelligence Agent (VIA).
4
- """
5
- import os
6
- import sys
7
- import json
8
- import argparse
9
- import logging
10
- from typing import List, Dict, Any
11
-
12
- from smolagents import CodeAgent, HfApiModel
13
-
14
- from agents.coordinator_agent import process_input_file, search_vulnerabilities_for_software, search_vulnerabilities_for_multiple_software
15
- from tools import utils
16
-
17
- logger = utils.setup_logger("main")
18
-
19
-
20
- def parse_args():
21
- """Parse command line arguments."""
22
- parser = argparse.ArgumentParser(description="Vulnerability Intelligence Agent (VIA)")
23
- parser.add_argument("--input", "-i", type=str, help="Path to input JSON file containing software to check")
24
- parser.add_argument("--software", "-s", type=str, help="Name of software to check")
25
- parser.add_argument("--version", "-v", type=str, help="Version of software to check")
26
- parser.add_argument("--output-dir", "-o", type=str, default="reports", help="Directory to save reports")
27
- parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
28
- parser.add_argument("--model", type=str, default="Qwen/Qwen2.5-Coder-32B-Instruct", help="HuggingFace model ID to use")
29
-
30
- return parser.parse_args()
31
-
32
-
33
- def main():
34
- """Main entry point for the script."""
35
- args = parse_args()
36
-
37
- # Configure logging
38
- log_level = logging.DEBUG if args.verbose else logging.INFO
39
- logging.basicConfig(
40
- level=log_level,
41
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
42
- )
43
-
44
- logger.info("Starting Vulnerability Intelligence Agent (VIA)")
45
-
46
- # Set output directory
47
- if args.output_dir:
48
- os.makedirs(args.output_dir, exist_ok=True)
49
-
50
- # Initialize the model
51
- model = HfApiModel(
52
- max_tokens=2096,
53
- temperature=0.5,
54
- model_id=args.model,
55
- custom_role_conversions=None,
56
- )
57
-
58
- # Initialize the agent
59
- agent = CodeAgent(
60
- model=model,
61
- tools=[process_input_file, search_vulnerabilities_for_software, search_vulnerabilities_for_multiple_software],
62
- max_steps=10,
63
- verbosity_level=2 if args.verbose else 1,
64
- )
65
-
66
- # Process input
67
- try:
68
- if args.input:
69
- # Process input file
70
- logger.info(f"Processing input file: {args.input}")
71
-
72
- # Use the process_input_file tool directly
73
- result = process_input_file(args.input)
74
-
75
- # Display summary
76
- for software_result in result:
77
- software_name = software_result.get("software", "Unknown")
78
- software_version = software_result.get("version", "Unknown")
79
- vuln_count = len(software_result.get("vulnerabilities", []))
80
-
81
- print(f"\n{software_name} {software_version}: {vuln_count} vulnerabilities found")
82
-
83
- # Show top 3 critical/high vulnerabilities if any
84
- high_vulns = [v for v in software_result.get("vulnerabilities", [])
85
- if v.get("severity") in ["CRITICAL", "HIGH"]]
86
-
87
- if high_vulns:
88
- print("\nTop Critical/High Vulnerabilities:")
89
- for i, vuln in enumerate(high_vulns[:3], 1):
90
- print(f"{i}. {vuln.get('id')} - {vuln.get('severity')} - {vuln.get('source')}")
91
- description = vuln.get("description", "")
92
- if len(description) > 100:
93
- description = description[:100] + "..."
94
- print(f" {description}")
95
-
96
- elif args.software and args.version:
97
- # Process single software
98
- logger.info(f"Checking vulnerabilities for {args.software} {args.version}")
99
-
100
- # Use the search_vulnerabilities_for_software tool directly
101
- result = search_vulnerabilities_for_software(args.software, args.version)
102
-
103
- # Display summary
104
- vuln_count = len(result.get("vulnerabilities", []))
105
- print(f"\n{args.software} {args.version}: {vuln_count} vulnerabilities found")
106
-
107
- if vuln_count > 0:
108
- # Show all vulnerabilities
109
- print("\nVulnerabilities:")
110
- for i, vuln in enumerate(result.get("vulnerabilities", []), 1):
111
- print(f"{i}. {vuln.get('id')} - {vuln.get('severity')}")
112
- description = vuln.get("description", "")
113
- if len(description) > 100:
114
- description = description[:100] + "..."
115
- print(f" {description}")
116
- print(f" Source: {vuln.get('source')}")
117
- if vuln.get("recommendation"):
118
- print(f" Recommendation: {vuln.get('recommendation')}")
119
- print()
120
-
121
- else:
122
- # No input provided
123
- print("Error: No input provided. Use --input to specify an input file or --software and --version to check a specific software.")
124
- parser.print_help()
125
- return 1
126
-
127
- except Exception as e:
128
- logger.error(f"Error: {str(e)}")
129
- return 1
130
-
131
- logger.info("Vulnerability Intelligence Agent completed successfully")
132
- return 0
133
-
134
-
135
- if __name__ == "__main__":
136
- sys.exit(main())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/requirements.txt DELETED
@@ -1,7 +0,0 @@
1
- smolagents>=1.9.2
2
- requests>=2.32.3
3
- beautifulsoup4>=4.13.3
4
- httpx>=0.28.1
5
- python-dotenv>=1.0.1
6
- rich>=13.9.4
7
- pyyaml>=6.0.2
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/tools/__init__.py DELETED
@@ -1,8 +0,0 @@
1
- """
2
- Vulnerability Intelligence Agent (VIA) - Tools Package.
3
- This package contains utility tools for HTTP requests, parsing, and general utilities.
4
- """
5
-
6
- from . import http_client
7
- from . import parsers
8
- from . import utils
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/tools/http_client.py DELETED
@@ -1,214 +0,0 @@
1
- """
2
- HTTP Client module for VIA.
3
- Provides a unified interface for making HTTP requests to vulnerability databases.
4
- """
5
- import time
6
- import random
7
- import asyncio
8
- from typing import Dict, Optional, Any, Union
9
- import httpx
10
- import requests
11
- from bs4 import BeautifulSoup
12
-
13
-
14
- class HTTPClient:
15
- """
16
- A client for making HTTP requests to vulnerability databases.
17
- Supports both synchronous and asynchronous requests.
18
- """
19
-
20
- DEFAULT_HEADERS = {
21
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
22
- "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
23
- "Accept-Language": "en-US,en;q=0.5",
24
- "DNT": "1",
25
- "Connection": "keep-alive",
26
- "Upgrade-Insecure-Requests": "1",
27
- }
28
-
29
- DEFAULT_TIMEOUT = 30.0 # seconds
30
- DEFAULT_RETRIES = 3
31
- DEFAULT_RETRY_DELAY = 2.0 # seconds
32
-
33
- def __init__(
34
- self,
35
- headers: Optional[Dict[str, str]] = None,
36
- timeout: float = DEFAULT_TIMEOUT,
37
- max_retries: int = DEFAULT_RETRIES,
38
- retry_delay: float = DEFAULT_RETRY_DELAY,
39
- ):
40
- """
41
- Initialize the HTTP client with custom headers and settings.
42
-
43
- Args:
44
- headers: Optional custom headers to use for requests
45
- timeout: Request timeout in seconds
46
- max_retries: Maximum number of retry attempts for failed requests
47
- retry_delay: Base delay between retries in seconds
48
- """
49
- self.headers = headers or self.DEFAULT_HEADERS.copy()
50
- self.timeout = timeout
51
- self.max_retries = max_retries
52
- self.retry_delay = retry_delay
53
-
54
- # Initialize clients
55
- self.sync_client = requests.Session()
56
- self.sync_client.headers.update(self.headers)
57
-
58
- self.async_client = httpx.AsyncClient(
59
- headers=self.headers,
60
- timeout=self.timeout,
61
- follow_redirects=True,
62
- )
63
-
64
- def get(
65
- self,
66
- url: str,
67
- params: Optional[Dict[str, Any]] = None,
68
- headers: Optional[Dict[str, str]] = None,
69
- timeout: Optional[float] = None,
70
- ) -> requests.Response:
71
- """
72
- Make a synchronous GET request with retries.
73
-
74
- Args:
75
- url: The URL to request
76
- params: Optional URL parameters
77
- headers: Optional headers to add or override
78
- timeout: Optional timeout override
79
-
80
- Returns:
81
- Response object from requests
82
- """
83
- merged_headers = self.headers.copy()
84
- if headers:
85
- merged_headers.update(headers)
86
-
87
- timeout = timeout or self.timeout
88
-
89
- for attempt in range(self.max_retries):
90
- try:
91
- response = self.sync_client.get(
92
- url,
93
- params=params,
94
- headers=merged_headers,
95
- timeout=timeout,
96
- )
97
- response.raise_for_status()
98
- return response
99
- except (requests.RequestException, httpx.HTTPError) as e:
100
- if attempt == self.max_retries - 1:
101
- raise e
102
-
103
- # Apply exponential backoff with jitter
104
- delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
105
- time.sleep(delay)
106
-
107
- # This should not be reached due to the exception in the loop
108
- raise RuntimeError("Failed to complete request after all retries")
109
-
110
- async def get_async(
111
- self,
112
- url: str,
113
- params: Optional[Dict[str, Any]] = None,
114
- headers: Optional[Dict[str, str]] = None,
115
- timeout: Optional[float] = None,
116
- ) -> httpx.Response:
117
- """
118
- Make an asynchronous GET request with retries.
119
-
120
- Args:
121
- url: The URL to request
122
- params: Optional URL parameters
123
- headers: Optional headers to add or override
124
- timeout: Optional timeout override
125
-
126
- Returns:
127
- Response object from httpx
128
- """
129
- merged_headers = self.headers.copy()
130
- if headers:
131
- merged_headers.update(headers)
132
-
133
- timeout_val = timeout or self.timeout
134
-
135
- for attempt in range(self.max_retries):
136
- try:
137
- response = await self.async_client.get(
138
- url,
139
- params=params,
140
- headers=merged_headers,
141
- timeout=timeout_val,
142
- )
143
- response.raise_for_status()
144
- return response
145
- except httpx.HTTPError as e:
146
- if attempt == self.max_retries - 1:
147
- raise e
148
-
149
- # Apply exponential backoff with jitter
150
- delay = self.retry_delay * (2 ** attempt) + random.uniform(0, 1)
151
- await asyncio.sleep(delay)
152
-
153
- # This should not be reached due to the exception in the loop
154
- raise RuntimeError("Failed to complete request after all retries")
155
-
156
- def get_soup(
157
- self,
158
- url: str,
159
- params: Optional[Dict[str, Any]] = None,
160
- headers: Optional[Dict[str, str]] = None,
161
- parser: str = "html.parser",
162
- ) -> BeautifulSoup:
163
- """
164
- Make a GET request and return a BeautifulSoup object.
165
-
166
- Args:
167
- url: The URL to request
168
- params: Optional URL parameters
169
- headers: Optional headers to override
170
- parser: BeautifulSoup parser to use
171
-
172
- Returns:
173
- BeautifulSoup object for the response
174
- """
175
- response = self.get(url, params=params, headers=headers)
176
- return BeautifulSoup(response.text, parser)
177
-
178
- async def get_soup_async(
179
- self,
180
- url: str,
181
- params: Optional[Dict[str, Any]] = None,
182
- headers: Optional[Dict[str, str]] = None,
183
- parser: str = "html.parser",
184
- ) -> BeautifulSoup:
185
- """
186
- Make an async GET request and return a BeautifulSoup object.
187
-
188
- Args:
189
- url: The URL to request
190
- params: Optional URL parameters
191
- headers: Optional headers to override
192
- parser: BeautifulSoup parser to use
193
-
194
- Returns:
195
- BeautifulSoup object for the response
196
- """
197
- response = await self.get_async(url, params=params, headers=headers)
198
- return BeautifulSoup(response.text, parser)
199
-
200
- async def close(self):
201
- """Close the async client."""
202
- await self.async_client.aclose()
203
-
204
- def __del__(self):
205
- """Ensure the async client is closed."""
206
- try:
207
- if hasattr(self, "async_client"):
208
- loop = asyncio.get_event_loop()
209
- if loop.is_running():
210
- loop.create_task(self.async_client.aclose())
211
- else:
212
- loop.run_until_complete(self.async_client.aclose())
213
- except (ImportError, RuntimeError):
214
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/tools/parsers.py DELETED
@@ -1,456 +0,0 @@
1
- """
2
- Parsers for vulnerability databases.
3
- """
4
- import re
5
- import json
6
- from typing import Dict, List, Any, Optional, Tuple
7
- from datetime import datetime
8
- from bs4 import BeautifulSoup, Tag
9
- from . import utils
10
-
11
- logger = utils.setup_logger("parsers")
12
-
13
-
14
- class CVEParser:
15
- """Parser for CVE database entries."""
16
-
17
- @staticmethod
18
- def parse_cve_page(soup: BeautifulSoup, cve_id: str) -> Dict[str, Any]:
19
- """
20
- Parse a CVE detail page from cve.mitre.org.
21
-
22
- Args:
23
- soup: BeautifulSoup object of the CVE page
24
- cve_id: CVE ID being parsed
25
-
26
- Returns:
27
- Dictionary with parsed vulnerability information
28
- """
29
- result = {
30
- "id": cve_id,
31
- "source": f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
32
- }
33
-
34
- try:
35
- # Extract description
36
- description_div = soup.find("div", {"class": "cvedetails"})
37
- if description_div:
38
- desc_content = description_div.get_text(strip=True)
39
- result["description"] = desc_content
40
-
41
- # Extract date if available
42
- date_div = soup.find("th", text=re.compile("Published"))
43
- if date_div and date_div.find_next_sibling("td"):
44
- date_text = date_div.find_next_sibling("td").get_text(strip=True)
45
- try:
46
- parsed_date = datetime.strptime(date_text, "%m/%d/%Y")
47
- result["date"] = parsed_date.strftime("%Y-%m-%d")
48
- except ValueError:
49
- # If date format is unexpected, include as-is
50
- result["date"] = date_text
51
-
52
- # Severity is not typically available directly on CVE pages
53
- # but might be referenced in the description
54
- severity_patterns = [
55
- (r'high severity', 'HIGH'),
56
- (r'medium severity', 'MEDIUM'),
57
- (r'low severity', 'LOW'),
58
- (r'critical severity', 'CRITICAL')
59
- ]
60
-
61
- for pattern, severity in severity_patterns:
62
- if result.get("description") and re.search(pattern, result["description"], re.IGNORECASE):
63
- result["severity"] = severity
64
- break
65
-
66
- if "severity" not in result:
67
- result["severity"] = "UNKNOWN"
68
-
69
- return result
70
-
71
- except Exception as e:
72
- logger.error(f"Error parsing CVE page for {cve_id}: {str(e)}")
73
- return {
74
- "id": cve_id,
75
- "description": "Error parsing CVE information",
76
- "severity": "UNKNOWN",
77
- "source": f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
78
- }
79
-
80
- @staticmethod
81
- def parse_cve_search_results(soup: BeautifulSoup, software: str, version: str) -> List[Dict[str, Any]]:
82
- """
83
- Parse CVE search results for a specific software and version.
84
-
85
- Args:
86
- soup: BeautifulSoup object of the search results page
87
- software: Software name being searched
88
- version: Software version being searched
89
-
90
- Returns:
91
- List of vulnerability dictionaries
92
- """
93
- vulnerabilities = []
94
-
95
- try:
96
- # Find the main table containing CVEs
97
- table = soup.find("table", {"id": "cves"})
98
- if not table:
99
- logger.warning(f"No CVE table found for {software} {version}")
100
- return []
101
-
102
- rows = table.find_all("tr")[1:] # Skip header row
103
-
104
- for row in rows:
105
- cols = row.find_all("td")
106
- if len(cols) >= 2:
107
- cve_id = cols[0].get_text(strip=True)
108
- description = cols[1].get_text(strip=True)
109
-
110
- # Check if the version appears in the description
111
- if version.lower() in description.lower():
112
- vuln = {
113
- "id": cve_id,
114
- "description": description,
115
- "severity": "UNKNOWN", # Will need to be determined later
116
- "source": f"https://cve.mitre.org/cgi-bin/cvename.cgi?name={cve_id}"
117
- }
118
- vulnerabilities.append(vuln)
119
-
120
- return vulnerabilities
121
-
122
- except Exception as e:
123
- logger.error(f"Error parsing CVE search results for {software} {version}: {str(e)}")
124
- return []
125
-
126
-
127
- class NVDParser:
128
- """Parser for National Vulnerability Database entries."""
129
-
130
- @staticmethod
131
- def parse_nvd_api_response(response_json: Dict[str, Any], software: str, version: str) -> List[Dict[str, Any]]:
132
- """
133
- Parse a response from the NVD API.
134
-
135
- Args:
136
- response_json: JSON response from NVD API
137
- software: Software name being searched
138
- version: Software version being searched
139
-
140
- Returns:
141
- List of parsed vulnerabilities
142
- """
143
- vulnerabilities = []
144
-
145
- try:
146
- results = response_json.get("vulnerabilities", [])
147
-
148
- for item in results:
149
- cve = item.get("cve", {})
150
-
151
- # Extract CVE ID
152
- cve_id = cve.get("id", "")
153
-
154
- # Extract description
155
- descriptions = cve.get("descriptions", [])
156
- description = ""
157
- for desc in descriptions:
158
- if desc.get("lang") == "en":
159
- description = desc.get("value", "")
160
- break
161
-
162
- # Extract metrics for CVSS score
163
- metrics = cve.get("metrics", {})
164
- cvss_v3 = metrics.get("cvssMetricV31", [])
165
- cvss_v2 = metrics.get("cvssMetricV2", [])
166
-
167
- severity = "UNKNOWN"
168
- cvss_score = None
169
-
170
- # Try to get CVSS v3 first, then fallback to v2
171
- if cvss_v3:
172
- base_metric = cvss_v3[0].get("cvssData", {})
173
- cvss_score = base_metric.get("baseScore")
174
- severity_raw = base_metric.get("baseSeverity", "").upper()
175
- if severity_raw:
176
- severity = severity_raw
177
- elif cvss_v2:
178
- base_metric = cvss_v2[0].get("cvssData", {})
179
- cvss_score = base_metric.get("baseScore")
180
-
181
- # Map CVSS v2 score to severity
182
- if cvss_score is not None:
183
- if cvss_score >= 9.0:
184
- severity = "CRITICAL"
185
- elif cvss_score >= 7.0:
186
- severity = "HIGH"
187
- elif cvss_score >= 4.0:
188
- severity = "MEDIUM"
189
- else:
190
- severity = "LOW"
191
-
192
- # Extract published date
193
- published_date = cve.get("published", "")
194
- if published_date:
195
- try:
196
- # NVD dates are in ISO format
197
- date_obj = datetime.fromisoformat(published_date.replace("Z", "+00:00"))
198
- published_date = date_obj.strftime("%Y-%m-%d")
199
- except ValueError:
200
- pass
201
-
202
- # Check CPE matches for the specific software and version
203
- configurations = cve.get("configurations", [])
204
- matches_software = False
205
-
206
- for config in configurations:
207
- nodes = config.get("nodes", [])
208
- for node in nodes:
209
- cpe_matches = node.get("cpeMatch", [])
210
- for cpe_match in cpe_matches:
211
- cpe_name = cpe_match.get("criteria", "").lower()
212
-
213
- # Check if the CPE contains the software name and version
214
- if software.lower() in cpe_name:
215
- # Direct version match
216
- if f":{version}:" in cpe_name or f":{version}" in cpe_name:
217
- matches_software = True
218
- break
219
-
220
- # Version range match
221
- version_start_inclusive = cpe_match.get("versionStartIncluding", "")
222
- version_start_exclusive = cpe_match.get("versionStartExcluding", "")
223
- version_end_inclusive = cpe_match.get("versionEndIncluding", "")
224
- version_end_exclusive = cpe_match.get("versionEndExcluding", "")
225
-
226
- if any([version_start_inclusive, version_start_exclusive,
227
- version_end_inclusive, version_end_exclusive]):
228
- # Convert version to comparable parts
229
- version_parts = utils.extract_version_parts(version)
230
-
231
- # Check range conditions
232
- in_range = True
233
-
234
- if version_start_inclusive:
235
- start_parts = utils.extract_version_parts(version_start_inclusive)
236
- if version_parts < start_parts:
237
- in_range = False
238
-
239
- if version_start_exclusive:
240
- start_parts = utils.extract_version_parts(version_start_exclusive)
241
- if version_parts <= start_parts:
242
- in_range = False
243
-
244
- if version_end_inclusive:
245
- end_parts = utils.extract_version_parts(version_end_inclusive)
246
- if version_parts > end_parts:
247
- in_range = False
248
-
249
- if version_end_exclusive:
250
- end_parts = utils.extract_version_parts(version_end_exclusive)
251
- if version_parts >= end_parts:
252
- in_range = False
253
-
254
- if in_range:
255
- matches_software = True
256
- break
257
-
258
- if matches_software:
259
- break
260
-
261
- if matches_software:
262
- break
263
-
264
- # Only include vulnerabilities that match the software and version
265
- if matches_software:
266
- vulnerability = {
267
- "id": cve_id,
268
- "description": description,
269
- "severity": severity,
270
- "source": f"https://nvd.nist.gov/vuln/detail/{cve_id}"
271
- }
272
-
273
- if cvss_score is not None:
274
- vulnerability["cvss"] = str(cvss_score)
275
-
276
- if published_date:
277
- vulnerability["date"] = published_date
278
-
279
- # Add recommendation based on severity
280
- if severity in ["CRITICAL", "HIGH"]:
281
- vulnerability["recommendation"] = f"Update {software} to the latest version immediately"
282
- elif severity == "MEDIUM":
283
- vulnerability["recommendation"] = f"Plan to update {software} to the latest version"
284
- else:
285
- vulnerability["recommendation"] = f"Consider updating {software} when convenient"
286
-
287
- vulnerabilities.append(vulnerability)
288
-
289
- return vulnerabilities
290
-
291
- except Exception as e:
292
- logger.error(f"Error parsing NVD API response for {software} {version}: {str(e)}")
293
- return []
294
-
295
-
296
- class CISAParser:
297
- """Parser for CISA Known Exploited Vulnerabilities Catalog."""
298
-
299
- @staticmethod
300
- def parse_cisa_kev_data(kev_data: Dict[str, Any], software: str, version: str) -> List[Dict[str, Any]]:
301
- """
302
- Parse CISA Known Exploited Vulnerabilities (KEV) catalog data.
303
-
304
- Args:
305
- kev_data: KEV catalog data as JSON
306
- software: Software name to filter for
307
- version: Software version to filter for
308
-
309
- Returns:
310
- List of parsed vulnerabilities
311
- """
312
- vulnerabilities = []
313
-
314
- try:
315
- if not isinstance(kev_data, dict):
316
- logger.error(f"Invalid KEV data format: {type(kev_data)}")
317
- return []
318
-
319
- catalog_items = kev_data.get("vulnerabilities", [])
320
-
321
- for item in catalog_items:
322
- product_name = item.get("product", "").lower()
323
-
324
- # Check if this vulnerability applies to our software
325
- normalized_software = utils.normalize_software_name(software)
326
- if normalized_software not in utils.normalize_software_name(product_name):
327
- continue
328
-
329
- # Extract version information, which may be in the vendorProject field
330
- vendor_project = item.get("vendorProject", "").lower()
331
- if version.lower() not in vendor_project and version.lower() not in product_name:
332
- continue
333
-
334
- cve_id = item.get("cveID", "")
335
- date_added = item.get("dateAdded", "")
336
-
337
- # Format the date if available
338
- formatted_date = ""
339
- if date_added:
340
- try:
341
- date_obj = datetime.strptime(date_added, "%Y-%m-%d")
342
- formatted_date = date_obj.strftime("%Y-%m-%d")
343
- except ValueError:
344
- formatted_date = date_added
345
-
346
- vulnerability = {
347
- "id": cve_id,
348
- "description": item.get("vulnerabilityName", ""),
349
- "severity": "CRITICAL", # All KEV items are considered critical as they are actively exploited
350
- "source": "https://www.cisa.gov/known-exploited-vulnerabilities-catalog",
351
- "cisa_required_action": item.get("requiredAction", ""),
352
- "cisa_due_date": item.get("dueDate", "")
353
- }
354
-
355
- if formatted_date:
356
- vulnerability["date"] = formatted_date
357
-
358
- # Add strong recommendation as these are known exploited vulnerabilities
359
- vulnerability["recommendation"] = f"URGENT: Update {software} immediately as this vulnerability is being actively exploited in the wild"
360
-
361
- vulnerabilities.append(vulnerability)
362
-
363
- return vulnerabilities
364
-
365
- except Exception as e:
366
- logger.error(f"Error parsing CISA KEV data for {software} {version}: {str(e)}")
367
- return []
368
-
369
-
370
- class CWEParser:
371
- """Parser for Common Weakness Enumeration (CWE) data."""
372
-
373
- @staticmethod
374
- def parse_cwe_page(soup: BeautifulSoup, cwe_id: str) -> Dict[str, Any]:
375
- """
376
- Parse a CWE detail page.
377
-
378
- Args:
379
- soup: BeautifulSoup object of the CWE page
380
- cwe_id: CWE ID being parsed
381
-
382
- Returns:
383
- Dictionary with parsed weakness information
384
- """
385
- result = {
386
- "id": cwe_id,
387
- "source": f"https://cwe.mitre.org/data/definitions/{cwe_id}.html"
388
- }
389
-
390
- try:
391
- # Extract the name/title
392
- title_div = soup.find("div", {"id": "title"})
393
- if title_div:
394
- result["title"] = title_div.get_text(strip=True).replace(f"{cwe_id}: ", "")
395
-
396
- # Extract description
397
- desc_div = soup.find("div", {"id": "description"})
398
- if desc_div:
399
- desc_content = desc_div.find("div", {"class": "detail"})
400
- if desc_content:
401
- result["description"] = desc_content.get_text(strip=True)
402
-
403
- # Extract likelihood
404
- likelihood_div = soup.find("div", {"id": "likelihood"})
405
- if likelihood_div:
406
- likelihood_content = likelihood_div.find("div", {"class": "detail"})
407
- if likelihood_content:
408
- result["likelihood"] = likelihood_content.get_text(strip=True)
409
-
410
- # Determine severity based on likelihood or description keywords
411
- if "likelihood" in result:
412
- if "high" in result["likelihood"].lower():
413
- result["severity"] = "HIGH"
414
- elif "medium" in result["likelihood"].lower():
415
- result["severity"] = "MEDIUM"
416
- elif "low" in result["likelihood"].lower():
417
- result["severity"] = "LOW"
418
- else:
419
- result["severity"] = "UNKNOWN"
420
- else:
421
- result["severity"] = "UNKNOWN"
422
-
423
- # Extract mitigation information
424
- mitigation_div = soup.find("div", {"id": "mitigations"})
425
- if mitigation_div:
426
- mitigation_content = mitigation_div.find("div", {"class": "detail"})
427
- if mitigation_content:
428
- result["mitigation"] = mitigation_content.get_text(strip=True)
429
- result["recommendation"] = result["mitigation"]
430
-
431
- return result
432
-
433
- except Exception as e:
434
- logger.error(f"Error parsing CWE page for {cwe_id}: {str(e)}")
435
- return {
436
- "id": cwe_id,
437
- "description": "Error parsing CWE information",
438
- "severity": "UNKNOWN",
439
- "source": f"https://cwe.mitre.org/data/definitions/{cwe_id}.html"
440
- }
441
-
442
- @staticmethod
443
- def extract_cwe_from_cve(cve_description: str) -> List[str]:
444
- """
445
- Extract CWE IDs from a CVE description.
446
-
447
- Args:
448
- cve_description: CVE description text
449
-
450
- Returns:
451
- List of CWE IDs
452
- """
453
- # Pattern to match CWE references like CWE-79, CWE-89, etc.
454
- pattern = r'CWE-(\d+)'
455
- matches = re.findall(pattern, cve_description)
456
- return [f"CWE-{match}" for match in matches]
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
vulnerability_intelligence_agent/tools/utils.py DELETED
@@ -1,301 +0,0 @@
1
- """
2
- Utility functions for the Vulnerability Intelligence Agent.
3
- """
4
- import json
5
- import os
6
- import re
7
- import logging
8
- import datetime
9
- from typing import Dict, List, Any, Optional, Union
10
-
11
- # Configure logging
12
- logging.basicConfig(
13
- level=logging.INFO,
14
- format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
15
- )
16
- logger = logging.getLogger("via")
17
-
18
-
19
- def setup_logger(name: str, level: int = logging.INFO) -> logging.Logger:
20
- """
21
- Set up a logger with the given name and level.
22
-
23
- Args:
24
- name: Name of the logger
25
- level: Logging level
26
-
27
- Returns:
28
- Configured logger instance
29
- """
30
- logger = logging.getLogger(f"via.{name}")
31
- logger.setLevel(level)
32
- return logger
33
-
34
-
35
- def normalize_software_name(name: str) -> str:
36
- """
37
- Normalize a software name to improve matching across databases.
38
-
39
- Args:
40
- name: Software name to normalize
41
-
42
- Returns:
43
- Normalized software name
44
- """
45
- # Convert to lowercase and remove special characters
46
- normalized = re.sub(r"[^a-z0-9]", "", name.lower())
47
- return normalized
48
-
49
-
50
- def normalize_version(version: str) -> str:
51
- """
52
- Normalize a version string to improve matching across databases.
53
-
54
- Args:
55
- version: Version string to normalize
56
-
57
- Returns:
58
- Normalized version string
59
- """
60
- # Remove leading 'v' if present
61
- if version.lower().startswith("v"):
62
- version = version[1:]
63
-
64
- # Replace underscores with dots
65
- version = version.replace("_", ".")
66
-
67
- # Remove any alphabetic parts (like beta, alpha, etc.)
68
- version = re.sub(r"[a-zA-Z].*$", "", version)
69
-
70
- return version.strip()
71
-
72
-
73
- def save_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str:
74
- """
75
- Save a vulnerability report to a file.
76
-
77
- Args:
78
- data: Report data to save
79
- filename: Base filename (without extension)
80
- report_dir: Directory to save the report in
81
-
82
- Returns:
83
- Path to the saved report file
84
- """
85
- # Ensure the reports directory exists
86
- os.makedirs(report_dir, exist_ok=True)
87
-
88
- # Add timestamp to filename to avoid overwriting
89
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
90
- json_filename = f"{filename}_{timestamp}.json"
91
- json_path = os.path.join(report_dir, json_filename)
92
-
93
- # Save JSON report
94
- with open(json_path, 'w') as f:
95
- json.dump(data, f, indent=2)
96
-
97
- logger.info(f"Report saved to {json_path}")
98
- return json_path
99
-
100
-
101
- def generate_markdown_report(data: Dict[str, Any], filename: str, report_dir: str = "reports") -> str:
102
- """
103
- Generate a Markdown report from vulnerability data.
104
-
105
- Args:
106
- data: Vulnerability data
107
- filename: Base filename (without extension)
108
- report_dir: Directory to save the report in
109
-
110
- Returns:
111
- Path to the generated Markdown file
112
- """
113
- # Ensure the reports directory exists
114
- os.makedirs(report_dir, exist_ok=True)
115
-
116
- # Add timestamp to filename to avoid overwriting
117
- timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
118
- md_filename = f"{filename}_{timestamp}.md"
119
- md_path = os.path.join(report_dir, md_filename)
120
-
121
- with open(md_path, 'w') as f:
122
- # Write title
123
- f.write(f"# Vulnerability Report: {data['software']} {data['version']}\n\n")
124
- f.write(f"*Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n")
125
-
126
- # Write summary
127
- f.write("## Summary\n\n")
128
- vuln_count = len(data.get('vulnerabilities', []))
129
- f.write(f"Found **{vuln_count}** vulnerabilities for {data['software']} {data['version']}.\n\n")
130
-
131
- # Write vulnerabilities
132
- if vuln_count > 0:
133
- f.write("## Vulnerabilities\n\n")
134
-
135
- for i, vuln in enumerate(data['vulnerabilities'], 1):
136
- f.write(f"### {i}. {vuln.get('id', 'Unknown ID')}\n\n")
137
- f.write(f"**Severity:** {vuln.get('severity', 'Unknown')}")
138
- if 'cvss' in vuln:
139
- f.write(f" (CVSS: {vuln['cvss']})")
140
- f.write("\n\n")
141
-
142
- f.write(f"**Description:** {vuln.get('description', 'No description available.')}\n\n")
143
-
144
- if 'date' in vuln:
145
- f.write(f"**Published:** {vuln['date']}\n\n")
146
-
147
- if 'recommendation' in vuln:
148
- f.write(f"**Recommendation:** {vuln['recommendation']}\n\n")
149
-
150
- if 'source' in vuln:
151
- f.write(f"**Source:** [{vuln['source']}]({vuln['source']})\n\n")
152
-
153
- f.write("---\n\n")
154
- else:
155
- f.write("## No vulnerabilities found\n\n")
156
- f.write("No known vulnerabilities were found for this software and version.\n\n")
157
-
158
- # Write footer
159
- f.write("## References\n\n")
160
- f.write("- [CVE (Common Vulnerabilities and Exposures)](https://cve.mitre.org/)\n")
161
- f.write("- [NVD (National Vulnerability Database)](https://nvd.nist.gov/)\n")
162
- f.write("- [CISA Known Exploited Vulnerabilities Catalog](https://www.cisa.gov/known-exploited-vulnerabilities-catalog)\n")
163
- f.write("- [CWE (Common Weakness Enumeration)](https://cwe.mitre.org/)\n")
164
-
165
- logger.info(f"Markdown report saved to {md_path}")
166
- return md_path
167
-
168
-
169
- def merge_vulnerability_data(data_list: List[Dict[str, Any]]) -> Dict[str, Any]:
170
- """
171
- Merge vulnerability data from multiple sources, removing duplicates.
172
-
173
- Args:
174
- data_list: List of vulnerability data dictionaries
175
-
176
- Returns:
177
- Merged vulnerability data
178
- """
179
- if not data_list:
180
- return {}
181
-
182
- # Start with the first item
183
- result = data_list[0].copy()
184
- result['vulnerabilities'] = result.get('vulnerabilities', []).copy()
185
-
186
- # Track vulnerability IDs to avoid duplicates
187
- vuln_ids = {v.get('id'): True for v in result['vulnerabilities']}
188
-
189
- # Merge additional data
190
- for data in data_list[1:]:
191
- if data.get('vulnerabilities'):
192
- for vuln in data['vulnerabilities']:
193
- vuln_id = vuln.get('id')
194
- if vuln_id and vuln_id not in vuln_ids:
195
- result['vulnerabilities'].append(vuln)
196
- vuln_ids[vuln_id] = True
197
-
198
- # Sort vulnerabilities by severity (if available)
199
- severity_order = {
200
- "CRITICAL": 0,
201
- "HIGH": 1,
202
- "MEDIUM": 2,
203
- "LOW": 3,
204
- "UNKNOWN": 4,
205
- }
206
-
207
- result['vulnerabilities'].sort(
208
- key=lambda v: severity_order.get(v.get('severity', '').upper(), 999)
209
- )
210
-
211
- return result
212
-
213
-
214
- def extract_version_parts(version: str) -> List[int]:
215
- """
216
- Extract version numbers into a list of integers for comparison.
217
-
218
- Args:
219
- version: Version string (e.g., "1.2.3")
220
-
221
- Returns:
222
- List of integer version parts
223
- """
224
- # Normalize version and extract numeric parts
225
- norm_version = normalize_version(version)
226
- return [int(part) for part in re.findall(r'\d+', norm_version)]
227
-
228
-
229
- def is_version_in_range(version: str, min_version: str, max_version: str) -> bool:
230
- """
231
- Check if a version is within a specified range.
232
-
233
- Args:
234
- version: Version to check
235
- min_version: Minimum version (inclusive)
236
- max_version: Maximum version (inclusive)
237
-
238
- Returns:
239
- True if version is in range, False otherwise
240
- """
241
- version_parts = extract_version_parts(version)
242
- min_parts = extract_version_parts(min_version)
243
- max_parts = extract_version_parts(max_version)
244
-
245
- # Extend parts with zeros to ensure equal length
246
- max_length = max(len(version_parts), len(min_parts), len(max_parts))
247
- version_parts.extend([0] * (max_length - len(version_parts)))
248
- min_parts.extend([0] * (max_length - len(min_parts)))
249
- max_parts.extend([0] * (max_length - len(max_parts)))
250
-
251
- # Check if version is in range
252
- return min_parts <= version_parts <= max_parts
253
-
254
-
255
- def is_version_affected(version: str, affected_versions: str) -> bool:
256
- """
257
- Check if a version is affected by a vulnerability based on version string.
258
-
259
- Args:
260
- version: Version to check
261
- affected_versions: Description of affected versions (e.g., "< 1.2.3", ">= 2.0")
262
-
263
- Returns:
264
- True if version is affected, False otherwise
265
- """
266
- version_parts = extract_version_parts(version)
267
-
268
- # Handle different version patterns
269
- if "<=" in affected_versions:
270
- max_version = affected_versions.split("<=")[1].strip()
271
- max_parts = extract_version_parts(max_version)
272
- max_parts.extend([0] * (len(version_parts) - len(max_parts)))
273
- return version_parts <= max_parts
274
-
275
- elif ">=" in affected_versions:
276
- min_version = affected_versions.split(">=")[1].strip()
277
- min_parts = extract_version_parts(min_version)
278
- min_parts.extend([0] * (len(version_parts) - len(min_parts)))
279
- return version_parts >= min_parts
280
-
281
- elif "<" in affected_versions:
282
- max_version = affected_versions.split("<")[1].strip()
283
- max_parts = extract_version_parts(max_version)
284
- max_parts.extend([0] * (len(version_parts) - len(max_parts)))
285
- return version_parts < max_parts
286
-
287
- elif ">" in affected_versions:
288
- min_version = affected_versions.split(">")[1].strip()
289
- min_parts = extract_version_parts(min_version)
290
- min_parts.extend([0] * (len(version_parts) - len(min_parts)))
291
- return version_parts > min_parts
292
-
293
- elif "-" in affected_versions:
294
- # Handle range: "1.0.0 - 2.0.0"
295
- parts = affected_versions.split("-")
296
- min_version = parts[0].strip()
297
- max_version = parts[1].strip()
298
- return is_version_in_range(version, min_version, max_version)
299
-
300
- # Direct comparison
301
- return normalize_version(version) == normalize_version(affected_versions)