Spaces:
Running
Running
File size: 6,040 Bytes
2a0bc63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 |
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module provides an optional protocol parser that returns
NumPy arrays.
=============================================================================
This module should not be imported by any of the main python-driver modules,
as numpy is an optional dependency.
=============================================================================
"""
include "ioutils.pyx"
cimport cython
from libc.stdint cimport uint64_t, uint8_t
from cpython.ref cimport Py_INCREF, PyObject
from cassandra.bytesio cimport BytesIOReader
from cassandra.deserializers cimport Deserializer, from_binary
from cassandra.parsing cimport ParseDesc, ColumnParser, RowParser
from cassandra import cqltypes
from cassandra.util import is_little_endian
import numpy as np
cdef extern from "numpyFlags.h":
# Include 'numpyFlags.h' into the generated C code to disable the
# deprecated NumPy API
pass
cdef extern from "Python.h":
# An integer type large enough to hold a pointer
ctypedef uint64_t Py_uintptr_t
# Simple array descriptor, useful to parse rows into a NumPy array
ctypedef struct ArrDesc:
Py_uintptr_t buf_ptr
int stride # should be large enough as we allocate contiguous arrays
int is_object
Py_uintptr_t mask_ptr
arrDescDtype = np.dtype(
[ ('buf_ptr', np.uintp)
, ('stride', np.dtype('i'))
, ('is_object', np.dtype('i'))
, ('mask_ptr', np.uintp)
], align=True)
_cqltype_to_numpy = {
cqltypes.LongType: np.dtype('>i8'),
cqltypes.CounterColumnType: np.dtype('>i8'),
cqltypes.Int32Type: np.dtype('>i4'),
cqltypes.ShortType: np.dtype('>i2'),
cqltypes.FloatType: np.dtype('>f4'),
cqltypes.DoubleType: np.dtype('>f8'),
}
obj_dtype = np.dtype('O')
cdef uint8_t mask_true = 0x01
cdef class NumpyParser(ColumnParser):
"""Decode a ResultMessage into a bunch of NumPy arrays"""
cpdef parse_rows(self, BytesIOReader reader, ParseDesc desc):
cdef Py_ssize_t rowcount
cdef ArrDesc[::1] array_descs
cdef ArrDesc *arrs
rowcount = read_int(reader)
array_descs, arrays = make_arrays(desc, rowcount)
arrs = &array_descs[0]
_parse_rows(reader, desc, arrs, rowcount)
arrays = [make_native_byteorder(arr) for arr in arrays]
result = dict(zip(desc.colnames, arrays))
return result
cdef _parse_rows(BytesIOReader reader, ParseDesc desc,
ArrDesc *arrs, Py_ssize_t rowcount):
cdef Py_ssize_t i
for i in range(rowcount):
unpack_row(reader, desc, arrs)
### Helper functions to create NumPy arrays and array descriptors
def make_arrays(ParseDesc desc, array_size):
"""
Allocate arrays for each result column.
returns a tuple of (array_descs, arrays), where
'array_descs' describe the arrays for NativeRowParser and
'arrays' is a dict mapping column names to arrays
(e.g. this can be fed into pandas.DataFrame)
"""
array_descs = np.empty((desc.rowsize,), arrDescDtype)
arrays = []
for i, coltype in enumerate(desc.coltypes):
arr = make_array(coltype, array_size)
array_descs[i]['buf_ptr'] = arr.ctypes.data
array_descs[i]['stride'] = arr.strides[0]
array_descs[i]['is_object'] = arr.dtype is obj_dtype
try:
array_descs[i]['mask_ptr'] = arr.mask.ctypes.data
except AttributeError:
array_descs[i]['mask_ptr'] = 0
arrays.append(arr)
return array_descs, arrays
def make_array(coltype, array_size):
"""
Allocate a new NumPy array of the given column type and size.
"""
try:
a = np.ma.empty((array_size,), dtype=_cqltype_to_numpy[coltype])
a.mask = np.zeros((array_size,), dtype=np.bool)
except KeyError:
a = np.empty((array_size,), dtype=obj_dtype)
return a
#### Parse rows into NumPy arrays
@cython.boundscheck(False)
@cython.wraparound(False)
cdef inline int unpack_row(
BytesIOReader reader, ParseDesc desc, ArrDesc *arrays) except -1:
cdef Buffer buf
cdef Py_ssize_t i, rowsize = desc.rowsize
cdef ArrDesc arr
cdef Deserializer deserializer
for i in range(rowsize):
get_buf(reader, &buf)
arr = arrays[i]
if arr.is_object:
deserializer = desc.deserializers[i]
val = from_binary(deserializer, &buf, desc.protocol_version)
Py_INCREF(val)
(<PyObject **> arr.buf_ptr)[0] = <PyObject *> val
elif buf.size >= 0:
memcpy(<char *> arr.buf_ptr, buf.ptr, buf.size)
else:
memcpy(<char *>arr.mask_ptr, &mask_true, 1)
# Update the pointer into the array for the next time
arrays[i].buf_ptr += arr.stride
arrays[i].mask_ptr += 1
return 0
def make_native_byteorder(arr):
"""
Make sure all values have a native endian in the NumPy arrays.
"""
if is_little_endian and not arr.dtype.kind == 'O':
# We have arrays in big-endian order. First swap the bytes
# into little endian order, and then update the numpy dtype
# accordingly (e.g. from '>i8' to '<i8')
#
# Ignore any object arrays of dtype('O')
return arr.byteswap().newbyteorder()
return arr
|