###############################################################################
# app.py – EAL Emergent-Discourse Analyzer (v0.8 • multi-model, VRAM-safe)
###############################################################################
import gc, io, json, re, time, base64
import torch, numpy as np, matplotlib, matplotlib.pyplot as plt, seaborn as sns
import gradio as gr
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.cluster import KMeans
from transformers import AutoTokenizer, AutoModelForCausalLM
# ▸▸ force the right SDPA backend for GPUs < SM80
torch.backends.cuda.enable_flash_sdp(False)
torch.backends.cuda.enable_math_sdp(False)
torch.backends.cuda.enable_mem_efficient_sdp(True)
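# (PyTorch's flash kernel needs compute capability >= 8.0 / Ampere; the mem-efficient kernel also runs on older GPUs)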
matplotlib.use("Agg") # headless
# ──────────────────────────────────────────────────────────────────────────────
# 1 · Registry of models
# ──────────────────────────────────────────────────────────────────────────────
AVAILABLE_MODELS = {
"GPT-Neox-1.3B" : "EleutherAI/gpt-neo-1.3B",
"GPT-2" : "gpt2",
"Gemma-3-1B-IT" : "google/gemma-3-1b-it", # float-16 branch used below
}
_loaded = {}   # name → {tok, model, ctx, dev}
_current = None # active name
# debug log (full prompts + answers)
dbg_log: list[str] = []
def dbg(msg: str) -> None:
stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
line = f"[{stamp}] {msg}"
dbg_log.append(line)
print(line)
# ──────────────────────────────────────────────────────────────────────────────
# 2 · Loader / Unloader helpers
# ──────────────────────────────────────────────────────────────────────────────
def _unload_current():
"""Move old model to CPU & free CUDA VRAM."""
global _current
if _current and _current in _loaded:
mdl = _loaded[_current]["model"]
mdl.to("cpu")
del mdl
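        # the model object stays cached in _loaded (now on CPU); only the local reference is dropped here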
torch.cuda.empty_cache()
gc.collect()
_current = None
def _load(name: str):
"""Lazy-load model, honouring memory limits, caching, dtype presets."""
global tokenizer, model, MODEL_CTX, device, _current
if name == _current:
return # nothing to do
dbg(f"[boot] switching β {name}")
_unload_current() # free VRAM first
if name in _loaded: # cached
obj = _loaded[name]
tokenizer, model, MODEL_CTX, device = obj["tok"], obj["model"], obj["ctx"], obj["dev"]
_current = name
return
repo = AVAILABLE_MODELS[name]
kwargs = {"device_map": None} # we manage .to(...)
kwargs.update(dict(torch_dtype=torch.float16))
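    # NOTE: fp16 weights assume a CUDA device; on CPU-only hosts loading and generation may be slow or unsupported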
tok = AutoTokenizer.from_pretrained(repo, use_fast=True)
mdl = AutoModelForCausalLM.from_pretrained(repo, **kwargs)
dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
mdl.to(dev).eval()
ctx = getattr(mdl.config, "max_position_embeddings", 2048)
    # Gemma-3 config reports an absurd 1e15; clamp sensibly
ctx = int(min(ctx, 8192))
if tok.pad_token is None:
tok.pad_token = tok.eos_token
mdl.config.pad_token_id = mdl.config.eos_token_id
_loaded[name] = {"tok": tok, "model": mdl, "ctx": ctx, "dev": dev}
tokenizer, model, MODEL_CTX, device, _current = tok, mdl, ctx, dev, name
dbg(f"[boot] {name} ready (ctx={ctx}, dev={dev}, dtype={mdl.dtype})")
# prime a default so UI pops instantly
_load("GPT-Neox-1.3B")
# ──────────────────────────────────────────────────────────────────────────────
# 3 · Utility fns
# ──────────────────────────────────────────────────────────────────────────────
PROMPT_HEADROOM = 300
MAX_GEN = 100
def trim(txt: str, reserve: int = 80) -> str:
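    """Keep only the newest tokens so prompt + headroom + generation still fit MODEL_CTX."""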
toks = tokenizer.encode(txt, add_special_tokens=False)
keep = MODEL_CTX - PROMPT_HEADROOM - reserve
return tokenizer.decode(toks[-keep:], skip_special_tokens=True) if len(toks) > keep else txt
_quote = re.compile(r'"')
def esc(s: str) -> str: return _quote.sub('\\"', s)
def cosine(a: str, b: str) -> float:
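    """Cosine similarity of mean-pooled input embeddings for two texts; error markers score 0.0."""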
bad = ("[Generation Error", "[Context window full]", "[Model not")
if any(m in a for m in bad) or any(m in b for m in bad): return 0.0
with torch.inference_mode():
emb = model.get_input_embeddings()
ta = emb(tokenizer(a, return_tensors="pt").to(device).input_ids).mean(1)
tb = emb(tokenizer(b, return_tensors="pt").to(device).input_ids).mean(1)
v = float(cosine_similarity(ta.cpu(), tb.cpu())[0, 0])
return max(min(v, 1.0), -1.0)
# ──────────────────────────────────────────────────────────────────────────────
# 4 · Generation (full prompt / answer into log)
# ──────────────────────────────────────────────────────────────────────────────
def generate(prompt: str, temp: float) -> str:
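    """Generate up to MAX_GEN new tokens from the prompt, logging full prompt and answer to dbg_log."""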
dbg(f"PROMPT >>> {prompt}")
with torch.inference_mode():
inp = tokenizer(prompt, return_tensors="pt").to(device)
out = model.generate(
**inp,
max_length=min(inp.input_ids.size(1) + MAX_GEN, MODEL_CTX),
            do_sample=True,   # without this, generate() decodes greedily and ignores temperature/top_p
            temperature=temp,
top_p=0.9,
repetition_penalty=1.2,
no_repeat_ngram_size=3,
pad_token_id=tokenizer.pad_token_id,
)
ans = tokenizer.decode(out[0][inp.input_ids.size(1):], skip_special_tokens=True).strip()
dbg(f"OUTPUT <<< {ans}")
return ans or "[Empty]"
# ──────────────────────────────────────────────────────────────────────────────
# 5 · Heat-map helper
# ──────────────────────────────────────────────────────────────────────────────
def heat(mat: np.ndarray, labels: list[str], title: str) -> str:
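    """Render the similarity matrix as a seaborn heatmap and return it as an inline base64 <img> tag."""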
mask = np.isnan(mat)
fig, ax = plt.subplots(figsize=(max(8, len(labels)), max(7, len(labels)*0.9)))
sns.heatmap(mat, mask=mask, annot=True, cmap="plasma", fmt=".2f",
vmin=np.nanmin(mat)*0.97, vmax=1, annot_kws={"size":7},
xticklabels=labels, yticklabels=labels, ax=ax)
plt.xticks(rotation=45, ha="right"); plt.yticks(rotation=0)
ax.set_title(title, pad=18); plt.tight_layout(pad=2.3)
buf = io.BytesIO(); plt.savefig(buf, format="png"); plt.close(fig); buf.seek(0)
b64 = base64.b64encode(buf.read()).decode()
return f"<img src='data:image/png;base64,{b64}' style='max-width:95%;height:auto;'/>"
# ──────────────────────────────────────────────────────────────────────────────
# 6 · Main EAL routine
# ──────────────────────────────────────────────────────────────────────────────
def run_eal(iters: int, mdl_name: str, prog=gr.Progress()):
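    """Iteratively extend a thought trace I and a contrasting trace ¬I, tracking cosine drift (ΔS) between steps."""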
dbg_log.clear()
_load(mdl_name)
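    # I: thought trace, nI: counter-trace (¬I); dI/dnI: step-to-step drift; dx: I vs ¬I similarity per step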
I, nI, dI, dnI, dx = [None]*iters, [None]*iters, [None]*iters, [None]*iters, [None]*iters
seed = "A thinking process begins. The first thought is:"
for k in range(iters):
prm = seed if k == 0 else (
f'The thought process previously generated: "{esc(trim(I[k-1],60))}"\n\n'
"Task: Continue this line of thought. What logically follows or develops?"
)
I[k] = generate(prm, 0.7)
prm_n = (
f'Consider the statement: "{esc(trim(I[k],80))}"\n\n'
"Task: Explore alternative perspectives or potential issues. "
"What might be a contrasting viewpoint or an overlooked aspect?"
)
nI[k] = generate(prm_n, 0.9)
if k: dI[k] = cosine(I[k-1], I[k]); dnI[k] = cosine(nI[k-1], nI[k])
dx[k] = cosine(I[k], nI[k])
prog((k+1)/iters)
# simple clustering
labels = [f"I{k}" for k in range(iters)] + [f"Β¬I{k}" for k in range(iters)]
vecs, val_lab = [], []
emb = model.get_input_embeddings()
with torch.inference_mode():
for txt, lbl in zip(I+nI, labels):
if txt.startswith("["): continue
vecs.append(emb(tokenizer(txt, return_tensors="pt").to(device).input_ids).mean(1).cpu().numpy().squeeze())
val_lab.append(lbl)
clus = {l: "N/A" for l in labels}
if len(vecs) >= 2:
km = KMeans(n_clusters=2, random_state=0, n_init=10).fit(np.vstack(vecs))
clus.update({l: f"C{c}" for l, c in zip(val_lab, km.labels_)})
def block(seq, tag):
return "\n\n---\n\n".join(f"**{tag}{i} [{clus.get(f'{tag}{i}','N/A')}]**:\n{txt}" for i, txt in enumerate(seq))
tbl = ["|Iter|ΞS(I)|ΞS(Β¬I)|ΞS(I,Β¬I)|", "|:--:|:---:|:----:|:------:|"]
tbl += [f"|{i}|{('N/A' if dI[i] is None else f'{dI[i]:.4f}')}|"
f"{('N/A' if dnI[i] is None else f'{dnI[i]:.4f}')}|"
f"{('N/A' if dx[i] is None else f'{dx[i]:.4f}')}|" for i in range(iters)]
n = len(labels); m = np.full((n,n), np.nan)
for a in range(n):
for b in range(a, n):
sim = 1 if a==b else cosine((I+nI)[a], (I+nI)[b])
m[a,b]=m[b,a]=sim
    return (block(I,"I"), block(nI,"¬I"), "\n".join(tbl),
"\n".join(dbg_log),
            heat(m, labels, f"Similarity Matrix ({iters} iters • {mdl_name})"))
# ──────────────────────────────────────────────────────────────────────────────
# 7 · Gradio UI
# ──────────────────────────────────────────────────────────────────────────────
with gr.Blocks(theme=gr.themes.Soft(primary_hue="teal")) as demo:
gr.Markdown("## EAL Β· Emergent Discourse Analyzer (Neox β« Gemma β« GPT-2)")
    mdl_dd = gr.Dropdown(label="Model", choices=list(AVAILABLE_MODELS.keys()), value="GPT-Neo-1.3B")
    iters = gr.Slider(1, 100, value=3, step=1, label="Iterations")
    run = gr.Button("Run 🚀", variant="primary")
with gr.Tabs():
with gr.Tab("Traces"):
out_I, out_nI = gr.Markdown(), gr.Markdown()
with gr.Tab("ΞS + Heatmap"):
out_tbl, out_hm = gr.Markdown(), gr.HTML()
with gr.Tab("Debug (full prompts & answers)"):
out_dbg = gr.Textbox(lines=26, interactive=False, show_copy_button=True)
run.click(run_eal, inputs=[iters, mdl_dd], outputs=[out_I, out_nI, out_tbl, out_dbg, out_hm])
if __name__ == "__main__":
demo.launch()