Luigi's picture
fix reasoning model's thought process display
9d3ca6c
raw
history blame
10.8 kB
import gc
import html
import os
import re
import shutil

import streamlit as st
from huggingface_hub import hf_hub_download
from llama_cpp import Llama
# ----- Custom CSS for pretty formatting of internal reasoning -----
# The stylesheet is injected through st.markdown on every script run.
# `.chat-assistant` pins an explicit dark text colour next to its light
# background so the reasoning box stays readable when the active theme uses
# light text (e.g. Streamlit's dark theme); without it the inherited text
# colour can be nearly the same as the background.
CUSTOM_CSS = """
<style>
/* Styles for the internal reasoning bullet list */
ul.think-list {
margin: 0.5em 0 1em 1.5em;
padding: 0;
list-style-type: disc;
}
ul.think-list li {
margin-bottom: 0.5em;
}
/* Container style for the "in progress" internal reasoning */
.chat-assistant {
background-color: #f9f9f9;
color: #1a1a1a;
padding: 1em;
border-radius: 5px;
margin-bottom: 1em;
}
</style>
"""
st.markdown(CUSTOM_CSS, unsafe_allow_html=True)
# ----- Minimum free disk space (in bytes) wanted before a model download -----
# 5 GiB, written as a power of two: 2**30 bytes == 1024**3 bytes == 1 GiB.
REQUIRED_SPACE_BYTES = 5 * 2 ** 30
# ----- Available models -----
# One row per selectable model: (menu label, Hub repo id, GGUF filename,
# human-readable description). Row order is the order of MODELS' keys.
_MODEL_TABLE = (
    (
        "Qwen2.5-7B-Instruct (Q2_K)",
        "Qwen/Qwen2.5-7B-Instruct-GGUF",
        "qwen2.5-7b-instruct-q2_k.gguf",
        "Qwen2.5-7B Instruct (Q2_K)",
    ),
    (
        "Gemma-3-4B-IT (Q4_K_M)",
        "unsloth/gemma-3-4b-it-GGUF",
        "gemma-3-4b-it-Q4_K_M.gguf",
        "Gemma 3 4B IT (Q4_K_M)",
    ),
    (
        "Phi-4-mini-Instruct (Q4_K_M)",
        "unsloth/Phi-4-mini-instruct-GGUF",
        "Phi-4-mini-instruct-Q4_K_M.gguf",
        "Phi-4 Mini Instruct (Q4_K_M)",
    ),
    (
        "Meta-Llama-3.1-8B-Instruct (Q2_K)",
        "MaziyarPanahi/Meta-Llama-3.1-8B-Instruct-GGUF",
        "Meta-Llama-3.1-8B-Instruct.Q2_K.gguf",
        "Meta-Llama-3.1-8B-Instruct (Q2_K)",
    ),
    (
        "DeepSeek-R1-Distill-Llama-8B (Q2_K)",
        "unsloth/DeepSeek-R1-Distill-Llama-8B-GGUF",
        "DeepSeek-R1-Distill-Llama-8B-Q2_K.gguf",
        "DeepSeek-R1-Distill-Llama-8B (Q2_K)",
    ),
    (
        "Mistral-7B-Instruct-v0.3 (IQ3_XS)",
        "MaziyarPanahi/Mistral-7B-Instruct-v0.3-GGUF",
        "Mistral-7B-Instruct-v0.3.IQ3_XS.gguf",
        "Mistral-7B-Instruct-v0.3 (IQ3_XS)",
    ),
    (
        "Qwen2.5-Coder-7B-Instruct (Q2_K)",
        "Qwen/Qwen2.5-Coder-7B-Instruct-GGUF",
        "qwen2.5-coder-7b-instruct-q2_k.gguf",
        "Qwen2.5-Coder-7B-Instruct (Q2_K)",
    ),
)

# Maps each menu label to a dict with "repo_id", "filename" and "description".
MODELS = {
    label: {"repo_id": repo_id, "filename": filename, "description": description}
    for label, repo_id, filename, description in _MODEL_TABLE
}
# ----- Sidebar settings -----
# Everything assigned here is a module-level name read further down the script:
# the chosen model label, the system prompt and the sampling parameters.
with st.sidebar:
    st.header("⚙️ Settings")

    selected_model_name = st.selectbox("Select Model", options=list(MODELS))
    system_prompt = st.text_area(
        "System Prompt",
        value="You are a helpful assistant.",
        height=80,
    )

    # Sampling controls, given as (min, max, default[, step]).
    max_tokens = st.slider("Max tokens", min_value=64, max_value=2048, value=512, step=32)
    temperature = st.slider("Temperature", min_value=0.1, max_value=2.0, value=0.7)
    top_k = st.slider("Top-K", min_value=1, max_value=100, value=40)
    top_p = st.slider("Top-P", min_value=0.1, max_value=1.0, value=0.95)
    repeat_penalty = st.slider("Repetition Penalty", min_value=1.0, max_value=2.0, value=1.1)

    # Maintenance action: remove every cached .gguf file from the models folder.
    if st.button("🧹 Clear All Cached Models"):
        try:
            cached_files = [
                entry for entry in os.listdir("models") if entry.endswith(".gguf")
            ]
            for entry in cached_files:
                os.remove(os.path.join("models", entry))
            st.success("Model cache cleared.")
        except Exception as err:
            st.error(f"Failed to clear models: {err}")

    # Maintenance action: report used/free space of the working directory's disk.
    if st.button("📦 Show Disk Usage"):
        try:
            bytes_per_gb = 1024 ** 3
            disk = shutil.disk_usage(".")
            used = disk.used / bytes_per_gb
            free = disk.free / bytes_per_gb
            st.info(f"Disk Used: {used:.2f} GB | Free: {free:.2f} GB")
        except Exception as err:
            st.error(f"Disk usage error: {err}")
# ----- Model info -----
# Metadata of the model picked in the sidebar and the local path of its file.
selected_model = MODELS[selected_model_name]
model_path = os.path.join("models", selected_model["filename"])

# ----- Session state initialization -----
# Seed each key only when it is missing so existing values are left untouched.
for _state_key, _initial_value in (
    ("model_name", None),         # label of the model currently held in "llm"
    ("llm", None),                # loaded model object, or None
    ("chat_history", []),         # list of message dicts rendered below
    ("pending_response", False),  # True while a reply is being generated
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value

# ----- Ensure model directory exists -----
os.makedirs("models", exist_ok=True)
# ----- Functions for model management -----
def cleanup_old_models():
    """Delete cached ``.gguf`` files in ``models`` other than the selected model's.

    A file that cannot be deleted is reported with ``st.warning`` and the sweep
    continues with the remaining files.
    """
    keep_filename = selected_model["filename"]
    for entry in os.listdir("models"):
        # Skip anything that is not a model file, and never delete the active one.
        if not entry.endswith(".gguf") or entry == keep_filename:
            continue
        try:
            os.remove(os.path.join("models", entry))
        except Exception as err:
            st.warning(f"Couldn't delete old model {entry}: {err}")
def download_model():
    """Download the selected model's file from the Hugging Face Hub into ``./models``.

    A spinner naming the file is shown for the duration of the call. Errors
    raised by ``hf_hub_download`` are not handled here.
    """
    repo = selected_model["repo_id"]
    target_file = selected_model["filename"]
    with st.spinner(f"Downloading {target_file}..."):
        hf_hub_download(
            repo_id=repo,
            filename=target_file,
            local_dir="./models",
            # Deprecated parameter; harmless warning.
            local_dir_use_symlinks=False,
        )
def try_load_model(path):
    """Try to construct a ``Llama`` model from the GGUF file at *path*.

    Returns the ``Llama`` instance on success. On any exception, returns the
    exception's message as a ``str`` instead of raising, so callers tell
    success from failure with ``isinstance(result, str)``.
    """
    load_options = {
        "model_path": path,
        "n_ctx": 1024,
        "n_threads": 2,
        "n_threads_batch": 2,
        "n_batch": 4,
        "n_gpu_layers": 0,
        "use_mlock": False,
        "use_mmap": True,
        "verbose": False,
    }
    try:
        model = Llama(**load_options)
    except Exception as err:
        return str(err)
    return model
def _download_with_space_check(low_space_notice):
    """Download the selected model, first evicting other cached models if disk is low.

    *low_space_notice* is the ``st.info`` text shown when free space on the
    working directory's disk is below ``REQUIRED_SPACE_BYTES``.
    """
    if shutil.disk_usage(".").free < REQUIRED_SPACE_BYTES:
        st.info(low_space_notice)
        cleanup_old_models()
    download_model()


def validate_or_download_model():
    """Return a loaded model for the current selection, downloading it if needed.

    Steps:
      1. If the model file is missing locally, download it.
      2. Try to load it; on success return the model.
      3. On failure, delete the local file, download again and retry once.
      4. If the retry also fails, show an error and halt the script run with
         ``st.stop()``.

    Returns:
        The object returned by ``try_load_model`` on success.
    """
    # Download model if not present locally.
    if not os.path.exists(model_path):
        _download_with_space_check(
            "Insufficient storage detected. Cleaning up old models to free up space."
        )

    result = try_load_model(model_path)
    if not isinstance(result, str):
        return result

    # try_load_model signals failure by returning the error message as a string.
    st.warning(f"Initial load failed: {result}\nAttempting re-download...")
    try:
        os.remove(model_path)
    except FileNotFoundError:
        # Nothing on disk to remove; the download below will create the file.
        pass
    except OSError as err:
        # Best effort: report the problem and still attempt the re-download.
        st.warning(f"Couldn't remove the existing model file before re-download: {err}")

    _download_with_space_check(
        "Insufficient storage detected on re-download attempt. Cleaning up old models to free up space."
    )
    result = try_load_model(model_path)
    if isinstance(result, str):
        st.error(f"Model still failed after re-download: {result}")
        st.stop()
    return result
# ----- Load model if changed -----
# Reload when the selection differs from the loaded model OR when no model is
# actually held. The second condition matters because a failed load ends the
# run via st.stop(): without it, switching back to the previously loaded
# model's name would skip loading and leave `llm` as None.
if (
    st.session_state.model_name != selected_model_name
    or st.session_state.llm is None
):
    # Forget the old model (object and name) before loading, so an aborted
    # load can never leave a name recorded for a model that is no longer held.
    st.session_state.llm = None
    st.session_state.model_name = None
    gc.collect()  # release the previous model's memory before loading the next
    st.session_state.llm = validate_or_download_model()
    st.session_state.model_name = selected_model_name
llm = st.session_state.llm
# ----- Display title and caption -----
# Build both header strings from the selected model's metadata, then render.
page_title = f"🧠 {selected_model['description']} (Streamlit + GGUF)"
page_caption = f"Powered by `llama.cpp` | Model: {selected_model['filename']}"
st.title(page_title)
st.caption(page_caption)
# ----- Render full chat history -----
# Replays every stored message in order on each script run.
# NOTE(review): nesting was reconstructed from a flattened source; the expander
# is assumed to sit inside the message bubble — confirm against the original.
for message in st.session_state.chat_history:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])
        # Only assistant messages carry reasoning; show it collapsed, one
        # markdown element per completed reasoning block.
        is_assistant = message.get("role") == "assistant"
        if is_assistant and message.get("thinking"):
            with st.expander("🧠 Model's Internal Reasoning"):
                for reasoning in message["thinking"]:
                    st.markdown(reasoning.strip())
# ----- Helpers: separate <think> reasoning from the visible answer -----
# Compiled once instead of being re-evaluated for every streamed chunk.
_THINK_BLOCK_RE = re.compile(r"<think>(.*?)</think>", flags=re.DOTALL)
_THINK_OPEN_TAG = "<think>"

# Number of most recent user/assistant exchanges sent to the model.
MAX_TURNS = 8


def _split_reasoning(text):
    """Split raw model output into ``(visible, completed, in_progress)``.

    * ``visible``     - *text* with every closed ``<think>...</think>`` block
      removed and anything from an unclosed ``<think>`` onwards cut off.
    * ``completed``   - list with the contents of each closed block.
    * ``in_progress`` - stripped text following an unclosed ``<think>``, or
      ``""`` when no block is open. The text may contain ``<`` characters.
    """
    completed = _THINK_BLOCK_RE.findall(text)
    without_closed = _THINK_BLOCK_RE.sub("", text)
    visible, open_tag, tail = without_closed.partition(_THINK_OPEN_TAG)
    in_progress = tail.strip() if open_tag else ""
    return visible, completed, in_progress


def _in_progress_html(reasoning):
    """Return the styled HTML box for reasoning that is still being generated.

    The model's text is HTML-escaped because the box is rendered with
    ``unsafe_allow_html=True``. Newlines become ``<br>`` so line breaks are
    kept and blank lines cannot end the HTML block early in the markdown
    renderer.
    """
    safe_text = html.escape(reasoning).replace("\n", "<br>")
    return (
        '<div class="chat-assistant">\n'
        "<strong>Internal Reasoning (in progress):</strong>\n"
        f"<br>{safe_text}\n"
        "</div>"
    )


# ----- Chat input widget -----
user_input = st.chat_input("Ask something...")
if user_input:
    if st.session_state.pending_response:
        st.warning("Please wait for the assistant to finish responding.")
    else:
        st.session_state.chat_history.append({"role": "user", "content": user_input})
        with st.chat_message("user"):
            st.markdown(user_input)

        st.session_state.pending_response = True
        try:
            trimmed_history = st.session_state.chat_history[-(MAX_TURNS * 2):]
            # Send only role/content: stored assistant messages also carry a
            # "thinking" list that is UI state, not part of the conversation.
            messages = [{"role": "system", "content": system_prompt}]
            messages += [
                {"role": past["role"], "content": past["content"]}
                for past in trimmed_history
            ]

            # ----- Streaming the assistant response -----
            with st.chat_message("assistant"):
                visible_placeholder = st.empty()
                thinking_placeholder = st.empty()
                full_response = ""
                stream = llm.create_chat_completion(
                    messages=messages,
                    max_tokens=max_tokens,
                    temperature=temperature,
                    top_k=top_k,
                    top_p=top_p,
                    repeat_penalty=repeat_penalty,
                    stream=True,
                )
                for chunk in stream:
                    choices = chunk.get("choices")
                    if not choices:
                        continue
                    # A delta may lack "content" or carry None; treat both as "".
                    full_response += choices[0].get("delta", {}).get("content") or ""

                    visible_response, _, in_progress = _split_reasoning(full_response)
                    visible_placeholder.markdown(visible_response)
                    if in_progress:
                        thinking_placeholder.markdown(
                            _in_progress_html(in_progress), unsafe_allow_html=True
                        )
                    else:
                        thinking_placeholder.empty()

                # After streaming completes: keep the closed reasoning blocks and
                # the answer with all reasoning (closed or unclosed) removed.
                final_visible, final_thinking, _ = _split_reasoning(full_response)

                # Show completed reasoning right away, the same way the history
                # renderer shows it on later runs.
                if final_thinking:
                    with st.expander("🧠 Model's Internal Reasoning"):
                        for reasoning in final_thinking:
                            st.markdown(reasoning.strip())

            st.session_state.chat_history.append({
                "role": "assistant",
                "content": final_visible,
                "thinking": final_thinking,
            })
        finally:
            # Always clear the flag: if generation fails or the run is
            # interrupted, leaving it set would reject every later message.
            st.session_state.pending_response = False