from langflow.base.data import BaseFileComponent
from langflow.base.data.utils import TEXT_FILE_TYPES, parallel_load_data, parse_text_file_to_data
from langflow.io import BoolInput, IntInput
from langflow.schema import Data


class FileComponent(BaseFileComponent):
    """Handles loading and processing of individual or zipped text files.

    This component supports processing multiple valid files within a zip archive,
    resolving paths, validating file types, and optionally using multithreading
    for processing.
    """

    display_name = "File"
    description = "Load a file to be used in your project."
    icon = "file-text"
    name = "File"

    VALID_EXTENSIONS = TEXT_FILE_TYPES

    inputs = [
        *BaseFileComponent._base_inputs,
        BoolInput(
            name="use_multithreading",
            display_name="[Deprecated] Use Multithreading",
            advanced=True,
            value=True,
            info="Set 'Processing Concurrency' greater than 1 to enable multithreading.",
        ),
        IntInput(
            name="concurrency_multithreading",
            display_name="Processing Concurrency",
            advanced=False,
            info="When multiple files are being processed, the number of files to process concurrently.",
            value=1,
        ),
    ]

    outputs = [
        *BaseFileComponent._base_outputs,
    ]

    def process_files(self, file_list: list[BaseFileComponent.BaseFile]) -> list[BaseFileComponent.BaseFile]:
        """Processes files either sequentially or in parallel, depending on concurrency settings.

        Args:
            file_list (list[BaseFileComponent.BaseFile]): List of files to process.

        Returns:
            list[BaseFileComponent.BaseFile]: Updated list of files with merged data.
        """

        def process_file(file_path: str, *, silent_errors: bool = False) -> Data | None:
            """Processes a single file and returns its Data object."""
            try:
                return parse_text_file_to_data(file_path, silent_errors=silent_errors)
            except FileNotFoundError as e:
                msg = f"File not found: {file_path}. Error: {e}"
                self.log(msg)
                if not silent_errors:
                    raise
                return None
            except Exception as e:
                msg = f"Unexpected error processing {file_path}: {e}"
                self.log(msg)
                if not silent_errors:
                    raise
                return None

        if not file_list:
            self.log("No files to process.")
            return file_list

        # The deprecated boolean gates multithreading; the integer input sets the width.
        concurrency = 1 if not self.use_multithreading else max(1, self.concurrency_multithreading)
        file_count = len(file_list)

        parallel_processing_threshold = 2
        if concurrency < parallel_processing_threshold or file_count < parallel_processing_threshold:
            if file_count > 1:
                self.log(f"Processing {file_count} files sequentially.")
            processed_data = [process_file(str(file.path), silent_errors=self.silent_errors) for file in file_list]
        else:
            self.log(f"Starting parallel processing of {file_count} files with concurrency: {concurrency}.")
            file_paths = [str(file.path) for file in file_list]
            processed_data = parallel_load_data(
                file_paths,
                silent_errors=self.silent_errors,
                load_function=process_file,
                max_concurrency=concurrency,
            )

        # Use rollup_data to merge processed data with BaseFile objects
        return self.rollup_data(file_list, processed_data)
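
# --- Illustration only (hypothetical helper, not part of the component) ------
# The dispatch in process_files runs files in parallel only when BOTH the
# resolved concurrency and the file count are at least the threshold (2);
# otherwise it falls back to a sequential list comprehension. A minimal
# standalone sketch of that rule:
def _should_parallelize(concurrency: int, file_count: int, threshold: int = 2) -> bool:
    """Mirrors the dispatch check in FileComponent.process_files."""
    return concurrency >= threshold and file_count >= threshold


assert _should_parallelize(4, 10)      # several files, real concurrency -> parallel
assert not _should_parallelize(1, 10)  # concurrency of 1 -> sequential
assert not _should_parallelize(4, 1)   # single file -> sequential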