"""Agent tool that searches vulnerability databases (NVD, optionally MISP)."""

import os
import re
from typing import Any, Dict, List, Optional

import nvdlib
from smolagents.tools import Tool

# Optional MISP import
try:
    from pymisp import PyMISP
    MISP_AVAILABLE = True
except ImportError:
    MISP_AVAILABLE = False

# Matches a complete CVE identifier such as "CVE-2021-44228" (any letter case).
_CVE_ID_PATTERN = re.compile(r'CVE-\d{4}-\d{4,}', re.IGNORECASE)

# Values of MISP_VERIFY_SSL that switch TLS certificate verification on.
_TRUTHY_ENV_VALUES = ('1', 'true', 'yes', 'on')


class VulnerabilitySearchTool(Tool):
    """Search NVD (always) and MISP (when configured) for vulnerabilities.

    Configuration is read from environment variables at construction time:

    * ``NVD_API_KEY`` - optional NVD API key, sent with every NVD request.
    * ``MISP_URL`` / ``MISP_KEY`` - enable the MISP source when both are set
      and ``pymisp`` is installed.
    * ``MISP_VERIFY_SSL`` - set to ``1``/``true``/``yes``/``on`` to verify the
      MISP server's TLS certificate (verification is off when unset).
    """

    name = "vuln_search"
    description = "Search for vulnerabilities in security databases (NVD, MISP if configured)"
    inputs = {
        'query': {'type': 'string', 'description': 'Search term or CVE ID'},
        'sources': {
            'type': 'array',
            'description': 'List of sources to query (nvd, misp)',
            'default': ['nvd'],
            'nullable': True
        },
        'max_results': {
            'type': 'integer',
            'description': 'Maximum number of results per source',
            'default': 5,
            'nullable': True
        }
    }
    output_type = "object"

    def __init__(self):
        """Initialize API connections"""
        # Let the Tool base class set up its own state before adding ours.
        super().__init__()

        # NVD Configuration: nvdlib takes the key per request (``key=``),
        # so it is only stored here. An empty value is treated as "no key".
        self.nvd_api_key = os.getenv('NVD_API_KEY') or None

        # MISP Configuration (optional)
        self.misp = None
        self.misp_available = False

        if MISP_AVAILABLE:
            misp_url = os.getenv('MISP_URL')
            misp_key = os.getenv('MISP_KEY')
            # SECURITY NOTE: certificate verification stays disabled unless
            # MISP_VERIFY_SSL is set, which keeps self-signed deployments
            # working but exposes the API key to man-in-the-middle attacks.
            verify_tls = (
                os.getenv('MISP_VERIFY_SSL', '').strip().lower()
                in _TRUTHY_ENV_VALUES
            )
            if misp_url and misp_key:
                try:
                    self.misp = PyMISP(misp_url, misp_key, ssl=verify_tls)
                    self.misp_available = True
                except Exception as e:
                    # Best effort: a broken MISP setup must not disable NVD.
                    print(f"MISP initialization failed: {e}")

    @staticmethod
    def _format_cve(cve: Any) -> Dict[str, Any]:
        """Convert one nvdlib CVE object into a plain result dictionary.

        Missing optional fields (no description, no CVSS v3.1 metric, no
        references) yield ``None`` / an empty list instead of raising, so a
        single sparse record cannot fail the whole search.
        """
        descriptions = getattr(cve, 'descriptions', None) or []
        metrics = getattr(cve, 'metrics', None)
        v31_metrics = getattr(metrics, 'cvssMetricV31', None) or []
        references = getattr(cve, 'references', None) or []
        return {
            'id': cve.id,
            'description': descriptions[0].value if descriptions else None,
            'severity': v31_metrics[0].cvssData.baseScore if v31_metrics else None,
            'published': getattr(cve, 'published', None),
            'references': [ref.url for ref in references],
        }

    def search_nvd(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Search vulnerabilities in NVD.

        Args:
            query: A full CVE ID (looked up directly, case-insensitive) or a
                free-text keyword.
            max_results: Maximum number of records for keyword searches.

        Returns:
            A list of result dictionaries (``id``, ``description``,
            ``severity``, ``published``, ``references``), or a single-item
            list holding an ``error`` entry if the lookup failed.
        """
        try:
            candidate_id = query.strip()
            if _CVE_ID_PATTERN.fullmatch(candidate_id):
                # Direct lookup; NVD identifiers are upper-case.
                matches = nvdlib.searchCVE(
                    cveId=candidate_id.upper(),
                    key=self.nvd_api_key,
                )
            else:
                # Otherwise, perform general keyword search
                matches = nvdlib.searchCVE(
                    keywordSearch=query,
                    limit=max_results,
                    key=self.nvd_api_key,
                )
            return [self._format_cve(cve) for cve in matches]
        except Exception as e:
            return [{'error': f"Error in NVD search: {str(e)}"}]

    def search_misp(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Search vulnerabilities in MISP (if available).

        Args:
            query: Value to search for in MISP events.
            max_results: Maximum number of events to request.

        Returns:
            A list of event dictionaries, or a single-item list holding an
            ``error`` entry when MISP is unavailable or the search failed.
        """
        if not self.misp_available:
            return [{'error': 'MISP is not configured or not available'}]

        try:
            # Search related events
            events = self.misp.search(
                controller='events',
                value=query,
                limit=max_results,
                pythonify=True
            )

            return [{
                'id': event.uuid,
                'info': event.info,
                'analysis': event.analysis,
                'threat_level': event.threat_level_id,
                'date': event.date,
                'attributes': [
                    {'type': attr.type, 'value': attr.value}
                    for attr in event.attributes
                ]
            } for event in events]

        except Exception as e:
            return [{'error': f"Error in MISP search: {str(e)}"}]

    def forward(
        self,
        query: str,
        sources: Optional[List[str]] = None,
        max_results: Optional[int] = 5,
    ) -> Dict[str, Any]:
        """Process search across available sources.

        Args:
            query: Search term or CVE ID.
            sources: Sources to query (``'nvd'``, ``'misp'``). ``None`` means
                the default ``['nvd']``. Unknown or unconfigured sources are
                ignored.
            max_results: Maximum results per source. ``None`` means 5.

        Returns:
            A dictionary with the ``query``, the ``sources`` actually used,
            the ``available_sources`` and the per-source ``results``.
        """
        # Both arguments are declared nullable in ``inputs``; map None to
        # the documented defaults (and avoid a shared mutable default).
        if sources is None:
            sources = ['nvd']
        if max_results is None:
            max_results = 5

        available_sources = ['nvd']
        # Add MISP to available sources if configured
        if self.misp_available:
            available_sources.append('misp')

        # Filter requested sources to only use available ones
        active_sources = [s for s in sources if s in available_sources]

        results = {}
        if 'nvd' in active_sources:
            results['nvd'] = self.search_nvd(query, max_results)
        # 'misp' can only be active when MISP is available (see above).
        if 'misp' in active_sources:
            results['misp'] = self.search_misp(query, max_results)

        return {
            'query': query,
            'sources': active_sources,
            'available_sources': available_sources,
            'results': results
        }