Spaces:
Runtime error
Runtime error
| # Copyright DataStax, Inc. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); | |
| # you may not use this file except in compliance with the License. | |
| # You may obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| # See the License for the specific language governing permissions and | |
| # limitations under the License. | |
| from collections import Counter | |
| import datetime | |
| import json | |
| import logging | |
| import multiprocessing | |
| import random | |
| import platform | |
| import socket | |
| import ssl | |
| import sys | |
| from threading import Event, Thread | |
| import time | |
| from cassandra.policies import HostDistance | |
| from cassandra.util import ms_timestamp_from_datetime | |
| from cassandra.datastax.insights.registry import insights_registry | |
| from cassandra.datastax.insights.serializers import initialize_registry | |
| log = logging.getLogger(__name__) | |
| class MonitorReporter(Thread): | |
| def __init__(self, interval_sec, session): | |
| """ | |
| takes an int indicating interval between requests, a function returning | |
| the connection to be used, and the timeout per request | |
| """ | |
| # Thread is an old-style class so we can't super() | |
| Thread.__init__(self, name='monitor_reporter') | |
| initialize_registry(insights_registry) | |
| self._interval, self._session = interval_sec, session | |
| self._shutdown_event = Event() | |
| self.daemon = True | |
| self.start() | |
| def run(self): | |
| self._send_via_rpc(self._get_startup_data()) | |
| # introduce some jitter -- send up to 1/10 of _interval early | |
| self._shutdown_event.wait(self._interval * random.uniform(.9, 1)) | |
| while not self._shutdown_event.is_set(): | |
| start_time = time.time() | |
| self._send_via_rpc(self._get_status_data()) | |
| elapsed = time.time() - start_time | |
| self._shutdown_event.wait(max(self._interval - elapsed, 0.01)) | |
| # TODO: redundant with ConnectionHeartbeat.ShutdownException | |
| class ShutDownException(Exception): | |
| pass | |
| def _send_via_rpc(self, data): | |
| try: | |
| self._session.execute( | |
| "CALL InsightsRpc.reportInsight(%s)", (json.dumps(data),) | |
| ) | |
| log.debug('Insights RPC data: {}'.format(data)) | |
| except Exception as e: | |
| log.debug('Insights RPC send failed with {}'.format(e)) | |
| log.debug('Insights RPC data: {}'.format(data)) | |
| def _get_status_data(self): | |
| cc = self._session.cluster.control_connection | |
| connected_nodes = { | |
| host.address: { | |
| 'connections': state['open_count'], | |
| 'inFlightQueries': state['in_flights'] | |
| } | |
| for (host, state) in self._session.get_pool_state().items() | |
| } | |
| return { | |
| 'metadata': { | |
| # shared across drivers; never change | |
| 'name': 'driver.status', | |
| # format version | |
| 'insightMappingId': 'v1', | |
| 'insightType': 'EVENT', | |
| # since epoch | |
| 'timestamp': ms_timestamp_from_datetime(datetime.datetime.utcnow()), | |
| 'tags': { | |
| 'language': 'python' | |
| } | |
| }, | |
| # // 'clientId', 'sessionId' and 'controlConnection' are mandatory | |
| # // the rest of the properties are optional | |
| 'data': { | |
| # // 'clientId' must be the same as the one provided in the startup message | |
| 'clientId': str(self._session.cluster.client_id), | |
| # // 'sessionId' must be the same as the one provided in the startup message | |
| 'sessionId': str(self._session.session_id), | |
| 'controlConnection': cc._connection.host if cc._connection else None, | |
| 'connectedNodes': connected_nodes | |
| } | |
| } | |
| def _get_startup_data(self): | |
| cc = self._session.cluster.control_connection | |
| try: | |
| local_ipaddr = cc._connection._socket.getsockname()[0] | |
| except Exception as e: | |
| local_ipaddr = None | |
| log.debug('Unable to get local socket addr from {}: {}'.format(cc._connection, e)) | |
| hostname = socket.getfqdn() | |
| host_distances_counter = Counter( | |
| self._session.cluster.profile_manager.distance(host) | |
| for host in self._session.hosts | |
| ) | |
| host_distances_dict = { | |
| 'local': host_distances_counter[HostDistance.LOCAL], | |
| 'remote': host_distances_counter[HostDistance.REMOTE], | |
| 'ignored': host_distances_counter[HostDistance.IGNORED] | |
| } | |
| try: | |
| compression_type = cc._connection._compression_type | |
| except AttributeError: | |
| compression_type = 'NONE' | |
| cert_validation = None | |
| try: | |
| if self._session.cluster.ssl_context: | |
| if isinstance(self._session.cluster.ssl_context, ssl.SSLContext): | |
| cert_validation = self._session.cluster.ssl_context.verify_mode == ssl.CERT_REQUIRED | |
| else: # pyopenssl | |
| from OpenSSL import SSL | |
| cert_validation = self._session.cluster.ssl_context.get_verify_mode() != SSL.VERIFY_NONE | |
| elif self._session.cluster.ssl_options: | |
| cert_validation = self._session.cluster.ssl_options.get('cert_reqs') == ssl.CERT_REQUIRED | |
| except Exception as e: | |
| log.debug('Unable to get the cert validation: {}'.format(e)) | |
| uname_info = platform.uname() | |
| return { | |
| 'metadata': { | |
| 'name': 'driver.startup', | |
| 'insightMappingId': 'v1', | |
| 'insightType': 'EVENT', | |
| 'timestamp': ms_timestamp_from_datetime(datetime.datetime.utcnow()), | |
| 'tags': { | |
| 'language': 'python' | |
| }, | |
| }, | |
| 'data': { | |
| 'driverName': 'DataStax Python Driver', | |
| 'driverVersion': sys.modules['cassandra'].__version__, | |
| 'clientId': str(self._session.cluster.client_id), | |
| 'sessionId': str(self._session.session_id), | |
| 'applicationName': self._session.cluster.application_name or 'python', | |
| 'applicationNameWasGenerated': not self._session.cluster.application_name, | |
| 'applicationVersion': self._session.cluster.application_version, | |
| 'contactPoints': self._session.cluster._endpoint_map_for_insights, | |
| 'dataCenters': list(set(h.datacenter for h in self._session.cluster.metadata.all_hosts() | |
| if (h.datacenter and | |
| self._session.cluster.profile_manager.distance(h) == HostDistance.LOCAL))), | |
| 'initialControlConnection': cc._connection.host if cc._connection else None, | |
| 'protocolVersion': self._session.cluster.protocol_version, | |
| 'localAddress': local_ipaddr, | |
| 'hostName': hostname, | |
| 'executionProfiles': insights_registry.serialize(self._session.cluster.profile_manager), | |
| 'configuredConnectionLength': host_distances_dict, | |
| 'heartbeatInterval': self._session.cluster.idle_heartbeat_interval, | |
| 'compression': compression_type.upper() if compression_type else 'NONE', | |
| 'reconnectionPolicy': insights_registry.serialize(self._session.cluster.reconnection_policy), | |
| 'sslConfigured': { | |
| 'enabled': bool(self._session.cluster.ssl_options or self._session.cluster.ssl_context), | |
| 'certValidation': cert_validation | |
| }, | |
| 'authProvider': { | |
| 'type': (self._session.cluster.auth_provider.__class__.__name__ | |
| if self._session.cluster.auth_provider else | |
| None) | |
| }, | |
| 'otherOptions': { | |
| }, | |
| 'platformInfo': { | |
| 'os': { | |
| 'name': uname_info.system, | |
| 'version': uname_info.release, | |
| 'arch': uname_info.machine | |
| }, | |
| 'cpus': { | |
| 'length': multiprocessing.cpu_count(), | |
| 'model': platform.processor() | |
| }, | |
| 'runtime': { | |
| 'python': sys.version, | |
| 'event_loop': self._session.cluster.connection_class.__name__ | |
| } | |
| }, | |
| 'periodicStatusInterval': self._interval | |
| } | |
| } | |
| def stop(self): | |
| log.debug("Shutting down Monitor Reporter") | |
| self._shutdown_event.set() | |
| self.join() | |