File size: 11,796 Bytes
bc852b0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 |
from haystack import Pipeline, Document
from haystack.components.generators.openai import OpenAIGenerator
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.utils import Secret
from pathlib import Path
import logging
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
import json
import asyncio
from datetime import datetime
import re
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class LocalizationConfig:
"""Configuration for Thai language handling"""
thai_tokenizer_model: str = "thai-tokenizer"
enable_thai_normalization: bool = True
remove_thai_tones: bool = False
keep_english: bool = True
custom_stopwords: List[str] = field(default_factory=list)
custom_synonyms: Dict[str, List[str]] = field(default_factory=dict)
@dataclass
class RetrieverConfig:
"""Configuration for document retrieval"""
top_k: int = 5
similarity_threshold: float = 0.7
filter_duplicates: bool = True
@dataclass
class ModelConfig:
"""Configuration for language models"""
openai_api_key: str
temperature: float = 0.3
max_tokens: int = 2000
model: str = "gpt-4"
embedder_model: str = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
@dataclass
class PipelineConfig:
"""Main configuration for the RAG pipeline"""
model: ModelConfig
retriever: RetrieverConfig = field(default_factory=RetrieverConfig)
localization: LocalizationConfig = field(default_factory=LocalizationConfig)
def __post_init__(self):
if not self.model.openai_api_key:
raise ValueError("OpenAI API key is required")
class ThaiTextPreprocessor:
"""Thai text preprocessing utilities"""
@staticmethod
def normalize_thai_text(text: str) -> str:
"""Normalize Thai text"""
if not text:
return text
# Normalize whitespace
text = re.sub(r'\s+', ' ', text.strip())
# Normalize Thai numerals
thai_digits = '๐๑๒๓๔๕๖๗๘๙'
arabic_digits = '0123456789'
for thai, arabic in zip(thai_digits, arabic_digits):
text = text.replace(thai, arabic)
return text
class CalendarEvent:
"""Represents an academic calendar event"""
def __init__(self,
date: str,
activity: str,
semester: str,
event_type: str = "academic",
note: str = "",
time: str = "",
section: Optional[str] = None):
self.date = date
self.activity = activity
self.semester = semester
self.event_type = event_type
self.note = note
self.time = time
self.section = section
def to_searchable_text(self) -> str:
"""Convert event to searchable text format"""
return f"""
ภาคการศึกษา: {self.semester}
ประเภท: {self.event_type}
วันที่: {self.date}
เวลา: {self.time or '-'}
กิจกรรม: {self.activity}
หมวดหมู่: {self.section or '-'}
หมายเหตุ: {self.note or '-'}
""".strip()
@staticmethod
def from_dict(data: Dict[str, Any]) -> 'CalendarEvent':
"""Create event from dictionary"""
return CalendarEvent(
date=data.get('date', ''),
activity=data.get('activity', ''),
semester=data.get('semester', ''),
event_type=data.get('event_type', 'academic'),
note=data.get('note', ''),
time=data.get('time', ''),
section=data.get('section')
)
class CalendarRAG:
"""Main RAG pipeline for academic calendar"""
def __init__(self, config: PipelineConfig):
"""Initialize the pipeline with configuration"""
self.config = config
self.document_store = InMemoryDocumentStore()
self.embedder = SentenceTransformersDocumentEmbedder(
model=config.model.embedder_model
)
self.text_preprocessor = ThaiTextPreprocessor()
# Initialize OpenAI components
self.generator = OpenAIGenerator(
api_key=Secret.from_token(config.model.openai_api_key),
model=config.model.model,
temperature=config.model.temperature
)
self.query_analyzer = PromptBuilder(
template="""
วิเคราะห์คำถามเกี่ยวกับปฏิทินการศึกษานี้:
คำถาม: {{query}}
กรุณาระบุ:
1. ประเภทของข้อมูลที่ต้องการ
2. ภาคการศึกษาที่เกี่ยวข้อง
3. คำสำคัญที่ต้องค้นหา
ตอบในรูปแบบ JSON:
{
"event_type": "registration|deadline|examination|academic|holiday",
"semester": "ภาคการศึกษาที่ระบุ หรือ null",
"key_terms": ["คำสำคัญไม่เกิน 3 คำ"]
}
"""
)
self.answer_generator = PromptBuilder(
template="""
คุณเป็นผู้ช่วยให้ข้อมูลปฏิทินการศึกษา กรุณาตอบคำถามต่อไปนี้โดยใช้ข้อมูลที่ให้มา:
คำถาม: {{query}}
ข้อมูลที่เกี่ยวข้อง:
{% for doc in documents %}
---
{{doc.content}}
{% endfor %}
คำแนะนำ:
1. ตอบเป็นภาษาไทย
2. ระบุวันที่และข้อกำหนดให้ชัดเจน
3. รวมหมายเหตุหรือเงื่อนไขที่สำคัญ
"""
)
def load_data(self, calendar_data: List[Dict[str, Any]]) -> None:
"""Load calendar data into the system"""
documents = []
for entry in calendar_data:
# Create calendar event
event = CalendarEvent.from_dict(entry)
# Create searchable document
doc = Document(
content=event.to_searchable_text(),
meta={
"event_type": event.event_type,
"semester": event.semester,
"date": event.date
}
)
documents.append(doc)
# Compute embeddings
embedded_docs = self.embedder.run(documents=documents)["documents"]
# Store documents
self.document_store.write_documents(embedded_docs)
def process_query(self, query: str) -> Dict[str, Any]:
"""Process a calendar query and return results"""
try:
# Analyze query
query_info = self._analyze_query(query)
# Retrieve relevant documents
documents = self._retrieve_documents(
query,
event_type=query_info.get("event_type"),
semester=query_info.get("semester")
)
# Generate answer
answer = self._generate_answer(query, documents)
return {
"answer": answer,
"documents": documents,
"query_info": query_info
}
except Exception as e:
logger.error(f"Query processing failed: {str(e)}")
return {
"answer": "ขออภัย ไม่สามารถประมวลผลคำถามได้ในขณะนี้",
"documents": [],
"query_info": {}
}
def _analyze_query(self, query: str) -> Dict[str, Any]:
"""Analyze and extract information from query"""
try:
# Normalize query
normalized_query = self.text_preprocessor.normalize_thai_text(query)
# Get analysis from OpenAI
prompt_result = self.query_analyzer.run(query=normalized_query)
response = self.generator.run(prompt=prompt_result["prompt"])
if not response or not response.get("replies"):
raise ValueError("Empty response from query analyzer")
analysis = json.loads(response["replies"][0])
analysis["original_query"] = query
return analysis
except Exception as e:
logger.error(f"Query analysis failed: {str(e)}")
return {
"original_query": query,
"event_type": None,
"semester": None,
"key_terms": []
}
def _retrieve_documents(self,
query: str,
event_type: Optional[str] = None,
semester: Optional[str] = None) -> List[Document]:
"""Retrieve relevant documents"""
# Create retriever
retriever = InMemoryEmbeddingRetriever(
document_store=self.document_store,
top_k=self.config.retriever.top_k
)
# Get query embedding
query_doc = Document(content=query)
embedded_query = self.embedder.run(documents=[query_doc])["documents"][0]
# Retrieve documents
results = retriever.run(query_embedding=embedded_query.embedding)["documents"]
# Filter results if needed
filtered_results = []
for doc in results:
if event_type and doc.meta['event_type'] != event_type:
continue
if semester and doc.meta['semester'] != semester:
continue
filtered_results.append(doc)
return filtered_results[:self.config.retriever.top_k]
def _generate_answer(self, query: str, documents: List[Document]) -> str:
"""Generate answer from retrieved documents"""
try:
prompt_result = self.answer_generator.run(
query=query,
documents=documents
)
response = self.generator.run(prompt=prompt_result["prompt"])
if not response or not response.get("replies"):
raise ValueError("Empty response from answer generator")
return response["replies"][0]
except Exception as e:
logger.error(f"Answer generation failed: {str(e)}")
return "ขออภัย ไม่สามารถสร้างคำตอบได้ในขณะนี้"
def create_default_config(api_key: str) -> PipelineConfig:
"""Create default pipeline configuration"""
model_config = ModelConfig(openai_api_key=api_key)
return PipelineConfig(
model=model_config,
retriever=RetrieverConfig(),
localization=LocalizationConfig()
) |