Draken007's picture
Upload 7228 files
2a0bc63 verified
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def initialize_registry(insights_registry):
# This will be called from the cluster module, so we put all this behavior
# in a function to avoid circular imports
if insights_registry.initialized:
return False
from cassandra import ConsistencyLevel
from cassandra.cluster import (
ExecutionProfile, GraphExecutionProfile,
ProfileManager, ContinuousPagingOptions,
EXEC_PROFILE_DEFAULT, EXEC_PROFILE_GRAPH_DEFAULT,
EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT,
EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT,
_NOT_SET
)
from cassandra.datastax.graph import GraphOptions
from cassandra.datastax.insights.registry import insights_registry
from cassandra.datastax.insights.util import namespace
from cassandra.policies import (
RoundRobinPolicy,
DCAwareRoundRobinPolicy,
TokenAwarePolicy,
WhiteListRoundRobinPolicy,
HostFilterPolicy,
ConstantReconnectionPolicy,
ExponentialReconnectionPolicy,
RetryPolicy,
SpeculativeExecutionPolicy,
ConstantSpeculativeExecutionPolicy,
WrapperPolicy
)
import logging
log = logging.getLogger(__name__)
@insights_registry.register_serializer_for(RoundRobinPolicy)
def round_robin_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {}}
@insights_registry.register_serializer_for(DCAwareRoundRobinPolicy)
def dc_aware_round_robin_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {'local_dc': policy.local_dc,
'used_hosts_per_remote_dc': policy.used_hosts_per_remote_dc}
}
@insights_registry.register_serializer_for(TokenAwarePolicy)
def token_aware_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {'child_policy': insights_registry.serialize(policy._child_policy,
policy=True),
'shuffle_replicas': policy.shuffle_replicas}
}
@insights_registry.register_serializer_for(WhiteListRoundRobinPolicy)
def whitelist_round_robin_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {'allowed_hosts': policy._allowed_hosts}
}
@insights_registry.register_serializer_for(HostFilterPolicy)
def host_filter_policy_insights_serializer(policy):
return {
'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {'child_policy': insights_registry.serialize(policy._child_policy,
policy=True),
'predicate': policy.predicate.__name__}
}
@insights_registry.register_serializer_for(ConstantReconnectionPolicy)
def constant_reconnection_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {'delay': policy.delay,
'max_attempts': policy.max_attempts}
}
@insights_registry.register_serializer_for(ExponentialReconnectionPolicy)
def exponential_reconnection_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {'base_delay': policy.base_delay,
'max_delay': policy.max_delay,
'max_attempts': policy.max_attempts}
}
@insights_registry.register_serializer_for(RetryPolicy)
def retry_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {}}
@insights_registry.register_serializer_for(SpeculativeExecutionPolicy)
def speculative_execution_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {}}
@insights_registry.register_serializer_for(ConstantSpeculativeExecutionPolicy)
def constant_speculative_execution_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {'delay': policy.delay,
'max_attempts': policy.max_attempts}
}
@insights_registry.register_serializer_for(WrapperPolicy)
def wrapper_policy_insights_serializer(policy):
return {'type': policy.__class__.__name__,
'namespace': namespace(policy.__class__),
'options': {
'child_policy': insights_registry.serialize(policy._child_policy,
policy=True)
}}
@insights_registry.register_serializer_for(ExecutionProfile)
def execution_profile_insights_serializer(profile):
return {
'loadBalancing': insights_registry.serialize(profile.load_balancing_policy,
policy=True),
'retry': insights_registry.serialize(profile.retry_policy,
policy=True),
'readTimeout': profile.request_timeout,
'consistency': ConsistencyLevel.value_to_name.get(profile.consistency_level, None),
'serialConsistency': ConsistencyLevel.value_to_name.get(profile.serial_consistency_level, None),
'continuousPagingOptions': (insights_registry.serialize(profile.continuous_paging_options)
if (profile.continuous_paging_options is not None and
profile.continuous_paging_options is not _NOT_SET) else
None),
'speculativeExecution': insights_registry.serialize(profile.speculative_execution_policy),
'graphOptions': None
}
@insights_registry.register_serializer_for(GraphExecutionProfile)
def graph_execution_profile_insights_serializer(profile):
rv = insights_registry.serialize(profile, cls=ExecutionProfile)
rv['graphOptions'] = insights_registry.serialize(profile.graph_options)
return rv
_EXEC_PROFILE_DEFAULT_KEYS = (EXEC_PROFILE_DEFAULT,
EXEC_PROFILE_GRAPH_DEFAULT,
EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT,
EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT)
@insights_registry.register_serializer_for(ProfileManager)
def profile_manager_insights_serializer(manager):
defaults = {
# Insights's expected default
'default': insights_registry.serialize(manager.profiles[EXEC_PROFILE_DEFAULT]),
# remaining named defaults for driver's defaults, including duplicated default
'EXEC_PROFILE_DEFAULT': insights_registry.serialize(manager.profiles[EXEC_PROFILE_DEFAULT]),
'EXEC_PROFILE_GRAPH_DEFAULT': insights_registry.serialize(manager.profiles[EXEC_PROFILE_GRAPH_DEFAULT]),
'EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT': insights_registry.serialize(
manager.profiles[EXEC_PROFILE_GRAPH_SYSTEM_DEFAULT]
),
'EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT': insights_registry.serialize(
manager.profiles[EXEC_PROFILE_GRAPH_ANALYTICS_DEFAULT]
)
}
other = {
key: insights_registry.serialize(value)
for key, value in manager.profiles.items()
if key not in _EXEC_PROFILE_DEFAULT_KEYS
}
overlapping_keys = set(defaults) & set(other)
if overlapping_keys:
log.debug('The following key names overlap default key sentinel keys '
'and these non-default EPs will not be displayed in Insights '
': {}'.format(list(overlapping_keys)))
other.update(defaults)
return other
@insights_registry.register_serializer_for(GraphOptions)
def graph_options_insights_serializer(options):
rv = {
'source': options.graph_source,
'language': options.graph_language,
'graphProtocol': options.graph_protocol
}
updates = {k: v.decode('utf-8') for k, v in rv.items()
if isinstance(v, bytes)}
rv.update(updates)
return rv
@insights_registry.register_serializer_for(ContinuousPagingOptions)
def continuous_paging_options_insights_serializer(paging_options):
return {
'page_unit': paging_options.page_unit,
'max_pages': paging_options.max_pages,
'max_pages_per_second': paging_options.max_pages_per_second,
'max_queue_size': paging_options.max_queue_size
}
insights_registry.initialized = True
return True