File size: 3,303 Bytes
fd45282 75ecc06 fd45282 abc9568 f5f891b 4a2e45e fa83b7a 862a827 83ad7e6 3020aee c0ba1b5 75ecc06 4a2e45e 75ecc06 4a2e45e 75ecc06 e5c24a4 75ecc06 c80b246 14c753e 1c0fdf3 6727ea8 1c0fdf3 6727ea8 c80b246 1c0fdf3 c80b246 1c0fdf3 c80b246 1c0fdf3 abc9568 75ecc06 0316dcf ba7f366 0316dcf ba7f366 0316dcf ba7f366 0316dcf ba7f366 7ea3332 ba7f366 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
import gradio as gr
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
import torch
import requests
import json
import shutil, os
# Directory handed to `from_pretrained(offload_folder=...)` further down.
offload_folder = "offload/"

# `os.path.isfile("offload/")` is always False on POSIX: the trailing
# separator makes the OS resolve the path as a directory, so a stray regular
# file named "offload" was never detected and the later `os.makedirs` call
# failed.  Strip the separator before probing for a file.
_offload_path = offload_folder.rstrip("/\\")
if os.path.isfile(_offload_path):
    # A regular file occupies the directory name; remove it.
    os.remove(_offload_path)

# Create the directory if it is missing (no-op when it already exists).
os.makedirs(offload_folder, exist_ok=True)

# Startup diagnostics about the offload path.
print(f"路径是否存在: {os.path.exists(offload_folder)}")
print(f"是否是目录: {os.path.isdir(offload_folder)}")
print(f"是否是文件: {os.path.isfile(offload_folder)}")
# Hub identifiers: the base causal LM and the adapter loaded through PeftModel.
model_id = "deepseek-ai/deepseek-coder-1.3b-base"
lora_id = "Seunggg/lora-plant"

tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

# float16 when CUDA is available, float32 otherwise.
if torch.cuda.is_available():
    _dtype = torch.float16
else:
    _dtype = torch.float32

# Keyword arguments passed identically to both loaders.
_placement = {"device_map": "auto", "torch_dtype": _dtype}

base = AutoModelForCausalLM.from_pretrained(
    model_id,
    offload_folder=offload_folder,
    trust_remote_code=True,
    **_placement,
)

# Wrap the base model with the adapter weights and switch to eval mode.
model = PeftModel.from_pretrained(base, lora_id, **_placement)
model.eval()
from transformers import pipeline

# Text-generation pipeline built around the already-loaded model and
# tokenizer; `max_new_tokens=256` is passed as the generation length setting.
_pipeline_options = {
    "model": model,
    "tokenizer": tokenizer,
    "device_map": "auto",
    "max_new_tokens": 256,
}
pipe = pipeline("text-generation", **_pipeline_options)
def get_sensor_data():
    """Fetch the latest sensor readings and format them for display.

    Returns:
        str: The ``sensorData`` field of the API response pretty-printed as
        JSON, the placeholder ``"暂无传感器数据"`` when that field is missing
        or empty, or a message starting with ``"⚠️ 获取失败"`` when the
        request or the response parsing fails.  Failures are reported as
        text rather than raised.
    """
    try:
        sensor_response = requests.get(
            "https://arduino-realtime.onrender.com/api/data", timeout=5
        )
        # Treat 4xx/5xx replies as failures instead of parsing an error
        # body and reporting "no data".
        sensor_response.raise_for_status()
        sensor_data = sensor_response.json().get("sensorData", None)
    except Exception as e:  # UI boundary: show the problem instead of crashing
        return "⚠️ 获取失败:" + str(e)

    if not sensor_data:
        return "暂无传感器数据"
    return json.dumps(sensor_data, ensure_ascii=False, indent=2)
def _sensor_context(sensor_display):
    """Recover the sensor payload from the text returned by get_sensor_data().

    get_sensor_data() returns either a JSON dump of the readings or a
    non-JSON status message; this returns the decoded payload in the first
    case and ``None`` in the second.
    """
    try:
        return json.loads(sensor_display)
    except (TypeError, ValueError):
        return None


def respond(user_input):
    """Answer a plant question, adding current sensor readings to the prompt.

    Args:
        user_input: The question typed by the user.

    Returns:
        tuple[str, str]: ``(sensor_display, answer)`` where *sensor_display*
        is the text from get_sensor_data() and *answer* is the generated
        reply, a hint asking for input when the question is empty, or an
        error message when generation fails.
    """
    sensor_display = get_sensor_data()
    # Guard against None as well as blank strings.
    if not user_input or not user_input.strip():
        return sensor_display, "请输入植物相关的问题 😊"

    prompt = f"用户提问:{user_input}\n"
    # Reuse the readings that were just fetched for display instead of
    # issuing a second request; a failed fetch simply leaves the sensor
    # line out of the prompt rather than aborting the answer.
    sensor_data = _sensor_context(sensor_display)
    if sensor_data:
        prompt += f"当前传感器数据:{json.dumps(sensor_data, ensure_ascii=False)}\n"
    prompt += "请用更人性化的语言生成建议,并推荐相关植物文献或资料。\n回答:"

    try:
        # Ask the pipeline for the continuation only, instead of removing
        # the prompt afterwards with str.replace (which would also delete
        # any later repetition of the prompt text).
        result = pipe(prompt, return_full_text=False)
        answer = result[0]["generated_text"].strip()
    except Exception as e:  # UI boundary: show the problem instead of crashing
        answer = f"生成建议时出错:{str(e)}"
    return sensor_display, answer
def auto_update_sensor():
    """Return a Gradio update carrying the latest sensor text.

    Uses ``gr.update`` instead of the per-component ``gr.Textbox.update``
    classmethod, which is no longer available in Gradio 4.
    """
    return gr.update(value=get_sensor_data())
# UI layout: a read-only sensor panel, a question box, a read-only answer
# panel and a send button.
with gr.Blocks() as demo:
    gr.Markdown("# 🌱 植物助手 - 实时联动版")
    sensor_box = gr.Textbox(label="🧪 当前传感器数据", lines=6, interactive=False)
    question = gr.Textbox(label="植物问题", lines=4, placeholder="请输入植物相关的问题 😊")
    answer_box = gr.Textbox(label="🤖 回答建议", lines=8, interactive=False)
    send_btn = gr.Button("发送")

    # Refresh the sensor panel on page load and then every 5 seconds.
    # NOTE(review): newer Gradio releases replace `every=` with `gr.Timer`;
    # confirm against the Gradio version pinned for this app.
    demo.load(fn=get_sensor_data, inputs=None, outputs=sensor_box, every=5)
    send_btn.click(fn=respond, inputs=question, outputs=[sensor_box, answer_box])

# Events that use `every=` need the queue; enable it explicitly rather than
# relying on a version- or host-specific default.
demo.queue()
demo.launch()