Spaces:
Running
Running
File size: 7,876 Bytes
2a0bc63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
from cassandra.connection import Connection, ConnectionShutdown
import asyncio
import logging
import os
import socket
import ssl
from threading import Lock, Thread, get_ident
log = logging.getLogger(__name__)
# This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and
# ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the
# managed coroutines are generator-based, not native coroutines. See PEP 492:
# https://www.python.org/dev/peps/pep-0492/#coroutine-objects
try:
asyncio.run_coroutine_threadsafe
except AttributeError:
raise ImportError(
'Cannot use asyncioreactor without access to '
'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)'
)
class AsyncioTimer(object):
"""
An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer,
but with a slightly different API due to limitations in the underlying
``call_later`` interface. Not meant to be used with a
:class:`.connection.TimerManager`.
"""
@property
def end(self):
raise NotImplementedError('{} is not compatible with TimerManager and '
'does not implement .end()')
def __init__(self, timeout, callback, loop):
delayed = self._call_delayed_coro(timeout=timeout,
callback=callback)
self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop)
@staticmethod
async def _call_delayed_coro(timeout, callback):
await asyncio.sleep(timeout)
return callback()
def __lt__(self, other):
try:
return self._handle < other._handle
except AttributeError:
raise NotImplemented
def cancel(self):
self._handle.cancel()
def finish(self):
# connection.Timer method not implemented here because we can't inspect
# the Handle returned from call_later
raise NotImplementedError('{} is not compatible with TimerManager and '
'does not implement .finish()')
class AsyncioConnection(Connection):
"""
An experimental implementation of :class:`.Connection` that uses the
``asyncio`` module in the Python standard library for its event loop.
Note that it requires ``asyncio`` features that were only introduced in the
3.4 line in 3.4.6, and in the 3.5 line in 3.5.1.
"""
_loop = None
_pid = os.getpid()
_lock = Lock()
_loop_thread = None
_write_queue = None
_write_queue_lock = None
def __init__(self, *args, **kwargs):
Connection.__init__(self, *args, **kwargs)
self._connect_socket()
self._socket.setblocking(0)
self._write_queue = asyncio.Queue()
self._write_queue_lock = asyncio.Lock()
# see initialize_reactor -- loop is running in a separate thread, so we
# have to use a threadsafe call
self._read_watcher = asyncio.run_coroutine_threadsafe(
self.handle_read(), loop=self._loop
)
self._write_watcher = asyncio.run_coroutine_threadsafe(
self.handle_write(), loop=self._loop
)
self._send_options_message()
@classmethod
def initialize_reactor(cls):
with cls._lock:
if cls._pid != os.getpid():
cls._loop = None
if cls._loop is None:
cls._loop = asyncio.new_event_loop()
asyncio.set_event_loop(cls._loop)
if not cls._loop_thread:
# daemonize so the loop will be shut down on interpreter
# shutdown
cls._loop_thread = Thread(target=cls._loop.run_forever,
daemon=True, name="asyncio_thread")
cls._loop_thread.start()
@classmethod
def create_timer(cls, timeout, callback):
return AsyncioTimer(timeout, callback, loop=cls._loop)
def close(self):
with self.lock:
if self.is_closed:
return
self.is_closed = True
# close from the loop thread to avoid races when removing file
# descriptors
asyncio.run_coroutine_threadsafe(
self._close(), loop=self._loop
)
async def _close(self):
log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint))
if self._write_watcher:
self._write_watcher.cancel()
if self._read_watcher:
self._read_watcher.cancel()
if self._socket:
self._loop.remove_writer(self._socket.fileno())
self._loop.remove_reader(self._socket.fileno())
self._socket.close()
log.debug("Closed socket to %s" % (self.endpoint,))
if not self.is_defunct:
self.error_all_requests(
ConnectionShutdown("Connection to %s was closed" % self.endpoint))
# don't leave in-progress operations hanging
self.connected_event.set()
def push(self, data):
buff_size = self.out_buffer_size
if len(data) > buff_size:
chunks = []
for i in range(0, len(data), buff_size):
chunks.append(data[i:i + buff_size])
else:
chunks = [data]
if self._loop_thread.ident != get_ident():
asyncio.run_coroutine_threadsafe(
self._push_msg(chunks),
loop=self._loop
)
else:
# avoid races/hangs by just scheduling this, not using threadsafe
self._loop.create_task(self._push_msg(chunks))
async def _push_msg(self, chunks):
# This lock ensures all chunks of a message are sequential in the Queue
with await self._write_queue_lock:
for chunk in chunks:
self._write_queue.put_nowait(chunk)
async def handle_write(self):
while True:
try:
next_msg = await self._write_queue.get()
if next_msg:
await self._loop.sock_sendall(self._socket, next_msg)
except socket.error as err:
log.debug("Exception in send for %s: %s", self, err)
self.defunct(err)
return
except asyncio.CancelledError:
return
async def handle_read(self):
while True:
try:
buf = await self._loop.sock_recv(self._socket, self.in_buffer_size)
self._iobuf.write(buf)
# sock_recv expects EWOULDBLOCK if socket provides no data, but
# nonblocking ssl sockets raise these instead, so we handle them
# ourselves by yielding to the event loop, where the socket will
# get the reading/writing it "wants" before retrying
except (ssl.SSLWantWriteError, ssl.SSLWantReadError):
# Apparently the preferred way to yield to the event loop from within
# a native coroutine based on https://github.com/python/asyncio/issues/284
await asyncio.sleep(0)
continue
except socket.error as err:
log.debug("Exception during socket recv for %s: %s",
self, err)
self.defunct(err)
return # leave the read loop
except asyncio.CancelledError:
return
if buf and self._iobuf.tell():
self.process_io_buffer()
else:
log.debug("Connection %s closed by server", self)
self.close()
return
|