Spaces:
Running
on
Zero
Running
on
Zero
| """ This module contains utility functions to construct and manipulate counting | |
| data structures for frames. | |
| When performing statistical profiling we obtain many call stacks. We aggregate | |
| these call stacks into data structures that maintain counts of how many times | |
| each function in that call stack has been called. Because these stacks will | |
| overlap this aggregation counting structure forms a tree, such as is commonly | |
| visualized by profiling tools. | |
| We represent this tree as a nested dictionary with the following form: | |
| { | |
| 'identifier': 'root', | |
| 'description': 'A long description of the line of code being run.', | |
| 'count': 10 # the number of times we have seen this line | |
| 'children': { # callees of this line. Recursive dicts | |
| 'ident-a': {'description': ... | |
| 'identifier': 'ident-a', | |
| 'count': ... | |
| 'children': {...}}, | |
| 'ident-b': {'description': ... | |
| 'identifier': 'ident-b', | |
| 'count': ... | |
| 'children': {...}}} | |
| } | |
| """ | |
| from __future__ import annotations | |
| import bisect | |
| import dis | |
| import linecache | |
| import sys | |
| import threading | |
| from collections import defaultdict, deque | |
| from collections.abc import Callable, Collection | |
| from time import sleep | |
| from types import FrameType | |
| from typing import Any | |
| import tlz as toolz | |
| from dask.utils import format_time, parse_timedelta | |
| from distributed.metrics import time | |
| from distributed.utils import color_of | |
#: This lock can be acquired to ensure that no instance of watch() is
#: concurrently holding references to frames.
#: NOTE(review): the only acquisition visible in this listing is inside
#: ``_watch``, around the append to the log -- confirm that this is the section
#: callers expect to be excluded while they hold the lock.
lock = threading.Lock()
| def identifier(frame: FrameType | None) -> str: | |
| """A string identifier from a frame | |
| Strings are cheaper to use as indexes into dicts than tuples or dicts | |
| """ | |
| if frame is None: | |
| return "None" | |
| else: | |
| return ";".join( | |
| ( | |
| frame.f_code.co_name, | |
| frame.f_code.co_filename, | |
| str(frame.f_code.co_firstlineno), | |
| ) | |
| ) | |
| def _f_lineno(frame: FrameType) -> int: | |
| """Work around some frames lacking an f_lineno | |
| See: https://bugs.python.org/issue47085 | |
| """ | |
| f_lineno = frame.f_lineno | |
| if f_lineno is not None: | |
| return f_lineno | |
| f_lasti = frame.f_lasti | |
| code = frame.f_code | |
| prev_line = code.co_firstlineno | |
| for start, next_line in dis.findlinestarts(code): | |
| if f_lasti < start: | |
| return prev_line | |
| prev_line = next_line | |
| return prev_line | |
def repr_frame(frame: FrameType) -> str:
    """Format a frame as one entry of a text traceback.

    The result is a ``File "<filename>", line <n>, in <name>`` header followed
    by a newline, a tab and the source line with leading whitespace removed.
    The source line is fetched through ``linecache`` using the frame's
    globals.
    """
    code = frame.f_code
    lineno = _f_lineno(frame)
    source = linecache.getline(code.co_filename, lineno, frame.f_globals)
    header = f' File "{code.co_filename}", line {lineno}, in {code.co_name}'
    return header + "\n\t" + source.lstrip()
def info_frame(frame: FrameType) -> dict[str, Any]:
    """Describe a frame as a plain dictionary.

    Returns
    -------
    dict
        ``filename`` and ``name`` from the frame's code object,
        ``line_number`` as computed by ``_f_lineno`` and ``line``, the
        corresponding source line (via ``linecache``) with leading whitespace
        removed.
    """
    code = frame.f_code
    lineno = _f_lineno(frame)
    source = linecache.getline(code.co_filename, lineno, frame.f_globals)
    return dict(
        filename=code.co_filename,
        name=code.co_name,
        line_number=lineno,
        line=source.lstrip(),
    )
def process(
    frame: FrameType,
    child: object | None,
    state: dict[str, Any],
    *,
    stop: str | None = None,
    omit: Collection[str] = (),
    depth: int | None = None,
) -> dict[str, Any] | None:
    """Add counts from a frame stack onto existing state

    The stack is walked from ``frame`` back through its callers first, so
    that counts are added from the outermost processed frame inwards.  Nodes
    are created in ``state`` for functions that have not been seen before.

    Parameters
    ----------
    frame:
        The frame to process onto the state
    child:
        For internal use only
    state:
        The profile state to accumulate this frame onto, see ``create``
    stop:
        Filename suffix that should stop processing if we encounter it
    omit:
        Filename suffixes; if the filename of ``frame`` ends with one of them
        nothing is recorded.  Not forwarded to the recursive calls, so only
        the frame passed by the caller is checked.
    depth:
        For internal use only, how deep we are in the call stack
        Used to prevent stack overflow

    Returns
    -------
    dict or None
        For recursive calls (``child`` is not None), the node of ``frame``.
        ``None`` for the outermost call, and whenever the sample is dropped
        (omitted file or exhausted depth).

    Examples
    --------
    >>> import sys, threading
    >>> ident = threading.get_ident()  # replace with your thread of interest
    >>> frame = sys._current_frames()[ident]
    >>> state = create()
    >>> process(frame, None, state)
    >>> state
    {'count': 1,
     'identifier': 'root',
     'description': 'root',
     'children': {'...'}}

    See also
    --------
    create
    merge
    """
    if depth is None:
        # Stay safely below the interpreter's recursion limit
        depth = sys.getrecursionlimit() - 50
    if depth <= 0:
        return None

    filename = frame.f_code.co_filename
    for suffix in omit:
        if filename.endswith(suffix):
            return None

    caller = frame.f_back
    if caller is not None:
        hit_stop = stop is not None and caller.f_code.co_filename.endswith(stop)
        if not hit_stop:
            # Record the callers first; the node returned for the direct
            # caller becomes the parent of this frame's node.
            parent = process(caller, frame, state, stop=stop, depth=depth - 1)
            if parent is None:
                return None
            state = parent

    ident = identifier(frame)
    children = state["children"]
    if ident in children:
        node = children[ident]
    else:
        node = children[ident] = {
            "count": 0,
            "description": info_frame(frame),
            "children": {},
            "identifier": ident,
        }

    state["count"] += 1

    if child is None:
        # Innermost frame of the sample: it also counts for itself
        node["count"] += 1
        return None
    return node
def merge(*args: dict[str, Any]) -> dict[str, Any]:
    """Merge multiple frame states together

    Counts are summed and children with the same key are merged recursively.
    The description and identifier of the result are taken from the first
    argument.  The inputs are not modified.

    Returns
    -------
    dict
        The merged state; a fresh empty state (see ``create``) when called
        without arguments.

    Raises
    ------
    ValueError
        If the arguments do not all share the same identifier.
    """
    if not args:
        return create()

    identifiers = {arg["identifier"] for arg in args}
    if len(identifiers) != 1:  # pragma: no cover
        raise ValueError(f"Expected identifiers, got {identifiers}")

    # Group the child nodes of all arguments by their key
    grouped = defaultdict(list)
    for arg in args:
        for key, node in arg["children"].items():
            grouped[key].append(node)

    try:
        merged_children = {key: merge(*nodes) for key, nodes in grouped.items()}
    except RecursionError:  # pragma: no cover
        # Tree too deep to merge: drop the children at this level
        merged_children = {}

    first = args[0]
    return {
        "description": first["description"],
        "children": merged_children,
        "count": sum(arg["count"] for arg in args),
        "identifier": first["identifier"],
    }
def create() -> dict[str, Any]:
    """Return a new, empty profile state.

    The state is the root node of the counting tree: zero count, no children,
    identifier ``"root"`` and a blank description.  Every call returns
    independent objects.
    """
    blank_description = {"filename": "", "name": "", "line_number": 0, "line": ""}
    root: dict[str, Any] = {"count": 0, "children": {}}
    root["identifier"] = "root"
    root["description"] = blank_description
    return root
def call_stack(frame: FrameType) -> list[str]:
    """Create a call text stack from a frame

    Follows ``f_back`` from ``frame`` and renders every frame with
    ``repr_frame``.

    Returns
    -------
    list of strings
        One entry per frame, ordered so that ``frame`` itself comes last.
    """
    entries = []
    current: FrameType | None = frame
    while current:
        entries.append(repr_frame(current))
        current = current.f_back
    # Collected from ``frame`` backwards; flip so that ``frame`` is last
    entries.reverse()
    return entries
def plot_data(state, profile_interval=0.010):
    """Convert a profile state into data useful by Bokeh

    The tree is flattened into parallel lists with one entry per node that has
    a non-zero count.  The root spans the horizontal range [0, 1] at height 0;
    the children of a node are laid out left to right inside their parent's
    range, each with a width proportional to its count, one level higher.

    Parameters
    ----------
    state : dict
        Profile state, see ``create``
    profile_interval : float
        Multiplied by a node's count and passed to ``format_time`` to produce
        the ``time`` column

    Returns
    -------
    dict
        Lists keyed by ``left``, ``right``, ``bottom``, ``width``, ``top``,
        ``color``, ``states``, ``filename``, ``line``, ``line_number``,
        ``name``, ``time`` and ``percentage``

    See Also
    --------
    plot_figure
    distributed.bokeh.components.ProfilePlot
    """
    starts = []
    stops = []
    heights = []
    widths = []
    colors = []
    states = []
    times = []

    filenames = []
    lines = []
    line_numbers = []
    names = []

    def traverse(state, start, stop, height):
        if not state["count"]:
            return
        starts.append(start)
        stops.append(stop)
        heights.append(height)
        width = stop - start
        widths.append(width)
        states.append(state)
        times.append(format_time(state["count"] * profile_interval))

        desc = state["description"]
        fn = desc["filename"]
        filenames.append(fn)
        lines.append(desc["line"])
        line_numbers.append(desc["line_number"])
        names.append(desc["name"])

        # The filename lookup above either succeeds or raises, so no fallback
        # colour is needed here.
        if fn == "<low-level>":  # pragma: no cover
            colors.append("lightgray")
        else:
            colors.append(color_of(fn))

        # Horizontal space granted per sample within this node
        delta = width / state["count"]

        x = start
        for child in state["children"].values():
            child_width = child["count"] * delta
            traverse(child, x, x + child_width, height + 1)
            x += child_width

    traverse(state, 0, 1, 0)
    percentages = [f"{100 * w:.1f}%" for w in widths]
    return {
        "left": starts,
        "right": stops,
        "bottom": heights,
        "width": widths,
        "top": [x + 1 for x in heights],
        "color": colors,
        "states": states,
        "filename": filenames,
        "line": lines,
        "line_number": line_numbers,
        "name": names,
        "time": times,
        "percentage": percentages,
    }
def _watch(
    thread_id: int,
    log: deque[tuple[float, dict[str, Any]]],  # [(timestamp, output of create()), ...]
    interval: float,
    cycle: float,
    omit: Collection[str],
    stop: Callable[[], bool],
) -> None:
    """Sampling loop executed by the thread that ``watch`` starts.

    Until ``stop()`` returns true, the current frame of thread ``thread_id``
    is looked up in ``sys._current_frames()`` and accumulated into the current
    state with ``process``; the loop then sleeps for ``interval``.  Whenever
    more than ``cycle`` has elapsed since the last rollover, a fresh state is
    created and appended to ``log`` together with a timestamp; subsequent
    samples are accumulated into that state in place.

    The loop returns as soon as ``thread_id`` is no longer present in
    ``sys._current_frames()``.

    NOTE(review): ``interval`` and ``cycle`` are presumably in seconds (they
    are passed to ``sleep`` and compared against ``time()``) -- confirm.
    NOTE(review): the state created before the loop is never appended to
    ``log``, so samples taken before the first rollover are not reachable
    through it -- confirm this is intended.
    """
    recent = create()
    last = time()

    while not stop():
        if time() > last + cycle:
            # Roll over: start a new state and publish it right away, so that
            # it keeps filling up while it is already visible in the log
            recent = create()
            with lock:
                log.append((time(), recent))
                last = time()
        try:
            frame = sys._current_frames()[thread_id]
        except KeyError:
            # The watched thread has no frame any more
            return

        process(frame, None, recent, omit=omit)
        # Drop the reference to the frame before sleeping
        del frame
        sleep(interval)
def watch(
    thread_id: int | None = None,
    interval: str = "20ms",
    cycle: str = "2s",
    maxlen: int = 1000,
    omit: Collection[str] = (),
    stop: Callable[[], bool] = lambda: False,
) -> deque[tuple[float, dict[str, Any]]]:
    """Gather profile information on a particular thread

    This starts a new daemon thread named ``"Profile"`` that runs ``_watch``
    on the given thread, and returns the deque that this thread fills with
    periodic profile information.

    Parameters
    ----------
    thread_id : int, optional
        Defaults to current thread
    interval : str
        Time per sample
    cycle : str
        Time per refreshing to a new profile state
    maxlen : int
        Passed onto deque, maximum number of periods
    omit : collection of str
        Passed on to ``process``: samples whose current frame lives in a file
        whose name ends with one of these strings are not recorded
    stop : callable
        Function to call to see if we should stop. It must
        accept no arguments and return a bool (True to stop,
        False to continue).

    Returns
    -------
    deque of tuples:

        - timestamp
        - dict[str, Any] (output of ``create()``)
    """
    log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen)

    watcher_kwargs = {
        "thread_id": thread_id or threading.get_ident(),
        "interval": parse_timedelta(interval),
        "cycle": parse_timedelta(cycle),
        "log": log,
        "omit": omit,
        "stop": stop,
    }
    watcher = threading.Thread(
        target=_watch,
        name="Profile",
        kwargs=watcher_kwargs,
        daemon=True,
    )
    watcher.start()
    return log
def get_profile(history, recent=None, start=None, stop=None, key=None):
    """Collect profile information from a sequence of profile states

    The entries of ``history`` that fall into the requested time window are
    selected by bisection on their timestamps and merged into a single state.

    Parameters
    ----------
    history : Sequence[Tuple[time, Dict]]
        A list or deque of profile states
    recent : dict
        The most recent accumulating state; merged into the result when the
        selected history is not empty
    start : time
        Lower bound of the window; from the beginning when None
    stop : time
        Upper bound of the window; until the end when None
    key :
        Not used by this function

    Returns
    -------
    dict
        The merged state; a fresh empty state (see ``create``) when no history
        entry is selected
    """
    istart = 0 if start is None else bisect.bisect_left(history, (start,))

    istop = None
    if stop is not None:
        istop = bisect.bisect_right(history, (stop,)) + 1
        if istop >= len(history):
            istop = None  # include end

    if istart == 0 and istop is None:
        selected = list(history)
    else:
        upper = len(history) if istop is None else istop
        selected = [history[i] for i in range(istart, upper)]

    if not selected:
        return create()

    prof = merge(*(entry[1] for entry in selected))
    if recent:
        prof = merge(prof, recent)
    return prof
def plot_figure(data, **kwargs):
    """Plot profile data using Bokeh

    This takes the output from the function ``plot_data`` and produces a Bokeh
    figure

    Parameters
    ----------
    data : dict
        Column data as returned by ``plot_data``.  A ``"states"`` entry, if
        present, is left out of the data source.
    **kwargs
        Forwarded to ``bokeh.plotting.figure``

    Returns
    -------
    tuple
        ``(fig, source)``: the figure and the ``ColumnDataSource`` backing it

    See Also
    --------
    plot_data
    """
    # Imported here so that bokeh is only needed when a figure is requested
    from bokeh.models import HoverTool
    from bokeh.plotting import ColumnDataSource, figure

    if "states" in data:
        data = toolz.dissoc(data, "states")

    source = ColumnDataSource(data=data)

    fig = figure(tools="tap,box_zoom,xwheel_zoom,reset", **kwargs)
    # One rectangle per profile node
    r = fig.quad(
        "left",
        "right",
        "top",
        "bottom",
        color="color",
        line_color="black",
        line_width=2,
        source=source,
    )

    r.selection_glyph = None
    r.nonselection_glyph = None

    hover = HoverTool(
        point_policy="follow_mouse",
        tooltips="""
<div>
<span style="font-size: 14px; font-weight: bold;">Name:</span>
<span style="font-size: 10px; font-family: Monaco, monospace;">@name</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Filename:</span>
<span style="font-size: 10px; font-family: Monaco, monospace;">@filename</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Line number:</span>
<span style="font-size: 10px; font-family: Monaco, monospace;">@line_number</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Line:</span>
<span style="font-size: 10px; font-family: Monaco, monospace;">@line</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Time:</span>
<span style="font-size: 10px; font-family: Monaco, monospace;">@time</span>
</div>
<div>
<span style="font-size: 14px; font-weight: bold;">Percentage:</span>
<span style="font-size: 10px; font-family: Monaco, monospace;">@percentage</span>
</div>
""",
    )
    fig.add_tools(hover)

    # Axes and grid are hidden
    fig.xaxis.visible = False
    fig.yaxis.visible = False
    fig.grid.visible = False

    return fig, source
| def _remove_py_stack(frames): | |
| for entry in frames: | |
| if entry.is_python: | |
| break | |
| yield entry | |
| def llprocess( # type: ignore[no-untyped-def] | |
| frames, | |
| child: object | None, | |
| state: dict[str, Any] | None, | |
| ) -> dict[str, Any] | None: | |
| """Add counts from low level profile information onto existing state | |
| This uses the ``stacktrace`` module to collect low level stack trace | |
| information and place it onto the given state. | |
| It is configured with the ``distributed.worker.profile.low-level`` config | |
| entry. | |
| See Also | |
| -------- | |
| process | |
| ll_get_stack | |
| """ | |
| if not frames: | |
| return None | |
| frame = frames.pop() | |
| if frames: | |
| state = llprocess(frames, frame, state) | |
| assert state | |
| addr = hex(frame.addr - frame.offset) | |
| ident = ";".join(map(str, (frame.name, "<low-level>", addr))) | |
| try: | |
| d = state["children"][ident] | |
| except KeyError: | |
| d = { | |
| "count": 0, | |
| "description": { | |
| "filename": "<low-level>", | |
| "name": frame.name, | |
| "line_number": 0, | |
| "line": str(frame), | |
| }, | |
| "children": {}, | |
| "identifier": ident, | |
| } | |
| state["children"][ident] = d | |
| state["count"] += 1 | |
| if child is not None: | |
| return d | |
| else: | |
| d["count"] += 1 | |
| return None | |
def ll_get_stack(tid):
    """Collect low level stack information from thread id

    The stack of thread ``tid`` is requested from
    ``stacktrace.get_thread_stack`` with ``show_python=False``, truncated at
    the first Python entry (see ``_remove_py_stack``) and returned as a list
    in reversed order.
    """
    from stacktrace import get_thread_stack

    native_entries = _remove_py_stack(get_thread_stack(tid, show_python=False))
    stack = list(native_entries)
    stack.reverse()
    return stack