CMT-Mapping / app.py
Severian's picture
Create app.py
6f06eba verified
raw
history blame
34.3 kB
import os
import warnings
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from umap import UMAP
from sklearn.cluster import KMeans
from scipy.stats import entropy as shannon_entropy
from scipy import special as sp_special
from scipy.interpolate import griddata
from sklearn.metrics.pairwise import cosine_similarity
from scipy.spatial.distance import cdist
import soundfile as sf
import gradio as gr
# ================================================================
# Unified Communication Manifold Explorer & CMT Visualizer v4.0
# - Adds side-by-side comparison capabilities from HTML draft
# - Implements cross-species neighbor finding for grammar mapping
# - Separates human and dog audio with automatic pairing
# - Enhanced dual visualization for comparative analysis
# ================================================================
# - Adds Interactive Holography tab for full field reconstruction.
# - Interpolates the continuous CMT state-space (Φ field).
# - Visualizes topology, vector flow, and phase interference.
# - Adds informational-entropy-geometry visualization.
# - Prioritizes specific Colab paths for data loading.
# ================================================================
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
print("Initializing the Interactive CMT Holography Explorer...")
# ---------------------------------------------------------------
# Data setup
# ---------------------------------------------------------------
# Paths for local execution (used for dummy data generation fallback)
BASE_DIR = os.path.abspath(os.getcwd())
DATA_DIR = os.path.join(BASE_DIR, "data")
DOG_DIR = os.path.join(DATA_DIR, "dog")
HUMAN_DIR = os.path.join(DATA_DIR, "human")
# Explicit paths for Colab environment
CSV_DOG = "/content/cmt_dog_sound_analysis.csv"
CSV_HUMAN = "/content/cmt_human_speech_analysis.csv"
# These are for creating dummy audio files if needed
os.makedirs(DOG_DIR, exist_ok=True)
os.makedirs(os.path.join(HUMAN_DIR, "Actor_01"), exist_ok=True)
# --- Audio Data Configuration (Must match your data source locations) ---
DOG_AUDIO_BASE_PATH = '/content/drive/MyDrive/combined'
HUMAN_AUDIO_BASE_PATH = '/content/drive/MyDrive/human'
# ---------------------------------------------------------------
# Cross-Species Analysis Functions
# ---------------------------------------------------------------
def find_nearest_cross_species_neighbor(selected_row, df_combined, n_neighbors=5):
"""
Finds the closest neighbor from the opposite species using feature similarity.
This enables cross-species pattern mapping for grammar development.
"""
selected_source = selected_row['source']
opposite_source = 'Human' if selected_source == 'Dog' else 'Dog'
# Get feature columns for similarity calculation
feature_cols = [c for c in df_combined.columns if c.startswith("feature_")]
if not feature_cols:
# Fallback to any numeric columns if no feature columns exist
numeric_cols = df_combined.select_dtypes(include=[np.number]).columns
feature_cols = [c for c in numeric_cols if c not in ['x', 'y', 'z', 'cluster']]
if not feature_cols:
# Random selection if no suitable features found
opposite_species_data = df_combined[df_combined['source'] == opposite_source]
if len(opposite_species_data) > 0:
return opposite_species_data.iloc[0]
return None
# Extract features for the selected row
selected_features = selected_row[feature_cols].values.reshape(1, -1)
selected_features = np.nan_to_num(selected_features)
# Get all rows from the opposite species
opposite_species_data = df_combined[df_combined['source'] == opposite_source]
if len(opposite_species_data) == 0:
return None
# Extract features for opposite species
opposite_features = opposite_species_data[feature_cols].values
opposite_features = np.nan_to_num(opposite_features)
# Calculate cosine similarity (better for high-dimensional feature spaces)
similarities = cosine_similarity(selected_features, opposite_features)[0]
# Find the index of the most similar neighbor
most_similar_idx = np.argmax(similarities)
nearest_neighbor = opposite_species_data.iloc[most_similar_idx]
return nearest_neighbor
# ---------------------------------------------------------------
# Load datasets (Colab-first paths)
# ---------------------------------------------------------------
if os.path.exists(CSV_DOG) and os.path.exists(CSV_HUMAN):
print(f"Found existing data files. Loading from:\n- {CSV_DOG}\n- {CSV_HUMAN}")
df_dog = pd.read_csv(CSV_DOG)
df_human = pd.read_csv(CSV_HUMAN)
print("Successfully loaded data from specified paths.")
else:
print("Could not find one or both CSV files. Generating and using in-memory dummy data.")
# This section is for DUMMY DATA GENERATION ONLY.
# It runs if the primary CSVs are not found and does NOT write files.
n_dummy_items_per_category = 50
rng = np.random.default_rng(42)
dog_labels = ["bark", "growl", "whine", "pant"] * (n_dummy_items_per_category // 4)
human_labels = ["speech", "laugh", "cry", "shout"] * (n_dummy_items_per_category // 4)
dog_rows = {
"feature_1": rng.random(n_dummy_items_per_category), "feature_2": rng.random(n_dummy_items_per_category), "feature_3": rng.random(n_dummy_items_per_category),
"label": dog_labels, "filepath": [f"dog_{i}.wav" for i in range(n_dummy_items_per_category)],
"diag_srl_gamma": rng.uniform(0.5, 5.0, n_dummy_items_per_category), "diag_alpha_gamma": rng.uniform(0.1, 2.0, n_dummy_items_per_category),
"zeta_curvature": rng.uniform(-1, 1, n_dummy_items_per_category), "torsion_index": rng.uniform(0, 1, n_dummy_items_per_category),
}
human_rows = {
"feature_1": rng.random(n_dummy_items_per_category), "feature_2": rng.random(n_dummy_items_per_category), "feature_3": rng.random(n_dummy_items_per_category),
"label": human_labels, "filepath": [f"human_{i}.wav" for i in range(n_dummy_items_per_category)],
"diag_srl_gamma": rng.uniform(0.5, 5.0, n_dummy_items_per_category), "diag_alpha_gamma": rng.uniform(0.1, 2.0, n_dummy_items_per_category),
"zeta_curvature": rng.uniform(-1, 1, n_dummy_items_per_category), "torsion_index": rng.uniform(0, 1, n_dummy_items_per_category),
}
df_dog = pd.DataFrame(dog_rows)
df_human = pd.DataFrame(human_rows)
# We still create dummy audio files for the UI to use if needed
sr = 22050
dur = 2.0
t = np.linspace(0, dur, int(sr * dur), endpoint=False)
for i in range(n_dummy_items_per_category):
tone_freq = 220 + 20 * (i % 5)
audio = 0.1 * np.sin(2 * np.pi * tone_freq * t) + 0.02 * rng.standard_normal(t.shape)
audio = audio / (np.max(np.abs(audio)) + 1e-9)
dog_label = dog_labels[i]
dog_label_dir = os.path.join(DOG_DIR, dog_label)
os.makedirs(dog_label_dir, exist_ok=True)
sf.write(os.path.join(dog_label_dir, f"dog_{i}.wav"), audio, sr)
sf.write(os.path.join(HUMAN_DIR, "Actor_01", f"human_{i}.wav"), audio, sr)
print(f"Loaded {len(df_dog)} dog rows and {len(df_human)} human rows.")
df_dog["source"], df_human["source"] = "Dog", "Human"
df_combined = pd.concat([df_dog, df_human], ignore_index=True)
# ---------------------------------------------------------------
# Expanded CMT implementation
# ---------------------------------------------------------------
class ExpandedCMT:
def __init__(self):
self.c1, self.c2 = 0.587 + 1.223j, -0.994 + 0.0j
# A large but finite number to represent the pole at z=1 for Zeta
self.ZETA_POLE_REGULARIZATION = 1e6 - 1e6j
self.lens_library = {
"gamma": sp_special.gamma,
"zeta": self._regularized_zeta, # Use the robust zeta function
"airy": lambda z: sp_special.airy(z)[0],
"bessel": lambda z: sp_special.jv(0, z),
}
def _regularized_zeta(self, z: np.ndarray) -> np.ndarray:
"""
A wrapper around scipy's zeta function to handle the pole at z=1.
"""
# Create a copy to avoid modifying the original array
z_out = np.copy(z).astype(np.complex128)
# Find where the real part is close to 1 and the imaginary part is close to 0
pole_condition = np.isclose(np.real(z), 1.0) & np.isclose(np.imag(z), 0.0)
# Apply the standard zeta function to non-pole points
non_pole_points = ~pole_condition
z_out[non_pole_points] = sp_special.zeta(z[non_pole_points], 1)
# Apply the regularization constant to the pole points
z_out[pole_condition] = self.ZETA_POLE_REGULARIZATION
return z_out
def _robust_normalize(self, signal: np.ndarray) -> np.ndarray:
if signal.size == 0: return signal
Q1, Q3 = np.percentile(signal, [25, 75])
IQR = Q3 - Q1
if IQR < 1e-9:
median, mad = np.median(signal), np.median(np.abs(signal - np.median(signal)))
return np.zeros_like(signal) if mad < 1e-9 else (signal - median) / (mad + 1e-9)
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR
clipped = np.clip(signal, lower, upper)
s_min, s_max = np.min(clipped), np.max(clipped)
return np.zeros_like(signal) if s_max == s_min else 2.0 * (clipped - s_min) / (s_max - s_min) - 1.0
def _encode(self, signal: np.ndarray) -> np.ndarray:
N = len(signal)
if N == 0: return signal.astype(np.complex128)
i = np.arange(N)
theta = 2.0 * np.pi * i / N
f_k, A_k = np.array([271, 341, 491]), np.array([0.033, 0.050, 0.100])
phi = np.sum(A_k[:, None] * np.sin(2.0 * np.pi * f_k[:, None] * i / N), axis=0)
Theta = theta + phi
exp_iTheta = np.exp(1j * Theta)
g, m = signal * exp_iTheta, np.abs(signal) * exp_iTheta
return 0.5 * g + 0.5 * m
def _apply_lens(self, encoded_signal: np.ndarray, lens_type: str):
lens_fn = self.lens_library.get(lens_type)
if not lens_fn: raise ValueError(f"Lens '{lens_type}' not found.")
with np.errstate(all="ignore"):
w = lens_fn(encoded_signal)
phi_trajectory = self.c1 * np.angle(w) + self.c2 * np.abs(encoded_signal)
finite_mask = np.isfinite(phi_trajectory)
return phi_trajectory[finite_mask], w[finite_mask], encoded_signal[finite_mask], len(encoded_signal), len(phi_trajectory[finite_mask])
# ---------------------------------------------------------------
# Feature preparation and UMAP embedding
# ---------------------------------------------------------------
feature_cols = [c for c in df_combined.columns if c.startswith("feature_")]
features = np.nan_to_num(df_combined[feature_cols].to_numpy())
reducer = UMAP(n_components=3, n_neighbors=15, min_dist=0.1, random_state=42)
df_combined[["x", "y", "z"]] = reducer.fit_transform(features)
kmeans = KMeans(n_clusters=max(4, min(12, int(np.sqrt(len(df_combined))))), random_state=42, n_init=10)
df_combined["cluster"] = kmeans.fit_predict(features)
df_combined["chaos_score"] = np.log1p(df_combined.get("diag_srl_gamma", 0)) / (df_combined.get("diag_alpha_gamma", 1) + 1e-2)
# ---------------------------------------------------------------
# Core Visualization and Analysis Functions
# ---------------------------------------------------------------
def resolve_audio_path(row: pd.Series) -> str:
"""
Intelligently reconstructs the full path to an audio file
based on the logic from the data generation scripts.
"""
basename = str(row.get("filepath", ""))
source = row.get("source", "")
label = row.get("label", "")
# For "Dog" data, the structure is: {base_path}/{label}/{filename}
if source == "Dog":
expected_path = os.path.join(DOG_AUDIO_BASE_PATH, label, basename)
if os.path.exists(expected_path):
return expected_path
# For "Human" data, we must search within all "Actor_XX" subfolders
elif source == "Human":
if os.path.isdir(HUMAN_AUDIO_BASE_PATH):
for actor_folder in os.listdir(HUMAN_AUDIO_BASE_PATH):
if actor_folder.startswith("Actor_"):
expected_path = os.path.join(HUMAN_AUDIO_BASE_PATH, actor_folder, basename)
if os.path.exists(expected_path):
return expected_path
# Fallback for dummy data or other cases
if os.path.exists(basename):
return basename
# If all else fails, return the original basename and let it error out with a clear message
return basename
def get_cmt_data(filepath: str, lens: str):
try:
y, _ = sf.read(filepath)
if y.ndim > 1: y = np.mean(y, axis=1)
except Exception as e:
print(f"Error reading audio file {filepath}: {e}")
return None
cmt = ExpandedCMT()
normalized = cmt._robust_normalize(y)
encoded = cmt._encode(normalized)
# The _apply_lens function now returns additional diagnostic info
phi, w, z, original_count, final_count = cmt._apply_lens(encoded, lens)
return {
"phi": phi, "w": w, "z": z,
"original_count": original_count,
"final_count": final_count
}
def generate_holographic_field(z: np.ndarray, phi: np.ndarray, resolution: int):
if z is None or phi is None or len(z) < 4: return None
points = np.vstack([np.real(z), np.imag(z)]).T
grid_x, grid_y = np.mgrid[
np.min(points[:,0]):np.max(points[:,0]):complex(0, resolution),
np.min(points[:,1]):np.max(points[:,1]):complex(0, resolution)
]
grid_phi_real = griddata(points, np.real(phi), (grid_x, grid_y), method='cubic')
grid_phi_imag = griddata(points, np.imag(phi), (grid_x, grid_y), method='cubic')
grid_phi = np.nan_to_num(grid_phi_real + 1j * grid_phi_imag)
return grid_x, grid_y, grid_phi
def create_holography_plot(z, phi, resolution, wavelength):
field_data = generate_holographic_field(z, phi, resolution)
if field_data is None: return go.Figure(layout={"title": "Not enough data for holography"})
grid_x, grid_y, grid_phi = field_data
mag_phi = np.abs(grid_phi)
phase_phi = np.angle(grid_phi)
# --- Wavelength to Colorscale Mapping ---
def wavelength_to_rgb(wl):
# Simple approximation to map visible spectrum to RGB
if 380 <= wl < 440: return f'rgb({-(wl - 440) / (440 - 380) * 255}, 0, 255)' # Violet
elif 440 <= wl < 495: return f'rgb(0, {(wl - 440) / (495 - 440) * 255}, 255)' # Blue
elif 495 <= wl < 570: return f'rgb(0, 255, {-(wl - 570) / (570 - 495) * 255})' # Green
elif 570 <= wl < 590: return f'rgb({(wl - 570) / (590 - 570) * 255}, 255, 0)' # Yellow
elif 590 <= wl < 620: return f'rgb(255, {-(wl - 620) / (620 - 590) * 255}, 0)' # Orange
elif 620 <= wl <= 750: return f'rgb(255, 0, 0)' # Red
return 'rgb(255,255,255)'
mid_color = wavelength_to_rgb(wavelength)
custom_colorscale = [[0, 'rgb(20,0,40)'], [0.5, mid_color], [1, 'rgb(255,255,255)']]
fig = go.Figure()
# 1. The Holographic Surface (Topology + Phase Interference)
fig.add_trace(go.Surface(
x=grid_x, y=grid_y, z=mag_phi,
surfacecolor=phase_phi,
colorscale=custom_colorscale,
cmin=-np.pi, cmax=np.pi,
colorbar=dict(title='Φ Phase'),
name='Holographic Field',
contours_z=dict(show=True, usecolormap=True, highlightcolor="limegreen", project_z=True, highlightwidth=10)
))
# 2. The original data points projected onto the surface
fig.add_trace(go.Scatter3d(
x=np.real(z), y=np.imag(z), z=np.abs(phi) + 0.05, # slight offset
mode='markers',
marker=dict(size=3, color='black', symbol='x'),
name='Data Points'
))
# 3. The Vector Flow Field (using cones for direction)
grad_y, grad_x = np.gradient(mag_phi)
fig.add_trace(go.Cone(
x=grid_x.flatten(), y=grid_y.flatten(), z=mag_phi.flatten(),
u=-grad_x.flatten(), v=-grad_y.flatten(), w=np.full_like(mag_phi.flatten(), -0.1),
sizemode="absolute", sizeref=0.1,
anchor="tip",
colorscale='Greys',
showscale=False,
name='Vector Flow'
))
fig.update_layout(
title="Interactive Holographic Field Reconstruction",
scene=dict(
xaxis_title="Re(z) - Encoded Signal",
yaxis_title="Im(z) - Encoded Signal",
zaxis_title="|Φ| - Field Magnitude"
),
margin=dict(l=0, r=0, b=0, t=40)
)
return fig
def create_diagnostic_plots(z, w):
"""Creates a 2D plot showing the Aperture (z) and Lens Response (w)."""
if z is None or w is None:
return go.Figure(layout={"title": "Not enough data for diagnostic plots"})
fig = go.Figure()
# Aperture (Encoded Signal)
fig.add_trace(go.Scatter(
x=np.real(z), y=np.imag(z), mode='markers',
marker=dict(size=5, color='blue', opacity=0.6),
name='Aperture (z)'
))
# Lens Response
fig.add_trace(go.Scatter(
x=np.real(w), y=np.imag(w), mode='markers',
marker=dict(size=5, color='red', opacity=0.6, symbol='x'),
name='Lens Response (w)'
))
fig.update_layout(
title="Diagnostic View: Aperture and Lens Response",
xaxis_title="Real Part",
yaxis_title="Imaginary Part",
legend_title="Signal Stage",
margin=dict(l=20, r=20, t=60, b=20)
)
return fig
def create_dual_holography_plot(z1, phi1, z2, phi2, resolution, wavelength, title1="Primary", title2="Comparison"):
"""Creates side-by-side holographic visualizations for comparison."""
field_data1 = generate_holographic_field(z1, phi1, resolution)
field_data2 = generate_holographic_field(z2, phi2, resolution)
if field_data1 is None or field_data2 is None:
return go.Figure(layout={"title": "Insufficient data for dual holography"})
grid_x1, grid_y1, grid_phi1 = field_data1
grid_x2, grid_y2, grid_phi2 = field_data2
mag_phi1, phase_phi1 = np.abs(grid_phi1), np.angle(grid_phi1)
mag_phi2, phase_phi2 = np.abs(grid_phi2), np.angle(grid_phi2)
# Wavelength to colorscale mapping
def wavelength_to_rgb(wl):
if 380 <= wl < 440: return f'rgb({int(-(wl - 440) / (440 - 380) * 255)}, 0, 255)'
elif 440 <= wl < 495: return f'rgb(0, {int((wl - 440) / (495 - 440) * 255)}, 255)'
elif 495 <= wl < 570: return f'rgb(0, 255, {int(-(wl - 570) / (570 - 495) * 255)})'
elif 570 <= wl < 590: return f'rgb({int((wl - 570) / (590 - 570) * 255)}, 255, 0)'
elif 590 <= wl < 620: return f'rgb(255, {int(-(wl - 620) / (620 - 590) * 255)}, 0)'
elif 620 <= wl <= 750: return 'rgb(255, 0, 0)'
return 'rgb(255,255,255)'
mid_color = wavelength_to_rgb(wavelength)
custom_colorscale = [[0, 'rgb(20,0,40)'], [0.5, mid_color], [1, 'rgb(255,255,255)']]
fig = make_subplots(
rows=1, cols=2,
specs=[[{'type': 'scene'}, {'type': 'scene'}]],
subplot_titles=[title1, title2]
)
# Left plot (Primary)
fig.add_trace(go.Surface(
x=grid_x1, y=grid_y1, z=mag_phi1,
surfacecolor=phase_phi1,
colorscale=custom_colorscale,
cmin=-np.pi, cmax=np.pi,
showscale=False,
name=title1,
contours_z=dict(show=True, usecolormap=True, highlightcolor="limegreen", project_z=True)
), row=1, col=1)
# Right plot (Comparison)
fig.add_trace(go.Surface(
x=grid_x2, y=grid_y2, z=mag_phi2,
surfacecolor=phase_phi2,
colorscale=custom_colorscale,
cmin=-np.pi, cmax=np.pi,
showscale=False,
name=title2,
contours_z=dict(show=True, usecolormap=True, highlightcolor="limegreen", project_z=True)
), row=1, col=2)
# Add data points
if z1 is not None and phi1 is not None:
fig.add_trace(go.Scatter3d(
x=np.real(z1), y=np.imag(z1), z=np.abs(phi1) + 0.05,
mode='markers', marker=dict(size=3, color='black', symbol='x'),
name=f'{title1} Points', showlegend=False
), row=1, col=1)
if z2 is not None and phi2 is not None:
fig.add_trace(go.Scatter3d(
x=np.real(z2), y=np.imag(z2), z=np.abs(phi2) + 0.05,
mode='markers', marker=dict(size=3, color='black', symbol='x'),
name=f'{title2} Points', showlegend=False
), row=1, col=2)
fig.update_layout(
title="Side-by-Side Cross-Species Holographic Comparison",
scene=dict(
xaxis_title="Re(z)", yaxis_title="Im(z)", zaxis_title="|Φ|",
camera=dict(eye=dict(x=1.5, y=1.5, z=1.5))
),
scene2=dict(
xaxis_title="Re(z)", yaxis_title="Im(z)", zaxis_title="|Φ|",
camera=dict(eye=dict(x=1.5, y=1.5, z=1.5))
),
margin=dict(l=0, r=0, b=0, t=60),
height=600
)
return fig
def create_dual_diagnostic_plots(z1, w1, z2, w2, title1="Primary", title2="Comparison"):
"""Creates side-by-side diagnostic plots for cross-species comparison."""
fig = make_subplots(
rows=1, cols=2,
subplot_titles=[f"{title1}: Aperture & Lens Response", f"{title2}: Aperture & Lens Response"]
)
if z1 is not None and w1 is not None:
# Primary aperture and response
fig.add_trace(go.Scatter(
x=np.real(z1), y=np.imag(z1), mode='markers',
marker=dict(size=5, color='blue', opacity=0.6),
name=f'{title1} Aperture', showlegend=True
), row=1, col=1)
fig.add_trace(go.Scatter(
x=np.real(w1), y=np.imag(w1), mode='markers',
marker=dict(size=5, color='red', opacity=0.6, symbol='x'),
name=f'{title1} Response', showlegend=True
), row=1, col=1)
if z2 is not None and w2 is not None:
# Comparison aperture and response
fig.add_trace(go.Scatter(
x=np.real(z2), y=np.imag(z2), mode='markers',
marker=dict(size=5, color='darkblue', opacity=0.6),
name=f'{title2} Aperture', showlegend=True
), row=1, col=2)
fig.add_trace(go.Scatter(
x=np.real(w2), y=np.imag(w2), mode='markers',
marker=dict(size=5, color='darkred', opacity=0.6, symbol='x'),
name=f'{title2} Response', showlegend=True
), row=1, col=2)
fig.update_layout(
title="Cross-Species Diagnostic Comparison",
height=400,
margin=dict(l=20, r=20, t=60, b=20)
)
fig.update_xaxes(title_text="Real Part", row=1, col=1)
fig.update_yaxes(title_text="Imaginary Part", row=1, col=1)
fig.update_xaxes(title_text="Real Part", row=1, col=2)
fig.update_yaxes(title_text="Imaginary Part", row=1, col=2)
return fig
def create_entropy_geometry_plot(phi: np.ndarray):
"""Creates a plot showing magnitude/phase distributions and their entropy."""
if phi is None or len(phi) < 2:
return go.Figure(layout={"title": "Not enough data for entropy analysis"})
magnitudes = np.abs(phi)
phases = np.angle(phi)
# Calculate entropy
mag_hist, _ = np.histogram(magnitudes, bins='auto', density=True)
phase_hist, _ = np.histogram(phases, bins='auto', density=True)
mag_entropy = shannon_entropy(mag_hist)
phase_entropy = shannon_entropy(phase_hist)
fig = make_subplots(rows=1, cols=2, subplot_titles=(
f"Magnitude Distribution (Entropy: {mag_entropy:.3f})",
f"Phase Distribution (Entropy: {phase_entropy:.3f})"
))
fig.add_trace(go.Histogram(x=magnitudes, name='Magnitude', nbinsx=50), row=1, col=1)
fig.add_trace(go.Histogram(x=phases, name='Phase', nbinsx=50), row=1, col=2)
fig.update_layout(
title_text="Informational-Entropy Geometry",
showlegend=False,
bargap=0.1,
margin=dict(l=20, r=20, t=60, b=20)
)
fig.update_xaxes(title_text="|Φ|", row=1, col=1)
fig.update_yaxes(title_text="Count", row=1, col=1)
fig.update_xaxes(title_text="angle(Φ)", row=1, col=2)
fig.update_yaxes(title_text="Count", row=1, col=2)
return fig
# ---------------------------------------------------------------
# Gradio UI
# ---------------------------------------------------------------
with gr.Blocks(theme=gr.themes.Soft(primary_hue="teal", secondary_hue="cyan")) as demo:
gr.Markdown("# Exhaustive CMT Explorer for Interspecies Communication v3.2")
file_choices = df_combined["filepath"].astype(str).tolist()
default_primary = file_choices[0] if file_choices else ""
with gr.Tabs():
with gr.TabItem("Unified Manifold"):
gr.Plot(value=lambda: go.Figure(data=[go.Scatter3d(
x=df_combined["x"], y=df_combined["y"], z=df_combined["z"],
mode="markers", marker=dict(color=df_combined["cluster"], size=5, colorscale="Viridis", showscale=True, colorbar={"title": "Cluster ID"}),
text=df_combined.apply(lambda r: f"{r['source']}: {r.get('label', '')}<br>File: {r['filepath']}", axis=1),
hoverinfo="text"
)], layout=dict(title="Communication Manifold (UMAP Projection)")), label="UMAP Manifold")
with gr.TabItem("Interactive Holography"):
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### Cross-Species Holography Controls")
# Species selection and automatic pairing
species_dropdown = gr.Dropdown(
label="Select Species",
choices=["Dog", "Human"],
value="Dog"
)
# Primary file selection (filtered by species)
primary_dropdown = gr.Dropdown(
label="Primary Audio File",
choices=[],
value=""
)
# Automatically found neighbor (from opposite species)
neighbor_dropdown = gr.Dropdown(
label="Auto-Found Cross-Species Neighbor",
choices=[],
value="",
interactive=True # Allow manual override
)
holo_lens_dropdown = gr.Dropdown(label="CMT Lens", choices=["gamma", "zeta", "airy", "bessel"], value="gamma")
holo_resolution_slider = gr.Slider(label="Field Resolution", minimum=20, maximum=100, step=5, value=40)
holo_wavelength_slider = gr.Slider(label="Illumination Wavelength (nm)", minimum=380, maximum=750, step=5, value=550)
# Information panels
primary_info_html = gr.HTML(label="Primary Audio Info")
neighbor_info_html = gr.HTML(label="Neighbor Audio Info")
# Audio players
primary_audio_out = gr.Audio(label="Primary Audio")
neighbor_audio_out = gr.Audio(label="Neighbor Audio")
with gr.Column(scale=2):
dual_holography_plot = gr.Plot(label="Side-by-Side Holographic Comparison")
dual_diagnostic_plot = gr.Plot(label="Cross-Species Diagnostic Comparison")
def update_file_choices(species):
"""Update the primary file dropdown based on selected species."""
species_files = df_combined[df_combined["source"] == species]["filepath"].astype(str).tolist()
return gr.Dropdown.update(choices=species_files, value=species_files[0] if species_files else "")
def update_cross_species_view(species, primary_file, neighbor_file, lens, resolution, wavelength):
if not primary_file:
empty_fig = go.Figure(layout={"title": "Please select a primary file."})
return empty_fig, empty_fig, "", "", None, None, []
# Get primary row
primary_row = df_combined[
(df_combined["filepath"] == primary_file) &
(df_combined["source"] == species)
].iloc[0] if len(df_combined[
(df_combined["filepath"] == primary_file) &
(df_combined["source"] == species)
]) > 0 else None
if primary_row is None:
empty_fig = go.Figure(layout={"title": "Primary file not found."})
return empty_fig, empty_fig, "", "", None, None, []
# Find cross-species neighbor if not manually selected
if not neighbor_file:
neighbor_row = find_nearest_cross_species_neighbor(primary_row, df_combined)
if neighbor_row is not None:
neighbor_file = neighbor_row['filepath']
else:
# Get manually selected neighbor
opposite_species = 'Human' if species == 'Dog' else 'Dog'
neighbor_row = df_combined[
(df_combined["filepath"] == neighbor_file) &
(df_combined["source"] == opposite_species)
].iloc[0] if len(df_combined[
(df_combined["filepath"] == neighbor_file) &
(df_combined["source"] == opposite_species)
]) > 0 else None
# Get CMT data for both files
primary_fp = resolve_audio_path(primary_row)
primary_cmt = get_cmt_data(primary_fp, lens)
neighbor_cmt = None
if neighbor_row is not None:
neighbor_fp = resolve_audio_path(neighbor_row)
neighbor_cmt = get_cmt_data(neighbor_fp, lens)
# Create visualizations
if primary_cmt and neighbor_cmt:
primary_title = f"{species}: {primary_row.get('label', 'Unknown')}"
neighbor_title = f"{neighbor_row['source']}: {neighbor_row.get('label', 'Unknown')}"
dual_holo_fig = create_dual_holography_plot(
primary_cmt["z"], primary_cmt["phi"],
neighbor_cmt["z"], neighbor_cmt["phi"],
resolution, wavelength, primary_title, neighbor_title
)
dual_diag_fig = create_dual_diagnostic_plots(
primary_cmt["z"], primary_cmt["w"],
neighbor_cmt["z"], neighbor_cmt["w"],
primary_title, neighbor_title
)
else:
dual_holo_fig = go.Figure(layout={"title": "Error processing audio files"})
dual_diag_fig = go.Figure(layout={"title": "Error processing audio files"})
# Build info strings
primary_info = f"""
<b>Primary:</b> {primary_row['filepath']}<br>
<b>Species:</b> {primary_row['source']}<br>
<b>Label:</b> {primary_row.get('label', 'N/A')}<br>
<b>Data Points:</b> {primary_cmt['final_count'] if primary_cmt else 0} / {primary_cmt['original_count'] if primary_cmt else 0}
"""
neighbor_info = ""
if neighbor_row is not None:
neighbor_info = f"""
<b>Neighbor:</b> {neighbor_row['filepath']}<br>
<b>Species:</b> {neighbor_row['source']}<br>
<b>Label:</b> {neighbor_row.get('label', 'N/A')}<br>
<b>Data Points:</b> {neighbor_cmt['final_count'] if neighbor_cmt else 0} / {neighbor_cmt['original_count'] if neighbor_cmt else 0}
"""
# Update neighbor dropdown choices
opposite_species = 'Human' if species == 'Dog' else 'Dog'
neighbor_choices = df_combined[df_combined["source"] == opposite_species]["filepath"].astype(str).tolist()
# Audio files
primary_audio = primary_fp if primary_fp and os.path.exists(primary_fp) else None
neighbor_audio = neighbor_fp if neighbor_row and neighbor_fp and os.path.exists(neighbor_fp) else None
return (dual_holo_fig, dual_diag_fig, primary_info, neighbor_info,
primary_audio, neighbor_audio,
gr.Dropdown.update(choices=neighbor_choices, value=neighbor_file if neighbor_row else ""))
# Event handlers
species_dropdown.change(
update_file_choices,
inputs=[species_dropdown],
outputs=[primary_dropdown]
)
cross_species_inputs = [species_dropdown, primary_dropdown, neighbor_dropdown,
holo_lens_dropdown, holo_resolution_slider, holo_wavelength_slider]
cross_species_outputs = [dual_holography_plot, dual_diagnostic_plot,
primary_info_html, neighbor_info_html,
primary_audio_out, neighbor_audio_out, neighbor_dropdown]
for component in cross_species_inputs:
component.change(update_cross_species_view,
inputs=cross_species_inputs,
outputs=cross_species_outputs)
# Initialize on load
demo.load(lambda: update_file_choices("Dog"), outputs=[primary_dropdown])
demo.load(update_cross_species_view,
inputs=cross_species_inputs,
outputs=cross_species_outputs)
if __name__ == "__main__":
demo.launch(share=True, debug=True)