Spaces:
Runtime error
Runtime error
text-generation-webui
/
installer_files
/conda
/lib
/python3.10
/multiprocessing
/resource_sharer.py
| # | |
| # We use a background thread for sharing fds on Unix, and for sharing sockets on | |
| # Windows. | |
| # | |
| # A client which wants to pickle a resource registers it with the resource | |
| # sharer and gets an identifier in return. The unpickling process will connect | |
| # to the resource sharer, sends the identifier and its pid, and then receives | |
| # the resource. | |
| # | |
| import os | |
| import signal | |
| import socket | |
| import sys | |
| import threading | |
| from . import process | |
| from .context import reduction | |
| from . import util | |
| __all__ = ['stop'] | |
| if sys.platform == 'win32': | |
| __all__ += ['DupSocket'] | |
| class DupSocket(object): | |
| '''Picklable wrapper for a socket.''' | |
| def __init__(self, sock): | |
| new_sock = sock.dup() | |
| def send(conn, pid): | |
| share = new_sock.share(pid) | |
| conn.send_bytes(share) | |
| self._id = _resource_sharer.register(send, new_sock.close) | |
| def detach(self): | |
| '''Get the socket. This should only be called once.''' | |
| with _resource_sharer.get_connection(self._id) as conn: | |
| share = conn.recv_bytes() | |
| return socket.fromshare(share) | |
| else: | |
| __all__ += ['DupFd'] | |
| class DupFd(object): | |
| '''Wrapper for fd which can be used at any time.''' | |
| def __init__(self, fd): | |
| new_fd = os.dup(fd) | |
| def send(conn, pid): | |
| reduction.send_handle(conn, new_fd, pid) | |
| def close(): | |
| os.close(new_fd) | |
| self._id = _resource_sharer.register(send, close) | |
| def detach(self): | |
| '''Get the fd. This should only be called once.''' | |
| with _resource_sharer.get_connection(self._id) as conn: | |
| return reduction.recv_handle(conn) | |
| class _ResourceSharer(object): | |
| '''Manager for resources using background thread.''' | |
| def __init__(self): | |
| self._key = 0 | |
| self._cache = {} | |
| self._lock = threading.Lock() | |
| self._listener = None | |
| self._address = None | |
| self._thread = None | |
| util.register_after_fork(self, _ResourceSharer._afterfork) | |
| def register(self, send, close): | |
| '''Register resource, returning an identifier.''' | |
| with self._lock: | |
| if self._address is None: | |
| self._start() | |
| self._key += 1 | |
| self._cache[self._key] = (send, close) | |
| return (self._address, self._key) | |
| def get_connection(ident): | |
| '''Return connection from which to receive identified resource.''' | |
| from .connection import Client | |
| address, key = ident | |
| c = Client(address, authkey=process.current_process().authkey) | |
| c.send((key, os.getpid())) | |
| return c | |
| def stop(self, timeout=None): | |
| '''Stop the background thread and clear registered resources.''' | |
| from .connection import Client | |
| with self._lock: | |
| if self._address is not None: | |
| c = Client(self._address, | |
| authkey=process.current_process().authkey) | |
| c.send(None) | |
| c.close() | |
| self._thread.join(timeout) | |
| if self._thread.is_alive(): | |
| util.sub_warning('_ResourceSharer thread did ' | |
| 'not stop when asked') | |
| self._listener.close() | |
| self._thread = None | |
| self._address = None | |
| self._listener = None | |
| for key, (send, close) in self._cache.items(): | |
| close() | |
| self._cache.clear() | |
| def _afterfork(self): | |
| for key, (send, close) in self._cache.items(): | |
| close() | |
| self._cache.clear() | |
| self._lock._at_fork_reinit() | |
| if self._listener is not None: | |
| self._listener.close() | |
| self._listener = None | |
| self._address = None | |
| self._thread = None | |
| def _start(self): | |
| from .connection import Listener | |
| assert self._listener is None, "Already have Listener" | |
| util.debug('starting listener and thread for sending handles') | |
| self._listener = Listener(authkey=process.current_process().authkey) | |
| self._address = self._listener.address | |
| t = threading.Thread(target=self._serve) | |
| t.daemon = True | |
| t.start() | |
| self._thread = t | |
| def _serve(self): | |
| if hasattr(signal, 'pthread_sigmask'): | |
| signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) | |
| while 1: | |
| try: | |
| with self._listener.accept() as conn: | |
| msg = conn.recv() | |
| if msg is None: | |
| break | |
| key, destination_pid = msg | |
| send, close = self._cache.pop(key) | |
| try: | |
| send(conn, destination_pid) | |
| finally: | |
| close() | |
| except: | |
| if not util.is_exiting(): | |
| sys.excepthook(*sys.exc_info()) | |
| _resource_sharer = _ResourceSharer() | |
| stop = _resource_sharer.stop | |