"""Gradio demo: semantic product search (BGE-M3 embeddings + Qdrant + reranker).

Flow: the user's query is normalized, embedded with BGE-M3, searched against a
Qdrant collection, the top hits are re-ordered with a cross-encoder reranker,
and the hits above ``threshold`` are rendered as HTML.  Two buttons let the
user log "match" / "not_match" feedback for the latest search to Airtable.
"""
import html
import os
import pickle
import re
import time
import unicodedata
from datetime import datetime

import gradio as gr
import pandas as pd
from sentence_transformers import SentenceTransformer
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, FieldCondition, MatchValue
from rapidfuzz import process, fuzz
from pythainlp.tokenize import word_tokenize
from pyairtable import Table
from pyairtable import Api
from FlagEmbedding import FlagReranker

# Setup Qdrant Client
qdrant_client = QdrantClient(
    url=os.environ.get("Qdrant_url"),
    api_key=os.environ.get("Qdrant_api"),
    timeout=30.0,
)

# Airtable Config
AIRTABLE_API_KEY = os.environ.get("airtable_api")
BASE_ID = os.environ.get("airtable_baseid")
TABLE_NAME = "Feedback_search"
api = Api(AIRTABLE_API_KEY)
table = api.table(BASE_ID, TABLE_NAME)

# Preload Models
model = SentenceTransformer("BAAI/bge-m3")
collection_name = "product_bge-m3"
threshold = 0.45
reranker = FlagReranker('BAAI/bge-reranker-v2-m3', use_fp16=True)

# Utils
# Matches strings made only of ASCII letters, digits, '&', '-' and whitespace.
_NON_THAI_RE = re.compile(r'^[A-Za-z0-9&\-\s]+$')


def is_non_thai(text):
    """Return True when *text* consists only of ASCII letters, digits, '&', '-' or whitespace."""
    return _NON_THAI_RE.match(text) is not None


def normalize(text: str) -> str:
    """Normalize a search query before embedding.

    Queries recognised by ``is_non_thai`` are only stripped (their case is
    kept).  Anything else is NFC-normalized, the two-character sequences
    "เแ" and "เเ" are collapsed to "แ", and the result is stripped and
    lower-cased.
    """
    if is_non_thai(text):
        return text.strip()
    text = unicodedata.normalize("NFC", text)
    return text.replace("เแ", "แ").replace("เเ", "แ").strip().lower()


# Global state
# Holds the most recent search so the feedback buttons can log it.
# NOTE(review): this dict is module-level, so it is shared by every browser
# session served by this process; concurrent users can overwrite each other's
# entry.  Moving it to gr.State would change the callback signatures, so it is
# left as-is here -- confirm whether multi-user use is expected.
latest_query_result = {"query": "", "result": "", "raw_query": "", "time": ""}


def _render_product_card(point):
    """Build the HTML card and the plain-text summary fragment for one hit.

    Every payload value is HTML-escaped before being placed in the markup,
    because the output is rendered by a ``gr.HTML`` component.  The summary
    fragment is plain text (it is sent to Airtable) and is left unescaped.

    NOTE(review): the tag structure below is a minimal reconstruction; only
    the visible text (name, brand, price, score) is taken from the original.
    """
    payload = point.payload
    name = payload.get("name", "ไม่ทราบชื่อสินค้า")
    score = f"{point.score:.4f}"
    img_url = payload.get("imageUrl", "")
    price = payload.get("price", "ไม่ระบุ")
    brand = payload.get("brand", "")

    image_tag = ""
    if img_url:
        image_tag = (
            f'<img src="{html.escape(str(img_url), quote=True)}" '
            f'style="max-width:120px;">\n'
        )

    card = (
        "<div>\n"
        f"{image_tag}"
        f"<div>{html.escape(str(name))}</div>\n"
        f"<div>{html.escape(str(brand))}</div>\n"
        f"<div>฿{html.escape(str(price))}</div>\n"
        f"<div>score: {score}</div>\n"
        "</div>\n"
    )
    summary = f"{name} (score: {score}) | "
    return card, summary


# Search Function
def search_product(query):
    """Search products for *query* and stream status + HTML results to the UI.

    This is a generator used as a Gradio event handler.  It first yields a
    "searching" status with empty results, then yields the final status and
    the rendered HTML.  On a Qdrant failure it yields an error status and an
    (escaped) error message instead.

    Side effect: updates the module-level ``latest_query_result`` so that
    ``log_feedback`` can record the search.
    """
    yield gr.update(value="🔄 กำลังค้นหา..."), ""
    start_time = time.time()

    corrected_query = normalize(query)
    # Reset the shared record up front so that a failed search cannot leave
    # the previous search's query/result paired with this raw query.
    latest_query_result.update({
        "raw_query": query,
        "query": corrected_query,
        "result": "",
    })

    query_embed = model.encode(corrected_query)

    try:
        result = qdrant_client.query_points(
            collection_name=collection_name,
            query=query_embed.tolist(),
            with_payload=True,
            query_filter=Filter(
                must=[FieldCondition(key="type", match=MatchValue(value="product"))]
            ),
            limit=50,
        ).points
    except Exception as e:
        latest_query_result["time"] = time.time() - start_time
        # The exception text may echo request data, so escape it for gr.HTML.
        yield (
            gr.update(value="❌ Qdrant error"),
            f"<p>❌ Qdrant error: {html.escape(str(e))}</p>",
        )
        return

    if len(result) > 0:
        topk = 10
        head = result[:topk]
        docs = [r.payload.get("name", "") for r in head]
        pairs = [[corrected_query, d] for d in docs]
        scores = reranker.compute_score(pairs, normalize=True)
        # Some FlagEmbedding releases return a bare float when given a single
        # pair; coerce to a list so zip() below works for a one-hit result.
        try:
            scores = list(scores)
        except TypeError:
            scores = [scores]
        # Blend the vector-search score with the reranker score to re-order
        # only the top-k hits; hits beyond top-k keep their Qdrant order.
        ranked = sorted(
            zip(head, scores),
            key=lambda pair: 0.6 * pair[0].score + 0.4 * pair[1],
            reverse=True,
        )
        result[:topk] = [point for point, _ in ranked]

    elapsed = time.time() - start_time

    html_parts = [f"<p>{elapsed:.2f} วินาที</p>\n"]
    if corrected_query != query:
        html_parts.append(
            "<p>🔧 แก้คำค้นจาก: "
            f"<code>{html.escape(query)}</code> → "
            f"<code>{html.escape(corrected_query)}</code></p>\n"
        )

    html_parts.append("<div>\n")
    summary_parts = []
    found = False
    for res in result:
        # The display filter uses the original Qdrant score, not the blend.
        if res.score >= threshold:
            found = True
            card, summary = _render_product_card(res)
            html_parts.append(card)
            summary_parts.append(summary)
    html_parts.append("</div>\n")

    if not found:
        html_parts.append("<p>❌ ไม่พบสินค้าที่เกี่ยวข้องกับคำค้นนี้</p>\n")

    latest_query_result.update({
        "query": corrected_query,
        "result": "".join(summary_parts).strip(),
        "time": elapsed,
    })

    yield gr.update(value="✅ ค้นหาเสร็จแล้ว!"), "".join(html_parts)


# Feedback Function
def log_feedback(feedback):
    """Write the latest search plus the user's *feedback* label to Airtable.

    Returns a status string for the UI; any exception raised while creating
    the record is caught and reported in that string rather than propagated.
    """
    try:
        now = datetime.now().strftime("%Y-%m-%d")
        table.create({
            "model": "BGE M3",
            "timestamp": now,
            "raw_query": latest_query_result["raw_query"],
            "query": latest_query_result["query"],
            "result": latest_query_result["result"],
            "time(second)": latest_query_result["time"],
            "feedback": feedback,
        })
        return "✅ Feedback saved to Airtable!"
    except Exception as e:
        return f"❌ Failed to save feedback: {str(e)}"


# Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("## 🔎 Product Semantic Search (BGE M3 + Qdrant)")
    query_input = gr.Textbox(label="พิมพ์คำค้นหา")
    result_output = gr.HTML(label="📋 ผลลัพธ์")
    status_output = gr.Textbox(label="🕒 สถานะ", interactive=False)

    with gr.Row():
        match_btn = gr.Button("✅ ตรง")
        not_match_btn = gr.Button("❌ ไม่ตรง")

    feedback_status = gr.Textbox(label="📬 สถานะ Feedback")

    query_input.submit(
        search_product,
        inputs=[query_input],
        outputs=[status_output, result_output],
    )
    match_btn.click(fn=lambda: log_feedback("match"), outputs=feedback_status)
    not_match_btn.click(fn=lambda: log_feedback("not_match"), outputs=feedback_status)

demo.launch(share=True)