""" This module contains utility functions to construct and manipulate counting data structures for frames. When performing statistical profiling we obtain many call stacks. We aggregate these call stacks into data structures that maintain counts of how many times each function in that call stack has been called. Because these stacks will overlap this aggregation counting structure forms a tree, such as is commonly visualized by profiling tools. We represent this tree as a nested dictionary with the following form: { 'identifier': 'root', 'description': 'A long description of the line of code being run.', 'count': 10 # the number of times we have seen this line 'children': { # callers of this line. Recursive dicts 'ident-b': {'description': ... 'identifier': 'ident-a', 'count': ... 'children': {...}}, 'ident-b': {'description': ... 'identifier': 'ident-b', 'count': ... 'children': {...}}} } """ from __future__ import annotations import bisect import dis import linecache import sys import threading from collections import defaultdict, deque from collections.abc import Callable, Collection from time import sleep from types import FrameType from typing import Any import tlz as toolz from dask.utils import format_time, parse_timedelta from distributed.metrics import time from distributed.utils import color_of #: This lock can be acquired to ensure that no instance of watch() is concurrently holding references to frames lock = threading.Lock() def identifier(frame: FrameType | None) -> str: """A string identifier from a frame Strings are cheaper to use as indexes into dicts than tuples or dicts """ if frame is None: return "None" else: return ";".join( ( frame.f_code.co_name, frame.f_code.co_filename, str(frame.f_code.co_firstlineno), ) ) def _f_lineno(frame: FrameType) -> int: """Work around some frames lacking an f_lineno See: https://bugs.python.org/issue47085 """ f_lineno = frame.f_lineno if f_lineno is not None: return f_lineno f_lasti = frame.f_lasti code = frame.f_code prev_line = code.co_firstlineno for start, next_line in dis.findlinestarts(code): if f_lasti < start: return prev_line prev_line = next_line return prev_line def repr_frame(frame: FrameType) -> str: """Render a frame as a line for inclusion into a text traceback""" co = frame.f_code f_lineno = _f_lineno(frame) text = f' File "{co.co_filename}", line {f_lineno}, in {co.co_name}' line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() return text + "\n\t" + line def info_frame(frame: FrameType) -> dict[str, Any]: co = frame.f_code f_lineno = _f_lineno(frame) line = linecache.getline(co.co_filename, f_lineno, frame.f_globals).lstrip() return { "filename": co.co_filename, "name": co.co_name, "line_number": f_lineno, "line": line, } def process( frame: FrameType, child: object | None, state: dict[str, Any], *, stop: str | None = None, omit: Collection[str] = (), depth: int | None = None, ) -> dict[str, Any] | None: """Add counts from a frame stack onto existing state This recursively adds counts to the existing state dictionary and creates new entries for new functions. Parameters ---------- frame: The frame to process onto the state child: For internal use only state: The profile state to accumulate this frame onto, see ``create`` stop: Filename suffix that should stop processing if we encounter it omit: Filenames that we should omit from processing depth: For internal use only, how deep we are in the call stack Used to prevent stack overflow Examples -------- >>> import sys, threading >>> ident = threading.get_ident() # replace with your thread of interest >>> frame = sys._current_frames()[ident] >>> state = create() >>> process(frame, None, state) >>> state {'count': 1, 'identifier': 'root', 'description': 'root', 'children': {'...'}} See also -------- create merge """ if depth is None: depth = sys.getrecursionlimit() - 50 if depth <= 0: return None if any(frame.f_code.co_filename.endswith(o) for o in omit): return None prev = frame.f_back if prev is not None and ( stop is None or not prev.f_code.co_filename.endswith(stop) ): new_state = process(prev, frame, state, stop=stop, depth=depth - 1) if new_state is None: return None state = new_state ident = identifier(frame) try: d = state["children"][ident] except KeyError: d = { "count": 0, "description": info_frame(frame), "children": {}, "identifier": ident, } state["children"][ident] = d state["count"] += 1 if child is not None: return d else: d["count"] += 1 return None def merge(*args: dict[str, Any]) -> dict[str, Any]: """Merge multiple frame states together""" if not args: return create() s = {arg["identifier"] for arg in args} if len(s) != 1: # pragma: no cover raise ValueError(f"Expected identifiers, got {s}") children = defaultdict(list) for arg in args: for child in arg["children"]: children[child].append(arg["children"][child]) try: children_dict = {k: merge(*v) for k, v in children.items()} except RecursionError: # pragma: no cover children_dict = {} count = sum(arg["count"] for arg in args) return { "description": args[0]["description"], "children": children_dict, "count": count, "identifier": args[0]["identifier"], } def create() -> dict[str, Any]: return { "count": 0, "children": {}, "identifier": "root", "description": {"filename": "", "name": "", "line_number": 0, "line": ""}, } def call_stack(frame: FrameType) -> list[str]: """Create a call text stack from a frame Returns ------- list of strings """ L = [] cur_frame: FrameType | None = frame while cur_frame: L.append(repr_frame(cur_frame)) cur_frame = cur_frame.f_back return L[::-1] def plot_data(state, profile_interval=0.010): """Convert a profile state into data useful by Bokeh See Also -------- plot_figure distributed.bokeh.components.ProfilePlot """ starts = [] stops = [] heights = [] widths = [] colors = [] states = [] times = [] filenames = [] lines = [] line_numbers = [] names = [] def traverse(state, start, stop, height): if not state["count"]: return starts.append(start) stops.append(stop) heights.append(height) width = stop - start widths.append(width) states.append(state) times.append(format_time(state["count"] * profile_interval)) desc = state["description"] filenames.append(desc["filename"]) lines.append(desc["line"]) line_numbers.append(desc["line_number"]) names.append(desc["name"]) try: fn = desc["filename"] except IndexError: # pragma: no cover colors.append("gray") else: if fn == "": # pragma: no cover colors.append("lightgray") else: colors.append(color_of(fn)) delta = (stop - start) / state["count"] x = start for _, child in state["children"].items(): width = child["count"] * delta traverse(child, x, x + width, height + 1) x += width traverse(state, 0, 1, 0) percentages = [f"{100 * w:.1f}%" for w in widths] return { "left": starts, "right": stops, "bottom": heights, "width": widths, "top": [x + 1 for x in heights], "color": colors, "states": states, "filename": filenames, "line": lines, "line_number": line_numbers, "name": names, "time": times, "percentage": percentages, } def _watch( thread_id: int, log: deque[tuple[float, dict[str, Any]]], # [(timestamp, output of create()), ...] interval: float, cycle: float, omit: Collection[str], stop: Callable[[], bool], ) -> None: recent = create() last = time() while not stop(): if time() > last + cycle: recent = create() with lock: log.append((time(), recent)) last = time() try: frame = sys._current_frames()[thread_id] except KeyError: return process(frame, None, recent, omit=omit) del frame sleep(interval) def watch( thread_id: int | None = None, interval: str = "20ms", cycle: str = "2s", maxlen: int = 1000, omit: Collection[str] = (), stop: Callable[[], bool] = lambda: False, ) -> deque[tuple[float, dict[str, Any]]]: """Gather profile information on a particular thread This starts a new thread to watch a particular thread and returns a deque that holds periodic profile information. Parameters ---------- thread_id : int, optional Defaults to current thread interval : str Time per sample cycle : str Time per refreshing to a new profile state maxlen : int Passed onto deque, maximum number of periods omit : collection of str Don't include entries whose filename includes any of these substrings stop : callable Function to call to see if we should stop. It must accept no arguments and return a bool (True to stop, False to continue). Returns ------- deque of tuples: - timestamp - dict[str, Any] (output of ``create()``) """ log: deque[tuple[float, dict[str, Any]]] = deque(maxlen=maxlen) thread = threading.Thread( target=_watch, name="Profile", kwargs={ "thread_id": thread_id or threading.get_ident(), "interval": parse_timedelta(interval), "cycle": parse_timedelta(cycle), "log": log, "omit": omit, "stop": stop, }, ) thread.daemon = True thread.start() return log def get_profile(history, recent=None, start=None, stop=None, key=None): """Collect profile information from a sequence of profile states Parameters ---------- history : Sequence[Tuple[time, Dict]] A list or deque of profile states recent : dict The most recent accumulating state start : time stop : time """ if start is None: istart = 0 else: istart = bisect.bisect_left(history, (start,)) if stop is None: istop = None else: istop = bisect.bisect_right(history, (stop,)) + 1 if istop >= len(history): istop = None # include end if istart == 0 and istop is None: history = list(history) else: iistop = len(history) if istop is None else istop history = [history[i] for i in range(istart, iistop)] prof = merge(*toolz.pluck(1, history)) if not history: return create() if recent: prof = merge(prof, recent) return prof def plot_figure(data, **kwargs): """Plot profile data using Bokeh This takes the output from the function ``plot_data`` and produces a Bokeh figure See Also -------- plot_data """ from bokeh.models import HoverTool from bokeh.plotting import ColumnDataSource, figure if "states" in data: data = toolz.dissoc(data, "states") source = ColumnDataSource(data=data) fig = figure(tools="tap,box_zoom,xwheel_zoom,reset", **kwargs) r = fig.quad( "left", "right", "top", "bottom", color="color", line_color="black", line_width=2, source=source, ) r.selection_glyph = None r.nonselection_glyph = None hover = HoverTool( point_policy="follow_mouse", tooltips="""
Name:  @name
Filename:  @filename
Line number:  @line_number
Line:  @line
Time:  @time
Percentage:  @percentage
""", ) fig.add_tools(hover) fig.xaxis.visible = False fig.yaxis.visible = False fig.grid.visible = False return fig, source def _remove_py_stack(frames): for entry in frames: if entry.is_python: break yield entry def llprocess( # type: ignore[no-untyped-def] frames, child: object | None, state: dict[str, Any] | None, ) -> dict[str, Any] | None: """Add counts from low level profile information onto existing state This uses the ``stacktrace`` module to collect low level stack trace information and place it onto the given state. It is configured with the ``distributed.worker.profile.low-level`` config entry. See Also -------- process ll_get_stack """ if not frames: return None frame = frames.pop() if frames: state = llprocess(frames, frame, state) assert state addr = hex(frame.addr - frame.offset) ident = ";".join(map(str, (frame.name, "", addr))) try: d = state["children"][ident] except KeyError: d = { "count": 0, "description": { "filename": "", "name": frame.name, "line_number": 0, "line": str(frame), }, "children": {}, "identifier": ident, } state["children"][ident] = d state["count"] += 1 if child is not None: return d else: d["count"] += 1 return None def ll_get_stack(tid): """Collect low level stack information from thread id""" from stacktrace import get_thread_stack frames = get_thread_stack(tid, show_python=False) llframes = list(_remove_py_stack(frames))[::-1] return llframes