Spaces:
Sleeping
Sleeping
############################################################################### | |
# app.py β EAL Emergent-Discourse Analyzer (v0.8 β’ multi-model, VRAM-safe) | |
############################################################################### | |
import gc, io, json, re, time, base64 | |
import torch, numpy as np, matplotlib, matplotlib.pyplot as plt, seaborn as sns | |
import gradio as gr | |
from sklearn.metrics.pairwise import cosine_similarity | |
from sklearn.cluster import KMeans | |
from transformers import AutoTokenizer, AutoModelForCausalLM | |
# βΈβΈ force the right SDPA backend for GPUs < SM80 | |
torch.backends.cuda.enable_flash_sdp(False) | |
torch.backends.cuda.enable_math_sdp(False) | |
torch.backends.cuda.enable_mem_efficient_sdp(True) | |
matplotlib.use("Agg") # headless | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 1 Β· Registry of models | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
AVAILABLE_MODELS = { | |
"GPT-Neox-1.3B" : "EleutherAI/gpt-neo-1.3B", | |
"GPT-2" : "gpt2", | |
"Gemma-3-1B-IT" : "google/gemma-3-1b-it", # float-16 branch used below | |
} | |
_loaded = {} # name β {tok, model, ctx, dev} | |
_current = None # active name | |
# debug log (full prompts + answers) | |
dbg_log: list[str] = [] | |
def dbg(msg: str) -> None: | |
stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) | |
line = f"[{stamp}] {msg}" | |
dbg_log.append(line) | |
print(line) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 2 Β· Loader / Unloader helpers | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def _unload_current(): | |
"""Move old model to CPU & free CUDA VRAM.""" | |
global _current | |
if _current and _current in _loaded: | |
mdl = _loaded[_current]["model"] | |
mdl.to("cpu") | |
del mdl | |
torch.cuda.empty_cache() | |
gc.collect() | |
_current = None | |
def _load(name: str): | |
"""Lazy-load model, honouring memory limits, caching, dtype presets.""" | |
global tokenizer, model, MODEL_CTX, device, _current | |
if name == _current: | |
return # nothing to do | |
dbg(f"[boot] switching β {name}") | |
_unload_current() # free VRAM first | |
if name in _loaded: # cached | |
obj = _loaded[name] | |
tokenizer, model, MODEL_CTX, device = obj["tok"], obj["model"], obj["ctx"], obj["dev"] | |
_current = name | |
return | |
repo = AVAILABLE_MODELS[name] | |
kwargs = {"device_map": None} # we manage .to(...) | |
kwargs.update(dict(torch_dtype=torch.float16)) | |
tok = AutoTokenizer.from_pretrained(repo, use_fast=True) | |
mdl = AutoModelForCausalLM.from_pretrained(repo, **kwargs) | |
dev = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
mdl.to(dev).eval() | |
ctx = getattr(mdl.config, "max_position_embeddings", 2048) | |
# Gemma-3 config reports an absurd 1e15 β clamp sensibly | |
ctx = int(min(ctx, 8192)) | |
if tok.pad_token is None: | |
tok.pad_token = tok.eos_token | |
mdl.config.pad_token_id = mdl.config.eos_token_id | |
_loaded[name] = {"tok": tok, "model": mdl, "ctx": ctx, "dev": dev} | |
tokenizer, model, MODEL_CTX, device, _current = tok, mdl, ctx, dev, name | |
dbg(f"[boot] {name} ready (ctx={ctx}, dev={dev}, dtype={mdl.dtype})") | |
# prime a default so UI pops instantly | |
_load("GPT-Neox-1.3B") | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 3 Β· Utility fns | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
PROMPT_HEADROOM = 300 | |
MAX_GEN = 100 | |
def trim(txt: str, reserve: int = 80) -> str: | |
toks = tokenizer.encode(txt, add_special_tokens=False) | |
keep = MODEL_CTX - PROMPT_HEADROOM - reserve | |
return tokenizer.decode(toks[-keep:], skip_special_tokens=True) if len(toks) > keep else txt | |
_quote = re.compile(r'"') | |
def esc(s: str) -> str: return _quote.sub('\\"', s) | |
def cosine(a: str, b: str) -> float: | |
bad = ("[Generation Error", "[Context window full]", "[Model not") | |
if any(m in a for m in bad) or any(m in b for m in bad): return 0.0 | |
with torch.inference_mode(): | |
emb = model.get_input_embeddings() | |
ta = emb(tokenizer(a, return_tensors="pt").to(device).input_ids).mean(1) | |
tb = emb(tokenizer(b, return_tensors="pt").to(device).input_ids).mean(1) | |
v = float(cosine_similarity(ta.cpu(), tb.cpu())[0, 0]) | |
return max(min(v, 1.0), -1.0) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 4 Β· Generation (full prompt / answer into log) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def generate(prompt: str, temp: float) -> str: | |
dbg(f"PROMPT >>> {prompt}") | |
with torch.inference_mode(): | |
inp = tokenizer(prompt, return_tensors="pt").to(device) | |
out = model.generate( | |
**inp, | |
max_length=min(inp.input_ids.size(1) + MAX_GEN, MODEL_CTX), | |
temperature=temp, | |
top_p=0.9, | |
repetition_penalty=1.2, | |
no_repeat_ngram_size=3, | |
pad_token_id=tokenizer.pad_token_id, | |
) | |
ans = tokenizer.decode(out[0][inp.input_ids.size(1):], skip_special_tokens=True).strip() | |
dbg(f"OUTPUT <<< {ans}") | |
return ans or "[Empty]" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 5 Β· Heat-map helper | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def heat(mat: np.ndarray, labels: list[str], title: str) -> str: | |
mask = np.isnan(mat) | |
fig, ax = plt.subplots(figsize=(max(8, len(labels)), max(7, len(labels)*0.9))) | |
sns.heatmap(mat, mask=mask, annot=True, cmap="plasma", fmt=".2f", | |
vmin=np.nanmin(mat)*0.97, vmax=1, annot_kws={"size":7}, | |
xticklabels=labels, yticklabels=labels, ax=ax) | |
plt.xticks(rotation=45, ha="right"); plt.yticks(rotation=0) | |
ax.set_title(title, pad=18); plt.tight_layout(pad=2.3) | |
buf = io.BytesIO(); plt.savefig(buf, format="png"); plt.close(fig); buf.seek(0) | |
b64 = base64.b64encode(buf.read()).decode() | |
return f"<img src='data:image/png;base64,{b64}' style='max-width:95%;height:auto;'/>" | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 6 Β· Main EAL routine | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
def run_eal(iters: int, mdl_name: str, prog=gr.Progress()): | |
dbg_log.clear() | |
_load(mdl_name) | |
I, nI, dI, dnI, dx = [None]*iters, [None]*iters, [None]*iters, [None]*iters, [None]*iters | |
seed = "A thinking process begins. The first thought is:" | |
for k in range(iters): | |
prm = seed if k == 0 else ( | |
f'The thought process previously generated: "{esc(trim(I[k-1],60))}"\n\n' | |
"Task: Continue this line of thought. What logically follows or develops?" | |
) | |
I[k] = generate(prm, 0.7) | |
prm_n = ( | |
f'Consider the statement: "{esc(trim(I[k],80))}"\n\n' | |
"Task: Explore alternative perspectives or potential issues. " | |
"What might be a contrasting viewpoint or an overlooked aspect?" | |
) | |
nI[k] = generate(prm_n, 0.9) | |
if k: dI[k] = cosine(I[k-1], I[k]); dnI[k] = cosine(nI[k-1], nI[k]) | |
dx[k] = cosine(I[k], nI[k]) | |
prog((k+1)/iters) | |
# simple clustering | |
labels = [f"I{k}" for k in range(iters)] + [f"Β¬I{k}" for k in range(iters)] | |
vecs, val_lab = [], [] | |
emb = model.get_input_embeddings() | |
with torch.inference_mode(): | |
for txt, lbl in zip(I+nI, labels): | |
if txt.startswith("["): continue | |
vecs.append(emb(tokenizer(txt, return_tensors="pt").to(device).input_ids).mean(1).cpu().numpy().squeeze()) | |
val_lab.append(lbl) | |
clus = {l: "N/A" for l in labels} | |
if len(vecs) >= 2: | |
km = KMeans(n_clusters=2, random_state=0, n_init=10).fit(np.vstack(vecs)) | |
clus.update({l: f"C{c}" for l, c in zip(val_lab, km.labels_)}) | |
def block(seq, tag): | |
return "\n\n---\n\n".join(f"**{tag}{i} [{clus.get(f'{tag}{i}','N/A')}]**:\n{txt}" for i, txt in enumerate(seq)) | |
tbl = ["|Iter|ΞS(I)|ΞS(Β¬I)|ΞS(I,Β¬I)|", "|:--:|:---:|:----:|:------:|"] | |
tbl += [f"|{i}|{('N/A' if dI[i] is None else f'{dI[i]:.4f}')}|" | |
f"{('N/A' if dnI[i] is None else f'{dnI[i]:.4f}')}|" | |
f"{('N/A' if dx[i] is None else f'{dx[i]:.4f}')}|" for i in range(iters)] | |
n = len(labels); m = np.full((n,n), np.nan) | |
for a in range(n): | |
for b in range(a, n): | |
sim = 1 if a==b else cosine((I+nI)[a], (I+nI)[b]) | |
m[a,b]=m[b,a]=sim | |
return (block(I,"I"), block(nI,"Β¬I"), "\n".join(tbl), | |
"\n".join(dbg_log), | |
heat(m, labels, f"Similarity Matrix ({iters} iters β’ {mdl_name})")) | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# 7 Β· Gradio UI | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
with gr.Blocks(theme=gr.themes.Soft(primary_hue="teal")) as demo: | |
gr.Markdown("## EAL Β· Emergent Discourse Analyzer (Neox β« Gemma β« GPT-2)") | |
mdl_dd = gr.Dropdown(label="Model", choices=list(AVAILABLE_MODELS.keys()), value="GPT-Neox-1.3B") | |
iters = gr.Slider(1, 100, 3, 1, label="Iterations") | |
run = gr.Button("Run π", variant="primary") | |
with gr.Tabs(): | |
with gr.Tab("Traces"): | |
out_I, out_nI = gr.Markdown(), gr.Markdown() | |
with gr.Tab("ΞS + Heatmap"): | |
out_tbl, out_hm = gr.Markdown(), gr.HTML() | |
with gr.Tab("Debug (full prompts & answers)"): | |
out_dbg = gr.Textbox(lines=26, interactive=False, show_copy_button=True) | |
run.click(run_eal, inputs=[iters, mdl_dd], outputs=[out_I, out_nI, out_tbl, out_dbg, out_hm]) | |
if __name__ == "__main__": | |
demo.launch() | |