# open-webui-rag-system / document_processor_image_test.py
import os
import re
import glob
import time
from collections import defaultdict
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
# PyMuPDF library
try:
import fitz # PyMuPDF
PYMUPDF_AVAILABLE = True
    print("✅ PyMuPDF library is available")
except ImportError:
PYMUPDF_AVAILABLE = False
    print("⚠️ PyMuPDF is not installed. Install it with: pip install PyMuPDF")
# PDF processing imports
import pytesseract
from PIL import Image
from pdf2image import convert_from_path
import pdfplumber
from pymupdf4llm import LlamaMarkdownReader
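# Note (descriptive): pymupdf4llm itself is built on PyMuPDF, so if the optional
# import above failed, this unconditional import will also raise ImportError;
# PYMUPDF_AVAILABLE is informational only and is not used elsewhere in this file.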
# --------------------------------
# Logging
# --------------------------------
def log(msg):
print(f"[{time.strftime('%H:%M:%S')}] {msg}")
# --------------------------------
# Text cleaning helpers
# --------------------------------
def clean_text(text):
return re.sub(r"[^\uAC00-\uD7A3\u1100-\u11FF\u3130-\u318F\w\s.,!?\"'()$:\-]", "", text)
def apply_corrections(text):
corrections = {
'ΒΊΒ©': '정보', 'Ì': '의', 'Β½': '운영', 'Γƒ': '', 'Β©': '',
'Ò€ℒ': "'", 'Ò€œ': '"', 'Ò€': '"'
}
for k, v in corrections.items():
text = text.replace(k, v)
return text
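# Note (descriptive): clean_text keeps Hangul syllables/jamo, word characters,
# whitespace, and basic punctuation, dropping everything else; apply_corrections
# then maps a few mojibake sequences seen in extracted text back to the intended
# characters. Illustrative call (made-up input):
#   apply_corrections(clean_text("운영 보고서 ★ 2024"))  # the '★' is dropped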
# --------------------------------
# HWPX processing (section-based parsing only)
# --------------------------------
def load_hwpx(file_path):
"""HWPX 파일 λ‘œλ”© (XML νŒŒμ‹± λ°©μ‹λ§Œ μ‚¬μš©)"""
import zipfile
import xml.etree.ElementTree as ET
import chardet
log(f"πŸ“₯ HWPX μ„Ήμ…˜λ³„ 처리 μ‹œμž‘: {file_path}")
start = time.time()
documents = []
try:
with zipfile.ZipFile(file_path, 'r') as zip_ref:
file_list = zip_ref.namelist()
section_files = [f for f in file_list
if f.startswith('Contents/section') and f.endswith('.xml')]
            section_files.sort()  # sort in section0.xml, section1.xml, ... order
            log(f"📄 Section files found: {len(section_files)}")
for section_idx, section_file in enumerate(section_files):
with zip_ref.open(section_file) as xml_file:
raw = xml_file.read()
encoding = chardet.detect(raw)['encoding'] or 'utf-8'
try:
text = raw.decode(encoding)
except UnicodeDecodeError:
text = raw.decode("cp949", errors="replace")
tree = ET.ElementTree(ET.fromstring(text))
root = tree.getroot()
                    # collect <t> text elements regardless of XML namespace
t_elements = [elem for elem in root.iter() if elem.tag.endswith('}t') or elem.tag == 't']
body_text = ""
for elem in t_elements:
if elem.text:
body_text += clean_text(elem.text) + " "
                    # page metadata is left as an empty value
page_value = ""
if body_text.strip():
documents.append(Document(
page_content=apply_corrections(body_text),
metadata={
"source": file_path,
"filename": os.path.basename(file_path),
"type": "hwpx_body",
"page": page_value,
"total_sections": len(section_files)
}
))
log(f"βœ… μ„Ήμ…˜ ν…μŠ€νŠΈ μΆ”μΆœ μ™„λ£Œ (chars: {len(body_text)})")
# ν‘œ μ°ΎκΈ°
table_elements = [elem for elem in root.iter() if elem.tag.endswith('}table') or elem.tag == 'table']
if table_elements:
table_text = ""
for table_idx, table in enumerate(table_elements):
table_text += f"[Table {table_idx + 1}]\n"
rows = [elem for elem in table.iter() if elem.tag.endswith('}tr') or elem.tag == 'tr']
for row in rows:
row_text = []
cells = [elem for elem in row.iter() if elem.tag.endswith('}tc') or elem.tag == 'tc']
for cell in cells:
cell_texts = []
for t_elem in cell.iter():
if (t_elem.tag.endswith('}t') or t_elem.tag == 't') and t_elem.text:
cell_texts.append(clean_text(t_elem.text))
row_text.append(" ".join(cell_texts))
if row_text:
table_text += "\t".join(row_text) + "\n"
if table_text.strip():
documents.append(Document(
page_content=apply_corrections(table_text),
metadata={
"source": file_path,
"filename": os.path.basename(file_path),
"type": "hwpx_table",
"page": page_value,
"total_sections": len(section_files)
}
))
log(f"πŸ“Š ν‘œ μΆ”μΆœ μ™„λ£Œ")
# 이미지 μ°ΎκΈ°
if [elem for elem in root.iter() if elem.tag.endswith('}picture') or elem.tag == 'picture']:
documents.append(Document(
page_content="[이미지 포함]",
metadata={
"source": file_path,
"filename": os.path.basename(file_path),
"type": "hwpx_image",
"page": page_value,
"total_sections": len(section_files)
}
))
log(f"πŸ–ΌοΈ 이미지 발견")
except Exception as e:
log(f"❌ HWPX 처리 였λ₯˜: {e}")
duration = time.time() - start
    # print a summary of the extracted documents
if documents:
log(f"πŸ“‹ μΆ”μΆœλœ λ¬Έμ„œ 수: {len(documents)}")
log(f"βœ… HWPX 처리 μ™„λ£Œ: {file_path} ⏱️ {duration:.2f}초, 총 {len(documents)}개 λ¬Έμ„œ")
return documents
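# Illustrative usage (the file path is hypothetical):
#   hwpx_docs = load_hwpx("dataset_test/sample.hwpx")
#   for d in hwpx_docs:
#       print(d.metadata["type"])  # "hwpx_body", "hwpx_table", or "hwpx_image"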
# --------------------------------
# PDF processing functions (same as before)
# --------------------------------
def run_ocr_on_image(image: Image.Image, lang='kor+eng'):
return pytesseract.image_to_string(image, lang=lang)
def extract_images_with_ocr(pdf_path, lang='kor+eng'):
try:
images = convert_from_path(pdf_path)
page_ocr_data = {}
for idx, img in enumerate(images):
page_num = idx + 1
text = run_ocr_on_image(img, lang=lang)
if text.strip():
page_ocr_data[page_num] = text.strip()
return page_ocr_data
except Exception as e:
print(f"❌ 이미지 OCR μ‹€νŒ¨: {e}")
return {}
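# Note (descriptive): pdf2image requires the Poppler utilities on the system path,
# and pytesseract requires the Tesseract binary with the Korean language pack
# installed for lang='kor+eng'; if either is missing, the except branch above
# returns an empty dict and OCR is simply skipped for that PDF.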
def extract_tables_with_pdfplumber(pdf_path):
page_table_data = {}
try:
with pdfplumber.open(pdf_path) as pdf:
for i, page in enumerate(pdf.pages):
page_num = i + 1
tables = page.extract_tables()
table_text = ""
for t_index, table in enumerate(tables):
if table:
table_text += f"[Table {t_index+1}]\n"
for row in table:
row_text = "\t".join(cell if cell else "" for cell in row)
table_text += row_text + "\n"
if table_text.strip():
page_table_data[page_num] = table_text.strip()
return page_table_data
except Exception as e:
print(f"❌ ν‘œ μΆ”μΆœ μ‹€νŒ¨: {e}")
return {}
def extract_body_text_with_pages(pdf_path):
page_body_data = {}
try:
pdf_processor = LlamaMarkdownReader()
docs = pdf_processor.load_data(file_path=pdf_path)
combined_text = ""
for d in docs:
if isinstance(d, dict) and "text" in d:
combined_text += d["text"]
elif hasattr(d, "text"):
combined_text += d.text
if combined_text.strip():
chars_per_page = 2000
start = 0
page_num = 1
while start < len(combined_text):
end = start + chars_per_page
if end > len(combined_text):
end = len(combined_text)
page_text = combined_text[start:end]
if page_text.strip():
page_body_data[page_num] = page_text.strip()
page_num += 1
if end == len(combined_text):
break
start = end - 100
except Exception as e:
print(f"❌ λ³Έλ¬Έ μΆ”μΆœ μ‹€νŒ¨: {e}")
return page_body_data
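# Note (descriptive): the markdown output from LlamaMarkdownReader is concatenated
# into one string and then sliced into pseudo-pages of roughly 2000 characters
# with a 100-character overlap, so these page numbers are approximations and may
# not match the PDF's real pagination.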
def load_pdf_with_metadata(pdf_path):
"""PDF νŒŒμΌμ—μ„œ νŽ˜μ΄μ§€λ³„ 정보λ₯Ό μΆ”μΆœ"""
log(f"πŸ“‘ PDF νŽ˜μ΄μ§€λ³„ 처리 μ‹œμž‘: {pdf_path}")
start = time.time()
    # first, check the actual page count with PyPDFLoader
try:
from langchain_community.document_loaders import PyPDFLoader
loader = PyPDFLoader(pdf_path)
pdf_pages = loader.load()
actual_total_pages = len(pdf_pages)
log(f"πŸ“„ PyPDFLoader둜 ν™•μΈν•œ μ‹€μ œ νŽ˜μ΄μ§€ 수: {actual_total_pages}")
except Exception as e:
log(f"❌ PyPDFLoader νŽ˜μ΄μ§€ 수 확인 μ‹€νŒ¨: {e}")
actual_total_pages = 1
try:
page_tables = extract_tables_with_pdfplumber(pdf_path)
except Exception as e:
page_tables = {}
print(f"❌ ν‘œ μΆ”μΆœ μ‹€νŒ¨: {e}")
try:
page_ocr = extract_images_with_ocr(pdf_path)
except Exception as e:
page_ocr = {}
print(f"❌ 이미지 OCR μ‹€νŒ¨: {e}")
try:
page_body = extract_body_text_with_pages(pdf_path)
except Exception as e:
page_body = {}
print(f"❌ λ³Έλ¬Έ μΆ”μΆœ μ‹€νŒ¨: {e}")
duration = time.time() - start
log(f"βœ… PDF νŽ˜μ΄μ§€λ³„ 처리 μ™„λ£Œ: {pdf_path} ⏱️ {duration:.2f}초")
# μ‹€μ œ νŽ˜μ΄μ§€ 수λ₯Ό κΈ°μ€€μœΌλ‘œ μ„€μ •
all_pages = set(page_tables.keys()) | set(page_ocr.keys()) | set(page_body.keys())
if all_pages:
max_extracted_page = max(all_pages)
        # use the larger of the actual and extracted page counts
total_pages = max(actual_total_pages, max_extracted_page)
else:
total_pages = actual_total_pages
log(f"πŸ“Š μ΅œμ’… μ„€μ •λœ 총 νŽ˜μ΄μ§€ 수: {total_pages}")
docs = []
for page_num in sorted(all_pages):
if page_num in page_tables and page_tables[page_num].strip():
docs.append(Document(
page_content=clean_text(apply_corrections(page_tables[page_num])),
metadata={
"source": pdf_path,
"filename": os.path.basename(pdf_path),
"type": "table",
"page": page_num,
"total_pages": total_pages
}
))
log(f"πŸ“Š νŽ˜μ΄μ§€ {page_num}: ν‘œ μΆ”μΆœ μ™„λ£Œ")
if page_num in page_body and page_body[page_num].strip():
docs.append(Document(
page_content=clean_text(apply_corrections(page_body[page_num])),
metadata={
"source": pdf_path,
"filename": os.path.basename(pdf_path),
"type": "body",
"page": page_num,
"total_pages": total_pages
}
))
log(f"πŸ“„ νŽ˜μ΄μ§€ {page_num}: λ³Έλ¬Έ μΆ”μΆœ μ™„λ£Œ")
if page_num in page_ocr and page_ocr[page_num].strip():
docs.append(Document(
page_content=clean_text(apply_corrections(page_ocr[page_num])),
metadata={
"source": pdf_path,
"filename": os.path.basename(pdf_path),
"type": "ocr",
"page": page_num,
"total_pages": total_pages
}
))
log(f"πŸ–ΌοΈ νŽ˜μ΄μ§€ {page_num}: OCR μΆ”μΆœ μ™„λ£Œ")
if not docs:
docs.append(Document(
page_content="[λ‚΄μš© μΆ”μΆœ μ‹€νŒ¨]",
metadata={
"source": pdf_path,
"filename": os.path.basename(pdf_path),
"type": "error",
"page": 1,
"total_pages": total_pages
}
))
    # print a page-info summary
if docs:
page_numbers = [doc.metadata.get('page', 0) for doc in docs if doc.metadata.get('page')]
if page_numbers:
log(f"πŸ“‹ μΆ”μΆœλœ νŽ˜μ΄μ§€ λ²”μœ„: {min(page_numbers)} ~ {max(page_numbers)}")
log(f"πŸ“Š μΆ”μΆœλœ νŽ˜μ΄μ§€λ³„ PDF λ¬Έμ„œ: {len(docs)}개 (총 {total_pages}νŽ˜μ΄μ§€)")
return docs
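# Illustrative usage (the path is hypothetical): each returned Document carries a
# "type" of "table", "body", "ocr", or "error" plus "page"/"total_pages" metadata:
#   pdf_docs = load_pdf_with_metadata("dataset_test/sample.pdf")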
# --------------------------------
# Document loading and splitting
# --------------------------------
def load_documents(folder_path):
documents = []
for file in glob.glob(os.path.join(folder_path, "*.hwpx")):
log(f"πŸ“„ HWPX 파일 확인: {file}")
docs = load_hwpx(file)
documents.extend(docs)
for file in glob.glob(os.path.join(folder_path, "*.pdf")):
log(f"πŸ“„ PDF 파일 확인: {file}")
documents.extend(load_pdf_with_metadata(file))
log(f"πŸ“š λ¬Έμ„œ λ‘œλ”© 전체 μ™„λ£Œ! 총 λ¬Έμ„œ 수: {len(documents)}")
return documents
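# Note (descriptive): glob is used non-recursively here, so only *.hwpx and *.pdf
# files sitting directly in folder_path are picked up; files in subfolders are ignored.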
def split_documents(documents, chunk_size=800, chunk_overlap=100):
log("πŸ”ͺ 청크 λΆ„ν•  μ‹œμž‘")
splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len
)
chunks = []
for doc in documents:
split = splitter.split_text(doc.page_content)
for i, chunk in enumerate(split):
enriched_chunk = f"passage: {chunk}"
chunks.append(Document(
page_content=enriched_chunk,
metadata={**doc.metadata, "chunk_index": i}
))
log(f"βœ… 청크 λΆ„ν•  μ™„λ£Œ: 총 {len(chunks)}개 생성")
return chunks
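# The "passage: " prefix added above follows the E5 embedding convention, which
# expects "passage: " on indexed text and "query: " on search queries. A minimal
# query-side sketch (the helper name is illustrative and not used elsewhere here):
def embed_query_for_e5(query, embedding_model):
    """Embed a search query with the matching E5 'query: ' prefix."""
    return embedding_model.embed_query(f"query: {query}")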
# --------------------------------
# Main
# --------------------------------
if __name__ == "__main__":
folder = "dataset_test"
log("πŸš€ PyMuPDF 기반 λ¬Έμ„œ 처리 μ‹œμž‘")
docs = load_documents(folder)
log("πŸ“¦ λ¬Έμ„œ λ‘œλ”© μ™„λ£Œ")
    # check page information
    log("📄 Page info summary:")
page_info = {}
for doc in docs:
source = doc.metadata.get('source', 'unknown')
page = doc.metadata.get('page', 'unknown')
doc_type = doc.metadata.get('type', 'unknown')
if source not in page_info:
page_info[source] = {'pages': set(), 'types': set()}
page_info[source]['pages'].add(page)
page_info[source]['types'].add(doc_type)
for source, info in page_info.items():
        numeric_pages = [p for p in info['pages'] if isinstance(p, int)]
        max_page = max(numeric_pages) if numeric_pages else 'unknown'
        log(f" 📄 {os.path.basename(source)}: {max_page} pages, types: {info['types']}")
chunks = split_documents(docs)
log("πŸ’‘ E5-Large-Instruct μž„λ² λ”© μ€€λΉ„ 쀑")
embedding_model = HuggingFaceEmbeddings(
model_name="intfloat/e5-large-v2",
model_kwargs={"device": "cuda"}
)
vectorstore = FAISS.from_documents(chunks, embedding_model)
vectorstore.save_local("vector_db")
log(f"πŸ“Š 전체 λ¬Έμ„œ 수: {len(docs)}")
log(f"πŸ”— 청크 총 수: {len(chunks)}")
log("βœ… FAISS μ €μž₯ μ™„λ£Œ: vector_db")
    # print samples that include page information
    log("\n📋 Samples with actual page info:")
for i, chunk in enumerate(chunks[:5]):
meta = chunk.metadata
log(f" 청크 {i+1}: {meta.get('type')} | νŽ˜μ΄μ§€ {meta.get('page')} | {os.path.basename(meta.get('source', 'unknown'))}")