import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import numpy as np
import gradio as gr
import matplotlib
matplotlib.use('Agg') # Use a non-interactive backend for Matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
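# Overview: this Gradio app probes "emergent identity" in a causal LM by running two
# parallel generation traces: an I-Trace, where the model elaborates on an evolving
# self-reflective thought, and a ¬I-Trace, where it challenges the latest I-Trace
# statement. Consecutive and cross-trace outputs are compared via embedding cosine
# similarity (ΔS), grouped with k-means clustering, and visualized as a heatmap.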
# --- Model and Tokenizer Setup ---
DEFAULT_MODEL_NAME = "EleutherAI/gpt-neo-1.3B"
FALLBACK_MODEL_NAME = "gpt2" # Fallback if preferred model fails
try:
    print(f"Attempting to load model: {DEFAULT_MODEL_NAME}")
    tokenizer = AutoTokenizer.from_pretrained(DEFAULT_MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(DEFAULT_MODEL_NAME)
    print(f"Successfully loaded model: {DEFAULT_MODEL_NAME}")
except OSError as e:
    print(f"Error loading model {DEFAULT_MODEL_NAME}. Error: {e}")
    print(f"Falling back to {FALLBACK_MODEL_NAME}.")
    tokenizer = AutoTokenizer.from_pretrained(FALLBACK_MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(FALLBACK_MODEL_NAME)
    print(f"Successfully loaded fallback model: {FALLBACK_MODEL_NAME}")
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Using device: {device}")
# --- Configuration ---
MODEL_CONTEXT_WINDOW = tokenizer.model_max_length if hasattr(tokenizer, 'model_max_length') and tokenizer.model_max_length is not None else model.config.max_position_embeddings
print(f"Model context window: {MODEL_CONTEXT_WINDOW} tokens.")
PROMPT_TRIM_MAX_TOKENS = min(MODEL_CONTEXT_WINDOW - 250, 1800) # Reserve ~250 for generation & instructions, cap at 1800
MAX_GEN_LENGTH = 150
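# Illustrative budget arithmetic (not computed at runtime): with GPT-Neo 1.3B's
# 2048-token context window, PROMPT_TRIM_MAX_TOKENS = min(2048 - 250, 1800) = 1798;
# with the GPT-2 fallback's 1024-token window it is min(1024 - 250, 1800) = 774.
# In either case at least MAX_GEN_LENGTH tokens remain available for generation.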
# --- Debug Logging ---
debug_log_accumulator = []
def debug(msg):
    print(msg)
    debug_log_accumulator.append(str(msg))
# --- Core Functions ---
def trim_prompt_if_needed(prompt_text, max_tokens_for_trimming=PROMPT_TRIM_MAX_TOKENS):
    tokens = tokenizer.encode(prompt_text, add_special_tokens=False)
    if len(tokens) > max_tokens_for_trimming:
        original_length = len(tokens)
        # Trim from the beginning so the most recent conversational context is kept
        tokens = tokens[-max_tokens_for_trimming:]
        debug(f"[!] Prompt trimming: Original {original_length} tokens, "
              f"trimmed to {len(tokens)} (kept the trailing tokens, i.e. the most recent context).")
    return tokenizer.decode(tokens)
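# Illustrative example of the tail-trim above: with max_tokens_for_trimming=4, a prompt
# tokenized as [t0, t1, t2, t3, t4, t5] is reduced to [t2, t3, t4, t5] before decoding,
# so the oldest context is dropped first.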
def generate_text_response(constructed_prompt, generation_length=MAX_GEN_LENGTH):
    # The constructed_prompt already includes the task and the text to reflect upon.
    # We still need to ensure this constructed_prompt doesn't exceed limits before generation.
    safe_prompt = trim_prompt_if_needed(constructed_prompt, PROMPT_TRIM_MAX_TOKENS)
    debug(f"Generating response for (potentially trimmed) prompt (approx. {len(safe_prompt.split())} words):\n'{safe_prompt[:400]}...'")
    inputs = tokenizer(safe_prompt, return_tensors="pt", truncation=False).to(device)
    input_token_length = inputs.input_ids.size(1)
    # max_length for model.generate() is the tokenized prompt length plus the desired
    # number of new tokens, capped by the model's absolute context window.
    max_length_for_generate = min(input_token_length + generation_length, MODEL_CONTEXT_WINDOW)
    if max_length_for_generate <= input_token_length:
        debug(f"[!] Warning: Prompt length ({input_token_length}) is too close to model context window ({MODEL_CONTEXT_WINDOW}). "
              f"Cannot generate new tokens. Prompt: '{safe_prompt[:100]}...'")
        return "[Prompt too long to generate new tokens]"
    try:
        outputs = model.generate(
            input_ids=inputs.input_ids,
            attention_mask=inputs.attention_mask,
            max_length=max_length_for_generate,
            pad_token_id=tokenizer.eos_token_id if tokenizer.eos_token_id is not None else 50256,
            do_sample=True,
            temperature=0.85,
            top_p=0.92,
            repetition_penalty=1.15,
        )
        # Decode only the newly generated tokens (everything after the prompt)
        generated_tokens = outputs[0][input_token_length:]
        result_text = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
        debug(f"Generated response text (length {len(result_text.split())} words):\n'{result_text[:400]}...'")
        return result_text if result_text else "[Empty Response]"
    except Exception as e:
        debug(f"[!!!] Error during text generation: {e}\nPrompt was: {safe_prompt[:200]}...")
        return "[Generation Error]"
def calculate_similarity(text_a, text_b):
    invalid_texts_markers = ["[Empty Response]", "[Generation Error]", "[Prompt too long", "[Input prompt too long"]
    if not text_a or not text_a.strip() or any(marker in text_a for marker in invalid_texts_markers) or \
       not text_b or not text_b.strip() or any(marker in text_b for marker in invalid_texts_markers):
        debug(f"Similarity calculation skipped for invalid/empty texts: A='{str(text_a)[:50]}...', B='{str(text_b)[:50]}...'")
        return 0.0
    embedding_layer = model.get_input_embeddings()
    with torch.no_grad():
        tokens_a = tokenizer(text_a, return_tensors="pt", truncation=True, max_length=MODEL_CONTEXT_WINDOW).to(device)
        tokens_b = tokenizer(text_b, return_tensors="pt", truncation=True, max_length=MODEL_CONTEXT_WINDOW).to(device)
        if tokens_a.input_ids.size(1) == 0 or tokens_b.input_ids.size(1) == 0:
            debug(f"Similarity calculation skipped: tokenization resulted in empty input_ids. A='{str(text_a)[:50]}...', B='{str(text_b)[:50]}...'")
            return 0.0
        emb_a = embedding_layer(tokens_a.input_ids).mean(dim=1)
        emb_b = embedding_layer(tokens_b.input_ids).mean(dim=1)
    score = float(cosine_similarity(emb_a.cpu().numpy(), emb_b.cpu().numpy())[0][0])
    debug(f"Similarity between A='{str(text_a)[:30]}...' and B='{str(text_b)[:30]}...' is {score:.4f}")
    return score
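# Note: "similarity" here is cosine similarity between mean-pooled *input* embeddings,
# i.e. sim(a, b) = (a · b) / (||a|| * ||b||), so identical texts score 1.0. Mean-pooled
# static embeddings are a lightweight proxy for semantic similarity rather than a
# contextual sentence embedding.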
def generate_similarity_heatmap(texts_list, custom_labels, title="Semantic Similarity Heatmap"):
    # Filter out any None or problematic entries before processing
    valid_texts_with_labels = [(text, label) for text, label in zip(texts_list, custom_labels)
                               if text and isinstance(text, str)
                               and not any(marker in text for marker in ["[Empty Response]", "[Generation Error]", "[Prompt too long", "[Input prompt too long"])]
    if len(valid_texts_with_labels) < 2:
        debug("Not enough valid texts to generate a heatmap.")
        return "Not enough valid data for heatmap."
    valid_texts = [item[0] for item in valid_texts_with_labels]
    valid_labels = [item[1] for item in valid_texts_with_labels]
    num_valid_texts = len(valid_texts)
    sim_matrix = np.zeros((num_valid_texts, num_valid_texts))
    for i in range(num_valid_texts):
        for j in range(num_valid_texts):
            if i == j:
                sim_matrix[i, j] = 1.0
            elif i < j:
                sim = calculate_similarity(valid_texts[i], valid_texts[j])
                sim_matrix[i, j] = sim
                sim_matrix[j, i] = sim
            else:  # j < i, use already computed value
                sim_matrix[i, j] = sim_matrix[j, i]
    try:
        fig_width = max(6, num_valid_texts * 0.8)
        fig_height = max(5, num_valid_texts * 0.7)
        fig, ax = plt.subplots(figsize=(fig_width, fig_height))
        sns.heatmap(sim_matrix, annot=True, cmap="viridis", fmt=".2f", ax=ax,
                    xticklabels=valid_labels, yticklabels=valid_labels, annot_kws={"size": 8})
        ax.set_title(title, fontsize=12)
        plt.xticks(rotation=45, ha="right", fontsize=9)
        plt.yticks(rotation=0, fontsize=9)
        plt.tight_layout(pad=1.5)
        buf = io.BytesIO()
        plt.savefig(buf, format='png')  # bbox_inches='tight' omitted; it can conflict with tight_layout
        plt.close(fig)
        buf.seek(0)
        img_base64 = base64.b64encode(buf.read()).decode('utf-8')
        return f"<img src='data:image/png;base64,{img_base64}' alt='{title}' style='max-width:100%; height:auto;'/>"
    except Exception as e:
        debug(f"[!!!] Error generating heatmap: {e}")
        return f"Error generating heatmap: {e}"
def perform_text_clustering(texts_list, custom_labels, num_clusters=2):
    valid_texts_with_labels = [(text, label) for text, label in zip(texts_list, custom_labels)
                               if text and isinstance(text, str)
                               and not any(marker in text for marker in ["[Empty Response]", "[Generation Error]", "[Prompt too long", "[Input prompt too long"])]
    if len(valid_texts_with_labels) < num_clusters:
        debug(f"Not enough valid texts ({len(valid_texts_with_labels)}) for {num_clusters}-means clustering.")
        return {label: "N/A (Few Samples)" for label in custom_labels}
    valid_texts = [item[0] for item in valid_texts_with_labels]
    original_indices_map = {i: custom_labels.index(item[1]) for i, item in enumerate(valid_texts_with_labels)}
    embedding_layer = model.get_input_embeddings()
    embeddings_for_clustering = []
    embedded_valid_indices = []  # indices into valid_texts that were successfully embedded
    with torch.no_grad():
        for idx, text_item in enumerate(valid_texts):
            tokens = tokenizer(text_item, return_tensors="pt", truncation=True, max_length=MODEL_CONTEXT_WINDOW).to(device)
            if tokens.input_ids.size(1) == 0:
                debug(f"Skipping text for embedding in clustering due to empty tokenization: '{text_item[:50]}...'")
                continue  # Should be rare, since valid_texts_with_labels is already filtered
            emb = embedding_layer(tokens.input_ids).mean(dim=1)
            embeddings_for_clustering.append(emb.cpu().numpy().squeeze())
            embedded_valid_indices.append(idx)
    if not embeddings_for_clustering or len(embeddings_for_clustering) < num_clusters:
        debug("Not enough valid texts were successfully embedded for clustering.")
        return {label: "N/A (Embedding Fail)" for label in custom_labels}
    embeddings_np = np.array(embeddings_for_clustering)
    cluster_results_map = {label: "N/A" for label in custom_labels}
    try:
        actual_num_clusters = min(num_clusters, len(embeddings_for_clustering))
        if actual_num_clusters < 2:
            debug(f"Adjusted num_clusters to 1 due to only {len(embeddings_for_clustering)} valid sample(s). Assigning all to Cluster 0.")
            predicted_labels = [0] * len(embeddings_for_clustering)
        else:
            kmeans = KMeans(n_clusters=actual_num_clusters, random_state=42, n_init='auto')
            predicted_labels = kmeans.fit_predict(embeddings_np)
        # Map each embedded text's predicted cluster back to its original label;
        # embedded_valid_indices keeps the alignment even if a text was skipped above.
        for emb_idx, valid_idx in enumerate(embedded_valid_indices):
            original_label_key_idx = original_indices_map[valid_idx]
            cluster_results_map[custom_labels[original_label_key_idx]] = f"C{predicted_labels[emb_idx]}"
        return cluster_results_map
    except Exception as e:
        debug(f"[!!!] Error during clustering: {e}")
        return {label: "Error" for label in custom_labels}
# --- Main EAL Unfolding Logic ---
def run_eal_dual_unfolding(num_iterations):
    # Pre-allocate result lists so iteration index i can be assigned directly
    I_trace_texts, not_I_trace_texts = [None]*num_iterations, [None]*num_iterations
    delta_S_I_values, delta_S_not_I_values, delta_S_cross_values = [None]*num_iterations, [None]*num_iterations, [None]*num_iterations
    debug_log_accumulator.clear()
    ui_log_entries = []
    initial_seed_thought_for_I = "A reflective process is initiated, considering its own nature."
    for i in range(num_iterations):
        ui_log_entries.append(f"--- Iteration {i} ---")
        debug(f"\n=== Iteration {i} ===")
        # === I-Trace (Self-Reflection) ===
        basis_for_I_elaboration = initial_seed_thought_for_I if i == 0 else I_trace_texts[i-1]
        if not basis_for_I_elaboration or any(marker in basis_for_I_elaboration for marker in ["[Empty Response]", "[Generation Error]"]):  # Safety check on the basis text
            basis_for_I_elaboration = "The previous thought was unclear or errored. Please restart reflection."
            debug(f"[!] Using fallback basis for I-Trace at iter {i} due to problematic previous I-text.")
        prompt_for_I_trace = f"A thought process is evolving. Its previous stage was: \"{basis_for_I_elaboration}\"\n\nTask: Continue this line of thought. Elaborate on it, explore its implications, or develop it further in a coherent manner."
        ui_log_entries.append(f"[Prompt for I{i} (approx. {len(prompt_for_I_trace.split())} words)]:\n'{prompt_for_I_trace[:400]}...'")
        generated_I_text = generate_text_response(prompt_for_I_trace)
        I_trace_texts[i] = generated_I_text
        ui_log_entries.append(f"[I{i} Response (approx. {len(generated_I_text.split())} words)]:\n'{generated_I_text[:400]}...'")
        # === ¬I-Trace (Antithesis/Contradiction) ===
        statement_to_challenge_for_not_I = I_trace_texts[i]  # Challenge the I-text from the *current* iteration
        if not statement_to_challenge_for_not_I or any(marker in statement_to_challenge_for_not_I for marker in ["[Empty Response]", "[Generation Error]"]):
            statement_to_challenge_for_not_I = "The primary statement was unclear or errored. Please offer a general contrasting idea."
            debug(f"[!] Using fallback statement to challenge for ¬I-Trace at iter {i} due to problematic current I-text.")
        prompt_for_not_I_trace = f"Now, consider an alternative perspective to the thought: \"{statement_to_challenge_for_not_I}\"\n\nTask: What are potential contradictions, challenges, or contrasting interpretations to this specific thought? Explore a divergent viewpoint or explain why the thought might be flawed."
        ui_log_entries.append(f"[Prompt for ¬I{i} (approx. {len(prompt_for_not_I_trace.split())} words)]:\n'{prompt_for_not_I_trace[:400]}...'")
        generated_not_I_text = generate_text_response(prompt_for_not_I_trace)
        not_I_trace_texts[i] = generated_not_I_text
        ui_log_entries.append(f"[¬I{i} Response (approx. {len(generated_not_I_text.split())} words)]:\n'{generated_not_I_text[:400]}...'")
        ui_log_entries.append("---")  # Separator
        # === ΔS (Similarity) Calculations ===
        if i > 0:
            delta_S_I_values[i] = calculate_similarity(I_trace_texts[i-1], I_trace_texts[i])
            delta_S_not_I_values[i] = calculate_similarity(not_I_trace_texts[i-1], not_I_trace_texts[i])
        delta_S_cross_values[i] = calculate_similarity(I_trace_texts[i], not_I_trace_texts[i])
    # --- Post-loop Analysis & Output Formatting ---
    all_generated_texts = I_trace_texts + not_I_trace_texts
    text_labels_for_analysis = [f"I{k}" for k in range(num_iterations)] + \
                               [f"¬I{k}" for k in range(num_iterations)]
    cluster_assignments_map = perform_text_clustering(all_generated_texts, text_labels_for_analysis, num_clusters=2)
    I_out_formatted_lines = []
    for k in range(num_iterations):
        cluster_label_I = cluster_assignments_map.get(f"I{k}", "N/A")
        I_out_formatted_lines.append(f"**I{k} [{cluster_label_I}]**:\n{I_trace_texts[k]}")
    I_out_formatted = "\n\n".join(I_out_formatted_lines)
    not_I_out_formatted_lines = []
    for k in range(num_iterations):
        cluster_label_not_I = cluster_assignments_map.get(f"¬I{k}", "N/A")
        not_I_out_formatted_lines.append(f"**¬I{k} [{cluster_label_not_I}]**:\n{not_I_trace_texts[k]}")
    not_I_out_formatted = "\n\n".join(not_I_out_formatted_lines)
    delta_S_summary_lines = []
    for k in range(num_iterations):
        ds_i_str = f"{delta_S_I_values[k]:.4f}" if delta_S_I_values[k] is not None else "N/A (Iter 0)"
        ds_not_i_str = f"{delta_S_not_I_values[k]:.4f}" if delta_S_not_I_values[k] is not None else "N/A (Iter 0)"
        ds_cross_str = f"{delta_S_cross_values[k]:.4f}" if delta_S_cross_values[k] is not None else "N/A"
        delta_S_summary_lines.append(f"Iter {k}: ΔS(I{k-1}↔I{k})={ds_i_str}, ΔS(¬I{k-1}↔¬I{k})={ds_not_i_str}, ΔS_Cross(I{k}↔¬I{k})={ds_cross_str}")
    delta_S_summary_output = "\n".join(delta_S_summary_lines)
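    # Each summary line has the form (values here are illustrative, not real output):
    #   Iter 2: ΔS(I1↔I2)=0.8731, ΔS(¬I1↔¬I2)=0.8512, ΔS_Cross(I2↔¬I2)=0.7940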
    # Join UI log entries for one of the Textbox outputs.
    # If it gets too long, Gradio might truncate it or cause performance issues;
    # consider making this detailed log optional for very many iterations.
    detailed_ui_log_output = "\n".join(ui_log_entries)
    debug_log_output = "\n".join(debug_log_accumulator)
    heatmap_html_output = generate_similarity_heatmap(all_generated_texts,
                                                      custom_labels=text_labels_for_analysis,
                                                      title=f"Similarity Matrix (All Texts - {num_iterations} Iterations)")
    # Instead of returning detailed_ui_log_output, return the specific trace text boxes.
    # The debug_log_output contains the full internal log.
    return I_out_formatted, not_I_out_formatted, delta_S_summary_output, debug_log_output, heatmap_html_output
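# The five returned values map positionally to the five output components declared in the
# Gradio interface below: I-Trace textbox, ¬I-Trace textbox, ΔS summary textbox,
# debug-log textbox, and the heatmap HTML panel.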
# --- Gradio Interface Definition ---
eal_interface = gr.Interface(
    fn=run_eal_dual_unfolding,
    inputs=gr.Slider(minimum=1, maximum=5, value=3, step=1, label="Number of EAL Iterations"),
    outputs=[
        gr.Textbox(label="I-Trace (Self-Reflection with Cluster)", lines=12, interactive=False),
        gr.Textbox(label="¬I-Trace (Antithesis with Cluster)", lines=12, interactive=False),
        gr.Textbox(label="ΔS Similarity Trace Summary", lines=7, interactive=False),
        gr.Textbox(label="Detailed Debug Log (Prompts, Responses, Errors)", lines=15, interactive=False),
        gr.HTML(label="Overall Semantic Similarity Heatmap (I-Trace & ¬I-Trace Texts)")
    ],
    title="EAL LLM Identity Analyzer: Self-Reflection vs. Antithesis (Open-Ended)",
    description=(
        "This application explores emergent identity in a Large Language Model (LLM) using principles inspired by Entropic Attractor Logic (EAL). "
        "It runs two parallel conversational traces with open-ended prompts:\n"
        "1. **I-Trace:** The model elaborates on its evolving self-concept, seeded by an initial neutral thought.\n"
        "2. **¬I-Trace:** The model explores alternative perspectives or challenges to the latest statement from the I-Trace.\n\n"
        "**ΔS Values:** Cosine similarity. ΔS(I) = sim(I_k-1, I_k). ΔS(¬I) = sim(¬I_k-1, ¬I_k). ΔS_Cross = sim(I_k, ¬I_k).\n"
        "**Clustering [Cx]:** Assigns each generated text to one of two semantic clusters.\n"
        "**Heatmap:** Visualizes pairwise similarity across all generated texts."
    ),
    allow_flagging='never'
)
if __name__ == "__main__":
    print("Starting Gradio App...")
    eal_interface.launch()
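    # Note: share=True can be passed to launch() for a temporary public URL when running
    # locally; when hosted as a Hugging Face Space, the default launch() is sufficient.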