Luigi's picture
improve model management
0813164
raw
history blame
4.77 kB
import streamlit as st
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import os
import gc
import shutil
# Catalogue of selectable GGUF models.
# Each key is the label shown in the model picker; each value records the
# Hugging Face repository, the GGUF file to fetch from it, and a
# human-readable description used in the page title.
MODELS = {
    "Qwen2.5-7B-Instruct (Q2_K)": {
        "repo_id": "Qwen/Qwen2.5-7B-Instruct-GGUF",
        "filename": "qwen2.5-7b-instruct-q2_k.gguf",
        "description": "Qwen2.5-7B Instruct (Q2_K)",
    },
    "Gemma-3-4B-IT (Q4_K_M)": {
        "repo_id": "unsloth/gemma-3-4b-it-GGUF",
        "filename": "gemma-3-4b-it-Q4_K_M.gguf",
        "description": "Gemma 3 4B IT (Q4_K_M)",
    },
    "Phi-4-mini-Instruct (Q4_K_M)": {
        "repo_id": "unsloth/Phi-4-mini-instruct-GGUF",
        "filename": "Phi-4-mini-instruct-Q4_K_M.gguf",
        "description": "Phi-4 Mini Instruct (Q4_K_M)",
    },
}
# Sidebar: model picker, system prompt and sampling controls.
# Every widget's current value is bound to a module-level name that the
# rest of the script reads on this run.
with st.sidebar:
    st.header("⚙️ Settings")

    # Which entry of MODELS to run.
    selected_model_name = st.selectbox("Select Model", options=list(MODELS.keys()))

    # Text sent as the system message of every request.
    system_prompt = st.text_area(
        "System Prompt",
        value="You are a helpful assistant.",
        height=80,
    )

    # Generation / sampling parameters forwarded to the chat completion call.
    max_tokens = st.slider("Max tokens", min_value=64, max_value=2048, value=512, step=32)
    temperature = st.slider("Temperature", min_value=0.1, max_value=2.0, value=0.7)
    top_k = st.slider("Top-K", min_value=1, max_value=100, value=40)
    top_p = st.slider("Top-P", min_value=0.1, max_value=1.0, value=0.95)
    repeat_penalty = st.slider("Repetition Penalty", min_value=1.0, max_value=2.0, value=1.1)
# Look up the chosen model's metadata and where its file lives on disk.
selected_model = MODELS[selected_model_name]
model_path = os.path.join("models", selected_model["filename"])

# Seed the per-session slots on first run: the name of the loaded model and
# the loaded model object itself both start out empty.
for state_key in ("model_name", "llm"):
    if state_key not in st.session_state:
        st.session_state[state_key] = None

# The download directory must exist before anything lists or writes into it.
os.makedirs("models", exist_ok=True)
# If the selected model file does not exist or is invalid, clean up and re-download
def validate_or_download_model():
    """Make sure the selected model file exists and can be opened.

    Reads the module-level ``model_path`` and ``selected_model_name``.

    * Missing file: other ``.gguf`` files are removed and the file is
      downloaded.
    * File already loaded in this session under the same model name: nothing
      is done, because a successful load has already proven the file usable
      and the probe below would otherwise repeat on every script rerun.
    * Otherwise the file is opened once with a tiny context as a probe. If
      that fails, a warning is shown, the bad file is deleted and the model
      is downloaded again.
    """
    if not os.path.exists(model_path):
        cleanup_old_models()
        download_model()
        return

    already_loaded = (
        "model_name" in st.session_state
        and "llm" in st.session_state
        and st.session_state.model_name == selected_model_name
        and st.session_state.llm is not None
    )
    if already_loaded:
        return

    try:
        probe = Llama(model_path=model_path, n_ctx=16, n_threads=1)  # dummy check
    except Exception as e:
        st.warning(f"Model file was invalid or corrupt: {e}\nRedownloading...")
        cleanup_old_models()
        # cleanup_old_models() deliberately keeps the selected file, so the
        # broken file has to be removed here; otherwise the downloader may
        # find a file already in place and not fetch a fresh copy.
        try:
            os.remove(model_path)
        except OSError as remove_error:
            st.warning(f"Couldn't delete invalid model file: {remove_error}")
        download_model()
    else:
        # The probe instance is only a validity check; drop it right away.
        del probe
def cleanup_old_models():
    """Remove every ``.gguf`` file in ``models`` except the selected one.

    The file named by ``selected_model["filename"]`` is kept. A file that
    cannot be deleted is reported with a warning and skipped, so one failure
    does not stop the remaining deletions.
    """
    for entry in os.listdir("models"):
        is_other_gguf = entry.endswith(".gguf") and entry != selected_model["filename"]
        if not is_other_gguf:
            continue
        stale_path = os.path.join("models", entry)
        try:
            os.remove(stale_path)
        except Exception as err:
            st.warning(f"Couldn't delete old model {entry}: {err}")
def download_model():
    """Download the selected model's GGUF file into ``./models``.

    The repository and file name come from the module-level
    ``selected_model``. A spinner is shown for the duration of the download.
    """
    request = {
        "repo_id": selected_model["repo_id"],
        "filename": selected_model["filename"],
        "local_dir": "./models",
        "local_dir_use_symlinks": False,
    }
    with st.spinner(f"Downloading {selected_model['filename']}..."):
        hf_hub_download(**request)
# Make sure the selected model file is on disk and loadable.
validate_or_download_model()

# (Re)load the model when the selection changed, or when nothing is loaded.
# Checking ``llm`` as well as ``model_name`` matters after a failed load:
# without it, switching back to the previously loaded model would skip the
# load and leave ``llm`` empty.
model_is_current = (
    st.session_state.model_name == selected_model_name
    and st.session_state.llm is not None
)
if not model_is_current:
    # Release the previous model before constructing the next one, and clear
    # the recorded name so the session never claims a model that is not loaded.
    st.session_state.llm = None
    st.session_state.model_name = None
    gc.collect()
    try:
        st.session_state.llm = Llama(
            model_path=model_path,
            n_ctx=1024,
            n_threads=2,
            n_threads_batch=2,
            n_batch=4,
            n_gpu_layers=0,
            use_mlock=False,
            use_mmap=True,
            verbose=False,
        )
    except Exception as e:
        st.error(f"Failed to load model: {e}")
        st.stop()
    # Only record the name once the load has succeeded.
    st.session_state.model_name = selected_model_name

llm = st.session_state.llm
# Chat history state: a list of {"role", "content"} dicts kept per session.
if "chat_history" not in st.session_state:
    st.session_state.chat_history = []

st.title(f"🧠 {selected_model['description']} (Streamlit + GGUF)")
st.caption(f"Powered by `llama.cpp` | Model: {selected_model['filename']}")

# The script is re-executed on each interaction, so earlier turns must be
# redrawn from session state or they would vanish from the page.
for past_message in st.session_state.chat_history:
    with st.chat_message(past_message["role"]):
        st.markdown(past_message["content"])

# Maximum number of user+assistant turns sent to the model as context.
MAX_TURNS = 8

user_input = st.chat_input("Ask something...")
if user_input:
    st.session_state.chat_history.append({"role": "user", "content": user_input})
    with st.chat_message("user"):
        st.markdown(user_input)

    # Trim conversation history to max 8 turns (user+assistant).
    trimmed_history = st.session_state.chat_history[-MAX_TURNS * 2:]
    # The slice can begin on an assistant message once the history is long
    # enough. Start the window on a user message instead.
    # NOTE(review): some chat templates appear to require strict
    # user/assistant alternation - confirm per model.
    while trimmed_history and trimmed_history[0]["role"] != "user":
        trimmed_history = trimmed_history[1:]
    messages = [{"role": "system", "content": system_prompt}] + trimmed_history

    with st.chat_message("assistant"):
        full_response = ""
        response_area = st.empty()
        try:
            stream = llm.create_chat_completion(
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_k=top_k,
                top_p=top_p,
                repeat_penalty=repeat_penalty,
                stream=True,
            )
            for chunk in stream:
                choices = chunk.get("choices")
                if not choices:
                    continue
                # A delta may omit "content" or carry None; treat both as
                # an empty piece of text.
                delta = choices[0].get("delta", {}).get("content") or ""
                full_response += delta
                response_area.markdown(full_response)
        except Exception as e:
            # Remove the unanswered user message so the stored history keeps
            # alternating user/assistant entries, then report the failure.
            st.session_state.chat_history.pop()
            st.error(f"Generation failed: {e}")
        else:
            st.session_state.chat_history.append(
                {"role": "assistant", "content": full_response}
            )