|
import json |
|
import os |
|
import sqlite3 |
|
import secrets |
|
import numpy as np |
|
from sentence_transformers import SentenceTransformer, util |
|
from pathlib import Path |
|
import gradio as gr |
|
import datetime |
|
from huggingface_hub import InferenceClient |
|
|
|
|
|
# HF Inference API token, read from the environment.
# NOTE(review): "online" is an unusual env-var name for a secret —
# confirm the deployment actually sets it; getenv returns None if unset.
hu_api = os.getenv("online")

# Shared client used by chat() for streaming chat completions.
client = InferenceClient(api_key=hu_api)
|
|
|
|
|
# Directory containing this script; the embedding model is cached beside it
# so later runs can skip the HuggingFace download.
current_dir = Path(__file__).resolve().parent

embedded_model_path = f"{current_dir}/EmbeddedModel"

# Load the sentence-embedding model from the local cache when present,
# otherwise fetch it from HuggingFace and save it for next time.
# (isdir() is false for a missing path, so one check covers both cases.)
if os.path.isdir(embedded_model_path):
    print("*** Load the model locally from the specified path ***")
    embedded_model = SentenceTransformer(model_name_or_path=embedded_model_path, trust_remote_code=True)
else:
    print("*** Load the model from huggingface ***")
    embedded_model = SentenceTransformer("xmanii/maux-gte-persian", trust_remote_code=True)
    embedded_model.save(embedded_model_path)
|
|
|
|
|
# Folder for uploaded files (created up front so writes never fail on a
# missing directory).
UPLOAD_FOLDER = 'uploads'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# One module-wide SQLite connection and cursor shared by every handler.
# NOTE(review): check_same_thread=False only disables sqlite3's thread
# check — the shared connection/cursor is still not thread-safe, and
# Gradio may run handlers concurrently; consider per-request connections
# or a lock.
DB_NAME = 'files.db'
conn = sqlite3.connect(DB_NAME, check_same_thread=False)
cursor = conn.cursor()

# files: one row per uploaded document; `chunks` holds the '|'-joined
# chunk texts (see upload_file).
cursor.execute('''
CREATE TABLE IF NOT EXISTS files (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    filename TEXT NOT NULL,
    path TEXT NOT NULL,
    chunks TEXT NOT NULL
)
''')

# embeddings: one row per chunk; `embedding` is the raw byte buffer of the
# encoded vector (written via ndarray.tobytes() in upload_file).
cursor.execute('''
CREATE TABLE IF NOT EXISTS embeddings (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    file_id INTEGER NOT NULL,
    chunk_index INTEGER NOT NULL,
    embedding BLOB NOT NULL,
    chunk_text TEXT NOT NULL,
    FOREIGN KEY (file_id) REFERENCES files(id)
)
''')
conn.commit()
|
|
|
def store_embedding(file_id, chunk_index, embedding, chunk_text):
    """Persist one chunk's embedding bytes and text, keyed to its file.

    Args:
        file_id: rowid of the parent row in the `files` table.
        chunk_index: zero-based position of the chunk within the document.
        embedding: raw bytes of the encoded vector.
        chunk_text: the chunk's original text.
    """
    sql = "INSERT INTO embeddings (file_id, chunk_index, embedding, chunk_text) VALUES (?, ?, ?, ?)"
    row = (file_id, chunk_index, embedding, chunk_text)
    cursor.execute(sql, row)
    conn.commit()
|
|
|
def get_embeddings(file_id):
    """Retrieve embeddings and chunk texts belonging to one uploaded file.

    Bug fix: the original query had no WHERE clause, so it returned the
    embeddings of EVERY file ever uploaded and ignored `file_id` entirely,
    mixing unrelated documents into each chat's retrieval step.

    Args:
        file_id: rowid of the file in the `files` table.

    Returns:
        List of (chunk_index, embedding_bytes, chunk_text) tuples.
    """
    cursor.execute(
        "SELECT chunk_index, embedding, chunk_text FROM embeddings WHERE file_id = ?",
        (file_id,),
    )
    return cursor.fetchall()
|
|
|
def generate_verification_code(length=6):
    """Return a cryptographically secure numeric code of *length* digits."""
    digits = "0123456789"
    picked = [secrets.choice(digits) for _ in range(length)]
    return "".join(picked)
|
|
|
def read_file(file_path):
    """Return the full UTF-8 text of *file_path*, or None on any failure.

    A missing file returns None silently; other errors are printed first.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            contents = handle.read()
    except FileNotFoundError:
        return None
    except Exception as e:
        print(f"Error reading file: {e}")
        return None
    return contents
|
|
|
def chunk_text(text, chunk_size=1000, overlap_size=100): |
|
"""Split text into chunks for processing.""" |
|
chunks = [] |
|
start = 0 |
|
while start < len(text): |
|
end = start + chunk_size |
|
chunks.append(text[start:end]) |
|
start += (chunk_size - overlap_size) |
|
return chunks |
|
|
|
def calculate_similarity(embedding1, embedding2):
    """Return the cosine similarity of two embedding vectors as a float."""
    return util.cos_sim(embedding1, embedding2).item()
|
|
|
def save_chat_history(mobile, user_input, bot_response):
    """Append one question/answer exchange to the user's JSON chat log.

    Logs are grouped by UTC date, then by mobile number:
    chat_histories/<YYYY-MM-DD>/<mobile>/chat_history.json

    Fixes vs. original: the summary string sat after the first statements
    and was therefore not a docstring; datetime.utcnow() is deprecated and
    replaced with the timezone-aware equivalent (same formatted output).

    Args:
        mobile: the user's mobile number, used as the folder name.
        user_input: the question text.
        bot_response: the model's answer text.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    user_folder = os.path.join('chat_histories', now.strftime("%Y-%m-%d"), mobile)
    os.makedirs(user_folder, exist_ok=True)
    history_file = os.path.join(user_folder, 'chat_history.json')

    chat_entry = {
        "timestamp": now.strftime("%Y-%m-%d %H:%M:%S"),
        "user": mobile,
        "question": user_input,
        "answer": bot_response
    }

    if os.path.exists(history_file):
        # Rewrite the existing file in place: load, append, seek back,
        # dump, then truncate in case the new JSON is shorter.
        with open(history_file, 'r+', encoding='utf-8') as f:
            try:
                history = json.load(f)
            except json.JSONDecodeError:
                # Corrupt/partial file: start a fresh history rather than crash.
                history = []
            history.append(chat_entry)
            f.seek(0)
            json.dump(history, f, ensure_ascii=False, indent=4)
            f.truncate()
    else:
        with open(history_file, 'w', encoding='utf-8') as f:
            json.dump([chat_entry], f, ensure_ascii=False, indent=4)
|
|
|
def load_chat_history(mobile):
    """Load today's chat history for *mobile* as LLM chat messages.

    Reads chat_histories/<YYYY-MM-DD>/<mobile>/chat_history.json (the
    layout written by save_chat_history) and converts each stored entry
    into a user message followed by the matching assistant message.

    Bug fix: the original emitted ALL user messages first and then ALL
    assistant messages, destroying the conversation order sent to the
    model; turns are now interleaved per entry. Also replaces the
    deprecated datetime.utcnow() with a timezone-aware call.

    Args:
        mobile: the user's mobile number (folder name).

    Returns:
        List of {"role": ..., "content": ...} dicts in conversation order;
        empty list when there is no readable history.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    user_folder = os.path.join('chat_histories', now.strftime("%Y-%m-%d"), mobile)
    history_file = os.path.join(user_folder, 'chat_history.json')
    chat_history = []

    if os.path.exists(history_file) and os.path.getsize(history_file) > 0:
        with open(history_file, 'r', encoding='utf-8') as f:
            try:
                history = json.load(f)
            except json.JSONDecodeError as e:
                print(f"Error loading JSON: {e}")
            else:
                for entry in history:
                    chat_history.append({"role": "user", "content": entry["question"]})
                    chat_history.append({"role": "assistant", "content": entry["answer"]})

    return chat_history
|
|
|
def upload_file(file):
    """Read an uploaded text file, chunk it, embed each chunk and persist all of it.

    Stores one row in `files` (with the '|'-joined chunk texts) and one row
    per chunk in `embeddings` (raw float-buffer bytes of the encoded vector).

    Bug fix: the original contained a dead `if text is None` branch that
    referenced an undefined name `file_path` — reaching it would have raised
    NameError. The read either succeeds (text is a str) or returns early, so
    the branch is removed.

    Args:
        file: a Gradio file object; `file.name` is the path on disk, or None
            when nothing was uploaded.

    Returns:
        A status message string (Persian for user-facing errors).
    """
    if file is None:
        return "لطفاً یک فایل متنی بارگذاری کنید."

    try:
        with open(file.name, 'r', encoding='utf-8') as f:
            text = f.read()
    except Exception as e:
        return f"خطا در خواندن فایل: {str(e)}"

    chunks = chunk_text(text)
    chunks_str = '|'.join(chunks)

    # Register the file first so the chunks can reference its rowid.
    cursor.execute("INSERT INTO files (filename, path, chunks) VALUES (?, ?, ?)",
                   (file.name, file.name, chunks_str))
    file_id = cursor.lastrowid
    conn.commit()

    # Embed and persist each chunk; tobytes() serializes the vector for the BLOB column.
    for index, chunk in enumerate(chunks):
        embedding = embedded_model.encode(chunk).tobytes()
        store_embedding(file_id, index, embedding, chunk)
        print(f"store_embedding {index} of {len(chunks)}")

    return f"File '{file.name}' uploaded and processed successfully."
|
|
|
import numpy as np |
|
from gradio_client import Client |
|
|
|
def chat(input_sentence, file_id, mobile):
    """Answer *input_sentence* with RAG over the stored chunk embeddings.

    Ranks all stored chunks by cosine similarity to the question, builds a
    system prompt from the top 5, prepends the user's prior chat history,
    streams a completion from the HF Inference API, and logs the exchange.

    Fix: the retrieval loop's variable was named `chunk_text`, shadowing the
    module-level chunk_text() helper — renamed to avoid the hazard.

    Args:
        input_sentence: the user's question.
        file_id: id of the uploaded file whose embeddings to search.
            NOTE(review): verify get_embeddings actually filters by this id.
        mobile: user identifier used to load/save chat history.

    Returns:
        The assistant's response text, or an error message string.
    """
    if not input_sentence:
        return "User input is required."

    embeddings = get_embeddings(file_id)
    if not embeddings:
        return "No embeddings found for the uploaded file."

    input_embedding = embedded_model.encode(input_sentence)

    # Score every stored chunk against the question.
    similarities = []
    for _index, embedding_blob, chunk in embeddings:
        # assumes encode() produced float32 vectors — TODO confirm model dtype
        stored_embedding = np.frombuffer(embedding_blob, dtype=np.float32)
        similarity = calculate_similarity(input_embedding, stored_embedding)
        similarities.append((similarity, chunk))

    # Keep the 5 most similar chunks as the answer context.
    top_similarities = sorted(similarities, key=lambda x: x[0], reverse=True)[:5]
    answer_context = "".join(chunk for _, chunk in top_similarities)

    system_prompt = (
        "فقط طبق متن به پاسخ بده و اگر پاسخ کاربر در متن نبود بگو نمی دانم.\n\n"
        "متن:\n" + answer_context)

    chat_history = load_chat_history(mobile)

    # system prompt, then any prior turns, then the new question.
    message = [{"role": "system", "content": system_prompt}]
    if chat_history:
        message += chat_history
    message.append({"role": "user", "content": input_sentence})

    collected_response = []
    stream = client.chat.completions.create(
        model="Qwen/Qwen2.5-72B-Instruct",
        messages=message,
        temperature=0.5,
        max_tokens=4098,
        top_p=0.7,
        stream=True
    )
    for event in stream:
        # NOTE(review): membership test on the delta object is kept from the
        # original — confirm it works with the installed huggingface_hub;
        # attribute access (delta.content) is the documented pattern.
        if 'content' in event.choices[0].delta:
            collected_response.append(event.choices[0].delta.content)

    bot_response = ''.join(collected_response)
    save_chat_history(mobile, input_sentence, bot_response)
    print(f"Bot Response: {bot_response}")

    return bot_response
|
|
|
def gradio_interface():
    """Build and launch the two-tab Gradio UI (file upload + RAG chat)."""
    with gr.Blocks() as demo:
        gr.Markdown("# پروژه مدل مبتنی بر متن با استفاده از Gradio")

        # Tab 1: upload a text file and index it into the database.
        with gr.Tab("بارگذاری فایل"):
            uploaded_file = gr.File(label="فایل متنی خود را اینجا بارگذاری کنید")
            process_button = gr.Button("بارگذاری و پردازش فایل")
            upload_status = gr.Textbox(label="پیام خروجی")

            process_button.click(upload_file, inputs=[uploaded_file], outputs=[upload_status])

        # Tab 2: ask questions against a previously indexed file.
        with gr.Tab("چت"):
            mobile_box = gr.Textbox(label="شماره موبایل شما را وارد کنید")
            file_id_box = gr.Number(label="شناسه فایل آپلود شده (file_id)")
            question_box = gr.Textbox(label="پرسش خود را وارد کنید")
            send_button = gr.Button("ارسال پرسش")
            answer_box = gr.Textbox(label="پاسخ ربات")

            send_button.click(chat, inputs=[question_box, file_id_box, mobile_box], outputs=[answer_box])

    demo.launch()
|
|
|
# Script entry point: build the UI and start the Gradio server.
if __name__ == "__main__":
    gradio_interface()