File size: 4,936 Bytes
eea2f4b a28b847 eea2f4b a28b847 eea2f4b 5a945fb eea2f4b 5a945fb eea2f4b a28b847 eea2f4b a28b847 eea2f4b a28b847 eea2f4b a28b847 eea2f4b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
import os
from typing import Dict, Any, Optional, List
import nvdlib
from smolagents.tools import Tool
# Optional MISP import
try:
from pymisp import PyMISP
MISP_AVAILABLE = True
except ImportError:
MISP_AVAILABLE = False
class VulnerabilitySearchTool(Tool):
name = "vuln_search"
description = "Search for vulnerabilities in security databases (NVD, MISP if configured)"
inputs = {
'query': {'type': 'string', 'description': 'Search term or CVE ID'},
'sources': {
'type': 'array',
'description': 'List of sources to query (nvd, misp)',
'default': ['nvd'],
'nullable': True
},
'max_results': {
'type': 'integer',
'description': 'Maximum number of results per source',
'default': 5,
'nullable': True
}
}
output_type = "object"
def __init__(self):
"""Initialize API connections"""
# NVD Configuration
self.nvd_api_key = os.getenv('NVD_API_KEY')
if self.nvd_api_key:
nvdlib.set_api_key(self.nvd_api_key)
# MISP Configuration (optional)
self.misp = None
self.misp_available = False
if MISP_AVAILABLE:
misp_url = os.getenv('MISP_URL')
misp_key = os.getenv('MISP_KEY')
if misp_url and misp_key:
try:
self.misp = PyMISP(misp_url, misp_key, ssl=False)
self.misp_available = True
except Exception as e:
print(f"MISP initialization failed: {e}")
def search_nvd(self, query: str, max_results: int) -> List[Dict[str, Any]]:
"""Search vulnerabilities in NVD"""
try:
# If query looks like a CVE-ID, search directly
if query.startswith('CVE-'):
results = nvdlib.get_cve(query)
return [{
'id': results.id,
'description': results.descriptions[0].value,
'severity': results.metrics.cvssMetricV31[0].cvssData.baseScore if results.metrics else None,
'published': results.published,
'references': [ref.url for ref in results.references]
}]
# Otherwise, perform general search
results = nvdlib.searchCVE(
keyword=query,
limit=max_results
)
return [{
'id': r.id,
'description': r.descriptions[0].value,
'severity': r.metrics.cvssMetricV31[0].cvssData.baseScore if r.metrics else None,
'published': r.published,
'references': [ref.url for ref in r.references]
} for r in results]
except Exception as e:
return [{'error': f"Error in NVD search: {str(e)}"}]
def search_misp(self, query: str, max_results: int) -> List[Dict[str, Any]]:
"""Search vulnerabilities in MISP (if available)"""
if not self.misp_available:
return [{'error': 'MISP is not configured or not available'}]
try:
# Search related events
results = self.misp.search(
controller='events',
value=query,
limit=max_results,
pythonify=True
)
return [{
'id': event.uuid,
'info': event.info,
'analysis': event.analysis,
'threat_level': event.threat_level_id,
'date': event.date,
'attributes': [
{'type': attr.type, 'value': attr.value}
for attr in event.attributes
]
} for event in results]
except Exception as e:
return [{'error': f"Error in MISP search: {str(e)}"}]
def forward(self, query: str, sources: List[str] = ['nvd'], max_results: int = 5) -> Dict[str, Any]:
"""Process search across available sources"""
results = {}
available_sources = ['nvd']
# Add MISP to available sources if configured
if self.misp_available:
available_sources.append('misp')
# Filter requested sources to only use available ones
active_sources = [s for s in sources if s in available_sources]
if 'nvd' in active_sources:
results['nvd'] = self.search_nvd(query, max_results)
if 'misp' in active_sources and self.misp_available:
results['misp'] = self.search_misp(query, max_results)
return {
'query': query,
'sources': active_sources,
'available_sources': available_sources,
'results': results
} |