from haystack import * from haystack.components.generators.openai import OpenAIGenerator from haystack.components.builders import PromptBuilder from haystack.components.embedders import SentenceTransformersDocumentEmbedder from haystack.components.retrievers.in_memory import * from haystack.document_stores.in_memory import InMemoryDocumentStore from haystack.utils import Secret from pathlib import Path import hashlib from datetime import * from typing import * from dataclasses import * import json import logging import re import pickle import statistics # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class ValidationResult: """Stores the result of a validation check""" is_valid: bool errors: List[str] warnings: List[str] normalized_data: Dict[str, str] @dataclass class ApplicationInfo: application_portal: str program_email: str @dataclass class RequiredDocument: name: str description: str conditions: Optional[str] = None @dataclass class SelectionStep: step_number: str description: str @dataclass class ProgramDetailInfo: application_info: ApplicationInfo required_documents: Dict[str, Dict[str, RequiredDocument]] submission_process: str selection_process: List[SelectionStep] @dataclass class Transportation: boat: str bts: str mrt: str airport_link: str bus: Dict[str, str] @dataclass class Contact: email: str facebook: Dict[str, str] @dataclass class ContactDetail: event_type: str department: str faculty: str university: str location: str contact: Contact transportation: Transportation @dataclass class Course: code: str title_th: str title_en: str credits: int @dataclass class CourseCategory: description: Optional[str] credits: Union[str, int] minimum_credits: Optional[int] courses: List[Course] @dataclass class CourseStructure: event_type: str program_name: str department: str total_credits: int degree_level: str structure: Dict[str, CourseCategory] @dataclass class StudyPlan: event_type: str years: Dict[str, Dict[str, Any]] @dataclass class RegularFee: amount: float currency: str period: str @dataclass class LatePaymentFee: amount: float currency: str @dataclass class TuitionFee: event_type: str regular_fee: RegularFee late_payment_fee: LatePaymentFee class OpenAIDateParser: """Uses OpenAI to parse complex Thai date formats""" def __init__(self, api_key: str, model: str = "gpt-4o"): self.generator = OpenAIGenerator( api_key=Secret.from_token(api_key), model=model ) self.prompt_builder = PromptBuilder( template=""" Parse the following Thai date range into a structured format: Date: {{date}} Return in JSON format: { "start_date": "YYYY-MM-DD", "end_date": "YYYY-MM-DD" (if range), "is_range": true/false } Notes: - Convert Buddhist Era (BE) to CE - Handle abbreviated Thai months - Account for date ranges with dashes - Return null for end_date if it's a single date Example inputs and outputs: Input: "จ 8 ก.ค. – จ 19 ส.ค. 67" Output: {"start_date": "2024-07-08", "end_date": "2024-08-19", "is_range": true} Input: "15 มกราคม 2567" Output: {"start_date": "2024-01-15", "end_date": null, "is_range": false} """ ) async def parse_date(self, date_str: str) -> Dict[str, Union[str, bool]]: """Parse complex Thai date format using OpenAI""" try: result = self.prompt_builder.run(date=date_str) response = await self.generator.arun(prompt=result["prompt"]) if not response or not response.get("replies"): raise ValueError("Empty response from OpenAI") parsed = json.loads(response["replies"][0]) for date_field in ['start_date', 'end_date']: if parsed.get(date_field): datetime.strptime(parsed[date_field], '%Y-%m-%d') return parsed except Exception as e: logger.error(f"OpenAI date parsing failed for '{date_str}': {str(e)}") raise ValueError(f"Could not parse date: {date_str}") class ThaiTextPreprocessor: """Handles Thai text preprocessing and normalization""" CHAR_MAP = {'ํา': 'ำ','์': '','–': '-','—': '-','٫': ',',} @classmethod def normalize_thai_text(cls, text: str) -> str: """Normalize Thai text by applying character mappings and spacing rules""" if not text: return text for old, new in cls.CHAR_MAP.items(): text = text.replace(old, new) text = re.sub(r'\s+', ' ', text.strip()) thai_digits = '๐๑๒๓๔๕๖๗๘๙' arabic_digits = '0123456789' for thai, arabic in zip(thai_digits, arabic_digits): text = text.replace(thai, arabic) return text class CalendarEventValidator: """Validates and preprocesses calendar events""" def __init__(self, openai_api_key: str): self.preprocessor = ThaiTextPreprocessor() self.date_parser = OpenAIDateParser(api_key=openai_api_key) async def validate_event(self, event: 'CalendarEvent') -> ValidationResult: """Validate a calendar event and return validation results""" errors = [] warnings = [] normalized_data = {} if event.date: try: parsed_date = await self.date_parser.parse_date(event.date) normalized_data['date'] = parsed_date['start_date'] if parsed_date['is_range'] and parsed_date['end_date']: range_note = f"ถึงวันที่ {parsed_date['end_date']}" if event.note: normalized_data['note'] = f"{event.note}; {range_note}" else: normalized_data['note'] = range_note except ValueError as e: errors.append(f"Invalid date format: {event.date}") else: errors.append("Date is required") if event.time: time_pattern = r'^([01]?[0-9]|2[0-3]):([0-5][0-9])$' if not re.match(time_pattern, event.time): errors.append(f"Invalid time format: {event.time}") normalized_data['time'] = event.time if event.activity: normalized_activity = self.preprocessor.normalize_thai_text(event.activity) if len(normalized_activity) < 3: warnings.append("Activity description is very short") normalized_data['activity'] = normalized_activity else: errors.append("Activity is required") valid_semesters = {'ภาคต้น', 'ภาคปลาย', 'ภาคฤดูร้อน'} if event.semester: normalized_semester = self.preprocessor.normalize_thai_text(event.semester) if normalized_semester not in valid_semesters: warnings.append(f"Unusual semester value: {event.semester}") normalized_data['semester'] = normalized_semester else: errors.append("Semester is required") valid_types = {'registration', 'deadline', 'examination', 'academic', 'holiday'} if event.event_type not in valid_types: errors.append(f"Invalid event type: {event.event_type}") normalized_data['event_type'] = event.event_type if event.note and 'note' not in normalized_data: normalized_data['note'] = self.preprocessor.normalize_thai_text(event.note) if event.section: normalized_data['section'] = self.preprocessor.normalize_thai_text(event.section) return ValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings, normalized_data=normalized_data ) @dataclass class CalendarEvent: """Structured representation of a calendar event with validation""" date: str time: str activity: str note: str semester: str event_type: str section: Optional[str] = None @staticmethod def classify_event_type(activity: str) -> str: """Classify event type based on activity description""" activity_lower = activity.lower() keywords = { 'registration': ['ลงทะเบียน', 'ชําระเงิน', 'ค่าธรรมเนียม', 'เปิดเรียน'], 'deadline': ['วันสุดท้าย', 'กําหนด', 'ภายใน', 'ต้องส่ง'], 'examination': ['สอบ', 'ปริญญานิพนธ์', 'วิทยานิพนธ์', 'สอบปากเปล่า'], 'holiday': ['วันหยุด', 'ชดเชย', 'เทศกาล'], } for event_type, terms in keywords.items(): if any(term in activity_lower for term in terms): return event_type return 'academic' async def initialize(self, openai_api_key: str): """Asynchronously validate and normalize the event""" validator = CalendarEventValidator(openai_api_key) result = await validator.validate_event(self) if not result.is_valid: raise ValueError(f"Invalid calendar event: {', '.join(result.errors)}") for field, value in result.normalized_data.items(): setattr(self, field, value) if result.warnings: logger.warning(f"Calendar event warnings: {', '.join(result.warnings)}") def to_searchable_text(self) -> str: """Convert event to searchable text format""" return f""" ภาคการศึกษา: {self.semester} ประเภท: {self.event_type} วันที่: {self.date} เวลา: {self.time or '-'} กิจกรรม: {self.activity} หมวดหมู่: {self.section or '-'} หมายเหตุ: {self.note or '-'} """.strip() class CacheManager: """Manages caching for different components of the RAG pipeline""" def __init__(self, cache_dir: Path, ttl: int = 3600): """ Initialize CacheManager """ self.cache_dir = cache_dir self.ttl = ttl self.embeddings_cache = self._load_cache("embeddings") self.query_cache = self._load_cache("queries") self.document_cache = self._load_cache("documents") def _generate_key(self, data: Union[str, Dict, Any]) -> str: """Generate a unique cache key""" if isinstance(data, str): content = data.encode('utf-8') else: content = json.dumps(data, sort_keys=True).encode('utf-8') return hashlib.md5(content).hexdigest() def _load_cache(self, cache_type: str) -> Dict: """Load cache from disk""" cache_path = self.cache_dir / f"{cache_type}_cache.pkl" if cache_path.exists(): try: with open(cache_path, 'rb') as f: cache = pickle.load(f) self._clean_expired_entries(cache) return cache except Exception as e: logger.warning(f"Failed to load {cache_type} cache: {e}") return {} return {} def _save_cache(self, cache_type: str, cache_data: Dict): """Save cache to disk""" cache_path = self.cache_dir / f"{cache_type}_cache.pkl" try: with open(cache_path, 'wb') as f: pickle.dump(cache_data, f) except Exception as e: logger.error(f"Failed to save {cache_type} cache: {e}") def _clean_expired_entries(self, cache: Dict): """Remove expired cache entries""" current_time = datetime.now() expired_keys = [ key for key, (_, timestamp) in cache.items() if current_time - timestamp > timedelta(seconds=self.ttl) ] for key in expired_keys: del cache[key] def get_embedding_cache(self, text: str) -> Optional[Any]: """Get cached embedding for text""" key = self._generate_key(text) if key in self.embeddings_cache: embedding, timestamp = self.embeddings_cache[key] if datetime.now() - timestamp <= timedelta(seconds=self.ttl): return embedding return None def set_embedding_cache(self, text: str, embedding: Any): """Cache embedding for text""" key = self._generate_key(text) self.embeddings_cache[key] = (embedding, datetime.now()) self._save_cache("embeddings", self.embeddings_cache) def get_query_cache(self, query: str) -> Optional[Dict]: """Get cached query results""" key = self._generate_key(query) if key in self.query_cache: result, timestamp = self.query_cache[key] if datetime.now() - timestamp <= timedelta(seconds=self.ttl): return result return None def set_query_cache(self, query: str, result: Dict): """Cache query results""" key = self._generate_key(query) self.query_cache[key] = (result, datetime.now()) self._save_cache("queries", self.query_cache) def set_document_cache(self, doc_id: str, document: Any): """Cache document""" self.document_cache[doc_id] = (document, datetime.now()) self._save_cache("documents", self.document_cache) @dataclass class ModelConfig: openai_api_key: str embedder_model: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" openai_model: str = "gpt-4o" temperature: float = 0.7 @dataclass class RetrieverConfig: top_k: int = 5 @dataclass class CacheConfig: enabled: bool = True cache_dir: Path = Path("./cache") ttl: int = 86400 # 24 hours @dataclass class ProcessingConfig: batch_size: int = 32 @dataclass class LocalizationConfig: enable_thai_normalization: bool = True @dataclass class PipelineConfig: model: ModelConfig retriever: RetrieverConfig = field(default_factory=RetrieverConfig) cache: CacheConfig = field(default_factory=CacheConfig) processing: ProcessingConfig = field(default_factory=ProcessingConfig) localization: LocalizationConfig = field(default_factory=LocalizationConfig) def create_default_config(api_key: str) -> PipelineConfig: """ Create a default pipeline configuration with optimized settings for Thai language processing. Args: api_key (str): OpenAI API key Returns: PipelineConfig: Configured pipeline settings """ return PipelineConfig( model=ModelConfig( openai_api_key=api_key, temperature=0.3 # Lower temperature for more focused responses ), retriever=RetrieverConfig( top_k=5 # Optimal number of documents to retrieve ), cache=CacheConfig( enabled=True, cache_dir=Path("./cache"), ttl=86400 # 24 hour cache ), processing=ProcessingConfig( batch_size=32 # Default batch size for processing ), localization=LocalizationConfig( enable_thai_normalization=True # Enable Thai text normalization ) ) class CalendarDataProcessor: """Process and structure calendar data from the new raw-data.json format""" @staticmethod def parse_calendar_json(json_data: Dict) -> List[CalendarEvent]: """Parse the new calendar JSON format into CalendarEvent objects""" events = [] # Extract academic calendar data - handle direct dictionary input calendar_data = json_data.get('academic_calendar', []) if isinstance(json_data, dict) else json_data for semester_block in calendar_data: semester = semester_block.get('education', '') schedule = semester_block.get('schedule', []) # Handle regular schedule events for event in schedule: if 'section' in event and 'details' in event: # Process section-based events (thesis deadlines, etc.) section = event['section'] for detail in event['details']: if 'ภาคต้น' in detail and 'ภาคปลาย' in detail: # Handle dual-semester events for sem_key in ['ภาคต้น', 'ภาคปลาย']: if detail.get(sem_key): events.append(CalendarEvent( date=detail[sem_key], time='', activity=detail['title'], note=section, semester=sem_key, event_type='deadline', section=section )) else: # Single semester event events.append(CalendarEvent( date=detail.get('date', ''), time='', activity=detail.get('title', ''), note=section, semester=ThaiTextPreprocessor.normalize_thai_text(semester), event_type='deadline', section=section )) else: # Regular calendar event event_type = CalendarEvent.classify_event_type(event.get('activity', '')) # Clean semester string cleaned_semester = semester if '(' in semester: match = re.search(r'\((.*?)\)', semester) if match: cleaned_semester = match.group(1) cleaned_semester = ThaiTextPreprocessor.normalize_thai_text(cleaned_semester) events.append(CalendarEvent( date=event.get('date', ''), time=event.get('time', ''), activity=event.get('activity', ''), note=event.get('note', ''), semester=cleaned_semester, event_type=event_type )) return events @staticmethod def extract_program_details(json_data: Dict) -> ProgramDetailInfo: """Extract and structure program details into ProgramDetailInfo object""" raw_details = json_data.get('program_details', {}) # Process application info app_info_data = raw_details.get('application_info', {}) app_info = ApplicationInfo( application_portal=app_info_data.get('application_portal', ''), program_email=app_info_data.get('program_email', '') ) # Process required documents req_docs = {} raw_docs = raw_details.get('required_documents', {}) # Process mandatory documents mandatory_docs = {} for doc_key, doc_value in raw_docs.get('mandatory', {}).items(): mandatory_docs[doc_key] = RequiredDocument( name=doc_key, description=doc_value ) req_docs['mandatory'] = mandatory_docs # Process optional documents optional_docs = {} for doc_key, doc_data in raw_docs.get('optional', {}).items(): if doc_key == 'english_proficiency': ep_data = doc_data optional_docs[doc_key] = RequiredDocument( name=ep_data.get('name', ''), description=str(ep_data.get('accepted_tests', {})), conditions=f"Validity: {ep_data.get('validity', '')}, Benefits: {ep_data.get('benefits', '')}, Exemptions: {ep_data.get('exemptions', '')}" ) else: optional_docs[doc_key] = RequiredDocument( name=doc_data.get('name', ''), description='', conditions=doc_data.get('condition', '') ) req_docs['optional'] = optional_docs # Process selection steps selection_steps = [] for step_data in raw_details.get('selection_process', {}).get('steps', []): for step_num, description in step_data.items(): selection_steps.append(SelectionStep( step_number=step_num, description=description )) return [ProgramDetailInfo( application_info=app_info, required_documents=req_docs, submission_process=raw_details.get('submission_process', ''), selection_process=selection_steps )] @staticmethod def extract_contact_details(json_data: Dict) -> List[ContactDetail]: """Extract and structure contact details into ContactDetail objects""" raw_contacts = json_data.get('contact_details', []) contact_details = [] # Handle the case where raw_contacts might be a single object instead of a list if not isinstance(raw_contacts, list): raw_contacts = [raw_contacts] for contact_data in raw_contacts: # Skip if contact_data is not a dictionary if not isinstance(contact_data, dict): continue try: # Process transportation data transportation_data = contact_data.get('transportation', {}) transportation = Transportation( boat=transportation_data.get('boat', ''), bts=transportation_data.get('bts', ''), mrt=transportation_data.get('mrt', ''), airport_link=transportation_data.get('airport_link', ''), bus=transportation_data.get('bus', {}) ) # Process contact information contact_info = Contact( email=contact_data.get('email', ''), facebook=contact_data.get('facebook', {}) ) # Create ContactDetail object contact_details.append(ContactDetail( event_type=contact_data.get('event_type', ''), department=contact_data.get('department', ''), faculty=contact_data.get('faculty', ''), university=contact_data.get('university', ''), location=contact_data.get('location', ''), contact=contact_info, transportation=transportation )) except Exception as e: continue return contact_details @staticmethod def extract_course_structure(json_data: Dict) -> List[CourseStructure]: """Extract and structure course information into CourseStructure objects""" course_structures = [] # Get course structure data course_data = json_data.get('course_structure', {}) program_metadata = course_data.get('program_metadata', {}) curriculum = course_data.get('curriculum_structure', {}) # Process foundation courses foundation_data = curriculum.get('foundation_courses', {}) foundation_courses = [] for course in foundation_data.get('courses', []): foundation_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Process core courses core_data = curriculum.get('core_courses', {}) core_courses = [] for course in core_data.get('modules', []): core_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Process elective courses elective_data = curriculum.get('electives', {}) elective_courses = [] for course in elective_data.get('course_groups', []): elective_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Process research courses research_data = curriculum.get('research', {}) research_courses = [] for course in research_data.get('course', []): research_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Create course categories structure = { 'หมวดวิชาปรับพื้นฐาน': CourseCategory( # Previously foundation_courses description="วิชาพื้นฐานที่จำเป็นต้องเรียน foundation courses รายวิชาปรับพื้นฐาน", credits=foundation_data.get('metadata', {}).get('credits', 'non-credit'), minimum_credits=None, courses=foundation_courses ), 'หมวดวิชาบังคับ': CourseCategory( # Previously core_courses description="วิชาบังคับ วิชาหลัก core courses รายวิชาที่ต้องเรียน", credits=0, minimum_credits=core_data.get('minimum_requirement_credits'), courses=core_courses ), 'หมวดวิชาเลือก': CourseCategory( # Previously elective_courses description="วิชาเลือก elective courses รายวิชาเลือก วิชาที่สามารถเลือกเรียนได้", credits=0, minimum_credits=elective_data.get('minimum_requirement_credits'), courses=elective_courses ), 'หมวดวิชาการค้นคว้าอิสระ': CourseCategory( # Previously research_courses description="วิชาค้นคว้าอิสระ research courses วิทยานิพนธ์", credits=0, minimum_credits=research_data.get('minimum_requirement_credits'), courses=research_courses ) } # Create course structure course_structure = CourseStructure( event_type='curriculum_structure', program_name=program_metadata.get('name', ''), department=program_metadata.get('department', ''), total_credits=program_metadata.get('total_credits', 0), degree_level=program_metadata.get('degree_level', ''), structure=structure ) return [course_structure] @staticmethod def extract_program_study_plan(json_data: Dict) -> List[StudyPlan]: """Extract and structure study plan information into StudyPlan objects""" study_plan_data = json_data.get('program_study_plan', {}) # Initialize the years dictionary to store all year/semester data years_dict = {} for year_key, year_data in study_plan_data.items(): years_dict[year_key] = {} for semester_key, semester_data in year_data.items(): # Get metadata metadata = semester_data.get('metadata', {}) # Initialize semester structure semester_struct = { 'metadata': metadata, 'courses': [] } # Handle both 'modules' and 'courses' keys course_data = semester_data.get('modules', []) or semester_data.get('courses', []) # Add courses to semester for course in course_data: course_info = { 'code': course.get('code', ''), 'title': course.get('title', {'th': '', 'en': ''}), 'credits': course.get('credits', 0) } semester_struct['courses'].append(course_info) # Add semester data to year years_dict[year_key][semester_key] = semester_struct # Create StudyPlan object study_plan = StudyPlan( event_type='study_plan', years=years_dict ) return [study_plan] @staticmethod def extract_fees(json_data: Dict) -> List[TuitionFee]: """Extract and structure fee information into TuitionFee objects""" fees_data = json_data.get('fees', {}) # Parse regular tuition fee regular_fee_str = fees_data.get('tuition', '') regular_amount = float(regular_fee_str.split()[0]) if regular_fee_str else 0 regular_fee = RegularFee( amount=regular_amount, currency='THB', period='per semester' ) # Parse late payment fee late_fee_str = fees_data.get('late_payment', '') late_amount = float(late_fee_str.split()[0]) if late_fee_str else 0 late_payment_fee = LatePaymentFee( amount=late_amount, currency='THB' ) # Create TuitionFee object tuition_fee = TuitionFee( event_type='tuition_fee', regular_fee=regular_fee, late_payment_fee=late_payment_fee ) return [tuition_fee] class HybridDocumentStore: """Enhanced document store with hybrid retrieval capabilities""" def __init__(self, config: PipelineConfig): self.store = InMemoryDocumentStore() self.embedder = SentenceTransformersDocumentEmbedder( model=config.model.embedder_model ) # Initialize BM25 retriever self.bm25_retriever = InMemoryBM25Retriever( document_store=self.store, top_k=config.retriever.top_k ) # Initialize embedding retriever self.embedding_retriever = InMemoryEmbeddingRetriever( document_store=self.store, top_k=config.retriever.top_k ) self.cache_manager = CacheManager( cache_dir=config.cache.cache_dir, ttl=config.cache.ttl ) self.embedder.warm_up() # Initialize containers self.events = [] self.event_type_index = {} self.semester_index = {} self._document_counter = 0 # Additional data containers self.course_data = [] self.contact_data = [] self.study_plan_data = [] def _generate_unique_id(self) -> str: """Generate a unique document ID""" self._document_counter += 1 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") return f"doc_{timestamp}_{self._document_counter}" def _compute_embedding(self, text: str) -> Any: """Compute embedding with caching""" cached_embedding = self.cache_manager.get_embedding_cache(text) if cached_embedding is not None: return cached_embedding doc = Document(content=text) embedding = self.embedder.run(documents=[doc])["documents"][0].embedding self.cache_manager.set_embedding_cache(text, embedding) return embedding def _format_required_docs(self, docs: Dict) -> str: """Format required documents information with detailed English proficiency requirements""" result = [] if 'mandatory' in docs: result.append("เอกสารที่ต้องใช้:") for doc in docs['mandatory'].values(): result.append(f"- {doc.name}: {doc.description}") if 'optional' in docs: result.append("\nเอกสารเพิ่มเติม:") for doc_key, doc in docs['optional'].items(): if doc_key == 'english_proficiency': result.append(f"- {doc.name}") # Parse and format the accepted tests try: accepted_tests = eval(doc.description) result.append(" เกณฑ์คะแนนที่ยอมรับ:") for test, requirement in accepted_tests.items(): result.append(f" * {test}: {requirement}") except: result.append(f" {doc.description}") if doc.conditions: conditions = doc.conditions.split(', ') for condition in conditions: result.append(f" {condition}") else: desc = f"- {doc.name}" if doc.conditions: desc += f" ({doc.conditions})" result.append(desc) return "\n".join(result) def _format_selection_steps(self, steps: List[SelectionStep]) -> str: """Format selection process steps""" return "\n".join(f"{step.step_number}. {step.description}" for step in steps) def add_events(self, events: List[CalendarEvent], contact_details: Optional[List[ContactDetail]] = None, course_structure: Optional[List[CourseStructure]] = None, study_plans: Optional[List[StudyPlan]] = None, program_details: Optional[List[ProgramDetailInfo]] = None, tuition_fees: Optional[List[TuitionFee]] = None): """Add events and additional data with caching""" documents = [] added_events = set() # Track added events to prevent duplicates # Process calendar events for event in events: event_key = f"{event.date}_{event.activity}_{event.semester}" if event_key not in added_events: added_events.add(event_key) self.events.append(event) event_idx = len(self.events) - 1 # Update indices if event.event_type not in self.event_type_index: self.event_type_index[event.event_type] = [] self.event_type_index[event.event_type].append(event_idx) if event.semester not in self.semester_index: self.semester_index[event.semester] = [] self.semester_index[event.semester].append(event_idx) # Create document text = event.to_searchable_text() embedding = self._compute_embedding(text) doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={ 'event_type': event.event_type, 'semester': event.semester, 'date': event.date, 'event_idx': event_idx } ) documents.append(doc) self.cache_manager.set_document_cache(str(event_idx), doc) # Process contact details if contact_details: for contact in contact_details: self.contact_data.append(contact) text = f""" ข้อมูลการติดต่อ: คณะ: {contact.faculty} ภาควิชา: {contact.department} มหาวิทยาลัย: {contact.university} สถานที่: {contact.location} การติดต่อ: อีเมล: {contact.contact.email} Facebook: {json.dumps(contact.contact.facebook, ensure_ascii=False)} การเดินทาง: เรือ: {contact.transportation.boat} BTS: {contact.transportation.bts} MRT: {contact.transportation.mrt} Airport Link: {contact.transportation.airport_link} รถประจำทาง: {json.dumps(contact.transportation.bus, ensure_ascii=False)} """ embedding = self._compute_embedding(text) doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={'event_type': 'contact'} ) documents.append(doc) # Process course structure if course_structure: for course in course_structure: text = f""" โครงสร้างหลักสูตร: ชื่อหลักสูตร: {course.program_name} ภาควิชา: {course.department} หน่วยกิตรวม: {course.total_credits} ระดับการศึกษา: {course.degree_level} รายละเอียดโครงสร้าง: หมวดวิชาปรับพื้นฐาน/วิชาพื้นฐาน: คำอธิบาย: {course.structure['หมวดวิชาปรับพื้นฐาน'].description or 'ไม่ระบุ'} หน่วยกิต: {course.structure['หมวดวิชาปรับพื้นฐาน'].credits} รายวิชา: """ # Add foundation courses foundation_courses = [] for c in course.structure['หมวดวิชาปรับพื้นฐาน'].courses: foundation_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") text += "\n".join(foundation_courses) text += f""" หมวดวิชาบังคับ/วิชาหลัก: หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาบังคับ'].minimum_credits} รายวิชา: """ # Add core courses core_courses = [] for c in course.structure['หมวดวิชาบังคับ'].courses: core_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") text += "\n".join(core_courses) text += f""" หมวดวิชาเลือก: หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาเลือก'].minimum_credits} รายวิชา: """ # Add elective courses elective_courses = [] for c in course.structure['หมวดวิชาเลือก'].courses: elective_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") text += "\n".join(elective_courses) text += f""" หมวดวิชาการค้นคว้าอิสระ: หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาการค้นคว้าอิสระ'].minimum_credits} รายวิชา: """ # Add research courses research_courses = [] for c in course.structure['หมวดวิชาการค้นคว้าอิสระ'].courses: research_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") text += "\n".join(research_courses) doc = Document( id=self._generate_unique_id(), content=text.strip(), embedding=self._compute_embedding(text), meta={'event_type': 'curriculum'} ) documents.append(doc) # Process study plans if study_plans: for plan in study_plans: self.study_plan_data.append(plan) for year, semesters in plan.years.items(): for semester, data in semesters.items(): # Convert year and semester format year_num = year.replace('year', '') semester_num = semester.replace('semester', '') # Determine course type and translate to Thai course_type = data.get('metadata', {}).get('course_type', 'core') course_type_th = 'วิชาหลัก' if course_type == 'core' else 'วิชาเลือก' # Calculate total credits total_credits = sum(course.get('credits', 0) for course in data.get('courses', [])) text = f"""แผนการศึกษา: ปี: {year_num} ภาคการศึกษา: {semester_num} ประเภทรายวิชา: {course_type_th} ({course_type}) จำนวนหน่วยกิตรวม: {total_credits} รายวิชาที่ต้องเรียน:""" # Add courses if 'courses' in data: for course in data['courses']: text += f"\n- {course['code']}: {course['title'].get('th', '')} ({course['title'].get('en', '')}) - {course['credits']} หน่วยกิต" embedding = self._compute_embedding(text) doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={ 'event_type': 'study_plan', 'year': year_num, 'semester': semester_num, 'course_type': course_type } ) documents.append(doc) if program_details: for detail in program_details: # Main application document app_text = f""" ข้อมูลการสมัคร: เว็บไซต์รับสมัคร: {detail.application_info.application_portal} อีเมล: {detail.application_info.program_email} เอกสารที่ต้องใช้: {self._format_required_docs(detail.required_documents)} ขั้นตอนการส่งเอกสาร: {detail.submission_process} ขั้นตอนการคัดเลือก: {self._format_selection_steps(detail.selection_process)} """ doc = Document( id=self._generate_unique_id(), content=app_text.strip(), embedding=self._compute_embedding(app_text), meta={'event_type': 'program_details'} ) documents.append(doc) # Create separate document for English proficiency requirements if 'optional' in detail.required_documents: eng_prof = next((doc for doc_key, doc in detail.required_documents['optional'].items() if doc_key == 'english_proficiency'), None) if eng_prof: eng_text = f""" ข้อกำหนดภาษาอังกฤษ: {eng_prof.name} รายละเอียด: {eng_prof.description} เงื่อนไข: {eng_prof.conditions} """ eng_doc = Document( id=self._generate_unique_id(), content=eng_text.strip(), embedding=self._compute_embedding(eng_text), meta={ 'event_type': 'program_details' } ) documents.append(eng_doc) # Process tuition fees if tuition_fees: for fee in tuition_fees: fee_text = f""" ค่าธรรมเนียมการศึกษา: - ค่าเล่าเรียน: {fee.regular_fee.amount:,.2f} {fee.regular_fee.currency} {fee.regular_fee.period} - ค่าปรับชำระล่าช้า: {fee.late_payment_fee.amount:,.2f} {fee.late_payment_fee.currency} """ doc = Document( id=self._generate_unique_id(), content=fee_text.strip(), embedding=self._compute_embedding(fee_text), meta={'event_type': 'fees'} ) documents.append(doc) batch_size = 10 for i in range(0, len(documents), batch_size): batch = documents[i:i + batch_size] try: self.store.write_documents(batch) except Exception as e: logger.error(f"Error writing document batch {i//batch_size + 1}: {str(e)}") for doc in batch: try: self.store.write_documents([doc]) except Exception as e2: logger.error(f"Failed to write document {doc.id}: {str(e2)}") def hybrid_search(self, query: str, event_type: Optional[str] = None, detail_type: Optional[str] = None, semester: Optional[str] = None, top_k: int = 10, weight_semantic: float = 0.5) -> List[Document]: """Hybrid search combining semantic and lexical search results""" cache_key = json.dumps({ 'query': query, 'event_type': event_type, 'semester': semester, 'top_k': top_k, 'weight_semantic': weight_semantic }) cached_results = self.cache_manager.get_query_cache(cache_key) if cached_results is not None: return cached_results # Get semantic search results query_embedding = self._compute_embedding(query) semantic_results = self.embedding_retriever.run(query_embedding=query_embedding)["documents"] # Get BM25 results bm25_results = self.bm25_retriever.run( query=query )["documents"] if event_type == "program_details": weight_semantic = 0.3 # Give more weight to keyword matching # Combine results using score fusion combined_results = self._merge_results( semantic_results=semantic_results, bm25_results=bm25_results, weight_semantic=weight_semantic, top_k=top_k ) # Filter results based on metadata filtered_results = [] for doc in combined_results: if event_type and event_type != "program_details" and doc.meta.get('event_type') != event_type: continue # Keep only relevant event type unless it's program_details filtered_results.append(doc) final_results = filtered_results[:top_k] self.cache_manager.set_query_cache(cache_key, final_results) return final_results def _merge_results(self, semantic_results: List[Document], bm25_results: List[Document], weight_semantic: float, top_k: int) -> List[Document]: """Merge semantic and BM25 results using weighted score fusion""" # Create dictionaries to store normalized scores semantic_scores = {} bm25_scores = {} # Normalize semantic scores max_semantic_score = max(doc.score for doc in semantic_results) if semantic_results else 1.0 for doc in semantic_results: semantic_scores[doc.id] = doc.score / max_semantic_score if max_semantic_score > 0 else 0 # Normalize BM25 scores max_bm25_score = max(doc.score for doc in bm25_results) if bm25_results else 1.0 for doc in bm25_results: bm25_scores[doc.id] = doc.score / max_bm25_score if max_bm25_score > 0 else 0 # Combine scores combined_scores = {} all_docs = {doc.id: doc for doc in semantic_results + bm25_results} for doc_id in all_docs: semantic_score = semantic_scores.get(doc_id, 0) bm25_score = bm25_scores.get(doc_id, 0) # Weighted combination combined_scores[doc_id] = ( weight_semantic * semantic_score + (1 - weight_semantic) * bm25_score ) # Sort by combined score and return top_k results sorted_docs = sorted( all_docs.values(), key=lambda x: combined_scores[x.id], reverse=True ) return sorted_docs[:top_k] def search_with_reranking(self, query: str, event_type: Optional[str] = None, detail_type: Optional[str] = None, semester: Optional[str] = None, top_k_initial: int = 20, top_k_final: int = 5, weight_semantic: float = 0.5) -> List[Document]: """ Two-stage retrieval with hybrid search followed by cross-encoder reranking """ # Generate cache key for the reranked query cache_key = json.dumps({ 'query': query, 'event_type': event_type, 'semester': semester, 'top_k_initial': top_k_initial, 'top_k_final': top_k_final, 'weight_semantic': weight_semantic, 'reranked': True # Indicate this is a reranked query }) # Check cache first cached_results = self.cache_manager.get_query_cache(cache_key) if cached_results is not None: return cached_results # 1. Get larger initial result set initial_results = self.hybrid_search( query=query, event_type=event_type, detail_type=detail_type, semester=semester, top_k=top_k_initial, weight_semantic=weight_semantic ) # If we don't have enough initial results, just return what we have if len(initial_results) <= top_k_final: return initial_results try: # We'll lazily initialize the cross encoder to save memory cross_encoder = SentenceTransformersCrossEncoder("cross-encoder/mmarco-mMiniLMv2-L12-H384-v1") pairs = [(query, doc.content) for doc in initial_results] scores = cross_encoder.predict(pairs) for doc, score in zip(initial_results, scores): doc.score = float(score) # Ensure score is a regular float reranked_results = sorted(initial_results, key=lambda x: x.score, reverse=True)[:top_k_final] # Cache the results self.cache_manager.set_query_cache(cache_key, reranked_results) return reranked_results except Exception as e: logger.error(f"Reranking failed: {str(e)}. Falling back to hybrid search results.") return initial_results[:top_k_final] class ResponseGenerator: """Generate responses with enhanced conversation context awareness""" def __init__(self, config: PipelineConfig): self.generator = OpenAIGenerator( api_key=Secret.from_token(config.model.openai_api_key), model=config.model.openai_model ) self.prompt_builder = PromptBuilder( template=""" คุณเป็นที่ปรึกษาทางวิชาการ กรุณาตอบคำถามต่อไปนี้โดยใช้ข้อมูลจากเอกสารที่ให้มาและพิจารณาบริบทจากประวัติการสนทนา {% if conversation_history %} ประวัติการสนทนา: {% for message in conversation_history %} {% if message.role == 'user' %} ผู้ใช้: {{ message.content }} {% else %} ที่ปรึกษา: {{ message.content }} {% endif %} {% endfor %} {% endif %} คำถามปัจจุบัน: {{query}} ข้อมูลที่เกี่ยวข้อง: {% for doc in context %} --- ประเภท: {{doc.meta.event_type}}{% if doc.meta.detail_type %}, รายละเอียด: {{doc.meta.detail_type}}{% endif %} เนื้อหา: {{doc.content}} {% endfor %} คำแนะนำในการตอบ: 1. ตอบเฉพาะข้อมูลที่มีในเอกสารเท่านั้น 2. หากไม่มีข้อมูลให้ตอบว่า "ขออภัย ไม่พบข้อมูลที่เกี่ยวข้องกับคำถามนี้" 3. หากข้อมูลไม่ชัดเจนให้ระบุว่าข้อมูลอาจไม่ครบถ้วน 4. จัดรูปแบบคำตอบให้อ่านง่าย ใช้หัวข้อและย่อหน้าตามความเหมาะสม 5. สำหรับคำถามเกี่ยวกับข้อกำหนดภาษาอังกฤษหรือขั้นตอนการสมัคร ให้อธิบายข้อมูลอย่างละเอียด 6. ใส่ข้อความ "หากมีข้อสงสัยเพิ่มเติม สามารถสอบถามได้" ท้ายคำตอบเสมอ 7. คำนึงถึงประวัติการสนทนาและให้คำตอบที่ต่อเนื่องกับบทสนทนาก่อนหน้า 8. หากคำถามอ้างอิงถึงข้อมูลในบทสนทนาก่อนหน้า (เช่น "แล้วอันนั้นล่ะ", "มีอะไรอีกบ้าง", "คำถามก่อนหน้า") ให้พิจารณาบริบทและตอบคำถามอย่างตรงประเด็น แต่ไม่ต้องแสดงคำถามก่อนหน้าในคำตอบ 9. กรณีคำถามมีความไม่ชัดเจน ใช้ประวัติการสนทนาเพื่อเข้าใจบริบทของคำถาม สำคัญ: ไม่ต้องใส่คำว่า "คำถามก่อนหน้าคือ [คำถามก่อนหน้า] และคำตอบคือ..." ในคำตอบของคุณ ให้ตอบคำถามโดยตรง กรุณาตอบเป็นภาษาไทย: """ ) def generate_response(self, query: str, documents: List[Document], query_info: Dict[str, Any], conversation_history: List[Dict[str, str]] = None) -> str: """Generate response using retrieved documents and conversation history""" try: # Enhanced handling of reference questions reference_keywords = ["ก่อนหน้านี้", "ก่อนหน้า", "ที่ผ่านมา", "คำถามก่อนหน้า", "คำถามที่แล้ว", "previous", "earlier", "before", "last time", "last question"] is_reference_question = any(keyword in query.lower() for keyword in reference_keywords) # For reference questions, we'll add additional prompting enhanced_context = conversation_history or [] result = self.prompt_builder.run( query=query, context=documents, format=query_info.get("response_format", "detailed"), conversation_history=enhanced_context, is_reference_question=is_reference_question ) response = self.generator.run(prompt=result["prompt"]) return response["replies"][0] except Exception as e: logger.error(f"Response generation failed: {str(e)}") return "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้" class AdvancedQueryProcessor: """Process queries with better understanding""" def __init__(self, config: PipelineConfig): self.generator = OpenAIGenerator( api_key=Secret.from_token(config.model.openai_api_key), model=config.model.openai_model ) self.prompt_builder = PromptBuilder( template=""" คุณเป็นผู้ช่วย AI ที่เชี่ยวชาญด้านการศึกษาในประเทศไทย หน้าที่ของคุณคือการวิเคราะห์และจำแนกคำถามของผู้ใช้ให้ตรงกับหมวดหมู่ข้อมูลที่เหมาะสม ได้แก่: 1. **รายละเอียดโปรแกรมการศึกษา (program_details)**: ข้อมูลเกี่ยวกับหลักสูตร โปรแกรมการเรียนการสอน และโครงสร้างหลักสูตร 2. **ข้อมูลการติดต่อ (contact)**: ข้อมูลการติดต่อของหน่วยงานหรือบุคคลที่เกี่ยวข้องในสถาบันการศึกษา 3. **โครงสร้างหลักสูตร (curriculum)**: รายละเอียดเกี่ยวกับวิชาเรียน หน่วยกิต และแผนการศึกษา 4. **ค่าเล่าเรียน (fees)**: ข้อมูลเกี่ยวกับค่าใช้จ่ายในการศึกษา ค่าธรรมเนียม และทุนการศึกษา 5. **แผนการศึกษารายปี (study_plan)**: ข้อมูลแผนการเรียนแบ่งตามชั้นปีและภาคการศึกษา รายละเอียดรายวิชาที่ต้องลงทะเบียนในแต่ละเทอม และจำนวนหน่วยกิตรวม **คำถาม**: {{query}} **คำแนะนำในการวิเคราะห์**: - ตรวจสอบคำสำคัญในคำถามเพื่อระบุหมวดหมู่ที่สอดคล้อง - หากคำถามเกี่ยวข้องกับหลายหมวดหมู่ ให้จัดลำดับความสำคัญตามความต้องการของผู้ใช้ - หากไม่สามารถระบุหมวดหมู่ได้อย่างชัดเจน ให้จัดหมวดหมู่เป็น "อื่นๆ" และระบุความไม่แน่นอน **รูปแบบการตอบกลับ**: หมายเหตุ: - รูปแบบปีการศึกษาที่ยอมรับ: "ปีที่ 1", "ปี 1", "ชั้นปีที่ 1" - รูปแบบภาคการศึกษาที่ยอมรับ: "เทอมที่ 1", "เทอม 1", "ภาคการศึกษาที่ 1" - หากข้อมูลไม่ครบ ให้ระบุค่าสำหรับฟิลด์ที่ขาดหายเป็น null พร้อมข้อความแจ้งความไม่แน่นอน ให้ผลลัพธ์ในรูปแบบ JSON ตามโครงสร้าง: { "event_type": "program_details" | "contact" | "curriculum" | "fees" | "study_plan", "year": "ปีที่ X", // แปลงเป็นรูปแบบมาตรฐาน หรือ null หากไม่ระบุ "semester": "เทอมที่ X", // แปลงเป็นรูปแบบมาตรฐาน หรือ null หากไม่ระบุ "key_terms": ["คำสำคัญที่เกี่ยวข้อง"], "response_format": "detailed", "uncertainty": "low" // ระบุระดับความไม่แน่นอน (เช่น 'low', 'high') } ตัวอย่าง: Input: "โปรแกรมการศึกษามีรายละเอียดอะไรบ้าง" Output: { "event_type": "program_details", "year": null, "semester": null, "key_terms": ["โปรแกรมการศึกษา", "รายละเอียด"], "response_format": "detailed", "uncertainty": "low" } Input: "ฉันจะติดต่อภาควิชาได้อย่างไร" Output: { "event_type": "contact", "year": null, "semester": null, "key_terms": ["ติดต่อ", "ภาควิชา"], "response_format": "detailed", "uncertainty": "low" } Input: "โครงสร้างหลักสูตรของปี 2 เป็นอย่างไร" Output: { "event_type": "curriculum", "year": "ปีที่ 2", "semester": null, "key_terms": ["โครงสร้างหลักสูตร"], "response_format": "detailed", "uncertainty": "low" } Input: "ค่าเล่าเรียนสำหรับเทอม 1 เท่าไหร่" Output: { "event_type": "fees", "year": null, "semester": "เทอมที่ 1", "key_terms": ["ค่าเล่าเรียน", "เทอม 1"], "response_format": "detailed", "uncertainty": "low" } Input: "ปี 1 เทอม 1 ต้องเรียนอะไรบ้าง" Output: { "event_type": "study_plan", "year": null, "semester": null, "key_terms": ["เรียนอะไร", "เทอม"], "response_format": "detailed", "uncertainty": "low" } กรุณาตอบเป็นภาษาไทยและตรวจสอบให้แน่ใจว่า JSON มีโครงสร้างที่ถูกต้อง """ ) def normalize_year_semester(self, query: str) -> str: """Normalize year and semester formats in queries""" # Year patterns year_patterns = { r'ปี\s*(\d+)': r'ปีที่ \1', r'ชั้นปีที่\s*(\d+)': r'ปีที่ \1', r'ปีการศึกษาที่\s*(\d+)': r'ปีที่ \1' } # Semester patterns semester_patterns = { r'เทอม\s*(\d+)': r'เทอมที่ \1', r'ภาคเรียนที่\s*(\d+)': r'เทอมที่ \1', r'ภาคการศึกษาที่\s*(\d+)': r'เทอมที่ \1' } normalized_query = query for pattern, replacement in year_patterns.items(): normalized_query = re.sub(pattern, replacement, normalized_query) for pattern, replacement in semester_patterns.items(): normalized_query = re.sub(pattern, replacement, normalized_query) return normalized_query def normalize_query(self, query: str) -> str: """เพิ่มการเปลี่ยนแปลงคำ (synonym mapping) เพื่อลดปัญหา Vocabulary Mismatch""" normalized_query = self.normalize_year_semester(query) # เพิ่ม mapping สำหรับคำที่มีความหมายเดียวกัน synonyms = { "วิชาเลือก": "หมวดวิชาเลือก" # สามารถเพิ่มคำอื่น ๆ ได้ตามต้องการ } for original, replacement in synonyms.items(): normalized_query = normalized_query.replace(original, replacement) return normalized_query def _get_default_analysis(self, query: str) -> Dict[str, Any]: logger.info("Returning default analysis") return { "original_query": query, "event_type": None, "semester": None, "key_terms": [], "response_format": "detailed" } def process_query(self, query: str) -> Dict[str, Any]: """Enhanced query processing with support for detail types and better categorization.""" try: # ใช้ normalize_query ที่แก้ไขแล้วเพื่อให้คำค้นมีรูปแบบที่ตรงกับดัชนีข้อมูล normalized_query = self.normalize_query(query) result = self.prompt_builder.run(query=normalized_query) response = self.generator.run(prompt=result["prompt"]) if not response or not response.get("replies") or not response["replies"][0]: logger.warning("Received empty response from OpenAI") return self._get_default_analysis(query) # ทำความสะอาด JSON string json_str = response["replies"][0] json_str = json_str.replace("```json", "").replace("```", "").strip() analysis = json.loads(json_str) analysis['detail_type'] = None # Enhanced categorization with detail types if any(keyword in query.lower() for keyword in ['ภาษาอังกฤษ', 'toefl', 'ielts', 'swu-set', 'โทอิค', 'คะแนนภาษา']): analysis['event_type'] = 'program_details' elif any(keyword in query.lower() for keyword in ['สมัคร', 'ขั้นตอน', 'วิธีการ', 'เอกสาร', 'หลักฐาน', 'admission']): analysis['event_type'] = 'program_details' analysis['detail_type'] = None elif any(keyword in query.lower() for keyword in ['ค่าเทอม', 'ค่าธรรมเนียม', 'ค่าเรียน', 'ค่าปรับ', 'ค่าใช้จ่าย']): analysis['event_type'] = 'fees' elif any(keyword in query.lower() for keyword in ['หน่วยกิต', 'วิชา', 'หลักสูตร', 'แผนการเรียน', 'วิชาเลือก', 'วิชาบังคับ', 'วิชาหลัก', 'หมวดวิชา']): analysis['event_type'] = 'curriculum' return { "original_query": query, **analysis } except Exception as e: logger.error(f"Query processing failed: {str(e)}") return self._get_default_analysis(query) # First, let's modify the AcademicCalendarRAG class to maintain conversation history class AcademicCalendarRAG: """Enhanced RAG system for academic calendar and program information with conversation memory""" def __init__(self, config: PipelineConfig): self.config = config self.document_store = HybridDocumentStore(config) self.query_processor = AdvancedQueryProcessor(config) self.response_generator = ResponseGenerator(config) self.data_processor = CalendarDataProcessor() # Initialize conversation memory self.conversation_history = [] self.max_history_length = 5 # Keep last 5 exchanges (10 messages) # Initialize data containers self.calendar_events = [] self.program_details = [] self.contact_details = [] self.course_structure = [] self.study_plans = [] self.tuition_fees = [] def add_to_conversation(self, role: str, content: str): """Add a message to the conversation history""" self.conversation_history.append({"role": role, "content": content}) # Limit history length to prevent context overflow if len(self.conversation_history) > self.max_history_length * 2: # Each exchange is 2 messages self.conversation_history = self.conversation_history[-(self.max_history_length * 2):] def load_data(self, json_data: Dict): """Load and process all data sources""" try: raw_events = self.data_processor.parse_calendar_json(json_data) for event in raw_events: if not event.event_type: event.event_type = CalendarEvent.classify_event_type(event.activity) self.calendar_events.append(event) # Process other data types self.program_details = self.data_processor.extract_program_details(json_data) self.contact_details = self.data_processor.extract_contact_details(json_data) self.course_structure = self.data_processor.extract_course_structure(json_data) self.study_plans = self.data_processor.extract_program_study_plan(json_data) self.tuition_fees = self.data_processor.extract_fees(json_data) self.document_store.add_events( events=self.calendar_events, program_details=self.program_details, contact_details=self.contact_details, course_structure=self.course_structure, study_plans=self.study_plans, tuition_fees=self.tuition_fees ) except Exception as e: logger.error(f"Error loading data: {str(e)}") raise def process_query(self, query: str, conversation_history=None) -> Dict[str, Any]: """Process user query using conversation history and hybrid retrieval.""" # Use provided conversation history or the internal history if conversation_history is not None: self.conversation_history = conversation_history # Add the current query to history self.add_to_conversation("user", query) # Create a context-enhanced query by including relevant previous exchanges query_with_context = query if self.conversation_history and len(self.conversation_history) > 1: # Extract previous exchanges to provide context (up to 2 previous exchanges) prev_exchanges = self.conversation_history[:-1] if len(prev_exchanges) > 4: # Limit to last 2 exchanges (4 messages) prev_exchanges = prev_exchanges[-4:] context_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in prev_exchanges]) query_with_context = f"Previous conversation:\n{context_str}\n\nCurrent question: {query}" # Process with conversation context max_attempts = 4 # Allow up to 4 attempts attempt = 0 weight_values = [0.3, 0.7, 0.3, 0.7] # Switching semantic retrieval weight while attempt < max_attempts: attempt += 1 try: # Analyze query - use context-enhanced query for better understanding if attempt <= 2: query_info = self.query_processor.process_query(query_with_context if attempt == 1 else query) else: query_info = self.query_processor._get_default_analysis(query) logger.info(f"Retrying query processing (attempt {attempt}) with default analysis") weight_semantic = weight_values[attempt - 1] # Get relevant documents using hybrid search logger.info(f"Attempt {attempt}: Searching with weight_semantic={weight_semantic}") documents = self.document_store.hybrid_search( query=query_with_context if attempt == 1 else query, event_type=query_info.get("event_type"), detail_type=query_info.get("detail_type"), semester=query_info.get("semester"), top_k=self.config.retriever.top_k, weight_semantic=weight_semantic ) # Generate response with conversation context response = self.response_generator.generate_response( query=query, documents=documents, query_info=query_info, conversation_history=self.conversation_history ).strip() # If response indicates no relevant information, retry with adjusted approach if "ขออภัย ไม่พบข้อมูลที่เกี่ยวข้อง" in response and attempt < max_attempts: continue # Try again with new weight or default analysis # Add the response to conversation history self.add_to_conversation("assistant", response) return { "query": query, "answer": response, "relevant_docs": documents, "query_info": query_info } except Exception as e: logger.error(f"Error processing query: {str(e)}") return { "query": query, "answer": "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้", "error": "Maximum retry attempts reached" } # def main(): # """Main function demonstrating hybrid retrieval""" # try: # # Load API key # with open("key.txt", "r") as f: # openai_api_key = f.read().strip() # # Create config with hybrid retrieval settings # config = create_default_config(openai_api_key) # pipeline = AcademicCalendarRAG(config) # # Load and process data # with open("raw-data.json", "r", encoding="utf-8") as f: # raw_data = json.load(f) # pipeline.load_data(raw_data) # # Test queries with different semantic weights # queries = ["ค่าเทอมเท่าไหร่","เปิดเรียนวันไหน","ขั้นตอนการสมัครที่สาขานี้มีอะไรบ้าง","ต้องใช้ระดับภาษาอังกฤษเท่าไหร่ในการสมัครเรียนที่นี้","ถ้าจะไปติดต่อมาหลายต้องลง mrt อะไร","มีวิชาหลักเเละวิชาเลือกออะไรบ้าง", "ปีที่ 1 เทอม 1 ต้องเรียนอะไรบ้าง", "ปีที่ 2 เทอม 1 ต้องเรียนอะไรบ้าง"] # # queries = ["ปีที่ 1 เทอม 1 ต้องเรียนอะไรบ้าง"] # print("=" * 80) # for query in queries: # print(f"\nQuery: {query}") # result = pipeline.process_query(query) # print(f"Answer: {result['answer']}") # print("-" * 40) # except Exception as e: # logger.error(f"Pipeline execution failed: {str(e)}") # raise # if __name__ == "__main__": # main()