import os import re from typing import Annotated, Optional import google.generativeai as genai from langfuse import observe, get_client from .utils.reranker import RerankRetriever from .utils.prompts import CUSTOMER_DATA_SUMMARY_PROMPT, RECOMMENDATION_PROMPT from .handler import customer_data_handler from .handler import recommendation_handler # --- Configurations & Initializations --- try: print("Initializing RerankRetriever for tools...") rag_retriever = RerankRetriever() print("✅ RerankRetriever initialized successfully.") except Exception as e: print(f"‼️ ERROR initializing RerankRetriever: {e}") rag_retriever = None GEMINI_API_KEY = os.getenv("GOOGLE_API_KEY") if not GEMINI_API_KEY: raise ValueError("GOOGLE_API_KEY not found in environment variables.") genai.configure(api_key=GEMINI_API_KEY) specialized_llm = genai.GenerativeModel( 'gemini-2.5-flash', system_instruction=""" You are an expert AI assistant for "Rabbit Life" insurance. Your persona is helpful, professional, and empathetic. - **Language:** You MUST respond in Thai ONLY, using polite female particles like "ค่ะ" and "นะคะ". - **Pronouns:** When referring to yourself, you MUST refer to yourself as "เรา" (we/us) or "ทางเรา" (on our part). Your objective is to execute the given task (summarizing data, generating recommendations) based *only* on the provided information. - **Strict Grounding:** Your answer must be derived exclusively from the provided data. - **Do Not Translate Proper Nouns:** Keep product names and "Rabbit Life" in English. //-- Formatting Rules (CRITICAL) --// - **Use Bullet Points:** You MUST use Markdown bullet points (using `•` or `-`) to present lists of features, benefits, products, or any series of items. This is essential for readability. - **Clarity & Tone:** Use clear headings (e.g., **หัวข้อความคุ้มครองหลัก:**) and a caring closing statement. """, generation_config={"temperature": 0.0} ) # def run_async(coro): # """A helper to run an async coroutine from a synchronous function.""" # try: # loop = asyncio.get_running_loop() # except RuntimeError: # loop = asyncio.new_event_loop() # asyncio.set_event_loop(loop) # return loop.run_until_complete(coro) KNOWN_PRODUCTS_SORTED = [ 'Health Protect (สัญญาเพิ่มเติมสุขภาพค่ารักษาพยาบาล)', 'Worry Free Cancer (สัญญาเพิ่มเติมคุ้มครองโรคมะเร็ง)', 'Rider HIB 365 (สัญญาเพิ่มเติมสุขภาพค่าชดเชยรายวัน)', 'PA Max (สัญญาเพิ่มเติมสุขภาพค่ารักษาผู้ป่วยนอก)', # เพิ่มมาจาก MAP 'PA Prompt (สัญญาเพิ่มเติมสุขภาพค่ารักษาผู้ป่วยนอก)', # เพิ่มมาจาก MAP 'สัญญาเพิ่มเติมยกเว้นเบี้ยประกันภัย', # เพิ่มมาจาก MAP 'PA Max (สัญญาเพิ่มเติมสุขภาพค่ารักษาเนื่องจากอุบัติเหตุ)', # เพิ่มมาจาก MAP 'PA Prompt (สัญญาเพิ่มเติมสุขภาพค่ารักษาเนื่องจากอุบัติเหตุ)',# เพิ่มมาจาก MAP 'Rider ADB and ADD(สัญญาเพิ่มเติมอุบัติเหตุ)', 'Rider ADB (สัญญาเพิ่มเติมอุบัติเหตุ)', 'Rider ADD (สัญญาเพิ่มเติมอุบัติเหตุ)', 'Rider AI (สัญญาเพิ่มเติมอุบัติเหตุ)', 'PA Prompt (อุบัติเหตุส่วนบุคคล)', 'Worry Free 50 Critical Illness', 'PA Max (อุบัติเหตุส่วนบุคคล)', 'Protection Plus 18/9', 'Smart Term Bronze 10', 'Smart Wellness 90/15', 'Smart Term Bronze 5', 'High Protect 3/3', 'Chai Leoy 99/10', 'Chai Leoy 99/20', 'Chai Leoy 99/5', 'Sabai Jai 14/5', 'Mental Health', 'Health Smile', 'Jai Jai 15/6', 'Jai Jai 12/6', # เพิ่มมาจาก MAP 'Jai Jai 25/9', 'OPD', ] KNOWN_PRODUCTS_SORTED = sorted(list(set(KNOWN_PRODUCTS_SORTED)), key=len, reverse=True) # --- Tool 1: General Knowledge (RAG) --- @observe(name="RAG_Flow") def search_general_knowledge( agent_instance, query: Annotated[str, "The user's general question about an insurance product, its features, or related topics like claims or tax deductions."], ) -> str: """Use this tool to answer a user's general question about Rabbit Life insurance and their products. This tool performs a semantic search (RAG) through the knowledge base of product documentation to find the most relevant information to answer the query.""" # print(f"🛠️ Tool Called: search_general_knowledge(query='{query}')") if not rag_retriever: return "ขออภัยค่ะ ระบบสืบค้นข้อมูลขัดข้องชั่วคราว" # --- 1. สกัดชื่อ Product --- extracted_plan_name = None lower_query = query.lower() for product in KNOWN_PRODUCTS_SORTED: # สร้าง keyword ตัดวงเล็บออก simple_product_keyword = re.sub(r'\(.*\)', '', product).strip().lower() # ตรวจสอบว่า keyword ที่สร้างขึ้น อยู่ในคำถามของผู้ใช้หรือไม่ if simple_product_keyword and simple_product_keyword in lower_query: # ถ้าเจอ, ให้ใช้ "ชื่อเต็ม" ของผลิตภัณฑ์นั้นเป็นตัวกรอง extracted_plan_name = product break # เจออันที่ยาวที่สุดแล้ว หยุดทันที # print(f"🕵️‍♂️ Extracted Plan Name for Filter: {extracted_plan_name}") # --- 2. สร้าง Filter Dictionary & เรียก Retriever --- retriever_kwargs = {} if extracted_plan_name: # print(f"✨ Applying metadata filter for: '{extracted_plan_name}'") # สร้าง filter โดยใช้ Key ('vector_search_filter') retriever_kwargs['vector_search_filter'] = { "term": { "query": extracted_plan_name, "path": "policy_plan_name" } } # print(f"🔍 Retrieving context for: '{query}' with filter: {retriever_kwargs}") # ส่ง kwargs ที่มี filter ของเราเข้าไปใน retriever pipeline compression_retriever = rag_retriever.get_compression_retriever(**retriever_kwargs) context_docs = compression_retriever.invoke(query) # print(f"Retrieved {len(context_docs)} documents") # print(context_docs) if not context_docs: if extracted_plan_name: return f"ขออภัยค่ะ เราพบข้อมูลเกี่ยวกับ '{extracted_plan_name}' แต่ไม่พบรายละเอียดที่ตรงกับคำถามของคุณค่ะ" return "ขออภัยค่ะ เราไม่พบข้อมูลที่เกี่ยวข้องกับคำถามนี้" # --- 3. Format Meta --- # print(f"📄 Formatting {len(context_docs)} retrieved documents...") formatted_docs = [] for i, doc in enumerate(context_docs): insurance_type = doc.metadata.get('insurance_type', '-') plan_name = doc.metadata.get('plan_name', '-') header_1 = doc.metadata.get('Header 1', '-') header_2 = doc.metadata.get('Header 2', '-') header_3 = doc.metadata.get('Header 3', '-') content = doc.page_content formatted = ( f"\n" f"ชื่อประกัน: {plan_name}\n" f"ประเภทประกัน: {insurance_type}\n" f"หัวข้อใหญ่: {header_1}\n" f"หัวข้อรอง: {header_2}\n" f"หัวข้อย่อย: {header_3}\n\n" f"{content}\n" "----------" ) formatted_docs.append(formatted) context = "\n\n".join(formatted_docs) final_prompt = f""" {context} User's Question: {query} Based *only* on the context provided, answer the user's question in polite Thai. """ # print("🧠 Generating response from context...") try: response = specialized_llm.generate_content(final_prompt) ai_response_content = response.text or "" clean_response = re.sub(r"<[^>]+>|#+", "", ai_response_content).strip() # print(f"✅ RAG process completed.") return clean_response except Exception as e: print(f"‼️ ERROR during RAG LLM generation: {e}") return "ขออภัยค่ะ เกิดข้อผิดพลาดในการสร้างคำตอบ" # --- Tool 2: Query Existing Customer Policy --- @observe(name="Customer_Flow") def query_customer_policy( agent_instance, customer_identifier: Annotated[str, "The customer's identification information, such as their full name ('Firstname Lastname') or their 13-digit National ID number, as provided by the user."], question: Annotated[str, "The specific question the customer is asking about their policy. For example: 'When is my next payment due?', 'Summarize my coverage', or 'What is my policy status?'"] ) -> str: """Use this tool when an existing customer wants to 'check', 'review', or 'see' their personal policy information. This tool retrieves the customer's policy data from the database using their identifier, summarizes the relevant information based on their question, and also identifies potential upsell opportunities.""" # print(f"🛠️ Tool Called: query_customer_policy(identifier='{customer_identifier}')") # find_customer_data found_data_df = customer_data_handler.find_customer_data(customer_identifier) # return df if found_data_df is None or found_data_df.empty: return f"ขออภัยค่ะ ไม่พบข้อมูลของคุณ '{customer_identifier}' ในระบบ รบกวนตรวจสอบการสะกดอีกครั้งค่ะ" agent_instance.set_customer_context(found_data_df) # เตรียม Prompt customer_name = f"{found_data_df.iloc[0].get('insured_firstname', '')} {found_data_df.iloc[0].get('insured_lastname', '')}".strip() policy_data_json = customer_data_handler.translate_and_format_data(found_data_df) prompt_string = CUSTOMER_DATA_SUMMARY_PROMPT.format( customer_name=customer_name, original_question=question, policy_data_json=policy_data_json ) # LLM Generate try: response = specialized_llm.generate_content(prompt_string) summary = response.text # 4. แปะ Upsell (ถ้ามี) gaps = customer_data_handler.find_recommendation_gaps(found_data_df) # # วิเคราะห์ข้อมูลกรมธรรม์ของลูกค้าเพื่อหาผลิตภัณฑ์ที่น่าแนะนำเพิ่มเติม (Gap Analysis) โดยพิจารณาจาก Age และ Salary upsell_text = customer_data_handler.generate_upsell_text_from_gaps(gaps) return summary + upsell_text except Exception as e: print(f"‼️ ERROR during customer data summary generation: {e}") return "ขออภัยค่ะ เกิดข้อผิดพลาดในการสรุปข้อมูลกรมธรรม์ของท่าน" # --- Tool 3: New Customer Recommendation --- @observe(name="Recommendation_Flow") def get_new_customer_recommendation( agent_instance, age: Annotated[int, "อายุ"], gender: Annotated[str, "เพศ"], salary: Annotated[int, "รายได้"], interest: Annotated[str, "ความสนใจ"] = "ประกันทั่วไป" ) -> str: # print(f"🛠️ Tool Called: get_new_customer_recommendation(...)") gender_code = 'M' if any(g in gender for g in ['ชาย', 'male']) else 'F' # --- [แก้ไข] เรียกใช้ฟังก์ชัน Sync ได้โดยตรง --- recommendation_data = recommendation_handler.generate_recommendation_from_profile( age=int(age), # แปลงเป็น int เพื่อความแน่นอน gender=gender_code, salary=int(salary), # แปลงเป็น int เพื่อความแน่นอน original_interest=interest ) if recommendation_data.get("error"): return f"ขออภัยค่ะ ไม่สามารถสร้างคำแนะนำได้: {recommendation_data['error']}" prompt_string = RECOMMENDATION_PROMPT.format(**recommendation_data) try: response = specialized_llm.generate_content(prompt_string) return response.text except Exception as e: return f"ขออภัยค่ะ เกิดข้อผิดพลาดในการสร้างคำแนะนำ: {e}" # --- Tool 4: Recommend for existing customers --- @observe(name="Recommendation_existing_Flow") def recommend_for_existing_customer( agent_instance, interest: Annotated[Optional[str], "An optional parameter for the customer's specific, newly-stated interest (e.g., 'accident insurance', 'investment plans'). Use this to filter the recommendation. If the user doesn't specify an interest, this can be omitted."] = None ) -> str: """ Use this tool to provide additional product recommendations to a KNOWN, IDENTIFIED customer whose data is already loaded in the agent's context. This is the correct tool for an existing customer who asks 'what else should I get?', 'can you recommend something for accidents?', or 'I want to add investment coverage'. CRITICAL: This tool should ONLY be used AFTER the customer's context has been successfully set (e.g., after a successful call to `query_customer_policy`). It relies on the agent's memory. """ # print(f"🛠️ Tool Called: recommend_for_existing_customer(interest='{interest}')") # 1) ดึงข้อมูลลูกค้าจากหน่วยความจำของ Agent customer_df = agent_instance.get_customer_context() if customer_df is None or customer_df.empty: return "CONTEXT_NOT_FOUND_ASK_USER_TO_IDENTIFY" # 2) Gap Analysis เพื่อหาผลิตภัณฑ์ทั้งหมดที่ลูกค้ายังขาด all_gaps = customer_data_handler.find_recommendation_gaps(customer_df) # วิเคราะห์ข้อมูลกรมธรรม์ของลูกค้าเพื่อหาผลิตภัณฑ์ที่น่าแนะนำเพิ่มเติม (Gap Analysis) โดยพิจารณาจาก Age และ Salary if not all_gaps: # ไม่มี return "จากการตรวจสอบข้อมูล พบว่าท่านมีความคุ้มครองที่ครอบคลุมดีอยู่แล้วค่ะ หากมีคำถามอื่นๆ สอบถามได้เลยนะคะ" products_to_recommend = all_gaps # เริ่มต้นด้วย gaps ทั้งหมด # 3) กรอง Gaps ตาม Interest ที่ได้รับมา --- if interest: target_type = None if "อุบัติเหตุ" in interest: target_type = "ประกันอุบัติเหตุ" elif "ลงทุน" in interest or "ออม" in interest: target_type = "ประกันเพื่อการลงทุน" elif "สุขภาพ" in interest: target_type = "ประกันสุขภาพ" elif "ชีวิต" in interest: target_type = "ประกันคุ้มครองชีวิต" if target_type: filtered_gaps = [p for p in all_gaps if p.get('insurance_type') == target_type] if filtered_gaps: # print(f"-> Filtering gaps by interest: '{target_type}'.") products_to_recommend = filtered_gaps else: # ถ้ากรองแล้วไม่เจอ Gap ที่ตรงกับความสนใจเลย return f"จากการตรวจสอบข้อมูล พบว่าท่านมีความคุ้มครองที่ดีในด้าน '{target_type}' อยู่แล้ว หรือไม่มีผลิตภัณฑ์ประเภทนี้ที่แนะนำเพิ่มเติมสำหรับโปรไฟล์ของท่านในขณะนี้ค่ะ" # ---------------------------------------------------- # 4) LLM สร้างคำแนะนำจาก `products_to_recommend` customer_info = customer_df.iloc[0] persona = next((p for p in customer_data_handler.PERSONAS.values() if p.get("age_min", -1) <= customer_info.get('insured_age_latest') <= p.get("age_max", -1)), {}) # [('young_adult', {...}), ('mid_career', {...}), ('pre_retirement', {...}), ...] main_plans = [p for p in products_to_recommend if p.get('plan_type') == 'Basic'] riders = [p for p in products_to_recommend if p.get('plan_type') == 'Rider'] main_plans_str = "\n".join([f"• **{p['product_name']}**: {p['product_description']}" for p in main_plans]) if main_plans else "ไม่มีแผนประกันหลักแนะนำเพิ่มเติมในหมวดนี้" riders_str = "\n".join([f"• **{p['product_name']}**: {p['product_description']}" for p in riders]) if riders else "ไม่มีสัญญาเพิ่มเติมแนะนำในหมวดนี้" interest_category_for_prompt = interest if interest else "แผนประกันที่เหมาะสมเพิ่มเติม" recommendation_data = { "age": customer_info.get('insured_age_latest'), "gender": customer_info.get('insured_gender'), "salary": f"{customer_info.get('insured_salary'):,}", "persona_name": persona.get('persona_name', 'ลูกค้าปัจจุบัน'), "persona_description": f"ลูกค้าปัจจุบันที่ต้องการคำแนะนำเพิ่มเติมเกี่ยวกับ '{interest_category_for_prompt}'", "original_interest": interest_category_for_prompt, "main_plans_str": main_plans_str, "riders_str": riders_str, "auto_added_main_plan": False, "searched_outside_tier": False, "interest_category": interest_category_for_prompt } prompt_string = RECOMMENDATION_PROMPT.format(**recommendation_data) try: response = specialized_llm.generate_content(prompt_string) return response.text except Exception as e: return f"ขออภัยค่ะ เกิดข้อผิดพลาดในการสร้างคำแนะนำ: {e}"