File size: 4,936 Bytes
eea2f4b
 
 
 
 
a28b847
 
 
 
 
 
 
eea2f4b
 
a28b847
eea2f4b
5a945fb
 
 
 
 
 
 
 
 
 
 
 
 
eea2f4b
5a945fb
eea2f4b
 
a28b847
 
eea2f4b
 
 
a28b847
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eea2f4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a28b847
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
eea2f4b
 
 
a28b847
 
eea2f4b
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import os
from typing import Dict, Any, Optional, List
import nvdlib
from smolagents.tools import Tool

# Optional MISP import
try:
    from pymisp import PyMISP
    MISP_AVAILABLE = True
except ImportError:
    MISP_AVAILABLE = False

class VulnerabilitySearchTool(Tool):
    """Agent tool that searches vulnerability databases (NVD, optionally MISP).

    NVD is always queried through ``nvdlib``. MISP is queried only when
    ``pymisp`` could be imported and both ``MISP_URL`` and ``MISP_KEY`` are set.

    Environment variables read at construction time:
        NVD_API_KEY: optional NVD API key, forwarded with every NVD request.
        MISP_URL, MISP_KEY: MISP endpoint and auth key (both required).
        MISP_VERIFY_SSL: set to ``0``/``false``/``no``/``off`` to disable TLS
            certificate verification (e.g. a self-signed lab instance).
            Verification is enabled when the variable is unset.
    """

    name = "vuln_search"
    description = "Search for vulnerabilities in security databases (NVD, MISP if configured)"
    inputs = {
        'query': {'type': 'string', 'description': 'Search term or CVE ID'},
        'sources': {
            'type': 'array', 
            'description': 'List of sources to query (nvd, misp)', 
            'default': ['nvd'],
            'nullable': True
        },
        'max_results': {
            'type': 'integer', 
            'description': 'Maximum number of results per source', 
            'default': 5,
            'nullable': True
        }
    }
    output_type = "object"

    def __init__(self):
        """Read configuration from the environment and connect to MISP if possible.

        A failed MISP connection is reported on stdout and leaves the tool
        usable with NVD only; it never raises.
        """
        # The base class sets up state (e.g. ``is_initialized``) that
        # ``Tool.__call__`` relies on, so it must be initialised first.
        super().__init__()

        # NVD configuration. nvdlib has no global key setter; the key is
        # passed per request via the ``key=`` argument (see ``search_nvd``).
        self.nvd_api_key = os.getenv('NVD_API_KEY')

        # MISP configuration (optional)
        self.misp = None
        self.misp_available = False

        if not MISP_AVAILABLE:
            return

        misp_url = os.getenv('MISP_URL')
        misp_key = os.getenv('MISP_KEY')
        if not (misp_url and misp_key):
            return

        # Verify the server certificate unless explicitly disabled: the auth
        # key is sent on every request and must not go to an unverified peer.
        verify_flag = os.getenv('MISP_VERIFY_SSL', 'true').strip().lower()
        verify_ssl = verify_flag not in ('0', 'false', 'no', 'off')

        try:
            self.misp = PyMISP(misp_url, misp_key, ssl=verify_ssl)
            self.misp_available = True
        except Exception as e:
            print(f"MISP initialization failed: {e}")

    @staticmethod
    def _is_cve_id(text: str) -> bool:
        """Return True when *text* has the shape ``CVE-YYYY-NNNN`` (4+ digits)."""
        prefix, _, rest = text.partition('-')
        year, _, sequence = rest.partition('-')
        return (
            prefix == 'CVE'
            and len(year) == 4 and year.isdigit()
            and len(sequence) >= 4 and sequence.isdigit()
        )

    @staticmethod
    def _base_score(cve: Any) -> Optional[float]:
        """Return a CVSS base score for *cve*, or None when it has none.

        CVSS v3.1 is preferred; other CVSS versions are used only as a
        fallback so that records without a v3.1 assessment still get a score.
        """
        metrics = getattr(cve, 'metrics', None)
        for field in ('cvssMetricV31', 'cvssMetricV40', 'cvssMetricV30', 'cvssMetricV2'):
            entries = getattr(metrics, field, None)
            if entries:
                cvss_data = getattr(entries[0], 'cvssData', None)
                return getattr(cvss_data, 'baseScore', None)
        return None

    def _format_cve(self, cve: Any) -> Dict[str, Any]:
        """Convert one nvdlib CVE object into a plain dict.

        Missing optional fields become ``None`` / ``[]`` instead of raising,
        so one incomplete record cannot fail a whole search.
        """
        descriptions = getattr(cve, 'descriptions', None) or []
        # Prefer the English text; fall back to whatever comes first.
        chosen = next(
            (d for d in descriptions if getattr(d, 'lang', None) == 'en'),
            descriptions[0] if descriptions else None,
        )
        references = getattr(cve, 'references', None) or []
        return {
            'id': cve.id,
            'description': getattr(chosen, 'value', None),
            'severity': self._base_score(cve),
            'published': getattr(cve, 'published', None),
            'references': [ref.url for ref in references],
        }

    def search_nvd(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Search vulnerabilities in NVD.

        Args:
            query: A CVE identifier (``CVE-YYYY-NNNN``, case-insensitive) for
                a direct lookup, or free text for a keyword search.
            max_results: Upper bound on results for a keyword search.

        Returns:
            A list of dicts with ``id``, ``description``, ``severity``,
            ``published`` and ``references``; or a single-element list
            ``[{'error': ...}]`` if the lookup failed.
        """
        try:
            # Only pass the key when one is configured.
            auth = {'key': self.nvd_api_key} if self.nvd_api_key else {}

            candidate_id = query.strip().upper()
            if self._is_cve_id(candidate_id):
                # Direct lookup by identifier
                found = nvdlib.searchCVE(cveId=candidate_id, **auth)
            else:
                # General keyword search
                found = nvdlib.searchCVE(
                    keywordSearch=query,
                    limit=max_results,
                    **auth
                )

            return [self._format_cve(cve) for cve in found]

        except Exception as e:
            # Tool boundary: report the failure as data so the agent can react.
            return [{'error': f"Error in NVD search: {str(e)}"}]

    def search_misp(self, query: str, max_results: int) -> List[Dict[str, Any]]:
        """Search events in MISP (if available).

        Args:
            query: Value to search for in MISP events.
            max_results: Maximum number of events to request.

        Returns:
            A list of dicts with ``id``, ``info``, ``analysis``,
            ``threat_level``, ``date`` and ``attributes``; or a single-element
            list ``[{'error': ...}]`` when MISP is unavailable or the search
            failed.
        """
        if not self.misp_available or self.misp is None:
            return [{'error': 'MISP is not configured or not available'}]

        try:
            # Search related events
            events = self.misp.search(
                controller='events',
                value=query,
                limit=max_results,
                pythonify=True
            )

            return [{
                'id': event.uuid,
                'info': event.info,
                'analysis': event.analysis,
                'threat_level': event.threat_level_id,
                'date': event.date,
                'attributes': [
                    {'type': attr.type, 'value': attr.value}
                    for attr in event.attributes
                ]
            } for event in events]

        except Exception as e:
            # Tool boundary: report the failure as data so the agent can react.
            return [{'error': f"Error in MISP search: {str(e)}"}]

    def forward(
        self,
        query: str,
        sources: Optional[List[str]] = None,
        max_results: Optional[int] = 5,
    ) -> Dict[str, Any]:
        """Run the search across the requested, available sources.

        Args:
            query: Search term or CVE ID.
            sources: Sources to query (``'nvd'``, ``'misp'``). ``None`` means
                ``['nvd']``. A bare string is treated as a one-item list.
                Names are matched case-insensitively; unavailable or unknown
                sources are dropped.
            max_results: Maximum number of results per source. ``None`` means
                5; values below 1 are raised to 1.

        Returns:
            A dict with the ``query``, the ``sources`` actually queried, the
            ``available_sources`` and per-source ``results``.
        """
        # Both inputs are declared nullable, so None must be accepted here.
        if sources is None:
            requested = ['nvd']
        elif isinstance(sources, str):
            # Iterating a bare string would yield single characters.
            requested = [sources]
        else:
            requested = list(sources)

        if max_results is None:
            max_results = 5
        # nvdlib treats a falsy limit as "no limit"; never send 0 or less.
        max_results = max(1, max_results)

        available_sources = ['nvd']
        # Add MISP to available sources if configured
        if self.misp_available:
            available_sources.append('misp')

        # Normalise, de-duplicate (keeping order) and keep only usable sources.
        normalised = dict.fromkeys(str(s).strip().lower() for s in requested)
        active_sources = [s for s in normalised if s in available_sources]

        results = {}
        if 'nvd' in active_sources:
            results['nvd'] = self.search_nvd(query, max_results)

        if 'misp' in active_sources:
            results['misp'] = self.search_misp(query, max_results)

        return {
            'query': query,
            'sources': active_sources,
            'available_sources': available_sources,
            'results': results
        }