|
from haystack import Document
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.retrievers.in_memory import (
    InMemoryBM25Retriever,
    InMemoryEmbeddingRetriever,
)
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.utils import Secret

from sentence_transformers import CrossEncoder

from dataclasses import dataclass, field
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import ast
import hashlib
import json
import logging
import pickle
import re
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
@dataclass |
|
class ValidationResult: |
|
"""Stores the result of a validation check""" |
|
is_valid: bool |
|
errors: List[str] |
|
warnings: List[str] |
|
normalized_data: Dict[str, str] |
|
|
|
@dataclass |
|
class ApplicationInfo: |
|
application_portal: str |
|
program_email: str |
|
|
|
@dataclass |
|
class RequiredDocument: |
|
name: str |
|
description: str |
|
conditions: Optional[str] = None |
|
|
|
@dataclass |
|
class SelectionStep: |
|
step_number: str |
|
description: str |
|
|
|
@dataclass |
|
class ProgramDetailInfo: |
|
application_info: ApplicationInfo |
|
required_documents: Dict[str, Dict[str, RequiredDocument]] |
|
submission_process: str |
|
selection_process: List[SelectionStep] |
|
|
|
@dataclass |
|
class Transportation: |
|
boat: str |
|
bts: str |
|
mrt: str |
|
airport_link: str |
|
bus: Dict[str, str] |
|
|
|
@dataclass |
|
class Contact: |
|
email: str |
|
facebook: Dict[str, str] |
|
|
|
@dataclass |
|
class ContactDetail: |
|
event_type: str |
|
department: str |
|
faculty: str |
|
university: str |
|
location: str |
|
contact: Contact |
|
transportation: Transportation |
|
|
|
@dataclass |
|
class Course: |
|
code: str |
|
title_th: str |
|
title_en: str |
|
credits: int |
|
|
|
@dataclass |
|
class CourseCategory: |
|
description: Optional[str] |
|
credits: Union[str, int] |
|
minimum_credits: Optional[int] |
|
courses: List[Course] |
|
|
|
@dataclass |
|
class CourseStructure: |
|
event_type: str |
|
program_name: str |
|
department: str |
|
total_credits: int |
|
degree_level: str |
|
structure: Dict[str, CourseCategory] |
|
|
|
@dataclass |
|
class StudyPlan: |
|
event_type: str |
|
years: Dict[str, Dict[str, Any]] |
|
|
|
@dataclass |
|
class RegularFee: |
|
amount: float |
|
currency: str |
|
period: str |
|
|
|
@dataclass |
|
class LatePaymentFee: |
|
amount: float |
|
currency: str |
|
|
|
@dataclass |
|
class TuitionFee: |
|
event_type: str |
|
regular_fee: RegularFee |
|
late_payment_fee: LatePaymentFee |
|
|
|
|
|
class OpenAIDateParser: |
|
"""Uses OpenAI to parse complex Thai date formats""" |
|
|
|
def __init__(self, api_key: str, model: str = "gpt-4o"): |
|
self.generator = OpenAIGenerator( |
|
api_key=Secret.from_token(api_key), |
|
model=model |
|
) |
|
self.prompt_builder = PromptBuilder( |
|
template=""" |
|
Parse the following Thai date range into a structured format: |
|
Date: {{date}} |
|
|
|
Return in JSON format: |
|
{ |
|
"start_date": "YYYY-MM-DD", |
|
"end_date": "YYYY-MM-DD" (if range), |
|
"is_range": true/false |
|
} |
|
|
|
Notes: |
|
- Convert Buddhist Era (BE) to CE |
|
- Handle abbreviated Thai months |
|
- Account for date ranges with dashes |
|
- Return null for end_date if it's a single date |
|
|
|
Example inputs and outputs: |
|
Input: "จ 8 ก.ค. – จ 19 ส.ค. 67" |
|
Output: {"start_date": "2024-07-08", "end_date": "2024-08-19", "is_range": true} |
|
|
|
Input: "15 มกราคม 2567" |
|
Output: {"start_date": "2024-01-15", "end_date": null, "is_range": false} |
|
""" |
|
) |
|
|
|
async def parse_date(self, date_str: str) -> Dict[str, Union[str, bool]]: |
|
"""Parse complex Thai date format using OpenAI""" |
|
try: |
|
result = self.prompt_builder.run(date=date_str) |
|
            # OpenAIGenerator exposes a synchronous run(); call it directly here
            # (swap in an async call if your Haystack version provides one).
            response = self.generator.run(prompt=result["prompt"])
|
|
|
if not response or not response.get("replies"): |
|
raise ValueError("Empty response from OpenAI") |
|
|
|
            # The model may wrap its JSON in a ```json fence; strip it before parsing.
            raw_reply = response["replies"][0].replace("```json", "").replace("```", "").strip()
            parsed = json.loads(raw_reply)
|
|
|
for date_field in ['start_date', 'end_date']: |
|
if parsed.get(date_field): |
|
datetime.strptime(parsed[date_field], '%Y-%m-%d') |
|
|
|
return parsed |
|
|
|
except Exception as e: |
|
logger.error(f"OpenAI date parsing failed for '{date_str}': {str(e)}") |
|
raise ValueError(f"Could not parse date: {date_str}") |
|
|
|
class ThaiTextPreprocessor: |
|
"""Handles Thai text preprocessing and normalization""" |
|
|
|
    CHAR_MAP = {
        'ํา': 'ำ',
        '์': '',
        '–': '-',
        '—': '-',
        '٫': ',',
    }
|
|
|
@classmethod |
|
def normalize_thai_text(cls, text: str) -> str: |
|
"""Normalize Thai text by applying character mappings and spacing rules""" |
|
if not text: |
|
return text |
|
|
|
for old, new in cls.CHAR_MAP.items(): |
|
text = text.replace(old, new) |
|
|
|
text = re.sub(r'\s+', ' ', text.strip()) |
|
|
|
thai_digits = '๐๑๒๓๔๕๖๗๘๙' |
|
arabic_digits = '0123456789' |
|
|
|
for thai, arabic in zip(thai_digits, arabic_digits): |
|
text = text.replace(thai, arabic) |
|
|
|
return text |
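

# Illustration (sketch) of normalize_thai_text: Thai digits become Arabic digits
# and runs of whitespace collapse, e.g.
#   ThaiTextPreprocessor.normalize_thai_text("ภาคต้น   ๒๕๖๗")  ->  "ภาคต้น 2567"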
|
|
|
class CalendarEventValidator: |
|
"""Validates and preprocesses calendar events""" |
|
|
|
def __init__(self, openai_api_key: str): |
|
self.preprocessor = ThaiTextPreprocessor() |
|
self.date_parser = OpenAIDateParser(api_key=openai_api_key) |
|
|
|
async def validate_event(self, event: 'CalendarEvent') -> ValidationResult: |
|
"""Validate a calendar event and return validation results""" |
|
errors = [] |
|
warnings = [] |
|
normalized_data = {} |
|
|
|
if event.date: |
|
try: |
|
parsed_date = await self.date_parser.parse_date(event.date) |
|
normalized_data['date'] = parsed_date['start_date'] |
|
|
|
if parsed_date['is_range'] and parsed_date['end_date']: |
|
range_note = f"ถึงวันที่ {parsed_date['end_date']}" |
|
if event.note: |
|
normalized_data['note'] = f"{event.note}; {range_note}" |
|
else: |
|
normalized_data['note'] = range_note |
|
|
|
except ValueError as e: |
|
errors.append(f"Invalid date format: {event.date}") |
|
else: |
|
errors.append("Date is required") |
|
|
|
if event.time: |
|
time_pattern = r'^([01]?[0-9]|2[0-3]):([0-5][0-9])$' |
|
if not re.match(time_pattern, event.time): |
|
errors.append(f"Invalid time format: {event.time}") |
|
normalized_data['time'] = event.time |
|
|
|
if event.activity: |
|
normalized_activity = self.preprocessor.normalize_thai_text(event.activity) |
|
if len(normalized_activity) < 3: |
|
warnings.append("Activity description is very short") |
|
normalized_data['activity'] = normalized_activity |
|
else: |
|
errors.append("Activity is required") |
|
|
|
valid_semesters = {'ภาคต้น', 'ภาคปลาย', 'ภาคฤดูร้อน'} |
|
if event.semester: |
|
normalized_semester = self.preprocessor.normalize_thai_text(event.semester) |
|
if normalized_semester not in valid_semesters: |
|
warnings.append(f"Unusual semester value: {event.semester}") |
|
normalized_data['semester'] = normalized_semester |
|
else: |
|
errors.append("Semester is required") |
|
|
|
valid_types = {'registration', 'deadline', 'examination', 'academic', 'holiday'} |
|
if event.event_type not in valid_types: |
|
errors.append(f"Invalid event type: {event.event_type}") |
|
normalized_data['event_type'] = event.event_type |
|
|
|
if event.note and 'note' not in normalized_data: |
|
normalized_data['note'] = self.preprocessor.normalize_thai_text(event.note) |
|
|
|
if event.section: |
|
normalized_data['section'] = self.preprocessor.normalize_thai_text(event.section) |
|
|
|
return ValidationResult( |
|
is_valid=len(errors) == 0, |
|
errors=errors, |
|
warnings=warnings, |
|
normalized_data=normalized_data |
|
) |
|
|
|
@dataclass |
|
class CalendarEvent: |
|
"""Structured representation of a calendar event with validation""" |
|
date: str |
|
time: str |
|
activity: str |
|
note: str |
|
semester: str |
|
event_type: str |
|
section: Optional[str] = None |
|
|
|
@staticmethod |
|
def classify_event_type(activity: str) -> str: |
|
"""Classify event type based on activity description""" |
|
activity_lower = activity.lower() |
|
|
|
keywords = { |
|
'registration': ['ลงทะเบียน', 'ชําระเงิน', 'ค่าธรรมเนียม', 'เปิดเรียน'], |
|
'deadline': ['วันสุดท้าย', 'กําหนด', 'ภายใน', 'ต้องส่ง'], |
|
'examination': ['สอบ', 'ปริญญานิพนธ์', 'วิทยานิพนธ์', 'สอบปากเปล่า'], |
|
'holiday': ['วันหยุด', 'ชดเชย', 'เทศกาล'], |
|
} |
|
|
|
for event_type, terms in keywords.items(): |
|
if any(term in activity_lower for term in terms): |
|
return event_type |
|
return 'academic' |
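
    # Example (sketch): an activity mentioning "ลงทะเบียน" is classified as
    # 'registration'; activities with no keyword match fall back to 'academic'.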
|
|
|
async def initialize(self, openai_api_key: str): |
|
"""Asynchronously validate and normalize the event""" |
|
validator = CalendarEventValidator(openai_api_key) |
|
result = await validator.validate_event(self) |
|
|
|
if not result.is_valid: |
|
raise ValueError(f"Invalid calendar event: {', '.join(result.errors)}") |
|
|
|
        for field_name, value in result.normalized_data.items():
            setattr(self, field_name, value)
|
|
|
if result.warnings: |
|
logger.warning(f"Calendar event warnings: {', '.join(result.warnings)}") |
|
|
|
def to_searchable_text(self) -> str: |
|
"""Convert event to searchable text format""" |
|
return f""" |
|
ภาคการศึกษา: {self.semester} |
|
ประเภท: {self.event_type} |
|
วันที่: {self.date} |
|
เวลา: {self.time or '-'} |
|
กิจกรรม: {self.activity} |
|
หมวดหมู่: {self.section or '-'} |
|
หมายเหตุ: {self.note or '-'} |
|
""".strip() |
|
|
|
class CacheManager: |
|
"""Manages caching for different components of the RAG pipeline""" |
|
|
|
def __init__(self, cache_dir: Path, ttl: int = 3600): |
|
""" |
|
        Initialize the CacheManager.

        Args:
            cache_dir: Directory where cache files are stored (created if it does not exist).
            ttl: Time-to-live for cache entries, in seconds.
        """
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = ttl
|
self.embeddings_cache = self._load_cache("embeddings") |
|
self.query_cache = self._load_cache("queries") |
|
self.document_cache = self._load_cache("documents") |
|
|
|
def _generate_key(self, data: Union[str, Dict, Any]) -> str: |
|
"""Generate a unique cache key""" |
|
if isinstance(data, str): |
|
content = data.encode('utf-8') |
|
else: |
|
content = json.dumps(data, sort_keys=True).encode('utf-8') |
|
return hashlib.md5(content).hexdigest() |
|
|
|
def _load_cache(self, cache_type: str) -> Dict: |
|
"""Load cache from disk""" |
|
cache_path = self.cache_dir / f"{cache_type}_cache.pkl" |
|
if cache_path.exists(): |
|
try: |
|
with open(cache_path, 'rb') as f: |
|
cache = pickle.load(f) |
|
self._clean_expired_entries(cache) |
|
return cache |
|
except Exception as e: |
|
logger.warning(f"Failed to load {cache_type} cache: {e}") |
|
return {} |
|
return {} |
|
|
|
def _save_cache(self, cache_type: str, cache_data: Dict): |
|
"""Save cache to disk""" |
|
cache_path = self.cache_dir / f"{cache_type}_cache.pkl" |
|
try: |
|
with open(cache_path, 'wb') as f: |
|
pickle.dump(cache_data, f) |
|
except Exception as e: |
|
logger.error(f"Failed to save {cache_type} cache: {e}") |
|
|
|
def _clean_expired_entries(self, cache: Dict): |
|
"""Remove expired cache entries""" |
|
current_time = datetime.now() |
|
expired_keys = [ |
|
key for key, (_, timestamp) in cache.items() |
|
if current_time - timestamp > timedelta(seconds=self.ttl) |
|
] |
|
for key in expired_keys: |
|
del cache[key] |
|
|
|
def get_embedding_cache(self, text: str) -> Optional[Any]: |
|
"""Get cached embedding for text""" |
|
key = self._generate_key(text) |
|
if key in self.embeddings_cache: |
|
embedding, timestamp = self.embeddings_cache[key] |
|
if datetime.now() - timestamp <= timedelta(seconds=self.ttl): |
|
return embedding |
|
return None |
|
|
|
def set_embedding_cache(self, text: str, embedding: Any): |
|
"""Cache embedding for text""" |
|
key = self._generate_key(text) |
|
self.embeddings_cache[key] = (embedding, datetime.now()) |
|
self._save_cache("embeddings", self.embeddings_cache) |
|
|
|
def get_query_cache(self, query: str) -> Optional[Dict]: |
|
"""Get cached query results""" |
|
key = self._generate_key(query) |
|
if key in self.query_cache: |
|
result, timestamp = self.query_cache[key] |
|
if datetime.now() - timestamp <= timedelta(seconds=self.ttl): |
|
return result |
|
return None |
|
|
|
def set_query_cache(self, query: str, result: Dict): |
|
"""Cache query results""" |
|
key = self._generate_key(query) |
|
self.query_cache[key] = (result, datetime.now()) |
|
self._save_cache("queries", self.query_cache) |
|
|
|
def set_document_cache(self, doc_id: str, document: Any): |
|
"""Cache document""" |
|
self.document_cache[doc_id] = (document, datetime.now()) |
|
self._save_cache("documents", self.document_cache) |
|
|
|
@dataclass |
|
class ModelConfig: |
|
openai_api_key: str |
|
embedder_model: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" |
|
openai_model: str = "gpt-4o" |
|
temperature: float = 0.7 |
|
|
|
@dataclass |
|
class RetrieverConfig: |
|
top_k: int = 5 |
|
|
|
@dataclass |
|
class CacheConfig: |
|
enabled: bool = True |
|
cache_dir: Path = Path("./cache") |
|
ttl: int = 86400 |
|
|
|
@dataclass |
|
class ProcessingConfig: |
|
batch_size: int = 32 |
|
|
|
@dataclass |
|
class LocalizationConfig: |
|
enable_thai_normalization: bool = True |
|
|
|
@dataclass |
|
class PipelineConfig: |
|
model: ModelConfig |
|
retriever: RetrieverConfig = field(default_factory=RetrieverConfig) |
|
cache: CacheConfig = field(default_factory=CacheConfig) |
|
processing: ProcessingConfig = field(default_factory=ProcessingConfig) |
|
localization: LocalizationConfig = field(default_factory=LocalizationConfig) |
|
|
|
def create_default_config(api_key: str) -> PipelineConfig: |
|
""" |
|
Create a default pipeline configuration with optimized settings for Thai language processing. |
|
|
|
Args: |
|
api_key (str): OpenAI API key |
|
|
|
Returns: |
|
PipelineConfig: Configured pipeline settings |
|
""" |
|
return PipelineConfig( |
|
model=ModelConfig( |
|
openai_api_key=api_key, |
|
temperature=0.3 |
|
), |
|
retriever=RetrieverConfig( |
|
top_k=5 |
|
), |
|
cache=CacheConfig( |
|
enabled=True, |
|
cache_dir=Path("./cache"), |
|
ttl=86400 |
|
), |
|
processing=ProcessingConfig( |
|
batch_size=32 |
|
), |
|
localization=LocalizationConfig( |
|
enable_thai_normalization=True |
|
) |
|
) |
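

# Example (sketch): build a configuration and adjust retrieval depth if desired.
#   config = create_default_config(api_key="<OPENAI_API_KEY>")
#   config.retriever.top_k = 10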
|
|
|
class CalendarDataProcessor: |
|
"""Process and structure calendar data from the new raw-data.json format""" |
|
|
|
@staticmethod |
|
def parse_calendar_json(json_data: Dict) -> List[CalendarEvent]: |
|
"""Parse the new calendar JSON format into CalendarEvent objects""" |
|
events = [] |
|
|
|
|
|
calendar_data = json_data.get('academic_calendar', []) if isinstance(json_data, dict) else json_data |
|
|
|
for semester_block in calendar_data: |
|
semester = semester_block.get('education', '') |
|
schedule = semester_block.get('schedule', []) |
|
|
|
|
|
for event in schedule: |
|
if 'section' in event and 'details' in event: |
|
|
|
section = event['section'] |
|
for detail in event['details']: |
|
if 'ภาคต้น' in detail and 'ภาคปลาย' in detail: |
|
|
|
for sem_key in ['ภาคต้น', 'ภาคปลาย']: |
|
if detail.get(sem_key): |
|
events.append(CalendarEvent( |
|
date=detail[sem_key], |
|
time='', |
|
activity=detail['title'], |
|
note=section, |
|
semester=sem_key, |
|
event_type='deadline', |
|
section=section |
|
)) |
|
else: |
|
|
|
events.append(CalendarEvent( |
|
date=detail.get('date', ''), |
|
time='', |
|
activity=detail.get('title', ''), |
|
note=section, |
|
semester=ThaiTextPreprocessor.normalize_thai_text(semester), |
|
event_type='deadline', |
|
section=section |
|
)) |
|
else: |
|
|
|
event_type = CalendarEvent.classify_event_type(event.get('activity', '')) |
|
|
|
|
|
cleaned_semester = semester |
|
if '(' in semester: |
|
match = re.search(r'\((.*?)\)', semester) |
|
if match: |
|
cleaned_semester = match.group(1) |
|
cleaned_semester = ThaiTextPreprocessor.normalize_thai_text(cleaned_semester) |
|
|
|
events.append(CalendarEvent( |
|
date=event.get('date', ''), |
|
time=event.get('time', ''), |
|
activity=event.get('activity', ''), |
|
note=event.get('note', ''), |
|
semester=cleaned_semester, |
|
event_type=event_type |
|
)) |
|
|
|
return events |
|
|
|
@staticmethod |
|
    def extract_program_details(json_data: Dict) -> List[ProgramDetailInfo]:
|
"""Extract and structure program details into ProgramDetailInfo object""" |
|
raw_details = json_data.get('program_details', {}) |
|
|
|
|
|
app_info_data = raw_details.get('application_info', {}) |
|
app_info = ApplicationInfo( |
|
application_portal=app_info_data.get('application_portal', ''), |
|
program_email=app_info_data.get('program_email', '') |
|
) |
|
|
|
|
|
req_docs = {} |
|
raw_docs = raw_details.get('required_documents', {}) |
|
|
|
|
|
mandatory_docs = {} |
|
for doc_key, doc_value in raw_docs.get('mandatory', {}).items(): |
|
mandatory_docs[doc_key] = RequiredDocument( |
|
name=doc_key, |
|
description=doc_value |
|
) |
|
req_docs['mandatory'] = mandatory_docs |
|
|
|
|
|
optional_docs = {} |
|
for doc_key, doc_data in raw_docs.get('optional', {}).items(): |
|
if doc_key == 'english_proficiency': |
|
ep_data = doc_data |
|
optional_docs[doc_key] = RequiredDocument( |
|
name=ep_data.get('name', ''), |
|
description=str(ep_data.get('accepted_tests', {})), |
|
conditions=f"Validity: {ep_data.get('validity', '')}, Benefits: {ep_data.get('benefits', '')}, Exemptions: {ep_data.get('exemptions', '')}" |
|
) |
|
else: |
|
optional_docs[doc_key] = RequiredDocument( |
|
name=doc_data.get('name', ''), |
|
description='', |
|
conditions=doc_data.get('condition', '') |
|
) |
|
req_docs['optional'] = optional_docs |
|
|
|
|
|
selection_steps = [] |
|
for step_data in raw_details.get('selection_process', {}).get('steps', []): |
|
for step_num, description in step_data.items(): |
|
selection_steps.append(SelectionStep( |
|
step_number=step_num, |
|
description=description |
|
)) |
|
|
|
return [ProgramDetailInfo( |
|
application_info=app_info, |
|
required_documents=req_docs, |
|
submission_process=raw_details.get('submission_process', ''), |
|
selection_process=selection_steps |
|
)] |
|
|
|
@staticmethod |
|
def extract_contact_details(json_data: Dict) -> List[ContactDetail]: |
|
"""Extract and structure contact details into ContactDetail objects""" |
|
raw_contacts = json_data.get('contact_details', []) |
|
contact_details = [] |
|
|
|
|
|
if not isinstance(raw_contacts, list): |
|
raw_contacts = [raw_contacts] |
|
|
|
for contact_data in raw_contacts: |
|
|
|
if not isinstance(contact_data, dict): |
|
continue |
|
|
|
try: |
|
|
|
transportation_data = contact_data.get('transportation', {}) |
|
transportation = Transportation( |
|
boat=transportation_data.get('boat', ''), |
|
bts=transportation_data.get('bts', ''), |
|
mrt=transportation_data.get('mrt', ''), |
|
airport_link=transportation_data.get('airport_link', ''), |
|
bus=transportation_data.get('bus', {}) |
|
) |
|
|
|
|
|
contact_info = Contact( |
|
email=contact_data.get('email', ''), |
|
facebook=contact_data.get('facebook', {}) |
|
) |
|
|
|
|
|
contact_details.append(ContactDetail( |
|
event_type=contact_data.get('event_type', ''), |
|
department=contact_data.get('department', ''), |
|
faculty=contact_data.get('faculty', ''), |
|
university=contact_data.get('university', ''), |
|
location=contact_data.get('location', ''), |
|
contact=contact_info, |
|
transportation=transportation |
|
)) |
|
            except Exception as e:
                logger.warning(f"Skipping malformed contact entry: {e}")
                continue
|
|
|
return contact_details |
|
|
|
|
|
@staticmethod |
|
def extract_course_structure(json_data: Dict) -> List[CourseStructure]: |
|
"""Extract and structure course information into CourseStructure objects""" |
|
course_structures = [] |
|
|
|
|
|
course_data = json_data.get('course_structure', {}) |
|
program_metadata = course_data.get('program_metadata', {}) |
|
curriculum = course_data.get('curriculum_structure', {}) |
|
|
|
|
|
foundation_data = curriculum.get('foundation_courses', {}) |
|
foundation_courses = [] |
|
for course in foundation_data.get('courses', []): |
|
foundation_courses.append(Course( |
|
code=course.get('code', ''), |
|
title_th=course.get('title', {}).get('th', ''), |
|
title_en=course.get('title', {}).get('en', ''), |
|
credits=course.get('credits', 0) |
|
)) |
|
|
|
|
|
core_data = curriculum.get('core_courses', {}) |
|
core_courses = [] |
|
for course in core_data.get('modules', []): |
|
core_courses.append(Course( |
|
code=course.get('code', ''), |
|
title_th=course.get('title', {}).get('th', ''), |
|
title_en=course.get('title', {}).get('en', ''), |
|
credits=course.get('credits', 0) |
|
)) |
|
|
|
|
|
elective_data = curriculum.get('electives', {}) |
|
elective_courses = [] |
|
for course in elective_data.get('course_groups', []): |
|
elective_courses.append(Course( |
|
code=course.get('code', ''), |
|
title_th=course.get('title', {}).get('th', ''), |
|
title_en=course.get('title', {}).get('en', ''), |
|
credits=course.get('credits', 0) |
|
)) |
|
|
|
|
|
research_data = curriculum.get('research', {}) |
|
research_courses = [] |
|
for course in research_data.get('course', []): |
|
research_courses.append(Course( |
|
code=course.get('code', ''), |
|
title_th=course.get('title', {}).get('th', ''), |
|
title_en=course.get('title', {}).get('en', ''), |
|
credits=course.get('credits', 0) |
|
)) |
|
|
|
|
|
structure = { |
|
'หมวดวิชาปรับพื้นฐาน': CourseCategory( |
|
description="วิชาพื้นฐานที่จำเป็นต้องเรียน foundation courses รายวิชาปรับพื้นฐาน", |
|
credits=foundation_data.get('metadata', {}).get('credits', 'non-credit'), |
|
minimum_credits=None, |
|
courses=foundation_courses |
|
), |
|
'หมวดวิชาบังคับ': CourseCategory( |
|
description="วิชาบังคับ วิชาหลัก core courses รายวิชาที่ต้องเรียน", |
|
credits=0, |
|
minimum_credits=core_data.get('minimum_requirement_credits'), |
|
courses=core_courses |
|
), |
|
'หมวดวิชาเลือก': CourseCategory( |
|
description="วิชาเลือก elective courses รายวิชาเลือก วิชาที่สามารถเลือกเรียนได้", |
|
credits=0, |
|
minimum_credits=elective_data.get('minimum_requirement_credits'), |
|
courses=elective_courses |
|
), |
|
'หมวดวิชาการค้นคว้าอิสระ': CourseCategory( |
|
description="วิชาค้นคว้าอิสระ research courses วิทยานิพนธ์", |
|
credits=0, |
|
minimum_credits=research_data.get('minimum_requirement_credits'), |
|
courses=research_courses |
|
) |
|
} |
|
|
|
|
|
course_structure = CourseStructure( |
|
event_type='curriculum_structure', |
|
program_name=program_metadata.get('name', ''), |
|
department=program_metadata.get('department', ''), |
|
total_credits=program_metadata.get('total_credits', 0), |
|
degree_level=program_metadata.get('degree_level', ''), |
|
structure=structure |
|
) |
|
|
|
return [course_structure] |
|
|
|
@staticmethod |
|
def extract_program_study_plan(json_data: Dict) -> List[StudyPlan]: |
|
"""Extract and structure study plan information into StudyPlan objects""" |
|
study_plan_data = json_data.get('program_study_plan', {}) |
|
|
|
|
|
years_dict = {} |
|
|
|
for year_key, year_data in study_plan_data.items(): |
|
years_dict[year_key] = {} |
|
|
|
for semester_key, semester_data in year_data.items(): |
|
|
|
metadata = semester_data.get('metadata', {}) |
|
|
|
|
|
semester_struct = { |
|
'metadata': metadata, |
|
'courses': [] |
|
} |
|
|
|
|
|
course_data = semester_data.get('modules', []) or semester_data.get('courses', []) |
|
|
|
|
|
for course in course_data: |
|
course_info = { |
|
'code': course.get('code', ''), |
|
'title': course.get('title', {'th': '', 'en': ''}), |
|
'credits': course.get('credits', 0) |
|
} |
|
semester_struct['courses'].append(course_info) |
|
|
|
|
|
years_dict[year_key][semester_key] = semester_struct |
|
|
|
|
|
study_plan = StudyPlan( |
|
event_type='study_plan', |
|
years=years_dict |
|
) |
|
|
|
return [study_plan] |
|
|
|
@staticmethod |
|
def extract_fees(json_data: Dict) -> List[TuitionFee]: |
|
"""Extract and structure fee information into TuitionFee objects""" |
|
fees_data = json_data.get('fees', {}) |
|
|
|
|
|
regular_fee_str = fees_data.get('tuition', '') |
|
        # Strip thousands separators before converting (e.g. "25,000 บาท" -> 25000.0).
        regular_amount = float(regular_fee_str.split()[0].replace(',', '')) if regular_fee_str else 0.0
|
|
|
regular_fee = RegularFee( |
|
amount=regular_amount, |
|
currency='THB', |
|
period='per semester' |
|
) |
|
|
|
|
|
late_fee_str = fees_data.get('late_payment', '') |
|
        late_amount = float(late_fee_str.split()[0].replace(',', '')) if late_fee_str else 0.0
|
|
|
late_payment_fee = LatePaymentFee( |
|
amount=late_amount, |
|
currency='THB' |
|
) |
|
|
|
|
|
tuition_fee = TuitionFee( |
|
event_type='tuition_fee', |
|
regular_fee=regular_fee, |
|
late_payment_fee=late_payment_fee |
|
) |
|
|
|
return [tuition_fee] |
|
|
|
class HybridDocumentStore: |
|
"""Enhanced document store with hybrid retrieval capabilities""" |
|
|
|
def __init__(self, config: PipelineConfig): |
|
self.store = InMemoryDocumentStore() |
|
self.embedder = SentenceTransformersDocumentEmbedder( |
|
model=config.model.embedder_model |
|
) |
|
|
|
self.bm25_retriever = InMemoryBM25Retriever( |
|
document_store=self.store, |
|
top_k=config.retriever.top_k |
|
) |
|
|
|
self.embedding_retriever = InMemoryEmbeddingRetriever( |
|
document_store=self.store, |
|
top_k=config.retriever.top_k |
|
) |
|
self.cache_manager = CacheManager( |
|
cache_dir=config.cache.cache_dir, |
|
ttl=config.cache.ttl |
|
) |
|
|
|
self.embedder.warm_up() |
|
|
|
|
|
self.events = [] |
|
self.event_type_index = {} |
|
self.semester_index = {} |
|
self._document_counter = 0 |
|
|
|
|
|
self.course_data = [] |
|
self.contact_data = [] |
|
self.study_plan_data = [] |
|
|
|
def _generate_unique_id(self) -> str: |
|
"""Generate a unique document ID""" |
|
self._document_counter += 1 |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
return f"doc_{timestamp}_{self._document_counter}" |
|
|
|
def _compute_embedding(self, text: str) -> Any: |
|
"""Compute embedding with caching""" |
|
cached_embedding = self.cache_manager.get_embedding_cache(text) |
|
if cached_embedding is not None: |
|
return cached_embedding |
|
|
|
doc = Document(content=text) |
|
embedding = self.embedder.run(documents=[doc])["documents"][0].embedding |
|
self.cache_manager.set_embedding_cache(text, embedding) |
|
return embedding |
|
|
|
def _format_required_docs(self, docs: Dict) -> str: |
|
"""Format required documents information with detailed English proficiency requirements""" |
|
result = [] |
|
|
|
if 'mandatory' in docs: |
|
result.append("เอกสารที่ต้องใช้:") |
|
for doc in docs['mandatory'].values(): |
|
result.append(f"- {doc.name}: {doc.description}") |
|
|
|
if 'optional' in docs: |
|
result.append("\nเอกสารเพิ่มเติม:") |
|
for doc_key, doc in docs['optional'].items(): |
|
if doc_key == 'english_proficiency': |
|
result.append(f"- {doc.name}") |
|
|
|
                    # doc.description holds a stringified dict of accepted tests
                    # (built in extract_program_details); parse it without eval().
                    try:
                        accepted_tests = ast.literal_eval(doc.description)
                        result.append(" เกณฑ์คะแนนที่ยอมรับ:")
                        for test, requirement in accepted_tests.items():
                            result.append(f" * {test}: {requirement}")
                    except (ValueError, SyntaxError):
                        result.append(f" {doc.description}")
|
|
|
if doc.conditions: |
|
conditions = doc.conditions.split(', ') |
|
for condition in conditions: |
|
result.append(f" {condition}") |
|
else: |
|
desc = f"- {doc.name}" |
|
if doc.conditions: |
|
desc += f" ({doc.conditions})" |
|
result.append(desc) |
|
|
|
return "\n".join(result) |
|
|
|
def _format_selection_steps(self, steps: List[SelectionStep]) -> str: |
|
"""Format selection process steps""" |
|
return "\n".join(f"{step.step_number}. {step.description}" for step in steps) |
|
|
|
def add_events(self, |
|
events: List[CalendarEvent], |
|
contact_details: Optional[List[ContactDetail]] = None, |
|
course_structure: Optional[List[CourseStructure]] = None, |
|
study_plans: Optional[List[StudyPlan]] = None, |
|
program_details: Optional[List[ProgramDetailInfo]] = None, |
|
tuition_fees: Optional[List[TuitionFee]] = None): |
|
"""Add events and additional data with caching""" |
|
documents = [] |
|
added_events = set() |
|
|
|
|
|
for event in events: |
|
event_key = f"{event.date}_{event.activity}_{event.semester}" |
|
if event_key not in added_events: |
|
added_events.add(event_key) |
|
self.events.append(event) |
|
event_idx = len(self.events) - 1 |
|
|
|
|
|
if event.event_type not in self.event_type_index: |
|
self.event_type_index[event.event_type] = [] |
|
self.event_type_index[event.event_type].append(event_idx) |
|
|
|
if event.semester not in self.semester_index: |
|
self.semester_index[event.semester] = [] |
|
self.semester_index[event.semester].append(event_idx) |
|
|
|
|
|
text = event.to_searchable_text() |
|
embedding = self._compute_embedding(text) |
|
doc = Document( |
|
id=self._generate_unique_id(), |
|
content=text, |
|
embedding=embedding, |
|
meta={ |
|
'event_type': event.event_type, |
|
'semester': event.semester, |
|
'date': event.date, |
|
'event_idx': event_idx |
|
} |
|
) |
|
documents.append(doc) |
|
self.cache_manager.set_document_cache(str(event_idx), doc) |
|
|
|
|
|
if contact_details: |
|
for contact in contact_details: |
|
self.contact_data.append(contact) |
|
text = f""" |
|
ข้อมูลการติดต่อ: |
|
คณะ: {contact.faculty} |
|
ภาควิชา: {contact.department} |
|
มหาวิทยาลัย: {contact.university} |
|
สถานที่: {contact.location} |
|
|
|
การติดต่อ: |
|
อีเมล: {contact.contact.email} |
|
Facebook: {json.dumps(contact.contact.facebook, ensure_ascii=False)} |
|
|
|
การเดินทาง: |
|
เรือ: {contact.transportation.boat} |
|
BTS: {contact.transportation.bts} |
|
MRT: {contact.transportation.mrt} |
|
Airport Link: {contact.transportation.airport_link} |
|
รถประจำทาง: {json.dumps(contact.transportation.bus, ensure_ascii=False)} |
|
""" |
|
embedding = self._compute_embedding(text) |
|
doc = Document( |
|
id=self._generate_unique_id(), |
|
content=text, |
|
embedding=embedding, |
|
meta={'event_type': 'contact'} |
|
) |
|
documents.append(doc) |
|
|
|
|
|
if course_structure: |
|
for course in course_structure: |
|
text = f""" |
|
โครงสร้างหลักสูตร: |
|
ชื่อหลักสูตร: {course.program_name} |
|
ภาควิชา: {course.department} |
|
หน่วยกิตรวม: {course.total_credits} |
|
ระดับการศึกษา: {course.degree_level} |
|
|
|
รายละเอียดโครงสร้าง: |
|
|
|
หมวดวิชาปรับพื้นฐาน/วิชาพื้นฐาน: |
|
คำอธิบาย: {course.structure['หมวดวิชาปรับพื้นฐาน'].description or 'ไม่ระบุ'} |
|
หน่วยกิต: {course.structure['หมวดวิชาปรับพื้นฐาน'].credits} |
|
รายวิชา: |
|
""" |
|
|
|
foundation_courses = [] |
|
for c in course.structure['หมวดวิชาปรับพื้นฐาน'].courses: |
|
foundation_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") |
|
text += "\n".join(foundation_courses) |
|
|
|
text += f""" |
|
|
|
หมวดวิชาบังคับ/วิชาหลัก: |
|
หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาบังคับ'].minimum_credits} |
|
รายวิชา: |
|
""" |
|
|
|
core_courses = [] |
|
for c in course.structure['หมวดวิชาบังคับ'].courses: |
|
core_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") |
|
text += "\n".join(core_courses) |
|
|
|
text += f""" |
|
|
|
หมวดวิชาเลือก: |
|
หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาเลือก'].minimum_credits} |
|
รายวิชา: |
|
""" |
|
|
|
elective_courses = [] |
|
for c in course.structure['หมวดวิชาเลือก'].courses: |
|
elective_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") |
|
text += "\n".join(elective_courses) |
|
|
|
text += f""" |
|
|
|
หมวดวิชาการค้นคว้าอิสระ: |
|
หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาการค้นคว้าอิสระ'].minimum_credits} |
|
รายวิชา: |
|
""" |
|
|
|
research_courses = [] |
|
for c in course.structure['หมวดวิชาการค้นคว้าอิสระ'].courses: |
|
research_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต") |
|
text += "\n".join(research_courses) |
|
|
|
doc = Document( |
|
id=self._generate_unique_id(), |
|
content=text.strip(), |
|
embedding=self._compute_embedding(text), |
|
meta={'event_type': 'curriculum'} |
|
) |
|
documents.append(doc) |
|
|
|
|
|
if study_plans: |
|
for plan in study_plans: |
|
self.study_plan_data.append(plan) |
|
for year, semesters in plan.years.items(): |
|
for semester, data in semesters.items(): |
|
|
|
year_num = year.replace('year', '') |
|
semester_num = semester.replace('semester', '') |
|
|
|
|
|
course_type = data.get('metadata', {}).get('course_type', 'core') |
|
course_type_th = 'วิชาหลัก' if course_type == 'core' else 'วิชาเลือก' |
|
|
|
|
|
total_credits = sum(course.get('credits', 0) for course in data.get('courses', [])) |
|
|
|
text = f"""แผนการศึกษา: |
|
ปี: {year_num} |
|
ภาคการศึกษา: {semester_num} |
|
ประเภทรายวิชา: {course_type_th} ({course_type}) |
|
จำนวนหน่วยกิตรวม: {total_credits} |
|
|
|
รายวิชาที่ต้องเรียน:""" |
|
|
|
|
|
if 'courses' in data: |
|
for course in data['courses']: |
|
text += f"\n- {course['code']}: {course['title'].get('th', '')} ({course['title'].get('en', '')}) - {course['credits']} หน่วยกิต" |
|
|
|
embedding = self._compute_embedding(text) |
|
doc = Document( |
|
id=self._generate_unique_id(), |
|
content=text, |
|
embedding=embedding, |
|
meta={ |
|
'event_type': 'study_plan', |
|
'year': year_num, |
|
'semester': semester_num, |
|
'course_type': course_type |
|
} |
|
) |
|
documents.append(doc) |
|
|
|
if program_details: |
|
for detail in program_details: |
|
|
|
app_text = f""" |
|
ข้อมูลการสมัคร: |
|
เว็บไซต์รับสมัคร: {detail.application_info.application_portal} |
|
อีเมล: {detail.application_info.program_email} |
|
|
|
เอกสารที่ต้องใช้: |
|
{self._format_required_docs(detail.required_documents)} |
|
|
|
ขั้นตอนการส่งเอกสาร: |
|
{detail.submission_process} |
|
|
|
ขั้นตอนการคัดเลือก: |
|
{self._format_selection_steps(detail.selection_process)} |
|
""" |
|
|
|
doc = Document( |
|
id=self._generate_unique_id(), |
|
content=app_text.strip(), |
|
embedding=self._compute_embedding(app_text), |
|
meta={'event_type': 'program_details'} |
|
) |
|
documents.append(doc) |
|
|
|
|
|
if 'optional' in detail.required_documents: |
|
eng_prof = next((doc for doc_key, doc in detail.required_documents['optional'].items() |
|
if doc_key == 'english_proficiency'), None) |
|
if eng_prof: |
|
eng_text = f""" |
|
ข้อกำหนดภาษาอังกฤษ: |
|
{eng_prof.name} |
|
รายละเอียด: {eng_prof.description} |
|
เงื่อนไข: {eng_prof.conditions} |
|
""" |
|
|
|
eng_doc = Document( |
|
id=self._generate_unique_id(), |
|
content=eng_text.strip(), |
|
embedding=self._compute_embedding(eng_text), |
|
meta={ |
|
'event_type': 'program_details' } |
|
) |
|
documents.append(eng_doc) |
|
|
|
|
|
if tuition_fees: |
|
for fee in tuition_fees: |
|
fee_text = f""" |
|
ค่าธรรมเนียมการศึกษา: |
|
- ค่าเล่าเรียน: {fee.regular_fee.amount:,.2f} {fee.regular_fee.currency} {fee.regular_fee.period} |
|
- ค่าปรับชำระล่าช้า: {fee.late_payment_fee.amount:,.2f} {fee.late_payment_fee.currency} |
|
""" |
|
|
|
doc = Document( |
|
id=self._generate_unique_id(), |
|
content=fee_text.strip(), |
|
embedding=self._compute_embedding(fee_text), |
|
meta={'event_type': 'fees'} |
|
) |
|
documents.append(doc) |
|
|
|
batch_size = 10 |
|
for i in range(0, len(documents), batch_size): |
|
batch = documents[i:i + batch_size] |
|
try: |
|
self.store.write_documents(batch) |
|
except Exception as e: |
|
logger.error(f"Error writing document batch {i//batch_size + 1}: {str(e)}") |
|
for doc in batch: |
|
try: |
|
self.store.write_documents([doc]) |
|
except Exception as e2: |
|
logger.error(f"Failed to write document {doc.id}: {str(e2)}") |
|
|
|
def hybrid_search(self, |
|
query: str, |
|
event_type: Optional[str] = None, |
|
detail_type: Optional[str] = None, |
|
semester: Optional[str] = None, |
|
top_k: int = 10, |
|
weight_semantic: float = 0.5) -> List[Document]: |
|
|
|
"""Hybrid search combining semantic and lexical search results""" |
|
|
|
cache_key = json.dumps({ |
|
'query': query, |
|
'event_type': event_type, |
|
'semester': semester, |
|
'top_k': top_k, |
|
'weight_semantic': weight_semantic |
|
}) |
|
|
|
cached_results = self.cache_manager.get_query_cache(cache_key) |
|
if cached_results is not None: |
|
return cached_results |
|
|
|
|
|
query_embedding = self._compute_embedding(query) |
|
semantic_results = self.embedding_retriever.run(query_embedding=query_embedding)["documents"] |
|
|
|
|
|
bm25_results = self.bm25_retriever.run( |
|
query=query |
|
)["documents"] |
|
|
|
if event_type == "program_details": |
|
weight_semantic = 0.3 |
|
|
|
|
|
combined_results = self._merge_results( |
|
semantic_results=semantic_results, |
|
bm25_results=bm25_results, |
|
weight_semantic=weight_semantic, |
|
top_k=top_k |
|
) |
|
|
|
|
|
filtered_results = [] |
|
for doc in combined_results: |
|
if event_type and event_type != "program_details" and doc.meta.get('event_type') != event_type: |
|
continue |
|
filtered_results.append(doc) |
|
|
|
final_results = filtered_results[:top_k] |
|
self.cache_manager.set_query_cache(cache_key, final_results) |
|
|
|
return final_results |
|
|
|
def _merge_results(self, |
|
semantic_results: List[Document], |
|
bm25_results: List[Document], |
|
weight_semantic: float, |
|
top_k: int) -> List[Document]: |
|
"""Merge semantic and BM25 results using weighted score fusion""" |
|
|
|
|
|
semantic_scores = {} |
|
bm25_scores = {} |
|
|
|
|
|
max_semantic_score = max(doc.score for doc in semantic_results) if semantic_results else 1.0 |
|
for doc in semantic_results: |
|
semantic_scores[doc.id] = doc.score / max_semantic_score if max_semantic_score > 0 else 0 |
|
|
|
|
|
max_bm25_score = max(doc.score for doc in bm25_results) if bm25_results else 1.0 |
|
for doc in bm25_results: |
|
bm25_scores[doc.id] = doc.score / max_bm25_score if max_bm25_score > 0 else 0 |
|
|
|
|
|
combined_scores = {} |
|
all_docs = {doc.id: doc for doc in semantic_results + bm25_results} |
|
|
|
for doc_id in all_docs: |
|
semantic_score = semantic_scores.get(doc_id, 0) |
|
bm25_score = bm25_scores.get(doc_id, 0) |
|
|
|
|
|
combined_scores[doc_id] = ( |
|
weight_semantic * semantic_score + |
|
(1 - weight_semantic) * bm25_score |
|
) |
|
|
|
|
|
sorted_docs = sorted( |
|
all_docs.values(), |
|
key=lambda x: combined_scores[x.id], |
|
reverse=True |
|
) |
|
|
|
return sorted_docs[:top_k] |
|
|
|
def search_with_reranking(self, |
|
query: str, |
|
event_type: Optional[str] = None, |
|
detail_type: Optional[str] = None, |
|
semester: Optional[str] = None, |
|
top_k_initial: int = 20, |
|
top_k_final: int = 5, |
|
weight_semantic: float = 0.5) -> List[Document]: |
|
""" |
|
Two-stage retrieval with hybrid search followed by cross-encoder reranking |
|
""" |
|
|
|
cache_key = json.dumps({ |
|
'query': query, |
|
'event_type': event_type, |
|
'semester': semester, |
|
'top_k_initial': top_k_initial, |
|
'top_k_final': top_k_final, |
|
'weight_semantic': weight_semantic, |
|
'reranked': True |
|
}) |
|
|
|
|
|
cached_results = self.cache_manager.get_query_cache(cache_key) |
|
if cached_results is not None: |
|
return cached_results |
|
|
|
|
|
initial_results = self.hybrid_search( |
|
query=query, |
|
event_type=event_type, |
|
detail_type=detail_type, |
|
semester=semester, |
|
top_k=top_k_initial, |
|
weight_semantic=weight_semantic |
|
) |
|
|
|
|
|
if len(initial_results) <= top_k_final: |
|
return initial_results |
|
|
|
try: |
|
|
|
            # sentence-transformers CrossEncoder scores (query, passage) pairs for reranking.
            cross_encoder = CrossEncoder("cross-encoder/mmarco-mMiniLMv2-L12-H384-v1")
|
pairs = [(query, doc.content) for doc in initial_results] |
|
scores = cross_encoder.predict(pairs) |
|
|
|
for doc, score in zip(initial_results, scores): |
|
doc.score = float(score) |
|
|
|
reranked_results = sorted(initial_results, key=lambda x: x.score, reverse=True)[:top_k_final] |
|
|
|
|
|
self.cache_manager.set_query_cache(cache_key, reranked_results) |
|
|
|
return reranked_results |
|
|
|
except Exception as e: |
|
logger.error(f"Reranking failed: {str(e)}. Falling back to hybrid search results.") |
|
|
|
return initial_results[:top_k_final] |
|
|
|
class ResponseGenerator: |
|
"""Generate responses with enhanced conversation context awareness""" |
|
|
|
def __init__(self, config: PipelineConfig): |
|
self.generator = OpenAIGenerator( |
|
api_key=Secret.from_token(config.model.openai_api_key), |
|
model=config.model.openai_model |
|
) |
|
self.prompt_builder = PromptBuilder( |
|
template=""" |
|
คุณเป็นที่ปรึกษาทางวิชาการ กรุณาตอบคำถามต่อไปนี้โดยใช้ข้อมูลจากเอกสารที่ให้มาและพิจารณาบริบทจากประวัติการสนทนา |
|
|
|
{% if conversation_history %} |
|
ประวัติการสนทนา: |
|
{% for message in conversation_history %} |
|
{% if message.role == 'user' %} |
|
ผู้ใช้: {{ message.content }} |
|
{% else %} |
|
ที่ปรึกษา: {{ message.content }} |
|
{% endif %} |
|
{% endfor %} |
|
{% endif %} |
|
|
|
คำถามปัจจุบัน: {{query}} |
|
|
|
ข้อมูลที่เกี่ยวข้อง: |
|
{% for doc in context %} |
|
--- |
|
ประเภท: {{doc.meta.event_type}}{% if doc.meta.detail_type %}, รายละเอียด: {{doc.meta.detail_type}}{% endif %} |
|
เนื้อหา: |
|
{{doc.content}} |
|
{% endfor %} |
|
|
|
คำแนะนำในการตอบ: |
|
1. ตอบเฉพาะข้อมูลที่มีในเอกสารเท่านั้น |
|
2. หากไม่มีข้อมูลให้ตอบว่า "ขออภัย ไม่พบข้อมูลที่เกี่ยวข้องกับคำถามนี้" |
|
3. หากข้อมูลไม่ชัดเจนให้ระบุว่าข้อมูลอาจไม่ครบถ้วน |
|
4. จัดรูปแบบคำตอบให้อ่านง่าย ใช้หัวข้อและย่อหน้าตามความเหมาะสม |
|
5. สำหรับคำถามเกี่ยวกับข้อกำหนดภาษาอังกฤษหรือขั้นตอนการสมัคร ให้อธิบายข้อมูลอย่างละเอียด |
|
6. ใส่ข้อความ "หากมีข้อสงสัยเพิ่มเติม สามารถสอบถามได้" ท้ายคำตอบเสมอ |
|
7. คำนึงถึงประวัติการสนทนาและให้คำตอบที่ต่อเนื่องกับบทสนทนาก่อนหน้า |
|
8. หากคำถามอ้างอิงถึงข้อมูลในบทสนทนาก่อนหน้า (เช่น "แล้วอันนั้นล่ะ", "มีอะไรอีกบ้าง", "คำถามก่อนหน้า") ให้พิจารณาบริบทและตอบคำถามอย่างตรงประเด็น แต่ไม่ต้องแสดงคำถามก่อนหน้าในคำตอบ |
|
9. กรณีคำถามมีความไม่ชัดเจน ใช้ประวัติการสนทนาเพื่อเข้าใจบริบทของคำถาม |
|
|
|
สำคัญ: ไม่ต้องใส่คำว่า "คำถามก่อนหน้าคือ [คำถามก่อนหน้า] และคำตอบคือ..." ในคำตอบของคุณ ให้ตอบคำถามโดยตรง |
|
|
|
กรุณาตอบเป็นภาษาไทย: |
|
""" |
|
) |
|
|
|
def generate_response(self, |
|
query: str, |
|
documents: List[Document], |
|
query_info: Dict[str, Any], |
|
                          conversation_history: Optional[List[Dict[str, str]]] = None) -> str:
|
"""Generate response using retrieved documents and conversation history""" |
|
try: |
|
|
|
reference_keywords = ["ก่อนหน้านี้", "ก่อนหน้า", "ที่ผ่านมา", "คำถามก่อนหน้า", "คำถามที่แล้ว", |
|
"previous", "earlier", "before", "last time", "last question"] |
|
|
|
is_reference_question = any(keyword in query.lower() for keyword in reference_keywords) |
|
|
|
|
|
enhanced_context = conversation_history or [] |
|
|
|
result = self.prompt_builder.run( |
|
query=query, |
|
context=documents, |
|
format=query_info.get("response_format", "detailed"), |
|
conversation_history=enhanced_context, |
|
is_reference_question=is_reference_question |
|
) |
|
|
|
response = self.generator.run(prompt=result["prompt"]) |
|
return response["replies"][0] |
|
|
|
except Exception as e: |
|
logger.error(f"Response generation failed: {str(e)}") |
|
return "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้" |
|
|
|
class AdvancedQueryProcessor: |
|
"""Process queries with better understanding""" |
|
|
|
def __init__(self, config: PipelineConfig): |
|
self.generator = OpenAIGenerator( |
|
api_key=Secret.from_token(config.model.openai_api_key), |
|
model=config.model.openai_model |
|
) |
|
self.prompt_builder = PromptBuilder( |
|
template=""" |
|
คุณเป็นผู้ช่วย AI ที่เชี่ยวชาญด้านการศึกษาในประเทศไทย หน้าที่ของคุณคือการวิเคราะห์และจำแนกคำถามของผู้ใช้ให้ตรงกับหมวดหมู่ข้อมูลที่เหมาะสม ได้แก่: |
|
|
|
1. **รายละเอียดโปรแกรมการศึกษา (program_details)**: ข้อมูลเกี่ยวกับหลักสูตร โปรแกรมการเรียนการสอน และโครงสร้างหลักสูตร |
|
2. **ข้อมูลการติดต่อ (contact)**: ข้อมูลการติดต่อของหน่วยงานหรือบุคคลที่เกี่ยวข้องในสถาบันการศึกษา |
|
3. **โครงสร้างหลักสูตร (curriculum)**: รายละเอียดเกี่ยวกับวิชาเรียน หน่วยกิต และแผนการศึกษา |
|
4. **ค่าเล่าเรียน (fees)**: ข้อมูลเกี่ยวกับค่าใช้จ่ายในการศึกษา ค่าธรรมเนียม และทุนการศึกษา |
|
5. **แผนการศึกษารายปี (study_plan)**: ข้อมูลแผนการเรียนแบ่งตามชั้นปีและภาคการศึกษา รายละเอียดรายวิชาที่ต้องลงทะเบียนในแต่ละเทอม และจำนวนหน่วยกิตรวม |
|
|
|
**คำถาม**: {{query}} |
|
|
|
**คำแนะนำในการวิเคราะห์**: |
|
- ตรวจสอบคำสำคัญในคำถามเพื่อระบุหมวดหมู่ที่สอดคล้อง |
|
- หากคำถามเกี่ยวข้องกับหลายหมวดหมู่ ให้จัดลำดับความสำคัญตามความต้องการของผู้ใช้ |
|
- หากไม่สามารถระบุหมวดหมู่ได้อย่างชัดเจน ให้จัดหมวดหมู่เป็น "อื่นๆ" และระบุความไม่แน่นอน |
|
|
|
**รูปแบบการตอบกลับ**: |
|
|
|
หมายเหตุ: |
|
- รูปแบบปีการศึกษาที่ยอมรับ: "ปีที่ 1", "ปี 1", "ชั้นปีที่ 1" |
|
- รูปแบบภาคการศึกษาที่ยอมรับ: "เทอมที่ 1", "เทอม 1", "ภาคการศึกษาที่ 1" |
|
- หากข้อมูลไม่ครบ ให้ระบุค่าสำหรับฟิลด์ที่ขาดหายเป็น null พร้อมข้อความแจ้งความไม่แน่นอน |
|
|
|
ให้ผลลัพธ์ในรูปแบบ JSON ตามโครงสร้าง: |
|
{ |
|
"event_type": "program_details" | "contact" | "curriculum" | "fees" | "study_plan", |
|
"year": "ปีที่ X", // แปลงเป็นรูปแบบมาตรฐาน หรือ null หากไม่ระบุ |
|
"semester": "เทอมที่ X", // แปลงเป็นรูปแบบมาตรฐาน หรือ null หากไม่ระบุ |
|
"key_terms": ["คำสำคัญที่เกี่ยวข้อง"], |
|
"response_format": "detailed", |
|
"uncertainty": "low" // ระบุระดับความไม่แน่นอน (เช่น 'low', 'high') |
|
} |
|
|
|
ตัวอย่าง: |
|
Input: "โปรแกรมการศึกษามีรายละเอียดอะไรบ้าง" |
|
Output: { |
|
"event_type": "program_details", |
|
"year": null, |
|
"semester": null, |
|
"key_terms": ["โปรแกรมการศึกษา", "รายละเอียด"], |
|
"response_format": "detailed", |
|
"uncertainty": "low" |
|
} |
|
|
|
Input: "ฉันจะติดต่อภาควิชาได้อย่างไร" |
|
Output: { |
|
"event_type": "contact", |
|
"year": null, |
|
"semester": null, |
|
"key_terms": ["ติดต่อ", "ภาควิชา"], |
|
"response_format": "detailed", |
|
"uncertainty": "low" |
|
} |
|
|
|
Input: "โครงสร้างหลักสูตรของปี 2 เป็นอย่างไร" |
|
Output: { |
|
"event_type": "curriculum", |
|
"year": "ปีที่ 2", |
|
"semester": null, |
|
"key_terms": ["โครงสร้างหลักสูตร"], |
|
"response_format": "detailed", |
|
"uncertainty": "low" |
|
} |
|
|
|
Input: "ค่าเล่าเรียนสำหรับเทอม 1 เท่าไหร่" |
|
Output: { |
|
"event_type": "fees", |
|
"year": null, |
|
"semester": "เทอมที่ 1", |
|
"key_terms": ["ค่าเล่าเรียน", "เทอม 1"], |
|
"response_format": "detailed", |
|
"uncertainty": "low" |
|
} |
|
|
|
Input: "ปี 1 เทอม 1 ต้องเรียนอะไรบ้าง" |
|
Output: { |
|
"event_type": "study_plan", |
|
"year": null, |
|
"semester": null, |
|
"key_terms": ["เรียนอะไร", "เทอม"], |
|
"response_format": "detailed", |
|
"uncertainty": "low" |
|
} |
|
|
|
กรุณาตอบเป็นภาษาไทยและตรวจสอบให้แน่ใจว่า JSON มีโครงสร้างที่ถูกต้อง |
|
""" |
|
) |
|
|
|
def normalize_year_semester(self, query: str) -> str: |
|
"""Normalize year and semester formats in queries""" |
|
|
|
year_patterns = { |
|
r'ปี\s*(\d+)': r'ปีที่ \1', |
|
r'ชั้นปีที่\s*(\d+)': r'ปีที่ \1', |
|
r'ปีการศึกษาที่\s*(\d+)': r'ปีที่ \1' |
|
} |
|
|
|
semester_patterns = { |
|
r'เทอม\s*(\d+)': r'เทอมที่ \1', |
|
r'ภาคเรียนที่\s*(\d+)': r'เทอมที่ \1', |
|
r'ภาคการศึกษาที่\s*(\d+)': r'เทอมที่ \1' |
|
} |
|
normalized_query = query |
|
for pattern, replacement in year_patterns.items(): |
|
normalized_query = re.sub(pattern, replacement, normalized_query) |
|
for pattern, replacement in semester_patterns.items(): |
|
normalized_query = re.sub(pattern, replacement, normalized_query) |
|
return normalized_query |
|
|
|
    def normalize_query(self, query: str) -> str:
        """Apply synonym mapping to reduce vocabulary mismatch between queries and indexed text."""
|
normalized_query = self.normalize_year_semester(query) |
|
|
|
synonyms = { |
|
"วิชาเลือก": "หมวดวิชาเลือก" |
|
|
|
} |
|
for original, replacement in synonyms.items(): |
|
normalized_query = normalized_query.replace(original, replacement) |
|
return normalized_query |
|
|
|
    def _get_default_analysis(self, query: str) -> Dict[str, Any]:
        """Fallback analysis used when LLM-based query classification fails."""
        logger.info("Returning default analysis")
        return {
            "original_query": query,
            "event_type": None,
            "detail_type": None,
            "semester": None,
            "key_terms": [],
            "response_format": "detailed"
        }
|
|
|
def process_query(self, query: str) -> Dict[str, Any]: |
|
"""Enhanced query processing with support for detail types and better categorization.""" |
|
try: |
|
|
|
normalized_query = self.normalize_query(query) |
|
result = self.prompt_builder.run(query=normalized_query) |
|
response = self.generator.run(prompt=result["prompt"]) |
|
|
|
if not response or not response.get("replies") or not response["replies"][0]: |
|
logger.warning("Received empty response from OpenAI") |
|
return self._get_default_analysis(query) |
|
|
|
|
|
json_str = response["replies"][0] |
|
json_str = json_str.replace("```json", "").replace("```", "").strip() |
|
analysis = json.loads(json_str) |
|
|
|
analysis['detail_type'] = None |
|
|
|
|
|
if any(keyword in query.lower() for keyword in ['ภาษาอังกฤษ', 'toefl', 'ielts', 'swu-set', 'โทอิค', 'คะแนนภาษา']): |
|
analysis['event_type'] = 'program_details' |
|
elif any(keyword in query.lower() for keyword in ['สมัคร', 'ขั้นตอน', 'วิธีการ', 'เอกสาร', 'หลักฐาน', 'admission']): |
|
analysis['event_type'] = 'program_details' |
|
analysis['detail_type'] = None |
|
elif any(keyword in query.lower() for keyword in ['ค่าเทอม', 'ค่าธรรมเนียม', 'ค่าเรียน', 'ค่าปรับ', 'ค่าใช้จ่าย']): |
|
analysis['event_type'] = 'fees' |
|
elif any(keyword in query.lower() for keyword in ['หน่วยกิต', 'วิชา', 'หลักสูตร', 'แผนการเรียน', 'วิชาเลือก', 'วิชาบังคับ', 'วิชาหลัก', 'หมวดวิชา']): |
|
analysis['event_type'] = 'curriculum' |
|
return { |
|
"original_query": query, |
|
**analysis |
|
} |
|
except Exception as e: |
|
logger.error(f"Query processing failed: {str(e)}") |
|
return self._get_default_analysis(query) |
|
|
|
|
|
|
|
|
|
class AcademicCalendarRAG: |
|
"""Enhanced RAG system for academic calendar and program information with conversation memory""" |
|
|
|
def __init__(self, config: PipelineConfig): |
|
self.config = config |
|
self.document_store = HybridDocumentStore(config) |
|
self.query_processor = AdvancedQueryProcessor(config) |
|
self.response_generator = ResponseGenerator(config) |
|
self.data_processor = CalendarDataProcessor() |
|
|
|
|
|
self.conversation_history = [] |
|
self.max_history_length = 5 |
|
|
|
|
|
self.calendar_events = [] |
|
self.program_details = [] |
|
self.contact_details = [] |
|
self.course_structure = [] |
|
self.study_plans = [] |
|
self.tuition_fees = [] |
|
|
|
def add_to_conversation(self, role: str, content: str): |
|
"""Add a message to the conversation history""" |
|
self.conversation_history.append({"role": role, "content": content}) |
|
|
|
if len(self.conversation_history) > self.max_history_length * 2: |
|
self.conversation_history = self.conversation_history[-(self.max_history_length * 2):] |
|
|
|
def load_data(self, json_data: Dict): |
|
"""Load and process all data sources""" |
|
try: |
|
raw_events = self.data_processor.parse_calendar_json(json_data) |
|
for event in raw_events: |
|
if not event.event_type: |
|
event.event_type = CalendarEvent.classify_event_type(event.activity) |
|
self.calendar_events.append(event) |
|
|
|
|
|
self.program_details = self.data_processor.extract_program_details(json_data) |
|
self.contact_details = self.data_processor.extract_contact_details(json_data) |
|
self.course_structure = self.data_processor.extract_course_structure(json_data) |
|
self.study_plans = self.data_processor.extract_program_study_plan(json_data) |
|
self.tuition_fees = self.data_processor.extract_fees(json_data) |
|
|
|
self.document_store.add_events( |
|
events=self.calendar_events, |
|
program_details=self.program_details, |
|
contact_details=self.contact_details, |
|
course_structure=self.course_structure, |
|
study_plans=self.study_plans, |
|
tuition_fees=self.tuition_fees |
|
) |
|
|
|
except Exception as e: |
|
logger.error(f"Error loading data: {str(e)}") |
|
raise |
|
|
|
def process_query(self, query: str, conversation_history=None) -> Dict[str, Any]: |
|
"""Process user query using conversation history and hybrid retrieval.""" |
|
|
|
if conversation_history is not None: |
|
self.conversation_history = conversation_history |
|
|
|
|
|
self.add_to_conversation("user", query) |
|
|
|
|
|
query_with_context = query |
|
if self.conversation_history and len(self.conversation_history) > 1: |
|
|
|
prev_exchanges = self.conversation_history[:-1] |
|
if len(prev_exchanges) > 4: |
|
prev_exchanges = prev_exchanges[-4:] |
|
|
|
context_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in prev_exchanges]) |
|
query_with_context = f"Previous conversation:\n{context_str}\n\nCurrent question: {query}" |
|
|
|
|
|
max_attempts = 4 |
|
attempt = 0 |
|
weight_values = [0.3, 0.7, 0.3, 0.7] |
|
|
|
while attempt < max_attempts: |
|
attempt += 1 |
|
try: |
|
|
|
if attempt <= 2: |
|
query_info = self.query_processor.process_query(query_with_context if attempt == 1 else query) |
|
else: |
|
query_info = self.query_processor._get_default_analysis(query) |
|
logger.info(f"Retrying query processing (attempt {attempt}) with default analysis") |
|
|
|
weight_semantic = weight_values[attempt - 1] |
|
|
|
|
|
logger.info(f"Attempt {attempt}: Searching with weight_semantic={weight_semantic}") |
|
documents = self.document_store.hybrid_search( |
|
query=query_with_context if attempt == 1 else query, |
|
event_type=query_info.get("event_type"), |
|
detail_type=query_info.get("detail_type"), |
|
semester=query_info.get("semester"), |
|
top_k=self.config.retriever.top_k, |
|
weight_semantic=weight_semantic |
|
) |
|
|
|
|
|
response = self.response_generator.generate_response( |
|
query=query, |
|
documents=documents, |
|
query_info=query_info, |
|
conversation_history=self.conversation_history |
|
).strip() |
|
|
|
|
|
if "ขออภัย ไม่พบข้อมูลที่เกี่ยวข้อง" in response and attempt < max_attempts: |
|
continue |
|
|
|
|
|
self.add_to_conversation("assistant", response) |
|
|
|
return { |
|
"query": query, |
|
"answer": response, |
|
"relevant_docs": documents, |
|
"query_info": query_info |
|
} |
|
|
|
except Exception as e: |
|
logger.error(f"Error processing query: {str(e)}") |
|
|
|
return { |
|
"query": query, |
|
"answer": "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้", |
|
"error": "Maximum retry attempts reached" |
|
} |
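

# ---------------------------------------------------------------------------
# Minimal end-to-end usage sketch. Assumes an OPENAI_API_KEY environment
# variable and a local "raw-data.json" file in the format expected by
# CalendarDataProcessor; adjust both to your setup.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import os

    config = create_default_config(api_key=os.environ["OPENAI_API_KEY"])

    with open("raw-data.json", "r", encoding="utf-8") as f:
        raw_data = json.load(f)

    rag = AcademicCalendarRAG(config)
    rag.load_data(raw_data)

    result = rag.process_query("ค่าเล่าเรียนต่อภาคการศึกษาเท่าไหร่")
    print(result["answer"])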