|
|
|
from collections import namedtuple |
|
import datetime |
|
import sys |
|
import struct |
|
|
|
|
|
# True when the running interpreter is Python 2.
PY2 = sys.version_info[0] == 2

if not PY2:
    # Python 3: ``int`` is the only built-in integer type.
    int_types = int
    try:
        _utc = datetime.timezone.utc
    except AttributeError:
        # Fall back to building an explicit zero-offset timezone.
        _utc = datetime.timezone(datetime.timedelta(0))
else:
    # Python 2: integers may be either ``int`` or ``long``.
    int_types = (int, long)  # noqa: F821 - ``long`` only exists on Python 2
    # No UTC tzinfo is set up for Python 2.
    _utc = None
|
|
|
|
|
class ExtType(namedtuple("ExtType", "code data")):
    """ExtType represents ext type in msgpack.

    An immutable ``(code, data)`` pair.

    :param int code: Extension type code; must be in the range 0..127.
    :param bytes data: Raw payload of the extension value.

    :raises TypeError: if `code` is not an int or `data` is not bytes.
    :raises ValueError: if `code` is outside 0..127.
    """

    # Without empty slots every instance of a namedtuple subclass gets its
    # own ``__dict__``, which wastes memory and lets callers attach stray
    # attributes to what is meant to be a plain immutable tuple.
    __slots__ = ()

    def __new__(cls, code, data):
        if not isinstance(code, int):
            raise TypeError("code must be int")
        if not isinstance(data, bytes):
            raise TypeError("data must be bytes")
        if not 0 <= code <= 127:
            raise ValueError("code must be 0~127")
        return super(ExtType, cls).__new__(cls, code, data)
|
|
|
|
|
class Timestamp(object):
    """Timestamp represents the Timestamp extension type in msgpack.

    When built with Cython, msgpack uses C methods to pack and unpack `Timestamp`.
    When using pure-Python msgpack, :func:`to_bytes` and :func:`from_bytes` are
    used to pack and unpack `Timestamp`.

    This class is immutable: Do not override seconds and nanoseconds.
    """

    __slots__ = ["seconds", "nanoseconds"]

    def __init__(self, seconds, nanoseconds=0):
        """Initialize a Timestamp object.

        :param int seconds:
            Number of seconds since the UNIX epoch (00:00:00 UTC Jan 1 1970,
            minus leap seconds). May be negative.

        :param int nanoseconds:
            Number of nanoseconds to add to `seconds` to get fractional time.
            Maximum is 999_999_999. Default is 0.

        :raises TypeError: if `seconds` or `nanoseconds` is not an integer.
        :raises ValueError: if `nanoseconds` is outside 0..999_999_999.

        Note: Negative times (before the UNIX epoch) are represented as
        negative seconds + positive ns.
        """
        if not isinstance(seconds, int_types):
            raise TypeError("seconds must be an integer")
        if not isinstance(nanoseconds, int_types):
            raise TypeError("nanoseconds must be an integer")
        if not (0 <= nanoseconds < 10**9):
            raise ValueError(
                "nanoseconds must be a non-negative integer less than 1000000000."
            )
        self.seconds = seconds
        self.nanoseconds = nanoseconds

    def __repr__(self):
        """String representation of Timestamp."""
        return "Timestamp(seconds={0}, nanoseconds={1})".format(
            self.seconds, self.nanoseconds
        )

    def __eq__(self, other):
        """Check for equality with another Timestamp object.

        Only objects of exactly the same class compare equal.
        """
        if type(other) is self.__class__:
            return (
                self.seconds == other.seconds and self.nanoseconds == other.nanoseconds
            )
        return False

    def __ne__(self, other):
        """not-equals method (see :func:`__eq__()`)"""
        return not self.__eq__(other)

    def __hash__(self):
        """Hash on the same fields that :func:`__eq__` compares."""
        return hash((self.seconds, self.nanoseconds))

    @staticmethod
    def from_bytes(b):
        """Unpack bytes into a `Timestamp` object.

        Used for pure-Python msgpack unpacking.

        :param b: Payload from msgpack ext message with code -1
        :type b: bytes

        :returns: Timestamp object unpacked from msgpack ext payload
        :rtype: Timestamp

        :raises ValueError: if `b` is not 4, 8 or 12 bytes long, or if the
            decoded nanoseconds value is rejected by the constructor.
        """
        if len(b) == 4:
            # 32-bit form: unsigned seconds only.
            seconds = struct.unpack("!L", b)[0]
            nanoseconds = 0
        elif len(b) == 8:
            # 64-bit form: upper 30 bits are nanoseconds, lower 34 bits seconds.
            data64 = struct.unpack("!Q", b)[0]
            seconds = data64 & 0x00000003FFFFFFFF
            nanoseconds = data64 >> 34
        elif len(b) == 12:
            # 96-bit form: unsigned 32-bit nanoseconds, then signed 64-bit seconds.
            nanoseconds, seconds = struct.unpack("!Iq", b)
        else:
            raise ValueError(
                "Timestamp type can only be created from 32, 64, or 96-bit byte objects"
            )
        return Timestamp(seconds, nanoseconds)

    def to_bytes(self):
        """Pack this Timestamp object into bytes.

        Used for pure-Python msgpack packing. The shortest of the 4-, 8- and
        12-byte encodings that can hold the value is chosen.

        :returns data: Payload for EXT message with code -1 (timestamp type)
        :rtype: bytes
        """
        if (self.seconds >> 34) == 0:
            # seconds is non-negative and fits into 34 bits.
            data64 = self.nanoseconds << 34 | self.seconds
            if data64 & 0xFFFFFFFF00000000 == 0:
                # nanoseconds is zero and seconds fits into 32 bits: 4 bytes.
                data = struct.pack("!L", data64)
            else:
                # 30-bit nanoseconds + 34-bit seconds: 8 bytes.
                data = struct.pack("!Q", data64)
        else:
            # Negative or larger than 34 bits: 12 bytes.
            data = struct.pack("!Iq", self.nanoseconds, self.seconds)
        return data

    @staticmethod
    def from_unix(unix_sec):
        """Create a Timestamp from posix timestamp in seconds.

        :param unix_sec: Posix timestamp in seconds.
        :type unix_sec: int or float

        :rtype: Timestamp
        """
        seconds = int(unix_sec // 1)
        nanoseconds = int((unix_sec % 1) * 10**9)
        if nanoseconds >= 10**9:
            # For a tiny negative float (e.g. -1e-20) ``x % 1`` rounds to
            # exactly 1.0, so the fractional part spills over into a whole
            # second. Carry it instead of failing the range check.
            seconds += 1
            nanoseconds -= 10**9
        return Timestamp(seconds, nanoseconds)

    def to_unix(self):
        """Get the timestamp as a floating-point value.

        :returns: posix timestamp
        :rtype: float
        """
        return self.seconds + self.nanoseconds / 1e9

    @staticmethod
    def from_unix_nano(unix_ns):
        """Create a Timestamp from posix timestamp in nanoseconds.

        :param int unix_ns: Posix timestamp in nanoseconds.
        :rtype: Timestamp
        """
        # divmod floors, so negative inputs yield negative seconds and a
        # non-negative nanoseconds remainder.
        return Timestamp(*divmod(unix_ns, 10**9))

    def to_unix_nano(self):
        """Get the timestamp as a unixtime in nanoseconds.

        :returns: posix timestamp in nanoseconds
        :rtype: int
        """
        return self.seconds * 10**9 + self.nanoseconds

    def to_datetime(self):
        """Get the timestamp as a UTC datetime.

        Python 2 is not supported.

        The result has microsecond resolution; nanoseconds below one
        microsecond are truncated.

        :rtype: datetime.datetime
        """
        # Integer timedelta fields keep this exact; going through the float
        # returned by to_unix() loses microseconds for large second counts.
        return datetime.datetime.fromtimestamp(0, _utc) + datetime.timedelta(
            seconds=self.seconds, microseconds=self.nanoseconds // 1000
        )

    @staticmethod
    def from_datetime(dt):
        """Create a Timestamp from datetime with tzinfo.

        Python 2 is not supported.

        :param dt: Datetime to convert. A datetime without a UTC offset is
            converted through ``dt.timestamp()``, as before.
        :type dt: datetime.datetime

        :rtype: Timestamp
        """
        if dt.utcoffset() is None:
            # No UTC offset available: the epoch subtraction below is not
            # possible, so keep the float-based conversion.
            return Timestamp.from_unix(dt.timestamp())
        # timedelta normalizes to 0 <= seconds < 86400 and
        # 0 <= microseconds < 10**6 with a possibly negative day count, which
        # matches the "negative seconds + positive ns" representation.
        delta = dt - datetime.datetime.fromtimestamp(0, _utc)
        return Timestamp(
            delta.days * 86400 + delta.seconds, delta.microseconds * 1000
        )
|
|