import os
import re
import glob
import time
from collections import defaultdict

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

# PyMuPDF availability check (pymupdf4llm below depends on PyMuPDF)
try:
    import fitz  # PyMuPDF
    PYMUPDF_AVAILABLE = True
    print("PyMuPDF library available")
except ImportError:
    PYMUPDF_AVAILABLE = False
    print("PyMuPDF library is not installed. Install it with: pip install PyMuPDF")

# PDF processing
import pytesseract
from PIL import Image
from pdf2image import convert_from_path
import pdfplumber
from pymupdf4llm import LlamaMarkdownReader
# --------------------------------
# Logging helper
# --------------------------------
def log(msg):
    print(f"[{time.strftime('%H:%M:%S')}] {msg}")

# --------------------------------
# Text cleaning helpers
# --------------------------------
def clean_text(text):
    # Keep Hangul (syllables and jamo), word characters, whitespace and basic
    # punctuation; drop everything else.
    return re.sub(r"[^\uAC00-\uD7A3\u1100-\u11FF\u3130-\u318F\w\s.,!?\"'()$:\-]", "", text)
def apply_corrections(text):
    # Map frequently seen mojibake sequences back to the intended characters.
    # The Korean replacements are document-specific fixes.
    corrections = {
        'º©': '정보', 'Ã': '', '½': '이상', '©': '',
        'â€™': "'", 'â€œ': '"', 'â€': '"'
    }
    for k, v in corrections.items():
        text = text.replace(k, v)
    return text
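# Illustrative example (hypothetical input): clean_text keeps Hangul, word characters,
# whitespace and basic punctuation and drops other symbols, after which
# apply_corrections rewrites the known mojibake sequences listed above.
#
#   clean_text("보고서 ★ A-1 (2024): 100%?")  ->  "보고서  A-1 (2024): 100?"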
# --------------------------------
# HWPX processing (section-based parsing only)
# --------------------------------
def load_hwpx(file_path):
    """Load an HWPX file (XML parsing only)."""
    import zipfile
    import xml.etree.ElementTree as ET
    import chardet

    log(f"Starting section-by-section HWPX processing: {file_path}")
    start = time.time()
    documents = []

    try:
        with zipfile.ZipFile(file_path, 'r') as zip_ref:
            file_list = zip_ref.namelist()
            section_files = [f for f in file_list
                             if f.startswith('Contents/section') and f.endswith('.xml')]
            section_files.sort()  # sort as section0.xml, section1.xml, ...
            log(f"Found {len(section_files)} section file(s)")

            for section_idx, section_file in enumerate(section_files):
                with zip_ref.open(section_file) as xml_file:
                    raw = xml_file.read()
                    encoding = chardet.detect(raw)['encoding'] or 'utf-8'
                    try:
                        text = raw.decode(encoding)
                    except UnicodeDecodeError:
                        text = raw.decode("cp949", errors="replace")

                    tree = ET.ElementTree(ET.fromstring(text))
                    root = tree.getroot()

                    # Collect text elements regardless of namespace
                    t_elements = [elem for elem in root.iter() if elem.tag.endswith('}t') or elem.tag == 't']
                    body_text = ""
                    for elem in t_elements:
                        if elem.text:
                            body_text += clean_text(elem.text) + " "

                    # Page metadata is left empty (HWPX sections carry no page numbers)
                    page_value = ""

                    if body_text.strip():
                        documents.append(Document(
                            page_content=apply_corrections(body_text),
                            metadata={
                                "source": file_path,
                                "filename": os.path.basename(file_path),
                                "type": "hwpx_body",
                                "page": page_value,
                                "total_sections": len(section_files)
                            }
                        ))
                        log(f"Section text extracted (chars: {len(body_text)})")

                    # Tables
                    table_elements = [elem for elem in root.iter() if elem.tag.endswith('}table') or elem.tag == 'table']
                    if table_elements:
                        table_text = ""
                        for table_idx, table in enumerate(table_elements):
                            table_text += f"[Table {table_idx + 1}]\n"
                            rows = [elem for elem in table.iter() if elem.tag.endswith('}tr') or elem.tag == 'tr']
                            for row in rows:
                                row_text = []
                                cells = [elem for elem in row.iter() if elem.tag.endswith('}tc') or elem.tag == 'tc']
                                for cell in cells:
                                    cell_texts = []
                                    for t_elem in cell.iter():
                                        if (t_elem.tag.endswith('}t') or t_elem.tag == 't') and t_elem.text:
                                            cell_texts.append(clean_text(t_elem.text))
                                    row_text.append(" ".join(cell_texts))
                                if row_text:
                                    table_text += "\t".join(row_text) + "\n"

                        if table_text.strip():
                            documents.append(Document(
                                page_content=apply_corrections(table_text),
                                metadata={
                                    "source": file_path,
                                    "filename": os.path.basename(file_path),
                                    "type": "hwpx_table",
                                    "page": page_value,
                                    "total_sections": len(section_files)
                                }
                            ))
                            log("Table text extracted")

                    # Images
                    if [elem for elem in root.iter() if elem.tag.endswith('}picture') or elem.tag == 'picture']:
                        documents.append(Document(
                            page_content="[Image included]",
                            metadata={
                                "source": file_path,
                                "filename": os.path.basename(file_path),
                                "type": "hwpx_image",
                                "page": page_value,
                                "total_sections": len(section_files)
                            }
                        ))
                        log("Image found")

    except Exception as e:
        log(f"HWPX processing error: {e}")

    duration = time.time() - start
    if documents:
        log(f"Documents extracted: {len(documents)}")
    log(f"HWPX processing complete: {file_path} ({duration:.2f}s, {len(documents)} document(s))")
    return documents
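# Sanity check (sketch): an HWPX document is a ZIP container, and the parser above only
# looks at body XML under Contents/section*.xml. To inspect a file by hand (the file
# name below is hypothetical):
#
#   import zipfile
#   with zipfile.ZipFile("sample.hwpx") as z:
#       print([name for name in z.namelist() if name.startswith("Contents/")])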
# --------------------------------
# PDF processing functions (unchanged from the previous version)
# --------------------------------
def run_ocr_on_image(image: Image.Image, lang='kor+eng'):
    return pytesseract.image_to_string(image, lang=lang)

def extract_images_with_ocr(pdf_path, lang='kor+eng'):
    try:
        images = convert_from_path(pdf_path)
        page_ocr_data = {}
        for idx, img in enumerate(images):
            page_num = idx + 1
            text = run_ocr_on_image(img, lang=lang)
            if text.strip():
                page_ocr_data[page_num] = text.strip()
        return page_ocr_data
    except Exception as e:
        print(f"Image OCR failed: {e}")
        return {}
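# Note: pytesseract and pdf2image wrap external tools, so the OCR path above assumes the
# Tesseract binary (with 'kor' and 'eng' language data) and Poppler are installed on the
# system. A quick check with recent pytesseract versions (sketch):
#
#   print(pytesseract.get_languages(config=""))  # expected to include 'kor' and 'eng'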
def extract_tables_with_pdfplumber(pdf_path):
    page_table_data = {}
    try:
        with pdfplumber.open(pdf_path) as pdf:
            for i, page in enumerate(pdf.pages):
                page_num = i + 1
                tables = page.extract_tables()
                table_text = ""
                for t_index, table in enumerate(tables):
                    if table:
                        table_text += f"[Table {t_index + 1}]\n"
                        for row in table:
                            row_text = "\t".join(cell if cell else "" for cell in row)
                            table_text += row_text + "\n"
                if table_text.strip():
                    page_table_data[page_num] = table_text.strip()
        return page_table_data
    except Exception as e:
        print(f"Table extraction failed: {e}")
        return {}

def extract_body_text_with_pages(pdf_path):
    page_body_data = {}
    try:
        pdf_processor = LlamaMarkdownReader()
        docs = pdf_processor.load_data(file_path=pdf_path)

        combined_text = ""
        for d in docs:
            if isinstance(d, dict) and "text" in d:
                combined_text += d["text"]
            elif hasattr(d, "text"):
                combined_text += d.text

        if combined_text.strip():
            # Slice the combined text into ~2000-character pseudo-pages
            # with a 100-character overlap between consecutive pages.
            chars_per_page = 2000
            start = 0
            page_num = 1
            while start < len(combined_text):
                end = start + chars_per_page
                if end > len(combined_text):
                    end = len(combined_text)
                page_text = combined_text[start:end]
                if page_text.strip():
                    page_body_data[page_num] = page_text.strip()
                    page_num += 1
                if end == len(combined_text):
                    break
                start = end - 100
    except Exception as e:
        print(f"Body text extraction failed: {e}")
    return page_body_data
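# The "pages" produced above are approximate: the text returned by LlamaMarkdownReader is
# concatenated into a single string and re-sliced into ~2000-character pseudo-pages with a
# 100-character overlap. For example, a 4,500-character document yields slices of roughly
# [0:2000], [1900:3900] and [3800:4500], so these page numbers need not match the PDF's
# physical pages; load_pdf_with_metadata below reconciles the two counts.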
def load_pdf_with_metadata(pdf_path):
    """Extract per-page information from a PDF file."""
    log(f"Starting per-page PDF processing: {pdf_path}")
    start = time.time()

    # First, determine the actual page count with PyPDFLoader
    try:
        from langchain_community.document_loaders import PyPDFLoader
        loader = PyPDFLoader(pdf_path)
        pdf_pages = loader.load()
        actual_total_pages = len(pdf_pages)
        log(f"Actual page count reported by PyPDFLoader: {actual_total_pages}")
    except Exception as e:
        log(f"Failed to determine page count with PyPDFLoader: {e}")
        actual_total_pages = 1

    try:
        page_tables = extract_tables_with_pdfplumber(pdf_path)
    except Exception as e:
        page_tables = {}
        print(f"Table extraction failed: {e}")

    try:
        page_ocr = extract_images_with_ocr(pdf_path)
    except Exception as e:
        page_ocr = {}
        print(f"Image OCR failed: {e}")

    try:
        page_body = extract_body_text_with_pages(pdf_path)
    except Exception as e:
        page_body = {}
        print(f"Body text extraction failed: {e}")

    duration = time.time() - start
    log(f"Per-page PDF processing complete: {pdf_path} ({duration:.2f}s)")

    # Decide the total page count
    all_pages = set(page_tables.keys()) | set(page_ocr.keys()) | set(page_body.keys())
    if all_pages:
        max_extracted_page = max(all_pages)
        # Use the larger of the actual page count and the highest extracted page number
        total_pages = max(actual_total_pages, max_extracted_page)
    else:
        total_pages = actual_total_pages

    log(f"Final total page count: {total_pages}")

    docs = []
    for page_num in sorted(all_pages):
        if page_num in page_tables and page_tables[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_tables[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "table",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"Page {page_num}: table extracted")

        if page_num in page_body and page_body[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_body[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "body",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"Page {page_num}: body text extracted")

        if page_num in page_ocr and page_ocr[page_num].strip():
            docs.append(Document(
                page_content=clean_text(apply_corrections(page_ocr[page_num])),
                metadata={
                    "source": pdf_path,
                    "filename": os.path.basename(pdf_path),
                    "type": "ocr",
                    "page": page_num,
                    "total_pages": total_pages
                }
            ))
            log(f"Page {page_num}: OCR text extracted")

    if not docs:
        docs.append(Document(
            page_content="[Content extraction failed]",
            metadata={
                "source": pdf_path,
                "filename": os.path.basename(pdf_path),
                "type": "error",
                "page": 1,
                "total_pages": total_pages
            }
        ))

    # Page info summary
    if docs:
        page_numbers = [doc.metadata.get('page', 0) for doc in docs if doc.metadata.get('page')]
        if page_numbers:
            log(f"Extracted page range: {min(page_numbers)} ~ {max(page_numbers)}")
    log(f"Per-page PDF documents extracted: {len(docs)} (total {total_pages} page(s))")
    return docs
# --------------------------------
# Document loading and splitting
# --------------------------------
def load_documents(folder_path):
    documents = []

    for file in glob.glob(os.path.join(folder_path, "*.hwpx")):
        log(f"Found HWPX file: {file}")
        docs = load_hwpx(file)
        documents.extend(docs)

    for file in glob.glob(os.path.join(folder_path, "*.pdf")):
        log(f"Found PDF file: {file}")
        documents.extend(load_pdf_with_metadata(file))

    log(f"Document loading finished. Total documents: {len(documents)}")
    return documents

def split_documents(documents, chunk_size=800, chunk_overlap=100):
    log("Starting chunk splitting")
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )

    chunks = []
    for doc in documents:
        split = splitter.split_text(doc.page_content)
        for i, chunk in enumerate(split):
            enriched_chunk = f"passage: {chunk}"
            chunks.append(Document(
                page_content=enriched_chunk,
                metadata={**doc.metadata, "chunk_index": i}
            ))

    log(f"Chunk splitting complete: {len(chunks)} chunk(s) created")
    return chunks
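# Note on the "passage: " prefix added in split_documents above: E5-family embedding
# models are trained with asymmetric prefixes, so indexed chunks are prefixed with
# "passage: " while search queries should be prefixed with "query: " at retrieval time
# (see the retrieval sketch at the end of this file).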
# --------------------------------
# Main execution
# --------------------------------
if __name__ == "__main__":
    folder = "dataset_test"
    log("Starting PyMuPDF-based document processing")
    docs = load_documents(folder)
    log("Document loading complete")

    # Page info summary
    log("Page info summary:")
    page_info = {}
    for doc in docs:
        source = doc.metadata.get('source', 'unknown')
        page = doc.metadata.get('page', 'unknown')
        doc_type = doc.metadata.get('type', 'unknown')
        if source not in page_info:
            page_info[source] = {'pages': set(), 'types': set()}
        page_info[source]['pages'].add(page)
        page_info[source]['types'].add(doc_type)

    for source, info in page_info.items():
        max_page = max(info['pages']) if info['pages'] and isinstance(max(info['pages']), int) else 'unknown'
        log(f"  {os.path.basename(source)}: {max_page} page(s), types: {info['types']}")

    chunks = split_documents(docs)

    log("Preparing E5 embeddings (intfloat/e5-large-v2)")
    embedding_model = HuggingFaceEmbeddings(
        model_name="intfloat/e5-large-v2",
        model_kwargs={"device": "cuda"}
    )

    vectorstore = FAISS.from_documents(chunks, embedding_model)
    vectorstore.save_local("vector_db")

    log(f"Total documents: {len(docs)}")
    log(f"Total chunks: {len(chunks)}")
    log("FAISS index saved to: vector_db")

    # Sample chunks including page metadata
    log("\nSample chunks with page metadata:")
    for i, chunk in enumerate(chunks[:5]):
        meta = chunk.metadata
        log(f"  Chunk {i+1}: {meta.get('type')} | page {meta.get('page')} | {os.path.basename(meta.get('source', 'unknown'))}")