import html
import os
import pickle
import re
import time
import unicodedata
from datetime import datetime

import gradio as gr
import pandas as pd
from pyairtable import Api
from pyairtable import Table
from pythainlp.tokenize import word_tokenize
from qdrant_client import QdrantClient
from qdrant_client.models import FieldCondition, Filter, MatchValue
from rapidfuzz import fuzz, process
from sentence_transformers import SentenceTransformer

# Setup Qdrant Client
qdrant_client = QdrantClient(
    url=os.environ.get("Qdrant_url"),
    api_key=os.environ.get("Qdrant_api"),
    timeout=30.0,
)

# Airtable Config
AIRTABLE_API_KEY = os.environ.get("airtable_api")
BASE_ID = os.environ.get("airtable_baseid")
TABLE_NAME = "Feedback_search"
api = Api(AIRTABLE_API_KEY)
table = api.table(BASE_ID, TABLE_NAME)

# Load model
model = SentenceTransformer('e5_finetuned')
collection_name = "product_E5_finetune"

# Load whitelist
# NOTE(review): pickle.load can execute arbitrary code from the file it reads.
# This is only safe while keyword_whitelist.pkl is a trusted artifact shipped
# with the app -- confirm it is never replaced by externally supplied data.
with open("keyword_whitelist.pkl", "rb") as f:
    keyword_whitelist = pickle.load(f)


# Utils
def normalize(text: str) -> str:
    """Return *text* NFC-normalized, stripped and lower-cased.

    The two-character sequences "เแ" and "เเ" are collapsed into the single
    vowel "แ" before stripping.
    """
    text = unicodedata.normalize("NFC", text)
    return text.replace("เแ", "แ").replace("เเ", "แ").strip().lower()


def smart_tokenize(text: str) -> list:
    """Tokenize *text* with the ``newmm`` engine, with a whole-string fallback.

    If tokenization yields nothing, or the joined tokens cover less than half
    of the stripped input length, the stripped input is returned as the only
    token.
    """
    stripped = text.strip()
    tokens = word_tokenize(stripped, engine="newmm")
    if tokens and len("".join(tokens)) >= len(stripped) * 0.5:
        return tokens
    return [stripped]


def correct_query_merge_phrases(query: str, whitelist, threshold=80, max_ngram=3):
    """Replace tokens (or runs of tokens) in *query* with whitelist entries.

    The query is normalized and tokenized; at each position the longest
    n-gram (up to *max_ngram* tokens, concatenated without separator) is
    fuzzy-matched against *whitelist* first, then shorter ones. The first
    n-gram whose best match scores at least *threshold* is replaced by that
    match; if none qualifies the single token is kept unchanged.

    Returns the corrected pieces concatenated into one string. One-character
    pieces are dropped unless they are themselves in *whitelist*.
    """
    query_norm = normalize(query)
    tokens = smart_tokenize(query_norm)
    corrected = []
    i = 0
    while i < len(tokens):
        matched = False
        for n in range(min(max_ngram, len(tokens) - i), 0, -1):
            phrase = "".join(tokens[i:i + n])
            best = process.extractOne(phrase, whitelist, scorer=fuzz.token_sort_ratio)
            # extractOne returns None when there is nothing to match against
            # (e.g. an empty whitelist); unpacking it directly would raise.
            if best is not None and best[1] >= threshold:
                corrected.append(best[0])
                i += n
                matched = True
                break
        if not matched:
            corrected.append(tokens[i])
            i += 1
    return "".join([word for word in corrected if len(word) > 1 or word in whitelist])


# Global state
# Holds the most recent search so the feedback buttons can log it.
# NOTE(review): this is one module-level dict, so it is shared by every
# session served by this process -- confirm that is acceptable for the demo.
latest_query_result = {"query": "", "result": "", "raw_query": "", "time": ""}


# Main Search
def search_product(query):
    """Run a vector search for *query* and return the results as an HTML string.

    The query is corrected against the keyword whitelist, embedded with the
    "query: " prefix and sent to Qdrant (filtered to ``type == "product"``,
    limit 50). Only hits with a score above 0.8 are rendered.

    Side effect: ``latest_query_result`` is refreshed on every call, including
    the no-results and Qdrant-error paths, so feedback logged afterwards
    always describes this search rather than an earlier one.
    """
    start_time = time.time()

    # Reset the shared record up front so no exit path can leave fields from
    # a previous search behind.
    latest_query_result.update({"raw_query": query, "query": "", "result": "", "time": ""})

    corrected_query = correct_query_merge_phrases(query, keyword_whitelist)
    latest_query_result["query"] = corrected_query
    query_embed = model.encode("query: " + corrected_query)

    try:
        result = qdrant_client.query_points(
            collection_name=collection_name,
            query=query_embed.tolist(),
            with_payload=True,
            query_filter=Filter(
                must=[FieldCondition(key="type", match=MatchValue(value="product"))]
            ),
            limit=50,
        ).points
    except Exception as e:
        latest_query_result["time"] = time.time() - start_time
        return f"<p>❌ Qdrant error: {html.escape(str(e))}</p>"

    elapsed = time.time() - start_time

    # NOTE(review): the markup inside these strings was lost before review
    # (only the text survived). The tags below are a minimal reconstruction
    # that keeps the text content unchanged -- restore the original styling
    # if it is available.
    # Every interpolated value is escaped because the string is rendered
    # as raw HTML by gr.HTML.
    html_output = f"<p>{elapsed:.2f} วินาที</p>"
    if corrected_query != query:
        html_output += (
            f"<p>🔧 แก้คำค้นจาก: <del>{html.escape(query)}</del>"
            f"<strong>{html.escape(corrected_query)}</strong></p>"
        )

    html_output += '<div>'
    result_summary, found = "", False
    for res in result:
        if res.score > 0.8:
            found = True
            payload = res.payload or {}
            name = payload.get("name", "ไม่ทราบชื่อสินค้า")
            score = f"{res.score:.4f}"
            img_url = payload.get("imageUrl", "")
            price = payload.get("price", "ไม่ระบุ")
            brand = payload.get("brand", "")
            # Skip the <img> entirely when there is no URL to avoid a
            # broken-image placeholder.
            img_tag = f'<img src="{html.escape(str(img_url))}" alt="">' if img_url else ""
            html_output += f"""
<div>
{img_tag}
<div>{html.escape(str(name))}</div>
<div>{html.escape(str(brand))}</div>
<div>฿{html.escape(str(price))}</div>
<div>score: {score}</div>
</div>
"""
            # The summary is logged to Airtable, not rendered, so it keeps
            # the unescaped product name.
            result_summary += f"{name} (score: {score}) | "
    html_output += "</div>"

    if not found:
        html_output += '<p>❌ ไม่พบสินค้าที่เกี่ยวข้องกับคำค้นนี้</p>'

    latest_query_result.update({
        "query": corrected_query,
        "result": result_summary.strip(),
        "time": elapsed,
    })
    return html_output


# Feedback logging
def log_feedback(feedback):
    """Append the latest search plus *feedback* to the Airtable table.

    Returns a status string: a success message, or a failure message that
    includes the exception text. Exceptions are not propagated so the UI can
    display the outcome.
    """
    try:
        now = datetime.now().strftime("%Y-%m-%d")
        table.create({
            # NOTE(review): this label names intfloat/multilingual-e5-small
            # while the model loaded above is 'e5_finetuned' -- confirm the
            # label is still the intended one.
            "model": "E5 (intfloat/multilingual-e5-small)",
            "timestamp": now,
            "raw_query": latest_query_result["raw_query"],
            "query": latest_query_result["query"],
            "result": latest_query_result["result"],
            "time(second)": latest_query_result["time"],
            "feedback": feedback
        })
        return "✅ Feedback saved to Airtable!"
    except Exception as e:
        return f"❌ Failed to save feedback: {str(e)}"


# Gradio UI
with gr.Blocks() as demo:
    gr.Markdown("## 🔎 Product Semantic Search (Vector Search + Qdrant)")
    query_input = gr.Textbox(label="พิมพ์คำค้นหา")
    result_output = gr.HTML(label="📋 ผลลัพธ์")

    with gr.Row():
        match_btn = gr.Button("✅ ตรง")
        not_match_btn = gr.Button("❌ ไม่ตรง")

    feedback_status = gr.Textbox(label="📬 สถานะ Feedback")

    query_input.submit(search_product, inputs=[query_input], outputs=result_output)
    match_btn.click(lambda: log_feedback("match"), outputs=feedback_status)
    not_match_btn.click(lambda: log_feedback("not_match"), outputs=feedback_status)

# Run
demo.launch(share=True)