# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc. All rights reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

"""Contains the Nextgen Pythonic protobuf APIs."""

import io
from typing import Optional, Type, TypeVar

from google.protobuf.internal import decoder
from google.protobuf.internal import encoder
from google.protobuf.message import Message

_MESSAGE = TypeVar('_MESSAGE', bound='Message')


def serialize(message: _MESSAGE, deterministic: Optional[bool] = None) -> bytes:
  """Return the serialized proto.

  Args:
    message: The proto message to be serialized.
    deterministic: If true, requests deterministic serialization of the
      protobuf, with predictable ordering of map keys. The value is forwarded
      unchanged to `message.SerializeToString`, so the default (None) leaves
      the choice to that method.

  Returns:
    A binary bytes representation of the message.
  """
  return message.SerializeToString(deterministic=deterministic)


def parse(message_class: Type[_MESSAGE], payload: bytes) -> _MESSAGE:
  """Given a serialized data in binary form, deserialize it into a Message.

  Args:
    message_class: The message meta class.
    payload: A serialized bytes in binary form.

  Returns:
    A new message deserialized from payload.
  """
  new_message = message_class()
  new_message.ParseFromString(payload)
  return new_message


def serialize_length_prefixed(message: _MESSAGE, output: io.BytesIO) -> None:
  """Writes the size of the message as a varint and the serialized message.

  Writes the size of the message as a varint and then the serialized message.
  This allows more data to be written to the output after the message. Use
  parse_length_prefixed to parse messages written by this method.

  The output stream must be buffered, e.g. using
  https://docs.python.org/3/library/io.html#buffered-streams.

  Example usage:
    out = io.BytesIO()
    for msg in message_list:
      proto.serialize_length_prefixed(msg, out)

  Args:
    message: The protocol buffer message that should be serialized.
    output: BytesIO or custom buffered IO that data should be written to.

  Raises:
    TypeError: If `output.write` reports a byte count different from the
      size of the serialized message.
  """
  # Serialize before touching the stream: if serialization fails, nothing has
  # been written yet, so no dangling length prefix is left in `output`.
  payload = serialize(message)
  size = len(payload)

  encoder._VarintEncoder()(output.write, size)
  out_size = output.write(payload)

  if out_size != size:
    raise TypeError(
        'Failed to write complete message (wrote: %d, expected: %d)'
        '. Ensure output is using buffered IO.' % (out_size, size)
    )


def parse_length_prefixed(
    message_class: Type[_MESSAGE], input_bytes: io.BytesIO
) -> Optional[_MESSAGE]:
  """Parse a message from input_bytes.

  Reads a varint length prefix from input_bytes, then reads that many bytes
  and parses them into a new instance of message_class.

  Example usage:
    while True:
      msg = proto.parse_length_prefixed(message_class, input_bytes)
      if msg is None:
        break
      ...

  Args:
    message_class: The protocol buffer message class that parser should parse.
    input_bytes: A buffered input.

  Returns:
    A parsed message if successful. None if input_bytes is at EOF.

  Raises:
    ValueError: If the number of bytes parsed differs from the size announced
      by the length prefix.
  """
  size = decoder._DecodeVarint(input_bytes)
  if size is None:
    # It is the end of buffered input. See example usage in the
    # API description.
    return None

  message = message_class()

  # A zero-length payload is a valid, empty message; there is nothing to read.
  if size == 0:
    return message

  # The value returned by ParseFromString is treated as the number of bytes
  # consumed and is checked against the announced size.
  parsed_size = message.ParseFromString(input_bytes.read(size))
  if parsed_size != size:
    raise ValueError(
        'Truncated message or non-buffered input_bytes: '
        'Expected {0} bytes but only {1} bytes parsed for '
        '{2}.'.format(size, parsed_size, message.DESCRIPTOR.name)
    )
  return message