Spaces:
Running
on
Zero
Running
on
Zero
File size: 1,947 Bytes
d1ed09d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
from __future__ import annotations
import asyncio
import ctypes
import random
import sys
from typing import Literal
from dask.utils import parse_timedelta
from distributed.diagnostics.plugin import WorkerPlugin
class KillWorker(WorkerPlugin):
    """Kill Workers Randomly

    This kills workers in a cluster randomly. It is intended to be used in
    stress testing.

    Parameters
    ----------
    delay: str
        The expected amount of time for a worker to live. Must be positive.
        The actual time will vary, treating worker death as a poisson process.
    mode: str
        Either "sys.exit" which calls sys.exit(0)
        or "graceful" which calls worker.close(...)
        or "segfault" which triggers a segfault

    Raises
    ------
    ValueError
        If ``mode`` is not one of the supported modes, or if ``delay`` does
        not parse to a positive duration.
    """

    def __init__(
        self,
        delay: str | int | float = "100 s",
        mode: Literal["sys.exit", "graceful", "segfault"] = "sys.exit",
    ):
        self.delay = parse_timedelta(delay)
        # Fail fast here: a zero delay would otherwise only blow up later,
        # on the worker, as a ZeroDivisionError in ``1 / self.delay``.
        if self.delay <= 0:
            raise ValueError(f"delay must be a positive duration, got {delay!r}")
        if mode not in ("sys.exit", "graceful", "segfault"):
            raise ValueError(
                "Three modes supported, 'sys.exit', 'graceful', and 'segfault'. "
                f"got {mode!r}"
            )
        self.mode = mode
        # Pending timer scheduled by ``setup``; cancelled by ``teardown``.
        self._kill_handle = None
        # Strong reference to the close task created by ``graceful``.
        self._close_task = None

    async def setup(self, worker) -> None:
        """Schedule the configured kill action on ``worker``'s event loop.

        The delay is drawn from an exponential distribution with mean
        ``self.delay``, so worker deaths behave like a Poisson process.
        """
        self.worker = worker
        kill = {
            "graceful": self.graceful,
            "sys.exit": self.sys_exit,
            "segfault": self.segfault,
        }[self.mode]
        # expovariate takes the *rate* (1 / mean), hence 1 / self.delay.
        self._kill_handle = self.worker.loop.asyncio_loop.call_later(
            random.expovariate(1 / self.delay),
            kill,
        )

    def teardown(self, worker) -> None:
        """Cancel the scheduled kill if it has not fired yet.

        Cancelling a timer handle that has already run is a no-op.
        """
        if self._kill_handle is not None:
            self._kill_handle.cancel()
            self._kill_handle = None

    def graceful(self) -> None:
        """Close the worker via ``worker.close(nanny=False, executor_wait=False)``."""
        # The event loop only keeps weak references to tasks; hold a strong
        # reference so the close task cannot be garbage-collected mid-run.
        self._close_task = asyncio.create_task(
            self.worker.close(nanny=False, executor_wait=False)
        )

    def sys_exit(self) -> None:
        """Raise ``SystemExit(0)`` from within the event-loop callback."""
        sys.exit(0)

    def segfault(self) -> None:
        """Crash the process by reading from address 0 through ctypes.

        Magic, from https://gist.github.com/coolreader18/6dbe0be2ae2192e90e1a809f1624c694?permalink_comment_id=3874116#gistcomment-3874116
        """
        ctypes.string_at(0)
|