|
import pandas as pd |
|
import networkx as nx |
|
import tqdm |
|
import plotly.graph_objects as go |
|
from datasets import load_dataset |
|
import pandas as pd |
|
|
|
|
|
def load_graph_from_edge_df(
    repo_name: str,
    edge_df: pd.DataFrame,
) -> nx.DiGraph:
    """
    Create a NetworkX directed graph from the dependency edge DataFrame.

    Every edge type present for the repository is kept; nothing is filtered
    by edge type here.

    Args:
        repo_name: Name of the repository to filter by
        edge_df: DataFrame with columns [repo_name, target, source, edge_type]

    Returns:
        NetworkX DiGraph whose edges run source -> target and carry the
        ``edge_type`` and ``repo_name`` attributes. A repository with no
        matching rows yields an empty graph.
    """
    G = nx.DiGraph()
    repo_edge_df = edge_df[edge_df["repo_name"] == repo_name]

    # Walk the three needed columns in lockstep instead of iterrows(), which
    # materialises a full Series object for every single row.
    for source, target, edge_type in zip(
        repo_edge_df["source"],
        repo_edge_df["target"],
        repo_edge_df["edge_type"],
    ):
        G.add_edge(source, target, edge_type=edge_type, repo_name=repo_name)

    return G
|
|
|
|
|
def init_graphs():
    """Download the dependency records and build one graph per repository.

    Returns:
        dict mapping every distinct ``repo_name`` in the dataset to the graph
        produced for it by ``load_graph_from_edge_df``.
    """
    print("Loading dependency data from HuggingFace Hub...")
    dependency_records = load_dataset(
        "lambdaofgod/pwc_github_search",
        data_files="sample_repo_dependency_records.parquet",
    )["train"].to_pandas()

    repo_names = dependency_records["repo_name"].unique()
    print(f"Loading {len(repo_names)} graphs...")

    # tqdm wraps the iterable so a progress bar is shown while building.
    graphs = {
        name: load_graph_from_edge_df(name, dependency_records)
        for name in tqdm.tqdm(repo_names)
    }

    print("Graphs loaded successfully!")
    return graphs
|
|
|
|
|
def get_node_type(node, graph):
    """Determine node type based on edge relationships.

    Only the edges touching ``node`` are inspected. The rules are tried in
    this order and the first match wins:

    1. "repository": the name contains "/" and the node has an outgoing
       "repo-file" edge.
    2. "file": the name contains ".py" and the node has an incoming
       "repo-file" edge or an outgoing edge whose type starts with "file-".
    3. "import": an incoming "file-import" edge, or an "import-import" edge
       in either direction.
    4. "class": an incoming "file-class" edge, or an outgoing "class-method"
       or "inheritance" edge.
    5. "function": an incoming "file-function" edge, or a
       "function-function" edge in either direction.
    6. "method": an incoming "class-method" edge.

    Args:
        node: Node identifier to classify.
        graph: Directed graph (it must provide ``in_edges`` and
            ``out_edges``) whose edges carry an ``edge_type`` attribute.

    Returns:
        One of "repository", "file", "import", "class", "function",
        "method" or "unknown". A node that is not in the graph is "unknown".
    """
    if node not in graph:
        return "unknown"

    def _edge_types(edges):
        # Non-string edge types (e.g. None from missing data) can never match
        # a rule; dropping them also keeps str.startswith from raising.
        found = set()
        for _, _, data in edges:
            edge_type = data.get("edge_type")
            if isinstance(edge_type, str):
                found.add(edge_type)
        return found

    # Look only at incident edges instead of rescanning the whole edge list
    # once per rule.
    outgoing = _edge_types(graph.out_edges(node, data=True))
    incoming = _edge_types(graph.in_edges(node, data=True))
    either_direction = incoming | outgoing
    node_str = str(node)

    if "/" in node_str and "repo-file" in outgoing:
        return "repository"

    if ".py" in node_str and (
        "repo-file" in incoming
        or any(edge_type.startswith("file-") for edge_type in outgoing)
    ):
        return "file"

    if "file-import" in incoming or "import-import" in either_direction:
        return "import"

    if "file-class" in incoming or outgoing & {"class-method", "inheritance"}:
        return "class"

    if "file-function" in incoming or "function-function" in either_direction:
        return "function"

    if "class-method" in incoming:
        return "method"

    return "unknown"
|
|
|
|
|
def _compute_layout(graph, layout_type):
    """Return the node -> position mapping for the requested layout name."""
    if layout_type == "planar":
        try:
            return nx.planar_layout(graph)
        except nx.NetworkXException:
            # Planar layout failed for this graph; use a spring layout instead.
            return nx.spring_layout(graph, k=1, iterations=50)

    layouts = {
        "spring": lambda g: nx.spring_layout(g, k=1, iterations=100),
        "circular": nx.circular_layout,
        "kamada_kawai": nx.kamada_kawai_layout,
        "fruchterman_reingold": lambda g: nx.fruchterman_reingold_layout(
            g, k=1, iterations=100
        ),
        "shell": nx.shell_layout,
        "spectral": nx.spectral_layout,
    }
    if layout_type in layouts:
        return layouts[layout_type](graph)

    # Unrecognised layout names fall back to a shorter spring layout.
    return nx.spring_layout(graph, k=1, iterations=50)


def create_interactive_plotly_graph(
    repo_name, graph, layout_type="spring", selected_edge_types=None
):
    """Create an interactive Plotly graph with node names and edge types.

    Node positions are always computed over the full graph, so filtering by
    edge type hides elements without moving the ones that remain.

    Args:
        repo_name: Repository name, used only in the figure title.
        graph: Graph whose edges carry an ``edge_type`` attribute.
        layout_type: One of "spring", "circular", "kamada_kawai",
            "fruchterman_reingold", "shell", "spectral" or "planar". Any
            other value falls back to a spring layout.
        selected_edge_types: Collection of edge types to draw. ``None`` or an
            empty collection draws every edge and every node; otherwise only
            the selected edges and the nodes they touch are drawn.

    Returns:
        A ``plotly.graph_objects.Figure`` with an "Edges" line trace and a
        "Nodes" marker+text trace. Nodes are coloured by ``get_node_type``,
        and their size and opacity grow with their degree in the full graph.
    """
    if selected_edge_types is None:
        selected_edge_types = set()

    pos = _compute_layout(graph, layout_type)

    # Keep only the endpoints; the edge trace has hover disabled, so no
    # per-edge hover text is needed.
    filtered_edges = [
        (source, target)
        for source, target, data in graph.edges(data=True)
        if not selected_edge_types
        or data.get("edge_type", "unknown") in selected_edge_types
    ]

    edge_x = []
    edge_y = []
    for source, target in filtered_edges:
        x0, y0 = pos[source]
        x1, y1 = pos[target]
        # The trailing None breaks the line so segments are not joined.
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])

    edge_trace = go.Scatter(
        x=edge_x,
        y=edge_y,
        line=dict(width=1, color="#888"),
        hoverinfo="none",
        mode="lines",
        name="Edges",
    )

    node_type_colors = {
        "repository": "#FF6B6B",
        "file": "#4ECDC4",
        "class": "#45B7D1",
        "function": "#96CEB4",
        "method": "#FFEAA7",
        "import": "#FF9F43",
        "unknown": "#DDA0DD",
    }

    if selected_edge_types:
        # With a filter active, show only nodes touched by a visible edge.
        connected_nodes = set()
        for source, target in filtered_edges:
            connected_nodes.add(source)
            connected_nodes.add(target)
    else:
        connected_nodes = set(graph.nodes())

    # Degrees come from the full graph, not just the visible edges; look each
    # one up once and reuse it for size, opacity and hover text.
    degree_of = {node: graph.degree(node) for node in connected_nodes}
    min_degree = min(degree_of.values()) if degree_of else 0
    max_degree = max(degree_of.values()) if degree_of else 1
    # Avoid dividing by zero when every visible node has the same degree.
    degree_range = max_degree - min_degree if max_degree > min_degree else 1

    node_x = []
    node_y = []
    node_text = []
    node_info = []
    node_colors = []
    node_sizes = []
    node_opacities = []

    for node, degree in degree_of.items():
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)

        node_type = get_node_type(node, graph)

        # Marker size grows with degree but is clamped to the range 8..25.
        node_sizes.append(max(8, min(25, 8 + degree * 1.5)))

        # Opacity runs from 0.3 (lowest degree) to 1.0 (highest degree).
        normalized_degree = (degree - min_degree) / degree_range
        node_opacities.append(0.3 + (normalized_degree * 0.7))

        # Long names are truncated for the on-canvas label only; the hover
        # text keeps the full node name.
        display_name = str(node)
        if len(display_name) > 30:
            display_name = display_name[:27] + "..."
        node_text.append(display_name)

        node_info.append(
            f"Node: {node}<br>Type: {node_type}<br>Degree: {degree}"
        )
        node_colors.append(
            node_type_colors.get(node_type, node_type_colors["unknown"])
        )

    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode="markers+text",
        hoverinfo="text",
        hovertext=node_info,
        text=node_text,
        textposition="middle center",
        textfont=dict(size=8, color="rgba(0,0,0,1)"),
        marker=dict(
            size=node_sizes,
            color=node_colors,
            line=dict(width=1, color="black"),
            opacity=node_opacities,
        ),
        name="Nodes",
    )

    fig = go.Figure(data=[edge_trace, node_trace])

    fig.update_layout(
        title=dict(
            text=f"Interactive Dependency Graph: {repo_name}", font=dict(size=16)
        ),
        showlegend=True,
        hovermode="closest",
        margin=dict(b=20, l=5, r=5, t=40),
        annotations=[
            dict(
                text="Hover over nodes for details. Zoom and pan to explore.",
                showarrow=False,
                xref="paper",
                yref="paper",
                x=0.005,
                y=-0.002,
            )
        ],
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        plot_bgcolor="white",
    )

    return fig
|
|
|
|
|
def get_available_edge_types(graph):
    """Get all unique edge types in the graph, in display order.

    Args:
        graph: Graph whose edges may carry an ``edge_type`` attribute. Edges
            without one are reported as "unknown".

    Returns:
        List of the distinct edge types: first the well-known types that are
        present, in their fixed preferred order, then every other type sorted
        by its string form.
    """
    present = {
        data.get("edge_type", "unknown") for _, _, data in graph.edges(data=True)
    }

    preferred_order = [
        "repo-file",
        "file-class",
        "file-import",
        "inheritance",
        "import-import",
        "file-function",
        "class-method",
        "function-function",
    ]

    ordered_types = [
        edge_type for edge_type in preferred_order if edge_type in present
    ]

    # Sorting by str() matches plain sorting for string edge types and avoids
    # a TypeError when the leftovers are not mutually comparable (for example
    # None next to strings).
    ordered_types.extend(sorted(present.difference(preferred_order), key=str))

    return ordered_types
|
|
|
|
|
def visualize_graph(
    repo_name, graphs_dict, layout_type="spring", selected_edge_types=None
):
    """Visualize the selected repository's graph.

    Args:
        repo_name: Key of the repository to draw, or ``None`` when nothing
            has been selected yet.
        graphs_dict: Mapping of repository name to its graph.
        layout_type: Layout name forwarded to
            ``create_interactive_plotly_graph``.
        selected_edge_types: Collection of edge types to show. ``None`` or an
            empty collection shows every edge and node.

    Returns:
        ``(figure, stats_text)`` on success. When no repository is selected
        or the name is not in ``graphs_dict``, returns ``(None, message)``.
    """
    # Handle "nothing selected" before the membership test; otherwise a None
    # selection is reported as "Repository 'None' not found".
    if repo_name is None:
        return None, "Please select a repository."

    if repo_name not in graphs_dict:
        return None, f"Repository '{repo_name}' not found in loaded graphs."

    graph = graphs_dict[repo_name]

    fig = create_interactive_plotly_graph(
        repo_name, graph, layout_type, selected_edge_types
    )

    # One pass over the edges gathers both the per-type counts and, when a
    # filter is active, the set of nodes touched by a visible edge.
    show_all = not selected_edge_types
    connected_nodes = set(graph.nodes()) if show_all else set()
    edge_types = {}
    filtered_edge_count = 0
    for source, target, data in graph.edges(data=True):
        edge_type = data.get("edge_type", "unknown")
        if show_all or edge_type in selected_edge_types:
            edge_types[edge_type] = edge_types.get(edge_type, 0) + 1
            filtered_edge_count += 1
            if not show_all:
                connected_nodes.add(source)
                connected_nodes.add(target)

    edge_type_summary = "\n".join(
        [f" {edge_type}: {count}" for edge_type, count in edge_types.items()]
    )

    node_types = {}
    for node in connected_nodes:
        node_type = get_node_type(node, graph)
        node_types[node_type] = node_types.get(node_type, 0) + 1

    node_type_summary = "\n".join(
        [f" {node_type}: {count}" for node_type, count in node_types.items()]
    )

    stats = f"""Repository: {repo_name}
Visible nodes: {len(connected_nodes)} / {graph.number_of_nodes()}
Visible edges: {filtered_edge_count} / {graph.number_of_edges()}

Visible node types:
{node_type_summary}

Visible edge types:
{edge_type_summary}
"""

    return fig, stats
|
|