from langflow.base.data import BaseFileComponent
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.io import BoolInput, IntInput
from langflow.schema import Data


class FileComponent(BaseFileComponent):
    """Handles loading and processing of individual or zipped text files.

    This component supports processing multiple valid files within a zip archive,
    resolving paths, validating file types, and optionally using multithreading for processing.
    """

    display_name = "File"
    description = "Load a file to be used in your project."
    icon = "file-text"
    name = "File"

    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
        *BaseFileComponent._base_inputs,
        BoolInput(
            name="use_multithreading",
            display_name="[Deprecated] Use Multithreading",
            advanced=True,
            value=True,
            info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
        ),
        IntInput(
            name="concurrency_multithreading",
            display_name="Processing Concurrency",
            advanced=False,
            info="When multiple files are being processed, the number of files to process concurrently.",
            value=1,
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        """Processes files either sequentially or in parallel, depending on concurrency settings.

        Args:
            file_list (list[BaseFileComponent.BaseFile]): List of files to process.

        Returns:
            list[BaseFileComponent.BaseFile]: Updated list of files with merged data.
        """

        def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
            """Processes a single file and returns its Data object."""
            try:
                return parse_text_file_to_data(file_path, silent_errors=silent_errors)
            except FileNotFoundError as e:
                msg = f"File not found: {file_path}. Error: {e}"
                self.log(msg)
                if not silent_errors:
                    raise
                return None
            except Exception as e:
                msg = f"Unexpected error processing {file_path}: {e}"
                self.log(msg)
                if not silent_errors:
                    raise
                return None

        if not file_list:
            self.log("No files to process.")
            return file_list

        concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)
        file_count = len(file_list)

        parallel_processing_threshold = 2
        if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:
            if file_count > 1:
                self.log(f"Processing {file_count} files sequentially.")
            processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]
        else:
            self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
            file_paths = [str(file.path) for file in file_list]
            processed_data = parallel_load_data(
                file_paths,
                silent_errors=self.silent_errors,
                load_function=process_file,
                max_concurrency=concurrency,
            )

        # Use rollup_basefile_data to merge processed data with BaseFile objects
        return self.rollup_data(file_list, processed_data)
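Because the "Use Multithreading" toggle is deprecated, the effective concurrency comes from the "Processing Concurrency" input, and files are only handed to parallel_load_data when both that setting and the file count reach 2. The standalone sketch below isolates that decision so the branches are easy to check; the helper name choose_processing_mode and the example values are illustrative only and are not part of the Langflow API.

# A minimal, standalone sketch of the concurrency decision made in process_files.
# The helper name and example values are hypothetical, not part of Langflow.

def choose_processing_mode(*, use_multithreading: bool, concurrency_multithreading: int, file_count: int) -> str:
    """Mirror the branch in process_files: stay sequential unless both the
    effective concurrency and the number of files reach the threshold of 2."""
    concurrency = 1 if not use_multithreading else max(1, concurrency_multithreading)
    parallel_processing_threshold = 2
    if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:
        return "sequential"
    return f"parallel (max_concurrency={concurrency})"


if __name__ == "__main__":
    # A single file is always processed sequentially, regardless of settings.
    print(choose_processing_mode(use_multithreading=True, concurrency_multithreading=4, file_count=1))
    # Multiple files with "Processing Concurrency" > 1 go through parallel_load_data.
    print(choose_processing_mode(use_multithreading=True, concurrency_multithreading=4, file_count=3))
    # Disabling the deprecated flag forces the effective concurrency back to 1.
    print(choose_processing_mode(use_multithreading=False, concurrency_multithreading=4, file_count=3))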