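"""Gradio Space: Persian RAG chat over an uploaded text file.

Pipeline: upload a UTF-8 text file -> split it into overlapping chunks ->
embed each chunk with a Persian sentence-transformer and store the vectors
in SQLite -> at question time, rank chunks by cosine similarity and ask
Qwen2.5-72B-Instruct to answer strictly from the top-ranked context.
"""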
import os
import sqlite3

import gradio as gr
import numpy as np
from huggingface_hub import InferenceClient
from sentence_transformers import SentenceTransformer, util

# Hugging Face API token, read from the Space secret named "for_space".
apikey = os.getenv("for_space")

DB_NAME = 'files.db'
# check_same_thread=False lets Gradio's worker threads share this connection.
conn = sqlite3.connect(DB_NAME, check_same_thread=False)
cursor = conn.cursor()

# Persian embedding model, used for both document chunks and user queries.
embedded_model = SentenceTransformer("xmanii/maux-gte-persian", trust_remote_code=True)

def calculate_similarity(embedding1, embedding2):
    # util.cos_sim accepts numpy arrays and returns a 1x1 tensor;
    # .item() unwraps it to a plain Python float.
    similarity = util.cos_sim(embedding1, embedding2)
    return similarity.item()

def store_embedding(file_id, chunk_index, embedding):
    # Persist one chunk's embedding as a raw float32 byte blob.
    print("Storing embedding to database...")
    cursor.execute("INSERT INTO embeddings (file_id, chunk_index, embedding) VALUES (?, ?, ?)",
                   (file_id, chunk_index, embedding))
    conn.commit()

def get_embeddings(file_id):
    # Returns [(chunk_index, embedding_blob), ...] for the given file.
    cursor.execute("SELECT chunk_index, embedding FROM embeddings WHERE file_id = ?", (file_id,))
    return cursor.fetchall()
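
# Note on the storage format (illustrative round-trip, not part of the app
# flow): embeddings are stored with ndarray.tobytes() and must be decoded
# with the same dtype the model emits (float32 here), or the similarity
# scores will be meaningless.
#   vec = embedded_model.encode("سلام")               # np.ndarray, float32
#   restored = np.frombuffer(vec.tobytes(), dtype=np.float32)
#   assert np.allclose(vec, restored)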

# Pass the uploaded file's path straight through (used as the Gradio handler).
def upload_file(file_path):
    return file_path

# Split text into fixed-size chunks with a sliding overlap, so content that
# straddles a boundary still appears intact in at least one chunk.
def chunk_text(text, chunk_size=1000, overlap_size=100):
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start += (chunk_size - overlap_size)
    return chunks
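
# Worked example: with chunk_size=1000 and overlap_size=100, chunks start at
# offsets 0, 900, 1800, ... so consecutive chunks share 100 characters:
#   parts = chunk_text("a" * 2000)
#   assert [len(p) for p in parts] == [1000, 1000, 200]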

# Create the SQLite tables (if needed), chunk the file, and store embeddings.
def create_database(file_path):
    file_name = os.path.basename(file_path)
    # sqlite3.connect() already created the database file at import time,
    # so just make sure the tables exist.
    print("Creating tables if they don't exist")
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            path TEXT NOT NULL,
            chunks TEXT NOT NULL
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS embeddings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_id INTEGER NOT NULL,
            chunk_index INTEGER NOT NULL,
            embedding BLOB NOT NULL,
            FOREIGN KEY (file_id) REFERENCES files(id)
        )
    ''')
    conn.commit()
text = ""
with open(file_path, 'r', encoding='utf-8') as file:
text = file.read()
chunks = chunk_text(text)
chunks_str = '|'.join(chunks)
# Insert file metadata
cursor.execute("INSERT INTO files (filename, path, chunks) VALUES (?, ?, ?)",
(file_name, file_path, chunks_str))
file_id = cursor.lastrowid # get the id of the inserted file
conn.commit()
print("Calculate and store embeddings")
for index, chunk in enumerate(chunks):
embedding = embedded_model.encode(chunk).tobytes() # convert to bytes for storage
store_embedding(file_id, index, embedding)
print(f"File '{file_name}' uploaded and processed successfully.")
return {"message": f"File '{file_name}' uploaded and processed successfully.",
"file_path": file_path}

# Retrieve the most relevant chunks and ask the LLM to answer from them.
def chat_with_model(input_sentence):
    if not input_sentence:
        return "User input is required."
    cursor.execute("SELECT id FROM files ORDER BY id DESC LIMIT 1")
    row = cursor.fetchone()
    if row is None:
        return "No file has been uploaded yet."
    file_id = row[0]
    embeddings = get_embeddings(file_id)
    if not embeddings:
        return "No embeddings found for the uploaded file."
    input_embedding = embedded_model.encode(input_sentence)
    similarities = []
    for index, embedding in embeddings:
        stored_embedding = np.frombuffer(embedding, dtype=np.float32)
        similarities.append(calculate_similarity(input_embedding, stored_embedding))
    # Rank chunks by similarity and keep the five best as context.
    sorted_indices = sorted(range(len(similarities)), key=lambda i: similarities[i], reverse=True)
    cursor.execute("SELECT chunks FROM files WHERE id = ?", (file_id,))
    chunks = cursor.fetchone()[0].split('|')
    answer_context = "".join(chunks[i] for i in sorted_indices[:5])
    # Persian prompt, roughly: "Answer the user's question only from the text
    # below. If the answer is not in the text, say: 'No information related to
    # your question was found! Please ask your question more precisely....'"
    system_prompt = (
        "فقط طبق متن زیر به سوال کاربر پاسخ بده. "
        "اگر جواب داخل متن نبود بگو: 'اطلاعاتی مرتبط با پاسخ شما پیدا نشد! لطفا سوال خود را دقیق تر بپرسید....'\n\n"
        "متن:\n" + answer_context + "\n" +
        "سوال کاربر: " + input_sentence
    )
    print("prompt is: " + system_prompt)
    client = InferenceClient(api_key=apikey)
    response = client.chat.completions.create(
        model="Qwen/Qwen2.5-72B-Instruct",
        messages=[{"role": "user", "content": system_prompt}],
        temperature=0.5,
        max_tokens=2048,
        top_p=0.7,
    )
    return response.choices[0].message.content
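
# If incremental output is preferred, the same InferenceClient call supports
# streaming (sketch; delta.content can be None on some chunks):
#   stream = client.chat.completions.create(..., stream=True)
#   answer = "".join(chunk.choices[0].delta.content or "" for chunk in stream)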

# Gradio UI (labels are in Persian to match the app's target audience).
with gr.Blocks() as demo:
    gr.Markdown("## چت با مدل")  # "Chat with the model"
    file_input = gr.File(label="فایل متنی را آپلود کنید", type="filepath")  # "Upload a text file"
    upload_button = gr.Button("آپلود و ایجاد دیتابیس")  # "Upload and build the database"
    # Displays the upload status message.
    output_text = gr.Textbox(label="متن آپلود شده")  # "Uploaded text"
    chat_input = gr.Textbox(label="سوال خود را بپرسید")  # "Ask your question"
    chat_button = gr.Button("چت با مدل")  # "Chat with the model"
    chat_output = gr.Textbox(label="پاسخ مدل")  # "Model response"
    # Button handlers: build the database, then surface the status message.
    upload_button.click(fn=lambda file_path: create_database(upload_file(file_path))["message"],
                        inputs=file_input,
                        outputs=output_text)
    chat_button.click(fn=chat_with_model,
                      inputs=chat_input,
                      outputs=chat_output)

demo.launch()