darabos's picture
t -> type_hint, remove stray line.
19946be
"""Automatically wraps all NetworkX functions as LynxKite operations."""
import collections
import types
from lynxkite.core import ops
import functools
import inspect
import networkx as nx
import re
import pandas as pd
ENV = "LynxKite Graph Analytics"
class UnsupportedParameterType(Exception):
pass
_UNSUPPORTED = object()
_SKIP = object()
def doc_to_type(name: str, type_hint: str) -> type:
type_hint = type_hint.lower()
type_hint = re.sub("[(][^)]+[)]", "", type_hint).strip().strip(".")
if " " in name or "http" in name:
return _UNSUPPORTED # Not a parameter type.
if type_hint.endswith(", optional"):
w = doc_to_type(name, type_hint.removesuffix(", optional").strip())
if w is _UNSUPPORTED:
return _SKIP
return w if w is _SKIP else w | None
if type_hint in [
"a digraph or multidigraph",
"a graph g",
"graph",
"graphs",
"networkx graph instance",
"networkx graph",
"networkx undirected graph",
"nx.graph",
"undirected graph",
"undirected networkx graph",
] or type_hint.startswith("networkx graph"):
return nx.Graph
elif type_hint in [
"digraph-like",
"digraph",
"directed graph",
"networkx digraph",
"networkx directed graph",
"nx.digraph",
]:
return nx.DiGraph
elif type_hint == "node":
return _UNSUPPORTED
elif type_hint == '"node (optional)"':
return _SKIP
elif type_hint == '"edge"':
return _UNSUPPORTED
elif type_hint == '"edge (optional)"':
return _SKIP
elif type_hint in ["class", "data type"]:
return _UNSUPPORTED
elif type_hint in ["string", "str", "node label"]:
return str
elif type_hint in ["string or none", "none or string", "string, or none"]:
return str | None
elif type_hint in ["int", "integer"]:
return int
elif type_hint in ["bool", "boolean"]:
return bool
elif type_hint == "tuple":
return _UNSUPPORTED
elif type_hint == "set":
return _UNSUPPORTED
elif type_hint == "list of floats":
return _UNSUPPORTED
elif type_hint == "list of floats or float":
return float
elif type_hint in ["dict", "dictionary"]:
return _UNSUPPORTED
elif type_hint == "scalar or dictionary":
return float
elif type_hint == "none or dict":
return _SKIP
elif type_hint in ["function", "callable"]:
return _UNSUPPORTED
elif type_hint in [
"collection",
"container of nodes",
"list of nodes",
]:
return _UNSUPPORTED
elif type_hint in [
"container",
"generator",
"iterable",
"iterator",
"list or iterable container",
"list or iterable",
"list or set",
"list or tuple",
"list",
]:
return _UNSUPPORTED
elif type_hint == "generator of sets":
return _UNSUPPORTED
elif type_hint == "dict or a set of 2 or 3 tuples":
return _UNSUPPORTED
elif type_hint == "set of 2 or 3 tuples":
return _UNSUPPORTED
elif type_hint == "none, string or function":
return str | None
elif type_hint == "string or function" and name == "weight":
return str
elif type_hint == "integer, float, or none":
return float | None
elif type_hint in [
"float",
"int or float",
"integer or float",
"integer, float",
"number",
"numeric",
"real",
"scalar",
]:
return float
elif type_hint in ["integer or none", "int or none"]:
return int | None
elif name == "seed":
return int | None
elif name == "weight":
return str
elif type_hint == "object":
return _UNSUPPORTED
return _SKIP
def types_from_doc(doc: str) -> dict[str, type]:
types = {}
for line in doc.splitlines():
if ":" in line:
a, b = line.split(":", 1)
for a in a.split(","):
a = a.strip()
types[a] = doc_to_type(a, b)
return types
def wrapped(name: str, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
for k, v in kwargs.items():
if v == "None":
kwargs[k] = None
res = func(*args, **kwargs)
# Figure out what the returned value is.
if isinstance(res, nx.Graph):
return res
if isinstance(res, types.GeneratorType):
res = list(res)
if name in ["articulation_points"]:
graph = args[0].copy()
nx.set_node_attributes(graph, 0, name=name)
nx.set_node_attributes(graph, {r: 1 for r in res}, name=name)
return graph
if isinstance(res, collections.abc.Sized):
if len(res) == 0:
return pd.DataFrame()
for a in args:
if isinstance(a, nx.Graph):
if a.number_of_nodes() == len(res):
graph = a.copy()
nx.set_node_attributes(graph, values=res, name=name)
return graph
if a.number_of_edges() == len(res):
graph = a.copy()
nx.set_edge_attributes(graph, values=res, name=name)
return graph
return pd.DataFrame({name: res})
return pd.DataFrame({name: [res]})
return wrapper
def _get_params(func) -> dict | None:
sig = inspect.signature(func)
# Get types from docstring.
types = types_from_doc(func.__doc__)
# Always hide these.
for k in ["backend", "backend_kwargs", "create_using"]:
types[k] = _SKIP
# Add in types based on signature.
for k, param in sig.parameters.items():
if k in types:
continue
if param.annotation is not param.empty:
types[k] = param.annotation
if k in ["i", "j", "n"]:
types[k] = int
params = {}
for name, param in sig.parameters.items():
_type = types.get(name, _UNSUPPORTED)
if _type is _UNSUPPORTED:
raise UnsupportedParameterType(name)
if _type is _SKIP or _type in [nx.Graph, nx.DiGraph]:
continue
params[name] = ops.Parameter.basic(
name=name,
default=str(param.default)
if type(param.default) in [str, int, float]
else None,
type=_type,
)
return params
_REPLACEMENTS = [
("Barabasi Albert", "Barabasi–Albert"),
("Bellman Ford", "Bellman–Ford"),
("Bethe Hessian", "Bethe–Hessian"),
("Bfs", "BFS"),
("Dag ", "DAG "),
("Dfs", "DFS"),
("Dorogovtsev Goltsev Mendes", "Dorogovtsev–Goltsev–Mendes"),
("Erdos Renyi", "Erdos–Renyi"),
("Floyd Warshall", "Floyd–Warshall"),
("Gnc", "G(n,c)"),
("Gnm", "G(n,m)"),
("Gnp", "G(n,p)"),
("Gnr", "G(n,r)"),
("Havel Hakimi", "Havel–Hakimi"),
("Hkn", "H(k,n)"),
("Hnm", "H(n,m)"),
("Kl ", "KL "),
("Moebius Kantor", "Moebius–Kantor"),
("Pagerank", "PageRank"),
("Scale Free", "Scale-Free"),
("Vf2Pp", "VF2++"),
("Watts Strogatz", "Watts–Strogatz"),
("Weisfeiler Lehman", "Weisfeiler–Lehman"),
]
def register_networkx(env: str):
cat = ops.CATALOGS.setdefault(env, {})
counter = 0
for name, func in nx.__dict__.items():
if hasattr(func, "graphs"):
try:
params = _get_params(func)
except UnsupportedParameterType:
continue
inputs = {k: ops.Input(name=k, type=nx.Graph) for k in func.graphs}
nicename = "NX › " + name.replace("_", " ").title()
for a, b in _REPLACEMENTS:
nicename = nicename.replace(a, b)
op = ops.Op(
func=wrapped(name, func),
name=nicename,
params=params,
inputs=inputs,
outputs={"output": ops.Output(name="output", type=nx.Graph)},
type="basic",
)
cat[nicename] = op
counter += 1
print(f"Registered {counter} NetworkX operations.")
register_networkx(ENV)