# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from binascii import unhexlify
from bisect import bisect_left
from collections import defaultdict
from collections.abc import Mapping
from functools import total_ordering
from hashlib import md5
import json
import logging
import re
import sys
from threading import RLock
import struct
import random

murmur3 = None
try:
    from cassandra.murmur3 import murmur3
except ImportError:
    pass

from cassandra import SignatureDescriptor, ConsistencyLevel, InvalidRequest, Unauthorized
import cassandra.cqltypes as types
from cassandra.encoder import Encoder
from cassandra.marshal import varint_unpack
from cassandra.protocol import QueryMessage
from cassandra.query import dict_factory, bind_params
from cassandra.util import OrderedDict, Version
from cassandra.pool import HostDistance
from cassandra.connection import EndPoint

log = logging.getLogger(__name__)

cql_keywords = set((
    'add', 'aggregate', 'all', 'allow', 'alter', 'and', 'apply', 'as', 'asc', 'ascii', 'authorize', 'batch', 'begin',
    'bigint', 'blob', 'boolean', 'by', 'called', 'clustering', 'columnfamily', 'compact', 'contains', 'count',
    'counter', 'create', 'custom', 'date', 'decimal', 'default', 'delete', 'desc', 'describe', 'deterministic', 'distinct', 'double', 'drop',
    'entries', 'execute', 'exists', 'filtering', 'finalfunc', 'float', 'from', 'frozen', 'full', 'function',
    'functions', 'grant', 'if', 'in', 'index', 'inet', 'infinity', 'initcond', 'input', 'insert', 'int', 'into', 'is', 'json',
    'key', 'keys', 'keyspace', 'keyspaces', 'language', 'limit', 'list', 'login', 'map', 'materialized', 'mbean', 'mbeans', 'modify', 'monotonic',
    'nan', 'nologin', 'norecursive', 'nosuperuser', 'not', 'null', 'of', 'on', 'options', 'or', 'order', 'password', 'permission',
    'permissions', 'primary', 'rename', 'replace', 'returns', 'revoke', 'role', 'roles', 'schema', 'select', 'set',
    'sfunc', 'smallint', 'static', 'storage', 'stype', 'superuser', 'table', 'text', 'time', 'timestamp', 'timeuuid',
    'tinyint', 'to', 'token', 'trigger', 'truncate', 'ttl', 'tuple', 'type', 'unlogged', 'unset', 'update', 'use', 'user',
    'users', 'using', 'uuid', 'values', 'varchar', 'varint', 'view', 'where', 'with', 'writetime',

    # DSE specifics
    "node", "nodes", "plan", "active", "application", "applications", "java", "executor", "executors", "std_out", "std_err",
    "renew", "delegation", "no", "redact", "token", "lowercasestring", "cluster", "authentication", "schemes", "scheme",
    "internal", "ldap", "kerberos", "remote", "object", "method", "call", "calls", "search", "schema", "config", "rows",
    "columns", "profiles", "commit", "reload", "rebuild", "field", "workpool", "any", "submission", "indices",
    "restrict", "unrestrict"
))
"""
Set of keywords in CQL.

Derived from .../cassandra/src/java/org/apache/cassandra/cql3/Cql.g
"""

cql_keywords_unreserved = set((
    'aggregate', 'all', 'as', 'ascii', 'bigint', 'blob', 'boolean', 'called', 'clustering', 'compact', 'contains',
    'count', 'counter', 'custom', 'date', 'decimal', 'deterministic', 'distinct', 'double', 'exists', 'filtering', 'finalfunc', 'float',
    'frozen', 'function', 'functions', 'inet', 'initcond', 'input', 'int', 'json', 'key', 'keys', 'keyspaces',
    'language', 'list', 'login', 'map', 'monotonic', 'nologin', 'nosuperuser', 'options', 'password', 'permission', 'permissions',
    'returns', 'role', 'roles', 'sfunc', 'smallint', 'static', 'storage', 'stype', 'superuser', 'text', 'time',
    'timestamp', 'timeuuid', 'tinyint', 'trigger', 'ttl', 'tuple', 'type', 'user', 'users', 'uuid', 'values', 'varchar',
    'varint', 'writetime'
))
"""
Set of unreserved keywords in CQL.

Derived from .../cassandra/src/java/org/apache/cassandra/cql3/Cql.g
"""

cql_keywords_reserved = cql_keywords - cql_keywords_unreserved
"""
Set of reserved keywords in CQL.
"""

_encoder = Encoder()


class Metadata(object):
    """
    Holds a representation of the cluster schema and topology.
    """

    cluster_name = None
    """ The string name of the cluster. """

    keyspaces = None
    """
    A map from keyspace names to matching :class:`~.KeyspaceMetadata` instances.
    """

    partitioner = None
    """
    The string name of the partitioner for the cluster.
    """

    token_map = None
    """ A :class:`~.TokenMap` instance describing the ring topology. """

    dbaas = False
    """ A boolean indicating if connected to a DBaaS cluster. """

    def __init__(self):
        self.keyspaces = {}
        self.dbaas = False
        self._hosts = {}
        self._hosts_lock = RLock()

    def export_schema_as_string(self):
        """
        Returns a string that can be executed as a query in order to recreate
        the entire schema. The string is formatted to be human readable.
        """
        return "\n\n".join(ks.export_as_string() for ks in self.keyspaces.values())
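
    # Example (hypothetical connection details, illustrative only):
    #
    #     from cassandra.cluster import Cluster
    #     cluster = Cluster(['127.0.0.1'])
    #     cluster.connect()
    #     print(cluster.metadata.export_schema_as_string())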

    def refresh(self, connection, timeout, target_type=None, change_type=None, **kwargs):
        server_version = self.get_host(connection.endpoint).release_version
        dse_version = self.get_host(connection.endpoint).dse_version
        parser = get_schema_parser(connection, server_version, dse_version, timeout)

        if not target_type:
            self._rebuild_all(parser)
            return

        tt_lower = target_type.lower()
        try:
            parse_method = getattr(parser, 'get_' + tt_lower)
            meta = parse_method(self.keyspaces, **kwargs)
            if meta:
                update_method = getattr(self, '_update_' + tt_lower)
                if tt_lower == 'keyspace' and connection.protocol_version < 3:
                    # we didn't have 'type' target in legacy protocol versions, so we need to query those too
                    user_types = parser.get_types_map(self.keyspaces, **kwargs)
                    self._update_keyspace(meta, user_types)
                else:
                    update_method(meta)
            else:
                drop_method = getattr(self, '_drop_' + tt_lower)
                drop_method(**kwargs)
        except AttributeError:
            raise ValueError("Unknown schema target_type: '%s'" % target_type)

    def _rebuild_all(self, parser):
        current_keyspaces = set()
        for keyspace_meta in parser.get_all_keyspaces():
            current_keyspaces.add(keyspace_meta.name)
            old_keyspace_meta = self.keyspaces.get(keyspace_meta.name, None)
            self.keyspaces[keyspace_meta.name] = keyspace_meta
            if old_keyspace_meta:
                self._keyspace_updated(keyspace_meta.name)
            else:
                self._keyspace_added(keyspace_meta.name)

        # remove not-just-added keyspaces
        removed_keyspaces = [name for name in self.keyspaces.keys()
                             if name not in current_keyspaces]
        self.keyspaces = dict((name, meta) for name, meta in self.keyspaces.items()
                              if name in current_keyspaces)
        for ksname in removed_keyspaces:
            self._keyspace_removed(ksname)

    def _update_keyspace(self, keyspace_meta, new_user_types=None):
        ks_name = keyspace_meta.name
        old_keyspace_meta = self.keyspaces.get(ks_name, None)
        self.keyspaces[ks_name] = keyspace_meta
        if old_keyspace_meta:
            keyspace_meta.tables = old_keyspace_meta.tables
            keyspace_meta.user_types = new_user_types if new_user_types is not None else old_keyspace_meta.user_types
            keyspace_meta.indexes = old_keyspace_meta.indexes
            keyspace_meta.functions = old_keyspace_meta.functions
            keyspace_meta.aggregates = old_keyspace_meta.aggregates
            keyspace_meta.views = old_keyspace_meta.views
            if keyspace_meta.replication_strategy != old_keyspace_meta.replication_strategy:
                self._keyspace_updated(ks_name)
        else:
            self._keyspace_added(ks_name)

    def _drop_keyspace(self, keyspace):
        if self.keyspaces.pop(keyspace, None):
            self._keyspace_removed(keyspace)

    def _update_table(self, meta):
        try:
            keyspace_meta = self.keyspaces[meta.keyspace_name]
            # this is unfortunate, but protocol v4 does not differentiate
            # between events for tables and views. <parser>.get_table will
            # return one or the other based on the query results.
            # Here we deal with that.
            if isinstance(meta, TableMetadata):
                keyspace_meta._add_table_metadata(meta)
            else:
                keyspace_meta._add_view_metadata(meta)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_table(self, keyspace, table):
        try:
            keyspace_meta = self.keyspaces[keyspace]
            keyspace_meta._drop_table_metadata(table)  # handles either table or view
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_type(self, type_meta):
        try:
            self.keyspaces[type_meta.keyspace].user_types[type_meta.name] = type_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_type(self, keyspace, type):
        try:
            self.keyspaces[keyspace].user_types.pop(type, None)
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _update_function(self, function_meta):
        try:
            self.keyspaces[function_meta.keyspace].functions[function_meta.signature] = function_meta
        except KeyError:
            # can happen if keyspace disappears while processing async event
            pass

    def _drop_function(self, keyspace, function):
        try:
            self.keyspaces[keyspace].functions.pop(function.signature, None)
        except KeyError:
            pass

    def _update_aggregate(self, aggregate_meta):
        try:
            self.keyspaces[aggregate_meta.keyspace].aggregates[aggregate_meta.signature] = aggregate_meta
        except KeyError:
            pass

    def _drop_aggregate(self, keyspace, aggregate):
        try:
            self.keyspaces[keyspace].aggregates.pop(aggregate.signature, None)
        except KeyError:
            pass

    def _keyspace_added(self, ksname):
        if self.token_map:
            self.token_map.rebuild_keyspace(ksname, build_if_absent=False)

    def _keyspace_updated(self, ksname):
        if self.token_map:
            self.token_map.rebuild_keyspace(ksname, build_if_absent=False)

    def _keyspace_removed(self, ksname):
        if self.token_map:
            self.token_map.remove_keyspace(ksname)

    def rebuild_token_map(self, partitioner, token_map):
        """
        Rebuild our view of the topology from fresh rows from the
        system topology tables.
        For internal use only.
        """
        self.partitioner = partitioner
        if partitioner.endswith('RandomPartitioner'):
            token_class = MD5Token
        elif partitioner.endswith('Murmur3Partitioner'):
            token_class = Murmur3Token
        elif partitioner.endswith('ByteOrderedPartitioner'):
            token_class = BytesToken
        else:
            self.token_map = None
            return

        token_to_host_owner = {}
        ring = []
        for host, token_strings in token_map.items():
            for token_string in token_strings:
                token = token_class.from_string(token_string)
                ring.append(token)
                token_to_host_owner[token] = host

        all_tokens = sorted(ring)
        self.token_map = TokenMap(
            token_class, token_to_host_owner, all_tokens, self)

    def get_replicas(self, keyspace, key):
        """
        Returns a list of :class:`.Host` instances that are replicas for a given
        partition key.
        """
        t = self.token_map
        if not t:
            return []
        try:
            return t.get_replicas(keyspace, t.token_class.from_key(key))
        except NoMurmur3:
            return []
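
    # Example (illustrative only): keys are routed by token, so the key must
    # be packed the same way the driver serializes that column. For a single
    # int partition key:
    #
    #     import struct
    #     replicas = cluster.metadata.get_replicas('ks', struct.pack('>i', 1))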

    def can_support_partitioner(self):
        if self.partitioner.endswith('Murmur3Partitioner') and murmur3 is None:
            return False
        else:
            return True

    def add_or_return_host(self, host):
        """
        Returns a tuple (host, new), where ``host`` is a Host
        instance, and ``new`` is a bool indicating whether
        the host was newly added.
        """
        with self._hosts_lock:
            try:
                return self._hosts[host.endpoint], False
            except KeyError:
                self._hosts[host.endpoint] = host
                return host, True

    def remove_host(self, host):
        with self._hosts_lock:
            return bool(self._hosts.pop(host.endpoint, False))

    def get_host(self, endpoint_or_address, port=None):
        """
        Find a host in the metadata for a specific endpoint. If a string inet address and port are passed,
        iterate all hosts to match the :attr:`~.pool.Host.broadcast_rpc_address` and
        :attr:`~.pool.Host.broadcast_rpc_port` attributes.
        """
        if not isinstance(endpoint_or_address, EndPoint):
            return self._get_host_by_address(endpoint_or_address, port)
        return self._hosts.get(endpoint_or_address)

    def _get_host_by_address(self, address, port=None):
        for host in self._hosts.values():
            if (host.broadcast_rpc_address == address and
                    (port is None or host.broadcast_rpc_port is None or host.broadcast_rpc_port == port)):
                return host
        return None

    def all_hosts(self):
        """
        Returns a list of all known :class:`.Host` instances in the cluster.
        """
        with self._hosts_lock:
            return list(self._hosts.values())
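
    # Example (illustrative only):
    #
    #     for host in cluster.metadata.all_hosts():
    #         print(host.address, host.datacenter, host.rack)
    #     host = cluster.metadata.get_host('10.0.0.2', port=9042)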


REPLICATION_STRATEGY_CLASS_PREFIX = "org.apache.cassandra.locator."


def trim_if_startswith(s, prefix):
    if s.startswith(prefix):
        return s[len(prefix):]
    return s


_replication_strategies = {}


class ReplicationStrategyTypeType(type):
    def __new__(metacls, name, bases, dct):
        dct.setdefault('name', name)
        cls = type.__new__(metacls, name, bases, dct)
        if not name.startswith('_'):
            _replication_strategies[name] = cls
        return cls


class _ReplicationStrategy(object, metaclass=ReplicationStrategyTypeType):
    options_map = None

    @classmethod
    def create(cls, strategy_class, options_map):
        if not strategy_class:
            return None

        strategy_name = trim_if_startswith(strategy_class, REPLICATION_STRATEGY_CLASS_PREFIX)

        rs_class = _replication_strategies.get(strategy_name, None)
        if rs_class is None:
            rs_class = _UnknownStrategyBuilder(strategy_name)
            _replication_strategies[strategy_name] = rs_class

        try:
            rs_instance = rs_class(options_map)
        except Exception as exc:
            log.warning("Failed creating %s with options %s: %s", strategy_name, options_map, exc)
            return None

        return rs_instance

    def make_token_replica_map(self, token_to_host_owner, ring):
        raise NotImplementedError()

    def export_for_schema(self):
        raise NotImplementedError()


ReplicationStrategy = _ReplicationStrategy


class _UnknownStrategyBuilder(object):
    def __init__(self, name):
        self.name = name

    def __call__(self, options_map):
        strategy_instance = _UnknownStrategy(self.name, options_map)
        return strategy_instance


class _UnknownStrategy(ReplicationStrategy):
    def __init__(self, name, options_map):
        self.name = name
        self.options_map = options_map.copy() if options_map is not None else dict()
        self.options_map['class'] = self.name

    def __eq__(self, other):
        return (isinstance(other, _UnknownStrategy) and
                self.name == other.name and
                self.options_map == other.options_map)

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.
        """
        if self.options_map:
            return dict((str(key), str(value)) for key, value in self.options_map.items())
        return "{'class': '%s'}" % (self.name, )

    def make_token_replica_map(self, token_to_host_owner, ring):
        return {}


class ReplicationFactor(object):
    """
    Represent the replication factor of a keyspace.
    """

    all_replicas = None
    """
    The number of total replicas.
    """

    full_replicas = None
    """
    The number of replicas that own a full copy of the data. This is the same
    as `all_replicas` when transient replication is not enabled.
    """

    transient_replicas = None
    """
    The number of transient replicas.

    Only set if the keyspace has transient replication enabled.
    """

    def __init__(self, all_replicas, transient_replicas=None):
        self.all_replicas = all_replicas
        self.transient_replicas = transient_replicas
        self.full_replicas = (all_replicas - transient_replicas) if transient_replicas else all_replicas

    @staticmethod
    def create(rf):
        """
        Given the replication factor string from the server, parse it and
        return the corresponding :class:`.ReplicationFactor` instance.
        """
        transient_replicas = None
        try:
            all_replicas = int(rf)
        except ValueError:
            try:
                rf = rf.split('/')
                all_replicas, transient_replicas = int(rf[0]), int(rf[1])
            except Exception:
                raise ValueError("Unable to determine replication factor from: {}".format(rf))

        return ReplicationFactor(all_replicas, transient_replicas)
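
    # Examples (illustrative only):
    #
    #     ReplicationFactor.create('3')    # all_replicas=3, full_replicas=3
    #     ReplicationFactor.create('3/1')  # all_replicas=3, full_replicas=2,
    #                                      # transient_replicas=1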

    def __str__(self):
        return ("%d/%d" % (self.all_replicas, self.transient_replicas) if self.transient_replicas
                else "%d" % self.all_replicas)

    def __eq__(self, other):
        if not isinstance(other, ReplicationFactor):
            return False
        return self.all_replicas == other.all_replicas and self.full_replicas == other.full_replicas


class SimpleStrategy(ReplicationStrategy):

    replication_factor_info = None
    """
    A :class:`cassandra.metadata.ReplicationFactor` instance.
    """

    @property
    def replication_factor(self):
        """
        The replication factor for this keyspace.

        For backward compatibility, this returns the
        :attr:`cassandra.metadata.ReplicationFactor.full_replicas` value of
        :attr:`cassandra.metadata.SimpleStrategy.replication_factor_info`.
        """
        return self.replication_factor_info.full_replicas

    def __init__(self, options_map):
        self.replication_factor_info = ReplicationFactor.create(options_map['replication_factor'])

    def make_token_replica_map(self, token_to_host_owner, ring):
        replica_map = {}
        for i in range(len(ring)):
            j, hosts = 0, list()
            while len(hosts) < self.replication_factor and j < len(ring):
                token = ring[(i + j) % len(ring)]
                host = token_to_host_owner[token]
                if host not in hosts:
                    hosts.append(host)
                j += 1

            replica_map[ring[i]] = hosts
        return replica_map

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.
        """
        return "{'class': 'SimpleStrategy', 'replication_factor': '%s'}" \
               % (str(self.replication_factor_info),)

    def __eq__(self, other):
        if not isinstance(other, SimpleStrategy):
            return False

        return str(self.replication_factor_info) == str(other.replication_factor_info)
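

# A minimal sketch (hypothetical hosts and tokens, illustrative only) of the
# ring walk above: for each token, SimpleStrategy takes the next RF distinct
# hosts clockwise around the ring, regardless of datacenter or rack.
#
#     strategy = SimpleStrategy({'replication_factor': '2'})
#     ring = sorted([Murmur3Token(0), Murmur3Token(100), Murmur3Token(200)])
#     owners = {ring[0]: 'host_a', ring[1]: 'host_b', ring[2]: 'host_a'}
#     strategy.make_token_replica_map(owners, ring)
#     # e.g. token 200 -> ['host_a', 'host_b'] (wraps past the end of the ring)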


class NetworkTopologyStrategy(ReplicationStrategy):

    dc_replication_factors_info = None
    """
    A map of datacenter names to the :class:`cassandra.metadata.ReplicationFactor` instance for that DC.
    """

    dc_replication_factors = None
    """
    A map of datacenter names to the replication factor for that DC.

    For backward compatibility, this maps to the :attr:`cassandra.metadata.ReplicationFactor.full_replicas`
    value of the :attr:`cassandra.metadata.NetworkTopologyStrategy.dc_replication_factors_info` dict.
    """

    def __init__(self, dc_replication_factors):
        self.dc_replication_factors_info = dict(
            (str(k), ReplicationFactor.create(v)) for k, v in dc_replication_factors.items())
        self.dc_replication_factors = dict(
            (dc, rf.full_replicas) for dc, rf in self.dc_replication_factors_info.items())

    def make_token_replica_map(self, token_to_host_owner, ring):
        dc_rf_map = dict(
            (dc, full_replicas) for dc, full_replicas in self.dc_replication_factors.items()
            if full_replicas > 0)

        # build a map of DCs to lists of indexes into `ring` for tokens that
        # belong to that DC
        dc_to_token_offset = defaultdict(list)
        dc_racks = defaultdict(set)
        hosts_per_dc = defaultdict(set)
        for i, token in enumerate(ring):
            host = token_to_host_owner[token]
            dc_to_token_offset[host.datacenter].append(i)
            if host.datacenter and host.rack:
                dc_racks[host.datacenter].add(host.rack)
                hosts_per_dc[host.datacenter].add(host)

        # A map of DCs to an index into the dc_to_token_offset value for that dc.
        # This is how we keep track of advancing around the ring for each DC.
        dc_to_current_index = defaultdict(int)

        replica_map = defaultdict(list)
        for i in range(len(ring)):
            replicas = replica_map[ring[i]]

            # go through each DC and find the replicas in that DC
            for dc in dc_to_token_offset.keys():
                if dc not in dc_rf_map:
                    continue

                # advance our per-DC index until we're up to at least the
                # current token in the ring
                token_offsets = dc_to_token_offset[dc]
                index = dc_to_current_index[dc]
                num_tokens = len(token_offsets)
                while index < num_tokens and token_offsets[index] < i:
                    index += 1
                dc_to_current_index[dc] = index

                replicas_remaining = dc_rf_map[dc]
                replicas_this_dc = 0
                skipped_hosts = []
                racks_placed = set()
                racks_this_dc = dc_racks[dc]
                hosts_this_dc = len(hosts_per_dc[dc])

                for token_offset_index in range(index, index + num_tokens):
                    if token_offset_index >= len(token_offsets):
                        token_offset_index = token_offset_index - len(token_offsets)

                    token_offset = token_offsets[token_offset_index]
                    host = token_to_host_owner[ring[token_offset]]

                    if replicas_remaining == 0 or replicas_this_dc == hosts_this_dc:
                        break

                    if host in replicas:
                        continue

                    if host.rack in racks_placed and len(racks_placed) < len(racks_this_dc):
                        skipped_hosts.append(host)
                        continue

                    replicas.append(host)
                    replicas_this_dc += 1
                    replicas_remaining -= 1
                    racks_placed.add(host.rack)

                    if len(racks_placed) == len(racks_this_dc):
                        for host in skipped_hosts:
                            if replicas_remaining == 0:
                                break

                            replicas.append(host)
                            replicas_remaining -= 1
                        del skipped_hosts[:]

        return replica_map
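
    # A minimal sketch (hypothetical topology, illustrative only) of the
    # rack-aware placement above: with {'dc1': '2'} and hosts h1 (rack r1),
    # h2 (rack r1), h3 (rack r2) owning consecutive tokens, the replicas for
    # the first token are [h1, h3]. h2 is skipped because rack r1 is already
    # represented while rack r2 is not; skipped hosts are only used once
    # every rack has been placed.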

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.
        """
        ret = "{'class': 'NetworkTopologyStrategy'"
        for dc, rf in sorted(self.dc_replication_factors_info.items()):
            ret += ", '%s': '%s'" % (dc, str(rf))
        return ret + "}"

    def __eq__(self, other):
        if not isinstance(other, NetworkTopologyStrategy):
            return False

        return self.dc_replication_factors_info == other.dc_replication_factors_info


class LocalStrategy(ReplicationStrategy):
    def __init__(self, options_map):
        pass

    def make_token_replica_map(self, token_to_host_owner, ring):
        return {}

    def export_for_schema(self):
        """
        Returns a string version of these replication options which are
        suitable for use in a CREATE KEYSPACE statement.
        """
        return "{'class': 'LocalStrategy'}"

    def __eq__(self, other):
        return isinstance(other, LocalStrategy)


class KeyspaceMetadata(object):
    """
    A representation of the schema for a single keyspace.
    """

    name = None
    """ The string name of the keyspace. """

    durable_writes = True
    """
    A boolean indicating whether durable writes are enabled for this keyspace
    or not.
    """

    replication_strategy = None
    """
    A :class:`.ReplicationStrategy` subclass object.
    """

    tables = None
    """
    A map from table names to instances of :class:`~.TableMetadata`.
    """

    indexes = None
    """
    A dict mapping index names to :class:`.IndexMetadata` instances.
    """

    user_types = None
    """
    A map from user-defined type names to instances of :class:`~cassandra.metadata.UserType`.

    .. versionadded:: 2.1.0
    """

    functions = None
    """
    A map from user-defined function signatures to instances of :class:`~cassandra.metadata.Function`.

    .. versionadded:: 2.6.0
    """

    aggregates = None
    """
    A map from user-defined aggregate signatures to instances of :class:`~cassandra.metadata.Aggregate`.

    .. versionadded:: 2.6.0
    """

    views = None
    """
    A dict mapping view names to :class:`.MaterializedViewMetadata` instances.
    """

    virtual = False
    """
    A boolean indicating if this is a virtual keyspace or not. Always ``False``
    for clusters running Cassandra pre-4.0 and DSE pre-6.7 versions.

    .. versionadded:: 3.15
    """

    graph_engine = None
    """
    A string indicating whether a graph engine is enabled for this keyspace (Core/Classic).
    """

    _exc_info = None
    """ set if metadata parsing failed """

    def __init__(self, name, durable_writes, strategy_class, strategy_options, graph_engine=None):
        self.name = name
        self.durable_writes = durable_writes
        self.replication_strategy = ReplicationStrategy.create(strategy_class, strategy_options)
        self.tables = {}
        self.indexes = {}
        self.user_types = {}
        self.functions = {}
        self.aggregates = {}
        self.views = {}
        self.graph_engine = graph_engine

    def is_graph_enabled(self):
        return self.graph_engine is not None

    def export_as_string(self):
        """
        Returns a CQL query string that can be used to recreate the entire keyspace,
        including user-defined types and tables.
        """
        # Make sure tables with vertex are exported before tables with edges
        tables_with_vertex = [t for t in self.tables.values() if hasattr(t, 'vertex') and t.vertex]
        other_tables = [t for t in self.tables.values() if t not in tables_with_vertex]

        cql = "\n\n".join(
            [self.as_cql_query() + ';'] +
            self.user_type_strings() +
            [f.export_as_string() for f in self.functions.values()] +
            [a.export_as_string() for a in self.aggregates.values()] +
            [t.export_as_string() for t in tables_with_vertex + other_tables])

        if self._exc_info:
            import traceback
            ret = "/*\nWarning: Keyspace %s is incomplete because of an error processing metadata.\n" % \
                  (self.name)
            for line in traceback.format_exception(*self._exc_info):
                ret += line
            ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % cql
            return ret
        if self.virtual:
            return ("/*\nWarning: Keyspace {ks} is a virtual keyspace and cannot be recreated with CQL.\n"
                    "Structure, for reference:*/\n"
                    "{cql}\n"
                    "").format(ks=self.name, cql=cql)
        return cql

    def as_cql_query(self):
        """
        Returns a CQL query string that can be used to recreate just this keyspace,
        not including user-defined types and tables.
        """
        if self.virtual:
            return "// VIRTUAL KEYSPACE {}".format(protect_name(self.name))
        ret = "CREATE KEYSPACE %s WITH replication = %s " % (
            protect_name(self.name),
            self.replication_strategy.export_for_schema())
        ret = ret + (' AND durable_writes = %s' % ("true" if self.durable_writes else "false"))
        if self.graph_engine is not None:
            ret = ret + (" AND graph_engine = '%s'" % self.graph_engine)
        return ret

    def user_type_strings(self):
        user_type_strings = []
        user_types = self.user_types.copy()
        keys = sorted(user_types.keys())
        for k in keys:
            if k in user_types:
                self.resolve_user_types(k, user_types, user_type_strings)
        return user_type_strings

    def resolve_user_types(self, key, user_types, user_type_strings):
        user_type = user_types.pop(key)
        for type_name in user_type.field_types:
            for sub_type in types.cql_types_from_string(type_name):
                if sub_type in user_types:
                    self.resolve_user_types(sub_type, user_types, user_type_strings)
        user_type_strings.append(user_type.export_as_string())
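
    # A minimal sketch (hypothetical types, illustrative only) of the
    # dependency-first ordering above: if type `address` has a field of type
    # `frozen<coords>`, resolve_user_types pops and emits `coords` before
    # `address`, so the exported DDL can be replayed in order.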

    def _add_table_metadata(self, table_metadata):
        old_indexes = {}
        old_meta = self.tables.get(table_metadata.name, None)
        if old_meta:
            # views are not queried with table, so they must be transferred to new
            table_metadata.views = old_meta.views
            # indexes will be updated with what is on the new metadata
            old_indexes = old_meta.indexes

        # note the intentional order of add before remove
        # this makes sure the maps are never absent something that existed before this update
        for index_name, index_metadata in table_metadata.indexes.items():
            self.indexes[index_name] = index_metadata

        for index_name in (n for n in old_indexes if n not in table_metadata.indexes):
            self.indexes.pop(index_name, None)

        self.tables[table_metadata.name] = table_metadata

    def _drop_table_metadata(self, table_name):
        table_meta = self.tables.pop(table_name, None)
        if table_meta:
            for index_name in table_meta.indexes:
                self.indexes.pop(index_name, None)
            for view_name in table_meta.views:
                self.views.pop(view_name, None)
            return
        # we can't tell table drops from views, so drop both
        # (name is unique among them, within a keyspace)
        view_meta = self.views.pop(table_name, None)
        if view_meta:
            try:
                self.tables[view_meta.base_table_name].views.pop(table_name, None)
            except KeyError:
                pass

    def _add_view_metadata(self, view_metadata):
        try:
            self.tables[view_metadata.base_table_name].views[view_metadata.name] = view_metadata
            self.views[view_metadata.name] = view_metadata
        except KeyError:
            pass


class UserType(object):
    """
    A user defined type, as created by ``CREATE TYPE`` statements.

    User-defined types were introduced in Cassandra 2.1.

    .. versionadded:: 2.1.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this type is defined.
    """

    name = None
    """
    The name of this type.
    """

    field_names = None
    """
    An ordered list of the names for each field in this user-defined type.
    """

    field_types = None
    """
    An ordered list of the types for each field in this user-defined type.
    """

    def __init__(self, keyspace, name, field_names, field_types):
        self.keyspace = keyspace
        self.name = name
        # non-frozen collections can return None
        self.field_names = field_names or []
        self.field_types = field_types or []

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this type.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.
        """
        ret = "CREATE TYPE %s.%s (%s" % (
            protect_name(self.keyspace),
            protect_name(self.name),
            "\n" if formatted else "")

        if formatted:
            field_join = ",\n"
            padding = "    "
        else:
            field_join = ", "
            padding = ""

        fields = []
        for field_name, field_type in zip(self.field_names, self.field_types):
            fields.append("%s %s" % (protect_name(field_name), field_type))

        ret += field_join.join("%s%s" % (padding, field) for field in fields)
        ret += "\n)" if formatted else ")"
        return ret
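
    # Example (hypothetical type, illustrative only):
    #
    #     UserType('ks', 'address', ['street', 'zip'], ['text', 'int']).as_cql_query()
    #     # -> "CREATE TYPE ks.address (street text, zip int)"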

    def export_as_string(self):
        return self.as_cql_query(formatted=True) + ';'


class Aggregate(object):
    """
    A user defined aggregate function, as created by ``CREATE AGGREGATE`` statements.

    Aggregate functions were introduced in Cassandra 2.2.

    .. versionadded:: 2.6.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this aggregate is defined.
    """

    name = None
    """
    The name of this aggregate.
    """

    argument_types = None
    """
    An ordered list of the types for each argument to the aggregate.
    """

    final_func = None
    """
    Name of the final function.
    """

    initial_condition = None
    """
    Initial condition of the aggregate.
    """

    return_type = None
    """
    Return type of the aggregate.
    """

    state_func = None
    """
    Name of the state function.
    """

    state_type = None
    """
    Type of the aggregate state.
    """

    deterministic = None
    """
    Flag indicating if this function is guaranteed to produce the same result
    for a particular input and state. This is available only with DSE >=6.0.
    """

    def __init__(self, keyspace, name, argument_types, state_func,
                 state_type, final_func, initial_condition, return_type,
                 deterministic):
        self.keyspace = keyspace
        self.name = name
        self.argument_types = argument_types
        self.state_func = state_func
        self.state_type = state_type
        self.final_func = final_func
        self.initial_condition = initial_condition
        self.return_type = return_type
        self.deterministic = deterministic

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this aggregate.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.
        """
        sep = '\n    ' if formatted else ' '
        keyspace = protect_name(self.keyspace)
        name = protect_name(self.name)
        type_list = ', '.join([types.strip_frozen(arg_type) for arg_type in self.argument_types])
        state_func = protect_name(self.state_func)
        state_type = types.strip_frozen(self.state_type)

        ret = "CREATE AGGREGATE %(keyspace)s.%(name)s(%(type_list)s)%(sep)s" \
              "SFUNC %(state_func)s%(sep)s" \
              "STYPE %(state_type)s" % locals()

        ret += ''.join((sep, 'FINALFUNC ', protect_name(self.final_func))) if self.final_func else ''
        ret += ''.join((sep, 'INITCOND ', self.initial_condition)) if self.initial_condition is not None else ''
        ret += '{}DETERMINISTIC'.format(sep) if self.deterministic else ''

        return ret

    def export_as_string(self):
        return self.as_cql_query(formatted=True) + ';'
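
    # Example output (hypothetical aggregate, illustrative only) from
    # export_as_string():
    #
    #     CREATE AGGREGATE ks.average(int)
    #         SFUNC avg_state
    #         STYPE tuple<int, bigint>
    #         FINALFUNC avg_final;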

    @property
    def signature(self):
        return SignatureDescriptor.format_signature(self.name, self.argument_types)


class Function(object):
    """
    A user defined function, as created by ``CREATE FUNCTION`` statements.

    User-defined functions were introduced in Cassandra 2.2.

    .. versionadded:: 2.6.0
    """

    keyspace = None
    """
    The string name of the keyspace in which this function is defined.
    """

    name = None
    """
    The name of this function.
    """

    argument_types = None
    """
    An ordered list of the types for each argument to the function.
    """

    argument_names = None
    """
    An ordered list of the names of each argument to the function.
    """

    return_type = None
    """
    Return type of the function.
    """

    language = None
    """
    Language of the function body.
    """

    body = None
    """
    Function body string.
    """

    called_on_null_input = None
    """
    Flag indicating whether this function should be called for rows with null values
    (convenience function to avoid handling nulls explicitly if the result will just be null).
    """

    deterministic = None
    """
    Flag indicating if this function is guaranteed to produce the same result
    for a particular input. This is available only for DSE >=6.0.
    """

    monotonic = None
    """
    Flag indicating if this function is guaranteed to increase or decrease
    monotonically on any of its arguments. This is available only for DSE >=6.0.
    """

    monotonic_on = None
    """
    A list containing the argument or arguments over which this function is
    monotonic. This is available only for DSE >=6.0.
    """

    def __init__(self, keyspace, name, argument_types, argument_names,
                 return_type, language, body, called_on_null_input,
                 deterministic, monotonic, monotonic_on):
        self.keyspace = keyspace
        self.name = name
        self.argument_types = argument_types
        # argument_types (frozen<list<>>) will always be a list
        # argument_name is not frozen in C* < 3.0 and may return None
        self.argument_names = argument_names or []
        self.return_type = return_type
        self.language = language
        self.body = body
        self.called_on_null_input = called_on_null_input
        self.deterministic = deterministic
        self.monotonic = monotonic
        self.monotonic_on = monotonic_on

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this function.
        If `formatted` is set to :const:`True`, extra whitespace will
        be added to make the query more readable.
        """
        sep = '\n    ' if formatted else ' '
        keyspace = protect_name(self.keyspace)
        name = protect_name(self.name)
        arg_list = ', '.join(["%s %s" % (protect_name(n), types.strip_frozen(t))
                              for n, t in zip(self.argument_names, self.argument_types)])
        typ = self.return_type
        lang = self.language
        body = self.body
        on_null = "CALLED" if self.called_on_null_input else "RETURNS NULL"
        deterministic_token = ('DETERMINISTIC{}'.format(sep)
                               if self.deterministic else
                               '')
        monotonic_tokens = ''  # default for nonmonotonic function
        if self.monotonic:
            # monotonic on all arguments; ignore self.monotonic_on
            monotonic_tokens = 'MONOTONIC{}'.format(sep)
        elif self.monotonic_on:
            # if monotonic == False and monotonic_on is nonempty, we know that
            # monotonicity was specified with MONOTONIC ON <arg>, so there's
            # exactly 1 value there
            monotonic_tokens = 'MONOTONIC ON {}{}'.format(self.monotonic_on[0],
                                                          sep)

        return "CREATE FUNCTION %(keyspace)s.%(name)s(%(arg_list)s)%(sep)s" \
               "%(on_null)s ON NULL INPUT%(sep)s" \
               "RETURNS %(typ)s%(sep)s" \
               "%(deterministic_token)s" \
               "%(monotonic_tokens)s" \
               "LANGUAGE %(lang)s%(sep)s" \
               "AS $$%(body)s$$" % locals()

    def export_as_string(self):
        return self.as_cql_query(formatted=True) + ';'
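
    # Example output (hypothetical function, illustrative only) from
    # export_as_string():
    #
    #     CREATE FUNCTION ks.plus_one(x int)
    #         RETURNS NULL ON NULL INPUT
    #         RETURNS int
    #         LANGUAGE java
    #         AS $$return x + 1;$$;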

    @property
    def signature(self):
        return SignatureDescriptor.format_signature(self.name, self.argument_types)


class TableMetadata(object):
    """
    A representation of the schema for a single table.
    """

    keyspace_name = None
    """ String name of this Table's keyspace """

    name = None
    """ The string name of the table. """

    partition_key = None
    """
    A list of :class:`.ColumnMetadata` instances representing the columns in
    the partition key for this table. This will always hold at least one
    column.
    """

    clustering_key = None
    """
    A list of :class:`.ColumnMetadata` instances representing the columns
    in the clustering key for this table. These are all of the
    :attr:`.primary_key` columns that are not in the :attr:`.partition_key`.

    Note that a table may have no clustering keys, in which case this will
    be an empty list.
    """

    @property
    def primary_key(self):
        """
        A list of :class:`.ColumnMetadata` representing the components of
        the primary key for this table.
        """
        return self.partition_key + self.clustering_key

    columns = None
    """
    A dict mapping column names to :class:`.ColumnMetadata` instances.
    """

    indexes = None
    """
    A dict mapping index names to :class:`.IndexMetadata` instances.
    """

    is_compact_storage = False

    options = None
    """
    A dict mapping table option names to their specific settings for this
    table.
    """

    compaction_options = {
        "min_compaction_threshold": "min_threshold",
        "max_compaction_threshold": "max_threshold",
        "compaction_strategy_class": "class"}

    triggers = None
    """
    A dict mapping trigger names to :class:`.TriggerMetadata` instances.
    """

    views = None
    """
    A dict mapping view names to :class:`.MaterializedViewMetadata` instances.
    """

    _exc_info = None
    """ set if metadata parsing failed """

    virtual = False
    """
    A boolean indicating if this is a virtual table or not. Always ``False``
    for clusters running Cassandra pre-4.0 and DSE pre-6.7 versions.

    .. versionadded:: 3.15
    """

    @property
    def is_cql_compatible(self):
        """
        A boolean indicating if this table can be represented as CQL in export
        """
        if self.virtual:
            return False
        comparator = getattr(self, 'comparator', None)
        if comparator:
            # no compact storage with more than one column beyond PK if there
            # are clustering columns
            incompatible = (self.is_compact_storage and
                            len(self.columns) > len(self.primary_key) + 1 and
                            len(self.clustering_key) >= 1)

            return not incompatible
        return True

    extensions = None
    """
    Metadata describing configuration for table extensions
    """

    def __init__(self, keyspace_name, name, partition_key=None, clustering_key=None, columns=None, triggers=None, options=None, virtual=False):
        self.keyspace_name = keyspace_name
        self.name = name
        self.partition_key = [] if partition_key is None else partition_key
        self.clustering_key = [] if clustering_key is None else clustering_key
        self.columns = OrderedDict() if columns is None else columns
        self.indexes = {}
        self.options = {} if options is None else options
        self.comparator = None
        self.triggers = OrderedDict() if triggers is None else triggers
        self.views = {}
        self.virtual = virtual

    def export_as_string(self):
        """
        Returns a string of CQL queries that can be used to recreate this table
        along with all indexes on it. The returned string is formatted to
        be human readable.
        """
        if self._exc_info:
            import traceback
            ret = "/*\nWarning: Table %s.%s is incomplete because of an error processing metadata.\n" % \
                  (self.keyspace_name, self.name)
            for line in traceback.format_exception(*self._exc_info):
                ret += line
            ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql()
        elif not self.is_cql_compatible:
            # If we can't produce this table with CQL, comment inline
            ret = "/*\nWarning: Table %s.%s omitted because it has constructs not compatible with CQL (was created via legacy API).\n" % \
                  (self.keyspace_name, self.name)
            ret += "\nApproximate structure, for reference:\n(this should not be used to reproduce this schema)\n\n%s\n*/" % self._all_as_cql()
        elif self.virtual:
            ret = ('/*\nWarning: Table {ks}.{tab} is a virtual table and cannot be recreated with CQL.\n'
                   'Structure, for reference:\n'
                   '{cql}\n*/').format(ks=self.keyspace_name, tab=self.name, cql=self._all_as_cql())
        else:
            ret = self._all_as_cql()

        return ret

    def _all_as_cql(self):
        ret = self.as_cql_query(formatted=True)
        ret += ";"

        for index in self.indexes.values():
            ret += "\n%s;" % index.as_cql_query()

        for trigger_meta in self.triggers.values():
            ret += "\n%s;" % (trigger_meta.as_cql_query(),)

        for view_meta in self.views.values():
            ret += "\n\n%s;" % (view_meta.as_cql_query(formatted=True),)

        if self.extensions:
            registry = _RegisteredExtensionType._extension_registry
            for k in registry.keys() & self.extensions:  # no viewkeys on OrderedMapSerializeKey
                ext = registry[k]
                cql = ext.after_table_cql(self, k, self.extensions[k])
                if cql:
                    ret += "\n\n%s" % (cql,)
        return ret

    def as_cql_query(self, formatted=False):
        """
        Returns a CQL query that can be used to recreate this table (index
        creations are not included). If `formatted` is set to :const:`True`,
        extra whitespace will be added to make the query human readable.
        """
        ret = "%s TABLE %s.%s (%s" % (
            ('VIRTUAL' if self.virtual else 'CREATE'),
            protect_name(self.keyspace_name),
            protect_name(self.name),
            "\n" if formatted else "")

        if formatted:
            column_join = ",\n"
            padding = "    "
        else:
            column_join = ", "
            padding = ""

        columns = []
        for col in self.columns.values():
            columns.append("%s %s%s" % (protect_name(col.name), col.cql_type, ' static' if col.is_static else ''))

        if len(self.partition_key) == 1 and not self.clustering_key:
            columns[0] += " PRIMARY KEY"

        ret += column_join.join("%s%s" % (padding, col) for col in columns)

        # primary key
        if len(self.partition_key) > 1 or self.clustering_key:
            ret += "%s%sPRIMARY KEY (" % (column_join, padding)

            if len(self.partition_key) > 1:
                ret += "(%s)" % ", ".join(protect_name(col.name) for col in self.partition_key)
            else:
                ret += protect_name(self.partition_key[0].name)

            if self.clustering_key:
                ret += ", %s" % ", ".join(protect_name(col.name) for col in self.clustering_key)

            ret += ")"

        # properties
        ret += "%s) WITH " % ("\n" if formatted else "")
        ret += self._property_string(formatted, self.clustering_key, self.options, self.is_compact_storage)

        return ret

    @classmethod
    def _property_string(cls, formatted, clustering_key, options_map, is_compact_storage=False):
        properties = []
        if is_compact_storage:
            properties.append("COMPACT STORAGE")

        if clustering_key:
            cluster_str = "CLUSTERING ORDER BY "

            inner = []
            for col in clustering_key:
                ordering = "DESC" if col.is_reversed else "ASC"
                inner.append("%s %s" % (protect_name(col.name), ordering))

            cluster_str += "(%s)" % ", ".join(inner)

            properties.append(cluster_str)

        properties.extend(cls._make_option_strings(options_map))

        join_str = "\n    AND " if formatted else " AND "
        return join_str.join(properties)

    @classmethod
    def _make_option_strings(cls, options_map):
        ret = []
        options_copy = dict(options_map.items())

        actual_options = json.loads(options_copy.pop('compaction_strategy_options', '{}'))
        value = options_copy.pop("compaction_strategy_class", None)
        actual_options.setdefault("class", value)

        compaction_option_strings = ["'%s': '%s'" % (k, v) for k, v in actual_options.items()]
        ret.append('compaction = {%s}' % ', '.join(compaction_option_strings))

        for system_table_name in cls.compaction_options.keys():
            options_copy.pop(system_table_name, None)  # delete if present
        options_copy.pop('compaction_strategy_option', None)

        if not options_copy.get('compression'):
            params = json.loads(options_copy.pop('compression_parameters', '{}'))
            param_strings = ["'%s': '%s'" % (k, v) for k, v in params.items()]
            ret.append('compression = {%s}' % ', '.join(param_strings))

        for name, value in options_copy.items():
            if value is not None:
                if name == "comment":
                    value = value or ""
                ret.append("%s = %s" % (name, protect_value(value)))

        return list(sorted(ret))


class TableMetadataV3(TableMetadata):
    """
    For C* 3.0+. `option_maps` take a superset of map names, so if nothing
    changes structurally, new option maps can just be appended to the list.
    """
    compaction_options = {}

    option_maps = [
        'compaction', 'compression', 'caching',
        'nodesync'  # added DSE 6.0
    ]

    @property
    def is_cql_compatible(self):
        return True

    @classmethod
    def _make_option_strings(cls, options_map):
        ret = []
        options_copy = dict(options_map.items())

        for option in cls.option_maps:
            value = options_copy.get(option)
            if isinstance(value, Mapping):
                del options_copy[option]
                params = ("'%s': '%s'" % (k, v) for k, v in value.items())
                ret.append("%s = {%s}" % (option, ', '.join(params)))

        for name, value in options_copy.items():
            if value is not None:
                if name == "comment":
                    value = value or ""
                ret.append("%s = %s" % (name, protect_value(value)))

        return list(sorted(ret))


class TableMetadataDSE68(TableMetadataV3):

    vertex = None
    """A :class:`.VertexMetadata` instance, if graph enabled"""

    edge = None
    """An :class:`.EdgeMetadata` instance, if graph enabled"""

    def as_cql_query(self, formatted=False):
        ret = super(TableMetadataDSE68, self).as_cql_query(formatted)

        if self.vertex:
            ret += " AND VERTEX LABEL %s" % protect_name(self.vertex.label_name)

        if self.edge:
            ret += " AND EDGE LABEL %s" % protect_name(self.edge.label_name)

            ret += self._export_edge_as_cql(
                self.edge.from_label,
                self.edge.from_partition_key_columns,
                self.edge.from_clustering_columns, "FROM")

            ret += self._export_edge_as_cql(
                self.edge.to_label,
                self.edge.to_partition_key_columns,
                self.edge.to_clustering_columns, "TO")

        return ret

    def _export_edge_as_cql(self, label_name, partition_keys,
                            clustering_columns, keyword):
        ret = " %s %s(" % (keyword, protect_name(label_name))

        if len(partition_keys) == 1:
            ret += protect_name(partition_keys[0])
        else:
            ret += "(%s)" % ", ".join([protect_name(k) for k in partition_keys])

        if clustering_columns:
            ret += ", %s" % ", ".join([protect_name(k) for k in clustering_columns])
        ret += ")"

        return ret


class TableExtensionInterface(object):
    """
    Defines CQL/DDL for Cassandra table extensions.
    """
    # limited API for now. Could be expanded as new extension types materialize -- "extend_option_strings", for example

    @classmethod
    def after_table_cql(cls, ext_key, ext_blob):
        """
        Called to produce CQL/DDL to follow the table definition.
        Should contain requisite terminating semicolon(s).
        """
        pass


class _RegisteredExtensionType(type):

    _extension_registry = {}

    def __new__(mcs, name, bases, dct):
        cls = super(_RegisteredExtensionType, mcs).__new__(mcs, name, bases, dct)
        if name != 'RegisteredTableExtension':
            mcs._extension_registry[cls.name] = cls
        return cls


class RegisteredTableExtension(TableExtensionInterface, metaclass=_RegisteredExtensionType):
    """
    Extending this class registers it by name (associated by key in the `system_schema.tables.extensions` map).
    """

    name = None
    """
    Name of the extension (key in the map)
    """


def protect_name(name):
    return maybe_escape_name(name)


def protect_names(names):
    return [protect_name(n) for n in names]


def protect_value(value):
    if value is None:
        return 'NULL'
    if isinstance(value, (int, float, bool)):
        return str(value).lower()
    return "'%s'" % value.replace("'", "''")


valid_cql3_word_re = re.compile(r'^[a-z][0-9a-z_]*$')


def is_valid_name(name):
    if name is None:
        return False
    if name.lower() in cql_keywords_reserved:
        return False
    return valid_cql3_word_re.match(name) is not None


def maybe_escape_name(name):
    if is_valid_name(name):
        return name
    return escape_name(name)


def escape_name(name):
    return '"%s"' % (name.replace('"', '""'),)
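

# Examples (illustrative only):
#
#     protect_name('users')      # -> 'users'        (valid bare identifier)
#     protect_name('MixedCase')  # -> '"MixedCase"'  (upper case forces quoting)
#     protect_value("it's")      # -> "'it''s'"      (single quotes doubled)
#     protect_value(None)        # -> 'NULL'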


class ColumnMetadata(object):
    """
    A representation of a single column in a table.
    """

    table = None
    """ The :class:`.TableMetadata` this column belongs to. """

    name = None
    """ The string name of this column. """

    cql_type = None
    """
    The CQL type for the column.
    """

    is_static = False
    """
    If this column is static (available in Cassandra 2.1+), this will
    be :const:`True`, otherwise :const:`False`.
    """

    is_reversed = False
    """
    If this column is reversed (DESC) as in clustering order.
    """

    _cass_type = None

    def __init__(self, table_metadata, column_name, cql_type, is_static=False, is_reversed=False):
        self.table = table_metadata
        self.name = column_name
        self.cql_type = cql_type
        self.is_static = is_static
        self.is_reversed = is_reversed

    def __str__(self):
        return "%s %s" % (self.name, self.cql_type)


class IndexMetadata(object):
    """
    A representation of a secondary index on a column.
    """
    keyspace_name = None
    """ A string name of the keyspace. """

    table_name = None
    """ A string name of the table this index is on. """

    name = None
    """ A string name for the index. """

    kind = None
    """ A string representing the kind of index (COMPOSITE, CUSTOM, ...). """

    index_options = {}
    """ A dict of index options. """

    def __init__(self, keyspace_name, table_name, index_name, kind, index_options):
        self.keyspace_name = keyspace_name
        self.table_name = table_name
        self.name = index_name
        self.kind = kind
        self.index_options = index_options

    def as_cql_query(self):
        """
        Returns a CQL query that can be used to recreate this index.
        """
        options = dict(self.index_options)
        index_target = options.pop("target")
        if self.kind != "CUSTOM":
            return "CREATE INDEX %s ON %s.%s (%s)" % (
                protect_name(self.name),
                protect_name(self.keyspace_name),
                protect_name(self.table_name),
                index_target)
        else:
            class_name = options.pop("class_name")
            ret = "CREATE CUSTOM INDEX %s ON %s.%s (%s) USING '%s'" % (
                protect_name(self.name),
                protect_name(self.keyspace_name),
                protect_name(self.table_name),
                index_target,
                class_name)
            if options:
                # PYTHON-1008: `ret` will always be a unicode
                opts_cql_encoded = _encoder.cql_encode_all_types(options, as_text_type=True)
                ret += " WITH OPTIONS = %s" % opts_cql_encoded
            return ret

    def export_as_string(self):
        """
        Returns a CQL query string that can be used to recreate this index.
        """
        return self.as_cql_query() + ';'
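

# Example (hypothetical index, illustrative only):
#
#     IndexMetadata('ks', 'users', 'users_email_idx', 'COMPOSITE',
#                   {'target': 'email'}).export_as_string()
#     # -> 'CREATE INDEX users_email_idx ON ks.users (email);'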
| class TokenMap(object): | |
| """ | |
| Information about the layout of the ring. | |
| """ | |
| token_class = None | |
| """ | |
| A subclass of :class:`.Token`, depending on what partitioner the cluster uses. | |
| """ | |
| token_to_host_owner = None | |
| """ | |
| A map of :class:`.Token` objects to the :class:`.Host` that owns that token. | |
| """ | |
| tokens_to_hosts_by_ks = None | |
| """ | |
| A map of keyspace names to a nested map of :class:`.Token` objects to | |
| sets of :class:`.Host` objects. | |
| """ | |
| ring = None | |
| """ | |
| An ordered list of :class:`.Token` instances in the ring. | |
| """ | |
| _metadata = None | |
| def __init__(self, token_class, token_to_host_owner, all_tokens, metadata): | |
| self.token_class = token_class | |
| self.ring = all_tokens | |
| self.token_to_host_owner = token_to_host_owner | |
| self.tokens_to_hosts_by_ks = {} | |
| self._metadata = metadata | |
| self._rebuild_lock = RLock() | |
| def rebuild_keyspace(self, keyspace, build_if_absent=False): | |
| with self._rebuild_lock: | |
| try: | |
| current = self.tokens_to_hosts_by_ks.get(keyspace, None) | |
| if (build_if_absent and current is None) or (not build_if_absent and current is not None): | |
| ks_meta = self._metadata.keyspaces.get(keyspace) | |
| if ks_meta: | |
| replica_map = self.replica_map_for_keyspace(self._metadata.keyspaces[keyspace]) | |
| self.tokens_to_hosts_by_ks[keyspace] = replica_map | |
| except Exception: | |
| # should not happen normally, but we don't want to blow up queries because of unexpected meta state | |
| # bypass until new map is generated | |
| self.tokens_to_hosts_by_ks[keyspace] = {} | |
| log.exception("Failed creating a token map for keyspace '%s' with %s. PLEASE REPORT THIS: https://datastax-oss.atlassian.net/projects/PYTHON", keyspace, self.token_to_host_owner) | |
| def replica_map_for_keyspace(self, ks_metadata): | |
| strategy = ks_metadata.replication_strategy | |
| if strategy: | |
| return strategy.make_token_replica_map(self.token_to_host_owner, self.ring) | |
| else: | |
| return None | |
| def remove_keyspace(self, keyspace): | |
| self.tokens_to_hosts_by_ks.pop(keyspace, None) | |
| def get_replicas(self, keyspace, token): | |
| """ | |
| Get a set of :class:`.Host` instances representing all of the | |
| replica nodes for a given :class:`.Token`. | |
| """ | |
| tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None) | |
| if tokens_to_hosts is None: | |
| self.rebuild_keyspace(keyspace, build_if_absent=True) | |
| tokens_to_hosts = self.tokens_to_hosts_by_ks.get(keyspace, None) | |
| if tokens_to_hosts: | |
| # The values in self.ring correspond to the end of the | |
| # token range up to and including the value listed. | |
| point = bisect_left(self.ring, token) | |
| if point == len(self.ring): | |
| return tokens_to_hosts[self.ring[0]] | |
| else: | |
| return tokens_to_hosts[self.ring[point]] | |
| return [] | |
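| # A toy sketch of the ring lookup above: bisect_left finds the first ring | |
| # token >= the routed token, and falling off the end wraps around to ring[0]. | |
| # The integer tokens here are illustrative, not real partitioner output. | |
| def _example_ring_lookup(): | |
|     ring = [-100, 0, 100] | |
|     for token, expected_owner_index in ((-150, 0), (-100, 0), (50, 2), (101, 0)): | |
|         point = bisect_left(ring, token) | |
|         owner = 0 if point == len(ring) else point | |
|         assert owner == expected_owner_index | |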
| class Token(object): | |
| """ | |
| Abstract class representing a token. | |
| """ | |
| def __init__(self, token): | |
| self.value = token | |
| @classmethod | |
| def hash_fn(cls, key): | |
| return key | |
| @classmethod | |
| def from_key(cls, key): | |
| return cls(cls.hash_fn(key)) | |
| @classmethod | |
| def from_string(cls, token_string): | |
| raise NotImplementedError() | |
| def __eq__(self, other): | |
| return self.value == other.value | |
| def __lt__(self, other): | |
| return self.value < other.value | |
| def __hash__(self): | |
| return hash(self.value) | |
| def __repr__(self): | |
| return "<%s: %s>" % (self.__class__.__name__, self.value) | |
| __str__ = __repr__ | |
| MIN_LONG = -(2 ** 63) | |
| MAX_LONG = (2 ** 63) - 1 | |
| class NoMurmur3(Exception): | |
| pass | |
| class HashToken(Token): | |
| @classmethod | |
| def from_string(cls, token_string): | |
| """ `token_string` should be the string representation from the server. """ | |
| # The hash partitioners just store the decimal value | |
| return cls(int(token_string)) | |
| class Murmur3Token(HashToken): | |
| """ | |
| A token for ``Murmur3Partitioner``. | |
| """ | |
| @classmethod | |
| def hash_fn(cls, key): | |
| if murmur3 is not None: | |
| h = int(murmur3(key)) | |
| return h if h != MIN_LONG else MAX_LONG | |
| else: | |
| raise NoMurmur3() | |
| def __init__(self, token): | |
| """ `token` is an int or string representing the token. """ | |
| self.value = int(token) | |
| class MD5Token(HashToken): | |
| """ | |
| A token for ``RandomPartitioner``. | |
| """ | |
| @classmethod | |
| def hash_fn(cls, key): | |
| if isinstance(key, str): | |
| key = key.encode('UTF-8') | |
| return abs(varint_unpack(md5(key).digest())) | |
| class BytesToken(Token): | |
| """ | |
| A token for ``ByteOrderedPartitioner``. | |
| """ | |
| @classmethod | |
| def from_string(cls, token_string): | |
| """ `token_string` should be the string representation from the server. """ | |
| # unhexlify works fine with unicode input in everything but pypy3, where it raises "TypeError: 'str' does not support the buffer interface" | |
| if isinstance(token_string, str): | |
| token_string = token_string.encode('ascii') | |
| # The BOP stores a hex string | |
| return cls(unhexlify(token_string)) | |
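| # A minimal sketch of from_string for each partitioner family; the input | |
| # strings are hypothetical stand-ins for what the server reports. | |
| def _example_token_parsing(): | |
|     m = Murmur3Token.from_string("-9223372036854775808")  # decimal text | |
|     r = MD5Token.from_string("42")                        # decimal text | |
|     b = BytesToken.from_string("cafe")                    # hex text | |
|     return m.value, r.value, b.value  # -> (-9223372036854775808, 42, b'\xca\xfe') | |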
| class TriggerMetadata(object): | |
| """ | |
| A representation of a trigger for a table. | |
| """ | |
| table = None | |
| """ The :class:`.TableMetadata` this trigger belongs to. """ | |
| name = None | |
| """ The string name of this trigger. """ | |
| options = None | |
| """ | |
| A dict mapping trigger option names to their specific settings for this | |
| table. | |
| """ | |
| def __init__(self, table_metadata, trigger_name, options=None): | |
| self.table = table_metadata | |
| self.name = trigger_name | |
| self.options = options | |
| def as_cql_query(self): | |
| ret = "CREATE TRIGGER %s ON %s.%s USING %s" % ( | |
| protect_name(self.name), | |
| protect_name(self.table.keyspace_name), | |
| protect_name(self.table.name), | |
| protect_value(self.options['class']) | |
| ) | |
| return ret | |
| def export_as_string(self): | |
| return self.as_cql_query() + ';' | |
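| # A minimal sketch (hypothetical table and trigger class) of the DDL string | |
| # emitted above; 'class' is the only option the export consumes. | |
| def _example_trigger_cql(): | |
|     table = TableMetadata("ks", "events") | |
|     trigger = TriggerMetadata(table, "audit", {"class": "com.example.AuditTrigger"}) | |
|     return trigger.export_as_string() | |
|     # -> "CREATE TRIGGER audit ON ks.events USING 'com.example.AuditTrigger';" | |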
| class _SchemaParser(object): | |
| def __init__(self, connection, timeout): | |
| self.connection = connection | |
| self.timeout = timeout | |
| def _handle_results(self, success, result, expected_failures=tuple()): | |
| """ | |
| Given a bool and a ResultSet (the form returned per result from | |
| Connection.wait_for_responses), return a dictionary containing the | |
| results. Used to process results from asynchronous queries to system | |
| tables. | |
| ``expected_failures`` will usually be used to allow callers to ignore | |
| ``InvalidRequest`` errors caused by a missing system keyspace. For | |
| example, some DSE versions report a 4.X server version, but do not have | |
| virtual tables. Thus, running against 4.X servers, SchemaParserV4 uses | |
| expected_failures to make a best-effort attempt to read those | |
| keyspaces, but treat them as empty if they're not found. | |
| :param success: A boolean representing whether or not the query | |
| succeeded | |
| :param result: The resultset in question. | |
| :param expected_failures: An Exception class or an iterable thereof. If the | |
| query failed, but raised an instance of an expected failure class, this | |
| will ignore the failure and return an empty list. | |
| """ | |
| if not success and isinstance(result, expected_failures): | |
| return [] | |
| elif success: | |
| return dict_factory(result.column_names, result.parsed_rows) if result else [] | |
| else: | |
| raise result | |
| def _query_build_row(self, query_string, build_func): | |
| result = self._query_build_rows(query_string, build_func) | |
| return result[0] if result else None | |
| def _query_build_rows(self, query_string, build_func): | |
| query = QueryMessage(query=query_string, consistency_level=ConsistencyLevel.ONE) | |
| responses = self.connection.wait_for_responses(query, timeout=self.timeout, fail_on_error=False) | |
| (success, response) = responses[0] | |
| if success: | |
| result = dict_factory(response.column_names, response.parsed_rows) | |
| return [build_func(row) for row in result] | |
| elif isinstance(response, InvalidRequest): | |
| log.debug("user types table not found") | |
| return [] | |
| else: | |
| raise response | |
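| # A minimal sketch of the _handle_results contract defined above: an error | |
| # listed in expected_failures collapses to an empty result instead of | |
| # propagating (`parser` is any _SchemaParser instance; the error is made up). | |
| def _example_handle_results(parser): | |
|     missing = InvalidRequest("unconfigured table") | |
|     return parser._handle_results(False, missing, expected_failures=(InvalidRequest,))  # -> [] | |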
| class SchemaParserV22(_SchemaParser): | |
| """ | |
| For C* 2.2+ | |
| """ | |
| _SELECT_KEYSPACES = "SELECT * FROM system.schema_keyspaces" | |
| _SELECT_COLUMN_FAMILIES = "SELECT * FROM system.schema_columnfamilies" | |
| _SELECT_COLUMNS = "SELECT * FROM system.schema_columns" | |
| _SELECT_TRIGGERS = "SELECT * FROM system.schema_triggers" | |
| _SELECT_TYPES = "SELECT * FROM system.schema_usertypes" | |
| _SELECT_FUNCTIONS = "SELECT * FROM system.schema_functions" | |
| _SELECT_AGGREGATES = "SELECT * FROM system.schema_aggregates" | |
| _table_name_col = 'columnfamily_name' | |
| _function_agg_arument_type_col = 'signature' | |
| recognized_table_options = ( | |
| "comment", | |
| "read_repair_chance", | |
| "dclocal_read_repair_chance", # kept to be safe, but see _build_table_options() | |
| "local_read_repair_chance", | |
| "replicate_on_write", | |
| "gc_grace_seconds", | |
| "bloom_filter_fp_chance", | |
| "caching", | |
| "compaction_strategy_class", | |
| "compaction_strategy_options", | |
| "min_compaction_threshold", | |
| "max_compaction_threshold", | |
| "compression_parameters", | |
| "min_index_interval", | |
| "max_index_interval", | |
| "index_interval", | |
| "speculative_retry", | |
| "rows_per_partition_to_cache", | |
| "memtable_flush_period_in_ms", | |
| "populate_io_cache_on_flush", | |
| "compression", | |
| "default_time_to_live") | |
| def __init__(self, connection, timeout): | |
| super(SchemaParserV22, self).__init__(connection, timeout) | |
| self.keyspaces_result = [] | |
| self.tables_result = [] | |
| self.columns_result = [] | |
| self.triggers_result = [] | |
| self.types_result = [] | |
| self.functions_result = [] | |
| self.aggregates_result = [] | |
| self.keyspace_table_rows = defaultdict(list) | |
| self.keyspace_table_col_rows = defaultdict(lambda: defaultdict(list)) | |
| self.keyspace_type_rows = defaultdict(list) | |
| self.keyspace_func_rows = defaultdict(list) | |
| self.keyspace_agg_rows = defaultdict(list) | |
| self.keyspace_table_trigger_rows = defaultdict(lambda: defaultdict(list)) | |
| def get_all_keyspaces(self): | |
| self._query_all() | |
| for row in self.keyspaces_result: | |
| keyspace_meta = self._build_keyspace_metadata(row) | |
| try: | |
| for table_row in self.keyspace_table_rows.get(keyspace_meta.name, []): | |
| table_meta = self._build_table_metadata(table_row) | |
| keyspace_meta._add_table_metadata(table_meta) | |
| for usertype_row in self.keyspace_type_rows.get(keyspace_meta.name, []): | |
| usertype = self._build_user_type(usertype_row) | |
| keyspace_meta.user_types[usertype.name] = usertype | |
| for fn_row in self.keyspace_func_rows.get(keyspace_meta.name, []): | |
| fn = self._build_function(fn_row) | |
| keyspace_meta.functions[fn.signature] = fn | |
| for agg_row in self.keyspace_agg_rows.get(keyspace_meta.name, []): | |
| agg = self._build_aggregate(agg_row) | |
| keyspace_meta.aggregates[agg.signature] = agg | |
| except Exception: | |
| log.exception("Error while parsing metadata for keyspace %s. Metadata model will be incomplete.", keyspace_meta.name) | |
| keyspace_meta._exc_info = sys.exc_info() | |
| yield keyspace_meta | |
| def get_table(self, keyspaces, keyspace, table): | |
| cl = ConsistencyLevel.ONE | |
| where_clause = bind_params(" WHERE keyspace_name = %%s AND %s = %%s" % (self._table_name_col,), (keyspace, table), _encoder) | |
| cf_query = QueryMessage(query=self._SELECT_COLUMN_FAMILIES + where_clause, consistency_level=cl) | |
| col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl) | |
| triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl) | |
| (cf_success, cf_result), (col_success, col_result), (triggers_success, triggers_result) \ | |
| = self.connection.wait_for_responses(cf_query, col_query, triggers_query, timeout=self.timeout, fail_on_error=False) | |
| table_result = self._handle_results(cf_success, cf_result) | |
| col_result = self._handle_results(col_success, col_result) | |
| # the triggers table doesn't exist in C* 1.2 | |
| triggers_result = self._handle_results(triggers_success, triggers_result, | |
| expected_failures=InvalidRequest) | |
| if table_result: | |
| return self._build_table_metadata(table_result[0], col_result, triggers_result) | |
| def get_type(self, keyspaces, keyspace, type): | |
| where_clause = bind_params(" WHERE keyspace_name = %s AND type_name = %s", (keyspace, type), _encoder) | |
| return self._query_build_row(self._SELECT_TYPES + where_clause, self._build_user_type) | |
| def get_types_map(self, keyspaces, keyspace): | |
| where_clause = bind_params(" WHERE keyspace_name = %s", (keyspace,), _encoder) | |
| types = self._query_build_rows(self._SELECT_TYPES + where_clause, self._build_user_type) | |
| return dict((t.name, t) for t in types) | |
| def get_function(self, keyspaces, keyspace, function): | |
| where_clause = bind_params(" WHERE keyspace_name = %%s AND function_name = %%s AND %s = %%s" % (self._function_agg_arument_type_col,), | |
| (keyspace, function.name, function.argument_types), _encoder) | |
| return self._query_build_row(self._SELECT_FUNCTIONS + where_clause, self._build_function) | |
| def get_aggregate(self, keyspaces, keyspace, aggregate): | |
| where_clause = bind_params(" WHERE keyspace_name = %%s AND aggregate_name = %%s AND %s = %%s" % (self._function_agg_arument_type_col,), | |
| (keyspace, aggregate.name, aggregate.argument_types), _encoder) | |
| return self._query_build_row(self._SELECT_AGGREGATES + where_clause, self._build_aggregate) | |
| def get_keyspace(self, keyspaces, keyspace): | |
| where_clause = bind_params(" WHERE keyspace_name = %s", (keyspace,), _encoder) | |
| return self._query_build_row(self._SELECT_KEYSPACES + where_clause, self._build_keyspace_metadata) | |
| @classmethod | |
| def _build_keyspace_metadata(cls, row): | |
| try: | |
| ksm = cls._build_keyspace_metadata_internal(row) | |
| except Exception: | |
| name = row["keyspace_name"] | |
| ksm = KeyspaceMetadata(name, False, 'UNKNOWN', {}) | |
| ksm._exc_info = sys.exc_info() # capture exc_info before log because nose (test) logging clears it in certain circumstances | |
| log.exception("Error while parsing metadata for keyspace %s row(%s)", name, row) | |
| return ksm | |
| @staticmethod | |
| def _build_keyspace_metadata_internal(row): | |
| name = row["keyspace_name"] | |
| durable_writes = row["durable_writes"] | |
| strategy_class = row["strategy_class"] | |
| strategy_options = json.loads(row["strategy_options"]) | |
| return KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) | |
| @classmethod | |
| def _build_user_type(cls, usertype_row): | |
| field_types = list(map(cls._schema_type_to_cql, usertype_row['field_types'])) | |
| return UserType(usertype_row['keyspace_name'], usertype_row['type_name'], | |
| usertype_row['field_names'], field_types) | |
| @classmethod | |
| def _build_function(cls, function_row): | |
| return_type = cls._schema_type_to_cql(function_row['return_type']) | |
| deterministic = function_row.get('deterministic', False) | |
| monotonic = function_row.get('monotonic', False) | |
| monotonic_on = function_row.get('monotonic_on', ()) | |
| return Function(function_row['keyspace_name'], function_row['function_name'], | |
| function_row[cls._function_agg_arument_type_col], function_row['argument_names'], | |
| return_type, function_row['language'], function_row['body'], | |
| function_row['called_on_null_input'], | |
| deterministic, monotonic, monotonic_on) | |
| @classmethod | |
| def _build_aggregate(cls, aggregate_row): | |
| cass_state_type = types.lookup_casstype(aggregate_row['state_type']) | |
| initial_condition = aggregate_row['initcond'] | |
| if initial_condition is not None: | |
| initial_condition = _encoder.cql_encode_all_types(cass_state_type.deserialize(initial_condition, 3)) | |
| state_type = _cql_from_cass_type(cass_state_type) | |
| return_type = cls._schema_type_to_cql(aggregate_row['return_type']) | |
| return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'], | |
| aggregate_row['signature'], aggregate_row['state_func'], state_type, | |
| aggregate_row['final_func'], initial_condition, return_type, | |
| aggregate_row.get('deterministic', False)) | |
| def _build_table_metadata(self, row, col_rows=None, trigger_rows=None): | |
| keyspace_name = row["keyspace_name"] | |
| cfname = row[self._table_name_col] | |
| col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][cfname] | |
| trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][cfname] | |
| if not col_rows: # CASSANDRA-8487 | |
| log.warning("Building table metadata with no column meta for %s.%s", | |
| keyspace_name, cfname) | |
| table_meta = TableMetadata(keyspace_name, cfname) | |
| try: | |
| comparator = types.lookup_casstype(row["comparator"]) | |
| table_meta.comparator = comparator | |
| is_dct_comparator = issubclass(comparator, types.DynamicCompositeType) | |
| is_composite_comparator = issubclass(comparator, types.CompositeType) | |
| column_name_types = comparator.subtypes if is_composite_comparator else (comparator,) | |
| num_column_name_components = len(column_name_types) | |
| last_col = column_name_types[-1] | |
| column_aliases = row.get("column_aliases", None) | |
| clustering_rows = [r for r in col_rows | |
| if r.get('type', None) == "clustering_key"] | |
| if len(clustering_rows) > 1: | |
| clustering_rows = sorted(clustering_rows, key=lambda row: row.get('component_index')) | |
| if column_aliases is not None: | |
| column_aliases = json.loads(column_aliases) | |
| if not column_aliases: # json load failed or column_aliases empty PYTHON-562 | |
| column_aliases = [r.get('column_name') for r in clustering_rows] | |
| if is_composite_comparator: | |
| if issubclass(last_col, types.ColumnToCollectionType): | |
| # collections | |
| is_compact = False | |
| has_value = False | |
| clustering_size = num_column_name_components - 2 | |
| elif (len(column_aliases) == num_column_name_components - 1 and | |
| issubclass(last_col, types.UTF8Type)): | |
| # aliases? | |
| is_compact = False | |
| has_value = False | |
| clustering_size = num_column_name_components - 1 | |
| else: | |
| # compact table | |
| is_compact = True | |
| has_value = column_aliases or not col_rows | |
| clustering_size = num_column_name_components | |
| # Some thrift tables define names in composite types (see PYTHON-192) | |
| if not column_aliases and hasattr(comparator, 'fieldnames'): | |
| column_aliases = list(filter(None, comparator.fieldnames))  # list() so len() below works on Python 3 | |
| else: | |
| is_compact = True | |
| if column_aliases or not col_rows or is_dct_comparator: | |
| has_value = True | |
| clustering_size = num_column_name_components | |
| else: | |
| has_value = False | |
| clustering_size = 0 | |
| # partition key | |
| partition_rows = [r for r in col_rows | |
| if r.get('type', None) == "partition_key"] | |
| if len(partition_rows) > 1: | |
| partition_rows = sorted(partition_rows, key=lambda row: row.get('component_index')) | |
| key_aliases = row.get("key_aliases") | |
| if key_aliases is not None: | |
| key_aliases = json.loads(key_aliases) if key_aliases else [] | |
| else: | |
| # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. | |
| key_aliases = [r.get('column_name') for r in partition_rows] | |
| key_validator = row.get("key_validator") | |
| if key_validator is not None: | |
| key_type = types.lookup_casstype(key_validator) | |
| key_types = key_type.subtypes if issubclass(key_type, types.CompositeType) else [key_type] | |
| else: | |
| key_types = [types.lookup_casstype(r.get('validator')) for r in partition_rows] | |
| for i, col_type in enumerate(key_types): | |
| if len(key_aliases) > i: | |
| column_name = key_aliases[i] | |
| elif i == 0: | |
| column_name = "key" | |
| else: | |
| column_name = "key%d" % i | |
| col = ColumnMetadata(table_meta, column_name, col_type.cql_parameterized_type()) | |
| table_meta.columns[column_name] = col | |
| table_meta.partition_key.append(col) | |
| # clustering key | |
| for i in range(clustering_size): | |
| if len(column_aliases) > i: | |
| column_name = column_aliases[i] | |
| else: | |
| column_name = "column%d" % (i + 1) | |
| data_type = column_name_types[i] | |
| cql_type = _cql_from_cass_type(data_type) | |
| is_reversed = types.is_reversed_casstype(data_type) | |
| col = ColumnMetadata(table_meta, column_name, cql_type, is_reversed=is_reversed) | |
| table_meta.columns[column_name] = col | |
| table_meta.clustering_key.append(col) | |
| # value alias (if present) | |
| if has_value: | |
| value_alias_rows = [r for r in col_rows | |
| if r.get('type', None) == "compact_value"] | |
| if not key_aliases: # TODO are we checking the right thing here? | |
| value_alias = "value" | |
| else: | |
| value_alias = row.get("value_alias", None) | |
| if value_alias is None and value_alias_rows: # CASSANDRA-8487 | |
| # In 2.0+, we can use the 'type' column. In 3.0+, we have to use it. | |
| value_alias = value_alias_rows[0].get('column_name') | |
| default_validator = row.get("default_validator") | |
| if default_validator: | |
| validator = types.lookup_casstype(default_validator) | |
| else: | |
| if value_alias_rows: # CASSANDRA-8487 | |
| validator = types.lookup_casstype(value_alias_rows[0].get('validator')) | |
| cql_type = _cql_from_cass_type(validator) | |
| col = ColumnMetadata(table_meta, value_alias, cql_type) | |
| if value_alias: # CASSANDRA-8487 | |
| table_meta.columns[value_alias] = col | |
| # other normal columns | |
| for col_row in col_rows: | |
| column_meta = self._build_column_metadata(table_meta, col_row) | |
| if column_meta.name is not None: | |
| table_meta.columns[column_meta.name] = column_meta | |
| index_meta = self._build_index_metadata(column_meta, col_row) | |
| if index_meta: | |
| table_meta.indexes[index_meta.name] = index_meta | |
| for trigger_row in trigger_rows: | |
| trigger_meta = self._build_trigger_metadata(table_meta, trigger_row) | |
| table_meta.triggers[trigger_meta.name] = trigger_meta | |
| table_meta.options = self._build_table_options(row) | |
| table_meta.is_compact_storage = is_compact | |
| except Exception: | |
| table_meta._exc_info = sys.exc_info() | |
| log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, cfname, row, col_rows) | |
| return table_meta | |
| def _build_table_options(self, row): | |
| """ Setup the mostly-non-schema table options, like caching settings """ | |
| options = dict((o, row.get(o)) for o in self.recognized_table_options if o in row) | |
| # the option name when creating tables is "dclocal_read_repair_chance", | |
| # but the column name in system.schema_columnfamilies is | |
| # "local_read_repair_chance". We'll store this as dclocal_read_repair_chance, | |
| # since that's probably what users are expecting (and we need it for the | |
| # CREATE TABLE statement anyway). | |
| if "local_read_repair_chance" in options: | |
| val = options.pop("local_read_repair_chance") | |
| options["dclocal_read_repair_chance"] = val | |
| return options | |
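| # A minimal sketch (hypothetical row) of the renaming performed above: | |
| #   _build_table_options({"comment": "demo", "local_read_repair_chance": 0.1}) | |
| #   -> {"comment": "demo", "dclocal_read_repair_chance": 0.1} | |
| # Columns outside recognized_table_options (e.g. cf_id) are dropped entirely. | |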
| @classmethod | |
| def _build_column_metadata(cls, table_metadata, row): | |
| name = row["column_name"] | |
| type_string = row["validator"] | |
| data_type = types.lookup_casstype(type_string) | |
| cql_type = _cql_from_cass_type(data_type) | |
| is_static = row.get("type", None) == "static" | |
| is_reversed = types.is_reversed_casstype(data_type) | |
| column_meta = ColumnMetadata(table_metadata, name, cql_type, is_static, is_reversed) | |
| column_meta._cass_type = data_type | |
| return column_meta | |
| @staticmethod | |
| def _build_index_metadata(column_metadata, row): | |
| index_name = row.get("index_name") | |
| kind = row.get("index_type") | |
| if index_name or kind: | |
| options = row.get("index_options") | |
| options = json.loads(options) if options else {} | |
| options = options or {} # if the json parsed to None, init empty dict | |
| # generate a CQL index identity string | |
| target = protect_name(column_metadata.name) | |
| if kind != "CUSTOM": | |
| if "index_keys" in options: | |
| target = 'keys(%s)' % (target,) | |
| elif "index_values" in options: | |
| # don't use any "function" for collection values | |
| pass | |
| else: | |
| # it might be a "full" index on a frozen collection, but | |
| # we need to check the data type to verify that, because | |
| # there is no special index option for full-collection | |
| # indexes. | |
| data_type = column_metadata._cass_type | |
| collection_types = ('map', 'set', 'list') | |
| if data_type.typename == "frozen" and data_type.subtypes[0].typename in collection_types: | |
| # no index option for full-collection index | |
| target = 'full(%s)' % (target,) | |
| options['target'] = target | |
| return IndexMetadata(column_metadata.table.keyspace_name, column_metadata.table.name, index_name, kind, options) | |
| @staticmethod | |
| def _build_trigger_metadata(table_metadata, row): | |
| name = row["trigger_name"] | |
| options = row["trigger_options"] | |
| trigger_meta = TriggerMetadata(table_metadata, name, options) | |
| return trigger_meta | |
| def _query_all(self): | |
| cl = ConsistencyLevel.ONE | |
| queries = [ | |
| QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_COLUMN_FAMILIES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TYPES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl) | |
| ] | |
| ((ks_success, ks_result), | |
| (table_success, table_result), | |
| (col_success, col_result), | |
| (types_success, types_result), | |
| (functions_success, functions_result), | |
| (aggregates_success, aggregates_result), | |
| (triggers_success, triggers_result)) = ( | |
| self.connection.wait_for_responses(*queries, timeout=self.timeout, | |
| fail_on_error=False) | |
| ) | |
| self.keyspaces_result = self._handle_results(ks_success, ks_result) | |
| self.tables_result = self._handle_results(table_success, table_result) | |
| self.columns_result = self._handle_results(col_success, col_result) | |
| # if we're connected to Cassandra < 2.0, the triggers table will not exist | |
| if triggers_success: | |
| self.triggers_result = dict_factory(triggers_result.column_names, triggers_result.parsed_rows) | |
| else: | |
| if isinstance(triggers_result, InvalidRequest): | |
| log.debug("triggers table not found") | |
| elif isinstance(triggers_result, Unauthorized): | |
| log.warning("this version of Cassandra does not allow access to schema_triggers metadata with authorization enabled (CASSANDRA-7967); " | |
| "The driver will operate normally, but will not reflect triggers in the local metadata model, or schema strings.") | |
| else: | |
| raise triggers_result | |
| # if we're connected to Cassandra < 2.1, the usertypes table will not exist | |
| if types_success: | |
| self.types_result = dict_factory(types_result.column_names, types_result.parsed_rows) | |
| else: | |
| if isinstance(types_result, InvalidRequest): | |
| log.debug("user types table not found") | |
| self.types_result = {} | |
| else: | |
| raise types_result | |
| # functions were introduced in Cassandra 2.2 | |
| if functions_success: | |
| self.functions_result = dict_factory(functions_result.column_names, functions_result.parsed_rows) | |
| else: | |
| if isinstance(functions_result, InvalidRequest): | |
| log.debug("user functions table not found") | |
| else: | |
| raise functions_result | |
| # aggregates were introduced in Cassandra 2.2 | |
| if aggregates_success: | |
| self.aggregates_result = dict_factory(aggregates_result.column_names, aggregates_result.parsed_rows) | |
| else: | |
| if isinstance(aggregates_result, InvalidRequest): | |
| log.debug("user aggregates table not found") | |
| else: | |
| raise aggregates_result | |
| self._aggregate_results() | |
| def _aggregate_results(self): | |
| m = self.keyspace_table_rows | |
| for row in self.tables_result: | |
| m[row["keyspace_name"]].append(row) | |
| m = self.keyspace_table_col_rows | |
| for row in self.columns_result: | |
| ksname = row["keyspace_name"] | |
| cfname = row[self._table_name_col] | |
| m[ksname][cfname].append(row) | |
| m = self.keyspace_type_rows | |
| for row in self.types_result: | |
| m[row["keyspace_name"]].append(row) | |
| m = self.keyspace_func_rows | |
| for row in self.functions_result: | |
| m[row["keyspace_name"]].append(row) | |
| m = self.keyspace_agg_rows | |
| for row in self.aggregates_result: | |
| m[row["keyspace_name"]].append(row) | |
| m = self.keyspace_table_trigger_rows | |
| for row in self.triggers_result: | |
| ksname = row["keyspace_name"] | |
| cfname = row[self._table_name_col] | |
| m[ksname][cfname].append(row) | |
| @staticmethod | |
| def _schema_type_to_cql(type_string): | |
| cass_type = types.lookup_casstype(type_string) | |
| return _cql_from_cass_type(cass_type) | |
| class SchemaParserV3(SchemaParserV22): | |
| """ | |
| For C* 3.0+ | |
| """ | |
| _SELECT_KEYSPACES = "SELECT * FROM system_schema.keyspaces" | |
| _SELECT_TABLES = "SELECT * FROM system_schema.tables" | |
| _SELECT_COLUMNS = "SELECT * FROM system_schema.columns" | |
| _SELECT_INDEXES = "SELECT * FROM system_schema.indexes" | |
| _SELECT_TRIGGERS = "SELECT * FROM system_schema.triggers" | |
| _SELECT_TYPES = "SELECT * FROM system_schema.types" | |
| _SELECT_FUNCTIONS = "SELECT * FROM system_schema.functions" | |
| _SELECT_AGGREGATES = "SELECT * FROM system_schema.aggregates" | |
| _SELECT_VIEWS = "SELECT * FROM system_schema.views" | |
| _table_name_col = 'table_name' | |
| _function_agg_arument_type_col = 'argument_types' | |
| _table_metadata_class = TableMetadataV3 | |
| recognized_table_options = ( | |
| 'bloom_filter_fp_chance', | |
| 'caching', | |
| 'cdc', | |
| 'comment', | |
| 'compaction', | |
| 'compression', | |
| 'crc_check_chance', | |
| 'dclocal_read_repair_chance', | |
| 'default_time_to_live', | |
| 'gc_grace_seconds', | |
| 'max_index_interval', | |
| 'memtable_flush_period_in_ms', | |
| 'min_index_interval', | |
| 'read_repair_chance', | |
| 'speculative_retry') | |
| def __init__(self, connection, timeout): | |
| super(SchemaParserV3, self).__init__(connection, timeout) | |
| self.indexes_result = [] | |
| self.keyspace_table_index_rows = defaultdict(lambda: defaultdict(list)) | |
| self.keyspace_view_rows = defaultdict(list) | |
| def get_all_keyspaces(self): | |
| for keyspace_meta in super(SchemaParserV3, self).get_all_keyspaces(): | |
| for row in self.keyspace_view_rows[keyspace_meta.name]: | |
| view_meta = self._build_view_metadata(row) | |
| keyspace_meta._add_view_metadata(view_meta) | |
| yield keyspace_meta | |
| def get_table(self, keyspaces, keyspace, table): | |
| cl = ConsistencyLevel.ONE | |
| where_clause = bind_params(" WHERE keyspace_name = %%s AND %s = %%s" % (self._table_name_col), (keyspace, table), _encoder) | |
| cf_query = QueryMessage(query=self._SELECT_TABLES + where_clause, consistency_level=cl) | |
| col_query = QueryMessage(query=self._SELECT_COLUMNS + where_clause, consistency_level=cl) | |
| indexes_query = QueryMessage(query=self._SELECT_INDEXES + where_clause, consistency_level=cl) | |
| triggers_query = QueryMessage(query=self._SELECT_TRIGGERS + where_clause, consistency_level=cl) | |
| # in protocol v4 we don't know if this event is a view or a table, so we look for both | |
| where_clause = bind_params(" WHERE keyspace_name = %s AND view_name = %s", (keyspace, table), _encoder) | |
| view_query = QueryMessage(query=self._SELECT_VIEWS + where_clause, | |
| consistency_level=cl) | |
| ((cf_success, cf_result), (col_success, col_result), | |
| (indexes_success, indexes_result), (triggers_success, triggers_result), | |
| (view_success, view_result)) = ( | |
| self.connection.wait_for_responses( | |
| cf_query, col_query, indexes_query, triggers_query, | |
| view_query, timeout=self.timeout, fail_on_error=False) | |
| ) | |
| table_result = self._handle_results(cf_success, cf_result) | |
| col_result = self._handle_results(col_success, col_result) | |
| if table_result: | |
| indexes_result = self._handle_results(indexes_success, indexes_result) | |
| triggers_result = self._handle_results(triggers_success, triggers_result) | |
| return self._build_table_metadata(table_result[0], col_result, triggers_result, indexes_result) | |
| view_result = self._handle_results(view_success, view_result) | |
| if view_result: | |
| return self._build_view_metadata(view_result[0], col_result) | |
| @staticmethod | |
| def _build_keyspace_metadata_internal(row): | |
| name = row["keyspace_name"] | |
| durable_writes = row["durable_writes"] | |
| strategy_options = dict(row["replication"]) | |
| strategy_class = strategy_options.pop("class") | |
| return KeyspaceMetadata(name, durable_writes, strategy_class, strategy_options) | |
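| # A hypothetical pair of rows contrasting the formats parsed here and in the | |
| # V22 builder: 2.2 stores replication as JSON text under "strategy_class" / | |
| # "strategy_options", while 3.0+ stores one "replication" map whose "class" | |
| # key is popped, e.g.: | |
| #   {"replication": {"class": "SimpleStrategy", "replication_factor": "1"}, ...} | |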
| @staticmethod | |
| def _build_aggregate(aggregate_row): | |
| return Aggregate(aggregate_row['keyspace_name'], aggregate_row['aggregate_name'], | |
| aggregate_row['argument_types'], aggregate_row['state_func'], aggregate_row['state_type'], | |
| aggregate_row['final_func'], aggregate_row['initcond'], aggregate_row['return_type'], | |
| aggregate_row.get('deterministic', False)) | |
| def _build_table_metadata(self, row, col_rows=None, trigger_rows=None, index_rows=None, virtual=False): | |
| keyspace_name = row["keyspace_name"] | |
| table_name = row[self._table_name_col] | |
| col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][table_name] | |
| trigger_rows = trigger_rows or self.keyspace_table_trigger_rows[keyspace_name][table_name] | |
| index_rows = index_rows or self.keyspace_table_index_rows[keyspace_name][table_name] | |
| table_meta = self._table_metadata_class(keyspace_name, table_name, virtual=virtual) | |
| try: | |
| table_meta.options = self._build_table_options(row) | |
| flags = row.get('flags', set()) | |
| if flags: | |
| is_dense = 'dense' in flags | |
| compact_static = not is_dense and 'super' not in flags and 'compound' not in flags | |
| table_meta.is_compact_storage = is_dense or 'super' in flags or 'compound' not in flags | |
| elif virtual: | |
| compact_static = False | |
| table_meta.is_compact_storage = False | |
| is_dense = False | |
| else: | |
| compact_static = True | |
| table_meta.is_compact_storage = True | |
| is_dense = False | |
| self._build_table_columns(table_meta, col_rows, compact_static, is_dense, virtual) | |
| for trigger_row in trigger_rows: | |
| trigger_meta = self._build_trigger_metadata(table_meta, trigger_row) | |
| table_meta.triggers[trigger_meta.name] = trigger_meta | |
| for index_row in index_rows: | |
| index_meta = self._build_index_metadata(table_meta, index_row) | |
| if index_meta: | |
| table_meta.indexes[index_meta.name] = index_meta | |
| table_meta.extensions = row.get('extensions', {}) | |
| except Exception: | |
| table_meta._exc_info = sys.exc_info() | |
| log.exception("Error while parsing metadata for table %s.%s row(%s) columns(%s)", keyspace_name, table_name, row, col_rows) | |
| return table_meta | |
| def _build_table_options(self, row): | |
| """ Setup the mostly-non-schema table options, like caching settings """ | |
| return dict((o, row.get(o)) for o in self.recognized_table_options if o in row) | |
| def _build_table_columns(self, meta, col_rows, compact_static=False, is_dense=False, virtual=False): | |
| # partition key | |
| partition_rows = [r for r in col_rows | |
| if r.get('kind', None) == "partition_key"] | |
| if len(partition_rows) > 1: | |
| partition_rows = sorted(partition_rows, key=lambda row: row.get('position')) | |
| for r in partition_rows: | |
| # we have to add meta here (and not in the later loop) because TableMetadata.columns is an | |
| # OrderedDict, and it assumes keys are inserted first, in order, when exporting CQL | |
| column_meta = self._build_column_metadata(meta, r) | |
| meta.columns[column_meta.name] = column_meta | |
| meta.partition_key.append(meta.columns[r.get('column_name')]) | |
| # clustering key | |
| if not compact_static: | |
| clustering_rows = [r for r in col_rows | |
| if r.get('kind', None) == "clustering"] | |
| if len(clustering_rows) > 1: | |
| clustering_rows = sorted(clustering_rows, key=lambda row: row.get('position')) | |
| for r in clustering_rows: | |
| column_meta = self._build_column_metadata(meta, r) | |
| meta.columns[column_meta.name] = column_meta | |
| meta.clustering_key.append(meta.columns[r.get('column_name')]) | |
| for col_row in (r for r in col_rows | |
| if r.get('kind', None) not in ('partition_key', 'clustering_key')): | |
| column_meta = self._build_column_metadata(meta, col_row) | |
| if is_dense and column_meta.cql_type == types.cql_empty_type: | |
| continue | |
| if compact_static and not column_meta.is_static: | |
| # for compact static tables, we omit the clustering key and value, and only add the logical columns. | |
| # They are marked not static so that it generates appropriate CQL | |
| continue | |
| if compact_static: | |
| column_meta.is_static = False | |
| meta.columns[column_meta.name] = column_meta | |
| def _build_view_metadata(self, row, col_rows=None): | |
| keyspace_name = row["keyspace_name"] | |
| view_name = row["view_name"] | |
| base_table_name = row["base_table_name"] | |
| include_all_columns = row["include_all_columns"] | |
| where_clause = row["where_clause"] | |
| col_rows = col_rows or self.keyspace_table_col_rows[keyspace_name][view_name] | |
| view_meta = MaterializedViewMetadata(keyspace_name, view_name, base_table_name, | |
| include_all_columns, where_clause, self._build_table_options(row)) | |
| self._build_table_columns(view_meta, col_rows) | |
| view_meta.extensions = row.get('extensions', {}) | |
| return view_meta | |
| @staticmethod | |
| def _build_column_metadata(table_metadata, row): | |
| name = row["column_name"] | |
| cql_type = row["type"] | |
| is_static = row.get("kind", None) == "static" | |
| is_reversed = row["clustering_order"].upper() == "DESC" | |
| column_meta = ColumnMetadata(table_metadata, name, cql_type, is_static, is_reversed) | |
| return column_meta | |
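| # A hypothetical system_schema.columns row for the builder above: | |
| #   {"column_name": "ts", "type": "timestamp", "kind": "clustering", "clustering_order": "desc"} | |
| # yields ColumnMetadata(is_static=False, is_reversed=True) with cql_type "timestamp". | |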
| @staticmethod | |
| def _build_index_metadata(table_metadata, row): | |
| index_name = row.get("index_name") | |
| kind = row.get("kind") | |
| if index_name or kind: | |
| index_options = row.get("options") | |
| return IndexMetadata(table_metadata.keyspace_name, table_metadata.name, index_name, kind, index_options) | |
| else: | |
| return None | |
| @staticmethod | |
| def _build_trigger_metadata(table_metadata, row): | |
| name = row["trigger_name"] | |
| options = row["options"] | |
| trigger_meta = TriggerMetadata(table_metadata, name, options) | |
| return trigger_meta | |
| def _query_all(self): | |
| cl = ConsistencyLevel.ONE | |
| queries = [ | |
| QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TABLES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TYPES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl) | |
| ] | |
| ((ks_success, ks_result), | |
| (table_success, table_result), | |
| (col_success, col_result), | |
| (types_success, types_result), | |
| (functions_success, functions_result), | |
| (aggregates_success, aggregates_result), | |
| (triggers_success, triggers_result), | |
| (indexes_success, indexes_result), | |
| (views_success, views_result)) = self.connection.wait_for_responses( | |
| *queries, timeout=self.timeout, fail_on_error=False | |
| ) | |
| self.keyspaces_result = self._handle_results(ks_success, ks_result) | |
| self.tables_result = self._handle_results(table_success, table_result) | |
| self.columns_result = self._handle_results(col_success, col_result) | |
| self.triggers_result = self._handle_results(triggers_success, triggers_result) | |
| self.types_result = self._handle_results(types_success, types_result) | |
| self.functions_result = self._handle_results(functions_success, functions_result) | |
| self.aggregates_result = self._handle_results(aggregates_success, aggregates_result) | |
| self.indexes_result = self._handle_results(indexes_success, indexes_result) | |
| self.views_result = self._handle_results(views_success, views_result) | |
| self._aggregate_results() | |
| def _aggregate_results(self): | |
| super(SchemaParserV3, self)._aggregate_results() | |
| m = self.keyspace_table_index_rows | |
| for row in self.indexes_result: | |
| ksname = row["keyspace_name"] | |
| cfname = row[self._table_name_col] | |
| m[ksname][cfname].append(row) | |
| m = self.keyspace_view_rows | |
| for row in self.views_result: | |
| m[row["keyspace_name"]].append(row) | |
| @staticmethod | |
| def _schema_type_to_cql(type_string): | |
| return type_string | |
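| # A minimal sketch of the difference from the V22 parser: pre-3.0 schema | |
| # tables store marshal class names that must be translated, while | |
| # system_schema already stores CQL type strings (values are illustrative). | |
| def _example_schema_type_strings(): | |
|     v22 = SchemaParserV22._schema_type_to_cql("org.apache.cassandra.db.marshal.UTF8Type")  # -> "text" | |
|     v3 = SchemaParserV3._schema_type_to_cql("frozen<list<int>>")  # passed through unchanged | |
|     return v22, v3 | |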
| class SchemaParserDSE60(SchemaParserV3): | |
| """ | |
| For DSE 6.0+ | |
| """ | |
| recognized_table_options = (SchemaParserV3.recognized_table_options + | |
| ("nodesync",)) | |
| class SchemaParserV4(SchemaParserV3): | |
| recognized_table_options = ( | |
| 'additional_write_policy', | |
| 'bloom_filter_fp_chance', | |
| 'caching', | |
| 'cdc', | |
| 'comment', | |
| 'compaction', | |
| 'compression', | |
| 'crc_check_chance', | |
| 'default_time_to_live', | |
| 'gc_grace_seconds', | |
| 'max_index_interval', | |
| 'memtable_flush_period_in_ms', | |
| 'min_index_interval', | |
| 'read_repair', | |
| 'speculative_retry') | |
| _SELECT_VIRTUAL_KEYSPACES = 'SELECT * from system_virtual_schema.keyspaces' | |
| _SELECT_VIRTUAL_TABLES = 'SELECT * from system_virtual_schema.tables' | |
| _SELECT_VIRTUAL_COLUMNS = 'SELECT * from system_virtual_schema.columns' | |
| def __init__(self, connection, timeout): | |
| super(SchemaParserV4, self).__init__(connection, timeout) | |
| self.virtual_keyspaces_rows = defaultdict(list) | |
| self.virtual_tables_rows = defaultdict(list) | |
| self.virtual_columns_rows = defaultdict(lambda: defaultdict(list)) | |
| def _query_all(self): | |
| cl = ConsistencyLevel.ONE | |
| # todo: this duplicates V3; we should find a way for _query_all methods | |
| # to extend each other. | |
| queries = [ | |
| # copied from V3 | |
| QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TABLES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TYPES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl), | |
| # V4-only queries | |
| QueryMessage(query=self._SELECT_VIRTUAL_KEYSPACES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIRTUAL_TABLES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIRTUAL_COLUMNS, consistency_level=cl) | |
| ] | |
| responses = self.connection.wait_for_responses( | |
| *queries, timeout=self.timeout, fail_on_error=False) | |
| ( | |
| # copied from V3 | |
| (ks_success, ks_result), | |
| (table_success, table_result), | |
| (col_success, col_result), | |
| (types_success, types_result), | |
| (functions_success, functions_result), | |
| (aggregates_success, aggregates_result), | |
| (triggers_success, triggers_result), | |
| (indexes_success, indexes_result), | |
| (views_success, views_result), | |
| # V4-only responses | |
| (virtual_ks_success, virtual_ks_result), | |
| (virtual_table_success, virtual_table_result), | |
| (virtual_column_success, virtual_column_result) | |
| ) = responses | |
| # copied from V3 | |
| self.keyspaces_result = self._handle_results(ks_success, ks_result) | |
| self.tables_result = self._handle_results(table_success, table_result) | |
| self.columns_result = self._handle_results(col_success, col_result) | |
| self.triggers_result = self._handle_results(triggers_success, triggers_result) | |
| self.types_result = self._handle_results(types_success, types_result) | |
| self.functions_result = self._handle_results(functions_success, functions_result) | |
| self.aggregates_result = self._handle_results(aggregates_success, aggregates_result) | |
| self.indexes_result = self._handle_results(indexes_success, indexes_result) | |
| self.views_result = self._handle_results(views_success, views_result) | |
| # V4-only results | |
| # These tables don't exist in some DSE versions reporting 4.X so we can | |
| # ignore them if we got an error | |
| self.virtual_keyspaces_result = self._handle_results( | |
| virtual_ks_success, virtual_ks_result, | |
| expected_failures=(InvalidRequest,) | |
| ) | |
| self.virtual_tables_result = self._handle_results( | |
| virtual_table_success, virtual_table_result, | |
| expected_failures=(InvalidRequest,) | |
| ) | |
| self.virtual_columns_result = self._handle_results( | |
| virtual_column_success, virtual_column_result, | |
| expected_failures=(InvalidRequest,) | |
| ) | |
| self._aggregate_results() | |
| def _aggregate_results(self): | |
| super(SchemaParserV4, self)._aggregate_results() | |
| m = self.virtual_tables_rows | |
| for row in self.virtual_tables_result: | |
| m[row["keyspace_name"]].append(row) | |
| m = self.virtual_columns_rows | |
| for row in self.virtual_columns_result: | |
| ks_name = row['keyspace_name'] | |
| tab_name = row[self._table_name_col] | |
| m[ks_name][tab_name].append(row) | |
| def get_all_keyspaces(self): | |
| for x in super(SchemaParserV4, self).get_all_keyspaces(): | |
| yield x | |
| for row in self.virtual_keyspaces_result: | |
| ks_name = row['keyspace_name'] | |
| keyspace_meta = self._build_keyspace_metadata(row) | |
| keyspace_meta.virtual = True | |
| for table_row in self.virtual_tables_rows.get(ks_name, []): | |
| table_name = table_row[self._table_name_col] | |
| col_rows = self.virtual_columns_rows[ks_name][table_name] | |
| keyspace_meta._add_table_metadata( | |
| self._build_table_metadata(table_row, | |
| col_rows=col_rows, | |
| virtual=True) | |
| ) | |
| yield keyspace_meta | |
| @staticmethod | |
| def _build_keyspace_metadata_internal(row): | |
| # necessary fields that aren't in virtual ks | |
| row["durable_writes"] = row.get("durable_writes", None) | |
| row["replication"] = row.get("replication", {}) | |
| row["replication"]["class"] = row["replication"].get("class", None) | |
| return super(SchemaParserV4, SchemaParserV4)._build_keyspace_metadata_internal(row) | |
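| # A minimal sketch (hypothetical row): virtual keyspaces expose only their | |
| # name, so the defaults injected above stand in for replication settings | |
| # before the shared V3 builder runs. | |
| def _example_virtual_keyspace_row(): | |
|     return SchemaParserV4._build_keyspace_metadata_internal({"keyspace_name": "system_views"}) | |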
| class SchemaParserDSE67(SchemaParserV4): | |
| """ | |
| For DSE 6.7+ | |
| """ | |
| recognized_table_options = (SchemaParserV4.recognized_table_options + | |
| ("nodesync",)) | |
| class SchemaParserDSE68(SchemaParserDSE67): | |
| """ | |
| For DSE 6.8+ | |
| """ | |
| _SELECT_VERTICES = "SELECT * FROM system_schema.vertices" | |
| _SELECT_EDGES = "SELECT * FROM system_schema.edges" | |
| _table_metadata_class = TableMetadataDSE68 | |
| def __init__(self, connection, timeout): | |
| super(SchemaParserDSE68, self).__init__(connection, timeout) | |
| self.keyspace_table_vertex_rows = defaultdict(lambda: defaultdict(list)) | |
| self.keyspace_table_edge_rows = defaultdict(lambda: defaultdict(list)) | |
| def get_all_keyspaces(self): | |
| for keyspace_meta in super(SchemaParserDSE68, self).get_all_keyspaces(): | |
| self._build_graph_metadata(keyspace_meta) | |
| yield keyspace_meta | |
| def get_table(self, keyspaces, keyspace, table): | |
| table_meta = super(SchemaParserDSE68, self).get_table(keyspaces, keyspace, table) | |
| cl = ConsistencyLevel.ONE | |
| where_clause = bind_params(" WHERE keyspace_name = %%s AND %s = %%s" % (self._table_name_col), (keyspace, table), _encoder) | |
| vertices_query = QueryMessage(query=self._SELECT_VERTICES + where_clause, consistency_level=cl) | |
| edges_query = QueryMessage(query=self._SELECT_EDGES + where_clause, consistency_level=cl) | |
| (vertices_success, vertices_result), (edges_success, edges_result) \ | |
| = self.connection.wait_for_responses(vertices_query, edges_query, timeout=self.timeout, fail_on_error=False) | |
| vertices_result = self._handle_results(vertices_success, vertices_result) | |
| edges_result = self._handle_results(edges_success, edges_result) | |
| try: | |
| if vertices_result: | |
| table_meta.vertex = self._build_table_vertex_metadata(vertices_result[0]) | |
| elif edges_result: | |
| table_meta.edge = self._build_table_edge_metadata(keyspaces[keyspace], edges_result[0]) | |
| except Exception: | |
| table_meta.vertex = None | |
| table_meta.edge = None | |
| table_meta._exc_info = sys.exc_info() | |
| log.exception("Error while parsing graph metadata for table %s.%s.", keyspace, table) | |
| return table_meta | |
| @staticmethod | |
| def _build_keyspace_metadata_internal(row): | |
| name = row["keyspace_name"] | |
| durable_writes = row.get("durable_writes", None) | |
| replication = dict(row.get("replication")) if 'replication' in row else {} | |
| replication_class = replication.pop("class") if 'class' in replication else None | |
| graph_engine = row.get("graph_engine", None) | |
| return KeyspaceMetadata(name, durable_writes, replication_class, replication, graph_engine) | |
| def _build_graph_metadata(self, keyspace_meta): | |
| def _build_table_graph_metadata(table_meta): | |
| for row in self.keyspace_table_vertex_rows[keyspace_meta.name][table_meta.name]: | |
| table_meta.vertex = self._build_table_vertex_metadata(row) | |
| for row in self.keyspace_table_edge_rows[keyspace_meta.name][table_meta.name]: | |
| table_meta.edge = self._build_table_edge_metadata(keyspace_meta, row) | |
| try: | |
| # Make sure we process vertices before edges | |
| for table_meta in [t for t in keyspace_meta.tables.values() | |
| if t.name in self.keyspace_table_vertex_rows[keyspace_meta.name]]: | |
| _build_table_graph_metadata(table_meta) | |
| # all other tables... | |
| for table_meta in [t for t in keyspace_meta.tables.values() | |
| if t.name not in self.keyspace_table_vertex_rows[keyspace_meta.name]]: | |
| _build_table_graph_metadata(table_meta) | |
| except Exception: | |
| # schema error, remove all graph metadata for this keyspace | |
| for t in keyspace_meta.tables.values(): | |
| t.edge = t.vertex = None | |
| keyspace_meta._exc_info = sys.exc_info() | |
| log.exception("Error while parsing graph metadata for keyspace %s", keyspace_meta.name) | |
| @staticmethod | |
| def _build_table_vertex_metadata(row): | |
| return VertexMetadata(row.get("keyspace_name"), row.get("table_name"), | |
| row.get("label_name")) | |
| @staticmethod | |
| def _build_table_edge_metadata(keyspace_meta, row): | |
| from_table = row.get("from_table") | |
| from_table_meta = keyspace_meta.tables.get(from_table) | |
| from_label = from_table_meta.vertex.label_name | |
| to_table = row.get("to_table") | |
| to_table_meta = keyspace_meta.tables.get(to_table) | |
| to_label = to_table_meta.vertex.label_name | |
| return EdgeMetadata( | |
| row.get("keyspace_name"), row.get("table_name"), | |
| row.get("label_name"), from_table, from_label, | |
| row.get("from_partition_key_columns"), | |
| row.get("from_clustering_columns"), to_table, to_label, | |
| row.get("to_partition_key_columns"), | |
| row.get("to_clustering_columns")) | |
| def _query_all(self): | |
| cl = ConsistencyLevel.ONE | |
| queries = [ | |
| # copied from v4 | |
| QueryMessage(query=self._SELECT_KEYSPACES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TABLES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_COLUMNS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TYPES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_FUNCTIONS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_AGGREGATES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_TRIGGERS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_INDEXES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIEWS, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIRTUAL_KEYSPACES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIRTUAL_TABLES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_VIRTUAL_COLUMNS, consistency_level=cl), | |
| # dse6.8 only | |
| QueryMessage(query=self._SELECT_VERTICES, consistency_level=cl), | |
| QueryMessage(query=self._SELECT_EDGES, consistency_level=cl) | |
| ] | |
| responses = self.connection.wait_for_responses( | |
| *queries, timeout=self.timeout, fail_on_error=False) | |
| ( | |
| # copied from V4 | |
| (ks_success, ks_result), | |
| (table_success, table_result), | |
| (col_success, col_result), | |
| (types_success, types_result), | |
| (functions_success, functions_result), | |
| (aggregates_success, aggregates_result), | |
| (triggers_success, triggers_result), | |
| (indexes_success, indexes_result), | |
| (views_success, views_result), | |
| (virtual_ks_success, virtual_ks_result), | |
| (virtual_table_success, virtual_table_result), | |
| (virtual_column_success, virtual_column_result), | |
| # dse6.8 responses | |
| (vertices_success, vertices_result), | |
| (edges_success, edges_result) | |
| ) = responses | |
| # copied from V4 | |
| self.keyspaces_result = self._handle_results(ks_success, ks_result) | |
| self.tables_result = self._handle_results(table_success, table_result) | |
| self.columns_result = self._handle_results(col_success, col_result) | |
| self.triggers_result = self._handle_results(triggers_success, triggers_result) | |
| self.types_result = self._handle_results(types_success, types_result) | |
| self.functions_result = self._handle_results(functions_success, functions_result) | |
| self.aggregates_result = self._handle_results(aggregates_success, aggregates_result) | |
| self.indexes_result = self._handle_results(indexes_success, indexes_result) | |
| self.views_result = self._handle_results(views_success, views_result) | |
| # These tables don't exist in some DSE versions reporting 4.X so we can | |
| # ignore them if we got an error | |
| self.virtual_keyspaces_result = self._handle_results( | |
| virtual_ks_success, virtual_ks_result, | |
| expected_failures=(InvalidRequest,) | |
| ) | |
| self.virtual_tables_result = self._handle_results( | |
| virtual_table_success, virtual_table_result, | |
| expected_failures=(InvalidRequest,) | |
| ) | |
| self.virtual_columns_result = self._handle_results( | |
| virtual_column_success, virtual_column_result, | |
| expected_failures=(InvalidRequest,) | |
| ) | |
| # dse6.8-only results | |
| self.vertices_result = self._handle_results(vertices_success, vertices_result) | |
| self.edges_result = self._handle_results(edges_success, edges_result) | |
| self._aggregate_results() | |
| def _aggregate_results(self): | |
| super(SchemaParserDSE68, self)._aggregate_results() | |
| m = self.keyspace_table_vertex_rows | |
| for row in self.vertices_result: | |
| ksname = row["keyspace_name"] | |
| cfname = row['table_name'] | |
| m[ksname][cfname].append(row) | |
| m = self.keyspace_table_edge_rows | |
| for row in self.edges_result: | |
| ksname = row["keyspace_name"] | |
| cfname = row['table_name'] | |
| m[ksname][cfname].append(row) | |
| class MaterializedViewMetadata(object): | |
| """ | |
| A representation of a materialized view on a table | |
| """ | |
| keyspace_name = None | |
| """ A string name of the keyspace of this view.""" | |
| name = None | |
| """ A string name of the view.""" | |
| base_table_name = None | |
| """ A string name of the base table for this view.""" | |
| partition_key = None | |
| """ | |
| A list of :class:`.ColumnMetadata` instances representing the columns in | |
| the partition key for this view. This will always hold at least one | |
| column. | |
| """ | |
| clustering_key = None | |
| """ | |
| A list of :class:`.ColumnMetadata` instances representing the columns | |
| in the clustering key for this view. | |
| Note that a table may have no clustering keys, in which case this will | |
| be an empty list. | |
| """ | |
| columns = None | |
| """ | |
| A dict mapping column names to :class:`.ColumnMetadata` instances. | |
| """ | |
| include_all_columns = None | |
| """ A flag indicating whether the view was created AS SELECT * """ | |
| where_clause = None | |
| """ String WHERE clause for the view select statement. From server metadata """ | |
| options = None | |
| """ | |
| A dict mapping table option names to their specific settings for this | |
| view. | |
| """ | |
| extensions = None | |
| """ | |
| Metadata describing configuration for table extensions | |
| """ | |
| def __init__(self, keyspace_name, view_name, base_table_name, include_all_columns, where_clause, options): | |
| self.keyspace_name = keyspace_name | |
| self.name = view_name | |
| self.base_table_name = base_table_name | |
| self.partition_key = [] | |
| self.clustering_key = [] | |
| self.columns = OrderedDict() | |
| self.include_all_columns = include_all_columns | |
| self.where_clause = where_clause | |
| self.options = options or {} | |
| def as_cql_query(self, formatted=False): | |
| """ | |
| Returns a CQL query that can be used to recreate this materialized view. | |
| If `formatted` is set to :const:`True`, extra whitespace will | |
| be added to make the query more readable. | |
| """ | |
| sep = '\n ' if formatted else ' ' | |
| keyspace = protect_name(self.keyspace_name) | |
| name = protect_name(self.name) | |
| selected_cols = '*' if self.include_all_columns else ', '.join(protect_name(col.name) for col in self.columns.values()) | |
| base_table = protect_name(self.base_table_name) | |
| where_clause = self.where_clause | |
| part_key = ', '.join(protect_name(col.name) for col in self.partition_key) | |
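| # Render the PRIMARY KEY clause: a composite partition key gets its own | |
| # parentheses, e.g. ((pk1, pk2), ck1); a single-column one does not, e.g. (pk1, ck1). | |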
| if len(self.partition_key) > 1: | |
| pk = "((%s)" % part_key | |
| else: | |
| pk = "(%s" % part_key | |
| if self.clustering_key: | |
| pk += ", %s" % ', '.join(protect_name(col.name) for col in self.clustering_key) | |
| pk += ")" | |
| properties = TableMetadataV3._property_string(formatted, self.clustering_key, self.options) | |
| ret = ("CREATE MATERIALIZED VIEW %(keyspace)s.%(name)s AS%(sep)s" | |
| "SELECT %(selected_cols)s%(sep)s" | |
| "FROM %(keyspace)s.%(base_table)s%(sep)s" | |
| "WHERE %(where_clause)s%(sep)s" | |
| "PRIMARY KEY %(pk)s%(sep)s" | |
| "WITH %(properties)s") % locals() | |
| if self.extensions: | |
| registry = _RegisteredExtensionType._extension_registry | |
| for k in registry.keys() & self.extensions: # no viewkeys on OrderedMapSerializeKey | |
| ext = registry[k] | |
| cql = ext.after_table_cql(self, k, self.extensions[k]) | |
| if cql: | |
| ret += "\n\n%s" % (cql,) | |
| return ret | |
| def export_as_string(self): | |
| return self.as_cql_query(formatted=True) + ";" | |
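| # Illustrative example for as_cql_query above (hypothetical names): a view | |
| # "mv" in keyspace "ks" on base table "scores", with partition key (game) | |
| # and clustering column (score), would render roughly as (options elided): | |
| # | |
| #   CREATE MATERIALIZED VIEW ks.mv AS SELECT game, score FROM ks.scores | |
| #   WHERE score IS NOT NULL PRIMARY KEY (game, score) WITH ... | |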
| class VertexMetadata(object): | |
| """ | |
| A representation of a vertex on a table | |
| """ | |
| keyspace_name = None | |
| """ A string name of the keyspace. """ | |
| table_name = None | |
| """ A string name of the table this vertex is on. """ | |
| label_name = None | |
| """ A string name of the label of this vertex.""" | |
| def __init__(self, keyspace_name, table_name, label_name): | |
| self.keyspace_name = keyspace_name | |
| self.table_name = table_name | |
| self.label_name = label_name | |
| class EdgeMetadata(object): | |
| """ | |
| A representation of an edge on a table | |
| """ | |
| keyspace_name = None | |
| """A string name of the keyspace.""" | |
| table_name = None | |
| """A string name of the table this edge is on.""" | |
| label_name = None | |
| """A string name of the label of this edge.""" | |
| from_table = None | |
| """A string name of the from table of this edge (the out-vertex table).""" | |
| from_label = None | |
| """A string name of the from table label of this edge (the out-vertex label).""" | |
| from_partition_key_columns = None | |
| """The columns that match the partition key of the out-vertex table.""" | |
| from_clustering_columns = None | |
| """The columns that match the clustering columns of the out-vertex table.""" | |
| to_table = None | |
| """A string name of the to table of this edge (the in-vertex table).""" | |
| to_label = None | |
| """A string name of the to table label of this edge (the in-vertex label).""" | |
| to_partition_key_columns = None | |
| """The columns that match the partition key of the in-vertex table.""" | |
| to_clustering_columns = None | |
| """The columns that match the clustering columns of the in-vertex table.""" | |
| def __init__( | |
| self, keyspace_name, table_name, label_name, from_table, | |
| from_label, from_partition_key_columns, from_clustering_columns, | |
| to_table, to_label, to_partition_key_columns, | |
| to_clustering_columns): | |
| self.keyspace_name = keyspace_name | |
| self.table_name = table_name | |
| self.label_name = label_name | |
| self.from_table = from_table | |
| self.from_label = from_label | |
| self.from_partition_key_columns = from_partition_key_columns | |
| self.from_clustering_columns = from_clustering_columns | |
| self.to_table = to_table | |
| self.to_label = to_label | |
| self.to_partition_key_columns = to_partition_key_columns | |
| self.to_clustering_columns = to_clustering_columns | |
| def get_schema_parser(connection, server_version, dse_version, timeout): | |
| version = Version(server_version) | |
| if dse_version: | |
| v = Version(dse_version) | |
| if v >= Version('6.8.0'): | |
| return SchemaParserDSE68(connection, timeout) | |
| elif v >= Version('6.7.0'): | |
| return SchemaParserDSE67(connection, timeout) | |
| elif v >= Version('6.0.0'): | |
| return SchemaParserDSE60(connection, timeout) | |
| if version >= Version('4-a'): | |
| return SchemaParserV4(connection, timeout) | |
| elif version >= Version('3.0.0'): | |
| return SchemaParserV3(connection, timeout) | |
| else: | |
| # We could further specialize by version; right now this is just the | |
| # refactored multi-version parser we have as of C* 2.2.0rc1. | |
| return SchemaParserV22(connection, timeout) | |
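| # A rough sketch of the dispatch above (version strings are illustrative): | |
| #   get_schema_parser(conn, '3.11.4', None, t)    -> SchemaParserV3 | |
| #   get_schema_parser(conn, '4.0.0', None, t)     -> SchemaParserV4 | |
| #   get_schema_parser(conn, '4.0.0', '6.8.1', t)  -> SchemaParserDSE68 | |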
| def _cql_from_cass_type(cass_type): | |
| """ | |
| A string representation of the type for this column, such as "varchar" | |
| or "map<string, int>". | |
| """ | |
| if issubclass(cass_type, types.ReversedType): | |
| return cass_type.subtypes[0].cql_parameterized_type() | |
| else: | |
| return cass_type.cql_parameterized_type() | |
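| # For example (illustrative): _cql_from_cass_type(types.Int32Type) returns | |
| # 'int'; a ReversedType-wrapped Int32Type (a DESC clustering column) also | |
| # returns 'int', since the reversed wrapper is unwrapped first. | |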
| class RLACTableExtension(RegisteredTableExtension): | |
| name = "DSE_RLACA" | |
| @classmethod | |
| def after_table_cql(cls, table_meta, ext_key, ext_blob): | |
| return "RESTRICT ROWS ON %s.%s USING %s;" % (protect_name(table_meta.keyspace_name), | |
| protect_name(table_meta.name), | |
| protect_name(ext_blob.decode('utf-8'))) | |
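| # Illustrative output of after_table_cql above, assuming table metadata for | |
| # ks.t and an extension blob b'rlac_user': | |
| #   RESTRICT ROWS ON ks.t USING rlac_user; | |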
| NO_VALID_REPLICA = object() | |
| def group_keys_by_replica(session, keyspace, table, keys): | |
| """ | |
| Returns a :class:`dict` with the given keys grouped per replica host. This | |
| can be used to group keys in IN clauses more accurately, or to batch keys | |
| per host. If no valid replica is found for a particular key, it is grouped | |
| under :class:`~.NO_VALID_REPLICA`. | |
| Example usage:: | |
| result = group_keys_by_replica( | |
| session, "system", "peers", | |
| (("127.0.0.1", ), ("127.0.0.2", )) | |
| ) | |
| """ | |
| cluster = session.cluster | |
| partition_keys = cluster.metadata.keyspaces[keyspace].tables[table].partition_key | |
| serializers = [types._cqltypes[partition_key.cql_type] for partition_key in partition_keys] | |
| keys_per_host = defaultdict(list) | |
| distance = cluster._default_load_balancing_policy.distance | |
| for key in keys: | |
| serialized_key = [serializer.serialize(pk, cluster.protocol_version) | |
| for serializer, pk in zip(serializers, key)] | |
| if len(serialized_key) == 1: | |
| routing_key = serialized_key[0] | |
| else: | |
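| # Composite routing keys use Cassandra's composite format: each component | |
| # is a 2-byte big-endian length, the raw bytes, then a zero byte. | |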
| routing_key = b"".join(struct.pack(">H%dsB" % len(p), len(p), p, 0) for p in serialized_key) | |
| all_replicas = cluster.metadata.get_replicas(keyspace, routing_key) | |
| # First check if there are local replicas | |
| valid_replicas = [host for host in all_replicas if | |
| host.is_up and distance(host) == HostDistance.LOCAL] | |
| if not valid_replicas: | |
| valid_replicas = [host for host in all_replicas if host.is_up] | |
| if valid_replicas: | |
| keys_per_host[random.choice(valid_replicas)].append(key) | |
| else: | |
| # Group under NO_VALID_REPLICA all the keys for which no valid | |
| # replica was found | |
| keys_per_host[NO_VALID_REPLICA].append(key) | |
| return dict(keys_per_host) | |
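| # Illustrative follow-up (hypothetical): issue one statement per host group. | |
| #   grouped = group_keys_by_replica(session, "ks", "t", keys) | |
| #   for host, host_keys in grouped.items(): | |
| #       if host is NO_VALID_REPLICA: | |
| #           continue  # let the driver route these keys normally | |
| #       ...  # e.g. batch or IN-clause query over host_keys targeting host | |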
| # TODO next major reorg | |
| class _NodeInfo(object): | |
| """ | |
| Internal utility functions to determine the different host addresses/ports | |
| from a local or peers row. | |
| """ | |
| @staticmethod | |
| def get_broadcast_rpc_address(row): | |
| # TODO next major, change the parsing logic to avoid any | |
| # overriding of a non-null value | |
| addr = row.get("rpc_address") | |
| if "native_address" in row: | |
| addr = row.get("native_address") | |
| if "native_transport_address" in row: | |
| addr = row.get("native_transport_address") | |
| if not addr or addr in ["0.0.0.0", "::"]: | |
| addr = row.get("peer") | |
| return addr | |
| @staticmethod | |
| def get_broadcast_rpc_port(row): | |
| port = row.get("rpc_port") | |
| if port is None or port == 0: | |
| port = row.get("native_port") | |
| return port if port and port > 0 else None | |
| @staticmethod | |
| def get_broadcast_address(row): | |
| addr = row.get("broadcast_address") | |
| if addr is None: | |
| addr = row.get("peer") | |
| return addr | |
| @staticmethod | |
| def get_broadcast_port(row): | |
| port = row.get("broadcast_port") | |
| if port is None or port == 0: | |
| port = row.get("peer_port") | |
| return port if port and port > 0 else None | |
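| # Illustrative only: given a peers row such as | |
| #   {'peer': '10.0.0.2', 'rpc_address': '0.0.0.0', 'native_port': 9042} | |
| # get_broadcast_rpc_address returns '10.0.0.2' (the bind-all '0.0.0.0' | |
| # address falls back to the peer address) and get_broadcast_rpc_port | |
| # returns 9042 (rpc_port is absent, so native_port is used). | |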