Spaces:
Runtime error
Runtime error
import asyncio | |
from utils.logger import Logger | |
logger = Logger.get_logger(__name__) | |
class RequestCancellation: | |
""" | |
RequestCancellation middleware handles request canceling | |
* In case of API routes where very frequent/expensive requests are made. | |
""" | |
def __init__(self, app): | |
self.app = app | |
async def __call__(self, scope, receive, send): | |
if scope["type"] != "http": | |
await self.app(scope, receive, send) | |
return | |
queue = asyncio.Queue() | |
async def message_poller(sentinel, handler_task): | |
nonlocal queue | |
while True: | |
message = await receive() | |
if message["type"] == "http.disconnect": | |
handler_task.cancel() | |
return sentinel | |
await queue.put(message) | |
sentinel = object() | |
handler_task = asyncio.create_task(self.app(scope, queue.get, send)) | |
asyncio.create_task(message_poller(sentinel, handler_task)) | |
try: | |
return await handler_task | |
except asyncio.CancelledError: | |
logger.info('Task Cancellatation Requested.') | |