Spaces:
Running
on
Zero
Running
on
Zero
from __future__ import annotations | |
import asyncio | |
import ctypes | |
import random | |
import sys | |
from typing import Literal | |
from dask.utils import parse_timedelta | |
from distributed.diagnostics.plugin import WorkerPlugin | |
class KillWorker(WorkerPlugin): | |
"""Kill Workers Randomly | |
This kills workers in a cluster randomly. It is intended to be used in | |
stress testing. | |
Parameters | |
---------- | |
delay: str | |
The expected amount of time for a worker to live. | |
The actual time will vary, treating worker death as a poisson process. | |
mode: str | |
or "graceful" which calls worker.close(...) | |
Either "sys.exit" which calls sys.exit(0) | |
or "segfault" which triggers a segfault | |
""" | |
def __init__( | |
self, | |
delay: str | int | float = "100 s", | |
mode: Literal["sys.exit", "graceful", "segfault"] = "sys.exit", | |
): | |
self.delay = parse_timedelta(delay) | |
if mode not in ("sys.exit", "graceful", "segfault"): | |
raise ValueError( | |
f"Three modes supported, 'sys.exit', 'graceful', and 'segfault'. " | |
f"got {mode!r}" | |
) | |
self.mode = mode | |
async def setup(self, worker): | |
self.worker = worker | |
if self.mode == "graceful": | |
f = self.graceful | |
elif self.mode == "sys.exit": | |
f = self.sys_exit | |
elif self.mode == "segfault": | |
f = self.segfault | |
self.worker.loop.asyncio_loop.call_later( | |
delay=random.expovariate(1 / self.delay), | |
callback=f, | |
) | |
def graceful(self): | |
asyncio.create_task(self.worker.close(nanny=False, executor_wait=False)) | |
def sys_exit(self): | |
sys.exit(0) | |
def segfault(self): | |
""" | |
Magic, from https://gist.github.com/coolreader18/6dbe0be2ae2192e90e1a809f1624c694?permalink_comment_id=3874116#gistcomment-3874116 | |
""" | |
ctypes.string_at(0) | |