|
import os

# Hugging Face API token read from the "for_space" environment variable.
# May be None when the variable is unset — downstream calls should treat
# that as "not authenticated". Never print or hard-code this value.
apikey = os.getenv("for_space")
|
|
|
import gradio as gr |
|
import pandas as pd |
|
import sqlite3 |
|
from sklearn.feature_extraction.text import TfidfVectorizer |
|
from sentence_transformers import SentenceTransformer, util |
|
import numpy as np |
|
from openai import OpenAI |
|
|
|
# On-disk SQLite database holding uploaded files and their chunk embeddings.
DB_NAME = 'files.db'

# Shared module-level connection/cursor. check_same_thread=False lets Gradio
# callbacks running on worker threads reuse this connection.
# NOTE(review): a single shared cursor is not safe under concurrent requests —
# confirm single-user usage or add a lock around DB access.
conn = sqlite3.connect(DB_NAME, check_same_thread=False)
cursor = conn.cursor()

# Sentence-embedding model used for both document chunks and user queries.
# NOTE(review): trust_remote_code=True executes code from the model repo —
# acceptable only if the repo is trusted/pinned.
embedded_model = SentenceTransformer("xmanii/maux-gte-persian", trust_remote_code=True)
|
|
|
|
|
def calculate_similarity(embedding1, embedding2):
    """Return the cosine similarity between two embeddings as a Python float."""
    return util.cos_sim(embedding1, embedding2).item()
|
|
|
def store_embedding(file_id, chunk_index, embedding):
    """Insert one chunk's embedding (raw bytes) for *file_id* into the
    embeddings table and commit immediately.

    NOTE(review): an identical ``store_embedding`` is defined again later in
    this module and shadows this one at import time — remove one of the two
    copies.
    """
    print("Storing embedding to database...")
    cursor.execute("INSERT INTO embeddings (file_id, chunk_index, embedding) VALUES (?, ?, ?)",
                   (file_id, chunk_index, embedding))
    conn.commit()
|
|
|
|
|
def get_embeddings(file_id):
    """Return all (chunk_index, embedding-bytes) rows stored for *file_id*."""
    query = "SELECT chunk_index, embedding FROM embeddings WHERE file_id = ?"
    return cursor.execute(query, (file_id,)).fetchall()
|
|
|
|
|
def store_embedding(file_id, chunk_index, embedding):
    """Persist a single chunk embedding (raw bytes) under *file_id* and
    commit the transaction right away."""
    print("Storing embedding to database...")
    sql = "INSERT INTO embeddings (file_id, chunk_index, embedding) VALUES (?, ?, ?)"
    params = (file_id, chunk_index, embedding)
    cursor.execute(sql, params)
    conn.commit()
|
|
|
|
|
def upload_file(file_path):
    """Pass-through helper: Gradio supplies a filesystem path; echo it back
    unchanged so it can be fed to the ingestion step."""
    return file_path
|
|
|
|
|
def chunk_text(text, chunk_size=1000, overlap_size=100):
    """Split *text* into chunks of at most ``chunk_size`` characters, with
    consecutive chunks sharing ``overlap_size`` characters.

    Args:
        text: String to split; an empty string yields an empty list.
        chunk_size: Maximum chunk length; must be positive.
        overlap_size: Characters repeated between consecutive chunks; must be
            in ``[0, chunk_size)`` — otherwise the window would never advance
            and the original ``while`` loop would spin forever.

    Returns:
        List of chunk strings covering ``text`` in order.

    Raises:
        ValueError: If ``chunk_size``/``overlap_size`` would not let the
            window advance.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap_size < chunk_size:
        raise ValueError("overlap_size must satisfy 0 <= overlap_size < chunk_size")

    step = chunk_size - overlap_size  # > 0, so iteration terminates
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
|
|
|
|
|
|
|
def create_database(file_path):
    """Ingest a UTF-8 text file: ensure the schema exists, store the file's
    chunks, and store one embedding per chunk.

    Args:
        file_path: Path to a UTF-8 text file on disk.

    Returns:
        Dict with a human-readable ``message`` and the echoed ``file_path``.
    """
    file_name = os.path.basename(file_path)

    # Create the schema unconditionally: CREATE TABLE IF NOT EXISTS is
    # idempotent. The previous version gated this on os.path.exists(DB_NAME),
    # which would skip table creation entirely for a brand-new database file.
    print("Creating tables if they don't exist")
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS files (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            filename TEXT NOT NULL,
            path TEXT NOT NULL,
            chunks TEXT NOT NULL
        )
    ''')
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS embeddings (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_id INTEGER NOT NULL,
            chunk_index INTEGER NOT NULL,
            embedding BLOB NOT NULL,
            FOREIGN KEY (file_id) REFERENCES files(id)
        )
    ''')
    conn.commit()

    with open(file_path, 'r', encoding='utf-8') as file:
        text = file.read()

    chunks = chunk_text(text)
    # NOTE(review): '|' is the chunk delimiter and the reader splits on it;
    # a document that itself contains '|' corrupts chunk boundaries. A
    # dedicated chunk table would be safer, but changing this requires a
    # matching change in the reader.
    chunks_str = '|'.join(chunks)

    cursor.execute("INSERT INTO files (filename, path, chunks) VALUES (?, ?, ?)",
                   (file_name, file_path, chunks_str))
    file_id = cursor.lastrowid
    conn.commit()

    print("Calculate and store embeddings")
    for index, chunk in enumerate(chunks):
        # Store the raw bytes of the embedding vector; the reader decodes
        # them with np.frombuffer(..., dtype=np.float32).
        embedding = embedded_model.encode(chunk).tobytes()
        store_embedding(file_id, index, embedding)

    print(f"File '{file_name}' uploaded and processed successfully.")
    return {"message": f"File '{file_name}' uploaded and processed successfully.",
            "file_path": file_path}
|
|
|
|
|
def chat_with_model(input_sentence):
    """Answer a user question using the most relevant chunks of the most
    recently uploaded file as context, via a hosted chat model.

    Args:
        input_sentence: The user's question; falsy input is rejected.

    Returns:
        The model's answer text, or an explanatory message string.
    """
    if not input_sentence:
        return "User input is required."

    # Retrieval corpus = the most recently ingested file.
    cursor.execute("SELECT id FROM files ORDER BY id DESC LIMIT 1")
    row = cursor.fetchone()
    if row is None:
        # Previously this crashed with a TypeError (fetchone()[0] on None)
        # when no file had been uploaded yet.
        return "No embeddings found for the uploaded file."
    file_id = row[0]

    embeddings = get_embeddings(file_id)
    if not embeddings:
        return "No embeddings found for the uploaded file."

    input_embedding = embedded_model.encode(input_sentence)

    # Cosine similarity between the query and every stored chunk embedding.
    similarities = []
    for index, embedding in embeddings:
        stored_embedding = np.frombuffer(embedding, dtype=np.float32)
        similarities.append(calculate_similarity(input_embedding, stored_embedding))

    # Chunk indices, most similar first.
    sorted_indices = sorted(range(len(similarities)), key=lambda i: similarities[i], reverse=True)

    cursor.execute("SELECT chunks FROM files WHERE id = ?", (file_id,))
    chunks = cursor.fetchone()[0].split('|')

    # Top-5 most relevant chunks become the context handed to the model.
    answer_context = "".join(chunks[i] for i in sorted_indices[:5])

    system_prompt = (
        "فقط طبق متن زیر به سوال کاربر پاسخ بده. "
        "اگر جواب داخل متن نبود بگو: 'اطلاعاتی مرتبط با پاسخ شما پیدا نشد! لطفا سوال خود را دقیق تر بپرسید....'\n\n"
        "متن:\n" + answer_context + "\n" +
        "سوال کاربر" + input_sentence
    )

    print("prompt is: " + system_prompt)

    from huggingface_hub import InferenceClient

    # Authenticate with the token from the environment. The previous version
    # printed the API key (secret leak), used a hard-coded placeholder token,
    # and sent "Tell me a story" instead of the prompt built above.
    client = InferenceClient(api_key=apikey)

    messages = [{"role": "user", "content": system_prompt}]

    stream = client.chat.completions.create(
        model="Qwen/Qwen2.5-72B-Instruct",
        messages=messages,
        temperature=0.5,
        max_tokens=2048,
        top_p=0.7,
        stream=True,
    )

    # Accumulate the streamed deltas and return the full answer so the Gradio
    # output textbox actually receives a value (previously this printed the
    # stream and returned None).
    answer_parts = []
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            answer_parts.append(delta)
    return "".join(answer_parts)
|
|
|
|
|
# Gradio UI: upload a text file, ingest it, then chat against its contents.
with gr.Blocks() as demo:
    gr.Markdown("## چت با مدل")
    file_input = gr.File(label="فایل متنی را آپلود کنید", type="filepath")
    upload_button = gr.Button("آپلود و ایجاد دیتابیس")

    output_text = gr.Textbox(label="متن آپلود شده")

    chat_input = gr.Textbox(label="سوال خود را بپرسید")
    chat_button = gr.Button("چت با مدل")
    chat_output = gr.Textbox(label="پاسخ مدل")

    # Ingest the file once and show the status message. The previous handler
    # called upload_file twice and returned a (path, dict) tuple into a
    # single textbox output.
    upload_button.click(fn=lambda file_path: create_database(upload_file(file_path))["message"],
                        inputs=file_input,
                        outputs=[output_text])

    chat_button.click(fn=chat_with_model,
                      inputs=chat_input,
                      outputs=chat_output)

demo.launch()