changes to app and sambaparse
2c44136
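"""
SambaParse: utilities for ingesting documents with the `unstructured-ingest` CLI or,
as a local fallback, PyMuPDF via LangChain, and for converting the results into
LangChain `Document` objects for downstream retrieval pipelines.
"""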
import os
import yaml
import subprocess
import json
import logging
from typing import Dict, Optional, List, Tuple, Union, Any
from dotenv import load_dotenv
from langchain.docstore.document import Document
import shutil
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
load_dotenv()
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class SambaParse:
def __init__(self, config_path: str):
with open(config_path, "r") as file:
self.config = yaml.safe_load(file)
# Set the default Unstructured API key as an environment variable if not already set
if "UNSTRUCTURED_API_KEY" not in os.environ:
default_api_key = self.config.get("partitioning", {}).get("default_unstructured_api_key")
if default_api_key:
os.environ["UNSTRUCTURED_API_KEY"] = default_api_key
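    # Illustrative (non-exhaustive) sketch of the config.yaml keys this class reads;
    # the values shown are assumptions and depend on your deployment:
    #
    #   processor:
    #     output_dir: ./output
    #     num_processes: 2
    #     reprocess: false
    #     verbose: true
    #   partitioning:
    #     partition_by_api: true
    #     strategy: hi_res
    #     ocr_languages: [eng]
    #   chunking:
    #     enabled: true
    #     strategy: by_title
    #   additional_processing:
    #     enabled: true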
def run_ingest(
self,
source_type: str,
input_path: Optional[str] = None,
additional_metadata: Optional[Dict] = None,
) -> Tuple[List[str], List[Dict], List[Document]]:
"""
Runs the ingest process for the specified source type and input path.
Args:
source_type (str): The type of source to ingest (e.g., 'local', 'confluence', 'github', 'google-drive').
input_path (Optional[str]): The input path for the source (only required for 'local' source type).
additional_metadata (Optional[Dict]): Additional metadata to include in the processed documents.
Returns:
Tuple[List[str], List[Dict], List[Document]]: A tuple containing the extracted texts, metadata, and LangChain documents.
"""
if not self.config["partitioning"]["partition_by_api"]:
return self._run_ingest_pymupdf(input_path, additional_metadata)
        output_dir = self.config["processor"]["output_dir"]
        # Recreate the output directory so each ingest run starts from a clean slate
        logger.info(f"Clearing output directory: {output_dir}")
        shutil.rmtree(output_dir, ignore_errors=True)
        os.makedirs(output_dir, exist_ok=True)
command = [
"unstructured-ingest",
source_type,
"--output-dir",
output_dir,
"--num-processes",
str(self.config["processor"]["num_processes"]),
]
if self.config["processor"]["reprocess"] == True:
command.extend(["--reprocess"])
# Add partition arguments
command.extend(
[
"--strategy",
self.config["partitioning"]["strategy"],
"--ocr-languages",
",".join(self.config["partitioning"]["ocr_languages"]),
"--encoding",
self.config["partitioning"]["encoding"],
"--fields-include",
",".join(self.config["partitioning"]["fields_include"]),
"--metadata-exclude",
",".join(self.config["partitioning"]["metadata_exclude"]),
"--metadata-include",
",".join(self.config["partitioning"]["metadata_include"]),
]
)
if self.config["partitioning"]["skip_infer_table_types"]:
command.extend(
[
"--skip-infer-table-types",
",".join(self.config["partitioning"]["skip_infer_table_types"]),
]
)
if self.config["partitioning"]["flatten_metadata"]:
command.append("--flatten-metadata")
if source_type == "local":
if input_path is None:
raise ValueError("Input path is required for local source type.")
command.extend(["--input-path", f'"{input_path}"'])
if self.config["sources"]["local"]["recursive"]:
command.append("--recursive")
elif source_type == "confluence":
command.extend(
[
"--url",
self.config["sources"]["confluence"]["url"],
"--user-email",
self.config["sources"]["confluence"]["user_email"],
"--api-token",
self.config["sources"]["confluence"]["api_token"],
]
)
elif source_type == "github":
command.extend(
[
"--url",
self.config["sources"]["github"]["url"],
"--git-branch",
self.config["sources"]["github"]["branch"],
]
)
elif source_type == "google-drive":
command.extend(
[
"--drive-id",
self.config["sources"]["google_drive"]["drive_id"],
"--service-account-key",
self.config["sources"]["google_drive"]["service_account_key"],
]
)
if self.config["sources"]["google_drive"]["recursive"]:
command.append("--recursive")
else:
raise ValueError(f"Unsupported source type: {source_type}")
if self.config["processor"]["verbose"]:
command.append("--verbose")
if self.config["partitioning"]["partition_by_api"]:
api_key = os.getenv("UNSTRUCTURED_API_KEY")
partition_endpoint_url = f"{self.config['partitioning']['partition_endpoint']}:{self.config['partitioning']['unstructured_port']}"
if api_key:
command.extend(["--partition-by-api", "--api-key", api_key])
command.extend(["--partition-endpoint", partition_endpoint_url])
else:
logger.warning("No Unstructured API key available. Partitioning by API will be skipped.")
if self.config["partitioning"]["strategy"] == "hi_res":
            if self.config["partitioning"].get("hi_res_model_name"):
command.extend(
[
"--hi-res-model-name",
self.config["partitioning"]["hi_res_model_name"],
]
)
logger.warning(
"You've chosen the high-resolution partitioning strategy. Grab a cup of coffee or tea while you wait, as this may take some time due to OCR and table detection."
)
if self.config["chunking"]["enabled"]:
command.extend(
[
"--chunking-strategy",
self.config["chunking"]["strategy"],
"--chunk-max-characters",
str(self.config["chunking"]["chunk_max_characters"]),
"--chunk-overlap",
str(self.config["chunking"]["chunk_overlap"]),
]
)
if self.config["chunking"]["strategy"] == "by_title":
command.extend(
[
"--chunk-combine-text-under-n-chars",
str(self.config["chunking"]["combine_under_n_chars"]),
]
)
if self.config["embedding"]["enabled"]:
command.extend(
[
"--embedding-provider",
self.config["embedding"]["provider"],
"--embedding-model-name",
self.config["embedding"]["model_name"],
]
)
if self.config["destination_connectors"]["enabled"]:
destination_type = self.config["destination_connectors"]["type"]
if destination_type == "chroma":
command.extend(
[
"chroma",
"--host",
self.config["destination_connectors"]["chroma"]["host"],
"--port",
str(self.config["destination_connectors"]["chroma"]["port"]),
"--collection-name",
self.config["destination_connectors"]["chroma"][
"collection_name"
],
"--tenant",
self.config["destination_connectors"]["chroma"]["tenant"],
"--database",
self.config["destination_connectors"]["chroma"]["database"],
"--batch-size",
str(self.config["destination_connectors"]["batch_size"]),
]
)
elif destination_type == "qdrant":
command.extend(
[
"qdrant",
"--location",
self.config["destination_connectors"]["qdrant"]["location"],
"--collection-name",
self.config["destination_connectors"]["qdrant"][
"collection_name"
],
"--batch-size",
str(self.config["destination_connectors"]["batch_size"]),
]
)
else:
raise ValueError(
f"Unsupported destination connector type: {destination_type}"
)
command_str = " ".join(command)
logger.info(f"Running command: {command_str}")
logger.info(
"This may take some time depending on the size of your data. Please be patient..."
)
subprocess.run(command_str, shell=True, check=True)
logger.info("Ingest process completed successfully!")
# Call the additional processing function if enabled
if self.config["additional_processing"]["enabled"]:
logger.info("Performing additional processing...")
texts, metadata_list, langchain_docs = additional_processing(
directory=output_dir,
extend_metadata=self.config["additional_processing"]["extend_metadata"],
additional_metadata=additional_metadata,
replace_table_text=self.config["additional_processing"][
"replace_table_text"
],
table_text_key=self.config["additional_processing"]["table_text_key"],
return_langchain_docs=self.config["additional_processing"][
"return_langchain_docs"
],
convert_metadata_keys_to_string=self.config["additional_processing"][
"convert_metadata_keys_to_string"
],
)
logger.info("Additional processing completed.")
return texts, metadata_list, langchain_docs
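    # Example usage (sketch): assuming a config.yaml with additional_processing.enabled set,
    # so that run_ingest returns the processed texts, metadata, and documents:
    #   parser = SambaParse("config.yaml")
    #   texts, metadata, docs = parser.run_ingest("local", input_path="./data")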
    def _run_ingest_pymupdf(
        self, input_path: Optional[str], additional_metadata: Optional[Dict] = None
    ) -> Tuple[List[str], List[Dict], List[Document]]:
"""
Runs the ingest process using PyMuPDF via LangChain.
Args:
input_path (str): The input path for the source.
additional_metadata (Optional[Dict]): Additional metadata to include in the processed documents.
Returns:
Tuple[List[str], List[Dict], List[Document]]: A tuple containing the extracted texts, metadata, and LangChain documents.
"""
if not input_path:
raise ValueError("Input path is required for PyMuPDF processing.")
texts = []
metadata_list = []
langchain_docs = []
if os.path.isfile(input_path):
file_paths = [input_path]
else:
file_paths = [
os.path.join(input_path, f)
for f in os.listdir(input_path)
if f.lower().endswith('.pdf')
]
for file_path in file_paths:
loader = PyMuPDFLoader(file_path)
docs = loader.load()
splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
separators=['\n\n', '\n', ' ', ''],
is_separator_regex=False,
)
docs = splitter.split_documents(docs)
for doc in docs:
text = doc.page_content
metadata = doc.metadata
# Add 'filename' key to metadata
metadata['filename'] = os.path.basename(metadata['source'])
if additional_metadata:
metadata.update(additional_metadata)
texts.append(text)
metadata_list.append(metadata)
langchain_docs.append(doc)
return texts, metadata_list, langchain_docs
def convert_to_string(value: Any) -> str:
    """
    Convert a value to its string representation.
    Args:
        value (Any): The value to be converted to a string.
Returns:
str: The string representation of the value.
"""
if isinstance(value, (list, tuple)):
return ", ".join(map(str, value))
elif isinstance(value, dict):
return json.dumps(value)
else:
return str(value)
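# Examples: convert_to_string(["a", "b"]) -> "a, b"; convert_to_string({"k": 1}) -> '{"k": 1}'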
def additional_processing(
directory: str,
extend_metadata: bool,
additional_metadata: Optional[Dict],
replace_table_text: bool,
table_text_key: str,
return_langchain_docs: bool,
convert_metadata_keys_to_string: bool,
):
"""
Performs additional processing on the extracted documents.
Args:
directory (str): The directory containing the extracted JSON files.
extend_metadata (bool): Whether to extend the metadata with additional metadata.
additional_metadata (Optional[Dict]): Additional metadata to include in the processed documents.
replace_table_text (bool): Whether to replace table text with the specified table text key.
table_text_key (str): The key to use for replacing table text.
return_langchain_docs (bool): Whether to return LangChain documents.
convert_metadata_keys_to_string (bool): Whether to convert non-string metadata keys to string.
Returns:
Tuple[List[str], List[Dict], List[Document]]: A tuple containing the extracted texts, metadata, and LangChain documents.
"""
if os.path.isfile(directory):
file_paths = [directory]
else:
file_paths = [
os.path.join(directory, f)
for f in os.listdir(directory)
if f.endswith(".json")
]
texts = []
metadata_list = []
langchain_docs = []
for file_path in file_paths:
with open(file_path, "r") as file:
data = json.load(file)
for element in data:
if extend_metadata and additional_metadata:
element["metadata"].update(additional_metadata)
if replace_table_text and element["type"] == "Table":
element["text"] = element["metadata"][table_text_key]
metadata = element["metadata"].copy()
if convert_metadata_keys_to_string:
metadata = {
str(key): convert_to_string(value)
for key, value in metadata.items()
}
for key in element:
if key not in ["text", "metadata", "embeddings"]:
metadata[key] = element[key]
if "page_number" in metadata:
metadata["page"] = metadata["page_number"]
else:
metadata["page"] = 1
metadata_list.append(metadata)
texts.append(element["text"])
        with open(file_path, "w") as file:
            json.dump(data, file, indent=2)
    # Build LangChain documents once, after the loop, so earlier files are not duplicated
    if return_langchain_docs:
        langchain_docs = get_langchain_docs(texts, metadata_list)
return texts, metadata_list, langchain_docs
def get_langchain_docs(texts: List[str], metadata_list: List[Dict]) -> List[Document]:
"""
Creates LangChain documents from the extracted texts and metadata.
Args:
texts (List[str]): The extracted texts.
metadata_list (List[Dict]): The metadata associated with each text.
Returns:
List[Document]: A list of LangChain documents.
"""
return [
Document(page_content=content, metadata=metadata)
for content, metadata in zip(texts, metadata_list)
]
def parse_doc_universal(
doc: str, additional_metadata: Optional[Dict] = None, source_type: str = "local"
) -> Tuple[List[str], List[Dict], List[Document]]:
"""
Extract text, tables, images, and metadata from a document or a folder of documents.
Args:
doc (str): Path to the document or folder of documents.
additional_metadata (Optional[Dict], optional): Additional metadata to include in the processed documents.
Defaults to an empty dictionary.
source_type (str, optional): The type of source to ingest. Defaults to 'local'.
Returns:
Tuple[List[str], List[Dict], List[Document]]: A tuple containing:
- A list of extracted text per page.
- A list of extracted metadata per page.
- A list of LangChain documents.
"""
if additional_metadata is None:
additional_metadata = {}
# Get the directory of the current file
current_dir = os.path.dirname(os.path.abspath(__file__))
# Join the current directory with the relative path of the config file
config_path = os.path.join(current_dir, "config.yaml")
wrapper = SambaParse(config_path)
def process_file(file_path):
if file_path.lower().endswith('.pdf'):
return wrapper._run_ingest_pymupdf(file_path, additional_metadata)
else:
# Use the original method for non-PDF files
return wrapper.run_ingest(source_type, input_path=file_path, additional_metadata=additional_metadata)
if os.path.isfile(doc):
return process_file(doc)
else:
all_texts, all_metadata, all_docs = [], [], []
for root, _, files in os.walk(doc):
for file in files:
file_path = os.path.join(root, file)
texts, metadata_list, langchain_docs = process_file(file_path)
all_texts.extend(texts)
all_metadata.extend(metadata_list)
all_docs.extend(langchain_docs)
return all_texts, all_metadata, all_docs
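# Example usage (sketch, with a hypothetical path): parse a single file or a folder of documents
#   texts, metadata, docs = parse_doc_universal("./data/report.pdf")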
def parse_doc_streamlit(docs: List,
kit_dir: str,
additional_metadata: Optional[Dict] = None,
) -> List[Document]:
"""
Parse the uploaded documents and return a list of LangChain documents.
Args:
        docs (List[UploadedFile]): A list of files uploaded through the Streamlit file uploader.
kit_dir (str): The directory of the current kit.
additional_metadata (Optional[Dict], optional): Additional metadata to include in the processed documents.
Defaults to an empty dictionary.
Returns:
List[Document]: A list of LangChain documents.
"""
if additional_metadata is None:
additional_metadata = {}
# Create the data/tmp folder if it doesn't exist
temp_folder = os.path.join(kit_dir, "data/tmp")
if not os.path.exists(temp_folder):
os.makedirs(temp_folder)
else:
# If there are already files there, delete them
for filename in os.listdir(temp_folder):
file_path = os.path.join(temp_folder, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
            except Exception as e:
                logger.warning(f"Failed to delete {file_path}. Reason: {e}")
# Save all selected files to the tmp dir with their file names
for doc in docs:
temp_file = os.path.join(temp_folder, doc.name)
with open(temp_file, "wb") as f:
f.write(doc.getvalue())
# Pass in the temp folder for processing into the parse_doc_universal function
_, _, langchain_docs = parse_doc_universal(doc=temp_folder, additional_metadata=additional_metadata)
return langchain_docs
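# Example usage (sketch, assuming a Streamlit app; st.file_uploader returns UploadedFile objects):
#   import streamlit as st
#   uploaded = st.file_uploader("Upload documents", accept_multiple_files=True)
#   if uploaded:
#       docs = parse_doc_streamlit(uploaded, kit_dir=os.path.dirname(os.path.abspath(__file__)))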