from haystack import Document
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.utils import Secret
from pathlib import Path
import ast
import hashlib
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
from dataclasses import dataclass, field
import json
import logging
import re
import pickle
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ValidationResult:
"""Stores the result of a validation check"""
is_valid: bool
errors: List[str]
warnings: List[str]
normalized_data: Dict[str, str]
@dataclass
class ApplicationInfo:
application_portal: str
program_email: str
@dataclass
class RequiredDocument:
name: str
description: str
conditions: Optional[str] = None
@dataclass
class SelectionStep:
step_number: str
description: str
@dataclass
class ProgramDetailInfo:
application_info: ApplicationInfo
required_documents: Dict[str, Dict[str, RequiredDocument]]
submission_process: str
selection_process: List[SelectionStep]
@dataclass
class Transportation:
boat: str
bts: str
mrt: str
airport_link: str
bus: Dict[str, str]
@dataclass
class Contact:
email: str
facebook: Dict[str, str]
@dataclass
class ContactDetail:
event_type: str
department: str
faculty: str
university: str
location: str
contact: Contact
transportation: Transportation
@dataclass
class Course:
code: str
title_th: str
title_en: str
credits: int
@dataclass
class CourseCategory:
description: Optional[str]
credits: Union[str, int]
minimum_credits: Optional[int]
courses: List[Course]
@dataclass
class CourseStructure:
event_type: str
program_name: str
department: str
total_credits: int
degree_level: str
structure: Dict[str, CourseCategory]
@dataclass
class StudyPlan:
event_type: str
years: Dict[str, Dict[str, Any]]
@dataclass
class RegularFee:
amount: float
currency: str
period: str
@dataclass
class LatePaymentFee:
amount: float
currency: str
@dataclass
class TuitionFee:
event_type: str
regular_fee: RegularFee
late_payment_fee: LatePaymentFee
class OpenAIDateParser:
"""Uses OpenAI to parse complex Thai date formats"""
def __init__(self, api_key: str, model: str = "gpt-4o"):
self.generator = OpenAIGenerator(
api_key=Secret.from_token(api_key),
model=model
)
self.prompt_builder = PromptBuilder(
template="""
Parse the following Thai date range into a structured format:
Date: {{date}}
Return in JSON format:
{
"start_date": "YYYY-MM-DD",
"end_date": "YYYY-MM-DD" (if range),
"is_range": true/false
}
Notes:
- Convert Buddhist Era (BE) to CE
- Handle abbreviated Thai months
- Account for date ranges with dashes
- Return null for end_date if it's a single date
Example inputs and outputs:
Input: "จ 8 ก.ค. – จ 19 ส.ค. 67"
Output: {"start_date": "2024-07-08", "end_date": "2024-08-19", "is_range": true}
Input: "15 มกราคม 2567"
Output: {"start_date": "2024-01-15", "end_date": null, "is_range": false}
"""
)
async def parse_date(self, date_str: str) -> Dict[str, Union[str, bool]]:
"""Parse complex Thai date format using OpenAI"""
try:
result = self.prompt_builder.run(date=date_str)
            # OpenAIGenerator exposes a synchronous run(); the method itself
            # stays async so existing callers can keep awaiting it
            response = self.generator.run(prompt=result["prompt"])
if not response or not response.get("replies"):
raise ValueError("Empty response from OpenAI")
parsed = json.loads(response["replies"][0])
for date_field in ['start_date', 'end_date']:
if parsed.get(date_field):
datetime.strptime(parsed[date_field], '%Y-%m-%d')
return parsed
except Exception as e:
logger.error(f"OpenAI date parsing failed for '{date_str}': {str(e)}")
raise ValueError(f"Could not parse date: {date_str}")
class ThaiTextPreprocessor:
"""Handles Thai text preprocessing and normalization"""
    CHAR_MAP = {
        'ํา': 'ำ',  # decomposed sara am -> composed form
        '์': '',    # drop stray thanthakhat
        '–': '-',   # en dash -> hyphen
        '—': '-',   # em dash -> hyphen
        '٫': ',',   # Arabic decimal separator -> comma
    }
@classmethod
def normalize_thai_text(cls, text: str) -> str:
"""Normalize Thai text by applying character mappings and spacing rules"""
if not text:
return text
for old, new in cls.CHAR_MAP.items():
text = text.replace(old, new)
text = re.sub(r'\s+', ' ', text.strip())
thai_digits = '๐๑๒๓๔๕๖๗๘๙'
arabic_digits = '0123456789'
for thai, arabic in zip(thai_digits, arabic_digits):
text = text.replace(thai, arabic)
return text
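# Illustrative helper (not called anywhere): normalize_thai_text is fully
# deterministic, so the assertion below follows directly from CHAR_MAP and
# the digit-mapping loop above.
def _demo_normalize_thai_text() -> str:
    sample = "๑๕  มกราคม  ๒๕๖๗"  # Thai digits, extra whitespace
    normalized = ThaiTextPreprocessor.normalize_thai_text(sample)
    assert normalized == "15 มกราคม 2567"
    return normalized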
class CalendarEventValidator:
"""Validates and preprocesses calendar events"""
def __init__(self, openai_api_key: str):
self.preprocessor = ThaiTextPreprocessor()
self.date_parser = OpenAIDateParser(api_key=openai_api_key)
async def validate_event(self, event: 'CalendarEvent') -> ValidationResult:
"""Validate a calendar event and return validation results"""
errors = []
warnings = []
normalized_data = {}
if event.date:
try:
parsed_date = await self.date_parser.parse_date(event.date)
normalized_data['date'] = parsed_date['start_date']
                if parsed_date.get('is_range') and parsed_date.get('end_date'):
range_note = f"ถึงวันที่ {parsed_date['end_date']}"
if event.note:
normalized_data['note'] = f"{event.note}; {range_note}"
else:
normalized_data['note'] = range_note
except ValueError as e:
errors.append(f"Invalid date format: {event.date}")
else:
errors.append("Date is required")
if event.time:
time_pattern = r'^([01]?[0-9]|2[0-3]):([0-5][0-9])$'
if not re.match(time_pattern, event.time):
errors.append(f"Invalid time format: {event.time}")
normalized_data['time'] = event.time
if event.activity:
normalized_activity = self.preprocessor.normalize_thai_text(event.activity)
if len(normalized_activity) < 3:
warnings.append("Activity description is very short")
normalized_data['activity'] = normalized_activity
else:
errors.append("Activity is required")
valid_semesters = {'ภาคต้น', 'ภาคปลาย', 'ภาคฤดูร้อน'}
if event.semester:
normalized_semester = self.preprocessor.normalize_thai_text(event.semester)
if normalized_semester not in valid_semesters:
warnings.append(f"Unusual semester value: {event.semester}")
normalized_data['semester'] = normalized_semester
else:
errors.append("Semester is required")
valid_types = {'registration', 'deadline', 'examination', 'academic', 'holiday'}
if event.event_type not in valid_types:
errors.append(f"Invalid event type: {event.event_type}")
normalized_data['event_type'] = event.event_type
if event.note and 'note' not in normalized_data:
normalized_data['note'] = self.preprocessor.normalize_thai_text(event.note)
if event.section:
normalized_data['section'] = self.preprocessor.normalize_thai_text(event.section)
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
normalized_data=normalized_data
)
@dataclass
class CalendarEvent:
"""Structured representation of a calendar event with validation"""
date: str
time: str
activity: str
note: str
semester: str
event_type: str
section: Optional[str] = None
@staticmethod
def classify_event_type(activity: str) -> str:
"""Classify event type based on activity description"""
activity_lower = activity.lower()
keywords = {
'registration': ['ลงทะเบียน', 'ชําระเงิน', 'ค่าธรรมเนียม', 'เปิดเรียน'],
'deadline': ['วันสุดท้าย', 'กําหนด', 'ภายใน', 'ต้องส่ง'],
'examination': ['สอบ', 'ปริญญานิพนธ์', 'วิทยานิพนธ์', 'สอบปากเปล่า'],
'holiday': ['วันหยุด', 'ชดเชย', 'เทศกาล'],
}
for event_type, terms in keywords.items():
if any(term in activity_lower for term in terms):
return event_type
return 'academic'
async def initialize(self, openai_api_key: str):
"""Asynchronously validate and normalize the event"""
validator = CalendarEventValidator(openai_api_key)
result = await validator.validate_event(self)
if not result.is_valid:
raise ValueError(f"Invalid calendar event: {', '.join(result.errors)}")
for field, value in result.normalized_data.items():
setattr(self, field, value)
if result.warnings:
logger.warning(f"Calendar event warnings: {', '.join(result.warnings)}")
def to_searchable_text(self) -> str:
"""Convert event to searchable text format"""
return f"""
ภาคการศึกษา: {self.semester}
ประเภท: {self.event_type}
วันที่: {self.date}
เวลา: {self.time or '-'}
กิจกรรม: {self.activity}
หมวดหมู่: {self.section or '-'}
หมายเหตุ: {self.note or '-'}
""".strip()
class CacheManager:
"""Manages caching for different components of the RAG pipeline"""
    def __init__(self, cache_dir: Path, ttl: int = 3600):
        """
        Initialize CacheManager.

        Args:
            cache_dir: Directory for cache pickles (created if missing).
            ttl: Time-to-live for cache entries, in seconds.
        """
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.ttl = ttl
        self.embeddings_cache = self._load_cache("embeddings")
        self.query_cache = self._load_cache("queries")
        self.document_cache = self._load_cache("documents")
def _generate_key(self, data: Union[str, Dict, Any]) -> str:
"""Generate a unique cache key"""
if isinstance(data, str):
content = data.encode('utf-8')
else:
content = json.dumps(data, sort_keys=True).encode('utf-8')
return hashlib.md5(content).hexdigest()
def _load_cache(self, cache_type: str) -> Dict:
"""Load cache from disk"""
cache_path = self.cache_dir / f"{cache_type}_cache.pkl"
if cache_path.exists():
try:
with open(cache_path, 'rb') as f:
cache = pickle.load(f)
self._clean_expired_entries(cache)
return cache
except Exception as e:
logger.warning(f"Failed to load {cache_type} cache: {e}")
return {}
return {}
def _save_cache(self, cache_type: str, cache_data: Dict):
"""Save cache to disk"""
cache_path = self.cache_dir / f"{cache_type}_cache.pkl"
try:
with open(cache_path, 'wb') as f:
pickle.dump(cache_data, f)
except Exception as e:
logger.error(f"Failed to save {cache_type} cache: {e}")
def _clean_expired_entries(self, cache: Dict):
"""Remove expired cache entries"""
current_time = datetime.now()
expired_keys = [
key for key, (_, timestamp) in cache.items()
if current_time - timestamp > timedelta(seconds=self.ttl)
]
for key in expired_keys:
del cache[key]
def get_embedding_cache(self, text: str) -> Optional[Any]:
"""Get cached embedding for text"""
key = self._generate_key(text)
if key in self.embeddings_cache:
embedding, timestamp = self.embeddings_cache[key]
if datetime.now() - timestamp <= timedelta(seconds=self.ttl):
return embedding
return None
def set_embedding_cache(self, text: str, embedding: Any):
"""Cache embedding for text"""
key = self._generate_key(text)
self.embeddings_cache[key] = (embedding, datetime.now())
self._save_cache("embeddings", self.embeddings_cache)
def get_query_cache(self, query: str) -> Optional[Dict]:
"""Get cached query results"""
key = self._generate_key(query)
if key in self.query_cache:
result, timestamp = self.query_cache[key]
if datetime.now() - timestamp <= timedelta(seconds=self.ttl):
return result
return None
def set_query_cache(self, query: str, result: Dict):
"""Cache query results"""
key = self._generate_key(query)
self.query_cache[key] = (result, datetime.now())
self._save_cache("queries", self.query_cache)
def set_document_cache(self, doc_id: str, document: Any):
"""Cache document"""
self.document_cache[doc_id] = (document, datetime.now())
self._save_cache("documents", self.document_cache)
@dataclass
class ModelConfig:
openai_api_key: str
embedder_model: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
openai_model: str = "gpt-4o"
temperature: float = 0.7
@dataclass
class RetrieverConfig:
top_k: int = 5
@dataclass
class CacheConfig:
enabled: bool = True
cache_dir: Path = Path("./cache")
ttl: int = 86400 # 24 hours
@dataclass
class ProcessingConfig:
batch_size: int = 32
@dataclass
class LocalizationConfig:
enable_thai_normalization: bool = True
@dataclass
class PipelineConfig:
model: ModelConfig
retriever: RetrieverConfig = field(default_factory=RetrieverConfig)
cache: CacheConfig = field(default_factory=CacheConfig)
processing: ProcessingConfig = field(default_factory=ProcessingConfig)
localization: LocalizationConfig = field(default_factory=LocalizationConfig)
def create_default_config(api_key: str) -> PipelineConfig:
"""
Create a default pipeline configuration with optimized settings for Thai language processing.
Args:
api_key (str): OpenAI API key
Returns:
PipelineConfig: Configured pipeline settings
"""
return PipelineConfig(
model=ModelConfig(
openai_api_key=api_key,
temperature=0.3 # Lower temperature for more focused responses
),
retriever=RetrieverConfig(
top_k=5 # Optimal number of documents to retrieve
),
cache=CacheConfig(
enabled=True,
cache_dir=Path("./cache"),
ttl=86400 # 24 hour cache
),
processing=ProcessingConfig(
batch_size=32 # Default batch size for processing
),
localization=LocalizationConfig(
enable_thai_normalization=True # Enable Thai text normalization
)
)
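# Illustrative override sketch: the factory returns plain dataclasses, so
# per-deployment tweaks are direct attribute assignments.
def _demo_config(api_key: str) -> PipelineConfig:
    config = create_default_config(api_key)
    config.retriever.top_k = 10  # wider retrieval fan-out
    config.cache.ttl = 3600      # shorter, hourly cache
    return config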
class CalendarDataProcessor:
"""Process and structure calendar data from the new raw-data.json format"""
@staticmethod
def parse_calendar_json(json_data: Dict) -> List[CalendarEvent]:
"""Parse the new calendar JSON format into CalendarEvent objects"""
events = []
# Extract academic calendar data - handle direct dictionary input
calendar_data = json_data.get('academic_calendar', []) if isinstance(json_data, dict) else json_data
for semester_block in calendar_data:
semester = semester_block.get('education', '')
schedule = semester_block.get('schedule', [])
# Handle regular schedule events
for event in schedule:
if 'section' in event and 'details' in event:
# Process section-based events (thesis deadlines, etc.)
section = event['section']
for detail in event['details']:
if 'ภาคต้น' in detail and 'ภาคปลาย' in detail:
# Handle dual-semester events
for sem_key in ['ภาคต้น', 'ภาคปลาย']:
if detail.get(sem_key):
events.append(CalendarEvent(
date=detail[sem_key],
time='',
activity=detail['title'],
note=section,
semester=sem_key,
event_type='deadline',
section=section
))
else:
# Single semester event
events.append(CalendarEvent(
date=detail.get('date', ''),
time='',
activity=detail.get('title', ''),
note=section,
semester=ThaiTextPreprocessor.normalize_thai_text(semester),
event_type='deadline',
section=section
))
else:
# Regular calendar event
event_type = CalendarEvent.classify_event_type(event.get('activity', ''))
# Clean semester string
cleaned_semester = semester
if '(' in semester:
match = re.search(r'\((.*?)\)', semester)
if match:
cleaned_semester = match.group(1)
cleaned_semester = ThaiTextPreprocessor.normalize_thai_text(cleaned_semester)
events.append(CalendarEvent(
date=event.get('date', ''),
time=event.get('time', ''),
activity=event.get('activity', ''),
note=event.get('note', ''),
semester=cleaned_semester,
event_type=event_type
))
return events
@staticmethod
    def extract_program_details(json_data: Dict) -> List[ProgramDetailInfo]:
        """Extract and structure program details into a list of ProgramDetailInfo objects"""
raw_details = json_data.get('program_details', {})
# Process application info
app_info_data = raw_details.get('application_info', {})
app_info = ApplicationInfo(
application_portal=app_info_data.get('application_portal', ''),
program_email=app_info_data.get('program_email', '')
)
# Process required documents
req_docs = {}
raw_docs = raw_details.get('required_documents', {})
# Process mandatory documents
mandatory_docs = {}
for doc_key, doc_value in raw_docs.get('mandatory', {}).items():
mandatory_docs[doc_key] = RequiredDocument(
name=doc_key,
description=doc_value
)
req_docs['mandatory'] = mandatory_docs
# Process optional documents
optional_docs = {}
for doc_key, doc_data in raw_docs.get('optional', {}).items():
if doc_key == 'english_proficiency':
ep_data = doc_data
optional_docs[doc_key] = RequiredDocument(
name=ep_data.get('name', ''),
description=str(ep_data.get('accepted_tests', {})),
conditions=f"Validity: {ep_data.get('validity', '')}, Benefits: {ep_data.get('benefits', '')}, Exemptions: {ep_data.get('exemptions', '')}"
)
else:
optional_docs[doc_key] = RequiredDocument(
name=doc_data.get('name', ''),
description='',
conditions=doc_data.get('condition', '')
)
req_docs['optional'] = optional_docs
# Process selection steps
selection_steps = []
for step_data in raw_details.get('selection_process', {}).get('steps', []):
for step_num, description in step_data.items():
selection_steps.append(SelectionStep(
step_number=step_num,
description=description
))
return [ProgramDetailInfo(
application_info=app_info,
required_documents=req_docs,
submission_process=raw_details.get('submission_process', ''),
selection_process=selection_steps
)]
@staticmethod
def extract_contact_details(json_data: Dict) -> List[ContactDetail]:
"""Extract and structure contact details into ContactDetail objects"""
raw_contacts = json_data.get('contact_details', [])
contact_details = []
# Handle the case where raw_contacts might be a single object instead of a list
if not isinstance(raw_contacts, list):
raw_contacts = [raw_contacts]
for contact_data in raw_contacts:
# Skip if contact_data is not a dictionary
if not isinstance(contact_data, dict):
continue
try:
# Process transportation data
transportation_data = contact_data.get('transportation', {})
transportation = Transportation(
boat=transportation_data.get('boat', ''),
bts=transportation_data.get('bts', ''),
mrt=transportation_data.get('mrt', ''),
airport_link=transportation_data.get('airport_link', ''),
bus=transportation_data.get('bus', {})
)
# Process contact information
contact_info = Contact(
email=contact_data.get('email', ''),
facebook=contact_data.get('facebook', {})
)
# Create ContactDetail object
contact_details.append(ContactDetail(
event_type=contact_data.get('event_type', ''),
department=contact_data.get('department', ''),
faculty=contact_data.get('faculty', ''),
university=contact_data.get('university', ''),
location=contact_data.get('location', ''),
contact=contact_info,
transportation=transportation
))
except Exception as e:
print(f"Error processing contact data: {e}")
continue
return contact_details
@staticmethod
def extract_course_structure(json_data: Dict) -> List[CourseStructure]:
"""Extract and structure course information into CourseStructure objects"""
course_structures = []
# Get course structure data
course_data = json_data.get('course_structure', {})
program_metadata = course_data.get('program_metadata', {})
curriculum = course_data.get('curriculum_structure', {})
# Process foundation courses
foundation_data = curriculum.get('foundation_courses', {})
foundation_courses = []
for course in foundation_data.get('courses', []):
foundation_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Process core courses
core_data = curriculum.get('core_courses', {})
core_courses = []
for course in core_data.get('modules', []):
core_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Process elective courses
elective_data = curriculum.get('electives', {})
elective_courses = []
for course in elective_data.get('course_groups', []):
elective_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Process research courses
research_data = curriculum.get('research', {})
research_courses = []
for course in research_data.get('course', []):
research_courses.append(Course(
code=course.get('code', ''),
title_th=course.get('title', {}).get('th', ''),
title_en=course.get('title', {}).get('en', ''),
credits=course.get('credits', 0)
))
# Create course categories
structure = {
'หมวดวิชาปรับพื้นฐาน': CourseCategory( # Previously foundation_courses
description=foundation_data.get('metadata', {}).get('description'),
credits=foundation_data.get('metadata', {}).get('credits', 'non-credit'),
minimum_credits=None,
courses=foundation_courses
),
'หมวดวิชาบังคับ': CourseCategory( # Previously core_courses
description=None,
credits=0,
minimum_credits=core_data.get('minimum_requirement_credits'),
courses=core_courses
),
'หมวดวิชาเลือก': CourseCategory( # Previously elective_courses
description=None,
credits=0,
minimum_credits=elective_data.get('minimum_requirement_credits'),
courses=elective_courses
),
'หมวดวิชาการค้นคว้าอิสระ': CourseCategory( # Previously research_courses
description=None,
credits=0,
minimum_credits=research_data.get('minimum_requirement_credits'),
courses=research_courses
)
}
# Create course structure
course_structure = CourseStructure(
event_type='curriculum_structure',
program_name=program_metadata.get('name', ''),
department=program_metadata.get('department', ''),
total_credits=program_metadata.get('total_credits', 0),
degree_level=program_metadata.get('degree_level', ''),
structure=structure
)
return [course_structure]
@staticmethod
def extract_program_study_plan(json_data: Dict) -> List[StudyPlan]:
"""Extract and structure study plan information into StudyPlan objects"""
study_plan_data = json_data.get('program_study_plan', {})
# Initialize the years dictionary to store all year/semester data
years_dict = {}
for year_key, year_data in study_plan_data.items():
years_dict[year_key] = {}
for semester_key, semester_data in year_data.items():
# Get metadata
metadata = semester_data.get('metadata', {})
# Initialize semester structure
semester_struct = {
'metadata': metadata,
'courses': []
}
# Handle both 'modules' and 'courses' keys
course_data = semester_data.get('modules', []) or semester_data.get('courses', [])
# Add courses to semester
for course in course_data:
course_info = {
'code': course.get('code', ''),
'title': course.get('title', {'th': '', 'en': ''}),
'credits': course.get('credits', 0)
}
semester_struct['courses'].append(course_info)
# Add semester data to year
years_dict[year_key][semester_key] = semester_struct
# Create StudyPlan object
study_plan = StudyPlan(
event_type='study_plan',
years=years_dict
)
return [study_plan]
@staticmethod
def extract_fees(json_data: Dict) -> List[TuitionFee]:
"""Extract and structure fee information into TuitionFee objects"""
fees_data = json_data.get('fees', {})
# Parse regular tuition fee
regular_fee_str = fees_data.get('tuition', '')
        # Drop thousands separators (e.g. "25,000") before float conversion
        regular_amount = float(regular_fee_str.split()[0].replace(',', '')) if regular_fee_str else 0.0
regular_fee = RegularFee(
amount=regular_amount,
currency='THB',
period='per semester'
)
# Parse late payment fee
late_fee_str = fees_data.get('late_payment', '')
        late_amount = float(late_fee_str.split()[0].replace(',', '')) if late_fee_str else 0.0
late_payment_fee = LatePaymentFee(
amount=late_amount,
currency='THB'
)
# Create TuitionFee object
tuition_fee = TuitionFee(
event_type='tuition_fee',
regular_fee=regular_fee,
late_payment_fee=late_payment_fee
)
return [tuition_fee]
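# Illustrative helper: extract_fees reads the leading number of each fee
# string. The payload below is hypothetical but follows the 'fees' schema
# this parser expects.
def _demo_extract_fees() -> None:
    fees = CalendarDataProcessor.extract_fees(
        {"fees": {"tuition": "25,000 บาท ต่อภาคการศึกษา", "late_payment": "500 บาท"}}
    )
    assert fees[0].regular_fee.amount == 25000.0
    assert fees[0].late_payment_fee.amount == 500.0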
class HybridDocumentStore:
"""Enhanced document store with hybrid retrieval capabilities"""
def __init__(self, config: PipelineConfig):
self.store = InMemoryDocumentStore()
self.embedder = SentenceTransformersDocumentEmbedder(
model=config.model.embedder_model
)
# Initialize BM25 retriever
self.bm25_retriever = InMemoryBM25Retriever(
document_store=self.store,
top_k=config.retriever.top_k
)
# Initialize embedding retriever
self.embedding_retriever = InMemoryEmbeddingRetriever(
document_store=self.store,
top_k=config.retriever.top_k
)
        self.cache_manager = CacheManager(
            cache_dir=config.cache.cache_dir,
            ttl=config.cache.ttl
        )
        self.batch_size = config.processing.batch_size
self.embedder.warm_up()
# Initialize containers
self.events = []
self.event_type_index = {}
self.semester_index = {}
self._document_counter = 0
# Additional data containers
self.course_data = []
self.contact_data = []
self.study_plan_data = []
def _generate_unique_id(self) -> str:
"""Generate a unique document ID"""
self._document_counter += 1
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
return f"doc_{timestamp}_{self._document_counter}"
def _compute_embedding(self, text: str) -> Any:
"""Compute embedding with caching"""
cached_embedding = self.cache_manager.get_embedding_cache(text)
if cached_embedding is not None:
return cached_embedding
doc = Document(content=text)
embedding = self.embedder.run(documents=[doc])["documents"][0].embedding
self.cache_manager.set_embedding_cache(text, embedding)
return embedding
def add_document(self, text: str, event_type: str):
"""Add a single document to the store"""
try:
# Compute embedding
embedding = self._compute_embedding(text)
# Create document with unique ID
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={'event_type': event_type}
)
# Write document
self.store.write_documents([doc])
# Cache document
self.cache_manager.set_document_cache(doc.id, doc)
except Exception as e:
logger.error(f"Error adding document: {str(e)}")
raise
def add_events(self, events: List[CalendarEvent], contact_details: Optional[List[ContactDetail]] = None,
course_structure: Optional[List[CourseStructure]] = None,
study_plans: Optional[List[StudyPlan]] = None):
"""Add events and additional data with caching"""
documents = []
added_events = set() # Track added events to prevent duplicates
# Process calendar events
for event in events:
event_key = f"{event.date}_{event.activity}_{event.semester}"
if event_key not in added_events:
added_events.add(event_key)
self.events.append(event)
event_idx = len(self.events) - 1
# Update indices
if event.event_type not in self.event_type_index:
self.event_type_index[event.event_type] = []
self.event_type_index[event.event_type].append(event_idx)
if event.semester not in self.semester_index:
self.semester_index[event.semester] = []
self.semester_index[event.semester].append(event_idx)
# Create document
text = event.to_searchable_text()
embedding = self._compute_embedding(text)
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={
'event_type': event.event_type,
'semester': event.semester,
'date': event.date,
'event_idx': event_idx
}
)
documents.append(doc)
self.cache_manager.set_document_cache(str(event_idx), doc)
# Process contact details
if contact_details:
for contact in contact_details:
self.contact_data.append(contact)
text = f"""
ข้อมูลการติดต่อ:
คณะ: {contact.faculty}
ภาควิชา: {contact.department}
มหาวิทยาลัย: {contact.university}
สถานที่: {contact.location}
การติดต่อ:
อีเมล: {contact.contact.email}
Facebook: {json.dumps(contact.contact.facebook, ensure_ascii=False)}
การเดินทาง:
เรือ: {contact.transportation.boat}
BTS: {contact.transportation.bts}
MRT: {contact.transportation.mrt}
Airport Link: {contact.transportation.airport_link}
รถประจำทาง: {json.dumps(contact.transportation.bus, ensure_ascii=False)}
"""
embedding = self._compute_embedding(text)
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={'event_type': 'contact'}
)
documents.append(doc)
# Process course structure
if course_structure:
for course in course_structure:
self.course_data.append(course)
text = f"""
โครงสร้างหลักสูตร:
ชื่อหลักสูตร: {course.program_name}
ภาควิชา: {course.department}
หน่วยกิตรวม: {course.total_credits}
ระดับการศึกษา: {course.degree_level}
รายละเอียดโครงสร้าง:
"""
for category_name, category in course.structure.items():
text += f"\n{category_name}:\n"
if category.description:
text += f"คำอธิบาย: {category.description}\n"
text += f"หน่วยกิต: {category.credits}\n"
if category.minimum_credits:
text += f"หน่วยกิตขั้นต่ำ: {category.minimum_credits}\n"
text += "รายวิชา:\n"
for course_item in category.courses:
text += f"- {course_item.code}: {course_item.title_th} ({course_item.title_en}) - {course_item.credits} หน่วยกิต\n"
embedding = self._compute_embedding(text)
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={'event_type': 'curriculum'}
)
documents.append(doc)
# Process study plans
if study_plans:
for plan in study_plans:
self.study_plan_data.append(plan)
text = "แผนการศึกษา:\n"
for year, semesters in plan.years.items():
text += f"\nปีที่ {year}:\n"
for semester, data in semesters.items():
text += f"\n{semester}:\n"
if 'metadata' in data and data['metadata']:
text += f"ข้อมูลเพิ่มเติม: {json.dumps(data['metadata'], ensure_ascii=False)}\n"
if 'courses' in data:
for course in data['courses']:
text += f"- {course['code']}: {course['title'].get('th', '')} ({course['title'].get('en', '')}) - {course['credits']} หน่วยกิต\n"
embedding = self._compute_embedding(text)
doc = Document(
id=self._generate_unique_id(),
content=text,
embedding=embedding,
meta={'event_type': 'study_plan'}
)
documents.append(doc)
        batch_size = self.batch_size  # from ProcessingConfig, not a magic number
for i in range(0, len(documents), batch_size):
batch = documents[i:i + batch_size]
try:
self.store.write_documents(batch)
except Exception as e:
logger.error(f"Error writing document batch {i//batch_size + 1}: {str(e)}")
for doc in batch:
try:
self.store.write_documents([doc])
except Exception as e2:
logger.error(f"Failed to write document {doc.id}: {str(e2)}")
def hybrid_search(self,
query: str,
event_type: Optional[str] = None,
semester: Optional[str] = None,
top_k: int = 10,
weight_semantic: float = 0.5) -> List[Document]:
"""Hybrid search combining semantic and lexical search results"""
cache_key = json.dumps({
'query': query,
'event_type': event_type,
'semester': semester,
'top_k': top_k,
'weight_semantic': weight_semantic
})
cached_results = self.cache_manager.get_query_cache(cache_key)
if cached_results is not None:
return cached_results
# Get semantic search results
query_embedding = self._compute_embedding(query)
semantic_results = self.embedding_retriever.run(
query_embedding=query_embedding
)["documents"]
# Get BM25 results
bm25_results = self.bm25_retriever.run(
query=query
)["documents"]
# Combine results using score fusion
combined_results = self._merge_results(
semantic_results=semantic_results,
bm25_results=bm25_results,
weight_semantic=weight_semantic,
top_k=top_k
)
# Filter results based on metadata
filtered_results = []
for doc in combined_results:
if event_type and doc.meta.get('event_type') != event_type:
continue
if semester and doc.meta.get('semester') != semester:
continue
filtered_results.append(doc)
final_results = filtered_results[:top_k]
self.cache_manager.set_query_cache(cache_key, final_results)
return final_results
def _merge_results(self,
semantic_results: List[Document],
bm25_results: List[Document],
weight_semantic: float,
top_k: int) -> List[Document]:
"""Merge semantic and BM25 results using weighted score fusion"""
# Create dictionaries to store normalized scores
semantic_scores = {}
bm25_scores = {}
# Normalize semantic scores
max_semantic_score = max(doc.score for doc in semantic_results) if semantic_results else 1.0
for doc in semantic_results:
semantic_scores[doc.id] = doc.score / max_semantic_score if max_semantic_score > 0 else 0
# Normalize BM25 scores
max_bm25_score = max(doc.score for doc in bm25_results) if bm25_results else 1.0
for doc in bm25_results:
bm25_scores[doc.id] = doc.score / max_bm25_score if max_bm25_score > 0 else 0
# Combine scores
combined_scores = {}
all_docs = {doc.id: doc for doc in semantic_results + bm25_results}
for doc_id in all_docs:
semantic_score = semantic_scores.get(doc_id, 0)
bm25_score = bm25_scores.get(doc_id, 0)
# Weighted combination
combined_scores[doc_id] = (
weight_semantic * semantic_score +
(1 - weight_semantic) * bm25_score
)
# Sort by combined score and return top_k results
sorted_docs = sorted(
all_docs.values(),
key=lambda x: combined_scores[x.id],
reverse=True
)
return sorted_docs[:top_k]
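# Worked example of the fusion above, with weight_semantic = 0.3: a document
# normalized to 0.8 semantically and 1.0 on BM25 scores
# 0.3 * 0.8 + 0.7 * 1.0 = 0.94, so it outranks a purely semantic hit
# (1.0 semantic, absent from BM25) at 0.3 * 1.0 + 0.7 * 0.0 = 0.30.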
class AdvancedQueryProcessor:
"""Process queries with better understanding"""
def __init__(self, config: PipelineConfig):
        self.generator = OpenAIGenerator(
            api_key=Secret.from_token(config.model.openai_api_key),
            model=config.model.openai_model,
            generation_kwargs={"temperature": config.model.temperature}
        )
self.prompt_builder = PromptBuilder(
template="""
วิเคราะห์คำถามที่เกี่ยวข้องกับปฏิทินการศึกษา (ภาษาไทย):
คำถาม: {{query}}
ระบุ:
1. ประเภทของข้อมูลที่ต้องการค้นหา
2. ภาคการศึกษาที่ระบุไว้ (ถ้ามี)
3. คำสำคัญที่เกี่ยวข้อง
ให้ผลลัพธ์ในรูปแบบ JSON:
{
"event_type": "ลงทะเบียน|กำหนดเวลา|การสอบ|วิชาการ|วันหยุด",
"semester": "ภาคการศึกษาที่ระบุ หรือ null",
"key_terms": ["คำสำคัญ 3 คำที่สำคัญที่สุด"],
"response_format": "รายการ|คำตอบเดียว|คำตอบละเอียด"
}
"""
)
def _get_default_analysis(self, query: str) -> Dict[str, Any]:
"""Return default analysis when processing fails"""
logger.info("Returning default analysis")
return {
"original_query": query,
"event_type": None,
"semester": None,
"key_terms": [],
"response_format": "detailed"
}
def process_query(self, query: str) -> Dict[str, Any]:
"""Enhanced query processing with better error handling."""
try:
result = self.prompt_builder.run(query=query)
response = self.generator.run(prompt=result["prompt"])
if not response or not response.get("replies") or not response["replies"][0]:
logger.warning("Received empty response from OpenAI")
return self._get_default_analysis(query)
try:
analysis = json.loads(response["replies"][0])
            except json.JSONDecodeError:
                logger.warning("Could not parse query analysis as JSON; using default analysis")
                return self._get_default_analysis(query)
            # Ensure course-related queries retrieve study plans and curricula
course_keywords = ['หน่วยกิต', 'วิชา', 'หลักสูตร', 'แผนการเรียน', 'วิชาเลือก', 'วิชาบังคับ', 'วิชาการค้นคว้า', 'วิชาหลัก']
if any(keyword in query for keyword in course_keywords):
analysis['event_type'] = 'curriculum'
            # Ensure fee-related queries retrieve tuition fee documents
fee_keywords = ['ค่าเทอม', 'ค่าธรรมเนียม', 'ค่าเรียน', 'ค่าปรับ']
if any(keyword in query for keyword in fee_keywords):
analysis['event_type'] = 'fees'
return {
"original_query": query,
**analysis
}
except Exception as e:
logger.error(f"Query processing failed: {str(e)}")
return self._get_default_analysis(query)
class ResponseGenerator:
"""Generate responses with better context utilization"""
def __init__(self, config: PipelineConfig):
        self.generator = OpenAIGenerator(
            api_key=Secret.from_token(config.model.openai_api_key),
            model=config.model.openai_model,
            generation_kwargs={"temperature": config.model.temperature}
        )
self.prompt_builder = PromptBuilder(
template="""
คุณเป็นที่ปรึกษาทางวิชาการ กรุณาตอบคำถามต่อไปนี้โดยใช้ข้อมูลจากปฏิทินการศึกษาที่ให้มา
คำถาม: {{query}}
ข้อมูลที่เกี่ยวข้องจากปฏิทินการศึกษา:
{% for doc in context %}
---
{{doc.content}}
{% endfor %}
**ห้ามเดาข้อมูลเอง ถ้าไม่มีข้อมูลให้ตอบว่า "ไม่มีข้อมูลที่ตรงกับคำถาม"**
กรุณาตอบเป็นภาษาไทย:
ต้องบอกเสมอว่า **หากมีข้อสงสัยเพิ่มเติมสามารถสอบถามได้**
"""
)
def generate_response(self,
query: str,
documents: List[Document],
query_info: Dict[str, Any]) -> str:
"""Generate response using retrieved documents"""
try:
            result = self.prompt_builder.run(
                query=query,
                context=documents,
                format=query_info.get("response_format", "detailed")
            )
response = self.generator.run(prompt=result["prompt"])
return response["replies"][0]
except Exception as e:
logger.error(f"Response generation failed: {str(e)}")
return "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้"
class AcademicCalendarRAG:
"""Enhanced RAG system for academic calendar and program information"""
def __init__(self, config: PipelineConfig):
self.config = config
self.document_store = HybridDocumentStore(config) # Use the new hybrid store
self.query_processor = AdvancedQueryProcessor(config)
self.response_generator = ResponseGenerator(config)
self.data_processor = CalendarDataProcessor()
# Initialize data containers
self.calendar_events = []
self.program_details = []
self.contact_details = []
self.course_structure = []
self.study_plans = []
self.tuition_fees = []
def load_data(self, json_data: Dict):
"""Load and process all data sources"""
try:
raw_events = self.data_processor.parse_calendar_json(json_data)
for event in raw_events:
if not event.event_type:
event.event_type = CalendarEvent.classify_event_type(event.activity)
self.calendar_events.append(event)
# Process other data types
self.program_details = self.data_processor.extract_program_details(json_data)
self.contact_details = self.data_processor.extract_contact_details(json_data)
self.course_structure = self.data_processor.extract_course_structure(json_data)
self.study_plans = self.data_processor.extract_program_study_plan(json_data)
self.tuition_fees = self.data_processor.extract_fees(json_data)
self._add_calendar_events()
self._add_program_info()
except Exception as e:
logger.error(f"Error loading data: {str(e)}")
raise
def _add_calendar_events(self):
"""Add calendar events and other data to document store"""
if self.calendar_events:
self.document_store.add_events(
events=self.calendar_events,
contact_details=self.contact_details,
course_structure=self.course_structure,
study_plans=self.study_plans
)
def _add_program_info(self):
"""Enhanced method to add program-related information to document store"""
if self.program_details:
for detail in self.program_details:
text = f"""
ข้อมูลการสมัคร:
เว็บไซต์รับสมัคร: {detail.application_info.application_portal}
อีเมล: {detail.application_info.program_email}
เอกสารที่ต้องใช้:
{self._format_required_docs(detail.required_documents)}
ขั้นตอนการส่งเอกสาร:
{detail.submission_process}
ขั้นตอนการคัดเลือก:
{self._format_selection_steps(detail.selection_process)}
"""
self.document_store.add_document(text, "program_details")
if self.tuition_fees:
for fee in self.tuition_fees:
text = f"""
ค่าธรรมเนียมการศึกษา:
ค่าเล่าเรียนปกติ: {fee.regular_fee.amount:,.2f} {fee.regular_fee.currency} {fee.regular_fee.period}
ค่าปรับชำระล่าช้า: {fee.late_payment_fee.amount:,.2f} {fee.late_payment_fee.currency}
"""
self.document_store.add_document(text, "fees")
def _format_required_docs(self, docs: Dict) -> str:
"""Format required documents information with detailed English proficiency requirements"""
result = []
if 'mandatory' in docs:
result.append("เอกสารที่ต้องใช้:")
for doc in docs['mandatory'].values():
result.append(f"- {doc.name}: {doc.description}")
if 'optional' in docs:
result.append("\nเอกสารเพิ่มเติม:")
for doc_key, doc in docs['optional'].items():
if doc_key == 'english_proficiency':
result.append(f"- {doc.name}")
                    # Parse and format the accepted tests; the description was
                    # serialized with str() on a dict, so literal_eval is both
                    # sufficient and safe (unlike eval on arbitrary text)
                    try:
                        accepted_tests = ast.literal_eval(doc.description)
                        result.append(" เกณฑ์คะแนนที่ยอมรับ:")
                        for test, requirement in accepted_tests.items():
                            result.append(f" * {test}: {requirement}")
                    except (ValueError, SyntaxError):
                        result.append(f" {doc.description}")
if doc.conditions:
conditions = doc.conditions.split(', ')
for condition in conditions:
result.append(f" {condition}")
else:
desc = f"- {doc.name}"
if doc.conditions:
desc += f" ({doc.conditions})"
result.append(desc)
return "\n".join(result)
def _format_selection_steps(self, steps: List[SelectionStep]) -> str:
"""Format selection process steps"""
return "\n".join(f"{step.step_number}. {step.description}" for step in steps)
def _get_fee_documents(self) -> List[Document]:
"""Get fee-related documents"""
if not self.tuition_fees:
return []
documents = []
for fee in self.tuition_fees:
text = f"""
ค่าธรรมเนียมการศึกษา:
- ค่าเล่าเรียน: {fee.regular_fee.amount:,.2f} {fee.regular_fee.currency} {fee.regular_fee.period}
- ค่าปรับชำระล่าช้า: {fee.late_payment_fee.amount:,.2f} {fee.late_payment_fee.currency}
"""
doc = Document(
content=text,
meta={"event_type": "fees"}
)
documents.append(doc)
return documents
    def process_query(self, query: str, weight_semantic: float = 0.3) -> Dict[str, Any]:
        """Process user query using hybrid retrieval; the 0.3 default favors BM25"""
        try:
            # Analyze query
            query_info = self.query_processor.process_query(query)
            # Get relevant documents using hybrid search, honoring the
            # caller's weight instead of a hard-coded constant
            documents = self.document_store.hybrid_search(
                query=query,
                event_type=query_info.get("event_type"),
                semester=query_info.get("semester"),
                top_k=self.config.retriever.top_k,
                weight_semantic=weight_semantic
            )
# Add fee information for fee-related queries
if query_info.get("event_type") == "fees" and self.tuition_fees:
fee_docs = self._get_fee_documents()
documents.extend(fee_docs)
# Generate response
response = self.response_generator.generate_response(
query=query,
documents=documents,
query_info=query_info
)
return {
"query": query,
"answer": response,
"relevant_docs": documents,
"query_info": query_info
}
except Exception as e:
logger.error(f"Error processing query: {str(e)}")
return {
"query": query,
"answer": "ขออภัย ไม่สามารถประมวลผลคำตอบได้ในขณะนี้",
"error": str(e)
}
# def main():
# """Main function demonstrating hybrid retrieval"""
# try:
# # Load API key
# with open("key.txt", "r") as f:
# openai_api_key = f.read().strip()
# # Create config with hybrid retrieval settings
# config = create_default_config(openai_api_key)
# pipeline = AcademicCalendarRAG(config)
# # Load and process data
# with open("raw-data.json", "r", encoding="utf-8") as f:
# raw_data = json.load(f)
# pipeline.load_data(raw_data)
# # Test queries with different semantic weights
# queries = ["มีวิชาหลักหรือวิชาเลือกอะไรบ้าง"]
# print("=" * 80)
# for query in queries:
# print(f"\nQuery: {query}")
# result = pipeline.process_query(query)
# print(f"Answer: {result['answer']}")
# print("-" * 40)
# except Exception as e:
# logger.error(f"Pipeline execution failed: {str(e)}")
# raise
# if __name__ == "__main__":
# main()