zmypl / app.py
Zmypl's picture
Update app.py
3c570aa verified
raw
history blame
12.1 kB
import json
import os
import sqlite3
import secrets
import numpy as np
from sentence_transformers import SentenceTransformer, util
from pathlib import Path
import gradio as gr
import datetime
from huggingface_hub import InferenceClient
# Initialize the inference client and models
# NOTE(review): the HF API token is read from an env var literally named
# "online" — presumably a Space secret; confirm the secret name is intended.
hu_api = os.getenv("online")
client = InferenceClient(api_key=hu_api)
# Resolve the directory this script lives in (comment translated from Persian)
current_dir = Path(__file__).resolve().parent
embedded_model_path = f"{current_dir}/EmbeddedModel"
# Check whether a locally cached copy of the embedding model exists
# (translated from Persian): download from the Hub on first run, then
# save it so later runs load from disk.
if not os.path.exists(embedded_model_path) or not os.path.isdir(embedded_model_path):
    print("*** Load the model from huggingface ***")
    embedded_model = SentenceTransformer("xmanii/maux-gte-persian", trust_remote_code=True)
    embedded_model.save(embedded_model_path)
else:
    print("*** Load the model locally from the specified path ***")
    embedded_model = SentenceTransformer(model_name_or_path=embedded_model_path, trust_remote_code=True)
# Upload folder
UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# Database initialization.
# check_same_thread=False lets Gradio's worker threads share this single
# connection; NOTE(review): access is not otherwise synchronized.
DB_NAME = 'files.db'
conn = sqlite3.connect(DB_NAME, check_same_thread=False)
cursor = conn.cursor()
# Create tables: `files` holds one row per upload ('|'-joined chunk texts),
# `embeddings` holds one row per chunk (raw float32 bytes + chunk text).
cursor.execute('''
    CREATE TABLE IF NOT EXISTS files (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        filename TEXT NOT NULL,
        path TEXT NOT NULL,
        chunks TEXT NOT NULL
    )
''')
cursor.execute('''
    CREATE TABLE IF NOT EXISTS embeddings (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        file_id INTEGER NOT NULL,
        chunk_index INTEGER NOT NULL,
        embedding BLOB NOT NULL,
        chunk_text TEXT NOT NULL,
        FOREIGN KEY (file_id) REFERENCES files(id)
    )
''')
conn.commit()
def store_embedding(file_id, chunk_index, embedding, chunk_text):
    """Persist one chunk's embedding bytes and its text, linked to a file row."""
    sql = "INSERT INTO embeddings (file_id, chunk_index, embedding, chunk_text) VALUES (?, ?, ?, ?)"
    row = (file_id, chunk_index, embedding, chunk_text)
    cursor.execute(sql, row)
    conn.commit()
def get_embeddings(file_id):
    """Return (chunk_index, embedding, chunk_text) rows for *file_id*.

    Bug fix: the original SELECT had no WHERE clause, so it ignored
    file_id and returned every embedding row for every uploaded file.
    """
    cursor.execute(
        "SELECT chunk_index, embedding, chunk_text FROM embeddings WHERE file_id = ?",
        (file_id,),
    )
    return cursor.fetchall()
def generate_verification_code(length=6):
    """Return a cryptographically secure numeric code *length* digits long."""
    digits = '0123456789'
    chosen = [secrets.choice(digits) for _ in range(length)]
    return ''.join(chosen)
def read_file(file_path):
    """Return the UTF-8 text of *file_path*, or None if it cannot be read.

    A missing file fails silently; any other error is printed (best-effort).
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as fh:
            contents = fh.read()
    except FileNotFoundError:
        return None
    except Exception as e:
        print(f"Error reading file: {e}")
        return None
    return contents
def chunk_text(text, chunk_size=1000, overlap_size=100):
"""Split text into chunks for processing."""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunks.append(text[start:end])
start += (chunk_size - overlap_size)
return chunks
def calculate_similarity(embedding1, embedding2):
    """Return the cosine similarity of two embeddings as a Python float."""
    score = util.cos_sim(embedding1, embedding2)
    # .item() unwraps the 1x1 similarity tensor into a plain float.
    return score.item()
def save_chat_history(mobile, user_input, bot_response):
    """Append one question/answer exchange to the user's daily chat log.

    The log is a JSON list of entries stored at
    chat_histories/<YYYY-MM-DD>/<mobile>/chat_history.json (UTC date).
    A missing or corrupt file is treated as an empty history.

    Fixes: the original placed its docstring after executable statements
    (so it was a stray no-op string, not a docstring) and used an
    r+/seek/truncate dance; behavior is unchanged.
    """
    now = datetime.datetime.utcnow()
    user_folder = os.path.join('chat_histories', now.strftime("%Y-%m-%d"), mobile)
    os.makedirs(user_folder, exist_ok=True)
    history_file = os.path.join(user_folder, 'chat_history.json')
    chat_entry = {
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "user": mobile,
        "question": user_input,
        "answer": bot_response
    }
    history = []
    if os.path.exists(history_file):
        with open(history_file, 'r', encoding='utf-8') as f:
            try:
                history = json.load(f)
            except json.JSONDecodeError:
                # Empty/invalid file: start a fresh history rather than crash.
                history = []
    history.append(chat_entry)
    # Rewrite the whole file; pretty-print and keep non-ASCII (Persian) text.
    with open(history_file, 'w', encoding='utf-8') as f:
        json.dump(history, f, ensure_ascii=False, indent=4)
def load_chat_history(mobile):
    """Load today's chat history for *mobile* as chat-completion messages.

    Returns a list of {"role", "content"} dicts. Returns [] when no
    history file exists, it is empty, or it contains invalid JSON.

    Bug fix: the original appended ALL user messages followed by ALL
    assistant messages, scrambling the conversation order sent to the
    model. Turns are now interleaved chronologically. (Also moved the
    stray string after the first statements into a real docstring.)
    """
    today = datetime.datetime.utcnow().strftime("%Y-%m-%d")
    history_file = os.path.join('chat_histories', today, mobile, 'chat_history.json')
    chat_history = []
    if os.path.exists(history_file) and os.path.getsize(history_file) > 0:
        with open(history_file, 'r', encoding='utf-8') as f:
            try:
                history = json.load(f)
            except json.JSONDecodeError as e:
                print(f"Error loading JSON: {e}")
            else:
                for entry in history:
                    chat_history.append({"role": "user", "content": entry["question"]})
                    chat_history.append({"role": "assistant", "content": entry["answer"]})
    return chat_history
def upload_file(file):
    """Gradio handler: read an uploaded text file, chunk it, embed and store it.

    Args:
        file: Gradio file object (its local path is exposed via `.name`),
            or None when nothing was uploaded.

    Returns:
        A status string (Persian for user-facing errors).

    Bug fix: the original had a dead `if text is None:` branch that
    referenced an undefined `file_path` variable (NameError if it had
    ever run); it and the commented-out code are removed.
    """
    if file is None:
        return "لطفاً یک فایل متنی بارگذاری کنید."
    try:
        with open(file.name, 'r', encoding='utf-8') as f:
            text = f.read()
    except Exception as e:
        return f"خطا در خواندن فایل: {str(e)}"
    chunks = chunk_text(text)
    # The files table stores all chunk texts joined with '|'.
    chunks_str = '|'.join(chunks)
    cursor.execute("INSERT INTO files (filename, path, chunks) VALUES (?, ?, ?)",
                   (file.name, file.name, chunks_str))
    file_id = cursor.lastrowid  # links each embedding row back to this file
    conn.commit()
    for index, chunk in enumerate(chunks):
        # Store raw float32 bytes; chat() decodes with np.frombuffer(dtype=np.float32).
        embedding = embedded_model.encode(chunk).tobytes()
        store_embedding(file_id, index, embedding, chunk)
        print(f"store_embedding {index} of {len(chunks)}")
    return f"File '{file.name}' uploaded and processed successfully."
import numpy as np
from gradio_client import Client
def chat(input_sentence, file_id, mobile):
    """Answer a question via RAG over the stored chunks of *file_id*.

    Embeds the question, ranks the file's stored chunk embeddings by
    cosine similarity, puts the 5 most similar chunk texts into the
    system prompt, and streams a completion from the hosted Qwen model.
    The exchange is appended to the per-user chat history.

    Args:
        input_sentence: The user's question.
        file_id: Row id of the uploaded file providing context.
        mobile: User identifier for loading/saving chat history.

    Returns:
        The complete model response, or an error message string.
    """
    if not input_sentence:
        return "User input is required."
    embeddings = get_embeddings(file_id)
    if not embeddings:
        return "No embeddings found for the uploaded file."
    # Encode the question once and score it against every stored chunk.
    input_embedding = embedded_model.encode(input_sentence)
    scored_chunks = []
    # Loop variable renamed from `chunk_text`, which shadowed the
    # module-level chunk_text() helper in the original.
    for _index, embedding_blob, text in embeddings:
        stored_embedding = np.frombuffer(embedding_blob, dtype=np.float32)
        scored_chunks.append((calculate_similarity(input_embedding, stored_embedding), text))
    # Keep the 5 most similar chunks as the answer context.
    top_chunks = sorted(scored_chunks, key=lambda pair: pair[0], reverse=True)[:5]
    answer_context = "".join(text for _, text in top_chunks)
    # System prompt (Persian): "Answer only from the text; if the user's
    # answer is not in the text, say 'I don't know'."
    system_prompt = (
        "فقط طبق متن به پاسخ بده و اگر پاسخ کاربر در متن نبود بگو نمی دانم.\n\n"
        "متن:\n" + answer_context)
    messages = [{"role": "system", "content": system_prompt}]
    messages.extend(load_chat_history(mobile))  # empty list when no history
    messages.append({"role": "user", "content": input_sentence})
    collected_response = []
    stream = client.chat.completions.create(
        model="Qwen/Qwen2.5-72B-Instruct",
        messages=messages,
        temperature=0.5,
        max_tokens=4098,
        top_p=0.7,
        stream=True
    )
    for event in stream:
        # Bug fix: each streamed delta is an object, not a dict — the
        # original `'content' in chunk.choices[0].delta` membership test is
        # not how huggingface_hub deltas are meant to be read. Access
        # .content directly and skip None/empty deltas.
        content = event.choices[0].delta.content
        if content:
            collected_response.append(content)
    bot_response = ''.join(collected_response)
    save_chat_history(mobile, input_sentence, bot_response)
    print(f"Bot Response: {bot_response}")
    return bot_response
def gradio_interface():
    """Build and launch the two-tab Gradio UI (file upload + chat)."""
    with gr.Blocks() as demo:
        gr.Markdown("# پروژه مدل مبتنی بر متن با استفاده از Gradio")
        # Tab 1: upload a text file and index it into the database.
        with gr.Tab("بارگذاری فایل"):
            upload_widget = gr.File(label="فایل متنی خود را اینجا بارگذاری کنید")
            process_btn = gr.Button("بارگذاری و پردازش فایل")
            upload_status = gr.Textbox(label="پیام خروجی")
            process_btn.click(upload_file, inputs=[upload_widget], outputs=[upload_status])
        # Tab 2: ask questions against a previously uploaded file.
        with gr.Tab("چت"):
            phone_box = gr.Textbox(label="شماره موبایل شما را وارد کنید")
            file_id_box = gr.Number(label="شناسه فایل آپلود شده (file_id)")
            question_box = gr.Textbox(label="پرسش خود را وارد کنید")
            ask_btn = gr.Button("ارسال پرسش")
            answer_box = gr.Textbox(label="پاسخ ربات")
            ask_btn.click(chat, inputs=[question_box, file_id_box, phone_box], outputs=[answer_box])
    demo.launch()
# Script entry point: build and launch the Gradio UI.
if __name__ == "__main__":
    gradio_interface()