Spaces:
Paused
Paused
from typing import Union | |
from fastapi.websockets import WebSocket, WebSocketDisconnect | |
class Accelerator: | |
ws: Union[WebSocket, None] = None | |
def connected(self): return self.ws != None | |
async def connect(self, ws: WebSocket): | |
await ws.accept() | |
self.ws = ws | |
async def accelerate(self, input): | |
try: | |
await self.ws.send_text(input) | |
return await self.ws.receive_text() | |
except WebSocketDisconnect: | |
self.ws = None | |