import gradio as gr
import os
from typing import List, Dict, Any, Optional
import hashlib
from datetime import datetime
import numpy as np

# Optional dependency: PyMuPDF for PDF text extraction.
try:
    import fitz  # PyMuPDF
    PDF_AVAILABLE = True
except ImportError:
    PDF_AVAILABLE = False
    print("⚠️ PyMuPDF not installed. Install with: pip install pymupdf")

# Optional dependency: sentence-transformers for embedding-based retrieval.
try:
    from sentence_transformers import SentenceTransformer
    ST_AVAILABLE = True
except ImportError:
    ST_AVAILABLE = False
    print("⚠️ Sentence Transformers not installed. Install with: pip install sentence-transformers")

# Custom CSS for gradient background and styling.
custom_css = """
.gradio-container {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 25%, #f093fb 50%, #4facfe 75%, #00f2fe 100%);
    background-size: 400% 400%;
    animation: gradient-animation 15s ease infinite;
    min-height: 100vh;
}
@keyframes gradient-animation {
    0% { background-position: 0% 50%; }
    50% { background-position: 100% 50%; }
    100% { background-position: 0% 50%; }
}
.dark .gradio-container {
    background: linear-gradient(135deg, #1a1a2e 0%, #16213e 25%, #0f3460 50%, #533483 75%, #e94560 100%);
    background-size: 400% 400%;
    animation: gradient-animation 15s ease infinite;
}
.main-container {
    background-color: rgba(255, 255, 255, 0.95);
    backdrop-filter: blur(10px);
    border-radius: 20px;
    padding: 20px;
    box-shadow: 0 8px 32px 0 rgba(31, 38, 135, 0.37);
    border: 1px solid rgba(255, 255, 255, 0.18);
    margin: 10px;
}
.dark .main-container {
    background-color: rgba(30, 30, 30, 0.95);
    border: 1px solid rgba(255, 255, 255, 0.1);
}
.pdf-status {
    padding: 10px;
    border-radius: 10px;
    margin: 10px 0;
    font-size: 0.9em;
}
.pdf-success {
    background-color: rgba(52, 211, 153, 0.2);
    border: 1px solid rgba(52, 211, 153, 0.5);
    color: #10b981;
}
.pdf-error {
    background-color: rgba(248, 113, 113, 0.2);
    border: 1px solid rgba(248, 113, 113, 0.5);
    color: #ef4444;
}
.pdf-info {
    background-color: rgba(59, 130, 246, 0.2);
    border: 1px solid rgba(59, 130, 246, 0.5);
    color: #3b82f6;
}
.rag-context {
    background-color: rgba(251, 191, 36, 0.1);
    border-left: 4px solid #f59e0b;
    padding: 10px;
    margin: 10px 0;
    border-radius: 5px;
}
"""


class SimpleTextSplitter:
    """Sentence-boundary text splitter producing overlapping chunks.

    Splits on '. ' and greedily packs sentences into chunks of at most
    ``chunk_size`` characters. Consecutive chunks share roughly
    ``chunk_overlap`` trailing characters of context.
    """

    def __init__(self, chunk_size: int = 800, chunk_overlap: int = 100):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def split_text(self, text: str) -> List[str]:
        """Split *text* into a list of chunk strings (empty list for empty text)."""
        chunks: List[str] = []
        current_chunk = ""
        for sentence in text.split('. '):
            if len(current_chunk) + len(sentence) < self.chunk_size:
                current_chunk += sentence + ". "
            else:
                if current_chunk:
                    chunks.append(current_chunk.strip())
                # FIX: chunk_overlap was previously stored but never used.
                # Carry the tail of the finished chunk into the next one so
                # adjacent chunks overlap and context is not lost at cuts.
                tail = current_chunk[-self.chunk_overlap:] if (self.chunk_overlap and current_chunk) else ""
                current_chunk = tail + sentence + ". "
        if current_chunk:
            chunks.append(current_chunk.strip())
        return chunks


class PDFRAGSystem:
    """PDF-backed retrieval-augmented generation (RAG) store.

    Holds per-document metadata, text chunks, and (when sentence-transformers
    is available) chunk embeddings, and builds RAG prompts from a user query.
    """

    def __init__(self):
        self.documents: Dict[str, Dict[str, Any]] = {}        # doc_id -> metadata/summary
        self.document_chunks: Dict[str, List[str]] = {}       # doc_id -> text chunks
        self.embeddings_store: Dict[str, Any] = {}            # doc_id -> ndarray of chunk embeddings
        self.text_splitter = SimpleTextSplitter(chunk_size=800, chunk_overlap=100)

        # Embedding model is optional; fall back to keyword search without it.
        self.embedder: Optional[Any] = None
        if ST_AVAILABLE:
            try:
                self.embedder = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
                print("✅ 임베딩 모델 로드 성공")
            except Exception as e:
                print(f"⚠️ 임베딩 모델 로드 실패: {e}")

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Extract text and basic metadata from the PDF at *pdf_path*.

        Returns a dict with ``metadata`` (title, pages, file_name) and
        ``full_text``. Raises on unreadable/corrupt files.
        """
        if not PDF_AVAILABLE:
            return {
                "metadata": {
                    "title": "PDF Reader Not Available",
                    "file_name": os.path.basename(pdf_path),
                    "pages": 0
                },
                "full_text": "PDF 처리를 위해 'pip install pymupdf'를 실행해주세요."
            }
        try:
            doc = fitz.open(pdf_path)
            text_content = []
            metadata = {
                "title": doc.metadata.get("title", os.path.basename(pdf_path)),
                "pages": len(doc),
                "file_name": os.path.basename(pdf_path)
            }
            for page_num, page in enumerate(doc):
                text = page.get_text()
                if text.strip():  # skip blank pages
                    text_content.append(text)
            doc.close()
            return {
                "metadata": metadata,
                "full_text": "\n\n".join(text_content)
            }
        except Exception as e:
            raise Exception(f"PDF 처리 오류: {str(e)}")

    def process_and_store_pdf(self, pdf_path: str, doc_id: str) -> Dict[str, Any]:
        """Extract, chunk, embed, and register a PDF under *doc_id*.

        Returns ``{"success": True, ...summary...}`` or
        ``{"success": False, "error": ...}`` — never raises.
        """
        try:
            pdf_data = self.extract_text_from_pdf(pdf_path)

            # Split extracted text into chunks; guard against an empty
            # result (e.g. image-only PDFs) so downstream code has data.
            chunks = self.text_splitter.split_text(pdf_data["full_text"])
            if not chunks:
                chunks = [pdf_data["full_text"]]
            self.document_chunks[doc_id] = chunks

            # Pre-compute embeddings once at upload time.
            if self.embedder:
                self.embeddings_store[doc_id] = self.embedder.encode(chunks)

            self.documents[doc_id] = {
                "metadata": pdf_data["metadata"],
                "chunk_count": len(chunks),
                "upload_time": datetime.now().isoformat()
            }
            return {
                "success": True,
                "doc_id": doc_id,
                "chunks": len(chunks),
                "pages": pdf_data["metadata"]["pages"],
                "title": pdf_data["metadata"]["title"]
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    def search_relevant_chunks(self, query: str, doc_ids: List[str], top_k: int = 3) -> List[Dict]:
        """Return up to *top_k* chunks most relevant to *query* across *doc_ids*.

        Uses cosine similarity over embeddings when available, otherwise a
        keyword-overlap score. Each hit is a dict with ``content``,
        ``doc_name`` and ``similarity``.
        """
        all_relevant_chunks: List[Dict] = []

        if self.embedder and self.embeddings_store:
            # Embedding-based search.
            query_embedding = self.embedder.encode([query])[0]
            query_norm = np.linalg.norm(query_embedding)
            for doc_id in doc_ids:
                if doc_id in self.embeddings_store and doc_id in self.document_chunks:
                    doc_embeddings = self.embeddings_store[doc_id]
                    chunks = self.document_chunks[doc_id]
                    # Cosine similarity of the query against every chunk.
                    similarities = [
                        float(np.dot(query_embedding, emb) / (query_norm * np.linalg.norm(emb)))
                        for emb in doc_embeddings
                    ]
                    # Keep the best top_k per document, above a noise floor.
                    top_indices = np.argsort(similarities)[-top_k:][::-1]
                    for idx in top_indices:
                        if similarities[idx] > 0.2:
                            all_relevant_chunks.append({
                                "content": chunks[idx],
                                "doc_name": self.documents[doc_id]["metadata"]["file_name"],
                                "similarity": similarities[idx]
                            })
        else:
            # Keyword-based fallback search.
            query_keywords = set(query.lower().split())
            for doc_id in doc_ids:
                if doc_id in self.document_chunks:
                    # FIX: previously only the first top_k chunks were scanned,
                    # silently ignoring the rest of the document. Score every
                    # chunk; the global sort below enforces the top_k limit.
                    for chunk in self.document_chunks[doc_id]:
                        chunk_lower = chunk.lower()
                        score = sum(1 for keyword in query_keywords if keyword in chunk_lower)
                        if score > 0:
                            all_relevant_chunks.append({
                                "content": chunk[:500],  # length cap for prompt budget
                                "doc_name": self.documents[doc_id]["metadata"]["file_name"],
                                "similarity": score / len(query_keywords) if query_keywords else 0
                            })

        # Rank globally and return the best top_k hits.
        all_relevant_chunks.sort(key=lambda x: x.get('similarity', 0), reverse=True)
        return all_relevant_chunks[:top_k]

    def create_rag_prompt(self, query: str, doc_ids: List[str], top_k: int = 3) -> str:
        """Build an augmented prompt embedding the top chunks for *query*.

        Falls back to the raw query when no relevant chunk is found.
        """
        relevant_chunks = self.search_relevant_chunks(query, doc_ids, top_k)
        if not relevant_chunks:
            return query

        prompt_parts = []
        prompt_parts.append("다음 문서 내용을 참고하여 질문에 답변해주세요:\n")
        prompt_parts.append("=" * 50)
        for i, chunk in enumerate(relevant_chunks, 1):
            prompt_parts.append(f"\n[참고문서 {i} - {chunk['doc_name']}]")
            # Cap each excerpt at 400 characters to bound prompt size.
            content = chunk['content'][:400] if len(chunk['content']) > 400 else chunk['content']
            prompt_parts.append(content)
            prompt_parts.append("")
        prompt_parts.append("=" * 50)
        prompt_parts.append(f"\n질문: {query}")
        prompt_parts.append("\n위 참고문서를 바탕으로 자세하고 정확하게 답변해주세요:")
        return "\n".join(prompt_parts)


# Singleton RAG system instance shared by all handlers.
rag_system = PDFRAGSystem()

# State variable to track the currently selected model.
current_model = gr.State("openai/gpt-oss-120b")


def upload_pdf(file):
    """Handle a PDF upload: ingest it and refresh the UI status/doc list.

    NOTE(review): the HTML markup in the original status strings was stripped
    during extraction; the <div> wrappers below are reconstructed from the
    pdf-status/pdf-success/pdf-error/pdf-info CSS classes this file defines —
    confirm against the original rendering.
    """
    if file is None:
        return (
            gr.update(value="<div class='pdf-status pdf-info'>파일을 선택해주세요</div>"),
            gr.update(choices=[]),
            gr.update(value=False)
        )
    try:
        # Use a hash of the file contents as a stable document ID, so
        # re-uploading the same file overwrites rather than duplicates.
        with open(file.name, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()[:8]
        doc_id = f"doc_{file_hash}"

        result = rag_system.process_and_store_pdf(file.name, doc_id)

        if result["success"]:
            status_html = f"""<div class='pdf-status pdf-success'>
✅ PDF 업로드 성공!<br>
📄 파일: {result['title']}<br>
📑 페이지: {result['pages']}페이지<br>
🔍 청크: {result['chunks']}개 생성
</div>"""
            # Refresh the document checklist; newly uploaded docs are
            # selected by default (value=doc_choices).
            doc_choices = [
                f"{doc_id}: {rag_system.documents[doc_id]['metadata']['file_name']}"
                for doc_id in rag_system.documents.keys()
            ]
            return (
                status_html,
                gr.update(choices=doc_choices, value=doc_choices),
                gr.update(value=True)
            )
        else:
            status_html = f"""<div class='pdf-status pdf-error'>❌ 업로드 실패: {result['error']}</div>"""
            return status_html, gr.update(), gr.update(value=False)
    except Exception as e:
        return (
            f"<div class='pdf-status pdf-error'>❌ 오류: {str(e)}</div>",
            gr.update(),
            gr.update(value=False)
        )


def clear_documents():
    """Remove every stored document, chunk, and embedding; reset the UI."""
    rag_system.documents = {}
    rag_system.document_chunks = {}
    rag_system.embeddings_store = {}
    return (
        gr.update(value="<div class='pdf-status pdf-success'>✅ 모든 문서가 삭제되었습니다</div>"),
        gr.update(choices=[], value=[]),
        gr.update(value=False)
    )


def switch_model(model_choice):
    """Toggle visibility between the two model containers and record the choice."""
    return gr.update(visible=False), gr.update(visible=True), model_choice


def create_rag_context_display(query, selected_docs, top_k):
    """Render the retrieved chunks for *query* as an HTML context panel.

    Returns "" when no documents are selected or nothing relevant is found.
    NOTE(review): HTML tags reconstructed (see upload_pdf) using the
    .rag-context CSS class defined above.
    """
    if not selected_docs:
        return ""
    # Choices are formatted "doc_id: file_name" — recover the IDs.
    doc_ids = [doc.split(":")[0] for doc in selected_docs]
    chunks = rag_system.search_relevant_chunks(query, doc_ids, top_k)
    if not chunks:
        return ""

    html = "<div class='rag-context'><b>📚 참고 문서:</b><br>"
    for i, chunk in enumerate(chunks, 1):
        html += f"<div style='margin: 5px 0;'><b>{i}. {chunk['doc_name']}</b> (유사도: {chunk['similarity']:.2f})<br>"
        html += f"{chunk['content'][:200]}...</div>"
    html += "</div>"
    return html


# ---------------------------------------------------------------------------
# Main interface
# ---------------------------------------------------------------------------
with gr.Blocks(fill_height=True, theme="Nymbo/Nymbo_Theme", css=custom_css) as demo:
    # Placeholder for message-passing JavaScript.
    # NOTE(review): the original <script> content (if any) was stripped
    # during extraction — restore it from version control if it existed.
    gr.HTML("""
    """)

    with gr.Row():
        # ------------------------------------------------------------------
        # Sidebar
        # ------------------------------------------------------------------
        with gr.Column(scale=1):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("# 🚀 Inference Provider + RAG")
                gr.Markdown(
                    "OpenAI GPT-OSS models with PDF RAG support. "
                    "Sign in with your Hugging Face account to use this API."
                )

                # Model selection
                model_dropdown = gr.Dropdown(
                    choices=["openai/gpt-oss-120b", "openai/gpt-oss-20b"],
                    value="openai/gpt-oss-120b",
                    label="📊 Select Model",
                    info="Choose between different model sizes"
                )

                # Login button
                login_button = gr.LoginButton("Sign in with Hugging Face", size="lg")

                # Reload button to apply model change
                reload_btn = gr.Button("🔄 Apply Model Change", variant="primary", size="lg")

                # RAG settings
                with gr.Accordion("📚 PDF RAG Settings", open=True):
                    pdf_upload = gr.File(
                        label="Upload PDF",
                        file_types=[".pdf"],
                        type="filepath"
                    )
                    upload_status = gr.HTML(
                        value="<div class='pdf-status pdf-info'>📤 PDF를 업로드하여 문서 기반 답변을 받으세요</div>"
                    )
                    document_list = gr.CheckboxGroup(
                        choices=[],
                        label="📄 업로드된 문서",
                        info="참고할 문서를 선택하세요"
                    )
                    clear_btn = gr.Button("🗑️ 모든 문서 삭제", size="sm")
                    enable_rag = gr.Checkbox(
                        label="RAG 활성화",
                        value=False,
                        info="선택한 문서를 참고하여 답변 생성"
                    )
                    top_k_chunks = gr.Slider(
                        minimum=1,
                        maximum=5,
                        value=3,
                        step=1,
                        label="참조 청크 수",
                        info="답변 생성시 참고할 문서 조각 개수"
                    )

                # Additional options (not yet wired to the models)
                with gr.Accordion("⚙️ Advanced Options", open=False):
                    gr.Markdown("*These options will be available after model implementation*")
                    temperature = gr.Slider(
                        minimum=0, maximum=2, value=0.7, step=0.1,
                        label="Temperature"
                    )
                    max_tokens = gr.Slider(
                        minimum=1, maximum=4096, value=512, step=1,
                        label="Max Tokens"
                    )

        # ------------------------------------------------------------------
        # Main chat area
        # ------------------------------------------------------------------
        with gr.Column(scale=3):
            with gr.Group(elem_classes="main-container"):
                gr.Markdown("## 💬 Chat Interface")

                # RAG on/off indicator
                rag_status = gr.HTML(
                    value="<div class='pdf-status pdf-info'>🔍 RAG: 비활성화</div>"
                )

                # Panel showing the retrieved chunks used for the last query
                rag_context_display = gr.HTML(value="", visible=False)

                # Container for the 120b model interface
                with gr.Column(visible=True) as model_120b_container:
                    gr.Markdown("### Model: openai/gpt-oss-120b")

                    # Custom input controls for RAG preprocessing
                    with gr.Group():
                        user_input = gr.Textbox(
                            label="메시지 입력",
                            placeholder="문서에 대해 질문하거나 일반 대화를 시작하세요...",
                            lines=2
                        )
                        with gr.Row():
                            send_btn = gr.Button("📤 전송", variant="primary")
                            clear_chat_btn = gr.Button("🗑️ 대화 초기화")

                    # Load the hosted model UI
                    original_model = gr.load(
                        "models/openai/gpt-oss-120b",
                        accept_token=login_button,
                        provider="fireworks-ai"
                    )

                # Container for the 20b model interface
                with gr.Column(visible=False) as model_20b_container:
                    gr.Markdown("### Model: openai/gpt-oss-20b")

                    with gr.Group():
                        user_input_20b = gr.Textbox(
                            label="메시지 입력",
                            placeholder="문서에 대해 질문하거나 일반 대화를 시작하세요...",
                            lines=2
                        )
                        with gr.Row():
                            send_btn_20b = gr.Button("📤 전송", variant="primary")
                            clear_chat_btn_20b = gr.Button("🗑️ 대화 초기화")

                    original_model_20b = gr.load(
                        "models/openai/gpt-oss-20b",
                        accept_token=login_button,
                        provider="fireworks-ai"
                    )

    # ----------------------------------------------------------------------
    # Event handlers
    # ----------------------------------------------------------------------

    # PDF upload
    pdf_upload.upload(
        fn=upload_pdf,
        inputs=[pdf_upload],
        outputs=[upload_status, document_list, enable_rag]
    )

    # Document reset
    clear_btn.click(
        fn=clear_documents,
        outputs=[upload_status, document_list, enable_rag]
    )

    # RAG status indicator refresh
    enable_rag.change(
        fn=lambda x: gr.update(
            value=f"<div class='pdf-status pdf-info'>🔍 RAG: {'활성화' if x else '비활성화'}</div>"
        ),
        inputs=[enable_rag],
        outputs=[rag_status]
    )

    # Model switching via the apply button
    reload_btn.click(
        fn=switch_model,
        inputs=[model_dropdown],
        outputs=[model_120b_container, model_20b_container, current_model]
    ).then(
        fn=lambda: gr.Info("Model switched successfully!"),
        inputs=[],
        outputs=[]
    )

    # Keep container visibility in sync with the dropdown selection
    def update_visibility(model_choice):
        if model_choice == "openai/gpt-oss-120b":
            return gr.update(visible=True), gr.update(visible=False)
        else:
            return gr.update(visible=False), gr.update(visible=True)

    model_dropdown.change(
        fn=update_visibility,
        inputs=[model_dropdown],
        outputs=[model_120b_container, model_20b_container]
    )

    # Message preprocessing (RAG augmentation) before the model sees it
    def process_message(message, enable_rag, selected_docs, top_k):
        """Augment *message* with retrieved context when RAG is enabled."""
        if enable_rag and selected_docs:
            doc_ids = [doc.split(":")[0] for doc in selected_docs]
            enhanced_message = rag_system.create_rag_prompt(message, doc_ids, top_k)
            context_html = create_rag_context_display(message, selected_docs, top_k)
            return enhanced_message, gr.update(value=context_html, visible=True)
        else:
            return message, gr.update(value="", visible=False)

    # Events for the 120b model
    send_btn.click(
        fn=process_message,
        inputs=[user_input, enable_rag, document_list, top_k_chunks],
        outputs=[user_input, rag_context_display]
    )
    user_input.submit(
        fn=process_message,
        inputs=[user_input, enable_rag, document_list, top_k_chunks],
        outputs=[user_input, rag_context_display]
    )

    # Events for the 20b model
    send_btn_20b.click(
        fn=process_message,
        inputs=[user_input_20b, enable_rag, document_list, top_k_chunks],
        outputs=[user_input_20b, rag_context_display]
    )
    user_input_20b.submit(
        fn=process_message,
        inputs=[user_input_20b, enable_rag, document_list, top_k_chunks],
        outputs=[user_input_20b, rag_context_display]
    )

demo.launch()