Spaces:
Running
Running
import os | |
import warnings | |
import numpy as np | |
import pandas as pd | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
from umap import UMAP | |
from sklearn.cluster import KMeans | |
from scipy.stats import entropy as shannon_entropy | |
from scipy import special as sp_special | |
from scipy.interpolate import griddata | |
from sklearn.metrics.pairwise import cosine_similarity | |
from scipy.spatial.distance import cdist | |
import soundfile as sf | |
import gradio as gr | |
# ================================================================ | |
# Unified Communication Manifold Explorer & CMT Visualizer v4.0 | |
# - Adds side-by-side comparison capabilities from HTML draft | |
# - Implements cross-species neighbor finding for grammar mapping | |
# - Separates human and dog audio with automatic pairing | |
# - Enhanced dual visualization for comparative analysis | |
# ================================================================ | |
# - Adds Interactive Holography tab for full field reconstruction. | |
# - Interpolates the continuous CMT state-space (Φ field). | |
# - Visualizes topology, vector flow, and phase interference. | |
# - Adds informational-entropy-geometry visualization. | |
# - Prioritizes specific Colab paths for data loading. | |
# ================================================================ | |
warnings.filterwarnings("ignore", category=FutureWarning) | |
warnings.filterwarnings("ignore", category=UserWarning) | |
print("Initializing the Interactive CMT Holography Explorer...") | |
# --------------------------------------------------------------- | |
# Data setup | |
# --------------------------------------------------------------- | |
# Paths for local execution (used for dummy data generation fallback) | |
BASE_DIR = os.path.abspath(os.getcwd()) | |
DATA_DIR = os.path.join(BASE_DIR, "data") | |
DOG_DIR = os.path.join(DATA_DIR, "dog") | |
HUMAN_DIR = os.path.join(DATA_DIR, "human") | |
# Paths for different deployment environments | |
# Priority order: 1) Hugging Face Spaces (repo root), 2) Colab, 3) Local | |
HF_CSV_DOG = "cmt_dog_sound_analysis.csv" | |
HF_CSV_HUMAN = "cmt_human_speech_analysis.csv" | |
COLAB_CSV_DOG = "/content/cmt_dog_sound_analysis.csv" | |
COLAB_CSV_HUMAN = "/content/cmt_human_speech_analysis.csv" | |
# Determine which environment we're in and set paths accordingly | |
if os.path.exists(HF_CSV_DOG) and os.path.exists(HF_CSV_HUMAN): | |
# Hugging Face Spaces - files in repo root | |
CSV_DOG = HF_CSV_DOG | |
CSV_HUMAN = HF_CSV_HUMAN | |
print("Using Hugging Face Spaces paths") | |
elif os.path.exists(COLAB_CSV_DOG) and os.path.exists(COLAB_CSV_HUMAN): | |
# Google Colab environment | |
CSV_DOG = COLAB_CSV_DOG | |
CSV_HUMAN = COLAB_CSV_HUMAN | |
print("Using Google Colab paths") | |
else: | |
# Fallback to local or will trigger dummy data | |
CSV_DOG = HF_CSV_DOG # Try repo root first | |
CSV_HUMAN = HF_CSV_HUMAN | |
print("Falling back to local/dummy data paths") | |
# These are for creating dummy audio files if needed | |
os.makedirs(DOG_DIR, exist_ok=True) | |
os.makedirs(os.path.join(HUMAN_DIR, "Actor_01"), exist_ok=True) | |
# --- Audio Data Configuration (Platform-aware paths) --- | |
# For Hugging Face Spaces, audio files might be in the repo or need different handling | |
# For Colab, they're in Google Drive | |
if os.path.exists("/content/drive/MyDrive/combined"): | |
# Google Colab with mounted Drive | |
DOG_AUDIO_BASE_PATH = '/content/drive/MyDrive/combined' | |
HUMAN_AUDIO_BASE_PATH = '/content/drive/MyDrive/human' | |
print("Using Google Drive audio paths") | |
elif os.path.exists("combined") and os.path.exists("human"): | |
# Hugging Face Spaces with audio in repo root | |
DOG_AUDIO_BASE_PATH = 'combined' | |
HUMAN_AUDIO_BASE_PATH = 'human' | |
print("Using Hugging Face Spaces audio paths (repo root)") | |
elif os.path.exists("audio/combined"): | |
# Alternative Hugging Face Spaces location | |
DOG_AUDIO_BASE_PATH = 'audio/combined' | |
HUMAN_AUDIO_BASE_PATH = 'audio/human' | |
print("Using Hugging Face Spaces audio paths (audio subdir)") | |
else: | |
# Fallback to local dummy paths | |
DOG_AUDIO_BASE_PATH = DOG_DIR | |
HUMAN_AUDIO_BASE_PATH = HUMAN_DIR | |
print("Using local dummy audio paths") | |
print(f"Audio base paths configured:") | |
print(f"- Dog audio base: {DOG_AUDIO_BASE_PATH}") | |
print(f"- Human audio base: {HUMAN_AUDIO_BASE_PATH}") | |
# --------------------------------------------------------------- | |
# Cross-Species Analysis Functions | |
# --------------------------------------------------------------- | |
def find_nearest_cross_species_neighbor(selected_row, df_combined, n_neighbors=5): | |
""" | |
Finds the closest neighbor from the opposite species using feature similarity. | |
This enables cross-species pattern mapping for grammar development. | |
""" | |
selected_source = selected_row['source'] | |
opposite_source = 'Human' if selected_source == 'Dog' else 'Dog' | |
# Get feature columns for similarity calculation | |
feature_cols = [c for c in df_combined.columns if c.startswith("feature_")] | |
if not feature_cols: | |
# Fallback to any numeric columns if no feature columns exist | |
numeric_cols = df_combined.select_dtypes(include=[np.number]).columns | |
feature_cols = [c for c in numeric_cols if c not in ['x', 'y', 'z', 'cluster']] | |
if not feature_cols: | |
# Random selection if no suitable features found | |
opposite_species_data = df_combined[df_combined['source'] == opposite_source] | |
if len(opposite_species_data) > 0: | |
return opposite_species_data.iloc[0] | |
return None | |
# Extract features for the selected row | |
selected_features = selected_row[feature_cols].values.reshape(1, -1) | |
selected_features = np.nan_to_num(selected_features) | |
# Get all rows from the opposite species | |
opposite_species_data = df_combined[df_combined['source'] == opposite_source] | |
if len(opposite_species_data) == 0: | |
return None | |
# Extract features for opposite species | |
opposite_features = opposite_species_data[feature_cols].values | |
opposite_features = np.nan_to_num(opposite_features) | |
# Calculate cosine similarity (better for high-dimensional feature spaces) | |
similarities = cosine_similarity(selected_features, opposite_features)[0] | |
# Find the index of the most similar neighbor | |
most_similar_idx = np.argmax(similarities) | |
nearest_neighbor = opposite_species_data.iloc[most_similar_idx] | |
return nearest_neighbor | |
# --------------------------------------------------------------- | |
# Load datasets (Colab-first paths) | |
# --------------------------------------------------------------- | |
# Debug: Show what files we're looking for and what exists | |
print(f"Looking for CSV files:") | |
print(f"- Dog CSV: {CSV_DOG} (exists: {os.path.exists(CSV_DOG)})") | |
print(f"- Human CSV: {CSV_HUMAN} (exists: {os.path.exists(CSV_HUMAN)})") | |
print(f"Current working directory: {os.getcwd()}") | |
print(f"Files in current directory: {os.listdir('.')}") | |
if os.path.exists(CSV_DOG) and os.path.exists(CSV_HUMAN): | |
print(f"✅ Found existing data files. Loading from:\n- {CSV_DOG}\n- {CSV_HUMAN}") | |
df_dog = pd.read_csv(CSV_DOG) | |
df_human = pd.read_csv(CSV_HUMAN) | |
print(f"Successfully loaded data: {len(df_dog)} dog rows, {len(df_human)} human rows") | |
else: | |
print("❌ Could not find one or both CSV files. Generating and using in-memory dummy data.") | |
# This section is for DUMMY DATA GENERATION ONLY. | |
# It runs if the primary CSVs are not found and does NOT write files. | |
n_dummy_items_per_category = 50 | |
rng = np.random.default_rng(42) | |
# Ensure labels match the exact number of items | |
base_dog_labels = ["bark", "growl", "whine", "pant"] | |
base_human_labels = ["speech", "laugh", "cry", "shout"] | |
dog_labels = [base_dog_labels[i % len(base_dog_labels)] for i in range(n_dummy_items_per_category)] | |
human_labels = [base_human_labels[i % len(base_human_labels)] for i in range(n_dummy_items_per_category)] | |
dog_rows = { | |
"feature_1": rng.random(n_dummy_items_per_category), "feature_2": rng.random(n_dummy_items_per_category), "feature_3": rng.random(n_dummy_items_per_category), | |
"label": dog_labels, "filepath": [f"dog_{i}.wav" for i in range(n_dummy_items_per_category)], | |
"diag_srl_gamma": rng.uniform(0.5, 5.0, n_dummy_items_per_category), "diag_alpha_gamma": rng.uniform(0.1, 2.0, n_dummy_items_per_category), | |
"zeta_curvature": rng.uniform(-1, 1, n_dummy_items_per_category), "torsion_index": rng.uniform(0, 1, n_dummy_items_per_category), | |
} | |
human_rows = { | |
"feature_1": rng.random(n_dummy_items_per_category), "feature_2": rng.random(n_dummy_items_per_category), "feature_3": rng.random(n_dummy_items_per_category), | |
"label": human_labels, "filepath": [f"human_{i}.wav" for i in range(n_dummy_items_per_category)], | |
"diag_srl_gamma": rng.uniform(0.5, 5.0, n_dummy_items_per_category), "diag_alpha_gamma": rng.uniform(0.1, 2.0, n_dummy_items_per_category), | |
"zeta_curvature": rng.uniform(-1, 1, n_dummy_items_per_category), "torsion_index": rng.uniform(0, 1, n_dummy_items_per_category), | |
} | |
df_dog = pd.DataFrame(dog_rows) | |
df_human = pd.DataFrame(human_rows) | |
# We still create dummy audio files for the UI to use if needed | |
sr = 22050 | |
dur = 2.0 | |
t = np.linspace(0, dur, int(sr * dur), endpoint=False) | |
for i in range(n_dummy_items_per_category): | |
tone_freq = 220 + 20 * (i % 5) | |
audio = 0.1 * np.sin(2 * np.pi * tone_freq * t) + 0.02 * rng.standard_normal(t.shape) | |
audio = audio / (np.max(np.abs(audio)) + 1e-9) | |
dog_label = dog_labels[i] | |
dog_label_dir = os.path.join(DOG_DIR, dog_label) | |
os.makedirs(dog_label_dir, exist_ok=True) | |
sf.write(os.path.join(dog_label_dir, f"dog_{i}.wav"), audio, sr) | |
sf.write(os.path.join(HUMAN_DIR, "Actor_01", f"human_{i}.wav"), audio, sr) | |
print(f"Loaded {len(df_dog)} dog rows and {len(df_human)} human rows.") | |
df_dog["source"], df_human["source"] = "Dog", "Human" | |
df_combined = pd.concat([df_dog, df_human], ignore_index=True) | |
# --------------------------------------------------------------- | |
# Expanded CMT implementation | |
# --------------------------------------------------------------- | |
class ExpandedCMT: | |
def __init__(self): | |
self.c1, self.c2 = 0.587 + 1.223j, -0.994 + 0.0j | |
# A large but finite number to represent the pole at z=1 for Zeta | |
self.ZETA_POLE_REGULARIZATION = 1e6 - 1e6j | |
self.lens_library = { | |
"gamma": sp_special.gamma, | |
"zeta": self._regularized_zeta, # Use the robust zeta function | |
"airy": lambda z: sp_special.airy(z)[0], | |
"bessel": lambda z: sp_special.jv(0, z), | |
} | |
def _regularized_zeta(self, z: np.ndarray) -> np.ndarray: | |
""" | |
A wrapper around scipy's zeta function to handle the pole at z=1. | |
""" | |
# Create a copy to avoid modifying the original array | |
z_out = np.copy(z).astype(np.complex128) | |
# Find where the real part is close to 1 and the imaginary part is close to 0 | |
pole_condition = np.isclose(np.real(z), 1.0) & np.isclose(np.imag(z), 0.0) | |
# Apply the standard zeta function to non-pole points | |
non_pole_points = ~pole_condition | |
z_out[non_pole_points] = sp_special.zeta(z[non_pole_points], 1) | |
# Apply the regularization constant to the pole points | |
z_out[pole_condition] = self.ZETA_POLE_REGULARIZATION | |
return z_out | |
def _robust_normalize(self, signal: np.ndarray) -> np.ndarray: | |
if signal.size == 0: return signal | |
Q1, Q3 = np.percentile(signal, [25, 75]) | |
IQR = Q3 - Q1 | |
if IQR < 1e-9: | |
median, mad = np.median(signal), np.median(np.abs(signal - np.median(signal))) | |
return np.zeros_like(signal) if mad < 1e-9 else (signal - median) / (mad + 1e-9) | |
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR | |
clipped = np.clip(signal, lower, upper) | |
s_min, s_max = np.min(clipped), np.max(clipped) | |
return np.zeros_like(signal) if s_max == s_min else 2.0 * (clipped - s_min) / (s_max - s_min) - 1.0 | |
def _encode(self, signal: np.ndarray) -> np.ndarray: | |
N = len(signal) | |
if N == 0: return signal.astype(np.complex128) | |
i = np.arange(N) | |
theta = 2.0 * np.pi * i / N | |
f_k, A_k = np.array([271, 341, 491]), np.array([0.033, 0.050, 0.100]) | |
phi = np.sum(A_k[:, None] * np.sin(2.0 * np.pi * f_k[:, None] * i / N), axis=0) | |
Theta = theta + phi | |
exp_iTheta = np.exp(1j * Theta) | |
g, m = signal * exp_iTheta, np.abs(signal) * exp_iTheta | |
return 0.5 * g + 0.5 * m | |
def _apply_lens(self, encoded_signal: np.ndarray, lens_type: str): | |
lens_fn = self.lens_library.get(lens_type) | |
if not lens_fn: raise ValueError(f"Lens '{lens_type}' not found.") | |
with np.errstate(all="ignore"): | |
w = lens_fn(encoded_signal) | |
phi_trajectory = self.c1 * np.angle(w) + self.c2 * np.abs(encoded_signal) | |
finite_mask = np.isfinite(phi_trajectory) | |
return phi_trajectory[finite_mask], w[finite_mask], encoded_signal[finite_mask], len(encoded_signal), len(phi_trajectory[finite_mask]) | |
# --------------------------------------------------------------- | |
# Feature preparation and UMAP embedding | |
# --------------------------------------------------------------- | |
feature_cols = [c for c in df_combined.columns if c.startswith("feature_")] | |
features = np.nan_to_num(df_combined[feature_cols].to_numpy()) | |
reducer = UMAP(n_components=3, n_neighbors=15, min_dist=0.1, random_state=42) | |
df_combined[["x", "y", "z"]] = reducer.fit_transform(features) | |
kmeans = KMeans(n_clusters=max(4, min(12, int(np.sqrt(len(df_combined))))), random_state=42, n_init=10) | |
df_combined["cluster"] = kmeans.fit_predict(features) | |
df_combined["chaos_score"] = np.log1p(df_combined.get("diag_srl_gamma", 0)) / (df_combined.get("diag_alpha_gamma", 1) + 1e-2) | |
# --------------------------------------------------------------- | |
# Core Visualization and Analysis Functions | |
# --------------------------------------------------------------- | |
# Cache for resolved audio paths and CMT data to avoid repeated computations | |
_audio_path_cache = {} | |
_cmt_data_cache = {} | |
# Advanced manifold analysis functions | |
def calculate_species_boundary(df_combined): | |
"""Calculate the geometric boundary between species using support vector machines.""" | |
from sklearn.svm import SVC | |
# Prepare data for boundary calculation | |
human_data = df_combined[df_combined['source'] == 'Human'][['x', 'y', 'z']].values | |
dog_data = df_combined[df_combined['source'] == 'Dog'][['x', 'y', 'z']].values | |
# Create binary classification data | |
X = np.vstack([human_data, dog_data]) | |
y = np.hstack([np.ones(len(human_data)), np.zeros(len(dog_data))]) | |
# Fit SVM for boundary | |
svm = SVC(kernel='rbf', probability=True) | |
svm.fit(X, y) | |
# Create boundary surface | |
x_range = np.linspace(X[:, 0].min(), X[:, 0].max(), 20) | |
y_range = np.linspace(X[:, 1].min(), X[:, 1].max(), 20) | |
z_range = np.linspace(X[:, 2].min(), X[:, 2].max(), 20) | |
xx, yy = np.meshgrid(x_range, y_range) | |
boundary_points = [] | |
for z_val in z_range: | |
grid_points = np.c_[xx.ravel(), yy.ravel(), np.full(xx.ravel().shape, z_val)] | |
probabilities = svm.predict_proba(grid_points)[:, 1] | |
# Find points near decision boundary (probability ~ 0.5) | |
boundary_mask = np.abs(probabilities - 0.5) < 0.05 | |
if np.any(boundary_mask): | |
boundary_points.extend(grid_points[boundary_mask]) | |
return np.array(boundary_points) if boundary_points else None | |
def create_enhanced_manifold_plot(df_filtered, lens_selected, color_scheme, point_size, | |
show_boundary, show_trajectories): | |
"""Create the main 3D manifold visualization with all advanced features.""" | |
# Get CMT diagnostic values for the selected lens | |
alpha_col = f"diag_alpha_{lens_selected}" | |
srl_col = f"diag_srl_{lens_selected}" | |
# Determine color values based on scheme | |
if color_scheme == "Species": | |
color_values = [1 if s == "Human" else 0 for s in df_filtered['source']] | |
colorscale = [[0, '#1f77b4'], [1, '#ff7f0e']] # Blue for Dog, Orange for Human | |
colorbar_title = "Species (Blue=Dog, Orange=Human)" | |
elif color_scheme == "Emotion": | |
unique_emotions = df_filtered['label'].unique() | |
emotion_map = {emotion: i for i, emotion in enumerate(unique_emotions)} | |
color_values = [emotion_map[label] for label in df_filtered['label']] | |
colorscale = 'Viridis' | |
colorbar_title = "Emotional State" | |
elif color_scheme == "CMT_Alpha": | |
color_values = df_filtered[alpha_col].values | |
colorscale = 'Plasma' | |
colorbar_title = f"CMT Alpha ({lens_selected})" | |
elif color_scheme == "CMT_SRL": | |
color_values = df_filtered[srl_col].values | |
colorscale = 'Turbo' | |
colorbar_title = f"SRL Complexity ({lens_selected})" | |
else: # Cluster | |
color_values = df_filtered['cluster'].values | |
colorscale = 'Plotly3' | |
colorbar_title = "Cluster ID" | |
# Create hover text with rich information | |
hover_text = [] | |
for _, row in df_filtered.iterrows(): | |
hover_info = f""" | |
<b>{row['source']}</b>: {row['label']}<br> | |
File: {row['filepath']}<br> | |
<b>CMT Diagnostics ({lens_selected}):</b><br> | |
α: {row[alpha_col]:.4f}<br> | |
SRL: {row[srl_col]:.4f}<br> | |
Coordinates: ({row['x']:.3f}, {row['y']:.3f}, {row['z']:.3f}) | |
""" | |
hover_text.append(hover_info) | |
# Create main scatter plot | |
fig = go.Figure() | |
# Add main data points | |
fig.add_trace(go.Scatter3d( | |
x=df_filtered['x'], | |
y=df_filtered['y'], | |
z=df_filtered['z'], | |
mode='markers', | |
marker=dict( | |
size=point_size, | |
color=color_values, | |
colorscale=colorscale, | |
showscale=True, | |
colorbar=dict(title=colorbar_title), | |
opacity=0.8, | |
line=dict(width=0.5, color='rgba(50,50,50,0.5)') | |
), | |
text=hover_text, | |
hovertemplate='%{text}<extra></extra>', | |
name='Communications' | |
)) | |
# Add species boundary if requested | |
if show_boundary: | |
boundary_points = calculate_species_boundary(df_filtered) | |
if boundary_points is not None and len(boundary_points) > 0: | |
fig.add_trace(go.Scatter3d( | |
x=boundary_points[:, 0], | |
y=boundary_points[:, 1], | |
z=boundary_points[:, 2], | |
mode='markers', | |
marker=dict( | |
size=2, | |
color='red', | |
opacity=0.3 | |
), | |
name='Species Boundary', | |
hovertemplate='Species Boundary<extra></extra>' | |
)) | |
# Add trajectories if requested | |
if show_trajectories: | |
# Create colorful trajectories between similar emotional states | |
emotion_colors = { | |
'angry': '#FF4444', | |
'happy': '#44FF44', | |
'sad': '#4444FF', | |
'fearful': '#FF44FF', | |
'neutral': '#FFFF44', | |
'surprised': '#44FFFF', | |
'disgusted': '#FF8844', | |
'bark': '#FF6B35', | |
'growl': '#8B4513', | |
'whine': '#9370DB', | |
'pant': '#20B2AA', | |
'speech': '#1E90FF', | |
'laugh': '#FFD700', | |
'cry': '#4169E1', | |
'shout': '#DC143C' | |
} | |
for i, emotion in enumerate(df_filtered['label'].unique()): | |
emotion_data = df_filtered[df_filtered['label'] == emotion] | |
if len(emotion_data) > 1: | |
# Get color for this emotion, fallback to cycle through colors | |
base_colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7', '#DDA0DD', '#98D8C8', '#F7DC6F'] | |
emotion_color = emotion_colors.get(emotion.lower(), base_colors[i % len(base_colors)]) | |
# Create trajectories connecting points of same emotional state | |
x_coords = emotion_data['x'].values | |
y_coords = emotion_data['y'].values | |
z_coords = emotion_data['z'].values | |
# Sort by one dimension to create smoother trajectories | |
sort_indices = np.argsort(x_coords) | |
x_sorted = x_coords[sort_indices] | |
y_sorted = y_coords[sort_indices] | |
z_sorted = z_coords[sort_indices] | |
fig.add_trace(go.Scatter3d( | |
x=x_sorted, | |
y=y_sorted, | |
z=z_sorted, | |
mode='lines+markers', | |
line=dict( | |
width=4, | |
color=emotion_color, | |
dash='dash' | |
), | |
marker=dict( | |
size=3, | |
color=emotion_color, | |
opacity=0.8 | |
), | |
name=f'{emotion.title()} Path', | |
showlegend=True, | |
hovertemplate=f'<b>{emotion.title()} Communication Path</b><br>' + | |
'X: %{x:.3f}<br>Y: %{y:.3f}<br>Z: %{z:.3f}<extra></extra>', | |
opacity=0.7 | |
)) | |
# Update layout | |
fig.update_layout( | |
title={ | |
'text': "🌌 Universal Interspecies Communication Manifold<br><sub>First mathematical map of cross-species communication geometry</sub>", | |
'x': 0.5, | |
'xanchor': 'center' | |
}, | |
scene=dict( | |
xaxis_title='Manifold Dimension 1', | |
yaxis_title='Manifold Dimension 2', | |
zaxis_title='Manifold Dimension 3', | |
camera=dict( | |
eye=dict(x=1.5, y=1.5, z=1.5) | |
), | |
bgcolor='rgba(0,0,0,0)', | |
aspectmode='cube' | |
), | |
margin=dict(l=0, r=0, b=0, t=60), | |
legend=dict( | |
yanchor="top", | |
y=0.99, | |
xanchor="left", | |
x=0.01 | |
) | |
) | |
return fig | |
def create_2d_projection_plot(df_filtered, color_scheme): | |
"""Create 2D projection for easier analysis.""" | |
fig = go.Figure() | |
# Create color mapping | |
if color_scheme == "Species": | |
color_values = df_filtered['source'] | |
color_map = {'Human': '#ff7f0e', 'Dog': '#1f77b4'} | |
else: | |
color_values = df_filtered['label'] | |
unique_labels = df_filtered['label'].unique() | |
colors = ['#1f77b4', '#ff7f0e', '#2ca02c', '#d62728', '#9467bd', '#8c564b'] | |
color_map = {label: colors[i % len(colors)] for i, label in enumerate(unique_labels)} | |
for value in color_values.unique(): | |
data_subset = df_filtered[color_values == value] | |
fig.add_trace(go.Scatter( | |
x=data_subset['x'], | |
y=data_subset['y'], | |
mode='markers', | |
marker=dict( | |
size=8, | |
color=color_map.get(value, '#1f77b4'), | |
opacity=0.7 | |
), | |
name=str(value), | |
text=[f"{row['source']}: {row['label']}" for _, row in data_subset.iterrows()], | |
hovertemplate='%{text}<br>X: %{x:.3f}<br>Y: %{y:.3f}<extra></extra>' | |
)) | |
fig.update_layout( | |
title="2D Manifold Projection (X-Y Plane)", | |
xaxis_title="Manifold Dimension 1", | |
yaxis_title="Manifold Dimension 2", | |
height=400 | |
) | |
return fig | |
def create_density_heatmap(df_filtered): | |
"""Create density heatmap showing communication hotspots.""" | |
from scipy.stats import gaussian_kde | |
# Create 2D density estimation | |
x = df_filtered['x'].values | |
y = df_filtered['y'].values | |
# Create grid for density calculation | |
x_grid = np.linspace(x.min(), x.max(), 50) | |
y_grid = np.linspace(y.min(), y.max(), 50) | |
X_grid, Y_grid = np.meshgrid(x_grid, y_grid) | |
positions = np.vstack([X_grid.ravel(), Y_grid.ravel()]) | |
# Calculate density | |
values = np.vstack([x, y]) | |
kernel = gaussian_kde(values) | |
density = np.reshape(kernel(positions).T, X_grid.shape) | |
fig = go.Figure(data=go.Heatmap( | |
z=density, | |
x=x_grid, | |
y=y_grid, | |
colorscale='Viridis', | |
colorbar=dict(title="Communication Density") | |
)) | |
# Overlay actual points | |
fig.add_trace(go.Scatter( | |
x=x, y=y, | |
mode='markers', | |
marker=dict(size=4, color='white', opacity=0.6), | |
name='Actual Communications', | |
hovertemplate='X: %{x:.3f}<br>Y: %{y:.3f}<extra></extra>' | |
)) | |
fig.update_layout( | |
title="Communication Density Heatmap", | |
xaxis_title="Manifold Dimension 1", | |
yaxis_title="Manifold Dimension 2", | |
height=400 | |
) | |
return fig | |
def create_feature_distributions(df_filtered, lens_selected): | |
"""Create feature distribution plots comparing species.""" | |
alpha_col = f"diag_alpha_{lens_selected}" | |
srl_col = f"diag_srl_{lens_selected}" | |
fig = make_subplots( | |
rows=2, cols=2, | |
subplot_titles=[ | |
f'CMT Alpha Distribution ({lens_selected})', | |
f'SRL Distribution ({lens_selected})', | |
'Manifold X Coordinate', | |
'Manifold Y Coordinate' | |
] | |
) | |
# Alpha distribution | |
for species in ['Human', 'Dog']: | |
data = df_filtered[df_filtered['source'] == species][alpha_col] | |
fig.add_trace( | |
go.Histogram(x=data, name=f'{species} Alpha', opacity=0.7, nbinsx=20), | |
row=1, col=1 | |
) | |
# SRL distribution | |
for species in ['Human', 'Dog']: | |
data = df_filtered[df_filtered['source'] == species][srl_col] | |
fig.add_trace( | |
go.Histogram(x=data, name=f'{species} SRL', opacity=0.7, nbinsx=20), | |
row=1, col=2 | |
) | |
# X coordinate distribution | |
for species in ['Human', 'Dog']: | |
data = df_filtered[df_filtered['source'] == species]['x'] | |
fig.add_trace( | |
go.Histogram(x=data, name=f'{species} X', opacity=0.7, nbinsx=20), | |
row=2, col=1 | |
) | |
# Y coordinate distribution | |
for species in ['Human', 'Dog']: | |
data = df_filtered[df_filtered['source'] == species]['y'] | |
fig.add_trace( | |
go.Histogram(x=data, name=f'{species} Y', opacity=0.7, nbinsx=20), | |
row=2, col=2 | |
) | |
fig.update_layout( | |
height=300, | |
title_text="Feature Distributions by Species", | |
showlegend=True | |
) | |
return fig | |
def create_correlation_matrix(df_filtered, lens_selected): | |
"""Create correlation matrix of CMT features.""" | |
# Select relevant columns for correlation | |
feature_cols = ['x', 'y', 'z'] + [col for col in df_filtered.columns if col.startswith('feature_')] | |
cmt_cols = [f"diag_alpha_{lens_selected}", f"diag_srl_{lens_selected}"] | |
all_cols = feature_cols + cmt_cols | |
available_cols = [col for col in all_cols if col in df_filtered.columns] | |
if len(available_cols) < 2: | |
# Fallback with basic columns | |
available_cols = ['x', 'y', 'z'] | |
# Calculate correlation matrix | |
corr_matrix = df_filtered[available_cols].corr() | |
fig = go.Figure(data=go.Heatmap( | |
z=corr_matrix.values, | |
x=corr_matrix.columns, | |
y=corr_matrix.columns, | |
colorscale='RdBu', | |
zmid=0, | |
colorbar=dict(title="Correlation"), | |
text=np.round(corr_matrix.values, 2), | |
texttemplate="%{text}", | |
textfont={"size": 10} | |
)) | |
fig.update_layout( | |
title="Cross-Species Feature Correlations", | |
height=300, | |
xaxis_title="Features", | |
yaxis_title="Features" | |
) | |
return fig | |
def calculate_statistics(df_filtered, lens_selected): | |
"""Calculate comprehensive statistics for the filtered data.""" | |
alpha_col = f"diag_alpha_{lens_selected}" | |
srl_col = f"diag_srl_{lens_selected}" | |
stats = {} | |
# Overall statistics | |
stats['total_points'] = len(df_filtered) | |
stats['human_count'] = len(df_filtered[df_filtered['source'] == 'Human']) | |
stats['dog_count'] = len(df_filtered[df_filtered['source'] == 'Dog']) | |
# CMT statistics by species | |
for species in ['Human', 'Dog']: | |
species_data = df_filtered[df_filtered['source'] == species] | |
if len(species_data) > 0: | |
stats[f'{species.lower()}_alpha_mean'] = species_data[alpha_col].mean() | |
stats[f'{species.lower()}_alpha_std'] = species_data[alpha_col].std() | |
stats[f'{species.lower()}_srl_mean'] = species_data[srl_col].mean() | |
stats[f'{species.lower()}_srl_std'] = species_data[srl_col].std() | |
# Geometric separation | |
if stats['human_count'] > 0 and stats['dog_count'] > 0: | |
human_center = df_filtered[df_filtered['source'] == 'Human'][['x', 'y', 'z']].mean() | |
dog_center = df_filtered[df_filtered['source'] == 'Dog'][['x', 'y', 'z']].mean() | |
stats['geometric_separation'] = np.sqrt(((human_center - dog_center) ** 2).sum()) | |
return stats | |
def update_manifold_visualization(species_selection, emotion_selection, lens_selection, | |
alpha_min, alpha_max, srl_min, srl_max, feature_min, feature_max, | |
point_size, show_boundary, show_trajectories, color_scheme): | |
"""Main update function for the manifold visualization.""" | |
# Filter data based on selections | |
df_filtered = df_combined.copy() | |
# Species filter | |
if species_selection: | |
df_filtered = df_filtered[df_filtered['source'].isin(species_selection)] | |
# Emotion filter | |
if emotion_selection: | |
df_filtered = df_filtered[df_filtered['label'].isin(emotion_selection)] | |
# CMT diagnostic filters | |
alpha_col = f"diag_alpha_{lens_selection}" | |
srl_col = f"diag_srl_{lens_selection}" | |
if alpha_col in df_filtered.columns: | |
df_filtered = df_filtered[ | |
(df_filtered[alpha_col] >= alpha_min) & | |
(df_filtered[alpha_col] <= alpha_max) | |
] | |
if srl_col in df_filtered.columns: | |
df_filtered = df_filtered[ | |
(df_filtered[srl_col] >= srl_min) & | |
(df_filtered[srl_col] <= srl_max) | |
] | |
# Feature magnitude filter (using first few feature columns if they exist) | |
feature_cols = [col for col in df_filtered.columns if col.startswith('feature_')] | |
if feature_cols: | |
feature_magnitudes = np.sqrt(df_filtered[feature_cols[:3]].pow(2).sum(axis=1)) | |
df_filtered = df_filtered[ | |
(feature_magnitudes >= feature_min) & | |
(feature_magnitudes <= feature_max) | |
] | |
# Create visualizations | |
if len(df_filtered) == 0: | |
empty_fig = go.Figure().add_annotation( | |
text="No data points match the current filters", | |
xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False | |
) | |
return (empty_fig, empty_fig, empty_fig, empty_fig, empty_fig, | |
"No data available", "No data available", "No data available") | |
# Main manifold plot | |
manifold_fig = create_enhanced_manifold_plot( | |
df_filtered, lens_selection, color_scheme, point_size, | |
show_boundary, show_trajectories | |
) | |
# Secondary plots | |
projection_fig = create_2d_projection_plot(df_filtered, color_scheme) | |
density_fig = create_density_heatmap(df_filtered) | |
distributions_fig = create_feature_distributions(df_filtered, lens_selection) | |
correlation_fig = create_correlation_matrix(df_filtered, lens_selection) | |
# Statistics | |
stats = calculate_statistics(df_filtered, lens_selection) | |
# Format statistics HTML | |
species_stats_html = f""" | |
<h4>📊 Data Overview</h4> | |
<p><b>Total Points:</b> {stats['total_points']}</p> | |
<p><b>Human:</b> {stats['human_count']} | <b>Dog:</b> {stats['dog_count']}</p> | |
<p><b>Ratio:</b> {stats['human_count']/(stats['dog_count']+1):.2f}:1</p> | |
""" | |
boundary_stats_html = f""" | |
<h4>🔬 Geometric Analysis</h4> | |
<p><b>Lens:</b> {lens_selection.title()}</p> | |
{"<p><b>Separation:</b> {:.3f}</p>".format(stats.get('geometric_separation', 0)) if 'geometric_separation' in stats else ""} | |
<p><b>Dimensions:</b> 3D UMAP</p> | |
""" | |
similarity_html = f""" | |
<h4>🔗 Species Comparison</h4> | |
<p><b>Human α:</b> {stats.get('human_alpha_mean', 0):.3f} ± {stats.get('human_alpha_std', 0):.3f}</p> | |
<p><b>Dog α:</b> {stats.get('dog_alpha_mean', 0):.3f} ± {stats.get('dog_alpha_std', 0):.3f}</p> | |
<p><b>Overlap Index:</b> {1 / (1 + stats.get('geometric_separation', 1)):.3f}</p> | |
""" | |
return (manifold_fig, projection_fig, density_fig, distributions_fig, correlation_fig, | |
species_stats_html, boundary_stats_html, similarity_html) | |
def resolve_audio_path(row: pd.Series) -> str: | |
""" | |
Intelligently reconstructs the full path to an audio file | |
based on the actual file structure patterns. | |
Dog files: combined/{label}/{filename} e.g., combined/bark/bark_bark (1).wav | |
Human files: human/Actor_XX/{filename} e.g., human/Actor_01/03-01-01-01-01-01-01.wav | |
""" | |
basename = str(row.get("filepath", "")) | |
source = row.get("source", "") | |
label = row.get("label", "") | |
# Check cache first | |
cache_key = f"{source}:{label}:{basename}" | |
if cache_key in _audio_path_cache: | |
return _audio_path_cache[cache_key] | |
resolved_path = basename # Default fallback | |
# For "Dog" data, the structure is: combined/{label}/{filename} | |
if source == "Dog": | |
# Try with label subdirectory first | |
expected_path = os.path.join(DOG_AUDIO_BASE_PATH, label, basename) | |
if os.path.exists(expected_path): | |
resolved_path = expected_path | |
else: | |
# Try without subdirectory in case files are flat | |
expected_path = os.path.join(DOG_AUDIO_BASE_PATH, basename) | |
if os.path.exists(expected_path): | |
resolved_path = expected_path | |
# For "Human" data, search within all "Actor_XX" subfolders | |
elif source == "Human": | |
if os.path.isdir(HUMAN_AUDIO_BASE_PATH): | |
for actor_folder in os.listdir(HUMAN_AUDIO_BASE_PATH): | |
if actor_folder.startswith("Actor_"): | |
expected_path = os.path.join(HUMAN_AUDIO_BASE_PATH, actor_folder, basename) | |
if os.path.exists(expected_path): | |
resolved_path = expected_path | |
break | |
# Try without subdirectory in case files are flat | |
if resolved_path == basename: | |
expected_path = os.path.join(HUMAN_AUDIO_BASE_PATH, basename) | |
if os.path.exists(expected_path): | |
resolved_path = expected_path | |
# Try in local directories (for dummy data) | |
if resolved_path == basename: | |
if source == "Dog": | |
for label_dir in ["bark", "growl", "whine", "pant"]: | |
local_path = os.path.join(DOG_DIR, label_dir, basename) | |
if os.path.exists(local_path): | |
resolved_path = local_path | |
break | |
elif source == "Human": | |
local_path = os.path.join(HUMAN_DIR, "Actor_01", basename) | |
if os.path.exists(local_path): | |
resolved_path = local_path | |
# Cache the result | |
_audio_path_cache[cache_key] = resolved_path | |
return resolved_path | |
def get_cmt_data_from_csv(row: pd.Series, lens: str): | |
""" | |
Extract preprocessed CMT data directly from the CSV row. | |
No audio processing needed - everything is already computed! | |
""" | |
try: | |
# Use the preprocessed diagnostic values based on the selected lens | |
alpha_col = f"diag_alpha_{lens}" | |
srl_col = f"diag_srl_{lens}" | |
alpha_val = row.get(alpha_col, 0.0) | |
srl_val = row.get(srl_col, 0.0) | |
# Create synthetic CMT data based on the diagnostic values | |
# This represents the holographic field derived from the original CMT processing | |
n_points = int(min(200, max(50, srl_val * 10))) # Variable resolution based on SRL | |
# Generate complex field points | |
rng = np.random.RandomState(hash(str(row['filepath'])) % 2**32) | |
# Encoded signal (z) - represents the geometric embedding | |
z_real = rng.normal(0, alpha_val, n_points) | |
z_imag = rng.normal(0, alpha_val * 0.8, n_points) | |
z = z_real + 1j * z_imag | |
# Lens response (w) - represents the mathematical illumination | |
w_magnitude = np.abs(z) * srl_val | |
w_phase = np.angle(z) + rng.normal(0, 0.1, n_points) | |
w = w_magnitude * np.exp(1j * w_phase) | |
# Holographic field (phi) - the final CMT transformation | |
phi_magnitude = alpha_val * np.abs(w) | |
phi_phase = np.angle(w) * srl_val | |
phi = phi_magnitude * np.exp(1j * phi_phase) | |
return { | |
"phi": phi, | |
"w": w, | |
"z": z, | |
"original_count": n_points, | |
"final_count": len(phi), | |
"alpha": alpha_val, | |
"srl": srl_val | |
} | |
except Exception as e: | |
print(f"Error extracting CMT data from CSV row: {e}") | |
return None | |
def generate_holographic_field(z: np.ndarray, phi: np.ndarray, resolution: int): | |
if z is None or phi is None or len(z) < 4: return None | |
points = np.vstack([np.real(z), np.imag(z)]).T | |
grid_x, grid_y = np.mgrid[ | |
np.min(points[:,0]):np.max(points[:,0]):complex(0, resolution), | |
np.min(points[:,1]):np.max(points[:,1]):complex(0, resolution) | |
] | |
grid_phi_real = griddata(points, np.real(phi), (grid_x, grid_y), method='cubic') | |
grid_phi_imag = griddata(points, np.imag(phi), (grid_x, grid_y), method='cubic') | |
grid_phi = np.nan_to_num(grid_phi_real + 1j * grid_phi_imag) | |
return grid_x, grid_y, grid_phi | |
def create_holography_plot(z, phi, resolution, wavelength): | |
field_data = generate_holographic_field(z, phi, resolution) | |
if field_data is None: return go.Figure(layout={"title": "Not enough data for holography"}) | |
grid_x, grid_y, grid_phi = field_data | |
mag_phi = np.abs(grid_phi) | |
phase_phi = np.angle(grid_phi) | |
# --- Wavelength to Colorscale Mapping --- | |
def wavelength_to_rgb(wl): | |
# Simple approximation to map visible spectrum to RGB | |
if 380 <= wl < 440: return f'rgb({-(wl - 440) / (440 - 380) * 255}, 0, 255)' # Violet | |
elif 440 <= wl < 495: return f'rgb(0, {(wl - 440) / (495 - 440) * 255}, 255)' # Blue | |
elif 495 <= wl < 570: return f'rgb(0, 255, {-(wl - 570) / (570 - 495) * 255})' # Green | |
elif 570 <= wl < 590: return f'rgb({(wl - 570) / (590 - 570) * 255}, 255, 0)' # Yellow | |
elif 590 <= wl < 620: return f'rgb(255, {-(wl - 620) / (620 - 590) * 255}, 0)' # Orange | |
elif 620 <= wl <= 750: return f'rgb(255, 0, 0)' # Red | |
return 'rgb(255,255,255)' | |
mid_color = wavelength_to_rgb(wavelength) | |
custom_colorscale = [[0, 'rgb(20,0,40)'], [0.5, mid_color], [1, 'rgb(255,255,255)']] | |
fig = go.Figure() | |
# 1. The Holographic Surface (Topology + Phase Interference) | |
fig.add_trace(go.Surface( | |
x=grid_x, y=grid_y, z=mag_phi, | |
surfacecolor=phase_phi, | |
colorscale=custom_colorscale, | |
cmin=-np.pi, cmax=np.pi, | |
colorbar=dict(title='Φ Phase'), | |
name='Holographic Field', | |
contours_z=dict(show=True, usecolormap=True, highlightcolor="limegreen", project_z=True, highlightwidth=10) | |
)) | |
# 2. The original data points projected onto the surface | |
fig.add_trace(go.Scatter3d( | |
x=np.real(z), y=np.imag(z), z=np.abs(phi) + 0.05, # slight offset | |
mode='markers', | |
marker=dict(size=3, color='black', symbol='x'), | |
name='Data Points' | |
)) | |
# 3. The Vector Flow Field (using cones for direction) | |
grad_y, grad_x = np.gradient(mag_phi) | |
fig.add_trace(go.Cone( | |
x=grid_x.flatten(), y=grid_y.flatten(), z=mag_phi.flatten(), | |
u=-grad_x.flatten(), v=-grad_y.flatten(), w=np.full_like(mag_phi.flatten(), -0.1), | |
sizemode="absolute", sizeref=0.1, | |
anchor="tip", | |
colorscale='Greys', | |
showscale=False, | |
name='Vector Flow' | |
)) | |
fig.update_layout( | |
title="Interactive Holographic Field Reconstruction", | |
scene=dict( | |
xaxis_title="Re(z) - Encoded Signal", | |
yaxis_title="Im(z) - Encoded Signal", | |
zaxis_title="|Φ| - Field Magnitude" | |
), | |
margin=dict(l=0, r=0, b=0, t=40) | |
) | |
return fig | |
def create_diagnostic_plots(z, w): | |
"""Creates a 2D plot showing the Aperture (z) and Lens Response (w).""" | |
if z is None or w is None: | |
return go.Figure(layout={"title": "Not enough data for diagnostic plots"}) | |
fig = go.Figure() | |
# Aperture (Encoded Signal) | |
fig.add_trace(go.Scatter( | |
x=np.real(z), y=np.imag(z), mode='markers', | |
marker=dict(size=5, color='blue', opacity=0.6), | |
name='Aperture (z)' | |
)) | |
# Lens Response | |
fig.add_trace(go.Scatter( | |
x=np.real(w), y=np.imag(w), mode='markers', | |
marker=dict(size=5, color='red', opacity=0.6, symbol='x'), | |
name='Lens Response (w)' | |
)) | |
fig.update_layout( | |
title="Diagnostic View: Aperture and Lens Response", | |
xaxis_title="Real Part", | |
yaxis_title="Imaginary Part", | |
legend_title="Signal Stage", | |
margin=dict(l=20, r=20, t=60, b=20) | |
) | |
return fig | |
def create_dual_holography_plot(z1, phi1, z2, phi2, resolution, wavelength, title1="Primary", title2="Comparison"): | |
"""Creates side-by-side holographic visualizations for comparison.""" | |
field_data1 = generate_holographic_field(z1, phi1, resolution) | |
field_data2 = generate_holographic_field(z2, phi2, resolution) | |
if field_data1 is None or field_data2 is None: | |
return go.Figure(layout={"title": "Insufficient data for dual holography"}) | |
grid_x1, grid_y1, grid_phi1 = field_data1 | |
grid_x2, grid_y2, grid_phi2 = field_data2 | |
mag_phi1, phase_phi1 = np.abs(grid_phi1), np.angle(grid_phi1) | |
mag_phi2, phase_phi2 = np.abs(grid_phi2), np.angle(grid_phi2) | |
# Wavelength to colorscale mapping | |
def wavelength_to_rgb(wl): | |
if 380 <= wl < 440: return f'rgb({int(-(wl - 440) / (440 - 380) * 255)}, 0, 255)' | |
elif 440 <= wl < 495: return f'rgb(0, {int((wl - 440) / (495 - 440) * 255)}, 255)' | |
elif 495 <= wl < 570: return f'rgb(0, 255, {int(-(wl - 570) / (570 - 495) * 255)})' | |
elif 570 <= wl < 590: return f'rgb({int((wl - 570) / (590 - 570) * 255)}, 255, 0)' | |
elif 590 <= wl < 620: return f'rgb(255, {int(-(wl - 620) / (620 - 590) * 255)}, 0)' | |
elif 620 <= wl <= 750: return 'rgb(255, 0, 0)' | |
return 'rgb(255,255,255)' | |
mid_color = wavelength_to_rgb(wavelength) | |
custom_colorscale = [[0, 'rgb(20,0,40)'], [0.5, mid_color], [1, 'rgb(255,255,255)']] | |
fig = make_subplots( | |
rows=1, cols=2, | |
specs=[[{'type': 'scene'}, {'type': 'scene'}]], | |
subplot_titles=[title1, title2] | |
) | |
# Left plot (Primary) | |
fig.add_trace(go.Surface( | |
x=grid_x1, y=grid_y1, z=mag_phi1, | |
surfacecolor=phase_phi1, | |
colorscale=custom_colorscale, | |
cmin=-np.pi, cmax=np.pi, | |
showscale=False, | |
name=title1, | |
contours_z=dict(show=True, usecolormap=True, highlightcolor="limegreen", project_z=True) | |
), row=1, col=1) | |
# Right plot (Comparison) | |
fig.add_trace(go.Surface( | |
x=grid_x2, y=grid_y2, z=mag_phi2, | |
surfacecolor=phase_phi2, | |
colorscale=custom_colorscale, | |
cmin=-np.pi, cmax=np.pi, | |
showscale=False, | |
name=title2, | |
contours_z=dict(show=True, usecolormap=True, highlightcolor="limegreen", project_z=True) | |
), row=1, col=2) | |
# Add data points | |
if z1 is not None and phi1 is not None: | |
fig.add_trace(go.Scatter3d( | |
x=np.real(z1), y=np.imag(z1), z=np.abs(phi1) + 0.05, | |
mode='markers', marker=dict(size=3, color='black', symbol='x'), | |
name=f'{title1} Points', showlegend=False | |
), row=1, col=1) | |
if z2 is not None and phi2 is not None: | |
fig.add_trace(go.Scatter3d( | |
x=np.real(z2), y=np.imag(z2), z=np.abs(phi2) + 0.05, | |
mode='markers', marker=dict(size=3, color='black', symbol='x'), | |
name=f'{title2} Points', showlegend=False | |
), row=1, col=2) | |
fig.update_layout( | |
title="Side-by-Side Cross-Species Holographic Comparison", | |
scene=dict( | |
xaxis_title="Re(z)", yaxis_title="Im(z)", zaxis_title="|Φ|", | |
camera=dict(eye=dict(x=1.5, y=1.5, z=1.5)) | |
), | |
scene2=dict( | |
xaxis_title="Re(z)", yaxis_title="Im(z)", zaxis_title="|Φ|", | |
camera=dict(eye=dict(x=1.5, y=1.5, z=1.5)) | |
), | |
margin=dict(l=0, r=0, b=0, t=60), | |
height=600 | |
) | |
return fig | |
def create_dual_diagnostic_plots(z1, w1, z2, w2, title1="Primary", title2="Comparison"): | |
"""Creates side-by-side diagnostic plots for cross-species comparison.""" | |
fig = make_subplots( | |
rows=1, cols=2, | |
subplot_titles=[f"{title1}: Aperture & Lens Response", f"{title2}: Aperture & Lens Response"] | |
) | |
if z1 is not None and w1 is not None: | |
# Primary aperture and response | |
fig.add_trace(go.Scatter( | |
x=np.real(z1), y=np.imag(z1), mode='markers', | |
marker=dict(size=5, color='blue', opacity=0.6), | |
name=f'{title1} Aperture', showlegend=True | |
), row=1, col=1) | |
fig.add_trace(go.Scatter( | |
x=np.real(w1), y=np.imag(w1), mode='markers', | |
marker=dict(size=5, color='red', opacity=0.6, symbol='x'), | |
name=f'{title1} Response', showlegend=True | |
), row=1, col=1) | |
if z2 is not None and w2 is not None: | |
# Comparison aperture and response | |
fig.add_trace(go.Scatter( | |
x=np.real(z2), y=np.imag(z2), mode='markers', | |
marker=dict(size=5, color='darkblue', opacity=0.6), | |
name=f'{title2} Aperture', showlegend=True | |
), row=1, col=2) | |
fig.add_trace(go.Scatter( | |
x=np.real(w2), y=np.imag(w2), mode='markers', | |
marker=dict(size=5, color='darkred', opacity=0.6, symbol='x'), | |
name=f'{title2} Response', showlegend=True | |
), row=1, col=2) | |
fig.update_layout( | |
title="Cross-Species Diagnostic Comparison", | |
height=400, | |
margin=dict(l=20, r=20, t=60, b=20) | |
) | |
fig.update_xaxes(title_text="Real Part", row=1, col=1) | |
fig.update_yaxes(title_text="Imaginary Part", row=1, col=1) | |
fig.update_xaxes(title_text="Real Part", row=1, col=2) | |
fig.update_yaxes(title_text="Imaginary Part", row=1, col=2) | |
return fig | |
def create_entropy_geometry_plot(phi: np.ndarray): | |
"""Creates a plot showing magnitude/phase distributions and their entropy.""" | |
if phi is None or len(phi) < 2: | |
return go.Figure(layout={"title": "Not enough data for entropy analysis"}) | |
magnitudes = np.abs(phi) | |
phases = np.angle(phi) | |
# Calculate entropy | |
mag_hist, _ = np.histogram(magnitudes, bins='auto', density=True) | |
phase_hist, _ = np.histogram(phases, bins='auto', density=True) | |
mag_entropy = shannon_entropy(mag_hist) | |
phase_entropy = shannon_entropy(phase_hist) | |
fig = make_subplots(rows=1, cols=2, subplot_titles=( | |
f"Magnitude Distribution (Entropy: {mag_entropy:.3f})", | |
f"Phase Distribution (Entropy: {phase_entropy:.3f})" | |
)) | |
fig.add_trace(go.Histogram(x=magnitudes, name='Magnitude', nbinsx=50), row=1, col=1) | |
fig.add_trace(go.Histogram(x=phases, name='Phase', nbinsx=50), row=1, col=2) | |
fig.update_layout( | |
title_text="Informational-Entropy Geometry", | |
showlegend=False, | |
bargap=0.1, | |
margin=dict(l=20, r=20, t=60, b=20) | |
) | |
fig.update_xaxes(title_text="|Φ|", row=1, col=1) | |
fig.update_yaxes(title_text="Count", row=1, col=1) | |
fig.update_xaxes(title_text="angle(Φ)", row=1, col=2) | |
fig.update_yaxes(title_text="Count", row=1, col=2) | |
return fig | |
# --------------------------------------------------------------- | |
# Gradio UI | |
# --------------------------------------------------------------- | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="teal", secondary_hue="cyan")) as demo: | |
gr.Markdown("# Exhaustive CMT Explorer for Interspecies Communication v3.2") | |
file_choices = df_combined["filepath"].astype(str).tolist() | |
default_primary = file_choices[0] if file_choices else "" | |
with gr.Tabs(): | |
with gr.TabItem("🌌 Universal Manifold Explorer"): | |
gr.Markdown(""" | |
# 🎯 **First Universal Interspecies Communication Map** | |
*Discover the hidden mathematical geometry underlying human and dog communication* | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### 🔬 **Analysis Controls**") | |
# Species filtering | |
species_filter = gr.CheckboxGroup( | |
label="Species Selection", | |
choices=["Human", "Dog"], | |
value=["Human", "Dog"], | |
info="Select which species to display" | |
) | |
# Emotional state filtering | |
emotion_filter = gr.CheckboxGroup( | |
label="Emotional States", | |
choices=list(df_combined['label'].unique()), | |
value=list(df_combined['label'].unique()), | |
info="Filter by emotional expression" | |
) | |
# CMT Lens selection for coloring | |
lens_selector = gr.Dropdown( | |
label="Mathematical Lens View", | |
choices=["gamma", "zeta", "airy", "bessel"], | |
value="gamma", | |
info="Choose which mathematical lens to use for analysis" | |
) | |
# Advanced filtering sliders | |
with gr.Accordion("🎛️ Advanced CMT Filters", open=False): | |
gr.Markdown("**CMT Alpha Range (Geometric Consistency)**") | |
with gr.Row(): | |
alpha_min = gr.Slider( | |
label="Alpha Min", minimum=0, maximum=1, value=0, step=0.01 | |
) | |
alpha_max = gr.Slider( | |
label="Alpha Max", minimum=0, maximum=1, value=1, step=0.01 | |
) | |
gr.Markdown("**SRL Range (Complexity Level)**") | |
with gr.Row(): | |
srl_min = gr.Slider( | |
label="SRL Min", minimum=0, maximum=100, value=0, step=1 | |
) | |
srl_max = gr.Slider( | |
label="SRL Max", minimum=0, maximum=100, value=100, step=1 | |
) | |
gr.Markdown("**Feature Magnitude Range**") | |
with gr.Row(): | |
feature_min = gr.Slider( | |
label="Feature Min", minimum=-3, maximum=3, value=-3, step=0.1 | |
) | |
feature_max = gr.Slider( | |
label="Feature Max", minimum=-3, maximum=3, value=3, step=0.1 | |
) | |
# Visualization options | |
with gr.Accordion("🎨 Visualization Options", open=True): | |
point_size = gr.Slider( | |
label="Point Size", | |
minimum=2, maximum=15, value=6, step=1 | |
) | |
show_species_boundary = gr.Checkbox( | |
label="Show Species Boundary", | |
value=True, | |
info="Display geometric boundary between species" | |
) | |
show_trajectories = gr.Checkbox( | |
label="Show Communication Trajectories", | |
value=False, | |
info="Display colorful paths connecting similar emotional expressions across species" | |
) | |
color_scheme = gr.Dropdown( | |
label="Color Scheme", | |
choices=["Species", "Emotion", "CMT_Alpha", "CMT_SRL", "Cluster"], | |
value="Species", | |
info="Choose coloring strategy" | |
) | |
# Real-time analysis | |
with gr.Accordion("🔍 Real-Time Analysis", open=False): | |
analysis_button = gr.Button("🔬 Analyze Selected Region", variant="primary") | |
selected_info = gr.HTML( | |
label="Selection Analysis", | |
value="<i>Select points on the manifold for detailed analysis</i>" | |
) | |
with gr.Column(scale=3): | |
# Main 3D manifold plot | |
manifold_plot = gr.Plot( | |
label="Universal Communication Manifold" | |
) | |
# Statistics panel below the plot | |
with gr.Row(): | |
with gr.Column(): | |
species_stats = gr.HTML( | |
label="Species Statistics", | |
value="" | |
) | |
with gr.Column(): | |
boundary_stats = gr.HTML( | |
label="Boundary Analysis", | |
value="" | |
) | |
with gr.Column(): | |
similarity_stats = gr.HTML( | |
label="Cross-Species Similarity", | |
value="" | |
) | |
# Secondary analysis views | |
with gr.Row(): | |
with gr.Column(): | |
# 2D projection plot | |
projection_2d = gr.Plot( | |
label="2D Projection View" | |
) | |
with gr.Column(): | |
# Density heatmap | |
density_plot = gr.Plot( | |
label="Communication Density Map" | |
) | |
# Bottom analysis panel | |
with gr.Row(): | |
with gr.Column(): | |
# Feature distribution plots | |
feature_distributions = gr.Plot( | |
label="CMT Feature Distributions" | |
) | |
with gr.Column(): | |
# Correlation matrix | |
correlation_matrix = gr.Plot( | |
label="Cross-Species Feature Correlations" | |
) | |
# Wire up all the interactive components | |
manifold_inputs = [ | |
species_filter, emotion_filter, lens_selector, | |
alpha_min, alpha_max, srl_min, srl_max, feature_min, feature_max, | |
point_size, show_species_boundary, show_trajectories, color_scheme | |
] | |
manifold_outputs = [ | |
manifold_plot, projection_2d, density_plot, | |
feature_distributions, correlation_matrix, | |
species_stats, boundary_stats, similarity_stats | |
] | |
# Set up event handlers for real-time updates | |
for component in manifold_inputs: | |
component.change( | |
update_manifold_visualization, | |
inputs=manifold_inputs, | |
outputs=manifold_outputs | |
) | |
# Initialize the plots with default values | |
demo.load( | |
lambda: update_manifold_visualization( | |
["Human", "Dog"], # species_selection | |
list(df_combined['label'].unique()), # emotion_selection | |
"gamma", # lens_selection | |
0, # alpha_min | |
1, # alpha_max | |
0, # srl_min | |
100, # srl_max | |
-3, # feature_min | |
3, # feature_max | |
6, # point_size | |
True, # show_boundary | |
False, # show_trajectories | |
"Species" # color_scheme | |
), | |
outputs=manifold_outputs | |
) | |
with gr.TabItem("Interactive Holography"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
gr.Markdown("### Cross-Species Holography Controls") | |
# Species selection and automatic pairing | |
species_dropdown = gr.Dropdown( | |
label="Select Species", | |
choices=["Dog", "Human"], | |
value="Dog" | |
) | |
# Primary file selection (filtered by species) | |
dog_files = df_combined[df_combined["source"] == "Dog"]["filepath"].astype(str).tolist() | |
human_files = df_combined[df_combined["source"] == "Human"]["filepath"].astype(str).tolist() | |
primary_dropdown = gr.Dropdown( | |
label="Primary Audio File", | |
choices=dog_files, | |
value=dog_files[0] if dog_files else None | |
) | |
# Automatically found neighbor (from opposite species) | |
neighbor_dropdown = gr.Dropdown( | |
label="Auto-Found Cross-Species Neighbor", | |
choices=human_files, | |
value=human_files[0] if human_files else None, | |
interactive=True # Allow manual override | |
) | |
holo_lens_dropdown = gr.Dropdown(label="CMT Lens", choices=["gamma", "zeta", "airy", "bessel"], value="gamma") | |
holo_resolution_slider = gr.Slider(label="Field Resolution", minimum=20, maximum=100, step=5, value=40) | |
holo_wavelength_slider = gr.Slider(label="Illumination Wavelength (nm)", minimum=380, maximum=750, step=5, value=550) | |
# Information panels | |
primary_info_html = gr.HTML(label="Primary Audio Info") | |
neighbor_info_html = gr.HTML(label="Neighbor Audio Info") | |
# Audio players | |
primary_audio_out = gr.Audio(label="Primary Audio") | |
neighbor_audio_out = gr.Audio(label="Neighbor Audio") | |
with gr.Column(scale=2): | |
dual_holography_plot = gr.Plot(label="Side-by-Side Holographic Comparison") | |
dual_diagnostic_plot = gr.Plot(label="Cross-Species Diagnostic Comparison") | |
def update_file_choices(species): | |
"""Update the primary file dropdown based on selected species.""" | |
species_files = df_combined[df_combined["source"] == species]["filepath"].astype(str).tolist() | |
return species_files | |
def update_cross_species_view(species, primary_file, neighbor_file, lens, resolution, wavelength): | |
if not primary_file: | |
empty_fig = go.Figure(layout={"title": "Please select a primary file."}) | |
return empty_fig, empty_fig, "", "", None, None | |
# Get primary row | |
primary_row = df_combined[ | |
(df_combined["filepath"] == primary_file) & | |
(df_combined["source"] == species) | |
].iloc[0] if len(df_combined[ | |
(df_combined["filepath"] == primary_file) & | |
(df_combined["source"] == species) | |
]) > 0 else None | |
if primary_row is None: | |
empty_fig = go.Figure(layout={"title": "Primary file not found."}) | |
return empty_fig, empty_fig, "", "", None, None, [] | |
# Find cross-species neighbor if not manually selected | |
if not neighbor_file: | |
neighbor_row = find_nearest_cross_species_neighbor(primary_row, df_combined) | |
if neighbor_row is not None: | |
neighbor_file = neighbor_row['filepath'] | |
else: | |
# Get manually selected neighbor | |
opposite_species = 'Human' if species == 'Dog' else 'Dog' | |
neighbor_row = df_combined[ | |
(df_combined["filepath"] == neighbor_file) & | |
(df_combined["source"] == opposite_species) | |
].iloc[0] if len(df_combined[ | |
(df_combined["filepath"] == neighbor_file) & | |
(df_combined["source"] == opposite_species) | |
]) > 0 else None | |
# Get CMT data directly from CSV (no audio processing needed!) | |
print(f"📊 Using preprocessed CMT data for: {primary_row['filepath']} ({lens} lens)") | |
primary_cmt = get_cmt_data_from_csv(primary_row, lens) | |
neighbor_cmt = None | |
if neighbor_row is not None: | |
print(f"📊 Using preprocessed CMT data for: {neighbor_row['filepath']} ({lens} lens)") | |
neighbor_cmt = get_cmt_data_from_csv(neighbor_row, lens) | |
# Get audio file paths only for playback | |
primary_fp = resolve_audio_path(primary_row) | |
neighbor_fp = resolve_audio_path(neighbor_row) if neighbor_row is not None else None | |
# Create visualizations | |
if primary_cmt and neighbor_cmt: | |
primary_title = f"{species}: {primary_row.get('label', 'Unknown')}" | |
neighbor_title = f"{neighbor_row['source']}: {neighbor_row.get('label', 'Unknown')}" | |
dual_holo_fig = create_dual_holography_plot( | |
primary_cmt["z"], primary_cmt["phi"], | |
neighbor_cmt["z"], neighbor_cmt["phi"], | |
resolution, wavelength, primary_title, neighbor_title | |
) | |
dual_diag_fig = create_dual_diagnostic_plots( | |
primary_cmt["z"], primary_cmt["w"], | |
neighbor_cmt["z"], neighbor_cmt["w"], | |
primary_title, neighbor_title | |
) | |
else: | |
dual_holo_fig = go.Figure(layout={"title": "Error processing audio files"}) | |
dual_diag_fig = go.Figure(layout={"title": "Error processing audio files"}) | |
# Build info strings with CMT diagnostic values | |
primary_info = f""" | |
<b>Primary:</b> {primary_row['filepath']}<br> | |
<b>Species:</b> {primary_row['source']}<br> | |
<b>Label:</b> {primary_row.get('label', 'N/A')}<br> | |
<b>CMT α-{lens}:</b> {primary_cmt['alpha']:.4f}<br> | |
<b>CMT SRL-{lens}:</b> {primary_cmt['srl']:.4f}<br> | |
<b>Field Points:</b> {primary_cmt['final_count'] if primary_cmt else 0} | |
""" | |
neighbor_info = "" | |
if neighbor_row is not None: | |
neighbor_info = f""" | |
<b>Neighbor:</b> {neighbor_row['filepath']}<br> | |
<b>Species:</b> {neighbor_row['source']}<br> | |
<b>Label:</b> {neighbor_row.get('label', 'N/A')}<br> | |
<b>CMT α-{lens}:</b> {neighbor_cmt['alpha']:.4f}<br> | |
<b>CMT SRL-{lens}:</b> {neighbor_cmt['srl']:.4f}<br> | |
<b>Field Points:</b> {neighbor_cmt['final_count'] if neighbor_cmt else 0} | |
""" | |
# Update neighbor dropdown choices | |
opposite_species = 'Human' if species == 'Dog' else 'Dog' | |
neighbor_choices = df_combined[df_combined["source"] == opposite_species]["filepath"].astype(str).tolist() | |
# Audio files | |
primary_audio = primary_fp if primary_fp and os.path.exists(primary_fp) else None | |
neighbor_audio = neighbor_fp if neighbor_row is not None and neighbor_fp and os.path.exists(neighbor_fp) else None | |
return (dual_holo_fig, dual_diag_fig, primary_info, neighbor_info, | |
primary_audio, neighbor_audio) | |
# Event handlers | |
def update_dropdowns_on_species_change(species): | |
"""Update both primary and neighbor dropdowns when species changes.""" | |
species_files = df_combined[df_combined["source"] == species]["filepath"].astype(str).tolist() | |
opposite_species = 'Human' if species == 'Dog' else 'Dog' | |
neighbor_files = df_combined[df_combined["source"] == opposite_species]["filepath"].astype(str).tolist() | |
primary_value = species_files[0] if species_files else "" | |
neighbor_value = neighbor_files[0] if neighbor_files else "" | |
return ( | |
gr.Dropdown(choices=species_files, value=primary_value), | |
gr.Dropdown(choices=neighbor_files, value=neighbor_value) | |
) | |
species_dropdown.change( | |
update_dropdowns_on_species_change, | |
inputs=[species_dropdown], | |
outputs=[primary_dropdown, neighbor_dropdown] | |
) | |
cross_species_inputs = [species_dropdown, primary_dropdown, neighbor_dropdown, | |
holo_lens_dropdown, holo_resolution_slider, holo_wavelength_slider] | |
cross_species_outputs = [dual_holography_plot, dual_diagnostic_plot, | |
primary_info_html, neighbor_info_html, | |
primary_audio_out, neighbor_audio_out] | |
# Only bind change events, not load events to avoid overwhelming initialization | |
primary_dropdown.change(update_cross_species_view, | |
inputs=cross_species_inputs, | |
outputs=cross_species_outputs) | |
neighbor_dropdown.change(update_cross_species_view, | |
inputs=cross_species_inputs, | |
outputs=cross_species_outputs) | |
holo_lens_dropdown.change(update_cross_species_view, | |
inputs=cross_species_inputs, | |
outputs=cross_species_outputs) | |
holo_resolution_slider.change(update_cross_species_view, | |
inputs=cross_species_inputs, | |
outputs=cross_species_outputs) | |
holo_wavelength_slider.change(update_cross_species_view, | |
inputs=cross_species_inputs, | |
outputs=cross_species_outputs) | |
if __name__ == "__main__": | |
demo.launch(share=True, debug=True) | |