Tai Truong
fix readme
d202ada
import base64
import contextlib
import functools
import hashlib
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING, Any

from fastapi import UploadFile
from platformdirs import user_cache_dir
if TYPE_CHECKING:
from langflow.api.v1.schemas import BuildStatus
# Module-level dictionary keyed by string; it starts empty and is not populated
# anywhere in the visible code.
CACHE: dict[str, Any] = {}
# Base cache directory, as returned by platformdirs' ``user_cache_dir`` for the
# app name "langflow" and app author "langflow".
CACHE_DIR = user_cache_dir("langflow", "langflow")
# Name of the sub-folder that the helpers in this module join onto a base
# directory to locate cached files.
PREFIX = "langflow_cache"
class CacheMiss:
    """Marker type whose instances are falsy and print as ``<CACHE_MISS>``."""

    def __bool__(self) -> bool:
        """Return ``False`` so an instance fails a truthiness check."""
        return False

    def __repr__(self) -> str:
        """Return the fixed textual representation of the marker."""
        return "<CACHE_MISS>"
def create_cache_folder(func):
    """Decorate ``func`` so the cache folder exists before ``func`` runs.

    The folder is ``Path(CACHE_DIR) / PREFIX``. It is (re)checked on every call
    of the decorated function, not at decoration time.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper that creates the folder if needed and then forwards all
        positional and keyword arguments to ``func``, returning its result.
    """

    # ``wraps`` keeps the wrapped function's name, docstring and signature
    # visible to introspection, logging and debuggers.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # ``parents``/``exist_ok`` make this safe to repeat on every call.
        (Path(CACHE_DIR) / PREFIX).mkdir(parents=True, exist_ok=True)
        return func(*args, **kwargs)

    return wrapper
@create_cache_folder
def clear_old_cache_files(max_cache_size: int = 3) -> None:
    """Delete the oldest ``*.dill`` files in the cache folder.

    Files are ordered by modification time and only the ``max_cache_size``
    most recently modified ones are kept. Deletion is best-effort: files that
    cannot be removed are skipped silently.

    Args:
        max_cache_size: Maximum number of ``*.dill`` files to keep.
    """

    def _mtime(path: Path) -> float:
        # A file can disappear between the glob and the stat; rank it as the
        # oldest so the (suppressed) unlink below simply becomes a no-op.
        try:
            return path.stat().st_mtime
        except OSError:
            return float("-inf")

    # Scan the folder that ``create_cache_folder`` creates and that
    # ``save_binary_file`` writes to, rather than the system temp directory.
    cache_dir = Path(CACHE_DIR) / PREFIX
    cache_files = list(cache_dir.glob("*.dill"))
    if len(cache_files) <= max_cache_size:
        return

    newest_first = sorted(cache_files, key=_mtime, reverse=True)
    for stale_file in newest_first[max_cache_size:]:
        with contextlib.suppress(OSError):
            stale_file.unlink()
def filter_json(json_data):
    """Return a copy of ``json_data`` without layout/UI-state keys.

    The top-level keys ``viewport`` and ``chatHistory`` are dropped, and the
    keys ``position``, ``positionAbsolute``, ``selected`` and ``dragging`` are
    dropped from every dict found in the ``nodes`` list. The input mapping and
    its node dicts are left unmodified; values that are kept are shared with
    the input, not deep-copied.

    Args:
        json_data: A dict that may contain ``viewport``, ``chatHistory`` and
            a ``nodes`` list.

    Returns:
        A new dict with the keys above removed.
    """
    node_keys_to_drop = ("position", "positionAbsolute", "selected", "dragging")

    filtered_data = json_data.copy()
    for key in ("viewport", "chatHistory"):
        filtered_data.pop(key, None)

    if "nodes" in filtered_data:
        # Build fresh node dicts: ``copy()`` above is shallow, so deleting keys
        # in place would also strip them from the caller's data.
        filtered_data["nodes"] = [
            {key: value for key, value in node.items() if key not in node_keys_to_drop}
            if isinstance(node, dict)
            else node
            for node in filtered_data["nodes"]
        ]
    return filtered_data
@create_cache_folder
def save_binary_file(content: str, file_name: str, accepted_types: list[str]) -> str:
    """Decode base64 content and save it as a file in the cache folder.

    Args:
        content: The file content as a string of the form
            ``<header>,<base64 data>``. The segment after the first comma is
            base64-decoded and written to disk.
        file_name: The name of the file, including its extension.
        accepted_types: Accepted file-name suffixes; ``file_name`` must end
            with at least one of them.

    Returns:
        The path to the saved file, as a string.

    Raises:
        ValueError: If ``file_name`` has no accepted suffix, ``content`` is
            empty or has no comma separator, or ``file_name`` would resolve
            to a location outside the cache folder.
    """
    if not file_name.endswith(tuple(accepted_types)):
        msg = f"File {file_name} is not accepted"
        raise ValueError(msg)

    if not content:
        msg = "Please, reload the file in the loader."
        raise ValueError(msg)

    segments = content.split(",")
    if len(segments) < 2:
        # Without a separator there is no payload segment to decode; report it
        # as invalid input instead of failing with a bare IndexError.
        msg = f"Content of {file_name} is not in the expected '<header>,<base64 data>' format"
        raise ValueError(msg)

    cache_path = Path(CACHE_DIR) / PREFIX
    file_path = cache_path / file_name
    # ``file_name`` comes from the caller: refuse absolute paths and ``..``
    # components that would place the file outside the cache folder.
    if not file_path.resolve().is_relative_to(cache_path.resolve()):
        msg = f"File {file_name} is not accepted"
        raise ValueError(msg)

    file_path.write_bytes(base64.b64decode(segments[1]))
    return str(file_path)
@create_cache_folder
def save_uploaded_file(file: UploadFile, folder_name) -> Path:
    """Save an uploaded file under a name derived from the SHA-256 of its content.

    The file is stored in ``Path(CACHE_DIR) / folder_name`` as
    ``<sha256 hex digest><original extension>``.

    Args:
        file: The uploaded file object; its ``file`` stream is read twice
            (once to hash, once to copy), so it must support ``seek(0)``.
        folder_name: The name of the folder, relative to ``CACHE_DIR``, to
            save the file in.

    Returns:
        The path to the saved file.
    """
    chunk_size = 8192  # Read in 8 KB chunks to keep memory use flat.

    folder_path = Path(CACHE_DIR) / folder_name
    # A single idempotent call replaces the exists()/mkdir() pair, which could
    # raise FileExistsError under concurrent uploads and failed for nested
    # folder names.
    folder_path.mkdir(parents=True, exist_ok=True)

    filename = file.filename
    file_extension = Path(filename).suffix if isinstance(filename, str | Path) else ""
    file_object = file.file

    # First pass: hash the content from the start of the stream.
    sha256_hash = hashlib.sha256()
    file_object.seek(0)
    while chunk := file_object.read(chunk_size):
        sha256_hash.update(chunk)

    file_path = folder_path / f"{sha256_hash.hexdigest()}{file_extension}"

    # Second pass: rewind and copy the content to its hash-named destination.
    file_object.seek(0)
    with file_path.open("wb") as new_file:
        while chunk := file_object.read(chunk_size):
            new_file.write(chunk)

    return file_path
def update_build_status(cache_service, flow_id: str, status: "BuildStatus") -> None:
    """Set the ``"status"`` field of a cached flow and store it back.

    Args:
        cache_service: A mapping-like object supporting ``[]`` reads and
            writes keyed by flow id.
        flow_id: The key of the flow entry to update.
        status: The value to store under the entry's ``"status"`` key.

    Raises:
        ValueError: If the entry stored for ``flow_id`` is ``None``.
    """
    cached_flow = cache_service[flow_id]
    if cached_flow is None:
        msg = f"Flow {flow_id} not found in cache"
        raise ValueError(msg)

    cached_flow["status"] = status
    # Write the entry back once; the previous duplicate assignment pair only
    # repeated the same store.
    cache_service[flow_id] = cached_flow
# Module-level CacheMiss instance: falsy, with repr "<CACHE_MISS>".
CACHE_MISS = CacheMiss()