Spaces:
Runtime error
Runtime error
import socket | |
from socketserver import BaseRequestHandler, TCPServer | |
from typing import Any, Optional, Tuple, Type | |
class RoboflowTCPServer(TCPServer): | |
def __init__( | |
self, | |
server_address: Tuple[str, int], | |
handler_class: Type[BaseRequestHandler], | |
socket_operations_timeout: Optional[float] = None, | |
): | |
TCPServer.__init__(self, server_address, handler_class) | |
self._socket_operations_timeout = socket_operations_timeout | |
def get_request(self) -> Tuple[socket.socket, Any]: | |
connection, address = self.socket.accept() | |
connection.settimeout(self._socket_operations_timeout) | |
return connection, address | |