import pandas as pd
import networkx as nx
import tqdm
import plotly.graph_objects as go
from datasets import load_dataset
import pandas as pd
def load_graph_from_edge_df(
repo_name: str,
edge_df: pd.DataFrame,
) -> nx.DiGraph:
"""
Create a NetworkX directed graph from the dependency edge DataFrame.
Uses all edge types for centrality calculation.
Args:
repo_name: Name of the repository to filter by
edge_df: DataFrame with columns [repo_name, target, source, edge_type]
Returns:
NetworkX DiGraph with edges and edge attributes
"""
G = nx.DiGraph()
repo_edge_df = edge_df[edge_df["repo_name"] == repo_name]
# Add edges with attributes (all edge types for accurate centrality)
for _, row in repo_edge_df.iterrows():
source = row["source"]
target = row["target"]
edge_type = row["edge_type"]
# Add edge with attributes
G.add_edge(source, target, edge_type=edge_type, repo_name=repo_name)
return G
def init_graphs():
"""Initialize graphs from dependency data on startup"""
print("Loading dependency data from HuggingFace Hub...")
dataset = load_dataset(
"lambdaofgod/pwc_github_search",
data_files="sample_repo_dependency_records.parquet",
)
graph_dependencies_df = dataset["train"].to_pandas()
repos = graph_dependencies_df["repo_name"].unique()
graphs = dict()
print(f"Loading {len(repos)} graphs...")
for repo_name in tqdm.tqdm(repos):
graph = load_graph_from_edge_df(repo_name, graph_dependencies_df)
graphs[repo_name] = graph
print("Graphs loaded successfully!")
return graphs
def get_node_type(node, graph):
"""Determine node type based on edge relationships"""
node_str = str(node)
# Check if it's a repository (has '/' and is source of repo-file edges)
if "/" in node_str:
for _, _, data in graph.edges(node, data=True):
if data.get("edge_type") == "repo-file":
return "repository"
# Check if it's a file (target of repo-file edges or source of file-* edges)
if ".py" in node_str:
# Check if it's target of repo-file edge
for source, target, data in graph.edges(data=True):
if target == node and data.get("edge_type") == "repo-file":
return "file"
# Check if it's source of file-* edges
for _, _, data in graph.edges(node, data=True):
edge_type = data.get("edge_type", "")
if edge_type.startswith("file-"):
return "file"
# Check if it's an import (target of file-import or source/target of import-import)
for source, target, data in graph.edges(data=True):
edge_type = data.get("edge_type", "")
if (target == node and edge_type == "file-import") or (
edge_type == "import-import" and (source == node or target == node)
):
return "import"
# Check if it's a class (target of file-class edges or source of class-method/inheritance)
for source, target, data in graph.edges(data=True):
edge_type = data.get("edge_type", "")
if target == node and edge_type == "file-class":
return "class"
if source == node and edge_type in ["class-method", "inheritance"]:
return "class"
# Check if it's a function (target of file-function or function-function edges)
for source, target, data in graph.edges(data=True):
edge_type = data.get("edge_type", "")
if target == node and edge_type == "file-function":
return "function"
if edge_type == "function-function" and (source == node or target == node):
return "function"
# Check if it's a method (target of class-method edges)
for source, target, data in graph.edges(data=True):
if target == node and data.get("edge_type") == "class-method":
return "method"
# Default fallback
return "unknown"
def create_interactive_plotly_graph(
repo_name, graph, layout_type="spring", selected_edge_types=None
):
"""Create an interactive Plotly graph with node names and edge types"""
if selected_edge_types is None:
selected_edge_types = set()
# Get node positions using selected layout
if layout_type == "spring":
pos = nx.spring_layout(graph, k=1, iterations=100)
elif layout_type == "circular":
pos = nx.circular_layout(graph)
elif layout_type == "kamada_kawai":
pos = nx.kamada_kawai_layout(graph)
elif layout_type == "fruchterman_reingold":
pos = nx.fruchterman_reingold_layout(graph, k=1, iterations=100)
elif layout_type == "shell":
pos = nx.shell_layout(graph)
elif layout_type == "spectral":
pos = nx.spectral_layout(graph)
elif layout_type == "planar":
try:
pos = nx.planar_layout(graph)
except nx.NetworkXException:
# Fallback to spring layout if graph is not planar
pos = nx.spring_layout(graph, k=1, iterations=50)
else:
pos = nx.spring_layout(graph, k=1, iterations=50)
# Filter edges based on selected edge types
filtered_edges = []
for edge in graph.edges(data=True):
edge_type = edge[2].get("edge_type", "unknown")
if not selected_edge_types or edge_type in selected_edge_types:
filtered_edges.append(edge)
# Extract edges with their data
edge_x = []
edge_y = []
edge_info = []
for edge in filtered_edges:
x0, y0 = pos[edge[0]]
x1, y1 = pos[edge[1]]
edge_x.extend([x0, x1, None])
edge_y.extend([y0, y1, None])
# Extract edge type from edge data
edge_type = edge[2].get("edge_type", "unknown")
edge_info.append(f"{edge[0]} → {edge[1]}
Type: {edge_type}")
# Create edge trace
edge_trace = go.Scatter(
x=edge_x,
y=edge_y,
line=dict(width=1, color="#888"),
hoverinfo="none",
mode="lines",
name="Edges",
)
# Define color scheme for node types
node_type_colors = {
"repository": "#FF6B6B", # Red
"file": "#4ECDC4", # Teal
"class": "#45B7D1", # Blue
"function": "#96CEB4", # Green
"method": "#FFEAA7", # Yellow
"import": "#FF9F43", # Orange
"unknown": "#DDA0DD", # Plum
}
# Get nodes that are connected by filtered edges
connected_nodes = set()
for edge in filtered_edges:
connected_nodes.add(edge[0])
connected_nodes.add(edge[1])
# If no edges are selected, show all nodes
if not selected_edge_types:
connected_nodes = set(graph.nodes())
# Calculate degree statistics for opacity normalization
degrees = [graph.degree(node) for node in connected_nodes]
min_degree = min(degrees) if degrees else 0
max_degree = max(degrees) if degrees else 1
degree_range = max_degree - min_degree if max_degree > min_degree else 1
# Extract node information
node_x = []
node_y = []
node_text = []
node_info = []
node_colors = []
node_types = []
node_sizes = []
node_opacities = []
for node in connected_nodes:
x, y = pos[node]
node_x.append(x)
node_y.append(y)
# Determine node type
node_type = get_node_type(node, graph)
node_types.append(node_type)
# Calculate node size based on degree
degree = graph.degree(node)
# Scale size between 8 and 25 based on degree
size = max(8, min(25, 8 + degree * 1.5))
node_sizes.append(size)
# Calculate opacity based on normalized degree (0.3 to 1.0)
normalized_degree = (degree - min_degree) / degree_range
opacity = 0.3 + (normalized_degree * 0.7) # Range from 0.3 to 1.0
node_opacities.append(opacity)
# Truncate long node names for display
display_name = str(node)
if len(display_name) > 30:
display_name = display_name[:27] + "..."
node_text.append(display_name)
node_info.append(
f"Node: {node}
Type: {node_type}
Degree: {graph.degree(node)}"
)
# Color nodes by type
node_colors.append(node_type_colors.get(node_type, node_type_colors["unknown"]))
# Create node trace
node_trace = go.Scatter(
x=node_x,
y=node_y,
mode="markers+text",
hoverinfo="text",
hovertext=node_info,
text=node_text,
textposition="middle center",
textfont=dict(size=8, color="rgba(0,0,0,1)"),
marker=dict(
size=node_sizes,
color=node_colors,
line=dict(width=1, color="black"),
opacity=node_opacities, # Variable opacity based on degree
),
name="Nodes",
)
# Create the figure
fig = go.Figure(data=[edge_trace, node_trace])
fig.update_layout(
title=dict(
text=f"Interactive Dependency Graph: {repo_name}", font=dict(size=16)
),
showlegend=True,
hovermode="closest",
margin=dict(b=20, l=5, r=5, t=40),
annotations=[
dict(
text="Hover over nodes for details. Zoom and pan to explore.",
showarrow=False,
xref="paper",
yref="paper",
x=0.005,
y=-0.002,
)
],
xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
plot_bgcolor="white",
)
return fig
def get_available_edge_types(graph):
"""Get all unique edge types in the graph"""
edge_types = set()
for _, _, data in graph.edges(data=True):
edge_type = data.get("edge_type", "unknown")
edge_types.add(edge_type)
# Define preferred order
preferred_order = [
"repo-file",
"file-class",
"file-import",
"inheritance",
"import-import",
"file-function",
"class-method",
"function-function",
]
# Sort edge types according to preferred order, then alphabetically for any others
ordered_types = []
for edge_type in preferred_order:
if edge_type in edge_types:
ordered_types.append(edge_type)
edge_types.remove(edge_type)
# Add any remaining edge types alphabetically
ordered_types.extend(sorted(list(edge_types)))
return ordered_types
def visualize_graph(
repo_name, graphs_dict, layout_type="spring", selected_edge_types=None
):
"""Visualize the selected repository's graph"""
if repo_name not in graphs_dict:
return None, f"Repository '{repo_name}' not found in loaded graphs."
if repo_name is None:
return None, "Please select a repository."
graph = graphs_dict[repo_name]
# Create interactive Plotly graph
fig = create_interactive_plotly_graph(
repo_name, graph, layout_type, selected_edge_types
)
# Generate statistics for filtered graph
edge_types = {}
filtered_edge_count = 0
for _, _, data in graph.edges(data=True):
edge_type = data.get("edge_type", "unknown")
if not selected_edge_types or edge_type in selected_edge_types:
edge_types[edge_type] = edge_types.get(edge_type, 0) + 1
filtered_edge_count += 1
edge_type_summary = "\n".join(
[f" {edge_type}: {count}" for edge_type, count in edge_types.items()]
)
# Generate node type statistics for visible nodes
if selected_edge_types:
# Get nodes connected by filtered edges
connected_nodes = set()
for source, target, data in graph.edges(data=True):
edge_type = data.get("edge_type", "unknown")
if edge_type in selected_edge_types:
connected_nodes.add(source)
connected_nodes.add(target)
else:
connected_nodes = set(graph.nodes())
node_types = {}
for node in connected_nodes:
node_type = get_node_type(node, graph)
node_types[node_type] = node_types.get(node_type, 0) + 1
node_type_summary = "\n".join(
[f" {node_type}: {count}" for node_type, count in node_types.items()]
)
stats = f"""Repository: {repo_name}
Visible nodes: {len(connected_nodes)} / {graph.number_of_nodes()}
Visible edges: {filtered_edge_count} / {graph.number_of_edges()}
Visible node types:
{node_type_summary}
Visible edge types:
{edge_type_summary}
"""
return fig, stats