"""Build per-repository dependency graphs and render them as Plotly figures."""

from collections import Counter
from typing import Any, Dict, Hashable, Iterable, List, Optional, Set, Tuple

import networkx as nx
import pandas as pd
import plotly.graph_objects as go
import tqdm
from datasets import load_dataset

# Edge types are listed in this order wherever they are offered to the user;
# any type not named here is appended afterwards in alphabetical order.
_PREFERRED_EDGE_TYPE_ORDER = (
    "repo-file",
    "file-class",
    "file-import",
    "inheritance",
    "import-import",
    "file-function",
    "class-method",
    "function-function",
)

# Marker colour for each label returned by get_node_type().
_NODE_TYPE_COLORS = {
    "repository": "#FF6B6B",  # Red
    "file": "#4ECDC4",  # Teal
    "class": "#45B7D1",  # Blue
    "function": "#96CEB4",  # Green
    "method": "#FFEAA7",  # Yellow
    "import": "#FF9F43",  # Orange
    "unknown": "#DDA0DD",  # Plum
}


def load_graph_from_edge_df(
    repo_name: str,
    edge_df: pd.DataFrame,
) -> nx.DiGraph:
    """
    Create a NetworkX directed graph from the dependency edge DataFrame.

    Edges of every edge type are added. Because the result is a plain
    ``DiGraph``, a (source, target) pair that occurs in several rows keeps
    only the attributes of the last such row.

    Args:
        repo_name: Name of the repository to filter by
        edge_df: DataFrame with columns [repo_name, target, source, edge_type]

    Returns:
        NetworkX DiGraph whose edges carry ``edge_type`` and ``repo_name``
        attributes
    """
    G = nx.DiGraph()
    repo_edge_df = edge_df[edge_df["repo_name"] == repo_name]

    # Column-wise zip avoids building one Series per row, as iterrows() does.
    G.add_edges_from(
        (source, target, {"edge_type": edge_type, "repo_name": repo_name})
        for source, target, edge_type in zip(
            repo_edge_df["source"],
            repo_edge_df["target"],
            repo_edge_df["edge_type"],
        )
    )
    return G


def init_graphs() -> Dict[Any, nx.DiGraph]:
    """
    Initialize graphs from dependency data on startup.

    Downloads the dependency records from the HuggingFace Hub and builds one
    graph per distinct ``repo_name``, in order of first appearance. Rows whose
    ``repo_name`` is missing are skipped, because ``groupby`` drops NaN keys.

    Returns:
        Dict mapping repository name to its dependency DiGraph
    """
    print("Loading dependency data from HuggingFace Hub...")
    dataset = load_dataset(
        "lambdaofgod/pwc_github_search",
        data_files="sample_repo_dependency_records.parquet",
    )
    graph_dependencies_df = dataset["train"].to_pandas()

    # Group once instead of re-filtering the whole frame for every repository.
    edges_by_repo = graph_dependencies_df.groupby("repo_name", sort=False)

    print(f"Loading {len(edges_by_repo)} graphs...")
    graphs = {}
    for repo_name, repo_edge_df in tqdm.tqdm(edges_by_repo):
        graphs[repo_name] = load_graph_from_edge_df(repo_name, repo_edge_df)

    print("Graphs loaded successfully!")
    return graphs


def _incident_edge_types(node: Hashable, graph: nx.Graph) -> Tuple[Set[Any], Set[Any]]:
    """Return the sets of ``edge_type`` values on edges entering and leaving ``node``."""
    if graph.is_directed():
        incoming = graph.in_edges(node, data=True)
        outgoing = graph.out_edges(node, data=True)
    else:
        # An undirected edge has no orientation, so count it in both directions.
        incoming = outgoing = list(graph.edges(node, data=True))

    in_types = {data.get("edge_type") for _, _, data in incoming}
    out_types = {data.get("edge_type") for _, _, data in outgoing}
    return in_types, out_types


def get_node_type(node: Hashable, graph: nx.Graph) -> str:
    """
    Determine node type based on edge relationships.

    Only the edges touching ``node`` are inspected. The rules are tried in
    this order and the first match wins:

    1. "repository": name contains "/" and the node is the source of a
       repo-file edge
    2. "file": name contains ".py" and the node is the target of a repo-file
       edge or the source of any file-* edge
    3. "import": target of a file-import edge, or either end of an
       import-import edge
    4. "class": target of a file-class edge, or source of a class-method or
       inheritance edge
    5. "function": target of a file-function edge, or either end of a
       function-function edge
    6. "method": target of a class-method edge
    7. "unknown": none of the above

    Args:
        node: Node to classify
        graph: Graph containing the node; edges carry an ``edge_type`` attribute

    Returns:
        One of the seven labels listed above
    """
    node_str = str(node)
    in_types, out_types = _incident_edge_types(node, graph)

    if "/" in node_str and "repo-file" in out_types:
        return "repository"

    if ".py" in node_str:
        # The isinstance guard skips missing/non-string edge types.
        has_file_out_edge = any(
            isinstance(edge_type, str) and edge_type.startswith("file-")
            for edge_type in out_types
        )
        if "repo-file" in in_types or has_file_out_edge:
            return "file"

    if (
        "file-import" in in_types
        or "import-import" in in_types
        or "import-import" in out_types
    ):
        return "import"

    if (
        "file-class" in in_types
        or "class-method" in out_types
        or "inheritance" in out_types
    ):
        return "class"

    if (
        "file-function" in in_types
        or "function-function" in in_types
        or "function-function" in out_types
    ):
        return "function"

    if "class-method" in in_types:
        return "method"

    return "unknown"


def _normalize_selection(selected_edge_types: Optional[Iterable[str]]) -> Set[str]:
    """Return the selected edge types as a set; an empty set means "no filter"."""
    return set(selected_edge_types) if selected_edge_types else set()


def _visible_nodes(graph: nx.Graph, selected: Set[str]) -> List[Hashable]:
    """Return the nodes to display, in graph order, for the given edge-type filter."""
    if not selected:
        # No filter: every node is shown, including isolated ones.
        return list(graph.nodes())

    endpoints = set()
    for source, target, data in graph.edges(data=True):
        if data.get("edge_type", "unknown") in selected:
            endpoints.update((source, target))
    # Walk graph.nodes() rather than the set so the order is stable across runs.
    return [node for node in graph.nodes() if node in endpoints]


def _compute_layout(graph: nx.Graph, layout_type: str) -> Dict[Hashable, Any]:
    """Return node positions for ``layout_type``, using a spring layout for unknown names."""

    def default_spring(g):
        return nx.spring_layout(g, k=1, iterations=50)

    if layout_type == "planar":
        try:
            return nx.planar_layout(graph)
        except nx.NetworkXException:
            # Fallback to spring layout if graph is not planar
            return default_spring(graph)

    layouts = {
        "spring": lambda g: nx.spring_layout(g, k=1, iterations=100),
        "circular": nx.circular_layout,
        "kamada_kawai": nx.kamada_kawai_layout,
        "fruchterman_reingold": lambda g: nx.fruchterman_reingold_layout(
            g, k=1, iterations=100
        ),
        "shell": nx.shell_layout,
        "spectral": nx.spectral_layout,
    }
    return layouts.get(layout_type, default_spring)(graph)


def create_interactive_plotly_graph(
    repo_name, graph, layout_type="spring", selected_edge_types=None
):
    """
    Create an interactive Plotly graph with node names and edge types.

    Positions are computed on the whole graph, then only edges whose type is
    in ``selected_edge_types`` (and the nodes they connect) are drawn. With no
    selection, every edge and every node is drawn. Marker size and opacity
    grow with a node's degree in the full graph.

    Args:
        repo_name: Repository name shown in the figure title
        graph: Dependency graph to draw
        layout_type: One of "spring", "circular", "kamada_kawai",
            "fruchterman_reingold", "shell", "spectral", "planar"; any other
            value uses a spring layout
        selected_edge_types: Edge types to show; None or empty shows all

    Returns:
        Plotly Figure containing one edge trace and one node trace
    """
    selected = _normalize_selection(selected_edge_types)
    pos = _compute_layout(graph, layout_type)

    # Line segments for the visible edges; None breaks the line between edges.
    edge_x = []
    edge_y = []
    for source, target, data in graph.edges(data=True):
        if selected and data.get("edge_type", "unknown") not in selected:
            continue
        x0, y0 = pos[source]
        x1, y1 = pos[target]
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])

    edge_trace = go.Scatter(
        x=edge_x,
        y=edge_y,
        line=dict(width=1, color="#888"),
        hoverinfo="none",
        mode="lines",
        name="Edges",
    )

    visible_nodes = _visible_nodes(graph, selected)

    # Degree statistics for opacity normalization
    degrees = {node: graph.degree(node) for node in visible_nodes}
    min_degree = min(degrees.values()) if degrees else 0
    max_degree = max(degrees.values()) if degrees else 1
    degree_range = max_degree - min_degree if max_degree > min_degree else 1

    node_x = []
    node_y = []
    node_text = []
    node_info = []
    node_colors = []
    node_sizes = []
    node_opacities = []

    for node in visible_nodes:
        x, y = pos[node]
        node_x.append(x)
        node_y.append(y)

        node_type = get_node_type(node, graph)
        degree = degrees[node]

        # Scale size between 8 and 25 based on degree
        node_sizes.append(max(8, min(25, 8 + degree * 1.5)))

        # Opacity ranges from 0.3 (lowest degree) to 1.0 (highest degree)
        normalized_degree = (degree - min_degree) / degree_range
        node_opacities.append(0.3 + (normalized_degree * 0.7))

        # Truncate long node names for display
        display_name = str(node)
        if len(display_name) > 30:
            display_name = display_name[:27] + "..."
        node_text.append(display_name)

        # Plotly hover text uses <br> for line breaks.
        node_info.append(f"Node: {node}<br>Type: {node_type}<br>Degree: {degree}")

        node_colors.append(
            _NODE_TYPE_COLORS.get(node_type, _NODE_TYPE_COLORS["unknown"])
        )

    node_trace = go.Scatter(
        x=node_x,
        y=node_y,
        mode="markers+text",
        hoverinfo="text",
        hovertext=node_info,
        text=node_text,
        textposition="middle center",
        textfont=dict(size=8, color="rgba(0,0,0,1)"),
        marker=dict(
            size=node_sizes,
            color=node_colors,
            line=dict(width=1, color="black"),
            opacity=node_opacities,  # Variable opacity based on degree
        ),
        name="Nodes",
    )

    fig = go.Figure(data=[edge_trace, node_trace])
    fig.update_layout(
        title=dict(
            text=f"Interactive Dependency Graph: {repo_name}", font=dict(size=16)
        ),
        showlegend=True,
        hovermode="closest",
        margin=dict(b=20, l=5, r=5, t=40),
        annotations=[
            dict(
                text="Hover over nodes for details. Zoom and pan to explore.",
                showarrow=False,
                xref="paper",
                yref="paper",
                x=0.005,
                y=-0.002,
            )
        ],
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        plot_bgcolor="white",
    )

    return fig


def get_available_edge_types(graph):
    """
    Get all unique edge types in the graph.

    Edges without an ``edge_type`` attribute are reported as "unknown".

    Args:
        graph: Graph whose edges are inspected

    Returns:
        List of edge types: the known types in their preferred display order,
        followed by any other types in alphabetical order
    """
    edge_types = {
        data.get("edge_type", "unknown") for _, _, data in graph.edges(data=True)
    }

    ordered_types = [
        edge_type
        for edge_type in _PREFERRED_EDGE_TYPE_ORDER
        if edge_type in edge_types
    ]
    ordered_types.extend(sorted(edge_types.difference(_PREFERRED_EDGE_TYPE_ORDER)))
    return ordered_types


def visualize_graph(
    repo_name, graphs_dict, layout_type="spring", selected_edge_types=None
):
    """
    Visualize the selected repository's graph.

    Args:
        repo_name: Key of the repository in ``graphs_dict``; None means that
            nothing has been selected yet
        graphs_dict: Mapping from repository name to its dependency graph
        layout_type: Layout name passed to create_interactive_plotly_graph
        selected_edge_types: Edge types to show; None or empty shows all

    Returns:
        Tuple ``(figure, text)``. On success ``text`` is a statistics summary
        of the visible nodes and edges. When ``repo_name`` is None or unknown,
        ``figure`` is None and ``text`` is a message explaining why.
    """
    # Test for None first: None is normally not a key, so testing membership
    # first would report "Repository 'None' not found" instead.
    if repo_name is None:
        return None, "Please select a repository."

    if repo_name not in graphs_dict:
        return None, f"Repository '{repo_name}' not found in loaded graphs."

    graph = graphs_dict[repo_name]

    fig = create_interactive_plotly_graph(
        repo_name, graph, layout_type, selected_edge_types
    )

    selected = _normalize_selection(selected_edge_types)

    # Count visible edges per type; Counter keeps first-seen order.
    edge_type_counts = Counter(
        edge_type
        for edge_type in (
            data.get("edge_type", "unknown") for _, _, data in graph.edges(data=True)
        )
        if not selected or edge_type in selected
    )
    filtered_edge_count = sum(edge_type_counts.values())
    edge_type_summary = "\n".join(
        f" {edge_type}: {count}" for edge_type, count in edge_type_counts.items()
    )

    # Count visible nodes per type.
    connected_nodes = _visible_nodes(graph, selected)
    node_type_counts = Counter(get_node_type(node, graph) for node in connected_nodes)
    node_type_summary = "\n".join(
        f" {node_type}: {count}" for node_type, count in node_type_counts.items()
    )

    stats = f"""Repository: {repo_name}
Visible nodes: {len(connected_nodes)} / {graph.number_of_nodes()}
Visible edges: {filtered_edge_count} / {graph.number_of_edges()}
Visible node types:
{node_type_summary}
Visible edge types:
{edge_type_summary}
"""

    return fig, stats