import gradio as gr import os from typing import List, Dict, Any, Optional import hashlib from datetime import datetime import numpy as np # PDF 처리 라이브러리 try: import fitz # PyMuPDF PDF_AVAILABLE = True except ImportError: PDF_AVAILABLE = False print("⚠️ PyMuPDF not installed. Install with: pip install pymupdf") try: from sentence_transformers import SentenceTransformer ST_AVAILABLE = True except ImportError: ST_AVAILABLE = False print("⚠️ Sentence Transformers not installed. Install with: pip install sentence-transformers") # Custom CSS for gradient background and styling custom_css = """ .gradio-container { background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%); background-size: 400% 400%; animation: gradient-animation 15s ease infinite; min-height: 100vh; } @keyframes gradient-animation { 0% { background-position: 0% 50%; } 50% { background-position: 100% 50%; } 100% { background-position: 0% 50%; } } .dark .gradio-container { background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%); background-size: 400% 400%; animation: gradient-animation 15s ease infinite; } .main-container { background-color: rgba(255, 255, 255, 0.95); backdrop-filter: blur(10px); border-radius: 20px; padding: 20px; box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37); border: 1px solid rgba(255, 255, 255, 0.18); margin: 10px; } .dark .main-container { background-color: rgba(30, 30, 30, 0.95); border: 1px solid rgba(255, 255, 255, 0.1); } .pdf-status { padding: 10px; border-radius: 10px; margin: 10px 0; font-size: 0.9em; } .pdf-success { background-color: rgba(52, 211, 153, 0.2); border: 1px solid rgba(52, 211, 153, 0.5); color: #10b981; } .pdf-error { background-color: rgba(248, 113, 113, 0.2); border: 1px solid rgba(248, 113, 113, 0.5); color: #ef4444; } .pdf-info { background-color: rgba(59, 130, 246, 0.2); border: 1px solid rgba(59, 130, 246, 0.5); color: #3b82f6; } .rag-context { background-color: 
class SimpleTextSplitter:
    """Split text into sentence-aligned chunks with optional overlap.

    Sentences are detected by the ``". "`` separator and greedily packed
    into chunks of at most ``chunk_size`` characters.  Consecutive chunks
    share up to ``chunk_overlap`` characters of trailing whole sentences so
    that context is not lost at chunk boundaries.
    """

    def __init__(self, chunk_size=800, chunk_overlap=100):
        """Configure the splitter.

        Args:
            chunk_size: Maximum number of characters per chunk (> 0).
            chunk_overlap: Maximum number of characters of trailing
                sentences repeated at the start of the next chunk (>= 0).

        Raises:
            ValueError: If ``chunk_size`` is not positive or
                ``chunk_overlap`` is negative.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be a positive integer")
        if chunk_overlap < 0:
            raise ValueError("chunk_overlap must not be negative")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_text(self, text: str) -> List[str]:
        """Return ``text`` split into chunks of at most ``chunk_size`` chars.

        Empty or whitespace-only input yields an empty list.
        """
        if not text or not text.strip():
            return []

        chunks: List[str] = []
        current: List[str] = []  # sentences of the chunk being built
        current_len = 0  # len(" ".join(current))

        for sentence in self._split_sentences(text):
            needed = len(sentence) + (1 if current else 0)
            if current and current_len + needed > self.chunk_size:
                chunks.append(" ".join(current))
                # Seed the next chunk with the tail of the one just closed.
                current = self._overlap_tail(current, len(sentence))
                current_len = len(" ".join(current))
                needed = len(sentence) + (1 if current else 0)
            current.append(sentence)
            current_len += needed

        if current:
            chunks.append(" ".join(current))
        return chunks

    def _split_sentences(self, text: str) -> List[str]:
        """Split on ``". "`` and hard-wrap sentences longer than a chunk."""
        pieces = text.split(". ")
        last_index = len(pieces) - 1
        sentences: List[str] = []
        for index, piece in enumerate(pieces):
            # ``split`` consumed the period of every piece except the last,
            # so only those get it back (avoids a doubled trailing period).
            sentence = (piece if index == last_index else piece + ".").strip()
            for start in range(0, len(sentence), self.chunk_size):
                part = sentence[start:start + self.chunk_size].strip()
                if part:
                    sentences.append(part)
        return sentences

    def _overlap_tail(self, sentences: List[str], next_len: int) -> List[str]:
        """Pick trailing sentences to repeat at the start of the next chunk.

        The tail never exceeds ``chunk_overlap`` characters and always leaves
        room for the upcoming sentence of ``next_len`` characters.
        """
        tail: List[str] = []
        tail_len = 0
        for sentence in reversed(sentences):
            candidate_len = tail_len + len(sentence) + (1 if tail else 0)
            if (candidate_len > self.chunk_overlap
                    or candidate_len + 1 + next_len > self.chunk_size):
                break
            tail.append(sentence)
            tail_len = candidate_len
        tail.reverse()
        return tail


class PDFRAGSystem:
    """In-memory retrieval-augmented generation (RAG) helper for PDF files.

    Attributes are plain dicts keyed by caller-supplied document id:
    ``documents`` (metadata), ``document_chunks`` (list of text chunks) and
    ``embeddings_store`` (one embedding row per chunk, only when an
    embedding model could be loaded).
    """

    # A chunk must exceed this cosine similarity to count as relevant.
    MIN_SIMILARITY = 0.2
    # Characters kept per chunk in keyword-search results.
    KEYWORD_SNIPPET_CHARS = 500
    # Characters of each chunk quoted in the generated prompt.
    PROMPT_SNIPPET_CHARS = 400

    def __init__(self):
        """Create empty stores and try to load the embedding model."""
        self.documents = {}
        self.document_chunks = {}
        self.embeddings_store = {}
        self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100)

        # The embedder is optional: without it, search falls back to
        # keyword matching.
        self.embedder = None
        if ST_AVAILABLE:
            try:
                self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
                print("✅ 임베딩 모델 로드 성공")
            except Exception as e:
                print(f"⚠️ 임베딩 모델 로드 실패: {e}")

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract the text and basic metadata of a PDF file.

        Args:
            pdf_path: Path of the PDF file to read.

        Returns:
            A dict with ``"metadata"`` (``title``, ``pages``, ``file_name``)
            and ``"full_text"`` (non-empty page texts joined by blank lines).
            When PyMuPDF is not installed a placeholder result is returned
            instead of raising.

        Raises:
            Exception: If the file cannot be opened or read; the original
                error is chained as ``__cause__``.
        """
        file_name = os.path.basename(pdf_path)

        if not PDF_AVAILABLE:
            return {
                "metadata": {
                    "title": "PDF Reader Not Available",
                    "file_name": file_name,
                    "pages": 0
                },
                "full_text": "PDF 처리를 위해 'pip install pymupdf'를 실행해주세요."
            }

        try:
            doc = fitz.open(pdf_path)
            try:
                metadata = {
                    # A missing title may be reported as "" or None, so fall
                    # back with ``or`` rather than a ``get`` default.
                    "title": (doc.metadata or {}).get("title") or file_name,
                    "pages": len(doc),
                    "file_name": file_name
                }
                page_texts = [page.get_text() for page in doc]
            finally:
                # Release the file handle even if reading a page failed.
                doc.close()
        except Exception as e:
            raise Exception(f"PDF 처리 오류: {str(e)}") from e

        return {
            "metadata": metadata,
            "full_text": "\n\n".join(text for text in page_texts if text.strip())
        }

    def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """Extract, chunk, embed and register a PDF under ``doc_id``.

        Nothing is stored unless every step succeeds, so a failed upload
        cannot leave a half-registered document behind.

        Returns:
            ``{"success": True, "doc_id", "chunks", "pages", "title"}`` on
            success, or ``{"success": False, "error": <message>}`` on failure.
        """
        try:
            pdf_data = self.extract_text_from_pdf(pdf_path)
            metadata = pdf_data["metadata"]
            chunks = self.text_splitter.split_text(pdf_data["full_text"])

            embeddings = None
            if self.embedder is not None and chunks:
                embeddings = self.embedder.encode(chunks)

            result = {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(chunks),
                "pages": metadata["pages"],
                "title": metadata["title"]
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

        # Commit only after all fallible work is done.
        self.document_chunks[doc_id] = chunks
        if embeddings is not None:
            self.embeddings_store[doc_id] = embeddings
        else:
            # Drop stale embeddings from an earlier upload with the same id.
            self.embeddings_store.pop(doc_id, None)
        self.documents[doc_id] = {
            "metadata": metadata,
            "chunk_count": len(chunks),
            "upload_time": datetime.now().isoformat()
        }
        return result

    def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]:
        """Return up to ``top_k`` chunks most relevant to ``query``.

        Uses cosine similarity over stored embeddings when an embedding model
        and embeddings are available, otherwise keyword matching.

        Returns:
            Dicts with ``content``, ``doc_name`` and ``similarity`` (float),
            sorted by descending similarity.
        """
        if top_k <= 0 or not doc_ids:
            return []

        if self.embedder is not None and self.embeddings_store:
            hits = self._search_by_embedding(query, doc_ids, top_k)
        else:
            hits = self._search_by_keyword(query, doc_ids)

        hits.sort(key=lambda hit: hit["similarity"], reverse=True)
        return hits[:top_k]

    def _doc_name(self, doc_id: str) -> str:
        """Return the stored file name of ``doc_id`` (or the id itself)."""
        metadata = self.documents.get(doc_id, {}).get("metadata", {})
        return metadata.get("file_name", doc_id)

    def _search_by_embedding(self, query: str, doc_ids: List[str], top_k: int) -> List[Dict]:
        """Collect each document's best chunks by cosine similarity."""
        query_vec = np.asarray(self.embedder.encode([query])[0], dtype=float)
        query_norm = np.linalg.norm(query_vec)
        if query_norm == 0:
            return []

        hits: List[Dict] = []
        for doc_id in doc_ids:
            chunks = self.document_chunks.get(doc_id)
            embeddings = self.embeddings_store.get(doc_id)
            if not chunks or embeddings is None:
                continue
            matrix = np.asarray(embeddings, dtype=float)
            # Skip stores that do not hold exactly one row per chunk.
            if matrix.ndim != 2 or len(matrix) != len(chunks):
                continue

            dots = matrix @ query_vec
            norms = np.linalg.norm(matrix, axis=1) * query_norm
            # Zero-norm rows get similarity 0 instead of NaN; NaN would sort
            # last in argsort and crowd real matches out of the top-k.
            sims = np.divide(dots, norms, out=np.zeros_like(dots), where=norms > 0)

            doc_name = self._doc_name(doc_id)
            for idx in np.argsort(sims)[::-1][:top_k]:
                score = float(sims[idx])
                if score <= self.MIN_SIMILARITY:
                    break  # remaining candidates score even lower
                hits.append({
                    "content": chunks[idx],
                    "doc_name": doc_name,
                    "similarity": score
                })
        return hits

    def _search_by_keyword(self, query: str, doc_ids: List[str]) -> List[Dict]:
        """Score every chunk by the fraction of query words it contains."""
        keywords = set(query.lower().split())
        if not keywords:
            return []

        hits: List[Dict] = []
        for doc_id in doc_ids:
            doc_name = self._doc_name(doc_id)
            for chunk in self.document_chunks.get(doc_id, ()):
                lowered = chunk.lower()
                matched = sum(1 for keyword in keywords if keyword in lowered)
                if matched:
                    hits.append({
                        # Cap the snippet length.
                        "content": chunk[:self.KEYWORD_SNIPPET_CHARS],
                        "doc_name": doc_name,
                        "similarity": matched / len(keywords)
                    })
        return hits

    def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str:
        """Build a prompt that quotes the chunks relevant to ``query``.

        Returns ``query`` unchanged when no relevant chunk is found.
        """
        relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k)
        if not relevant_chunks:
            return query

        separator = "=" * 50
        prompt_parts = ["다음 문서 내용을 참고하여 질문에 답변해주세요:\n", separator]
        for i, chunk in enumerate(relevant_chunks, 1):
            prompt_parts.append(f"\n[참고문서 {i} - {chunk['doc_name']}]")
            prompt_parts.append(chunk['content'][:self.PROMPT_SNIPPET_CHARS])
            prompt_parts.append("")
        prompt_parts.append(separator)
        prompt_parts.append(f"\n질문: {query}")
        prompt_parts.append("\n위 참고문서를 바탕으로 자세하고 정확하게 답변해주세요:")

        return "\n".join(prompt_parts)