Spaces:
Sleeping
Sleeping
import spaces | |
import os, copy, gc, re, sys | |
import traceback | |
import torch | |
import torch.nn.functional as F | |
from datetime import datetime | |
import gradio as gr | |
from huggingface_hub import hf_hub_download | |
# Force CPU mode as requested | |
use_cuda = False | |
device = torch.device("cpu") | |
print(f"Using device: {device} (forced CPU mode)") | |
# Set RWKV environment variables for CPU | |
os.environ["RWKV_V7_ON"] = '1' | |
os.environ["RWKV_JIT_ON"] = '1' | |
os.environ["RWKV_CUDA_ON"] = '0' | |
# Model parameters | |
ctx_limit = 4000 | |
gen_limit = 32000 | |
title_v6 = "rwkv7-g1-0.1b-20250307-ctx4096" | |
# Load RWKV with fallback mechanisms | |
try: | |
# First try importing normally | |
from rwkv.model import RWKV | |
from rwkv.utils import PIPELINE, PIPELINE_ARGS | |
print("RWKV imported successfully") | |
except Exception as e: | |
print(f"Error importing RWKV: {e}") | |
print("Attempting fallback import method...") | |
# Fallback method - reinstall the package | |
try: | |
import subprocess | |
subprocess.check_call([sys.executable, "-m", "pip", "install", "--force-reinstall", "rwkv"]) | |
from rwkv.model import RWKV | |
from rwkv.utils import PIPELINE, PIPELINE_ARGS | |
print("RWKV imported after reinstall") | |
except Exception as e: | |
print(f"Failed to import RWKV after reinstall: {e}") | |
raise | |
# Download and initialize the model | |
try: | |
print(f"Downloading model {title_v6}...") | |
model_path_v6 = hf_hub_download(repo_id="BlinkDL/rwkv7-g1", filename=f"{title_v6}.pth") | |
print(f"Model downloaded to {model_path_v6}") | |
# Use CPU strategy | |
strategy = 'cpu fp32' | |
print(f"Using strategy: {strategy}") | |
# Initialize model with CPU strategy | |
model_v6 = RWKV(model=model_path_v6.replace('.pth',''), strategy=strategy) | |
pipeline_v6 = PIPELINE(model_v6, "rwkv_vocab_v20230424") | |
args = model_v6.args | |
print("Model initialized successfully") | |
model_loaded = True | |
except Exception as e: | |
print(f"Error loading model: {e}") | |
traceback.print_exc() | |
model_loaded = False | |
# Text generation parameters | |
penalty_decay = 0.996 | |
def generate_prompt(instruction, input=""): | |
instruction = instruction.strip().replace('\r\n','\n').replace('\n\n','\n') | |
input = input.strip().replace('\r\n','\n').replace('\n\n','\n') | |
if input: | |
return f"""Instruction: {instruction}\n\nInput: {input}\n\nResponse:""" | |
else: | |
return f"""User: {instruction}\n\nAssistant:""" | |
def qa_prompt(instruction): | |
instruction = instruction.strip().replace('\r\n','\n') | |
instruction = re.sub(r'\n+', '\n', instruction) | |
return f"User: {instruction}\n\nAssistant:""" | |
def evaluate( | |
ctx, | |
token_count=200, | |
temperature=1.0, | |
top_p=0.7, | |
presencePenalty = 0.1, | |
countPenalty = 0.1, | |
): | |
if not model_loaded: | |
yield "Error: Model failed to load. Please check logs for details." | |
return | |
try: | |
args = PIPELINE_ARGS(temperature = max(0.2, float(temperature)), top_p = float(top_p), | |
alpha_frequency = countPenalty, | |
alpha_presence = presencePenalty, | |
token_ban = [], # ban the generation of some tokens | |
token_stop = [0]) # stop generation whenever you see any token here | |
ctx = ctx.strip() | |
all_tokens = [] | |
out_last = 0 | |
out_str = '' | |
occurrence = {} | |
state = None | |
for i in range(int(token_count)): | |
input_ids = pipeline_v6.encode(ctx)[-ctx_limit:] if i == 0 else [token] | |
out, state = model_v6.forward(input_ids, state) | |
for n in occurrence: | |
out[n] -= (args.alpha_presence + occurrence[n] * args.alpha_frequency) | |
token = pipeline_v6.sample_logits(out, temperature=args.temperature, top_p=args.top_p) | |
if token in args.token_stop: | |
break | |
all_tokens += [token] | |
for xxx in occurrence: | |
occurrence[xxx] *= penalty_decay | |
ttt = pipeline_v6.decode([token]) | |
www = 1 | |
if ttt in ' \t0123456789': | |
www = 0 | |
if token not in occurrence: | |
occurrence[token] = www | |
else: | |
occurrence[token] += www | |
tmp = pipeline_v6.decode(all_tokens[out_last:]) | |
if '\ufffd' not in tmp: | |
out_str += tmp | |
yield out_str.strip() | |
out_last = i + 1 | |
# Clean up to free memory | |
del out | |
del state | |
gc.collect() | |
yield out_str.strip() | |
except Exception as e: | |
print(f"Error during generation: {e}") | |
traceback.print_exc() | |
yield f"Error during generation: {str(e)}" | |
# Example prompts | |
examples = [ | |
["User: simulate SpaceX mars landing using python\n\nAssistant: <think", gen_limit, 1, 0.3, 0.5, 0.5], | |
[generate_prompt("Please give the pros and cons of hodl versus active trading."), gen_limit, 1, 0.3, 0.5, 0.5], | |
["Assistant: How can we craft an engaging story featuring vampires on Mars? Let's think step by step and provide an expert response:", gen_limit, 1, 0.3, 0.5, 0.5], | |
["Assistant: How can we persuade Elon Musk to follow you on Twitter? Let's think step by step and provide an expert response:", gen_limit, 1, 0.3, 0.5, 0.5], | |
[generate_prompt("東京で訪れるべき素晴らしい場所とその紹介をいくつか挙げてください。"), gen_limit, 1, 0.3, 0.5, 0.5], | |
[generate_prompt("Write a story using the following information.", "A man named Alex chops a tree down."), gen_limit, 1, 0.3, 0.5, 0.5], | |
["A few light taps upon the pane made her turn to the window. It had begun to snow again.", gen_limit, 1, 0.3, 0.5, 0.5], | |
['''Edward: I am Edward Elric from Fullmetal Alchemist.\n\nUser: Hello Edward. What have you been up to recently?\n\nEdward:''', gen_limit, 1, 0.3, 0.5, 0.5], | |
[generate_prompt("Write a simple webpage. When a user clicks the button, it shows a random joke from a list of 4 jokes."), gen_limit, 1, 0.3, 0.5, 0.5], | |
["En una pequeña aldea escondida entre las montañas de Andalucía, donde las calles aún conservaban el eco de antiguas leyendas, vivía un joven llamado Alejandro.", gen_limit, 1, 0.3, 0.5, 0.5], | |
["Dans le cœur battant de Paris, sous le ciel teinté d'un crépuscule d'or et de pourpre, se tenait une petite librairie oubliée par le temps.", gen_limit, 1, 0.3, 0.5, 0.5], | |
["في تطور مذهل وغير مسبوق، أعلنت السلطات المحلية في العاصمة عن اكتشاف أثري قد يغير مجرى التاريخ كما نعرفه.", gen_limit, 1, 0.3, 0.5, 0.5], | |
['''"当然可以,大宇宙不会因为这五公斤就不坍缩了。"关一帆说,他还有一个没说出来的想法:也许大宇宙真的会因为相差一个原子的质量而由封闭转为开放。大自然的精巧有时超出想象,比如生命的诞生,就需要各项宇宙参数在几亿亿分之一精度上的精确配合。但程心仍然可以留下她的生态球,因为在那无数文明创造的无数小宇宙中,肯定有相当一部分不响应回归运动的号召,所以,大宇宙最终被夺走的质量至少有几亿吨,甚至可能是几亿亿亿吨。\n但愿大宇宙能够忽略这个误差。\n程心和关一帆进入了飞船,智子最后也进来了。她早就不再穿那身华丽的和服了,她现在身着迷彩服,再次成为一名轻捷精悍的战士,她的身上佩带着许多武器和生存装备,最引人注目的是那把插在背后的武士刀。\n"放心,我在,你们就在!"智子对两位人类朋友说。\n聚变发动机启动了,推进器发出幽幽的蓝光,''', gen_limit, 1, 0.3, 0.5, 0.5], | |
] | |
################################################################################################################## | |
# Create Gradio UI | |
with gr.Blocks(title=title_v6) as demo: | |
model_status = "✅ Model loaded successfully" if model_loaded else "❌ Model failed to load" | |
device_status = "Using CPU mode" | |
gr.HTML(f"<div style=\"text-align: center;\">\n<h1>{title_v6}</h1>\n<p>{model_status} - {device_status}</p>\n</div>") | |
with gr.Tab("=== Base Model (Raw Generation) ==="): | |
gr.Markdown(f'This is [RWKV7 G1](https://huggingface.co/BlinkDL/rwkv7-g1) 0.1B (!!!) L12-D768 reasoning base LM - an attention-free pure RNN [RWKV-LM](https://github.com/BlinkDL/RWKV-LM). Supports 100+ world languages and code. Check [400+ Github RWKV projects](https://github.com/search?o=desc&p=1&q=rwkv&s=updated&type=Repositories). *** Can try examples (bottom of page) *** (can edit them). Demo limited to ctxlen {ctx_limit}.') | |
with gr.Row(): | |
with gr.Column(): | |
prompt = gr.Textbox(lines=6, label="Prompt", value="User: simulate SpaceX mars landing using python\n\nAssistant: <think") | |
token_count = gr.Slider(10, gen_limit, label="Max Tokens", step=10, value=gen_limit) | |
temperature = gr.Slider(0.2, 2.0, label="Temperature", step=0.1, value=1.0) | |
top_p = gr.Slider(0.0, 1.0, label="Top P", step=0.05, value=0.3) | |
presence_penalty = gr.Slider(0.0, 1.0, label="Presence Penalty", step=0.1, value=0.5) | |
count_penalty = gr.Slider(0.0, 1.0, label="Count Penalty", step=0.1, value=0.5) | |
with gr.Column(): | |
with gr.Row(): | |
submit = gr.Button("Submit", variant="primary") | |
clear = gr.Button("Clear", variant="secondary") | |
output = gr.Textbox(label="Output", lines=20, max_lines=100) | |
data = gr.Dataset(components=[prompt, token_count, temperature, top_p, presence_penalty, count_penalty], samples=examples, samples_per_page=50, label="Example Instructions", headers=["Prompt", "Max Tokens", "Temperature", "Top P", "Presence Penalty", "Count Penalty"]) | |
submit.click(evaluate, [prompt, token_count, temperature, top_p, presence_penalty, count_penalty], [output]) | |
clear.click(lambda: None, [], [output]) | |
data.click(lambda x: x, [data], [prompt, token_count, temperature, top_p, presence_penalty, count_penalty]) | |
# Launch the app | |
print("Starting Gradio app...") | |
# Fix the queue method call by removing the incorrect parameter | |
demo.queue(max_size=10) | |
demo.launch(share=False) |