from haystack import * from haystack.components.generators.openai import OpenAIGenerator from haystack.components.builders import PromptBuilder from haystack.components.embedders import SentenceTransformersDocumentEmbedder from haystack.components.retrievers.in_memory import * from haystack.document_stores.in_memory import * from haystack.utils import Secret from pathlib import Path import hashlib from datetime import * from typing import * from dataclasses import * import json import logging import re import pickle # Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class ValidationResult: """Stores the result of a validation check""" is_valid: bool errors: List[str] warnings: List[str] normalized_data: Dict[str, str] @dataclass class ApplicationInfo: application_portal: str program_email: str @dataclass class RequiredDocument: name: str description: str conditions: Optional[str] = None @dataclass class SelectionStep: step_number: str description: str @dataclass class ProgramDetailInfo: application_info: ApplicationInfo required_documents: Dict[str, Dict[str, RequiredDocument]] submission_process: str selection_process: List[SelectionStep] @dataclass class Transportation: boat: str bts: str mrt: str airport_link: str bus: Dict[str, str] @dataclass class Contact: email: str facebook: Dict[str, str] @dataclass class ContactDetail: event_type: str department: str faculty: str university: str location: str contact: Contact transportation: Transportation @dataclass class Course: code: str title_th: str title_en: str credits: int @dataclass class CourseCategory: description: Optional[str] credits: Union[str, int] minimum_credits: Optional[int] courses: List[Course] @dataclass class CourseStructure: event_type: str program_name: str department: str total_credits: int degree_level: str structure: Dict[str, CourseCategory] @dataclass class StudyPlan: event_type: str years: Dict[str, Dict[str, Any]] @dataclass class RegularFee: amount: float currency: str period: str @dataclass class LatePaymentFee: amount: float currency: str @dataclass class TuitionFee: event_type: str regular_fee: RegularFee late_payment_fee: LatePaymentFee class OpenAIDateParser: """Uses OpenAI to parse complex Thai date formats""" def __init__(self, api_key: str, model: str = "gpt-4o"): self.generator = OpenAIGenerator( api_key=Secret.from_token(api_key), model=model ) self.prompt_builder = PromptBuilder( template=""" Parse the following Thai date range into a structured format: Date: {{date}} Return in JSON format: { "start_date": "YYYY-MM-DD", "end_date": "YYYY-MM-DD" (if range), "is_range": true/false } Notes: - Convert Buddhist Era (BE) to CE - Handle abbreviated Thai months - Account for date ranges with dashes - Return null for end_date if it's a single date Example inputs and outputs: Input: "จ 8 ก.ค. – จ 19 ส.ค. 67" Output: {"start_date": "2024-07-08", "end_date": "2024-08-19", "is_range": true} Input: "15 มกราคม 2567" Output: {"start_date": "2024-01-15", "end_date": null, "is_range": false} """ ) async def parse_date(self, date_str: str) -> Dict[str, Union[str, bool]]: """Parse complex Thai date format using OpenAI""" try: result = self.prompt_builder.run(date=date_str) response = await self.generator.arun(prompt=result["prompt"]) if not response or not response.get("replies"): raise ValueError("Empty response from OpenAI") parsed = json.loads(response["replies"][0]) for date_field in ['start_date', 'end_date']: if parsed.get(date_field): datetime.strptime(parsed[date_field], '%Y-%m-%d') return parsed except Exception as e: logger.error(f"OpenAI date parsing failed for '{date_str}': {str(e)}") raise ValueError(f"Could not parse date: {date_str}") class ThaiTextPreprocessor: """Handles Thai text preprocessing and normalization""" CHAR_MAP = {'ํา': 'ำ','์': '','–': '-','—': '-','٫': ',',} @classmethod def normalize_thai_text(cls, text: str) -> str: """Normalize Thai text by applying character mappings and spacing rules""" if not text: return text for old, new in cls.CHAR_MAP.items(): text = text.replace(old, new) text = re.sub(r'\s+', ' ', text.strip()) thai_digits = '๐๑๒๓๔๕๖๗๘๙' arabic_digits = '0123456789' for thai, arabic in zip(thai_digits, arabic_digits): text = text.replace(thai, arabic) return text class CalendarEventValidator: """Validates and preprocesses calendar events""" def __init__(self, openai_api_key: str): self.preprocessor = ThaiTextPreprocessor() self.date_parser = OpenAIDateParser(api_key=openai_api_key) async def validate_event(self, event: 'CalendarEvent') -> ValidationResult: """Validate a calendar event and return validation results""" errors = [] warnings = [] normalized_data = {} if event.date: try: parsed_date = await self.date_parser.parse_date(event.date) normalized_data['date'] = parsed_date['start_date'] if parsed_date['is_range'] and parsed_date['end_date']: range_note = f"ถึงวันที่ {parsed_date['end_date']}" if event.note: normalized_data['note'] = f"{event.note}; {range_note}" else: normalized_data['note'] = range_note except ValueError as e: errors.append(f"Invalid date format: {event.date}") else: errors.append("Date is required") if event.time: time_pattern = r'^([01]?[0-9]|2[0-3]):([0-5][0-9])$' if not re.match(time_pattern, event.time): errors.append(f"Invalid time format: {event.time}") normalized_data['time'] = event.time if event.activity: normalized_activity = self.preprocessor.normalize_thai_text(event.activity) if len(normalized_activity) < 3: warnings.append("Activity description is very short") normalized_data['activity'] = normalized_activity else: errors.append("Activity is required") valid_semesters = {'ภาคต้น', 'ภาคปลาย', 'ภาคฤดูร้อน'} if event.semester: normalized_semester = self.preprocessor.normalize_thai_text(event.semester) if normalized_semester not in valid_semesters: warnings.append(f"Unusual semester value: {event.semester}") normalized_data['semester'] = normalized_semester else: errors.append("Semester is required") valid_types = {'registration', 'deadline', 'examination', 'academic', 'holiday'} if event.event_type not in valid_types: errors.append(f"Invalid event type: {event.event_type}") normalized_data['event_type'] = event.event_type if event.note and 'note' not in normalized_data: normalized_data['note'] = self.preprocessor.normalize_thai_text(event.note) if event.section: normalized_data['section'] = self.preprocessor.normalize_thai_text(event.section) return ValidationResult( is_valid=len(errors) == 0, errors=errors, warnings=warnings, normalized_data=normalized_data ) @dataclass class CalendarEvent: """Structured representation of a calendar event with validation""" date: str time: str activity: str note: str semester: str event_type: str section: Optional[str] = None @staticmethod def classify_event_type(activity: str) -> str: """Classify event type based on activity description""" activity_lower = activity.lower() keywords = { 'registration': ['ลงทะเบียน', 'ชําระเงิน', 'ค่าธรรมเนียม', 'เปิดเรียน'], 'deadline': ['วันสุดท้าย', 'กําหนด', 'ภายใน', 'ต้องส่ง'], 'examination': ['สอบ', 'ปริญญานิพนธ์', 'วิทยานิพนธ์', 'สอบปากเปล่า'], 'holiday': ['วันหยุด', 'ชดเชย', 'เทศกาล'], } for event_type, terms in keywords.items(): if any(term in activity_lower for term in terms): return event_type return 'academic' async def initialize(self, openai_api_key: str): """Asynchronously validate and normalize the event""" validator = CalendarEventValidator(openai_api_key) result = await validator.validate_event(self) if not result.is_valid: raise ValueError(f"Invalid calendar event: {', '.join(result.errors)}") for field, value in result.normalized_data.items(): setattr(self, field, value) if result.warnings: logger.warning(f"Calendar event warnings: {', '.join(result.warnings)}") def to_searchable_text(self) -> str: """Convert event to searchable text format""" return f""" ภาคการศึกษา: {self.semester} ประเภท: {self.event_type} วันที่: {self.date} เวลา: {self.time or '-'} กิจกรรม: {self.activity} หมวดหมู่: {self.section or '-'} หมายเหตุ: {self.note or '-'} """.strip() class CacheManager: """Manages caching for different components of the RAG pipeline""" def __init__(self, cache_dir: Path, ttl: int = 3600): """ Initialize CacheManager """ self.cache_dir = cache_dir self.ttl = ttl self.embeddings_cache = self._load_cache("embeddings") self.query_cache = self._load_cache("queries") self.document_cache = self._load_cache("documents") def _generate_key(self, data: Union[str, Dict, Any]) -> str: """Generate a unique cache key""" if isinstance(data, str): content = data.encode('utf-8') else: content = json.dumps(data, sort_keys=True).encode('utf-8') return hashlib.md5(content).hexdigest() def _load_cache(self, cache_type: str) -> Dict: """Load cache from disk""" cache_path = self.cache_dir / f"{cache_type}_cache.pkl" if cache_path.exists(): try: with open(cache_path, 'rb') as f: cache = pickle.load(f) self._clean_expired_entries(cache) return cache except Exception as e: logger.warning(f"Failed to load {cache_type} cache: {e}") return {} return {} def _save_cache(self, cache_type: str, cache_data: Dict): """Save cache to disk""" cache_path = self.cache_dir / f"{cache_type}_cache.pkl" try: with open(cache_path, 'wb') as f: pickle.dump(cache_data, f) except Exception as e: logger.error(f"Failed to save {cache_type} cache: {e}") def _clean_expired_entries(self, cache: Dict): """Remove expired cache entries""" current_time = datetime.now() expired_keys = [ key for key, (_, timestamp) in cache.items() if current_time - timestamp > timedelta(seconds=self.ttl) ] for key in expired_keys: del cache[key] def get_embedding_cache(self, text: str) -> Optional[Any]: """Get cached embedding for text""" key = self._generate_key(text) if key in self.embeddings_cache: embedding, timestamp = self.embeddings_cache[key] if datetime.now() - timestamp <= timedelta(seconds=self.ttl): return embedding return None def set_embedding_cache(self, text: str, embedding: Any): """Cache embedding for text""" key = self._generate_key(text) self.embeddings_cache[key] = (embedding, datetime.now()) self._save_cache("embeddings", self.embeddings_cache) def get_query_cache(self, query: str) -> Optional[Dict]: """Get cached query results""" key = self._generate_key(query) if key in self.query_cache: result, timestamp = self.query_cache[key] if datetime.now() - timestamp <= timedelta(seconds=self.ttl): return result return None def set_query_cache(self, query: str, result: Dict): """Cache query results""" key = self._generate_key(query) self.query_cache[key] = (result, datetime.now()) self._save_cache("queries", self.query_cache) def set_document_cache(self, doc_id: str, document: Any): """Cache document""" self.document_cache[doc_id] = (document, datetime.now()) self._save_cache("documents", self.document_cache) @dataclass class ModelConfig: openai_api_key: str embedder_model: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" openai_model: str = "gpt-4o" temperature: float = 0.7 @dataclass class RetrieverConfig: top_k: int = 5 @dataclass class CacheConfig: enabled: bool = True cache_dir: Path = Path("./cache") ttl: int = 86400 # 24 hours @dataclass class ProcessingConfig: batch_size: int = 32 @dataclass class LocalizationConfig: enable_thai_normalization: bool = True @dataclass class PipelineConfig: model: ModelConfig retriever: RetrieverConfig = field(default_factory=RetrieverConfig) cache: CacheConfig = field(default_factory=CacheConfig) processing: ProcessingConfig = field(default_factory=ProcessingConfig) localization: LocalizationConfig = field(default_factory=LocalizationConfig) def create_default_config(api_key: str) -> PipelineConfig: """ Create a default pipeline configuration with optimized settings for Thai language processing. Args: api_key (str): OpenAI API key Returns: PipelineConfig: Configured pipeline settings """ return PipelineConfig( model=ModelConfig( openai_api_key=api_key, temperature=0.3 # Lower temperature for more focused responses ), retriever=RetrieverConfig( top_k=5 # Optimal number of documents to retrieve ), cache=CacheConfig( enabled=True, cache_dir=Path("./cache"), ttl=86400 # 24 hour cache ), processing=ProcessingConfig( batch_size=32 # Default batch size for processing ), localization=LocalizationConfig( enable_thai_normalization=True # Enable Thai text normalization ) ) class CalendarDataProcessor: """Process and structure calendar data from the new raw-data.json format""" @staticmethod def parse_calendar_json(json_data: Dict) -> List[CalendarEvent]: """Parse the new calendar JSON format into CalendarEvent objects""" events = [] # Extract academic calendar data - handle direct dictionary input calendar_data = json_data.get('academic_calendar', []) if isinstance(json_data, dict) else json_data for semester_block in calendar_data: semester = semester_block.get('education', '') schedule = semester_block.get('schedule', []) # Handle regular schedule events for event in schedule: if 'section' in event and 'details' in event: # Process section-based events (thesis deadlines, etc.) section = event['section'] for detail in event['details']: if 'ภาคต้น' in detail and 'ภาคปลาย' in detail: # Handle dual-semester events for sem_key in ['ภาคต้น', 'ภาคปลาย']: if detail.get(sem_key): events.append(CalendarEvent( date=detail[sem_key], time='', activity=detail['title'], note=section, semester=sem_key, event_type='deadline', section=section )) else: # Single semester event events.append(CalendarEvent( date=detail.get('date', ''), time='', activity=detail.get('title', ''), note=section, semester=ThaiTextPreprocessor.normalize_thai_text(semester), event_type='deadline', section=section )) else: # Regular calendar event event_type = CalendarEvent.classify_event_type(event.get('activity', '')) # Clean semester string cleaned_semester = semester if '(' in semester: match = re.search(r'\((.*?)\)', semester) if match: cleaned_semester = match.group(1) cleaned_semester = ThaiTextPreprocessor.normalize_thai_text(cleaned_semester) events.append(CalendarEvent( date=event.get('date', ''), time=event.get('time', ''), activity=event.get('activity', ''), note=event.get('note', ''), semester=cleaned_semester, event_type=event_type )) return events @staticmethod def extract_program_details(json_data: Dict) -> ProgramDetailInfo: """Extract and structure program details into ProgramDetailInfo object""" raw_details = json_data.get('program_details', {}) # Process application info app_info_data = raw_details.get('application_info', {}) app_info = ApplicationInfo( application_portal=app_info_data.get('application_portal', ''), program_email=app_info_data.get('program_email', '') ) # Process required documents req_docs = {} raw_docs = raw_details.get('required_documents', {}) # Process mandatory documents mandatory_docs = {} for doc_key, doc_value in raw_docs.get('mandatory', {}).items(): mandatory_docs[doc_key] = RequiredDocument( name=doc_key, description=doc_value ) req_docs['mandatory'] = mandatory_docs # Process optional documents optional_docs = {} for doc_key, doc_data in raw_docs.get('optional', {}).items(): if doc_key == 'english_proficiency': ep_data = doc_data optional_docs[doc_key] = RequiredDocument( name=ep_data.get('name', ''), description=str(ep_data.get('accepted_tests', {})), conditions=f"Validity: {ep_data.get('validity', '')}, Benefits: {ep_data.get('benefits', '')}, Exemptions: {ep_data.get('exemptions', '')}" ) else: optional_docs[doc_key] = RequiredDocument( name=doc_data.get('name', ''), description='', conditions=doc_data.get('condition', '') ) req_docs['optional'] = optional_docs # Process selection steps selection_steps = [] for step_data in raw_details.get('selection_process', {}).get('steps', []): for step_num, description in step_data.items(): selection_steps.append(SelectionStep( step_number=step_num, description=description )) return [ProgramDetailInfo( application_info=app_info, required_documents=req_docs, submission_process=raw_details.get('submission_process', ''), selection_process=selection_steps )] @staticmethod def extract_contact_details(json_data: Dict) -> List[ContactDetail]: """Extract and structure contact details into ContactDetail objects""" raw_contacts = json_data.get('contact_details', []) contact_details = [] # Handle the case where raw_contacts might be a single object instead of a list if not isinstance(raw_contacts, list): raw_contacts = [raw_contacts] for contact_data in raw_contacts: # Skip if contact_data is not a dictionary if not isinstance(contact_data, dict): continue try: # Process transportation data transportation_data = contact_data.get('transportation', {}) transportation = Transportation( boat=transportation_data.get('boat', ''), bts=transportation_data.get('bts', ''), mrt=transportation_data.get('mrt', ''), airport_link=transportation_data.get('airport_link', ''), bus=transportation_data.get('bus', {}) ) # Process contact information contact_info = Contact( email=contact_data.get('email', ''), facebook=contact_data.get('facebook', {}) ) # Create ContactDetail object contact_details.append(ContactDetail( event_type=contact_data.get('event_type', ''), department=contact_data.get('department', ''), faculty=contact_data.get('faculty', ''), university=contact_data.get('university', ''), location=contact_data.get('location', ''), contact=contact_info, transportation=transportation )) except Exception as e: print(f"Error processing contact data: {e}") continue return contact_details @staticmethod def extract_course_structure(json_data: Dict) -> List[CourseStructure]: """Extract and structure course information into CourseStructure objects""" course_structures = [] # Get course structure data course_data = json_data.get('course_structure', {}) program_metadata = course_data.get('program_metadata', {}) curriculum = course_data.get('curriculum_structure', {}) # Process foundation courses foundation_data = curriculum.get('foundation_courses', {}) foundation_courses = [] for course in foundation_data.get('courses', []): foundation_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Process core courses core_data = curriculum.get('core_courses', {}) core_courses = [] for course in core_data.get('modules', []): core_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Process elective courses elective_data = curriculum.get('electives', {}) elective_courses = [] for course in elective_data.get('course_groups', []): elective_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Process research courses research_data = curriculum.get('research', {}) research_courses = [] for course in research_data.get('course', []): research_courses.append(Course( code=course.get('code', ''), title_th=course.get('title', {}).get('th', ''), title_en=course.get('title', {}).get('en', ''), credits=course.get('credits', 0) )) # Create course categories structure = { 'หมวดวิชาปรับพื้นฐาน': CourseCategory( # Previously foundation_courses description=foundation_data.get('metadata', {}).get('description'), credits=foundation_data.get('metadata', {}).get('credits', 'non-credit'), minimum_credits=None, courses=foundation_courses ), 'หมวดวิชาบังคับ': CourseCategory( # Previously core_courses description=None, credits=0, minimum_credits=core_data.get('minimum_requirement_credits'), courses=core_courses ), 'หมวดวิชาเลือก': CourseCategory( # Previously elective_courses description=None, credits=0, minimum_credits=elective_data.get('minimum_requirement_credits'), courses=elective_courses ), 'หมวดวิชาการค้นคว้าอิสระ': CourseCategory( # Previously research_courses description=None, credits=0, minimum_credits=research_data.get('minimum_requirement_credits'), courses=research_courses ) } # Create course structure course_structure = CourseStructure( event_type='curriculum_structure', program_name=program_metadata.get('name', ''), department=program_metadata.get('department', ''), total_credits=program_metadata.get('total_credits', 0), degree_level=program_metadata.get('degree_level', ''), structure=structure ) return [course_structure] @staticmethod def extract_program_study_plan(json_data: Dict) -> List[StudyPlan]: """Extract and structure study plan information into StudyPlan objects""" study_plan_data = json_data.get('program_study_plan', {}) # Initialize the years dictionary to store all year/semester data years_dict = {} for year_key, year_data in study_plan_data.items(): years_dict[year_key] = {} for semester_key, semester_data in year_data.items(): # Get metadata metadata = semester_data.get('metadata', {}) # Initialize semester structure semester_struct = { 'metadata': metadata, 'courses': [] } # Handle both 'modules' and 'courses' keys course_data = semester_data.get('modules', []) or semester_data.get('courses', []) # Add courses to semester for course in course_data: course_info = { 'code': course.get('code', ''), 'title': course.get('title', {'th': '', 'en': ''}), 'credits': course.get('credits', 0) } semester_struct['courses'].append(course_info) # Add semester data to year years_dict[year_key][semester_key] = semester_struct # Create StudyPlan object study_plan = StudyPlan( event_type='study_plan', years=years_dict ) return [study_plan] @staticmethod def extract_fees(json_data: Dict) -> List[TuitionFee]: """Extract and structure fee information into TuitionFee objects""" fees_data = json_data.get('fees', {}) # Parse regular tuition fee regular_fee_str = fees_data.get('tuition', '') regular_amount = float(regular_fee_str.split()[0]) if regular_fee_str else 0 regular_fee = RegularFee( amount=regular_amount, currency='THB', period='per semester' ) # Parse late payment fee late_fee_str = fees_data.get('late_payment', '') late_amount = float(late_fee_str.split()[0]) if late_fee_str else 0 late_payment_fee = LatePaymentFee( amount=late_amount, currency='THB' ) # Create TuitionFee object tuition_fee = TuitionFee( event_type='tuition_fee', regular_fee=regular_fee, late_payment_fee=late_payment_fee ) return [tuition_fee] class HybridDocumentStore: """Enhanced document store with hybrid retrieval capabilities""" def __init__(self, config: PipelineConfig): self.store = InMemoryDocumentStore() self.embedder = SentenceTransformersDocumentEmbedder( model=config.model.embedder_model ) # Initialize BM25 retriever self.bm25_retriever = InMemoryBM25Retriever( document_store=self.store, top_k=config.retriever.top_k ) # Initialize embedding retriever self.embedding_retriever = InMemoryEmbeddingRetriever( document_store=self.store, top_k=config.retriever.top_k ) self.cache_manager = CacheManager( cache_dir=config.cache.cache_dir, ttl=config.cache.ttl ) self.embedder.warm_up() # Initialize containers self.events = [] self.event_type_index = {} self.semester_index = {} self._document_counter = 0 # Additional data containers self.course_data = [] self.contact_data = [] self.study_plan_data = [] def _generate_unique_id(self) -> str: """Generate a unique document ID""" self._document_counter += 1 timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") return f"doc_{timestamp}_{self._document_counter}" def _compute_embedding(self, text: str) -> Any: """Compute embedding with caching""" cached_embedding = self.cache_manager.get_embedding_cache(text) if cached_embedding is not None: return cached_embedding doc = Document(content=text) embedding = self.embedder.run(documents=[doc])["documents"][0].embedding self.cache_manager.set_embedding_cache(text, embedding) return embedding def add_document(self, text: str, event_type: str): """Add a single document to the store""" try: # Compute embedding embedding = self._compute_embedding(text) # Create document with unique ID doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={'event_type': event_type} ) # Write document self.store.write_documents([doc]) # Cache document self.cache_manager.set_document_cache(doc.id, doc) except Exception as e: logger.error(f"Error adding document: {str(e)}") raise def add_events(self, events: List[CalendarEvent], contact_details: Optional[List[ContactDetail]] = None, course_structure: Optional[List[CourseStructure]] = None, study_plans: Optional[List[StudyPlan]] = None): """Add events and additional data with caching""" documents = [] added_events = set() # Track added events to prevent duplicates # Process calendar events for event in events: event_key = f"{event.date}_{event.activity}_{event.semester}" if event_key not in added_events: added_events.add(event_key) self.events.append(event) event_idx = len(self.events) - 1 # Update indices if event.event_type not in self.event_type_index: self.event_type_index[event.event_type] = [] self.event_type_index[event.event_type].append(event_idx) if event.semester not in self.semester_index: self.semester_index[event.semester] = [] self.semester_index[event.semester].append(event_idx) # Create document text = event.to_searchable_text() embedding = self._compute_embedding(text) doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={ 'event_type': event.event_type, 'semester': event.semester, 'date': event.date, 'event_idx': event_idx } ) documents.append(doc) self.cache_manager.set_document_cache(str(event_idx), doc) # Process contact details if contact_details: for contact in contact_details: self.contact_data.append(contact) text = f""" ข้อมูลการติดต่อ: คณะ: {contact.faculty} ภาควิชา: {contact.department} มหาวิทยาลัย: {contact.university} สถานที่: {contact.location} การติดต่อ: อีเมล: {contact.contact.email} Facebook: {json.dumps(contact.contact.facebook, ensure_ascii=False)} การเดินทาง: เรือ: {contact.transportation.boat} BTS: {contact.transportation.bts} MRT: {contact.transportation.mrt} Airport Link: {contact.transportation.airport_link} รถประจำทาง: {json.dumps(contact.transportation.bus, ensure_ascii=False)} """ embedding = self._compute_embedding(text) doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={'event_type': 'contact'} ) documents.append(doc) # Process course structure if course_structure: for course in course_structure: self.course_data.append(course) text = f""" โครงสร้างหลักสูตร: ชื่อหลักสูตร: {course.program_name} ภาควิชา: {course.department} หน่วยกิตรวม: {course.total_credits} ระดับการศึกษา: {course.degree_level} รายละเอียดโครงสร้าง: """ for category_name, category in course.structure.items(): text += f"\n{category_name}:\n" if category.description: text += f"คำอธิบาย: {category.description}\n" text += f"หน่วยกิต: {category.credits}\n" if category.minimum_credits: text += f"หน่วยกิตขั้นต่ำ: {category.minimum_credits}\n" text += "รายวิชา:\n" for course_item in category.courses: text += f"- {course_item.code}: {course_item.title_th} ({course_item.title_en}) - {course_item.credits} หน่วยกิต\n" embedding = self._compute_embedding(text) doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={'event_type': 'curriculum'} ) documents.append(doc) # Process study plans if study_plans: for plan in study_plans: self.study_plan_data.append(plan) text = "แผนการศึกษา:\n" for year, semesters in plan.years.items(): text += f"\nปีที่ {year}:\n" for semester, data in semesters.items(): text += f"\n{semester}:\n" if 'metadata' in data and data['metadata']: text += f"ข้อมูลเพิ่มเติม: {json.dumps(data['metadata'], ensure_ascii=False)}\n" if 'courses' in data: for course in data['courses']: text += f"- {course['code']}: {course['title'].get('th', '')} ({course['title'].get('en', '')}) - {course['credits']} หน่วยกิต\n" embedding = self._compute_embedding(text) doc = Document( id=self._generate_unique_id(), content=text, embedding=embedding, meta={'event_type': 'study_plan'} ) documents.append(doc) batch_size = 10 for i in range(0, len(documents), batch_size): batch = documents[i:i + batch_size] try: self.store.write_documents(batch) except Exception as e: logger.error(f"Error writing document batch {i//batch_size + 1}: {str(e)}") for doc in batch: try: self.store.write_documents([doc]) except Exception as e2: logger.error(f"Failed to write document {doc.id}: {str(e2)}") def hybrid_search(self, query: str, event_type: Optional[str] = None, semester: Optional[str] = None, top_k: int = 10, weight_semantic: float = 0.5) -> List[Document]: """Hybrid search combining semantic and lexical search results""" cache_key = json.dumps({ 'query': query, 'event_type': event_type, 'semester': semester, 'top_k': top_k, 'weight_semantic': weight_semantic }) cached_results = self.cache_manager.get_query_cache(cache_key) if cached_results is not None: return cached_results # Get semantic search results query_embedding = self._compute_embedding(query) semantic_results = self.embedding_retriever.run( query_embedding=query_embedding )["documents"] # Get BM25 results bm25_results = self.bm25_retriever.run( query=query )["documents"] # Combine results using score fusion combined_results = self._merge_results( semantic_results=semantic_results, bm25_results=bm25_results, weight_semantic=weight_semantic, top_k=top_k ) # Filter results based on metadata filtered_results = [] for doc in combined_results: if event_type and doc.meta.get('event_type') != event_type: continue if semester and doc.meta.get('semester') != semester: continue filtered_results.append(doc) final_results = filtered_results[:top_k] self.cache_manager.set_query_cache(cache_key, final_results) return final_results def _merge_results(self, semantic_results: List[Document], bm25_results: List[Document], weight_semantic: float, top_k: int) -> List[Document]: """Merge semantic and BM25 results using weighted score fusion""" # Create dictionaries to store normalized scores semantic_scores = {} bm25_scores = {} # Normalize semantic scores max_semantic_score = max(doc.score for doc in semantic_results) if semantic_results else 1.0 for doc in semantic_results: semantic_scores[doc.id] = doc.score / max_semantic_score if max_semantic_score > 0 else 0 # Normalize BM25 scores max_bm25_score = max(doc.score for doc in bm25_results) if bm25_results else 1.0 for doc in bm25_results: bm25_scores[doc.id] = doc.score / max_bm25_score if max_bm25_score > 0 else 0 # Combine scores combined_scores = {} all_docs = {doc.id: doc for doc in semantic_results + bm25_results} for doc_id in all_docs: semantic_score = semantic_scores.get(doc_id, 0) bm25_score = bm25_scores.get(doc_id, 0) # Weighted combination combined_scores[doc_id] = ( weight_semantic * semantic_score + (1 - weight_semantic) * bm25_score ) # Sort by combined score and return top_k results sorted_docs = sorted( all_docs.values(), key=lambda x: combined_scores[x.id], reverse=True ) return sorted_docs[:top_k] class AdvancedQueryProcessor: """Process queries with better understanding""" def __init__(self, config: PipelineConfig): self.generator = OpenAIGenerator( api_key=Secret.from_token(config.model.openai_api_key), model=config.model.openai_model ) self.prompt_builder = PromptBuilder( template=""" วิเคราะห์คำถามที่เกี่ยวข้องกับปฏิทินการศึกษา (ภาษาไทย): คำถาม: {{query}} ระบุ: 1. ประเภทของข้อมูลที่ต้องการค้นหา 2. ภาคการศึกษาที่ระบุไว้ (ถ้ามี) 3. คำสำคัญที่เกี่ยวข้อง ให้ผลลัพธ์ในรูปแบบ JSON: { "event_type": "ลงทะเบียน|กำหนดเวลา|การสอบ|วิชาการ|วันหยุด", "semester": "ภาคการศึกษาที่ระบุ หรือ null", "key_terms": ["คำสำคัญ 3 คำที่สำคัญที่สุด"], "response_format": "รายการ|คำตอบเดียว|คำตอบละเอียด" } """ ) def _get_default_analysis(self, query: str) -> Dict[str, Any]: """Return default analysis when processing fails""" logger.info("Returning default analysis") return { "original_query": query, "event_type": None, "semester": None, "key_terms": [], "response_format": "detailed" } def process_query(self, query: str) -> Dict[str, Any]: """Enhanced query processing with better error handling.""" try: result = self.prompt_builder.run(query=query) response = self.generator.run(prompt=result["prompt"]) if not response or not response.get("replies") or not response["replies"][0]: logger.warning("Received empty response from OpenAI") return self._get_default_analysis(query) try: analysis = json.loads(response["replies"][0]) except json.JSONDecodeError as je: return self._get_default_analysis(query) # **Ensure course-related queries retrieve study plans & curricula** course_keywords = ['หน่วยกิต', 'วิชา', 'หลักสูตร', 'แผนการเรียน', 'วิชาเลือก', 'วิชาบังคับ', 'วิชาการค้นคว้า', 'วิชาหลัก'] if any(keyword in query for keyword in course_keywords): analysis['event_type'] = 'curriculum' # **Ensure fee-related queries retrieve tuition fee documents** fee_keywords = ['ค่าเทอม', 'ค่าธรรมเนียม', 'ค่าเรียน', 'ค่าปรับ'] if any(keyword in query for keyword in fee_keywords): analysis['event_type'] = 'fees' return { "original_query": query, **analysis } except Exception as e: logger.error(f"Query processing failed: {str(e)}") return self._get_default_analysis(query) class ResponseGenerator: """Generate responses with better context utilization""" def __init__(self, config: PipelineConfig): self.generator = OpenAIGenerator( api_key=Secret.from_token(config.model.openai_api_key), model=config.model.openai_model ) self.prompt_builder = PromptBuilder( template=""" คุณเป็นที่ปรึกษาทางวิชาการ กรุณาตอบคำถามต่อไปนี้โดยใช้ข้อมูลจากปฏิทินการศึกษาที่ให้มา คำถาม: {{query}} ข้อมูลที่เกี่ยวข้องจากปฏิทินการศึกษา: {% for doc in context %} --- {{doc.content}} {% endfor %} **ห้ามเดาข้อมูลเอง ถ้าไม่มีข้อมูลให้ตอบว่า "ไม่มีข้อมูลที่ตรงกับคำถาม"** กรุณาตอบเป็นภาษาไทย: ต้องบอกเสมอว่า **หากมีข้อสงสัยเพิ่มเติมสามารถสอบถามได้** """ ) def generate_response(self, query: str, documents: List[Document], query_info: Dict[str, Any]) -> str: """Generate response using retrieved documents""" try: result = self.prompt_builder.run( query=query, context=documents, format=query_info["response_format"] ) response = self.generator.run(prompt=result["prompt"]) return response["replies"][0] except Exception as e: logger.error(f"Response generation failed: {str(e)}") return "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้" class AcademicCalendarRAG: """Enhanced RAG system for academic calendar and program information""" def __init__(self, config: PipelineConfig): self.config = config self.document_store = HybridDocumentStore(config) # Use the new hybrid store self.query_processor = AdvancedQueryProcessor(config) self.response_generator = ResponseGenerator(config) self.data_processor = CalendarDataProcessor() # Initialize data containers self.calendar_events = [] self.program_details = [] self.contact_details = [] self.course_structure = [] self.study_plans = [] self.tuition_fees = [] def load_data(self, json_data: Dict): """Load and process all data sources""" try: raw_events = self.data_processor.parse_calendar_json(json_data) for event in raw_events: if not event.event_type: event.event_type = CalendarEvent.classify_event_type(event.activity) self.calendar_events.append(event) # Process other data types self.program_details = self.data_processor.extract_program_details(json_data) self.contact_details = self.data_processor.extract_contact_details(json_data) self.course_structure = self.data_processor.extract_course_structure(json_data) self.study_plans = self.data_processor.extract_program_study_plan(json_data) self.tuition_fees = self.data_processor.extract_fees(json_data) self._add_calendar_events() self._add_program_info() except Exception as e: logger.error(f"Error loading data: {str(e)}") raise def _add_calendar_events(self): """Add calendar events and other data to document store""" if self.calendar_events: self.document_store.add_events( events=self.calendar_events, contact_details=self.contact_details, course_structure=self.course_structure, study_plans=self.study_plans ) def _add_program_info(self): """Enhanced method to add program-related information to document store""" if self.program_details: for detail in self.program_details: text = f""" ข้อมูลการสมัคร: เว็บไซต์รับสมัคร: {detail.application_info.application_portal} อีเมล: {detail.application_info.program_email} เอกสารที่ต้องใช้: {self._format_required_docs(detail.required_documents)} ขั้นตอนการส่งเอกสาร: {detail.submission_process} ขั้นตอนการคัดเลือก: {self._format_selection_steps(detail.selection_process)} """ self.document_store.add_document(text, "program_details") if self.tuition_fees: for fee in self.tuition_fees: text = f""" ค่าธรรมเนียมการศึกษา: ค่าเล่าเรียนปกติ: {fee.regular_fee.amount:,.2f} {fee.regular_fee.currency} {fee.regular_fee.period} ค่าปรับชำระล่าช้า: {fee.late_payment_fee.amount:,.2f} {fee.late_payment_fee.currency} """ self.document_store.add_document(text, "fees") def _format_required_docs(self, docs: Dict) -> str: """Format required documents information with detailed English proficiency requirements""" result = [] if 'mandatory' in docs: result.append("เอกสารที่ต้องใช้:") for doc in docs['mandatory'].values(): result.append(f"- {doc.name}: {doc.description}") if 'optional' in docs: result.append("\nเอกสารเพิ่มเติม:") for doc_key, doc in docs['optional'].items(): if doc_key == 'english_proficiency': result.append(f"- {doc.name}") # Parse and format the accepted tests try: accepted_tests = eval(doc.description) result.append(" เกณฑ์คะแนนที่ยอมรับ:") for test, requirement in accepted_tests.items(): result.append(f" * {test}: {requirement}") except: result.append(f" {doc.description}") if doc.conditions: conditions = doc.conditions.split(', ') for condition in conditions: result.append(f" {condition}") else: desc = f"- {doc.name}" if doc.conditions: desc += f" ({doc.conditions})" result.append(desc) return "\n".join(result) def _format_selection_steps(self, steps: List[SelectionStep]) -> str: """Format selection process steps""" return "\n".join(f"{step.step_number}. {step.description}" for step in steps) def _get_fee_documents(self) -> List[Document]: """Get fee-related documents""" if not self.tuition_fees: return [] documents = [] for fee in self.tuition_fees: text = f""" ค่าธรรมเนียมการศึกษา: - ค่าเล่าเรียน: {fee.regular_fee.amount:,.2f} {fee.regular_fee.currency} {fee.regular_fee.period} - ค่าปรับชำระล่าช้า: {fee.late_payment_fee.amount:,.2f} {fee.late_payment_fee.currency} """ doc = Document( content=text, meta={"event_type": "fees"} ) documents.append(doc) return documents def process_query(self, query: str, weight_semantic: float = 0.5) -> Dict[str, Any]: """Process user query using hybrid retrieval""" try: # Analyze query query_info = self.query_processor.process_query(query) # Get relevant documents using hybrid search documents = self.document_store.hybrid_search( query=query, event_type=query_info.get("event_type"), semester=query_info.get("semester"), top_k=self.config.retriever.top_k, weight_semantic= 0.3 ) # Add fee information for fee-related queries if query_info.get("event_type") == "fees" and self.tuition_fees: fee_docs = self._get_fee_documents() documents.extend(fee_docs) # Generate response response = self.response_generator.generate_response( query=query, documents=documents, query_info=query_info ) return { "query": query, "answer": response, "relevant_docs": documents, "query_info": query_info } except Exception as e: logger.error(f"Error processing query: {str(e)}") return { "query": query, "answer": "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้", "error": str(e) } # def main(): # """Main function demonstrating hybrid retrieval""" # try: # # Load API key # with open("key.txt", "r") as f: # openai_api_key = f.read().strip() # # Create config with hybrid retrieval settings # config = create_default_config(openai_api_key) # pipeline = AcademicCalendarRAG(config) # # Load and process data # with open("raw-data.json", "r", encoding="utf-8") as f: # raw_data = json.load(f) # pipeline.load_data(raw_data) # # Test queries with different semantic weights # queries = ["มีวิชาหลักหรือวิชาเลือกอะไรบ้าง"] # print("=" * 80) # for query in queries: # print(f"\nQuery: {query}") # result = pipeline.process_query(query) # print(f"Answer: {result['answer']}") # print("-" * 40) # except Exception as e: # logger.error(f"Pipeline execution failed: {str(e)}") # raise # if __name__ == "__main__": # main()