daqc's picture
Update config
5a945fb
import os
from typing import Dict, Any, Optional, List
import nvdlib
from smolagents.tools import Tool
# Optional MISP import
try:
from pymisp import PyMISP
MISP_AVAILABLE = True
except ImportError:
MISP_AVAILABLE = False
class VulnerabilitySearchTool(Tool):
    """Agent tool that searches vulnerability databases.

    NVD is always queried through ``nvdlib``. MISP is queried only when the
    ``pymisp`` package could be imported and both ``MISP_URL`` and
    ``MISP_KEY`` are present in the environment.
    """

    name = "vuln_search"
    description = "Search for vulnerabilities in security databases (NVD, MISP if configured)"
    inputs = {
        'query': {'type': 'string', 'description': 'Search term or CVE ID'},
        'sources': {
            'type': 'array',
            'description': 'List of sources to query (nvd, misp)',
            'default': ['nvd'],
            'nullable': True
        },
        'max_results': {
            'type': 'integer',
            'description': 'Maximum number of results per source',
            'default': 5,
            'nullable': True
        }
    }
    output_type = "object"

    def __init__(self):
        """Read API configuration from the environment.

        Environment variables:
            NVD_API_KEY: optional NVD API key, forwarded on every NVD request.
            MISP_URL / MISP_KEY: enable MISP when both are set.
            MISP_VERIFY_SSL: set to ``1``/``true``/``yes`` to verify the MISP
                server certificate. Verification stays off when unset, which
                is the historical behaviour of this tool.
        """
        # Let the base Tool set up its own state before adding ours.
        super().__init__()

        # NVD configuration. nvdlib takes the key per request (``key=``),
        # so it is only stored here and passed along in search_nvd().
        self.nvd_api_key = os.getenv('NVD_API_KEY')

        # MISP configuration (optional)
        self.misp = None
        self.misp_available = False
        if MISP_AVAILABLE:
            misp_url = os.getenv('MISP_URL')
            misp_key = os.getenv('MISP_KEY')
            if misp_url and misp_key:
                # NOTE(security): with verification off, the MISP key is sent
                # to a server whose certificate is not checked. Set
                # MISP_VERIFY_SSL=true wherever the server has a valid cert.
                verify_flag = os.getenv('MISP_VERIFY_SSL', '')
                verify_tls = verify_flag.strip().lower() in ('1', 'true', 'yes')
                try:
                    self.misp = PyMISP(misp_url, misp_key, ssl=verify_tls)
                    self.misp_available = True
                except Exception as e:
                    # Best effort: a broken MISP setup must not disable NVD.
                    print(f"MISP initialization failed: {e}")

    @staticmethod
    def _is_cve_id(text: str) -> bool:
        """Return True when *text* has the form ``CVE-<4 digits>-<4+ digits>``."""
        parts = text.upper().split('-')
        if len(parts) != 3 or parts[0] != 'CVE':
            return False
        year, sequence = parts[1], parts[2]
        return (
            year.isdigit() and len(year) == 4
            and sequence.isdigit() and len(sequence) >= 4
        )

    @staticmethod
    def _base_score(cve: Any) -> Optional[float]:
        """Return the CVSS base score of *cve*, or None when it has none.

        CVSS v3.1 is preferred (the only version read previously); other
        versions are used as fallbacks so that a record without v3.1 data
        no longer raises and aborts the whole search.
        """
        metrics = getattr(cve, 'metrics', None)
        for field in ('cvssMetricV31', 'cvssMetricV40', 'cvssMetricV30', 'cvssMetricV2'):
            entries = getattr(metrics, field, None)
            if entries:
                cvss_data = getattr(entries[0], 'cvssData', None)
                return getattr(cvss_data, 'baseScore', None)
        return None

    @classmethod
    def _format_cve(cls, cve: Any) -> Dict[str, Any]:
        """Convert one nvdlib CVE record into a plain result dictionary."""
        descriptions = getattr(cve, 'descriptions', None) or []
        # Prefer the English text; fall back to whatever comes first.
        english = [d for d in descriptions if getattr(d, 'lang', None) == 'en']
        chosen = english or descriptions
        return {
            'id': cve.id,
            'description': getattr(chosen[0], 'value', None) if chosen else None,
            'severity': cls._base_score(cve),
            'published': getattr(cve, 'published', None),
            'references': [
                ref.url for ref in (getattr(cve, 'references', None) or [])
            ],
        }

    def search_nvd(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Search vulnerabilities in NVD.

        Args:
            query: A CVE identifier (looked up directly, case-insensitive)
                or free text (keyword search).
            max_results: Upper bound on keyword-search results.

        Returns:
            One dictionary per CVE with ``id``, ``description``,
            ``severity``, ``published`` and ``references``; or a single
            ``{'error': ...}`` entry when the lookup fails.
        """
        try:
            term = query.strip()
            if self._is_cve_id(term):
                # A well-formed CVE ID is fetched directly by identifier.
                records = nvdlib.searchCVE(
                    cveId=term.upper(),
                    key=self.nvd_api_key
                )
            else:
                # Otherwise, perform a general keyword search.
                records = nvdlib.searchCVE(
                    keywordSearch=term,
                    limit=max_results,
                    key=self.nvd_api_key
                )
            return [self._format_cve(record) for record in records]
        except Exception as e:
            # Tool boundary: report the failure as data instead of raising.
            return [{'error': f"Error in NVD search: {str(e)}"}]

    def search_misp(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Search vulnerabilities in MISP (if available).

        Args:
            query: Value to search for in MISP events.
            max_results: Upper bound on the number of events requested.

        Returns:
            One dictionary per event with ``id``, ``info``, ``analysis``,
            ``threat_level``, ``date`` and ``attributes``; or a single
            ``{'error': ...}`` entry when MISP is unavailable or fails.
        """
        if not self.misp_available:
            return [{'error': 'MISP is not configured or not available'}]
        try:
            # Search related events
            events = self.misp.search(
                controller='events',
                value=query,
                limit=max_results,
                pythonify=True
            )
            return [{
                'id': event.uuid,
                'info': event.info,
                'analysis': event.analysis,
                'threat_level': event.threat_level_id,
                'date': event.date,
                'attributes': [
                    {'type': attr.type, 'value': attr.value}
                    for attr in event.attributes
                ]
            } for event in events]
        except Exception as e:
            # Tool boundary: report the failure as data instead of raising.
            return [{'error': f"Error in MISP search: {str(e)}"}]

    def forward(
        self,
        query: str,
        sources: Optional[List[str]] = None,
        max_results: int = 5
    ) -> Dict[str, Any]:
        """Process search across available sources.

        Args:
            query: Search term or CVE ID.
            sources: Sources to query (``nvd``, ``misp``). ``None`` means
                ``['nvd']``; a bare string is treated as a one-item list.
                Names are matched case-insensitively, and sources that are
                unknown or not configured are dropped.
            max_results: Maximum results per source. ``None`` or an
                unparsable value means 5; values below 1 are raised to 1.

        Returns:
            A dictionary with the ``query``, the ``sources`` actually used,
            the ``available_sources`` and the per-source ``results``.
        """
        # Both inputs are declared nullable, so None must be accepted here.
        if sources is None:
            requested = ['nvd']
        elif isinstance(sources, str):
            # Iterating a bare string would yield single characters.
            requested = [sources]
        else:
            requested = list(sources)

        try:
            limit = 5 if max_results is None else max(1, int(max_results))
        except (TypeError, ValueError):
            limit = 5

        available_sources = ['nvd']
        # Add MISP to available sources if configured
        if self.misp_available:
            available_sources.append('misp')

        # Keep the caller's order, drop unavailable names and duplicates.
        active_sources = []
        for source in requested:
            source_name = str(source).strip().lower()
            if source_name in available_sources and source_name not in active_sources:
                active_sources.append(source_name)

        results = {}
        if 'nvd' in active_sources:
            results['nvd'] = self.search_nvd(query, limit)
        if 'misp' in active_sources:
            results['misp'] = self.search_misp(query, limit)

        return {
            'query': query,
            'sources': active_sources,
            'available_sources': available_sources,
            'results': results
        }