Spaces:
Running
on
Zero
Running
on
Zero
import streamlit as st | |
from llama_cpp import Llama | |
from huggingface_hub import hf_hub_download | |
import os | |
import gc | |
import shutil | |
# Available models | |
MODELS = { | |
"Qwen2.5-7B-Instruct (Q2_K)": { | |
"repo_id": "Qwen/Qwen2.5-7B-Instruct-GGUF", | |
"filename": "qwen2.5-7b-instruct-q2_k.gguf", | |
"description": "Qwen2.5-7B Instruct (Q2_K)" | |
}, | |
"Gemma-3-4B-IT (Q4_K_M)": { | |
"repo_id": "unsloth/gemma-3-4b-it-GGUF", | |
"filename": "gemma-3-4b-it-Q4_K_M.gguf", | |
"description": "Gemma 3 4B IT (Q4_K_M)" | |
}, | |
"Phi-4-mini-Instruct (Q4_K_M)": { | |
"repo_id": "unsloth/Phi-4-mini-instruct-GGUF", | |
"filename": "Phi-4-mini-instruct-Q4_K_M.gguf", | |
"description": "Phi-4 Mini Instruct (Q4_K_M)" | |
}, | |
} | |
with st.sidebar: | |
st.header("⚙️ Settings") | |
selected_model_name = st.selectbox("Select Model", list(MODELS.keys())) | |
system_prompt = st.text_area("System Prompt", value="You are a helpful assistant.", height=80) | |
max_tokens = st.slider("Max tokens", 64, 2048, 512, step=32) | |
temperature = st.slider("Temperature", 0.1, 2.0, 0.7) | |
top_k = st.slider("Top-K", 1, 100, 40) | |
top_p = st.slider("Top-P", 0.1, 1.0, 0.95) | |
repeat_penalty = st.slider("Repetition Penalty", 1.0, 2.0, 1.1) | |
# Model info | |
selected_model = MODELS[selected_model_name] | |
model_path = os.path.join("models", selected_model["filename"]) | |
# Init state | |
if "model_name" not in st.session_state: | |
st.session_state.model_name = None | |
if "llm" not in st.session_state: | |
st.session_state.llm = None | |
# Make sure models dir exists | |
os.makedirs("models", exist_ok=True) | |
# If the selected model file does not exist or is invalid, clean up and re-download | |
def validate_or_download_model(): | |
if not os.path.exists(model_path): | |
cleanup_old_models() | |
download_model() | |
return | |
try: | |
_ = Llama(model_path=model_path, n_ctx=16, n_threads=1) # dummy check | |
except Exception as e: | |
st.warning(f"Model file was invalid or corrupt: {e}\nRedownloading...") | |
cleanup_old_models() | |
download_model() | |
def cleanup_old_models(): | |
for f in os.listdir("models"): | |
if f.endswith(".gguf") and f != selected_model["filename"]: | |
try: | |
os.remove(os.path.join("models", f)) | |
except Exception as e: | |
st.warning(f"Couldn't delete old model {f}: {e}") | |
def download_model(): | |
with st.spinner(f"Downloading {selected_model['filename']}..."): | |
hf_hub_download( | |
repo_id=selected_model["repo_id"], | |
filename=selected_model["filename"], | |
local_dir="./models", | |
local_dir_use_symlinks=False, | |
) | |
validate_or_download_model() | |
# Load model if changed | |
if st.session_state.model_name != selected_model_name: | |
if st.session_state.llm is not None: | |
del st.session_state.llm | |
gc.collect() | |
try: | |
st.session_state.llm = Llama( | |
model_path=model_path, | |
n_ctx=1024, | |
n_threads=2, | |
n_threads_batch=2, | |
n_batch=4, | |
n_gpu_layers=0, | |
use_mlock=False, | |
use_mmap=True, | |
verbose=False, | |
) | |
except Exception as e: | |
st.error(f"Failed to load model: {e}") | |
st.stop() | |
st.session_state.model_name = selected_model_name | |
llm = st.session_state.llm | |
# Chat history state | |
if "chat_history" not in st.session_state: | |
st.session_state.chat_history = [] | |
st.title(f"🧠 {selected_model['description']} (Streamlit + GGUF)") | |
st.caption(f"Powered by `llama.cpp` | Model: {selected_model['filename']}") | |
user_input = st.chat_input("Ask something...") | |
if user_input: | |
st.session_state.chat_history.append({"role": "user", "content": user_input}) | |
with st.chat_message("user"): | |
st.markdown(user_input) | |
# Trim conversation history to max 8 turns (user+assistant) | |
MAX_TURNS = 8 | |
trimmed_history = st.session_state.chat_history[-MAX_TURNS * 2:] | |
messages = [{"role": "system", "content": system_prompt}] + trimmed_history | |
with st.chat_message("assistant"): | |
full_response = "" | |
response_area = st.empty() | |
stream = llm.create_chat_completion( | |
messages=messages, | |
max_tokens=max_tokens, | |
temperature=temperature, | |
top_k=top_k, | |
top_p=top_p, | |
repeat_penalty=repeat_penalty, | |
stream=True, | |
) | |
for chunk in stream: | |
if "choices" in chunk: | |
delta = chunk["choices"][0]["delta"].get("content", "") | |
full_response += delta | |
response_area.markdown(full_response) | |
st.session_state.chat_history.append({"role": "assistant", "content": full_response}) | |