|
import os |
|
from typing import Dict, Any, Optional, List |
|
import nvdlib |
|
from smolagents.tools import Tool |
|
|
|
|
|
try: |
|
from pymisp import PyMISP |
|
MISP_AVAILABLE = True |
|
except ImportError: |
|
MISP_AVAILABLE = False |
|
|
|
class VulnerabilitySearchTool(Tool): |
|
name = "vuln_search" |
|
description = "Search for vulnerabilities in security databases (NVD, MISP if configured)" |
|
inputs = { |
|
'query': {'type': 'string', 'description': 'Search term or CVE ID'}, |
|
'sources': { |
|
'type': 'array', |
|
'description': 'List of sources to query (nvd, misp)', |
|
'default': ['nvd'], |
|
'nullable': True |
|
}, |
|
'max_results': { |
|
'type': 'integer', |
|
'description': 'Maximum number of results per source', |
|
'default': 5, |
|
'nullable': True |
|
} |
|
} |
|
output_type = "object" |
|
|
|
def __init__(self): |
|
"""Initialize API connections""" |
|
|
|
self.nvd_api_key = os.getenv('NVD_API_KEY') |
|
if self.nvd_api_key: |
|
nvdlib.set_api_key(self.nvd_api_key) |
|
|
|
|
|
self.misp = None |
|
self.misp_available = False |
|
|
|
if MISP_AVAILABLE: |
|
misp_url = os.getenv('MISP_URL') |
|
misp_key = os.getenv('MISP_KEY') |
|
|
|
if misp_url and misp_key: |
|
try: |
|
self.misp = PyMISP(misp_url, misp_key, ssl=False) |
|
self.misp_available = True |
|
except Exception as e: |
|
print(f"MISP initialization failed: {e}") |
|
|
|
def search_nvd(self, query: str, max_results: int) -> List[Dict[str, Any]]: |
|
"""Search vulnerabilities in NVD""" |
|
try: |
|
|
|
if query.startswith('CVE-'): |
|
results = nvdlib.get_cve(query) |
|
return [{ |
|
'id': results.id, |
|
'description': results.descriptions[0].value, |
|
'severity': results.metrics.cvssMetricV31[0].cvssData.baseScore if results.metrics else None, |
|
'published': results.published, |
|
'references': [ref.url for ref in results.references] |
|
}] |
|
|
|
|
|
results = nvdlib.searchCVE( |
|
keyword=query, |
|
limit=max_results |
|
) |
|
|
|
return [{ |
|
'id': r.id, |
|
'description': r.descriptions[0].value, |
|
'severity': r.metrics.cvssMetricV31[0].cvssData.baseScore if r.metrics else None, |
|
'published': r.published, |
|
'references': [ref.url for ref in r.references] |
|
} for r in results] |
|
|
|
except Exception as e: |
|
return [{'error': f"Error in NVD search: {str(e)}"}] |
|
|
|
def search_misp(self, query: str, max_results: int) -> List[Dict[str, Any]]: |
|
"""Search vulnerabilities in MISP (if available)""" |
|
if not self.misp_available: |
|
return [{'error': 'MISP is not configured or not available'}] |
|
|
|
try: |
|
|
|
results = self.misp.search( |
|
controller='events', |
|
value=query, |
|
limit=max_results, |
|
pythonify=True |
|
) |
|
|
|
return [{ |
|
'id': event.uuid, |
|
'info': event.info, |
|
'analysis': event.analysis, |
|
'threat_level': event.threat_level_id, |
|
'date': event.date, |
|
'attributes': [ |
|
{'type': attr.type, 'value': attr.value} |
|
for attr in event.attributes |
|
] |
|
} for event in results] |
|
|
|
except Exception as e: |
|
return [{'error': f"Error in MISP search: {str(e)}"}] |
|
|
|
def forward(self, query: str, sources: List[str] = ['nvd'], max_results: int = 5) -> Dict[str, Any]: |
|
"""Process search across available sources""" |
|
results = {} |
|
available_sources = ['nvd'] |
|
|
|
|
|
if self.misp_available: |
|
available_sources.append('misp') |
|
|
|
|
|
active_sources = [s for s in sources if s in available_sources] |
|
|
|
if 'nvd' in active_sources: |
|
results['nvd'] = self.search_nvd(query, max_results) |
|
|
|
if 'misp' in active_sources and self.misp_available: |
|
results['misp'] = self.search_misp(query, max_results) |
|
|
|
return { |
|
'query': query, |
|
'sources': active_sources, |
|
'available_sources': available_sources, |
|
'results': results |
|
} |