from cassandra.connection import Connection, ConnectionShutdown import asyncio import logging import os import socket import ssl from threading import Lock, Thread, get_ident log = logging.getLogger(__name__) # This module uses ``yield from`` and ``@asyncio.coroutine`` over ``await`` and # ``async def`` for pre-Python-3.5 compatibility, so keep in mind that the # managed coroutines are generator-based, not native coroutines. See PEP 492: # https://www.python.org/dev/peps/pep-0492/#coroutine-objects try: asyncio.run_coroutine_threadsafe except AttributeError: raise ImportError( 'Cannot use asyncioreactor without access to ' 'asyncio.run_coroutine_threadsafe (added in 3.4.6 and 3.5.1)' ) class AsyncioTimer(object): """ An ``asyncioreactor``-specific Timer. Similar to :class:`.connection.Timer, but with a slightly different API due to limitations in the underlying ``call_later`` interface. Not meant to be used with a :class:`.connection.TimerManager`. """ @property def end(self): raise NotImplementedError('{} is not compatible with TimerManager and ' 'does not implement .end()') def __init__(self, timeout, callback, loop): delayed = self._call_delayed_coro(timeout=timeout, callback=callback) self._handle = asyncio.run_coroutine_threadsafe(delayed, loop=loop) @staticmethod async def _call_delayed_coro(timeout, callback): await asyncio.sleep(timeout) return callback() def __lt__(self, other): try: return self._handle < other._handle except AttributeError: raise NotImplemented def cancel(self): self._handle.cancel() def finish(self): # connection.Timer method not implemented here because we can't inspect # the Handle returned from call_later raise NotImplementedError('{} is not compatible with TimerManager and ' 'does not implement .finish()') class AsyncioConnection(Connection): """ An experimental implementation of :class:`.Connection` that uses the ``asyncio`` module in the Python standard library for its event loop. Note that it requires ``asyncio`` features that were only introduced in the 3.4 line in 3.4.6, and in the 3.5 line in 3.5.1. """ _loop = None _pid = os.getpid() _lock = Lock() _loop_thread = None _write_queue = None _write_queue_lock = None def __init__(self, *args, **kwargs): Connection.__init__(self, *args, **kwargs) self._connect_socket() self._socket.setblocking(0) self._write_queue = asyncio.Queue() self._write_queue_lock = asyncio.Lock() # see initialize_reactor -- loop is running in a separate thread, so we # have to use a threadsafe call self._read_watcher = asyncio.run_coroutine_threadsafe( self.handle_read(), loop=self._loop ) self._write_watcher = asyncio.run_coroutine_threadsafe( self.handle_write(), loop=self._loop ) self._send_options_message() @classmethod def initialize_reactor(cls): with cls._lock: if cls._pid != os.getpid(): cls._loop = None if cls._loop is None: cls._loop = asyncio.new_event_loop() asyncio.set_event_loop(cls._loop) if not cls._loop_thread: # daemonize so the loop will be shut down on interpreter # shutdown cls._loop_thread = Thread(target=cls._loop.run_forever, daemon=True, name="asyncio_thread") cls._loop_thread.start() @classmethod def create_timer(cls, timeout, callback): return AsyncioTimer(timeout, callback, loop=cls._loop) def close(self): with self.lock: if self.is_closed: return self.is_closed = True # close from the loop thread to avoid races when removing file # descriptors asyncio.run_coroutine_threadsafe( self._close(), loop=self._loop ) async def _close(self): log.debug("Closing connection (%s) to %s" % (id(self), self.endpoint)) if self._write_watcher: self._write_watcher.cancel() if self._read_watcher: self._read_watcher.cancel() if self._socket: self._loop.remove_writer(self._socket.fileno()) self._loop.remove_reader(self._socket.fileno()) self._socket.close() log.debug("Closed socket to %s" % (self.endpoint,)) if not self.is_defunct: self.error_all_requests( ConnectionShutdown("Connection to %s was closed" % self.endpoint)) # don't leave in-progress operations hanging self.connected_event.set() def push(self, data): buff_size = self.out_buffer_size if len(data) > buff_size: chunks = [] for i in range(0, len(data), buff_size): chunks.append(data[i:i + buff_size]) else: chunks = [data] if self._loop_thread.ident != get_ident(): asyncio.run_coroutine_threadsafe( self._push_msg(chunks), loop=self._loop ) else: # avoid races/hangs by just scheduling this, not using threadsafe self._loop.create_task(self._push_msg(chunks)) async def _push_msg(self, chunks): # This lock ensures all chunks of a message are sequential in the Queue with await self._write_queue_lock: for chunk in chunks: self._write_queue.put_nowait(chunk) async def handle_write(self): while True: try: next_msg = await self._write_queue.get() if next_msg: await self._loop.sock_sendall(self._socket, next_msg) except socket.error as err: log.debug("Exception in send for %s: %s", self, err) self.defunct(err) return except asyncio.CancelledError: return async def handle_read(self): while True: try: buf = await self._loop.sock_recv(self._socket, self.in_buffer_size) self._iobuf.write(buf) # sock_recv expects EWOULDBLOCK if socket provides no data, but # nonblocking ssl sockets raise these instead, so we handle them # ourselves by yielding to the event loop, where the socket will # get the reading/writing it "wants" before retrying except (ssl.SSLWantWriteError, ssl.SSLWantReadError): # Apparently the preferred way to yield to the event loop from within # a native coroutine based on https://github.com/python/asyncio/issues/284 await asyncio.sleep(0) continue except socket.error as err: log.debug("Exception during socket recv for %s: %s", self, err) self.defunct(err) return # leave the read loop except asyncio.CancelledError: return if buf and self._iobuf.tell(): self.process_io_buffer() else: log.debug("Connection %s closed by server", self) self.close() return