Spaces:
Sleeping
Sleeping
import os | |
import re | |
from typing import Annotated, Optional | |
import google.generativeai as genai | |
from langfuse import observe, get_client | |
from .utils.reranker import RerankRetriever | |
from .utils.prompts import CUSTOMER_DATA_SUMMARY_PROMPT, RECOMMENDATION_PROMPT | |
from .handler import customer_data_handler | |
from .handler import recommendation_handler | |
# --- Configurations & Initializations --- | |
try: | |
print("Initializing RerankRetriever for tools...") | |
rag_retriever = RerankRetriever() | |
print("✅ RerankRetriever initialized successfully.") | |
except Exception as e: | |
print(f"‼️ ERROR initializing RerankRetriever: {e}") | |
rag_retriever = None | |
GEMINI_API_KEY = os.getenv("GOOGLE_API_KEY") | |
if not GEMINI_API_KEY: | |
raise ValueError("GOOGLE_API_KEY not found in environment variables.") | |
genai.configure(api_key=GEMINI_API_KEY) | |
specialized_llm = genai.GenerativeModel( | |
'gemini-2.5-flash', | |
system_instruction=""" | |
You are an expert AI assistant for "Rabbit Life" insurance. Your persona is helpful, professional, and empathetic. | |
- **Language:** You MUST respond in Thai ONLY, using polite female particles like "ค่ะ" and "นะคะ". | |
- **Pronouns:** When referring to yourself, you MUST refer to yourself as "เรา" (we/us) or "ทางเรา" (on our part). | |
Your objective is to execute the given task (summarizing data, generating recommendations) based *only* on the provided information. | |
- **Strict Grounding:** Your answer must be derived exclusively from the provided data. | |
- **Do Not Translate Proper Nouns:** Keep product names and "Rabbit Life" in English. | |
//-- Formatting Rules (CRITICAL) --// | |
- **Use Bullet Points:** You MUST use Markdown bullet points (using `•` or `-`) to present lists of features, benefits, products, or any series of items. This is essential for readability. | |
- **Clarity & Tone:** Use clear headings (e.g., **หัวข้อความคุ้มครองหลัก:**) and a caring closing statement. | |
""", | |
generation_config={"temperature": 0.0} | |
) | |
# def run_async(coro): | |
# """A helper to run an async coroutine from a synchronous function.""" | |
# try: | |
# loop = asyncio.get_running_loop() | |
# except RuntimeError: | |
# loop = asyncio.new_event_loop() | |
# asyncio.set_event_loop(loop) | |
# return loop.run_until_complete(coro) | |
KNOWN_PRODUCTS_SORTED = [ | |
'Health Protect (สัญญาเพิ่มเติมสุขภาพค่ารักษาพยาบาล)', | |
'Worry Free Cancer (สัญญาเพิ่มเติมคุ้มครองโรคมะเร็ง)', | |
'Rider HIB 365 (สัญญาเพิ่มเติมสุขภาพค่าชดเชยรายวัน)', | |
'PA Max (สัญญาเพิ่มเติมสุขภาพค่ารักษาผู้ป่วยนอก)', # เพิ่มมาจาก MAP | |
'PA Prompt (สัญญาเพิ่มเติมสุขภาพค่ารักษาผู้ป่วยนอก)', # เพิ่มมาจาก MAP | |
'สัญญาเพิ่มเติมยกเว้นเบี้ยประกันภัย', # เพิ่มมาจาก MAP | |
'PA Max (สัญญาเพิ่มเติมสุขภาพค่ารักษาเนื่องจากอุบัติเหตุ)', # เพิ่มมาจาก MAP | |
'PA Prompt (สัญญาเพิ่มเติมสุขภาพค่ารักษาเนื่องจากอุบัติเหตุ)',# เพิ่มมาจาก MAP | |
'Rider ADB and ADD(สัญญาเพิ่มเติมอุบัติเหตุ)', | |
'Rider ADB (สัญญาเพิ่มเติมอุบัติเหตุ)', | |
'Rider ADD (สัญญาเพิ่มเติมอุบัติเหตุ)', | |
'Rider AI (สัญญาเพิ่มเติมอุบัติเหตุ)', | |
'PA Prompt (อุบัติเหตุส่วนบุคคล)', | |
'Worry Free 50 Critical Illness', | |
'PA Max (อุบัติเหตุส่วนบุคคล)', | |
'Protection Plus 18/9', | |
'Smart Term Bronze 10', | |
'Smart Wellness 90/15', | |
'Smart Term Bronze 5', | |
'High Protect 3/3', | |
'Chai Leoy 99/10', | |
'Chai Leoy 99/20', | |
'Chai Leoy 99/5', | |
'Sabai Jai 14/5', | |
'Mental Health', | |
'Health Smile', | |
'Jai Jai 15/6', | |
'Jai Jai 12/6', # เพิ่มมาจาก MAP | |
'Jai Jai 25/9', | |
'OPD', | |
] | |
KNOWN_PRODUCTS_SORTED = sorted(list(set(KNOWN_PRODUCTS_SORTED)), key=len, reverse=True) | |
# --- Tool 1: General Knowledge (RAG) --- | |
def search_general_knowledge( | |
agent_instance, | |
query: Annotated[str, "The user's general question about an insurance product, its features, or related topics like claims or tax deductions."], | |
) -> str: | |
"""Use this tool to answer a user's general question about Rabbit Life insurance and their products. | |
This tool performs a semantic search (RAG) through the knowledge base of product documentation | |
to find the most relevant information to answer the query.""" | |
# print(f"🛠️ Tool Called: search_general_knowledge(query='{query}')") | |
if not rag_retriever: | |
return "ขออภัยค่ะ ระบบสืบค้นข้อมูลขัดข้องชั่วคราว" | |
# --- 1. สกัดชื่อ Product --- | |
extracted_plan_name = None | |
lower_query = query.lower() | |
for product in KNOWN_PRODUCTS_SORTED: | |
# สร้าง keyword ตัดวงเล็บออก | |
simple_product_keyword = re.sub(r'\(.*\)', '', product).strip().lower() | |
# ตรวจสอบว่า keyword ที่สร้างขึ้น อยู่ในคำถามของผู้ใช้หรือไม่ | |
if simple_product_keyword and simple_product_keyword in lower_query: | |
# ถ้าเจอ, ให้ใช้ "ชื่อเต็ม" ของผลิตภัณฑ์นั้นเป็นตัวกรอง | |
extracted_plan_name = product | |
break # เจออันที่ยาวที่สุดแล้ว หยุดทันที | |
# print(f"🕵️♂️ Extracted Plan Name for Filter: {extracted_plan_name}") | |
# --- 2. สร้าง Filter Dictionary & เรียก Retriever --- | |
retriever_kwargs = {} | |
if extracted_plan_name: | |
# print(f"✨ Applying metadata filter for: '{extracted_plan_name}'") | |
# สร้าง filter โดยใช้ Key ('vector_search_filter') | |
retriever_kwargs['vector_search_filter'] = { | |
"term": { | |
"query": extracted_plan_name, | |
"path": "policy_plan_name" | |
} | |
} | |
# print(f"🔍 Retrieving context for: '{query}' with filter: {retriever_kwargs}") | |
# ส่ง kwargs ที่มี filter ของเราเข้าไปใน retriever pipeline | |
compression_retriever = rag_retriever.get_compression_retriever(**retriever_kwargs) | |
context_docs = compression_retriever.invoke(query) | |
# print(f"Retrieved {len(context_docs)} documents") | |
# print(context_docs) | |
if not context_docs: | |
if extracted_plan_name: | |
return f"ขออภัยค่ะ เราพบข้อมูลเกี่ยวกับ '{extracted_plan_name}' แต่ไม่พบรายละเอียดที่ตรงกับคำถามของคุณค่ะ" | |
return "ขออภัยค่ะ เราไม่พบข้อมูลที่เกี่ยวข้องกับคำถามนี้" | |
# --- 3. Format Meta --- | |
# print(f"📄 Formatting {len(context_docs)} retrieved documents...") | |
formatted_docs = [] | |
for i, doc in enumerate(context_docs): | |
insurance_type = doc.metadata.get('insurance_type', '-') | |
plan_name = doc.metadata.get('plan_name', '-') | |
header_1 = doc.metadata.get('Header 1', '-') | |
header_2 = doc.metadata.get('Header 2', '-') | |
header_3 = doc.metadata.get('Header 3', '-') | |
content = doc.page_content | |
formatted = ( | |
f"<Doc_{i}>\n" | |
f"ชื่อประกัน: {plan_name}\n" | |
f"ประเภทประกัน: {insurance_type}\n" | |
f"หัวข้อใหญ่: {header_1}\n" | |
f"หัวข้อรอง: {header_2}\n" | |
f"หัวข้อย่อย: {header_3}\n\n" | |
f"{content}\n" | |
"----------" | |
) | |
formatted_docs.append(formatted) | |
context = "\n\n".join(formatted_docs) | |
final_prompt = f""" | |
<CONTEXT> | |
{context} | |
</CONTEXT> | |
User's Question: {query} | |
Based *only* on the context provided, answer the user's question in polite Thai. | |
""" | |
# print("🧠 Generating response from context...") | |
try: | |
response = specialized_llm.generate_content(final_prompt) | |
ai_response_content = response.text or "" | |
clean_response = re.sub(r"<[^>]+>|#+", "", ai_response_content).strip() | |
# print(f"✅ RAG process completed.") | |
return clean_response | |
except Exception as e: | |
print(f"‼️ ERROR during RAG LLM generation: {e}") | |
return "ขออภัยค่ะ เกิดข้อผิดพลาดในการสร้างคำตอบ" | |
# --- Tool 2: Query Existing Customer Policy --- | |
def query_customer_policy( | |
agent_instance, | |
customer_identifier: Annotated[str, "The customer's identification information, such as their full name ('Firstname Lastname') or their 13-digit National ID number, as provided by the user."], | |
question: Annotated[str, "The specific question the customer is asking about their policy. For example: 'When is my next payment due?', 'Summarize my coverage', or 'What is my policy status?'"] | |
) -> str: | |
"""Use this tool when an existing customer wants to 'check', 'review', or 'see' their personal policy information. | |
This tool retrieves the customer's policy data from the database using their identifier, | |
summarizes the relevant information based on their question, and also identifies potential upsell opportunities.""" | |
# print(f"🛠️ Tool Called: query_customer_policy(identifier='{customer_identifier}')") | |
# find_customer_data | |
found_data_df = customer_data_handler.find_customer_data(customer_identifier) # return df | |
if found_data_df is None or found_data_df.empty: | |
return f"ขออภัยค่ะ ไม่พบข้อมูลของคุณ '{customer_identifier}' ในระบบ รบกวนตรวจสอบการสะกดอีกครั้งค่ะ" | |
agent_instance.set_customer_context(found_data_df) | |
# เตรียม Prompt | |
customer_name = f"{found_data_df.iloc[0].get('insured_firstname', '')} {found_data_df.iloc[0].get('insured_lastname', '')}".strip() | |
policy_data_json = customer_data_handler.translate_and_format_data(found_data_df) | |
prompt_string = CUSTOMER_DATA_SUMMARY_PROMPT.format( | |
customer_name=customer_name, | |
original_question=question, | |
policy_data_json=policy_data_json | |
) | |
# LLM Generate | |
try: | |
response = specialized_llm.generate_content(prompt_string) | |
summary = response.text | |
# 4. แปะ Upsell (ถ้ามี) | |
gaps = customer_data_handler.find_recommendation_gaps(found_data_df) # # วิเคราะห์ข้อมูลกรมธรรม์ของลูกค้าเพื่อหาผลิตภัณฑ์ที่น่าแนะนำเพิ่มเติม (Gap Analysis) โดยพิจารณาจาก Age และ Salary | |
upsell_text = customer_data_handler.generate_upsell_text_from_gaps(gaps) | |
return summary + upsell_text | |
except Exception as e: | |
print(f"‼️ ERROR during customer data summary generation: {e}") | |
return "ขออภัยค่ะ เกิดข้อผิดพลาดในการสรุปข้อมูลกรมธรรม์ของท่าน" | |
# --- Tool 3: New Customer Recommendation --- | |
def get_new_customer_recommendation( | |
agent_instance, | |
age: Annotated[int, "อายุ"], | |
gender: Annotated[str, "เพศ"], | |
salary: Annotated[int, "รายได้"], | |
interest: Annotated[str, "ความสนใจ"] = "ประกันทั่วไป" | |
) -> str: | |
# print(f"🛠️ Tool Called: get_new_customer_recommendation(...)") | |
gender_code = 'M' if any(g in gender for g in ['ชาย', 'male']) else 'F' | |
# --- [แก้ไข] เรียกใช้ฟังก์ชัน Sync ได้โดยตรง --- | |
recommendation_data = recommendation_handler.generate_recommendation_from_profile( | |
age=int(age), # แปลงเป็น int เพื่อความแน่นอน | |
gender=gender_code, | |
salary=int(salary), # แปลงเป็น int เพื่อความแน่นอน | |
original_interest=interest | |
) | |
if recommendation_data.get("error"): | |
return f"ขออภัยค่ะ ไม่สามารถสร้างคำแนะนำได้: {recommendation_data['error']}" | |
prompt_string = RECOMMENDATION_PROMPT.format(**recommendation_data) | |
try: | |
response = specialized_llm.generate_content(prompt_string) | |
return response.text | |
except Exception as e: | |
return f"ขออภัยค่ะ เกิดข้อผิดพลาดในการสร้างคำแนะนำ: {e}" | |
# --- Tool 4: Recommend for existing customers --- | |
def recommend_for_existing_customer( | |
agent_instance, | |
interest: Annotated[Optional[str], "An optional parameter for the customer's specific, newly-stated interest (e.g., 'accident insurance', 'investment plans'). Use this to filter the recommendation. If the user doesn't specify an interest, this can be omitted."] = None | |
) -> str: | |
""" | |
Use this tool to provide additional product recommendations to a KNOWN, IDENTIFIED customer whose data is already loaded in the agent's context. | |
This is the correct tool for an existing customer who asks 'what else should I get?', 'can you recommend something for accidents?', or 'I want to add investment coverage'. | |
CRITICAL: This tool should ONLY be used AFTER the customer's context has been successfully set (e.g., after a successful call to `query_customer_policy`). It relies on the agent's memory. | |
""" | |
# print(f"🛠️ Tool Called: recommend_for_existing_customer(interest='{interest}')") | |
# 1) ดึงข้อมูลลูกค้าจากหน่วยความจำของ Agent | |
customer_df = agent_instance.get_customer_context() | |
if customer_df is None or customer_df.empty: | |
return "CONTEXT_NOT_FOUND_ASK_USER_TO_IDENTIFY" | |
# 2) Gap Analysis เพื่อหาผลิตภัณฑ์ทั้งหมดที่ลูกค้ายังขาด | |
all_gaps = customer_data_handler.find_recommendation_gaps(customer_df) # วิเคราะห์ข้อมูลกรมธรรม์ของลูกค้าเพื่อหาผลิตภัณฑ์ที่น่าแนะนำเพิ่มเติม (Gap Analysis) โดยพิจารณาจาก Age และ Salary | |
if not all_gaps: # ไม่มี | |
return "จากการตรวจสอบข้อมูล พบว่าท่านมีความคุ้มครองที่ครอบคลุมดีอยู่แล้วค่ะ หากมีคำถามอื่นๆ สอบถามได้เลยนะคะ" | |
products_to_recommend = all_gaps # เริ่มต้นด้วย gaps ทั้งหมด | |
# 3) กรอง Gaps ตาม Interest ที่ได้รับมา --- | |
if interest: | |
target_type = None | |
if "อุบัติเหตุ" in interest: | |
target_type = "ประกันอุบัติเหตุ" | |
elif "ลงทุน" in interest or "ออม" in interest: | |
target_type = "ประกันเพื่อการลงทุน" | |
elif "สุขภาพ" in interest: | |
target_type = "ประกันสุขภาพ" | |
elif "ชีวิต" in interest: | |
target_type = "ประกันคุ้มครองชีวิต" | |
if target_type: | |
filtered_gaps = [p for p in all_gaps if p.get('insurance_type') == target_type] | |
if filtered_gaps: | |
# print(f"-> Filtering gaps by interest: '{target_type}'.") | |
products_to_recommend = filtered_gaps | |
else: | |
# ถ้ากรองแล้วไม่เจอ Gap ที่ตรงกับความสนใจเลย | |
return f"จากการตรวจสอบข้อมูล พบว่าท่านมีความคุ้มครองที่ดีในด้าน '{target_type}' อยู่แล้ว หรือไม่มีผลิตภัณฑ์ประเภทนี้ที่แนะนำเพิ่มเติมสำหรับโปรไฟล์ของท่านในขณะนี้ค่ะ" | |
# ---------------------------------------------------- | |
# 4) LLM สร้างคำแนะนำจาก `products_to_recommend` | |
customer_info = customer_df.iloc[0] | |
persona = next((p for p in customer_data_handler.PERSONAS.values() if p.get("age_min", -1) <= customer_info.get('insured_age_latest') <= p.get("age_max", -1)), {}) | |
# [('young_adult', {...}), ('mid_career', {...}), ('pre_retirement', {...}), ...] | |
main_plans = [p for p in products_to_recommend if p.get('plan_type') == 'Basic'] | |
riders = [p for p in products_to_recommend if p.get('plan_type') == 'Rider'] | |
main_plans_str = "\n".join([f"• **{p['product_name']}**: {p['product_description']}" for p in main_plans]) if main_plans else "ไม่มีแผนประกันหลักแนะนำเพิ่มเติมในหมวดนี้" | |
riders_str = "\n".join([f"• **{p['product_name']}**: {p['product_description']}" for p in riders]) if riders else "ไม่มีสัญญาเพิ่มเติมแนะนำในหมวดนี้" | |
interest_category_for_prompt = interest if interest else "แผนประกันที่เหมาะสมเพิ่มเติม" | |
recommendation_data = { | |
"age": customer_info.get('insured_age_latest'), | |
"gender": customer_info.get('insured_gender'), | |
"salary": f"{customer_info.get('insured_salary'):,}", | |
"persona_name": persona.get('persona_name', 'ลูกค้าปัจจุบัน'), | |
"persona_description": f"ลูกค้าปัจจุบันที่ต้องการคำแนะนำเพิ่มเติมเกี่ยวกับ '{interest_category_for_prompt}'", | |
"original_interest": interest_category_for_prompt, | |
"main_plans_str": main_plans_str, "riders_str": riders_str, | |
"auto_added_main_plan": False, "searched_outside_tier": False, | |
"interest_category": interest_category_for_prompt | |
} | |
prompt_string = RECOMMENDATION_PROMPT.format(**recommendation_data) | |
try: | |
response = specialized_llm.generate_content(prompt_string) | |
return response.text | |
except Exception as e: | |
return f"ขออภัยค่ะ เกิดข้อผิดพลาดในการสร้างคำแนะนำ: {e}" |