JirasakJo's picture
Update calendar_rag.py
b16b6fc verified
from haystack import *
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.retrievers.in_memory import *
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.utils import Secret
from pathlib import Path
import hashlib
from datetime import *
from typing import *
from dataclasses import *
import json
import logging
import re
import pickle
import statistics
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ValidationResult:
"""Stores the result of a validation check"""
is_valid: bool
errors: List[str]
warnings: List[str]
normalized_data: Dict[str, str]
@dataclass
class ApplicationInfo:
application_portal: str
program_email: str
@dataclass
class RequiredDocument:
name: str
description: str
conditions: Optional[str] = None
@dataclass
class SelectionStep:
step_number: str
description: str
@dataclass
class ProgramDetailInfo:
application_info: ApplicationInfo
required_documents: Dict[str, Dict[str, RequiredDocument]]
submission_process: str
selection_process: List[SelectionStep]
@dataclass
class Transportation:
boat: str
bts: str
mrt: str
airport_link: str
bus: Dict[str, str]
@dataclass
class Contact:
email: str
facebook: Dict[str, str]
@dataclass
class ContactDetail:
event_type: str
department: str
faculty: str
university: str
location: str
contact: Contact
transportation: Transportation
@dataclass
class Course:
code: str
title_th: str
title_en: str
credits: int
@dataclass
class CourseCategory:
description: Optional[str]
credits: Union[str, int]
minimum_credits: Optional[int]
courses: List[Course]
@dataclass
class CourseStructure:
event_type: str
program_name: str
department: str
total_credits: int
degree_level: str
structure: Dict[str, CourseCategory]
@dataclass
class StudyPlan:
event_type: str
years: Dict[str, Dict[str, Any]]
@dataclass
class RegularFee:
amount: float
currency: str
period: str
@dataclass
class LatePaymentFee:
amount: float
currency: str
@dataclass
class TuitionFee:
event_type: str
regular_fee: RegularFee
late_payment_fee: LatePaymentFee
class OpenAIDateParser:
"""Uses OpenAI to parse complex Thai date formats"""
def __init__(self, api_key: str, model: str = "gpt-4o"):
self.generator = OpenAIGenerator(
api_key=Secret.from_token(api_key),
model=model
)
self.prompt_builder = PromptBuilder(
template="""
Parse the following Thai date range into a structured format:
Date: {{date}}
Return in JSON format:
{
"start_date": "YYYY-MM-DD",
"end_date": "YYYY-MM-DD" (if range),
"is_range": true/false
}
Notes:
- Convert Buddhist Era (BE) to CE
- Handle abbreviated Thai months
- Account for date ranges with dashes
- Return null for end_date if it's a single date
Example inputs and outputs:
Input: "จ 8 ก.ค. – จ 19 ส.ค. 67"
Output: {"start_date": "2024-07-08", "end_date": "2024-08-19", "is_range": true}
Input: "15 มกราคม 2567"
Output: {"start_date": "2024-01-15", "end_date": null, "is_range": false}
"""
)
async def parse_date(self, date_str: str) -> Dict[str, Union[str, bool]]:
"""Parse complex Thai date format using OpenAI"""
try:
result = self.prompt_builder.run(date=date_str)
response = await self.generator.arun(prompt=result["prompt"])
if not response or not response.get("replies"):
raise ValueError("Empty response from OpenAI")
parsed = json.loads(response["replies"][0])
for date_field in ['start_date', 'end_date']:
if parsed.get(date_field):
datetime.strptime(parsed[date_field], '%Y-%m-%d')
return parsed
except Exception as e:
logger.error(f"OpenAI date parsing failed for '{date_str}': {str(e)}")
raise ValueError(f"Could not parse date: {date_str}")
class ThaiTextPreprocessor:
"""Handles Thai text preprocessing and normalization"""
CHAR_MAP = {'ํา': 'ำ','์': '','–': '-','—': '-','٫': ',',}
@classmethod
def normalize_thai_text(cls, text: str) -> str:
"""Normalize Thai text by applying character mappings and spacing rules"""
if not text:
return text
for old, new in cls.CHAR_MAP.items():
text = text.replace(old, new)
text = re.sub(r'\s+', ' ', text.strip())
thai_digits = '๐๑๒๓๔๕๖๗๘๙'
arabic_digits = '0123456789'
for thai, arabic in zip(thai_digits, arabic_digits):
text = text.replace(thai, arabic)
return text
class CalendarEventValidator:
"""Validates and preprocesses calendar events"""
def __init__(self, openai_api_key: str):
self.preprocessor = ThaiTextPreprocessor()
self.date_parser = OpenAIDateParser(api_key=openai_api_key)
async def validate_event(self, event: 'CalendarEvent') -> ValidationResult:
"""Validate a calendar event and return validation results"""
errors = []
warnings = []
normalized_data = {}
if event.date:
try:
parsed_date = await self.date_parser.parse_date(event.date)
normalized_data['date'] = parsed_date['start_date']
if parsed_date['is_range'] and parsed_date['end_date']:
range_note = f"ถึงวันที่ {parsed_date['end_date']}"
if event.note:
normalized_data['note'] = f"{event.note}; {range_note}"
else:
normalized_data['note'] = range_note
except ValueError as e:
errors.append(f"Invalid date format: {event.date}")
else:
errors.append("Date is required")
if event.time:
time_pattern = r'^([01]?[0-9]|2[0-3]):([0-5][0-9])$'
if not re.match(time_pattern, event.time):
errors.append(f"Invalid time format: {event.time}")
normalized_data['time'] = event.time
if event.activity:
normalized_activity = self.preprocessor.normalize_thai_text(event.activity)
if len(normalized_activity) < 3:
warnings.append("Activity description is very short")
normalized_data['activity'] = normalized_activity
else:
errors.append("Activity is required")
valid_semesters = {'ภาคต้น', 'ภาคปลาย', 'ภาคฤดูร้อน'}
if event.semester:
normalized_semester = self.preprocessor.normalize_thai_text(event.semester)
if normalized_semester not in valid_semesters:
warnings.append(f"Unusual semester value: {event.semester}")
normalized_data['semester'] = normalized_semester
else:
errors.append("Semester is required")
valid_types = {'registration', 'deadline', 'examination', 'academic', 'holiday'}
if event.event_type not in valid_types:
errors.append(f"Invalid event type: {event.event_type}")
normalized_data['event_type'] = event.event_type
if event.note and 'note' not in normalized_data:
normalized_data['note'] = self.preprocessor.normalize_thai_text(event.note)
if event.section:
normalized_data['section'] = self.preprocessor.normalize_thai_text(event.section)
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
normalized_data=normalized_data
)
@dataclass
class CalendarEvent:
"""Structured representation of a calendar event with validation"""
date: str
time: str
activity: str
note: str
semester: str
event_type: str
section: Optional[str] = None
@staticmethod
def classify_event_type(activity: str) -> str:
"""Classify event type based on activity description"""
activity_lower = activity.lower()
keywords = {
'registration': ['ลงทะเบียน', 'ชําระเงิน', 'ค่าธรรมเนียม', 'เปิดเรียน'],
'deadline': ['วันสุดท้าย', 'กําหนด', 'ภายใน', 'ต้องส่ง'],
'examination': ['สอบ', 'ปริญญานิพนธ์', 'วิทยานิพนธ์', 'สอบปากเปล่า'],
'holiday': ['วันหยุด', 'ชดเชย', 'เทศกาล'],
}
for event_type, terms in keywords.items():
if any(term in activity_lower for term in terms):
return event_type
return 'academic'
async def initialize(self, openai_api_key: str):
"""Asynchronously validate and normalize the event"""
validator = CalendarEventValidator(openai_api_key)
result = await validator.validate_event(self)
if not result.is_valid:
raise ValueError(f"Invalid calendar event: {', '.join(result.errors)}")
for field, value in result.normalized_data.items():
setattr(self, field, value)
if result.warnings:
logger.warning(f"Calendar event warnings: {', '.join(result.warnings)}")
def to_searchable_text(self) -> str:
"""Convert event to searchable text format"""
return f"""
ภาคการศึกษา: {self.semester}
ประเภท: {self.event_type}
วันที่: {self.date}
เวลา: {self.time or '-'}
กิจกรรม: {self.activity}
หมวดหมู่: {self.section or '-'}
หมายเหตุ: {self.note or '-'}
""".strip()
class CacheManager:
"""Manages caching for different components of the RAG pipeline"""
def __init__(self, cache_dir: Path, ttl: int = 3600):
"""
Initialize CacheManager
"""
self.cache_dir = cache_dir
self.ttl = ttl
self.embeddings_cache = self._load_cache("embeddings")
self.query_cache = self._load_cache("queries")
self.document_cache = self._load_cache("documents")
def _generate_key(self, data: Union[str, Dict, Any]) -> str:
"""Generate a unique cache key"""
if isinstance(data, str):
content = data.encode('utf-8')
else:
content = json.dumps(data, sort_keys=True).encode('utf-8')
return hashlib.md5(content).hexdigest()
def _load_cache(self, cache_type: str) -> Dict:
"""Load cache from disk"""
cache_path = self.cache_dir / f"{cache_type}_cache.pkl"
if cache_path.exists():
try:
with open(cache_path, 'rb') as f:
cache = pickle.load(f)
self._clean_expired_entries(cache)
return cache
except Exception as e:
logger.warning(f"Failed to load {cache_type} cache: {e}")
return {}
return {}
def _save_cache(self, cache_type: str, cache_data: Dict):
"""Save cache to disk"""
cache_path = self.cache_dir / f"{cache_type}_cache.pkl"
try:
with open(cache_path, 'wb') as f:
pickle.dump(cache_data, f)
except Exception as e:
logger.error(f"Failed to save {cache_type} cache: {e}")
def _clean_expired_entries(self, cache: Dict):
"""Remove expired cache entries"""
current_time = datetime.now()
expired_keys = [
key for key, (_, timestamp) in cache.items()
if current_time - timestamp > timedelta(seconds=self.ttl)
]
for key in expired_keys:
del cache[key]
def get_embedding_cache(self, text: str) -> Optional[Any]:
"""Get cached embedding for text"""
key = self._generate_key(text)
if key in self.embeddings_cache:
embedding, timestamp = self.embeddings_cache[key]
if datetime.now() - timestamp <= timedelta(seconds=self.ttl):
return embedding
return None
def set_embedding_cache(self, text: str, embedding: Any):
"""Cache embedding for text"""
key = self._generate_key(text)
self.embeddings_cache[key] = (embedding, datetime.now())
self._save_cache("embeddings", self.embeddings_cache)
def get_query_cache(self, query: str) -> Optional[Dict]:
"""Get cached query results"""
key = self._generate_key(query)
if key in self.query_cache:
result, timestamp = self.query_cache[key]
if datetime.now() - timestamp <= timedelta(seconds=self.ttl):
return result
return None
def set_query_cache(self, query: str, result: Dict):
"""Cache query results"""
key = self._generate_key(query)
self.query_cache[key] = (result, datetime.now())
self._save_cache("queries", self.query_cache)
def set_document_cache(self, doc_id: str, document: Any):
"""Cache document"""
self.document_cache[doc_id] = (document, datetime.now())
self._save_cache("documents", self.document_cache)
@dataclass
class ModelConfig:
openai_api_key: str
embedder_model: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
openai_model: str = "gpt-4o"
temperature: float = 0.7
@dataclass
class RetrieverConfig:
top_k: int = 5
@dataclass
class CacheConfig:
enabled: bool = True
cache_dir: Path = Path("./cache")
ttl: int = 86400 # 24 hours
@dataclass
class ProcessingConfig:
batch_size: int = 32
@dataclass
class LocalizationConfig:
enable_thai_normalization: bool = True
@dataclass
class PipelineConfig:
model: ModelConfig
retriever: RetrieverConfig = field(default_factory=RetrieverConfig)
cache: CacheConfig = field(default_factory=CacheConfig)
processing: ProcessingConfig = field(default_factory=ProcessingConfig)
localization: LocalizationConfig = field(default_factory=LocalizationConfig)
def create_default_config(api_key: str) -> PipelineConfig:
"""
Create a default pipeline configuration with optimized settings for Thai language processing.
Args:
api_key (str): OpenAI API key
Returns:
PipelineConfig: Configured pipeline settings
"""
return PipelineConfig(
model=ModelConfig(
openai_api_key=api_key,
temperature=0.3 # Lower temperature for more focused responses
),
retriever=RetrieverConfig(
top_k=5 # Optimal number of documents to retrieve
),
cache=CacheConfig(
enabled=True,
cache_dir=Path("./cache"),
ttl=86400 # 24 hour cache
),
processing=ProcessingConfig(
batch_size=32 # Default batch size for processing
),
localization=LocalizationConfig(
enable_thai_normalization=True # Enable Thai text normalization
)
)
class CalendarDataProcessor:
"""Process and structure calendar data from the new raw-data.json format"""
@staticmethod
def parse_calendar_json(json_data: Dict) -> List[CalendarEvent]:
"""Parse the new calendar JSON format into CalendarEvent objects"""
events = []
# Extract academic calendar data - handle direct dictionary input
calendar_data = json_data.get('academic_calendar', []) if isinstance(json_data, dict) else json_data
for semester_block in calendar_data:
semester = semester_block.get('education', '')
schedule = semester_block.get('schedule', [])
# Handle regular schedule events
for event in schedule:
if 'section' in event and 'details' in event:
# Process section-based events (thesis deadlines, etc.)
section = event['section']
for detail in event['details']:
if 'ภาคต้น' in detail and 'ภาคปลาย' in detail:
# Handle dual-semester events
for sem_key in ['ภาคต้น', 'ภาคปลาย']:
if detail.get(sem_key):
events.append(CalendarEvent(
date=detail[sem_key],
time='',
activity=detail['title'],
note=section,
semester=sem_key,
event_type='deadline',
section=section
))
else:
# Single semester event
events.append(CalendarEvent(
date=detail.get('date', ''),
time='',
activity=detail.get('title', ''),
note=section,
semester=ThaiTextPreprocessor.normalize_thai_text(semester),
event_type='deadline',
section=section
))
else:
# Regular calendar event
event_type = CalendarEvent.classify_event_type(event.get('activity', ''))
# Clean semester string
cleaned_semester = semester
if '(' in semester:
match = re.search(r'\((.*?)\)', semester)
if match:
cleaned_semester = match.group(1)
cleaned_semester = ThaiTextPreprocessor.normalize_thai_text(cleaned_semester)
events.append(CalendarEvent(
date=event.get('date', ''),
time=event.get('time', ''),
activity=event.get('activity', ''),
note=event.get('note', ''),
semester=cleaned_semester,
event_type=event_type
))
return events
@staticmethod
def extract_program_details(json_data: Dict) -> ProgramDetailInfo:
"""Extract and structure program details into ProgramDetailInfo object"""
raw_details = json_data.get('program_details', {})
# Process application info
app_info_data = raw_details.get('application_info', {})
app_info = ApplicationInfo(
application_portal=app_info_data.get('application_portal', ''),
program_email=app_info_data.get('program_email', '')
)
# Process required documents
req_docs = {}
raw_docs = raw_details.get('required_documents', {})
# Process mandatory documents
mandatory_docs = {}
for doc_key, doc_value in raw_docs.get('mandatory', {}).items():
mandatory_docs[doc_key] = RequiredDocument(
name=doc_key,
description=doc_value
)
req_docs['mandatory'] = mandatory_docs
# Process optional documents
optional_docs = {}
for doc_key, doc_data in raw_docs.get('optional', {}).items():
if doc_key == 'english_proficiency':
ep_data = doc_data
optional_docs[doc_key] = RequiredDocument(
name=ep_data.get('name', ''),
description=str(ep_data.get('accepted_tests', {})),
conditions=f"Validity: {ep_data.get('validity', '')}, Benefits: {ep_data.get('benefits', '')}, Exemptions: {ep_data.get('exemptions', '')}"
)
else:
optional_docs[doc_key] = RequiredDocument(
name=doc_data.get('name', ''),
description='',
conditions=doc_data.get('condition', '')
)
req_docs['optional'] = optional_docs
# Process selection steps
selection_steps = []
for step_data in raw_details.get('selection_process', {}).get('steps', []):
for step_num, description in step_data.items():
selection_steps.append(SelectionStep(
step_number=step_num,
description=description
))
return [ProgramDetailInfo(
application_info=app_info,
required_documents=req_docs,
submission_process=raw_details.get('submission_process', ''),
selection_process=selection_steps
)]
@staticmethod
def extract_contact_details(json_data: Dict) -> List[ContactDetail]:
"""Extract and structure contact details into ContactDetail objects"""
raw_contacts = json_data.get('contact_details', [])
contact_details = []
# Handle the case where raw_contacts might be a single object instead of a list
if not isinstance(raw_contacts, list):
raw_contacts = [raw_contacts]
for contact_data in raw_contacts:
# Skip if contact_data is not a dictionary
if not isinstance(contact_data, dict):
continue
try:
# Process transportation data
transportation_data = contact_data.get('transportation', {})
transportation = Transportation(
boat=transportation_data.get('boat', ''),
bts=transportation_data.get('bts', ''),
mrt=transportation_data.get('mrt', ''),
airport_link=transportation_data.get('airport_link', ''),
bus=transportation_data.get('bus', {})
)
# Process contact information
contact_info = Contact(
email=contact_data.get('email', ''),
facebook=contact_data.get('facebook', {})
)
# Create ContactDetail object
contact_details.append(ContactDetail(
event_type=contact_data.get('event_type', ''),
department=contact_data.get('department', ''),
faculty=contact_data.get('faculty', ''),
university=contact_data.get('university', ''),
location=contact_data.get('location', ''),
contact=contact_info,
transportation=transportation
))
except Exception as e:
continue
return contact_details
@staticmethod
def extract_course_structure(json_data: Dict) -> List[CourseStructure]:
"""Extract and structure course information into CourseStructure objects"""
course_structures = []
# Get course structure data
course_data = json_data.get('course_structure', {})
program_metadata = course_data.get('program_metadata', {})
curriculum = course_data.get('curriculum_structure', {})
# Process foundation courses
foundation_data = curriculum.get('foundation_courses', {})
foundation_courses = []
for course in foundation_data.get('courses', []):
foundation_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Process core courses
core_data = curriculum.get('core_courses', {})
core_courses = []
for course in core_data.get('modules', []):
core_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Process elective courses
elective_data = curriculum.get('electives', {})
elective_courses = []
for course in elective_data.get('course_groups', []):
elective_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Process research courses
research_data = curriculum.get('research', {})
research_courses = []
for course in research_data.get('course', []):
research_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Create course categories
structure = {
'หมวดวิชาปรับพื้นฐาน': CourseCategory( # Previously foundation_courses
description="วิชาพื้นฐานที่จำเป็นต้องเรียน foundation courses รายวิชาปรับพื้นฐาน",
credits=foundation_data.get('metadata', {}).get('credits', 'non-credit'),
minimum_credits=None,
courses=foundation_courses
),
'หมวดวิชาบังคับ': CourseCategory( # Previously core_courses
description="วิชาบังคับ วิชาหลัก core courses รายวิชาที่ต้องเรียน",
credits=0,
minimum_credits=core_data.get('minimum_requirement_credits'),
courses=core_courses
),
'หมวดวิชาเลือก': CourseCategory( # Previously elective_courses
description="วิชาเลือก elective courses รายวิชาเลือก วิชาที่สามารถเลือกเรียนได้",
credits=0,
minimum_credits=elective_data.get('minimum_requirement_credits'),
courses=elective_courses
),
'หมวดวิชาการค้นคว้าอิสระ': CourseCategory( # Previously research_courses
description="วิชาค้นคว้าอิสระ research courses วิทยานิพนธ์",
credits=0,
minimum_credits=research_data.get('minimum_requirement_credits'),
courses=research_courses
)
}
# Create course structure
course_structure = CourseStructure(
event_type='curriculum_structure',
program_name=program_metadata.get('name', ''),
department=program_metadata.get('department', ''),
total_credits=program_metadata.get('total_credits', 0),
degree_level=program_metadata.get('degree_level', ''),
structure=structure
)
return [course_structure]
@staticmethod
def extract_program_study_plan(json_data: Dict) -> List[StudyPlan]:
"""Extract and structure study plan information into StudyPlan objects"""
study_plan_data = json_data.get('program_study_plan', {})
# Initialize the years dictionary to store all year/semester data
years_dict = {}
for year_key, year_data in study_plan_data.items():
years_dict[year_key] = {}
for semester_key, semester_data in year_data.items():
# Get metadata
metadata = semester_data.get('metadata', {})
# Initialize semester structure
semester_struct = {
'metadata': metadata,
'courses': []
}
# Handle both 'modules' and 'courses' keys
course_data = semester_data.get('modules', []) or semester_data.get('courses', [])
# Add courses to semester
for course in course_data:
course_info = {
'code': course.get('code', ''),
'title': course.get('title', {'th': '', 'en': ''}),
'credits': course.get('credits', 0)
}
semester_struct['courses'].append(course_info)
# Add semester data to year
years_dict[year_key][semester_key] = semester_struct
# Create StudyPlan object
study_plan = StudyPlan(
event_type='study_plan',
years=years_dict
)
return [study_plan]
@staticmethod
def extract_fees(json_data: Dict) -> List[TuitionFee]:
"""Extract and structure fee information into TuitionFee objects"""
fees_data = json_data.get('fees', {})
# Parse regular tuition fee
regular_fee_str = fees_data.get('tuition', '')
regular_amount = float(regular_fee_str.split()[0]) if regular_fee_str else 0
regular_fee = RegularFee(
amount=regular_amount,
currency='THB',
period='per semester'
)
# Parse late payment fee
late_fee_str = fees_data.get('late_payment', '')
late_amount = float(late_fee_str.split()[0]) if late_fee_str else 0
late_payment_fee = LatePaymentFee(
amount=late_amount,
currency='THB'
)
# Create TuitionFee object
tuition_fee = TuitionFee(
event_type='tuition_fee',
regular_fee=regular_fee,
late_payment_fee=late_payment_fee
)
return [tuition_fee]
class HybridDocumentStore:
"""Enhanced document store with hybrid retrieval capabilities"""
def __init__(self, config: PipelineConfig):
self.store = InMemoryDocumentStore()
self.embedder = SentenceTransformersDocumentEmbedder(
model=config.model.embedder_model
)
# Initialize BM25 retriever
self.bm25_retriever = InMemoryBM25Retriever(
document_store=self.store,
top_k=config.retriever.top_k
)
# Initialize embedding retriever
self.embedding_retriever = InMemoryEmbeddingRetriever(
document_store=self.store,
top_k=config.retriever.top_k
)
self.cache_manager = CacheManager(
cache_dir=config.cache.cache_dir,
ttl=config.cache.ttl
)
self.embedder.warm_up()
# Initialize containers
self.events = []
self.event_type_index = {}
self.semester_index = {}
self._document_counter = 0
# Additional data containers
self.course_data = []
self.contact_data = []
self.study_plan_data = []
def _generate_unique_id(self) -> str:
"""Generate a unique document ID"""
self._document_counter += 1
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return f"doc_{timestamp}_{self._document_counter}"
def _compute_embedding(self, text: str) -> Any:
"""Compute embedding with caching"""
cached_embedding = self.cache_manager.get_embedding_cache(text)
if cached_embedding is not None:
return cached_embedding
doc = Document(content=text)
embedding = self.embedder.run(documents=[doc])["documents"][0].embedding
self.cache_manager.set_embedding_cache(text, embedding)
return embedding
def _format_required_docs(self, docs: Dict) -> str:
"""Format required documents information with detailed English proficiency requirements"""
result = []
if 'mandatory' in docs:
result.append("เอกสารที่ต้องใช้:")
for doc in docs['mandatory'].values():
result.append(f"- {doc.name}: {doc.description}")
if 'optional' in docs:
result.append("\nเอกสารเพิ่มเติม:")
for doc_key, doc in docs['optional'].items():
if doc_key == 'english_proficiency':
result.append(f"- {doc.name}")
# Parse and format the accepted tests
try:
accepted_tests = eval(doc.description)
result.append(" เกณฑ์คะแนนที่ยอมรับ:")
for test, requirement in accepted_tests.items():
result.append(f" * {test}: {requirement}")
except:
result.append(f" {doc.description}")
if doc.conditions:
conditions = doc.conditions.split(', ')
for condition in conditions:
result.append(f" {condition}")
else:
desc = f"- {doc.name}"
if doc.conditions:
desc += f" ({doc.conditions})"
result.append(desc)
return "\n".join(result)
def _format_selection_steps(self, steps: List[SelectionStep]) -> str:
"""Format selection process steps"""
return "\n".join(f"{step.step_number}. {step.description}" for step in steps)
def add_events(self,
events: List[CalendarEvent],
contact_details: Optional[List[ContactDetail]] = None,
course_structure: Optional[List[CourseStructure]] = None,
study_plans: Optional[List[StudyPlan]] = None,
program_details: Optional[List[ProgramDetailInfo]] = None,
tuition_fees: Optional[List[TuitionFee]] = None):
"""Add events and additional data with caching"""
documents = []
added_events = set() # Track added events to prevent duplicates
# Process calendar events
for event in events:
event_key = f"{event.date}_{event.activity}_{event.semester}"
if event_key not in added_events:
added_events.add(event_key)
self.events.append(event)
event_idx = len(self.events) - 1
# Update indices
if event.event_type not in self.event_type_index:
self.event_type_index[event.event_type] = []
self.event_type_index[event.event_type].append(event_idx)
if event.semester not in self.semester_index:
self.semester_index[event.semester] = []
self.semester_index[event.semester].append(event_idx)
# Create document
text = event.to_searchable_text()
embedding = self._compute_embedding(text)
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={
'event_type': event.event_type,
'semester': event.semester,
'date': event.date,
'event_idx': event_idx
}
)
documents.append(doc)
self.cache_manager.set_document_cache(str(event_idx), doc)
# Process contact details
if contact_details:
for contact in contact_details:
self.contact_data.append(contact)
text = f"""
ข้อมูลการติดต่อ:
คณะ: {contact.faculty}
ภาควิชา: {contact.department}
มหาวิทยาลัย: {contact.university}
สถานที่: {contact.location}
การติดต่อ:
อีเมล: {contact.contact.email}
Facebook: {json.dumps(contact.contact.facebook, ensure_ascii=False)}
การเดินทาง:
เรือ: {contact.transportation.boat}
BTS: {contact.transportation.bts}
MRT: {contact.transportation.mrt}
Airport Link: {contact.transportation.airport_link}
รถประจำทาง: {json.dumps(contact.transportation.bus, ensure_ascii=False)}
"""
embedding = self._compute_embedding(text)
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={'event_type': 'contact'}
)
documents.append(doc)
# Process course structure
if course_structure:
for course in course_structure:
text = f"""
โครงสร้างหลักสูตร:
ชื่อหลักสูตร: {course.program_name}
ภาควิชา: {course.department}
หน่วยกิตรวม: {course.total_credits}
ระดับการศึกษา: {course.degree_level}
รายละเอียดโครงสร้าง:
หมวดวิชาปรับพื้นฐาน/วิชาพื้นฐาน:
คำอธิบาย: {course.structure['หมวดวิชาปรับพื้นฐาน'].description or 'ไม่ระบุ'}
หน่วยกิต: {course.structure['หมวดวิชาปรับพื้นฐาน'].credits}
รายวิชา:
"""
# Add foundation courses
foundation_courses = []
for c in course.structure['หมวดวิชาปรับพื้นฐาน'].courses:
foundation_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต")
text += "\n".join(foundation_courses)
text += f"""
หมวดวิชาบังคับ/วิชาหลัก:
หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาบังคับ'].minimum_credits}
รายวิชา:
"""
# Add core courses
core_courses = []
for c in course.structure['หมวดวิชาบังคับ'].courses:
core_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต")
text += "\n".join(core_courses)
text += f"""
หมวดวิชาเลือก:
หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาเลือก'].minimum_credits}
รายวิชา:
"""
# Add elective courses
elective_courses = []
for c in course.structure['หมวดวิชาเลือก'].courses:
elective_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต")
text += "\n".join(elective_courses)
text += f"""
หมวดวิชาการค้นคว้าอิสระ:
หน่วยกิตขั้นต่ำ: {course.structure['หมวดวิชาการค้นคว้าอิสระ'].minimum_credits}
รายวิชา:
"""
# Add research courses
research_courses = []
for c in course.structure['หมวดวิชาการค้นคว้าอิสระ'].courses:
research_courses.append(f"- {c.code}: {c.title_th} ({c.title_en}) - {c.credits} หน่วยกิต")
text += "\n".join(research_courses)
doc = Document(
id=self._generate_unique_id(),
content=text.strip(),
embedding=self._compute_embedding(text),
meta={'event_type': 'curriculum'}
)
documents.append(doc)
# Process study plans
if study_plans:
for plan in study_plans:
self.study_plan_data.append(plan)
for year, semesters in plan.years.items():
for semester, data in semesters.items():
# Convert year and semester format
year_num = year.replace('year', '')
semester_num = semester.replace('semester', '')
# Determine course type and translate to Thai
course_type = data.get('metadata', {}).get('course_type', 'core')
course_type_th = 'วิชาหลัก' if course_type == 'core' else 'วิชาเลือก'
# Calculate total credits
total_credits = sum(course.get('credits', 0) for course in data.get('courses', []))
text = f"""แผนการศึกษา:
ปี: {year_num}
ภาคการศึกษา: {semester_num}
ประเภทรายวิชา: {course_type_th} ({course_type})
จำนวนหน่วยกิตรวม: {total_credits}
รายวิชาที่ต้องเรียน:"""
# Add courses
if 'courses' in data:
for course in data['courses']:
text += f"\n- {course['code']}: {course['title'].get('th', '')} ({course['title'].get('en', '')}) - {course['credits']} หน่วยกิต"
embedding = self._compute_embedding(text)
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={
'event_type': 'study_plan',
'year': year_num,
'semester': semester_num,
'course_type': course_type
}
)
documents.append(doc)
if program_details:
for detail in program_details:
# Main application document
app_text = f"""
ข้อมูลการสมัคร:
เว็บไซต์รับสมัคร: {detail.application_info.application_portal}
อีเมล: {detail.application_info.program_email}
เอกสารที่ต้องใช้:
{self._format_required_docs(detail.required_documents)}
ขั้นตอนการส่งเอกสาร:
{detail.submission_process}
ขั้นตอนการคัดเลือก:
{self._format_selection_steps(detail.selection_process)}
"""
doc = Document(
id=self._generate_unique_id(),
content=app_text.strip(),
embedding=self._compute_embedding(app_text),
meta={'event_type': 'program_details'}
)
documents.append(doc)
# Create separate document for English proficiency requirements
if 'optional' in detail.required_documents:
eng_prof = next((doc for doc_key, doc in detail.required_documents['optional'].items()
if doc_key == 'english_proficiency'), None)
if eng_prof:
eng_text = f"""
ข้อกำหนดภาษาอังกฤษ:
{eng_prof.name}
รายละเอียด: {eng_prof.description}
เงื่อนไข: {eng_prof.conditions}
"""
eng_doc = Document(
id=self._generate_unique_id(),
content=eng_text.strip(),
embedding=self._compute_embedding(eng_text),
meta={
'event_type': 'program_details' }
)
documents.append(eng_doc)
# Process tuition fees
if tuition_fees:
for fee in tuition_fees:
fee_text = f"""
ค่าธรรมเนียมการศึกษา:
- ค่าเล่าเรียน: {fee.regular_fee.amount:,.2f} {fee.regular_fee.currency} {fee.regular_fee.period}
- ค่าปรับชำระล่าช้า: {fee.late_payment_fee.amount:,.2f} {fee.late_payment_fee.currency}
"""
doc = Document(
id=self._generate_unique_id(),
content=fee_text.strip(),
embedding=self._compute_embedding(fee_text),
meta={'event_type': 'fees'}
)
documents.append(doc)
batch_size = 10
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
try:
self.store.write_documents(batch)
except Exception as e:
logger.error(f"Error writing document batch {i//batch_size + 1}: {str(e)}")
for doc in batch:
try:
self.store.write_documents([doc])
except Exception as e2:
logger.error(f"Failed to write document {doc.id}: {str(e2)}")
def hybrid_search(self,
query: str,
event_type: Optional[str] = None,
detail_type: Optional[str] = None,
semester: Optional[str] = None,
top_k: int = 10,
weight_semantic: float = 0.5) -> List[Document]:
"""Hybrid search combining semantic and lexical search results"""
cache_key = json.dumps({
'query': query,
'event_type': event_type,
'semester': semester,
'top_k': top_k,
'weight_semantic': weight_semantic
})
cached_results = self.cache_manager.get_query_cache(cache_key)
if cached_results is not None:
return cached_results
# Get semantic search results
query_embedding = self._compute_embedding(query)
semantic_results = self.embedding_retriever.run(query_embedding=query_embedding)["documents"]
# Get BM25 results
bm25_results = self.bm25_retriever.run(
query=query
)["documents"]
if event_type == "program_details":
weight_semantic = 0.3 # Give more weight to keyword matching
# Combine results using score fusion
combined_results = self._merge_results(
semantic_results=semantic_results,
bm25_results=bm25_results,
weight_semantic=weight_semantic,
top_k=top_k
)
# Filter results based on metadata
filtered_results = []
for doc in combined_results:
if event_type and event_type != "program_details" and doc.meta.get('event_type') != event_type:
continue # Keep only relevant event type unless it's program_details
filtered_results.append(doc)
final_results = filtered_results[:top_k]
self.cache_manager.set_query_cache(cache_key, final_results)
return final_results
def _merge_results(self,
semantic_results: List[Document],
bm25_results: List[Document],
weight_semantic: float,
top_k: int) -> List[Document]:
"""Merge semantic and BM25 results using weighted score fusion"""
# Create dictionaries to store normalized scores
semantic_scores = {}
bm25_scores = {}
# Normalize semantic scores
max_semantic_score = max(doc.score for doc in semantic_results) if semantic_results else 1.0
for doc in semantic_results:
semantic_scores[doc.id] = doc.score / max_semantic_score if max_semantic_score > 0 else 0
# Normalize BM25 scores
max_bm25_score = max(doc.score for doc in bm25_results) if bm25_results else 1.0
for doc in bm25_results:
bm25_scores[doc.id] = doc.score / max_bm25_score if max_bm25_score > 0 else 0
# Combine scores
combined_scores = {}
all_docs = {doc.id: doc for doc in semantic_results + bm25_results}
for doc_id in all_docs:
semantic_score = semantic_scores.get(doc_id, 0)
bm25_score = bm25_scores.get(doc_id, 0)
# Weighted combination
combined_scores[doc_id] = (
weight_semantic * semantic_score +
(1 - weight_semantic) * bm25_score
)
# Sort by combined score and return top_k results
sorted_docs = sorted(
all_docs.values(),
key=lambda x: combined_scores[x.id],
reverse=True
)
return sorted_docs[:top_k]
def search_with_reranking(self,
query: str,
event_type: Optional[str] = None,
detail_type: Optional[str] = None,
semester: Optional[str] = None,
top_k_initial: int = 20,
top_k_final: int = 5,
weight_semantic: float = 0.5) -> List[Document]:
"""
Two-stage retrieval with hybrid search followed by cross-encoder reranking
"""
# Generate cache key for the reranked query
cache_key = json.dumps({
'query': query,
'event_type': event_type,
'semester': semester,
'top_k_initial': top_k_initial,
'top_k_final': top_k_final,
'weight_semantic': weight_semantic,
'reranked': True # Indicate this is a reranked query
})
# Check cache first
cached_results = self.cache_manager.get_query_cache(cache_key)
if cached_results is not None:
return cached_results
# 1. Get larger initial result set
initial_results = self.hybrid_search(
query=query,
event_type=event_type,
detail_type=detail_type,
semester=semester,
top_k=top_k_initial,
weight_semantic=weight_semantic
)
# If we don't have enough initial results, just return what we have
if len(initial_results) <= top_k_final:
return initial_results
try:
# We'll lazily initialize the cross encoder to save memory
cross_encoder = SentenceTransformersCrossEncoder("cross-encoder/mmarco-mMiniLMv2-L12-H384-v1")
pairs = [(query, doc.content) for doc in initial_results]
scores = cross_encoder.predict(pairs)
for doc, score in zip(initial_results, scores):
doc.score = float(score) # Ensure score is a regular float
reranked_results = sorted(initial_results, key=lambda x: x.score, reverse=True)[:top_k_final]
# Cache the results
self.cache_manager.set_query_cache(cache_key, reranked_results)
return reranked_results
except Exception as e:
logger.error(f"Reranking failed: {str(e)}. Falling back to hybrid search results.")
return initial_results[:top_k_final]
class ResponseGenerator:
"""Generate responses with enhanced conversation context awareness"""
def __init__(self, config: PipelineConfig):
self.generator = OpenAIGenerator(
api_key=Secret.from_token(config.model.openai_api_key),
model=config.model.openai_model
)
self.prompt_builder = PromptBuilder(
template="""
คุณเป็นที่ปรึกษาทางวิชาการ กรุณาตอบคำถามต่อไปนี้โดยใช้ข้อมูลจากเอกสารที่ให้มาและพิจารณาบริบทจากประวัติการสนทนา
{% if conversation_history %}
ประวัติการสนทนา:
{% for message in conversation_history %}
{% if message.role == 'user' %}
ผู้ใช้: {{ message.content }}
{% else %}
ที่ปรึกษา: {{ message.content }}
{% endif %}
{% endfor %}
{% endif %}
คำถามปัจจุบัน: {{query}}
ข้อมูลที่เกี่ยวข้อง:
{% for doc in context %}
---
ประเภท: {{doc.meta.event_type}}{% if doc.meta.detail_type %}, รายละเอียด: {{doc.meta.detail_type}}{% endif %}
เนื้อหา:
{{doc.content}}
{% endfor %}
คำแนะนำในการตอบ:
1. ตอบเฉพาะข้อมูลที่มีในเอกสารเท่านั้น
2. หากไม่มีข้อมูลให้ตอบว่า "ขออภัย ไม่พบข้อมูลที่เกี่ยวข้องกับคำถามนี้"
3. หากข้อมูลไม่ชัดเจนให้ระบุว่าข้อมูลอาจไม่ครบถ้วน
4. จัดรูปแบบคำตอบให้อ่านง่าย ใช้หัวข้อและย่อหน้าตามความเหมาะสม
5. สำหรับคำถามเกี่ยวกับข้อกำหนดภาษาอังกฤษหรือขั้นตอนการสมัคร ให้อธิบายข้อมูลอย่างละเอียด
6. ใส่ข้อความ "หากมีข้อสงสัยเพิ่มเติม สามารถสอบถามได้" ท้ายคำตอบเสมอ
7. คำนึงถึงประวัติการสนทนาและให้คำตอบที่ต่อเนื่องกับบทสนทนาก่อนหน้า
8. หากคำถามอ้างอิงถึงข้อมูลในบทสนทนาก่อนหน้า (เช่น "แล้วอันนั้นล่ะ", "มีอะไรอีกบ้าง", "คำถามก่อนหน้า") ให้พิจารณาบริบทและตอบคำถามอย่างตรงประเด็น แต่ไม่ต้องแสดงคำถามก่อนหน้าในคำตอบ
9. กรณีคำถามมีความไม่ชัดเจน ใช้ประวัติการสนทนาเพื่อเข้าใจบริบทของคำถาม
สำคัญ: ไม่ต้องใส่คำว่า "คำถามก่อนหน้าคือ [คำถามก่อนหน้า] และคำตอบคือ..." ในคำตอบของคุณ ให้ตอบคำถามโดยตรง
กรุณาตอบเป็นภาษาไทย:
"""
)
def generate_response(self,
query: str,
documents: List[Document],
query_info: Dict[str, Any],
conversation_history: List[Dict[str, str]] = None) -> str:
"""Generate response using retrieved documents and conversation history"""
try:
# Enhanced handling of reference questions
reference_keywords = ["ก่อนหน้านี้", "ก่อนหน้า", "ที่ผ่านมา", "คำถามก่อนหน้า", "คำถามที่แล้ว",
"previous", "earlier", "before", "last time", "last question"]
is_reference_question = any(keyword in query.lower() for keyword in reference_keywords)
# For reference questions, we'll add additional prompting
enhanced_context = conversation_history or []
result = self.prompt_builder.run(
query=query,
context=documents,
format=query_info.get("response_format", "detailed"),
conversation_history=enhanced_context,
is_reference_question=is_reference_question
)
response = self.generator.run(prompt=result["prompt"])
return response["replies"][0]
except Exception as e:
logger.error(f"Response generation failed: {str(e)}")
return "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้"
class AdvancedQueryProcessor:
"""Process queries with better understanding"""
def __init__(self, config: PipelineConfig):
self.generator = OpenAIGenerator(
api_key=Secret.from_token(config.model.openai_api_key),
model=config.model.openai_model
)
self.prompt_builder = PromptBuilder(
template="""
คุณเป็นผู้ช่วย AI ที่เชี่ยวชาญด้านการศึกษาในประเทศไทย หน้าที่ของคุณคือการวิเคราะห์และจำแนกคำถามของผู้ใช้ให้ตรงกับหมวดหมู่ข้อมูลที่เหมาะสม ได้แก่:
1. **รายละเอียดโปรแกรมการศึกษา (program_details)**: ข้อมูลเกี่ยวกับหลักสูตร โปรแกรมการเรียนการสอน และโครงสร้างหลักสูตร
2. **ข้อมูลการติดต่อ (contact)**: ข้อมูลการติดต่อของหน่วยงานหรือบุคคลที่เกี่ยวข้องในสถาบันการศึกษา
3. **โครงสร้างหลักสูตร (curriculum)**: รายละเอียดเกี่ยวกับวิชาเรียน หน่วยกิต และแผนการศึกษา
4. **ค่าเล่าเรียน (fees)**: ข้อมูลเกี่ยวกับค่าใช้จ่ายในการศึกษา ค่าธรรมเนียม และทุนการศึกษา
5. **แผนการศึกษารายปี (study_plan)**: ข้อมูลแผนการเรียนแบ่งตามชั้นปีและภาคการศึกษา รายละเอียดรายวิชาที่ต้องลงทะเบียนในแต่ละเทอม และจำนวนหน่วยกิตรวม
**คำถาม**: {{query}}
**คำแนะนำในการวิเคราะห์**:
- ตรวจสอบคำสำคัญในคำถามเพื่อระบุหมวดหมู่ที่สอดคล้อง
- หากคำถามเกี่ยวข้องกับหลายหมวดหมู่ ให้จัดลำดับความสำคัญตามความต้องการของผู้ใช้
- หากไม่สามารถระบุหมวดหมู่ได้อย่างชัดเจน ให้จัดหมวดหมู่เป็น "อื่นๆ" และระบุความไม่แน่นอน
**รูปแบบการตอบกลับ**:
หมายเหตุ:
- รูปแบบปีการศึกษาที่ยอมรับ: "ปีที่ 1", "ปี 1", "ชั้นปีที่ 1"
- รูปแบบภาคการศึกษาที่ยอมรับ: "เทอมที่ 1", "เทอม 1", "ภาคการศึกษาที่ 1"
- หากข้อมูลไม่ครบ ให้ระบุค่าสำหรับฟิลด์ที่ขาดหายเป็น null พร้อมข้อความแจ้งความไม่แน่นอน
ให้ผลลัพธ์ในรูปแบบ JSON ตามโครงสร้าง:
{
"event_type": "program_details" | "contact" | "curriculum" | "fees" | "study_plan",
"year": "ปีที่ X", // แปลงเป็นรูปแบบมาตรฐาน หรือ null หากไม่ระบุ
"semester": "เทอมที่ X", // แปลงเป็นรูปแบบมาตรฐาน หรือ null หากไม่ระบุ
"key_terms": ["คำสำคัญที่เกี่ยวข้อง"],
"response_format": "detailed",
"uncertainty": "low" // ระบุระดับความไม่แน่นอน (เช่น 'low', 'high')
}
ตัวอย่าง:
Input: "โปรแกรมการศึกษามีรายละเอียดอะไรบ้าง"
Output: {
"event_type": "program_details",
"year": null,
"semester": null,
"key_terms": ["โปรแกรมการศึกษา", "รายละเอียด"],
"response_format": "detailed",
"uncertainty": "low"
}
Input: "ฉันจะติดต่อภาควิชาได้อย่างไร"
Output: {
"event_type": "contact",
"year": null,
"semester": null,
"key_terms": ["ติดต่อ", "ภาควิชา"],
"response_format": "detailed",
"uncertainty": "low"
}
Input: "โครงสร้างหลักสูตรของปี 2 เป็นอย่างไร"
Output: {
"event_type": "curriculum",
"year": "ปีที่ 2",
"semester": null,
"key_terms": ["โครงสร้างหลักสูตร"],
"response_format": "detailed",
"uncertainty": "low"
}
Input: "ค่าเล่าเรียนสำหรับเทอม 1 เท่าไหร่"
Output: {
"event_type": "fees",
"year": null,
"semester": "เทอมที่ 1",
"key_terms": ["ค่าเล่าเรียน", "เทอม 1"],
"response_format": "detailed",
"uncertainty": "low"
}
Input: "ปี 1 เทอม 1 ต้องเรียนอะไรบ้าง"
Output: {
"event_type": "study_plan",
"year": null,
"semester": null,
"key_terms": ["เรียนอะไร", "เทอม"],
"response_format": "detailed",
"uncertainty": "low"
}
กรุณาตอบเป็นภาษาไทยและตรวจสอบให้แน่ใจว่า JSON มีโครงสร้างที่ถูกต้อง
"""
)
def normalize_year_semester(self, query: str) -> str:
"""Normalize year and semester formats in queries"""
# Year patterns
year_patterns = {
r'ปี\s*(\d+)': r'ปีที่ \1',
r'ชั้นปีที่\s*(\d+)': r'ปีที่ \1',
r'ปีการศึกษาที่\s*(\d+)': r'ปีที่ \1'
}
# Semester patterns
semester_patterns = {
r'เทอม\s*(\d+)': r'เทอมที่ \1',
r'ภาคเรียนที่\s*(\d+)': r'เทอมที่ \1',
r'ภาคการศึกษาที่\s*(\d+)': r'เทอมที่ \1'
}
normalized_query = query
for pattern, replacement in year_patterns.items():
normalized_query = re.sub(pattern, replacement, normalized_query)
for pattern, replacement in semester_patterns.items():
normalized_query = re.sub(pattern, replacement, normalized_query)
return normalized_query
def normalize_query(self, query: str) -> str:
"""เพิ่มการเปลี่ยนแปลงคำ (synonym mapping) เพื่อลดปัญหา Vocabulary Mismatch"""
normalized_query = self.normalize_year_semester(query)
# เพิ่ม mapping สำหรับคำที่มีความหมายเดียวกัน
synonyms = {
"วิชาเลือก": "หมวดวิชาเลือก"
# สามารถเพิ่มคำอื่น ๆ ได้ตามต้องการ
}
for original, replacement in synonyms.items():
normalized_query = normalized_query.replace(original, replacement)
return normalized_query
def _get_default_analysis(self, query: str) -> Dict[str, Any]:
logger.info("Returning default analysis")
return {
"original_query": query,
"event_type": None,
"semester": None,
"key_terms": [],
"response_format": "detailed"
}
def process_query(self, query: str) -> Dict[str, Any]:
"""Enhanced query processing with support for detail types and better categorization."""
try:
# ใช้ normalize_query ที่แก้ไขแล้วเพื่อให้คำค้นมีรูปแบบที่ตรงกับดัชนีข้อมูล
normalized_query = self.normalize_query(query)
result = self.prompt_builder.run(query=normalized_query)
response = self.generator.run(prompt=result["prompt"])
if not response or not response.get("replies") or not response["replies"][0]:
logger.warning("Received empty response from OpenAI")
return self._get_default_analysis(query)
# ทำความสะอาด JSON string
json_str = response["replies"][0]
json_str = json_str.replace("```json", "").replace("```", "").strip()
analysis = json.loads(json_str)
analysis['detail_type'] = None
# Enhanced categorization with detail types
if any(keyword in query.lower() for keyword in ['ภาษาอังกฤษ', 'toefl', 'ielts', 'swu-set', 'โทอิค', 'คะแนนภาษา']):
analysis['event_type'] = 'program_details'
elif any(keyword in query.lower() for keyword in ['สมัคร', 'ขั้นตอน', 'วิธีการ', 'เอกสาร', 'หลักฐาน', 'admission']):
analysis['event_type'] = 'program_details'
analysis['detail_type'] = None
elif any(keyword in query.lower() for keyword in ['ค่าเทอม', 'ค่าธรรมเนียม', 'ค่าเรียน', 'ค่าปรับ', 'ค่าใช้จ่าย']):
analysis['event_type'] = 'fees'
elif any(keyword in query.lower() for keyword in ['หน่วยกิต', 'วิชา', 'หลักสูตร', 'แผนการเรียน', 'วิชาเลือก', 'วิชาบังคับ', 'วิชาหลัก', 'หมวดวิชา']):
analysis['event_type'] = 'curriculum'
return {
"original_query": query,
**analysis
}
except Exception as e:
logger.error(f"Query processing failed: {str(e)}")
return self._get_default_analysis(query)
# First, let's modify the AcademicCalendarRAG class to maintain conversation history
class AcademicCalendarRAG:
"""Enhanced RAG system for academic calendar and program information with conversation memory"""
def __init__(self, config: PipelineConfig):
self.config = config
self.document_store = HybridDocumentStore(config)
self.query_processor = AdvancedQueryProcessor(config)
self.response_generator = ResponseGenerator(config)
self.data_processor = CalendarDataProcessor()
# Initialize conversation memory
self.conversation_history = []
self.max_history_length = 5 # Keep last 5 exchanges (10 messages)
# Initialize data containers
self.calendar_events = []
self.program_details = []
self.contact_details = []
self.course_structure = []
self.study_plans = []
self.tuition_fees = []
def add_to_conversation(self, role: str, content: str):
"""Add a message to the conversation history"""
self.conversation_history.append({"role": role, "content": content})
# Limit history length to prevent context overflow
if len(self.conversation_history) > self.max_history_length * 2: # Each exchange is 2 messages
self.conversation_history = self.conversation_history[-(self.max_history_length * 2):]
def load_data(self, json_data: Dict):
"""Load and process all data sources"""
try:
raw_events = self.data_processor.parse_calendar_json(json_data)
for event in raw_events:
if not event.event_type:
event.event_type = CalendarEvent.classify_event_type(event.activity)
self.calendar_events.append(event)
# Process other data types
self.program_details = self.data_processor.extract_program_details(json_data)
self.contact_details = self.data_processor.extract_contact_details(json_data)
self.course_structure = self.data_processor.extract_course_structure(json_data)
self.study_plans = self.data_processor.extract_program_study_plan(json_data)
self.tuition_fees = self.data_processor.extract_fees(json_data)
self.document_store.add_events(
events=self.calendar_events,
program_details=self.program_details,
contact_details=self.contact_details,
course_structure=self.course_structure,
study_plans=self.study_plans,
tuition_fees=self.tuition_fees
)
except Exception as e:
logger.error(f"Error loading data: {str(e)}")
raise
def process_query(self, query: str, conversation_history=None) -> Dict[str, Any]:
"""Process user query using conversation history and hybrid retrieval."""
# Use provided conversation history or the internal history
if conversation_history is not None:
self.conversation_history = conversation_history
# Add the current query to history
self.add_to_conversation("user", query)
# Create a context-enhanced query by including relevant previous exchanges
query_with_context = query
if self.conversation_history and len(self.conversation_history) > 1:
# Extract previous exchanges to provide context (up to 2 previous exchanges)
prev_exchanges = self.conversation_history[:-1]
if len(prev_exchanges) > 4: # Limit to last 2 exchanges (4 messages)
prev_exchanges = prev_exchanges[-4:]
context_str = "\n".join([f"{msg['role']}: {msg['content']}" for msg in prev_exchanges])
query_with_context = f"Previous conversation:\n{context_str}\n\nCurrent question: {query}"
# Process with conversation context
max_attempts = 4 # Allow up to 4 attempts
attempt = 0
weight_values = [0.3, 0.7, 0.3, 0.7] # Switching semantic retrieval weight
while attempt < max_attempts:
attempt += 1
try:
# Analyze query - use context-enhanced query for better understanding
if attempt <= 2:
query_info = self.query_processor.process_query(query_with_context if attempt == 1 else query)
else:
query_info = self.query_processor._get_default_analysis(query)
logger.info(f"Retrying query processing (attempt {attempt}) with default analysis")
weight_semantic = weight_values[attempt - 1]
# Get relevant documents using hybrid search
logger.info(f"Attempt {attempt}: Searching with weight_semantic={weight_semantic}")
documents = self.document_store.hybrid_search(
query=query_with_context if attempt == 1 else query,
event_type=query_info.get("event_type"),
detail_type=query_info.get("detail_type"),
semester=query_info.get("semester"),
top_k=self.config.retriever.top_k,
weight_semantic=weight_semantic
)
# Generate response with conversation context
response = self.response_generator.generate_response(
query=query,
documents=documents,
query_info=query_info,
conversation_history=self.conversation_history
).strip()
# If response indicates no relevant information, retry with adjusted approach
if "ขออภัย ไม่พบข้อมูลที่เกี่ยวข้อง" in response and attempt < max_attempts:
continue # Try again with new weight or default analysis
# Add the response to conversation history
self.add_to_conversation("assistant", response)
return {
"query": query,
"answer": response,
"relevant_docs": documents,
"query_info": query_info
}
except Exception as e:
logger.error(f"Error processing query: {str(e)}")
return {
"query": query,
"answer": "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้",
"error": "Maximum retry attempts reached"
}
# def main():
# """Main function demonstrating hybrid retrieval"""
# try:
# # Load API key
# with open("key.txt", "r") as f:
# openai_api_key = f.read().strip()
# # Create config with hybrid retrieval settings
# config = create_default_config(openai_api_key)
# pipeline = AcademicCalendarRAG(config)
# # Load and process data
# with open("raw-data.json", "r", encoding="utf-8") as f:
# raw_data = json.load(f)
# pipeline.load_data(raw_data)
# # Test queries with different semantic weights
# queries = ["ค่าเทอมเท่าไหร่","เปิดเรียนวันไหน","ขั้นตอนการสมัครที่สาขานี้มีอะไรบ้าง","ต้องใช้ระดับภาษาอังกฤษเท่าไหร่ในการสมัครเรียนที่นี้","ถ้าจะไปติดต่อมาหลายต้องลง mrt อะไร","มีวิชาหลักเเละวิชาเลือกออะไรบ้าง", "ปีที่ 1 เทอม 1 ต้องเรียนอะไรบ้าง", "ปีที่ 2 เทอม 1 ต้องเรียนอะไรบ้าง"]
# # queries = ["ปีที่ 1 เทอม 1 ต้องเรียนอะไรบ้าง"]
# print("=" * 80)
# for query in queries:
# print(f"\nQuery: {query}")
# result = pipeline.process_query(query)
# print(f"Answer: {result['answer']}")
# print("-" * 40)
# except Exception as e:
# logger.error(f"Pipeline execution failed: {str(e)}")
# raise
# if __name__ == "__main__":
# main()