Spaces:
Running
Running
File size: 1,569 Bytes
2a0bc63 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
#!/usr/bin/env python
from struct import unpack
from six import BytesIO, b
from . import dumps, loads
def _bintoint(data):
return unpack("<i", data)[0]
def sendobj(self, obj):
"""
Atomically send a BSON message.
"""
data = dumps(obj)
self.sendall(data)
def recvobj(self):
"""
Atomic read of a BSON message.
This function either returns a dict, None, or raises a socket error.
If the return value is None, it means the socket is closed by the other side.
"""
sock_buf = self.recvbytes(4)
if sock_buf is None:
return None
message_length = _bintoint(sock_buf.getvalue())
sock_buf = self.recvbytes(message_length - 4, sock_buf)
if sock_buf is None:
return None
retval = loads(sock_buf.getvalue())
return retval
def recvbytes(self, bytes_needed, sock_buf = None):
"""
Atomic read of bytes_needed bytes.
This function either returns exactly the nmber of bytes requested in a
StringIO buffer, None, or raises a socket error.
If the return value is None, it means the socket is closed by the other side.
"""
if sock_buf is None:
sock_buf = BytesIO()
bytes_count = 0
while bytes_count < bytes_needed:
chunk = self.recv(min(bytes_needed - bytes_count, 32768))
part_count = len(chunk)
if type(chunk) == str:
chunk = b(chunk)
if part_count < 1:
return None
bytes_count += part_count
sock_buf.write(chunk)
return sock_buf
|