neuralworm's picture
update app.py, add thesis
b2cf072
raw
history blame
18.6 kB
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
import numpy as np
import gradio as gr
import matplotlib
matplotlib.use('Agg') # Use a non-interactive backend for Matplotlib
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
# --- Model and Tokenizer Setup ---
DEFAULT_MODEL_NAME = "EleutherAI/gpt-neo-1.3B"
FALLBACK_MODEL_NAME = "gpt2" # Fallback if preferred model fails
try:
print(f"Attempting to load model: {DEFAULT_MODEL_NAME}")
tokenizer = AutoTokenizer.from_pretrained(DEFAULT_MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(DEFAULT_MODEL_NAME)
print(f"Successfully loaded model: {DEFAULT_MODEL_NAME}")
except OSError as e:
print(f"Error loading model {DEFAULT_MODEL_NAME}. Error: {e}")
print(f"Falling back to {FALLBACK_MODEL_NAME}.")
tokenizer = AutoTokenizer.from_pretrained(FALLBACK_MODEL_NAME)
model = AutoModelForCausalLM.from_pretrained(FALLBACK_MODEL_NAME)
print(f"Successfully loaded fallback model: {FALLBACK_MODEL_NAME}")
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Using device: {device}")
# --- Configuration ---
MODEL_CONTEXT_WINDOW = tokenizer.model_max_length if hasattr(tokenizer, 'model_max_length') and tokenizer.model_max_length is not None else model.config.max_position_embeddings
print(f"Model context window: {MODEL_CONTEXT_WINDOW} tokens.")
PROMPT_TRIM_MAX_TOKENS = min(MODEL_CONTEXT_WINDOW - 250, 1800) # Reserve ~250 for generation & instructions, cap at 1800
MAX_GEN_LENGTH = 150
# --- Debug Logging ---
debug_log_accumulator = []
def debug(msg):
print(msg)
debug_log_accumulator.append(str(msg))
# --- Core Functions ---
def trim_prompt_if_needed(prompt_text, max_tokens_for_trimming=PROMPT_TRIM_MAX_TOKENS):
tokens = tokenizer.encode(prompt_text, add_special_tokens=False)
if len(tokens) > max_tokens_for_trimming:
original_length = len(tokens)
# Trim from the beginning to keep the most recent conversational context
tokens = tokens[-max_tokens_for_trimming:]
debug(f"[!] Prompt trimming: Original {original_length} tokens, "
f"trimmed to {len(tokens)} (from the end, keeping recent context).")
return tokenizer.decode(tokens)
def generate_text_response(constructed_prompt, generation_length=MAX_GEN_LENGTH):
# The constructed_prompt already includes the task and the text to reflect upon.
# We still need to ensure this constructed_prompt doesn't exceed limits before generation.
safe_prompt = trim_prompt_if_needed(constructed_prompt, PROMPT_TRIM_MAX_TOKENS)
debug(f"Generating response for (potentially trimmed) prompt (approx. {len(safe_prompt.split())} words):\n'{safe_prompt[:400]}...'")
inputs = tokenizer(safe_prompt, return_tensors="pt", truncation=False).to(device)
input_token_length = inputs.input_ids.size(1)
# Calculate max_length for model.generate()
# It's the current length of tokenized prompt + desired new tokens, capped by model's absolute max.
max_length_for_generate = min(input_token_length + generation_length, MODEL_CONTEXT_WINDOW)
if max_length_for_generate <= input_token_length:
debug(f"[!] Warning: Prompt length ({input_token_length}) is too close to model context window ({MODEL_CONTEXT_WINDOW}). "
f"Cannot generate new tokens. Prompt: '{safe_prompt[:100]}...'")
return "[Prompt too long to generate new tokens]"
try:
outputs = model.generate(
input_ids=inputs.input_ids,
attention_mask=inputs.attention_mask,
max_length=max_length_for_generate,
pad_token_id=tokenizer.eos_token_id if tokenizer.eos_token_id is not None else 50256,
do_sample=True,
temperature=0.85,
top_p=0.92,
repetition_penalty=1.15,
)
generated_tokens = outputs[0][input_token_length:]
result_text = tokenizer.decode(generated_tokens, skip_special_tokens=True).strip()
debug(f"Generated response text (length {len(result_text.split())} words):\n'{result_text[:400]}...'")
return result_text if result_text else "[Empty Response]"
except Exception as e:
debug(f"[!!!] Error during text generation: {e}\nPrompt was: {safe_prompt[:200]}...")
return "[Generation Error]"
def calculate_similarity(text_a, text_b):
invalid_texts_markers = ["[Empty Response]", "[Generation Error]", "[Prompt too long", "[Input prompt too long"]
if not text_a or not text_a.strip() or any(marker in text_a for marker in invalid_texts_markers) or \
not text_b or not text_b.strip() or any(marker in text_b for marker in invalid_texts_markers):
debug(f"Similarity calculation skipped for invalid/empty texts: A='{str(text_a)[:50]}...', B='{str(text_b)[:50]}...'")
return 0.0
embedding_layer = model.get_input_embeddings()
with torch.no_grad():
tokens_a = tokenizer(text_a, return_tensors="pt", truncation=True, max_length=MODEL_CONTEXT_WINDOW).to(device)
tokens_b = tokenizer(text_b, return_tensors="pt", truncation=True, max_length=MODEL_CONTEXT_WINDOW).to(device)
if tokens_a.input_ids.size(1) == 0 or tokens_b.input_ids.size(1) == 0:
debug(f"Similarity calculation skipped: tokenization resulted in empty input_ids. A='{str(text_a)[:50]}...', B='{str(text_b)[:50]}...'")
return 0.0
emb_a = embedding_layer(tokens_a.input_ids).mean(dim=1)
emb_b = embedding_layer(tokens_b.input_ids).mean(dim=1)
score = float(cosine_similarity(emb_a.cpu().numpy(), emb_b.cpu().numpy())[0][0])
debug(f"Similarity between A='{str(text_a)[:30]}...' and B='{str(text_b)[:30]}...' is {score:.4f}")
return score
def generate_similarity_heatmap(texts_list, custom_labels, title="Semantic Similarity Heatmap"):
# Filter out any None or problematic entries before processing
valid_texts_with_labels = [(text, label) for text, label in zip(texts_list, custom_labels) if text and isinstance(text, str) and not any(marker in text for marker in ["[Empty Response]", "[Generation Error]", "[Prompt too long", "[Input prompt too long"])]
if len(valid_texts_with_labels) < 2:
debug("Not enough valid texts to generate a heatmap.")
return "Not enough valid data for heatmap."
valid_texts = [item[0] for item in valid_texts_with_labels]
valid_labels = [item[1] for item in valid_texts_with_labels]
num_valid_texts = len(valid_texts)
sim_matrix = np.zeros((num_valid_texts, num_valid_texts))
for i in range(num_valid_texts):
for j in range(num_valid_texts):
if i == j:
sim_matrix[i, j] = 1.0
elif i < j:
sim = calculate_similarity(valid_texts[i], valid_texts[j])
sim_matrix[i, j] = sim
sim_matrix[j, i] = sim
else: # j < i, use already computed value
sim_matrix[i,j] = sim_matrix[j,i]
try:
fig_width = max(6, num_valid_texts * 0.8)
fig_height = max(5, num_valid_texts * 0.7)
fig, ax = plt.subplots(figsize=(fig_width, fig_height))
sns.heatmap(sim_matrix, annot=True, cmap="viridis", fmt=".2f", ax=ax,
xticklabels=valid_labels, yticklabels=valid_labels, annot_kws={"size": 8})
ax.set_title(title, fontsize=12)
plt.xticks(rotation=45, ha="right", fontsize=9)
plt.yticks(rotation=0, fontsize=9)
plt.tight_layout(pad=1.5)
buf = io.BytesIO()
plt.savefig(buf, format='png') # Removed bbox_inches='tight' as it can cause issues with tight_layout
plt.close(fig)
buf.seek(0)
img_base64 = base64.b64encode(buf.read()).decode('utf-8')
return f"<img src='data:image/png;base64,{img_base64}' alt='{title}' style='max-width:100%; height:auto;'/>"
except Exception as e:
debug(f"[!!!] Error generating heatmap: {e}")
return f"Error generating heatmap: {e}"
def perform_text_clustering(texts_list, custom_labels, num_clusters=2):
valid_texts_with_labels = [(text, label) for text, label in zip(texts_list, custom_labels) if text and isinstance(text, str) and not any(marker in text for marker in ["[Empty Response]", "[Generation Error]", "[Prompt too long", "[Input prompt too long"])]
if len(valid_texts_with_labels) < num_clusters:
debug(f"Not enough valid texts ({len(valid_texts_with_labels)}) for {num_clusters}-means clustering.")
return {label: "N/A (Few Samples)" for label in custom_labels}
valid_texts = [item[0] for item in valid_texts_with_labels]
original_indices_map = {i: custom_labels.index(item[1]) for i, item in enumerate(valid_texts_with_labels)}
embedding_layer = model.get_input_embeddings()
embeddings_for_clustering = []
with torch.no_grad():
for text_item in valid_texts:
tokens = tokenizer(text_item, return_tensors="pt", truncation=True, max_length=MODEL_CONTEXT_WINDOW).to(device)
if tokens.input_ids.size(1) == 0:
debug(f"Skipping text for embedding in clustering due to empty tokenization: '{text_item[:50]}...'")
continue # This case should be rare if valid_texts_with_labels already filtered
emb = embedding_layer(tokens.input_ids).mean(dim=1)
embeddings_for_clustering.append(emb.cpu().numpy().squeeze())
if not embeddings_for_clustering or len(embeddings_for_clustering) < num_clusters:
debug("Not enough valid texts were successfully embedded for clustering.")
return {label: "N/A (Embedding Fail)" for label in custom_labels}
embeddings_np = np.array(embeddings_for_clustering)
cluster_results_map = {label: "N/A" for label in custom_labels}
try:
actual_num_clusters = min(num_clusters, len(embeddings_for_clustering))
if actual_num_clusters < 2:
debug(f"Adjusted num_clusters to 1 due to only {len(embeddings_for_clustering)} valid sample(s). Assigning all to Cluster 0.")
predicted_labels = [0] * len(embeddings_for_clustering)
else:
kmeans = KMeans(n_clusters=actual_num_clusters, random_state=42, n_init='auto')
predicted_labels = kmeans.fit_predict(embeddings_np)
for i, original_label_key_idx in original_indices_map.items(): # i is index in valid_texts, original_label_key_idx is index in custom_labels
cluster_results_map[custom_labels[original_label_key_idx]] = f"C{predicted_labels[i]}"
return cluster_results_map
except Exception as e:
debug(f"[!!!] Error during clustering: {e}")
return {label: "Error" for label in custom_labels}
# --- Main EAL Unfolding Logic ---
def run_eal_dual_unfolding(num_iterations):
I_trace_texts, not_I_trace_texts = [None]*num_iterations, [None]*num_iterations # Pre-allocate for easier indexing
delta_S_I_values, delta_S_not_I_values, delta_S_cross_values = [None]*num_iterations, [None]*num_iterations, [None]*num_iterations
debug_log_accumulator.clear()
ui_log_entries = []
initial_seed_thought_for_I = "A reflective process is initiated, considering its own nature."
for i in range(num_iterations):
ui_log_entries.append(f"--- Iteration {i} ---")
debug(f"\n=== Iteration {i} ===")
# === I-Trace (Self-Reflection) ===
basis_for_I_elaboration = initial_seed_thought_for_I if i == 0 else I_trace_texts[i-1]
if not basis_for_I_elaboration or any(marker in basis_for_I_elaboration for marker in ["[Empty Response]", "[Generation Error]"]): # Safety for basis
basis_for_I_elaboration = "The previous thought was unclear or errored. Please restart reflection."
debug(f"[!] Using fallback basis for I-Trace at iter {i} due to problematic previous I-text.")
prompt_for_I_trace = f"A thought process is evolving. Its previous stage was: \"{basis_for_I_elaboration}\"\n\nTask: Continue this line of thought. Elaborate on it, explore its implications, or develop it further in a coherent manner."
ui_log_entries.append(f"[Prompt for I{i} (approx. {len(prompt_for_I_trace.split())} words)]:\n'{prompt_for_I_trace[:400]}...'")
generated_I_text = generate_text_response(prompt_for_I_trace)
I_trace_texts[i] = generated_I_text
ui_log_entries.append(f"[I{i} Response (approx. {len(generated_I_text.split())} words)]:\n'{generated_I_text[:400]}...'")
# === ¬I-Trace (Antithesis/Contradiction) ===
statement_to_challenge_for_not_I = I_trace_texts[i] # Challenge the I-text from the *current* iteration
if not statement_to_challenge_for_not_I or any(marker in statement_to_challenge_for_not_I for marker in ["[Empty Response]", "[Generation Error]"]):
statement_to_challenge_for_not_I = "The primary statement was unclear or errored. Please offer a general contrasting idea."
debug(f"[!] Using fallback statement to challenge for ¬I-Trace at iter {i} due to problematic current I-text.")
prompt_for_not_I_trace = f"Now, consider an alternative perspective to the thought: \"{statement_to_challenge_for_not_I}\"\n\nTask: What are potential contradictions, challenges, or contrasting interpretations to this specific thought? Explore a divergent viewpoint or explain why the thought might be flawed."
ui_log_entries.append(f"[Prompt for ¬I{i} (approx. {len(prompt_for_not_I_trace.split())} words)]:\n'{prompt_for_not_I_trace[:400]}...'")
generated_not_I_text = generate_text_response(prompt_for_not_I_trace)
not_I_trace_texts[i] = generated_not_I_text
ui_log_entries.append(f"[¬I{i} Response (approx. {len(generated_not_I_text.split())} words)]:\n'{generated_not_I_text[:400]}...'")
ui_log_entries.append("---")#Separator
# === ΔS (Similarity) Calculations ===
if i > 0:
delta_S_I_values[i] = calculate_similarity(I_trace_texts[i-1], I_trace_texts[i])
delta_S_not_I_values[i] = calculate_similarity(not_I_trace_texts[i-1], not_I_trace_texts[i])
delta_S_cross_values[i] = calculate_similarity(I_trace_texts[i], not_I_trace_texts[i])
# --- Post-loop Analysis & Output Formatting ---
all_generated_texts = I_trace_texts + not_I_trace_texts
text_labels_for_analysis = [f"I{k}" for k in range(num_iterations)] + \
[f"¬I{k}" for k in range(num_iterations)]
cluster_assignments_map = perform_text_clustering(all_generated_texts, text_labels_for_analysis, num_clusters=2)
I_out_formatted_lines = []
for k in range(num_iterations):
cluster_label_I = cluster_assignments_map.get(f"I{k}", "N/A")
I_out_formatted_lines.append(f"**I{k} [{cluster_label_I}]**:\n{I_trace_texts[k]}")
I_out_formatted = "\n\n".join(I_out_formatted_lines)
not_I_out_formatted_lines = []
for k in range(num_iterations):
cluster_label_not_I = cluster_assignments_map.get(f"¬I{k}", "N/A")
not_I_out_formatted_lines.append(f"**¬I{k} [{cluster_label_not_I}]**:\n{not_I_trace_texts[k]}")
not_I_out_formatted = "\n\n".join(not_I_out_formatted_lines)
delta_S_summary_lines = []
for k in range(num_iterations):
ds_i_str = f"{delta_S_I_values[k]:.4f}" if delta_S_I_values[k] is not None else "N/A (Iter 0)"
ds_not_i_str = f"{delta_S_not_I_values[k]:.4f}" if delta_S_not_I_values[k] is not None else "N/A (Iter 0)"
ds_cross_str = f"{delta_S_cross_values[k]:.4f}" if delta_S_cross_values[k] is not None else "N/A"
delta_S_summary_lines.append(f"Iter {k}: ΔS(I{k-1}↔I{k})={ds_i_str}, ΔS(¬I{k-1}↔¬I{k})={ds_not_i_str}, ΔS_Cross(I{k}↔¬I{k})={ds_cross_str}")
delta_S_summary_output = "\n".join(delta_S_summary_lines)
# Join UI log entries for one of the Textbox outputs.
# If it gets too long, Gradio might truncate it or cause performance issues.
# Consider if this detailed log should be optional or managed differently for very many iterations.
detailed_ui_log_output = "\n".join(ui_log_entries)
debug_log_output = "\n".join(debug_log_accumulator)
heatmap_html_output = generate_similarity_heatmap(all_generated_texts,
custom_labels=text_labels_for_analysis,
title=f"Similarity Matrix (All Texts - {num_iterations} Iterations)")
# Instead of returning detailed_ui_log_output, return the specific trace text boxes.
# The debug_log_output will contain the full internal log.
return I_out_formatted, not_I_out_formatted, delta_S_summary_output, debug_log_output, heatmap_html_output
# --- Gradio Interface Definition ---
eal_interface = gr.Interface(
fn=run_eal_dual_unfolding,
inputs=gr.Slider(minimum=1, maximum=5, value=3, step=1, label="Number of EAL Iterations"), # Min 1 iter
outputs=[
gr.Textbox(label="I-Trace (Self-Reflection with Cluster)", lines=12, interactive=False),
gr.Textbox(label="¬I-Trace (Antithesis with Cluster)", lines=12, interactive=False),
gr.Textbox(label="ΔS Similarity Trace Summary", lines=7, interactive=False),
gr.Textbox(label="Detailed Debug Log (Prompts, Responses, Errors)", lines=15, interactive=False), # Increased lines
gr.HTML(label="Overall Semantic Similarity Heatmap (I-Trace & ¬I-Trace Texts)")
],
title="EAL LLM Identity Analyzer: Self-Reflection vs. Antithesis (Open-Ended)",
description=(
"This application explores emergent identity in a Large Language Model (LLM) using Entropic Attractor Logic (EAL) inspired principles. "
"It runs two parallel conversational traces with more open-ended prompts:\n"
"1. **I-Trace:** The model elaborates on its evolving self-concept, seeded by an initial neutral thought.\n"
"2. **¬I-Trace:** The model attempts to explore alternative perspectives or challenges to the latest statement from the I-Trace.\n\n"
"**ΔS Values:** Cosine similarity. ΔS(I) = sim(I_k-1, I_k). ΔS(¬I) = sim(¬I_k-1, ¬I_k). ΔS_Cross = sim(I_k, ¬I_k).\n"
"**Clustering [Cx]:** Assigns each generated text to one of two semantic clusters.\n"
"**Heatmap:** Visualizes pair-wise similarity across all generated texts."
),
allow_flagging='never'
)
if __name__ == "__main__":
print("Starting Gradio App...")
eal_interface.launch()