"""Helpers for the **View Clusters** tab – both the interactive HTML and fallback dataframe view.""" from typing import List import pandas as pd import ast from .state import app_state from .utils import ( search_clusters_by_text, search_clusters_only, create_interactive_cluster_viewer, get_cluster_statistics, format_cluster_dataframe, extract_allowed_tag, ) __all__ = ["view_clusters_interactive", "view_clusters_table"] # --------------------------------------------------------------------------- # Interactive HTML view # --------------------------------------------------------------------------- def view_clusters_interactive( selected_models: List[str], cluster_level: str, search_term: str = "", selected_tags: List[str] | None = None, ) -> str: if app_state["clustered_df"] is None: return ( "
❌ Please load data first " "using the 'Load Data' tab
" ) df = app_state["clustered_df"].dropna(subset=["property_description"]).copy() # Apply search filter first if search_term and search_term.strip(): df = search_clusters_only(df, search_term.strip(), cluster_level) # Optional tags filter – only keep rows whose meta resolves to an allowed tag in selected_tags if selected_tags and len(selected_tags) > 0 and 'meta' in df.columns: def _first_allowed_tag(obj): return extract_allowed_tag(obj) # Check if all meta are empty dicts (means no tags) def _parse_try(obj): if isinstance(obj, str): try: return ast.literal_eval(obj) except Exception: return obj return obj parsed_meta = df['meta'].apply(_parse_try) non_null_parsed = [m for m in parsed_meta.tolist() if m is not None] all_empty_dicts = ( len(non_null_parsed) > 0 and all(isinstance(m, dict) and len(m) == 0 for m in non_null_parsed) ) if not all_empty_dicts: allowed = set(map(str, selected_tags)) df = df[df['meta'].apply(_first_allowed_tag).astype(str).isin(allowed)] # Build interactive viewer cluster_html = create_interactive_cluster_viewer(df, selected_models, cluster_level) # Statistics summary at the top stats = get_cluster_statistics(df, selected_models) if not stats: return ( "❌ No cluster data available
" ) # Get additional metrics from cluster_scores cluster_scores = app_state.get("metrics", {}).get("cluster_scores", {}) # Calculate average quality scores and frequency total_frequency = 0 quality_scores_list = [] metric_names = set() for cluster_name, cluster_data in cluster_scores.items(): total_frequency += cluster_data.get("proportion", 0) * 100 quality_scores = cluster_data.get("quality", {}) if quality_scores: quality_scores_list.extend(quality_scores.values()) metric_names.update(quality_scores.keys()) avg_quality = sum(quality_scores_list) / len(quality_scores_list) if quality_scores_list else 0 metrics_suffix = f" ({', '.join(sorted(metric_names))})" if metric_names else "" stats_html = f"""