###############################################################################
# app.py – EAL Emergent-Discourse Analyzer (Gemma 1 / 2 / 3 compliant)
###############################################################################
import gc, io, json, re, time, base64
import torch, numpy as np, matplotlib, matplotlib.pyplot as plt, seaborn as sns
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
matplotlib.use("Agg")  # headless
# ──────────────────────────────────────────────────────────────────────────────
# 1 · Registry of models
# ──────────────────────────────────────────────────────────────────────────────
AVAILABLE_MODELS = {
    "GPT-Neo 1.3B"    : "EleutherAI/gpt-neo-1.3B",
    "GPT-2"           : "gpt2",
    "Gemma 1.1 2B-IT" : "google/gemma-1.1-2b-it",
    "Gemma 2 2B-IT"   : "google/gemma-2-2b-it",
    "Gemma 3 1B-IT"   : "google/gemma-3-1b-it",
}
_loaded, _current = {}, None
dbg_log: list[str] = []
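# dbg(): append a timestamped message to the in-memory debug log (surfaced in
# the "Debug" tab) and echo it to stdout.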
def dbg(msg: str) -> None:
    ts = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    line = f"[{ts}] {msg}"
    dbg_log.append(line)
    print(line)
# ──────────────────────────────────────────────────────────────────────────────
# 2 · Loader helpers (BF16-aware & VRAM-safe)
# ──────────────────────────────────────────────────────────────────────────────
def _gpu_supports_bf16() -> bool:
    if not torch.cuda.is_available(): return False
    major, _ = torch.cuda.get_device_capability()
    return major >= 8  # Ampere (8.0) or newer
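# _unload_current(): move the active model back to CPU and release cached CUDA
# memory so a newly selected model has room on the GPU.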
def _unload_current():
    global _current
    if _current and _current in _loaded:
        _loaded[_current]["model"].to("cpu")
        torch.cuda.empty_cache(); gc.collect()
    _current = None
def _load(name: str):
    """Lazy load or swap in the requested model."""
    global tokenizer, model, MODEL_CTX, device, _current
    if name == _current: return
    dbg(f"[boot] switching → {name}")
    _unload_current()
    if name in _loaded:  # cached
        obj = _loaded[name]
        tokenizer, model, MODEL_CTX, device = obj["tok"], obj["model"], obj["ctx"], obj["dev"]
        _current = name; return
    repo = AVAILABLE_MODELS[name]
    torch_dtype = torch.bfloat16 if _gpu_supports_bf16() else torch.float16
    tok = AutoTokenizer.from_pretrained(repo, use_fast=True)
    mdl = AutoModelForCausalLM.from_pretrained(repo, torch_dtype=torch_dtype)
    dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    mdl.to(dev).eval()
    ctx_raw = getattr(mdl.config, "max_position_embeddings", 2048)
    ctx = int(min(ctx_raw, 8192))  # Gemma-3 reports 1e15 → clamp
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token
        mdl.config.pad_token_id = mdl.config.eos_token_id
    _loaded[name] = {"tok": tok, "model": mdl, "ctx": ctx, "dev": dev}
    tokenizer, model, MODEL_CTX, device, _current = tok, mdl, ctx, dev, name
    dbg(f"[boot] {name} ready (ctx={ctx}, dev={dev}, dtype={torch_dtype})")
# prime default
_load("GPT-Neo 1.3B")
# ──────────────────────────────────────────────────────────────────────────────
# 3 · Utility fns (unchanged)
# ──────────────────────────────────────────────────────────────────────────────
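# Token budget: PROMPT_HEADROOM reserves room for the prompt scaffolding,
# MAX_GEN caps the number of newly generated tokens per call.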
PROMPT_HEADROOM, MAX_GEN = 300, 100
_q = re.compile(r'"')
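# esc(): backslash-escape double quotes so earlier generations can be embedded
# inside the quoted prompt templates below.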
def esc(t): return _q.sub('\\"', t)
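# trim(): keep only the most recent tokens of a text so that the prompt plus
# headroom plus `rv` reserved tokens still fit inside MODEL_CTX.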
def trim(t, rv=80):
    toks = tokenizer.encode(t, add_special_tokens=False)
    keep = MODEL_CTX - PROMPT_HEADROOM - rv
    return tokenizer.decode(toks[-keep:], skip_special_tokens=True) if len(toks) > keep else t
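# cosine(): similarity of two texts, computed as the cosine similarity between
# their mean input-embedding vectors; error placeholders score 0.0.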
def cosine(a, b):
    noisy = ("[Generation Error", "[Context window full]", "[Model not")
    if any(m in a for m in noisy) or any(m in b for m in noisy): return 0.0
    with torch.inference_mode():
        emb = model.get_input_embeddings()
        ta = emb(tokenizer(a, return_tensors="pt").to(device).input_ids).mean(1)
        tb = emb(tokenizer(b, return_tensors="pt").to(device).input_ids).mean(1)
    # cast to float32 before handing tensors to sklearn: numpy has no bfloat16
    return max(min(float(cosine_similarity(ta.float().cpu(), tb.float().cpu())[0, 0]), 1), -1)
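# generate(): sampled continuation (top-p nucleus sampling with repetition
# penalties), capped at MAX_GEN new tokens within the model's context window.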
def generate(prompt, temp):
    dbg(f"PROMPT >>> {prompt}")
    with torch.inference_mode():
        inp = tokenizer(prompt, return_tensors="pt").to(device)
        out = model.generate(
            **inp,
            max_length=min(inp.input_ids.size(1) + MAX_GEN, MODEL_CTX),
            do_sample=True,  # required so temperature / top_p actually apply
            temperature=temp, top_p=0.9,
            repetition_penalty=1.2, no_repeat_ngram_size=3,
            pad_token_id=tokenizer.pad_token_id,
        )
    ans = tokenizer.decode(out[0][inp.input_ids.size(1):], skip_special_tokens=True).strip()
    dbg(f"OUTPUT <<< {ans}")
    return ans or "[Empty]"
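# heat(): render the similarity matrix as a seaborn heatmap and return it as a
# base64-encoded <img> tag for the gr.HTML output.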
def heat(mat, labels, title):
    mask = np.isnan(mat)
    fig, ax = plt.subplots(figsize=(max(8, len(labels)), max(7, len(labels) * 0.9)))
    sns.heatmap(mat, mask=mask, annot=True, cmap="plasma", fmt=".2f",
                vmin=np.nanmin(mat) * 0.97, vmax=1, annot_kws={"size": 7},
                xticklabels=labels, yticklabels=labels, ax=ax)
    plt.xticks(rotation=45, ha="right"); plt.yticks(rotation=0)
    ax.set_title(title, pad=18); plt.tight_layout(pad=2.3)
    buf = io.BytesIO(); plt.savefig(buf, format="png"); plt.close(fig); buf.seek(0)
    return f"<img src='data:image/png;base64,{base64.b64encode(buf.read()).decode()}' style='max-width:95%;height:auto;'/>"
# ──────────────────────────────────────────────────────────────────────────────
# 4 · Main EAL routine (unchanged logic)
# ──────────────────────────────────────────────────────────────────────────────
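# run_eal(): per iteration, extend a "thesis" trace I[k] from the previous
# thought and an "antithesis" trace ¬I[k] that challenges it, recording
# embedding similarities ΔS between consecutive and paired outputs; the traces
# are then 2-means clustered and a full pairwise similarity matrix is plotted.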
def run_eal(iters: int, mdl: str, prog=gr.Progress()):
    dbg_log.clear(); _load(mdl)
    I, nI, dI, dnI, dx = [None]*iters, [None]*iters, [None]*iters, [None]*iters, [None]*iters
    seed = "A thinking process begins. The first thought is:"
    for k in range(iters):
        prm = seed if not k else (
            f'The thought process previously generated: "{esc(trim(I[k-1], 60))}"\n\n'
            "Task: Continue this line of thought. What logically follows or develops?"
        )
        I[k] = generate(prm, 0.7)
        prm_n = (f'Consider the statement: "{esc(trim(I[k], 80))}"\n\n'
                 "Task: Explore alternative perspectives or potential issues. "
                 "What might be a contrasting viewpoint or an overlooked aspect?")
        nI[k] = generate(prm_n, 0.9)
        if k: dI[k] = cosine(I[k-1], I[k]); dnI[k] = cosine(nI[k-1], nI[k])
        dx[k] = cosine(I[k], nI[k]); prog((k+1)/iters)
    # clusters
    labels = [f"I{k}" for k in range(iters)] + [f"¬I{k}" for k in range(iters)]
    vecs, lab = [], []
    with torch.inference_mode():
        emb = model.get_input_embeddings()
        for t, l in zip(I+nI, labels):
            if t.startswith("["): continue
            # cast to float32 before .numpy(): numpy cannot represent bfloat16
            vecs.append(emb(tokenizer(t, return_tensors="pt").to(device).input_ids).mean(1).float().cpu().numpy().squeeze()); lab.append(l)
    clus = {l: "N/A" for l in labels}
    if len(vecs) >= 2: clus.update({l: f"C{c}" for l, c in zip(lab, KMeans(2, random_state=0, n_init=10).fit(np.vstack(vecs)).labels_)})
    def block(seq, tag): return "\n\n---\n\n".join(f"**{tag}{i} [{clus.get(f'{tag}{i}', 'N/A')}]**:\n{t}" for i, t in enumerate(seq))
    tbl = ["|Iter|ΔS(I)|ΔS(¬I)|ΔS(I,¬I)|", "|:--:|:---:|:----:|:------:|"]
    tbl += [f"|{i}|{('N/A' if dI[i] is None else f'{dI[i]:.4f}')}|"
            f"{('N/A' if dnI[i] is None else f'{dnI[i]:.4f}')}|"
            f"{('N/A' if dx[i] is None else f'{dx[i]:.4f}')}|" for i in range(iters)]
    n = len(labels); mat = np.full((n, n), np.nan)
    for a in range(n):
        for b in range(a, n):
            sim = 1 if a == b else cosine((I+nI)[a], (I+nI)[b])
            mat[a, b] = mat[b, a] = sim
    return block(I, "I"), block(nI, "¬I"), "\n".join(tbl), "\n".join(dbg_log), heat(mat, labels, f"Similarity Matrix ({iters} iters • {mdl})")
# ──────────────────────────────────────────────────────────────────────────────
# 5 · Gradio UI
# ──────────────────────────────────────────────────────────────────────────────
with gr.Blocks(theme=gr.themes.Soft(primary_hue="teal")) as demo:
    gr.Markdown("## EAL · Emergent-Discourse Analyzer (Gemma 1 / 2 / 3 ready)")
    mdl_dd = gr.Dropdown(list(AVAILABLE_MODELS.keys()), value="GPT-Neo 1.3B", label="Model")
    iters = gr.Slider(1, 7, value=3, step=1, label="Iterations")
    run = gr.Button("Run", variant="primary")
    with gr.Tabs():
        with gr.Tab("Traces"):
            outI, outnI = gr.Markdown(), gr.Markdown()
        with gr.Tab("ΔS + Heatmap"):
            outTbl, outHm = gr.Markdown(), gr.HTML()
        with gr.Tab("Debug (full prompts & answers)"):
            outDbg = gr.Textbox(lines=26, interactive=False, show_copy_button=True)
    run.click(run_eal, [iters, mdl_dd], [outI, outnI, outTbl, outDbg, outHm])
if __name__ == "__main__":
    demo.launch()
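# Headless usage sketch (hypothetical; assumes the selected checkpoint can be
# downloaded and fits in available memory). A no-op callable stands in for the
# gr.Progress() default:
#   I_md, notI_md, delta_tbl, debug_txt, heatmap_html = run_eal(3, "GPT-Neo 1.3B", prog=lambda *_: None)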