# Spaces: Running on Zero
import numpy as np | |
import matplotlib.pyplot as plt | |
import networkx as nx | |
from matplotlib.collections import LineCollection | |
from itertools import count | |
from heapq import heappush, heappop | |
from collections import defaultdict | |
import time | |
import pandas as pd | |
from datashader.bundling import hammer_bundle # New import for hammer bundling | |
############################################################################### | |
# Minimal AbstractBundling base class (refactored from .abstractBundling import) | |
############################################################################### | |
class AbstractBundling:
    """Common base for bundling algorithms that operate on a NetworkX graph.

    The graph passed to the constructor is kept on ``self.G``. Concrete
    subclasses are expected to override :meth:`bundle`.
    """

    def __init__(self, G: nx.Graph):
        # A reference to the caller's graph is stored; no copy is made here.
        self.G = G

    def bundle(self):
        """Run the bundling; the base implementation always raises.

        Raises:
            NotImplementedError: unconditionally, since this base class has
                no algorithm of its own.
        """
        message = "Subclasses should implement 'bundle'."
        raise NotImplementedError(message)
############################################################################### | |
# Simple SplineC placeholder (refactoring out the nx2ipe dependency) | |
############################################################################### | |
class SplineC:
    """Lightweight holder for the control points of one bundled edge."""

    def __init__(self, points):
        # The sequence is stored as given: no copy and no validation.
        self.points = points
############################################################################### | |
# A base SpannerBundling class that SpannerBundlingNoSP depends on | |
############################################################################### | |
class SpannerBundling(AbstractBundling):
    """S-EPB base implementation.

    Attributes:
        weightFactor: kappa value that sets the bundling strength.
        distortion: t value that sets the maximum allowed stretch/distortion.
        numWorkers: number of workers that process biconnected components.
        mode: label for the variant; ``"greedy"`` for this base class.
        name: read/write property, see :attr:`name`.

    The ``bundle``, ``process`` and ``spanner`` methods here are no-op
    defaults intended to be overridden by subclasses.
    """

    def __init__(self, G: nx.Graph, weightFactor=2, distortion=2, numWorkers=1):
        super().__init__(G)
        self.distortion = distortion
        self.weightFactor = weightFactor
        self.mode = "greedy"
        # Goes through the property setter below; None means "no custom
        # name", so the getter falls back to the generated one.
        self.name = None
        self.numWorkers = numWorkers

    @property
    def name(self):
        """Name of this bundler.

        Returns the value most recently assigned to ``name`` when that value
        is not ``None``; otherwise a name generated from the current
        ``distortion``, ``weightFactor`` and ``mode``.
        """
        custom = getattr(self, "_name", None)
        if custom is not None:
            return custom
        return f"SEPB_d_{self.distortion}_w_{self.weightFactor}_{self.mode}"

    @name.setter
    def name(self, value):
        self._name = value

    def bundle(self):
        """Default implementation: does nothing and reports 0.0."""
        return 0.0

    def process(self, component):
        """Default implementation: does nothing."""

    def spanner(self, g, k):
        """Default implementation: builds nothing and returns ``None``."""
        return None
############################################################################### | |
# The requested SpannerBundlingNoSP class | |
############################################################################### | |
class SpannerBundlingNoSP(SpannerBundling):
    """S-EPB variant that reuses the spanner's shortest paths.

    Instead of computing single-source shortest paths afterwards, the path
    found while deciding whether an edge joins the spanner is stored on that
    edge and later used as the spline's control polygon.
    """

    def __init__(self, G: nx.Graph, weightFactor=2, distortion=2):
        super().__init__(G, weightFactor=weightFactor, distortion=distortion)
        self.mode = "reuse"

    def bundle(self):
        """Bundle every biconnected component with more than two nodes.

        Components are processed from largest to smallest (by node count).

        Returns:
            Elapsed ``time.process_time()`` for the whole run.
        """
        t_start = time.process_time()

        # Biconnected components are computed on an undirected view when the
        # graph is directed; the subgraphs are still taken from self.G.
        if nx.is_directed(self.G):
            undirected = self.G.to_undirected(as_view=True)
        else:
            undirected = self.G

        to_process = [
            self.G.subgraph(nodes).copy()
            for nodes in nx.biconnected_components(undirected)
            if len(nodes) > 2
        ]
        # len(graph) is the node count.
        to_process.sort(key=len, reverse=True)

        for comp in to_process:
            self.process(comp)

        return time.process_time() - t_start

    @staticmethod
    def _node_xy(graph, node):
        """Return (x, y) of ``node``: 'X'/'Y' first, then 'x'/'y', else 0."""
        attrs = graph.nodes[node]
        return (
            attrs.get("X", attrs.get("x", 0)),
            attrs.get("Y", attrs.get("y", 0)),
        )

    def process(self, component):
        """Build the spanner of ``component`` and annotate edges of ``self.G``.

        Spanner edges get ``Layer="Spanning"`` and ``Stroke="blue"``. Every
        other edge that has a stored path gets ``Layer="Bundled"``,
        ``Stroke="purple"`` and a ``Spline`` whose control points are the
        coordinates of the path's intermediate nodes.
        """
        T = self.spanner(component, self.distortion)

        for _, _, data in T.edges(data=True):
            data["weight"] = np.power(data["dist"], self.weightFactor)

        for u, v in T.edges():
            self.G[u][v]["Layer"] = "Spanning"
            self.G[u][v]["Stroke"] = "blue"

        for u, v, data in component.edges(data=True):
            if T.has_edge(u, v):
                continue

            path = data.get("path", [])
            if not path:
                continue

            # spanner() stores only the intermediate nodes (both endpoints
            # are already excluded), so every entry is a control point.
            # Slicing with [1:-1] here would drop two of them.
            spline_points = [self._node_xy(component, node) for node in path]

            self.G[u][v]["Spline"] = SplineC(spline_points)
            self.G[u][v]["Layer"] = "Bundled"
            self.G[u][v]["Stroke"] = "purple"

    def spanner(self, g, k):
        """Greedily build a spanner of ``g`` with stretch factor ``k``.

        Edges are visited in order of increasing ``dist`` (a missing ``dist``
        counts as 1). An edge is added to the spanner unless the spanner
        already connects its endpoints within ``k * dist``; in that case the
        intermediate nodes of that path, ordered from ``u`` towards ``v``,
        are stored in the edge's ``'path'`` attribute on ``g``.

        Returns:
            The spanner, a ``DiGraph`` if ``g`` is directed, else a ``Graph``.
        """
        spanner = nx.DiGraph() if nx.is_directed(g) else nx.Graph()

        edges = sorted(g.edges(data=True), key=lambda t: t[2].get("dist", 1))

        for u, v, data in edges:
            # Same default as the sort key, so a missing 'dist' cannot raise.
            dist = data.get("dist", 1)

            # An endpoint the spanner has not seen yet cannot be reached.
            if u not in spanner.nodes or v not in spanner.nodes:
                spanner.add_edge(u, v, dist=dist)
                continue

            pred, path_length = nx.dijkstra_predecessor_and_distance(
                spanner, u, weight="dist", cutoff=k * dist
            )

            if v not in path_length:
                spanner.add_edge(u, v, dist=dist)
                continue

            # Walk the predecessor chain from v back to (but excluding) u.
            backwards = []
            node = v
            while node != u:
                backwards.append(node)
                node = pred[node][0]

            # Drop v itself, then flip so the order runs from u towards v.
            intermediates = backwards[1:]
            intermediates.reverse()
            data["path"] = intermediates

        return spanner
############################################################################### | |
# Function to plot only the bundled edges (with optional color gradient) | |
############################################################################### | |
def plot_bundled_edges_only(G, edge_gradient=False, node_colors=None, ax=None, **plot_kwargs):
    """Draw only the edges of ``G`` whose 'Layer' attribute is 'Bundled'.

    Each such edge is drawn as a Bezier curve through its ``Spline`` control
    points (with the endpoint positions added at both ends), or as a straight
    line when it has no spline. Nodes are not drawn and the axis limits are
    not changed; the caller can add to the axis afterwards.

    Parameters:
        G: NetworkX graph; node positions are read from 'X'/'Y' (falling
            back to 'x'/'y', then 0).
        edge_gradient: If True, each curve is split into short pieces whose
            colour is interpolated from the source to the target node colour.
        node_colors: Mapping node -> colour sequence, used only when
            ``edge_gradient`` is True. Nodes without an entry (or a missing
            mapping) use the default purple.
        ax: Optional matplotlib axis to plot on. If None, a new 8x8 figure
            is created.
        **plot_kwargs: Extra keyword arguments for ``LineCollection``; they
            override the defaults, including the computed colours.
    """
    default_color = (0.5, 0.0, 0.5, 0.9)  # purple RGBA

    if ax is None:
        plt.figure(figsize=(8, 8))
        ax = plt.gca()

    # 1. Node positions.
    pos = {
        node: (
            data.get('X', data.get('x', 0)),
            data.get('Y', data.get('y', 0)),
        )
        for node, data in G.nodes(data=True)
    }

    def approx_bezier(points, n=50):
        """Return an (n, 2) array of points on the Bezier curve of ``points``."""
        ctrl = np.asarray(points, dtype=float)
        degree = len(ctrl) - 1
        # Binomial coefficients C(degree, i), built incrementally.
        binom = [1.0]
        for i in range(1, degree + 1):
            binom.append(binom[-1] * (degree - i + 1) / i)
        t = np.linspace(0, 1, n)[:, np.newaxis]
        idx = np.arange(degree + 1)[np.newaxis, :]
        # Bernstein basis, one row per sample and one column per control point.
        basis = np.asarray(binom) * (1 - t) ** (degree - idx) * t ** idx
        return basis @ ctrl

    color_lookup = node_colors if node_colors is not None else {}

    # 2. Build line segments (and their colours).
    edge_segments = []
    edge_colors = []
    for u, v, data in G.edges(data=True):
        if data.get("Layer", None) != "Bundled":
            continue

        # (a) Control points: endpoints plus any stored spline points.
        spline_obj = data.get("Spline")
        if spline_obj is not None:
            control_points = [pos[u]] + list(spline_obj.points) + [pos[v]]
        else:
            control_points = [pos[u], pos[v]]

        # (b) Always subdivide for a gradient; otherwise only a real curve.
        if edge_gradient or len(control_points) > 2:
            curve_points = approx_bezier(control_points, n=50)
        else:
            curve_points = np.array(control_points)

        if edge_gradient:
            # (c) One short segment per consecutive pair of samples, coloured
            # by linear interpolation between the two node colours.
            c_u = np.array(color_lookup.get(u, default_color))
            c_v = np.array(color_lookup.get(v, default_color))
            num_pts = len(curve_points)
            for i in range(num_pts - 1):
                t = i / max(1, (num_pts - 2))
                edge_segments.append([curve_points[i], curve_points[i + 1]])
                edge_colors.append((1 - t) * c_u + t * c_v)
        elif len(curve_points) > 1:
            # Add the whole polyline. Using only the first and last sample
            # would flatten the bundled curve into a straight chord.
            edge_segments.append(curve_points)
            edge_colors.append(default_color)

    # 3. Plot.
    lc_kwargs = {
        'linewidths': 1,
        'alpha': 0.9,
    }
    if 'colors' not in plot_kwargs and edge_colors:
        lc_kwargs['colors'] = edge_colors
    lc_kwargs.update(plot_kwargs)

    lc = LineCollection(edge_segments, **lc_kwargs)
    ax.add_collection(lc)
############################################################################### | |
# Convenience function to run SpannerBundlingNoSP on a graph and plot results | |
############################################################################### | |
def run_and_plot_spanner_bundling_no_sp(G, weightFactor=2, distortion=2, edge_gradient=False, node_colors=None, ax=None, **plot_kwargs):
    """Bundle ``G`` with SpannerBundlingNoSP and draw the bundled edges.

    ``weightFactor`` and ``distortion`` configure the bundler. The remaining
    arguments, including any extra keyword arguments, are handed to
    ``plot_bundled_edges_only``; set ``edge_gradient=True`` for
    colour-gradient edges.
    """
    # Step 1: annotate the edges of G in place.
    SpannerBundlingNoSP(
        G,
        weightFactor=weightFactor,
        distortion=distortion,
    ).bundle()

    # Step 2: draw whatever ended up marked as bundled.
    plot_bundled_edges_only(
        G,
        edge_gradient=edge_gradient,
        node_colors=node_colors,
        ax=ax,
        **plot_kwargs,
    )
def run_hammer_bundling(G, accuracy=500, advect_iterations=50, batch_size=20000,
                        decay=0.01, initial_bandwidth=1.1, iterations=4,
                        max_segment_length=0.016, min_segment_length=0.008,
                        tension=1.2):
    """Run hammer bundling on a NetworkX graph and return the bundled paths.

    Node positions are read from 'X'/'Y' (falling back to 'x'/'y', then 0).
    The numeric keyword arguments are forwarded unchanged to
    ``hammer_bundle``.

    Returns:
        A list of ``(source_node, target_node, points)`` tuples, where
        ``points`` is a list of ``(x, y)`` pairs. The i-th non-empty run of
        rows in the bundling output (runs are separated by rows with a
        missing coordinate) is attributed to the i-th edge of ``G.edges()``.
        A graph without edges yields an empty list.
    """
    edge_list = list(G.edges())
    if not edge_list:
        # Nothing to bundle, and an empty edge table would lack the
        # 'source'/'target' columns that hammer_bundle needs.
        return []

    # Node table: the row position doubles as the node id used by the edges.
    node_to_index = {}
    node_rows = []
    for i, (node, attr) in enumerate(G.nodes(data=True)):
        node_to_index[node] = i
        node_rows.append({
            'node': node,
            'x': attr.get('X', attr.get('x', 0)),
            'y': attr.get('Y', attr.get('y', 0)),
        })
    nodes_df = pd.DataFrame(node_rows)

    edges_df = pd.DataFrame(
        [{'source': node_to_index[u], 'target': node_to_index[v]} for u, v in edge_list]
    )

    bundled = hammer_bundle(nodes_df, edges_df,
                            accuracy=accuracy,
                            advect_iterations=advect_iterations,
                            batch_size=batch_size,
                            decay=decay,
                            initial_bandwidth=initial_bandwidth,
                            iterations=iterations,
                            max_segment_length=max_segment_length,
                            min_segment_length=min_segment_length,
                            tension=tension)

    # Pull the coordinates out once instead of iterating DataFrame rows.
    xy = bundled[['x', 'y']].to_numpy(dtype=float)
    is_break = np.isnan(xy).any(axis=1)

    paths = []
    current_path = []
    edge_index = 0

    def flush():
        """Attach the collected points to the next edge and reset."""
        nonlocal current_path, edge_index
        if current_path:
            source_node, target_node = edge_list[edge_index]
            paths.append((source_node, target_node, current_path))
            current_path = []
            edge_index += 1

    for (x, y), brk in zip(xy.tolist(), is_break.tolist()):
        if brk:
            flush()
        else:
            current_path.append((x, y))
    flush()  # the last path may not be followed by a separator row

    return paths
def plot_bundled_edges(G, bundled_paths, edge_gradient=False, node_colors=None, ax=None, **plot_kwargs):
    """Draw pre-computed bundled paths; works with both bundling methods.

    Parameters:
        G: NetworkX graph. Not read by this function; kept so the signature
            matches the other plotting helpers.
        bundled_paths: List of ``(source, target, path_points)`` tuples,
            where ``path_points`` is a sequence of ``(x, y)`` pairs.
        edge_gradient: If True, every path is split into its consecutive
            point pairs and coloured by linear interpolation from the source
            node colour to the target node colour.
        node_colors: Mapping node -> colour sequence, used only when
            ``edge_gradient`` is True. Nodes without an entry (or a missing
            mapping) use the default purple.
        ax: Optional matplotlib axis. If None, a new 8x8 figure is created.
        **plot_kwargs: Extra keyword arguments for ``LineCollection``; they
            override the defaults, including the computed colours.

    The axis is autoscaled after the collection is added.
    """
    default_color = (0.5, 0.0, 0.5, 0.9)  # purple RGBA

    if ax is None:
        plt.figure(figsize=(8, 8))
        ax = plt.gca()

    color_lookup = node_colors if node_colors is not None else {}

    edge_segments = []
    edge_colors = []
    for source, target, path_points in bundled_paths:
        points = np.asarray(path_points, dtype=float)
        num_pts = len(points)
        if num_pts < 2:
            # Fewer than two points cannot form a line.
            continue

        if edge_gradient:
            c_u = np.array(color_lookup.get(source, default_color))
            c_v = np.array(color_lookup.get(target, default_color))
            for i in range(num_pts - 1):
                # Fraction along the path: 0 at the first piece, 1 at the last.
                t = i / max(1, (num_pts - 2))
                edge_segments.append([points[i], points[i + 1]])
                edge_colors.append((1 - t) * c_u + t * c_v)
        else:
            # One polyline per path keeps the collection small and avoids
            # semi-transparent pieces overlapping at every joint.
            edge_segments.append(points)
            edge_colors.append(default_color)

    lc_kwargs = {'linewidths': 1, 'alpha': 0.9}
    if edge_colors:
        lc_kwargs['colors'] = edge_colors
    lc_kwargs.update(plot_kwargs)

    lc = LineCollection(edge_segments, **lc_kwargs)
    ax.add_collection(lc)
    ax.autoscale()
def run_and_plot_bundling(G, method='hammer', edge_gradient=False, node_colors=None, ax=None,
                          bundling_params=None, **plot_kwargs):
    """Run one of the bundling methods on ``G`` and plot the result.

    Parameters:
        G: NetworkX graph
        method: 'spanner' or 'hammer'
        bundling_params: dict of keyword arguments for the chosen bundler
            (``SpannerBundlingNoSP`` or ``run_hammer_bundling``); None means
            no extra arguments.
        edge_gradient, node_colors, ax, **plot_kwargs: passed on to
            ``plot_bundled_edges``.

    Raises:
        ValueError: if ``method`` is neither 'spanner' nor 'hammer'.
    """
    params = bundling_params or {}

    def position_of(node):
        # 'X'/'Y' take precedence over 'x'/'y'; a missing coordinate is 0.
        attrs = G.nodes[node]
        return (attrs.get('X', attrs.get('x', 0)),
                attrs.get('Y', attrs.get('y', 0)))

    if method == 'hammer':
        bundled_paths = run_hammer_bundling(G, **params)
    elif method == 'spanner':
        SpannerBundlingNoSP(G, **params).bundle()
        # Convert the edge annotations into (source, target, points) tuples:
        # endpoint position, spline control points, endpoint position.
        bundled_paths = [
            (u, v, [position_of(u)] + list(data["Spline"].points) + [position_of(v)])
            for u, v, data in G.edges(data=True)
            if data.get("Layer") == "Bundled" and "Spline" in data
        ]
    else:
        raise ValueError(f"Unknown bundling method: {method}")

    plot_bundled_edges(G, bundled_paths, edge_gradient, node_colors, ax, **plot_kwargs)