from langflow.base.data import BaseFileComponent
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.io import BoolInput, IntInput
from langflow.schema import Data


class FileComponent(BaseFileComponent):
    """Handles loading and processing of individual or zipped text files.

    This component supports processing multiple valid files within a zip archive,
    resolving paths, validating file types, and optionally using multithreading
    for processing.
    """

display_name = "File"
description = "Load a file to be used in your project."
icon = "file-text"
name = "File"
    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
*BaseFileComponent._base_inputs,
BoolInput(
name="use_multithreading",
display_name="[Deprecated] Use Multithreading",
advanced=True,
value=True,
info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
),
IntInput(
name="concurrency_multithreading",
display_name="Processing Concurrency",
advanced=False,
info="When multiple files are being processed, the number of files to process concurrently.",
value=1,
),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
"""Processes files either sequentially or in parallel, depending on concurrency settings.
Args:
file_list (list[BaseFileComponent.BaseFile]): List of files to process.
Returns:
list[BaseFileComponent.BaseFile]: Updated list of files with merged data.
"""
def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
"""Processes a single file and returns its Data object."""
try:
return parse_text_file_to_data(file_path, silent_errors=silent_errors)
except FileNotFoundError as e:
msg = f"File not found: {file_path}. Error: {e}"
self.log(msg)
if not silent_errors:
raise
return None
except Exception as e:
msg = f"Unexpected error processing {file_path}: {e}"
self.log(msg)
if not silent_errors:
raise
return None

        if not file_list:
            self.log("No files to process.")
            return file_list
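
        # Resolve the effective concurrency: the deprecated use_multithreading
        # flag still gates parallelism, and the configured value is clamped to
        # a minimum of 1.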
concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)
file_count = len(file_list)
parallel_processing_threshold = 2
if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:
if file_count > 1:
self.log(f"Processing {file_count} files sequentially.")
processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]
else:
self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
file_paths = [str(file.path) for file in file_list]
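            # parallel_load_data fans process_file out across the file paths,
            # keeping at most `concurrency` loads in flight via max_concurrency.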
processed_data = parallel_load_data(
file_paths,
silent_errors=self.silent_errors,
load_function=process_file,
max_concurrency=concurrency,
)

        # Merge the processed Data objects back into their BaseFile records.
        return self.rollup_data(file_list, processed_data)
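

# A minimal usage sketch, not part of the component itself. It assumes the
# surrounding Langflow runtime resolves uploaded paths into
# BaseFileComponent.BaseFile records (that step lives in the base class, not
# in this file) and that inputs can be set attribute-style, as the code above
# reads them; treat every name below as an assumption, not verified API usage.
#
#   component = FileComponent()
#   component.use_multithreading = True        # deprecated gate, still honored
#   component.concurrency_multithreading = 4   # up to 4 files in flight
#   component.silent_errors = True             # unreadable files yield None
#   processed = component.process_files(resolved_files)  # resolved_files: list[BaseFile]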