# fmt: off # flake8: noqa # Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! """Client and server classes corresponding to protobuf-defined services.""" import grpc import warnings import async_inference_pb2 as async__inference__pb2 GRPC_GENERATED_VERSION = '1.71.0' GRPC_VERSION = grpc.__version__ _version_not_supported = False try: from grpc._utilities import first_version_is_lower _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) except ImportError: _version_not_supported = True if _version_not_supported: raise RuntimeError( f'The grpc package installed is at version {GRPC_VERSION},' + f' but the generated code in async_inference_pb2_grpc.py depends on' + f' grpcio>={GRPC_GENERATED_VERSION}.' + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' ) class AsyncInferenceStub: """AsyncInference: from Robot perspective Robot send observations to & executes action received from a remote Policy server """ def __init__(self, channel): """Constructor. Args: channel: A grpc.Channel. """ self.SendObservations = channel.stream_unary( '/async_inference.AsyncInference/SendObservations', request_serializer=async__inference__pb2.Observation.SerializeToString, response_deserializer=async__inference__pb2.Empty.FromString, _registered_method=True) self.StreamActions = channel.unary_stream( '/async_inference.AsyncInference/StreamActions', request_serializer=async__inference__pb2.Empty.SerializeToString, response_deserializer=async__inference__pb2.Action.FromString, _registered_method=True) self.SendPolicyInstructions = channel.unary_unary( '/async_inference.AsyncInference/SendPolicyInstructions', request_serializer=async__inference__pb2.PolicySetup.SerializeToString, response_deserializer=async__inference__pb2.Empty.FromString, _registered_method=True) self.Ready = channel.unary_unary( '/async_inference.AsyncInference/Ready', request_serializer=async__inference__pb2.Empty.SerializeToString, response_deserializer=async__inference__pb2.Empty.FromString, _registered_method=True) class AsyncInferenceServicer: """AsyncInference: from Robot perspective Robot send observations to & executes action received from a remote Policy server """ def SendObservations(self, request_iterator, context): """Robot -> Policy to share observations with a remote inference server Policy -> Robot to share actions predicted for given observations """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def StreamActions(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def SendPolicyInstructions(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def Ready(self, request, context): """Missing associated documentation comment in .proto file.""" context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def add_AsyncInferenceServicer_to_server(servicer, server): rpc_method_handlers = { 'SendObservations': grpc.stream_unary_rpc_method_handler( servicer.SendObservations, request_deserializer=async__inference__pb2.Observation.FromString, response_serializer=async__inference__pb2.Empty.SerializeToString, ), 'StreamActions': grpc.unary_stream_rpc_method_handler( servicer.StreamActions, request_deserializer=async__inference__pb2.Empty.FromString, response_serializer=async__inference__pb2.Action.SerializeToString, ), 'SendPolicyInstructions': grpc.unary_unary_rpc_method_handler( servicer.SendPolicyInstructions, request_deserializer=async__inference__pb2.PolicySetup.FromString, response_serializer=async__inference__pb2.Empty.SerializeToString, ), 'Ready': grpc.unary_unary_rpc_method_handler( servicer.Ready, request_deserializer=async__inference__pb2.Empty.FromString, response_serializer=async__inference__pb2.Empty.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( 'async_inference.AsyncInference', rpc_method_handlers) server.add_generic_rpc_handlers((generic_handler,)) server.add_registered_method_handlers('async_inference.AsyncInference', rpc_method_handlers) # This class is part of an EXPERIMENTAL API. class AsyncInference: """AsyncInference: from Robot perspective Robot send observations to & executes action received from a remote Policy server """ @staticmethod def SendObservations(request_iterator, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.stream_unary( request_iterator, target, '/async_inference.AsyncInference/SendObservations', async__inference__pb2.Observation.SerializeToString, async__inference__pb2.Empty.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def StreamActions(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_stream( request, target, '/async_inference.AsyncInference/StreamActions', async__inference__pb2.Empty.SerializeToString, async__inference__pb2.Action.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def SendPolicyInstructions(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/async_inference.AsyncInference/SendPolicyInstructions', async__inference__pb2.PolicySetup.SerializeToString, async__inference__pb2.Empty.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True) @staticmethod def Ready(request, target, options=(), channel_credentials=None, call_credentials=None, insecure=False, compression=None, wait_for_ready=None, timeout=None, metadata=None): return grpc.experimental.unary_unary( request, target, '/async_inference.AsyncInference/Ready', async__inference__pb2.Empty.SerializeToString, async__inference__pb2.Empty.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata, _registered_method=True)