|
""" |
|
My own variation on function-specific inspect-like features. |
|
""" |
|
|
|
|
|
|
|
|
|
|
|
import collections |
|
import inspect |
|
import os |
|
import re |
|
import warnings |
|
from itertools import islice |
|
from tokenize import open as open_py_source |
|
|
|
from .logger import pformat |
|
|
|
full_argspec_fields = ( |
|
"args varargs varkw defaults kwonlyargs kwonlydefaults annotations" |
|
) |
|
full_argspec_type = collections.namedtuple("FullArgSpec", full_argspec_fields) |
|
|
|
|
|
def get_func_code(func): |
|
"""Attempts to retrieve a reliable function code hash. |
|
|
|
The reason we don't use inspect.getsource is that it caches the |
|
source, whereas we want this to be modified on the fly when the |
|
function is modified. |
|
|
|
Returns |
|
------- |
|
func_code: string |
|
The function code |
|
source_file: string |
|
The path to the file in which the function is defined. |
|
first_line: int |
|
The first line of the code in the source file. |
|
|
|
Notes |
|
------ |
|
This function does a bit more magic than inspect, and is thus |
|
more robust. |
|
""" |
|
source_file = None |
|
try: |
|
code = func.__code__ |
|
source_file = code.co_filename |
|
if not os.path.exists(source_file): |
|
|
|
|
|
source_code = "".join(inspect.getsourcelines(func)[0]) |
|
line_no = 1 |
|
if source_file.startswith("<doctest "): |
|
source_file, line_no = re.match( |
|
r"\<doctest (.*\.rst)\[(.*)\]\>", source_file |
|
).groups() |
|
line_no = int(line_no) |
|
source_file = "<doctest %s>" % source_file |
|
return source_code, source_file, line_no |
|
|
|
with open_py_source(source_file) as source_file_obj: |
|
first_line = code.co_firstlineno |
|
|
|
source_lines = list(islice(source_file_obj, first_line - 1, None)) |
|
return "".join(inspect.getblock(source_lines)), source_file, first_line |
|
except: |
|
|
|
|
|
if hasattr(func, "__code__"): |
|
|
|
return str(func.__code__.__hash__()), source_file, -1 |
|
else: |
|
|
|
|
|
|
|
|
|
return repr(func), source_file, -1 |
|
|
|
|
|
def _clean_win_chars(string): |
|
"""Windows cannot encode some characters in filename.""" |
|
import urllib |
|
|
|
if hasattr(urllib, "quote"): |
|
quote = urllib.quote |
|
else: |
|
|
|
import urllib.parse |
|
|
|
quote = urllib.parse.quote |
|
for char in ("<", ">", "!", ":", "\\"): |
|
string = string.replace(char, quote(char)) |
|
return string |
|
|
|
|
|
def get_func_name(func, resolv_alias=True, win_characters=True): |
|
"""Return the function import path (as a list of module names), and |
|
a name for the function. |
|
|
|
Parameters |
|
---------- |
|
func: callable |
|
The func to inspect |
|
resolv_alias: boolean, optional |
|
If true, possible local aliases are indicated. |
|
win_characters: boolean, optional |
|
If true, substitute special characters using urllib.quote |
|
This is useful in Windows, as it cannot encode some filenames |
|
""" |
|
if hasattr(func, "__module__"): |
|
module = func.__module__ |
|
else: |
|
try: |
|
module = inspect.getmodule(func) |
|
except TypeError: |
|
if hasattr(func, "__class__"): |
|
module = func.__class__.__module__ |
|
else: |
|
module = "unknown" |
|
if module is None: |
|
|
|
module = "" |
|
if module == "__main__": |
|
try: |
|
filename = os.path.abspath(inspect.getsourcefile(func)) |
|
except: |
|
filename = None |
|
if filename is not None: |
|
|
|
parts = filename.split(os.sep) |
|
if parts[-1].startswith("<ipython-input"): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
split = parts[-1].split("-") |
|
parts[-1] = "-".join(split[:2] + split[3:]) |
|
elif len(parts) > 2 and parts[-2].startswith("ipykernel_"): |
|
|
|
|
|
|
|
|
|
|
|
parts[-2] = "ipykernel" |
|
filename = "-".join(parts) |
|
if filename.endswith(".py"): |
|
filename = filename[:-3] |
|
module = module + "-" + filename |
|
module = module.split(".") |
|
if hasattr(func, "func_name"): |
|
name = func.func_name |
|
elif hasattr(func, "__name__"): |
|
name = func.__name__ |
|
else: |
|
name = "unknown" |
|
|
|
if resolv_alias: |
|
|
|
if hasattr(func, "func_globals") and name in func.func_globals: |
|
if func.func_globals[name] is not func: |
|
name = "%s-alias" % name |
|
if hasattr(func, "__qualname__") and func.__qualname__ != name: |
|
|
|
|
|
module.extend(func.__qualname__.split(".")[:-1]) |
|
if inspect.ismethod(func): |
|
|
|
if hasattr(func, "im_class"): |
|
klass = func.im_class |
|
module.append(klass.__name__) |
|
if os.name == "nt" and win_characters: |
|
|
|
name = _clean_win_chars(name) |
|
module = [_clean_win_chars(s) for s in module] |
|
return module, name |
|
|
|
|
|
def _signature_str(function_name, arg_sig): |
|
"""Helper function to output a function signature""" |
|
return "{}{}".format(function_name, arg_sig) |
|
|
|
|
|
def _function_called_str(function_name, args, kwargs): |
|
"""Helper function to output a function call""" |
|
template_str = "{0}({1}, {2})" |
|
|
|
args_str = repr(args)[1:-1] |
|
kwargs_str = ", ".join("%s=%s" % (k, v) for k, v in kwargs.items()) |
|
return template_str.format(function_name, args_str, kwargs_str) |
|
|
|
|
|
def filter_args(func, ignore_lst, args=(), kwargs=dict()): |
|
"""Filters the given args and kwargs using a list of arguments to |
|
ignore, and a function specification. |
|
|
|
Parameters |
|
---------- |
|
func: callable |
|
Function giving the argument specification |
|
ignore_lst: list of strings |
|
List of arguments to ignore (either a name of an argument |
|
in the function spec, or '*', or '**') |
|
*args: list |
|
Positional arguments passed to the function. |
|
**kwargs: dict |
|
Keyword arguments passed to the function |
|
|
|
Returns |
|
------- |
|
filtered_args: list |
|
List of filtered positional and keyword arguments. |
|
""" |
|
args = list(args) |
|
if isinstance(ignore_lst, str): |
|
|
|
raise ValueError( |
|
"ignore_lst must be a list of parameters to ignore " |
|
"%s (type %s) was given" % (ignore_lst, type(ignore_lst)) |
|
) |
|
|
|
if not inspect.ismethod(func) and not inspect.isfunction(func): |
|
if ignore_lst: |
|
warnings.warn( |
|
"Cannot inspect object %s, ignore list will not work." % func, |
|
stacklevel=2, |
|
) |
|
return {"*": args, "**": kwargs} |
|
arg_sig = inspect.signature(func) |
|
arg_names = [] |
|
arg_defaults = [] |
|
arg_kwonlyargs = [] |
|
arg_varargs = None |
|
arg_varkw = None |
|
for param in arg_sig.parameters.values(): |
|
if param.kind is param.POSITIONAL_OR_KEYWORD: |
|
arg_names.append(param.name) |
|
elif param.kind is param.KEYWORD_ONLY: |
|
arg_names.append(param.name) |
|
arg_kwonlyargs.append(param.name) |
|
elif param.kind is param.VAR_POSITIONAL: |
|
arg_varargs = param.name |
|
elif param.kind is param.VAR_KEYWORD: |
|
arg_varkw = param.name |
|
if param.default is not param.empty: |
|
arg_defaults.append(param.default) |
|
if inspect.ismethod(func): |
|
|
|
|
|
args = [ |
|
func.__self__, |
|
] + args |
|
|
|
|
|
|
|
class_method_sig = inspect.signature(func.__func__) |
|
self_name = next(iter(class_method_sig.parameters)) |
|
arg_names = [self_name] + arg_names |
|
|
|
|
|
|
|
_, name = get_func_name(func, resolv_alias=False) |
|
arg_dict = dict() |
|
arg_position = -1 |
|
for arg_position, arg_name in enumerate(arg_names): |
|
if arg_position < len(args): |
|
|
|
if arg_name not in arg_kwonlyargs: |
|
arg_dict[arg_name] = args[arg_position] |
|
else: |
|
raise ValueError( |
|
"Keyword-only parameter '%s' was passed as " |
|
"positional parameter for %s:\n" |
|
" %s was called." |
|
% ( |
|
arg_name, |
|
_signature_str(name, arg_sig), |
|
_function_called_str(name, args, kwargs), |
|
) |
|
) |
|
|
|
else: |
|
position = arg_position - len(arg_names) |
|
if arg_name in kwargs: |
|
arg_dict[arg_name] = kwargs[arg_name] |
|
else: |
|
try: |
|
arg_dict[arg_name] = arg_defaults[position] |
|
except (IndexError, KeyError) as e: |
|
|
|
raise ValueError( |
|
"Wrong number of arguments for %s:\n" |
|
" %s was called." |
|
% ( |
|
_signature_str(name, arg_sig), |
|
_function_called_str(name, args, kwargs), |
|
) |
|
) from e |
|
|
|
varkwargs = dict() |
|
for arg_name, arg_value in sorted(kwargs.items()): |
|
if arg_name in arg_dict: |
|
arg_dict[arg_name] = arg_value |
|
elif arg_varkw is not None: |
|
varkwargs[arg_name] = arg_value |
|
else: |
|
raise TypeError( |
|
"Ignore list for %s() contains an unexpected " |
|
"keyword argument '%s'" % (name, arg_name) |
|
) |
|
|
|
if arg_varkw is not None: |
|
arg_dict["**"] = varkwargs |
|
if arg_varargs is not None: |
|
varargs = args[arg_position + 1 :] |
|
arg_dict["*"] = varargs |
|
|
|
|
|
for item in ignore_lst: |
|
if item in arg_dict: |
|
arg_dict.pop(item) |
|
else: |
|
raise ValueError( |
|
"Ignore list: argument '%s' is not defined for " |
|
"function %s" % (item, _signature_str(name, arg_sig)) |
|
) |
|
|
|
return arg_dict |
|
|
|
|
|
def _format_arg(arg): |
|
formatted_arg = pformat(arg, indent=2) |
|
if len(formatted_arg) > 1500: |
|
formatted_arg = "%s..." % formatted_arg[:700] |
|
return formatted_arg |
|
|
|
|
|
def format_signature(func, *args, **kwargs): |
|
|
|
module, name = get_func_name(func) |
|
module = [m for m in module if m] |
|
if module: |
|
module.append(name) |
|
module_path = ".".join(module) |
|
else: |
|
module_path = name |
|
arg_str = list() |
|
previous_length = 0 |
|
for arg in args: |
|
formatted_arg = _format_arg(arg) |
|
if previous_length > 80: |
|
formatted_arg = "\n%s" % formatted_arg |
|
previous_length = len(formatted_arg) |
|
arg_str.append(formatted_arg) |
|
arg_str.extend(["%s=%s" % (v, _format_arg(i)) for v, i in kwargs.items()]) |
|
arg_str = ", ".join(arg_str) |
|
|
|
signature = "%s(%s)" % (name, arg_str) |
|
return module_path, signature |
|
|
|
|
|
def format_call(func, args, kwargs, object_name="Memory"): |
|
"""Returns a nicely formatted statement displaying the function |
|
call with the given arguments. |
|
""" |
|
path, signature = format_signature(func, *args, **kwargs) |
|
msg = "%s\n[%s] Calling %s...\n%s" % (80 * "_", object_name, path, signature) |
|
return msg |
|
|
|
|
|
|