m7n's picture
first commit
d1ed09d
raw
history blame
1.95 kB
from __future__ import annotations
import asyncio
import ctypes
import random
import sys
from typing import Literal
from dask.utils import parse_timedelta
from distributed.diagnostics.plugin import WorkerPlugin
class KillWorker(WorkerPlugin):
    """Kill Workers Randomly

    This kills workers in a cluster randomly.  It is intended to be used in
    stress testing.

    Parameters
    ----------
    delay: str
        The expected amount of time for a worker to live.
        The actual time will vary, treating worker death as a poisson process.
        Must be strictly positive.
    mode: str
        Either "sys.exit" which calls sys.exit(0)
        or "graceful" which calls worker.close(...)
        or "segfault" which triggers a segfault

    Raises
    ------
    ValueError
        If ``delay`` is not strictly positive or ``mode`` is not one of the
        three supported modes.
    """

    # Strong reference to the task started by ``graceful``.  The event loop
    # only keeps weak references to tasks, so an unreferenced task may be
    # garbage-collected before it finishes.
    _close_task: asyncio.Task | None = None

    def __init__(
        self,
        delay: str | int | float = "100 s",
        mode: Literal["sys.exit", "graceful", "segfault"] = "sys.exit",
    ):
        self.delay = parse_timedelta(delay)
        # ``setup`` computes ``1 / self.delay``; reject unusable values here
        # rather than failing later with ZeroDivisionError on the worker.
        if not self.delay or self.delay <= 0:
            raise ValueError(f"delay must be strictly positive, got {delay!r}")
        if mode not in ("sys.exit", "graceful", "segfault"):
            raise ValueError(
                f"Three modes supported, 'sys.exit', 'graceful', and 'segfault'. "
                f"got {mode!r}"
            )
        self.mode = mode

    async def setup(self, worker):
        """Schedule the kill callback for ``self.mode`` on the worker's loop.

        The callback is scheduled once, after a delay drawn from an
        exponential distribution whose mean is ``self.delay``.
        """
        self.worker = worker
        callbacks = {
            "graceful": self.graceful,
            "sys.exit": self.sys_exit,
            "segfault": self.segfault,
        }
        callback = callbacks[self.mode]
        # expovariate takes the rate (1 / mean), hence the inversion.
        wait = random.expovariate(1 / self.delay)
        # Positional arguments match the documented ``call_later`` signature.
        self.worker.loop.asyncio_loop.call_later(wait, callback)

    def graceful(self):
        """Start closing the worker via ``worker.close(nanny=False, executor_wait=False)``."""
        self._close_task = asyncio.create_task(
            self.worker.close(nanny=False, executor_wait=False)
        )

    def sys_exit(self):
        """Call ``sys.exit(0)``, which raises ``SystemExit``."""
        sys.exit(0)

    def segfault(self):
        """
        Magic, from https://gist.github.com/coolreader18/6dbe0be2ae2192e90e1a809f1624c694?permalink_comment_id=3874116#gistcomment-3874116
        """
        # Reading a C string from address 0 dereferences a null pointer.
        ctypes.string_at(0)