Spaces:
Running
Running
import asyncio | |
import re | |
import shutil | |
import tempfile | |
from contextlib import suppress | |
from io import BytesIO | |
from pathlib import Path | |
from unittest.mock import MagicMock | |
import pytest | |
from asgi_lifespan import LifespanManager | |
from httpx import ASGITransport, AsyncClient | |
from langflow.services.deps import get_storage_service | |
from langflow.services.storage.service import StorageService | |
from sqlmodel import Session | |
def mock_storage_service(): | |
# Create a mock instance of StorageService | |
service = MagicMock(spec=StorageService) | |
# Setup mock behaviors for the service methods as needed | |
service.save_file.return_value = None | |
service.get_file.return_value = b"file content" # Binary content for files | |
service.list_files.return_value = ["file1.txt", "file2.jpg"] | |
service.delete_file.return_value = None | |
# Mock the settings service with proper max_file_size_upload attribute | |
settings_mock = MagicMock() | |
settings_mock.settings = MagicMock() | |
settings_mock.settings.max_file_size_upload = 1 # Default 1MB limit | |
service.settings_service = settings_mock | |
return service | |
async def files_client_fixture( | |
session: Session, # noqa: ARG001 | |
monkeypatch, | |
request, | |
load_flows_dir, | |
mock_storage_service, | |
): | |
# Set the database url to a test database | |
if "noclient" in request.keywords: | |
yield | |
else: | |
def init_app(): | |
db_dir = tempfile.mkdtemp() | |
db_path = Path(db_dir) / "test.db" | |
monkeypatch.setenv("LANGFLOW_DATABASE_URL", f"sqlite:///{db_path}") | |
monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "false") | |
if "load_flows" in request.keywords: | |
shutil.copyfile( | |
pytest.BASIC_EXAMPLE_PATH, Path(load_flows_dir) / "c54f9130-f2fa-4a3e-b22a-3856d946351b.json" | |
) | |
monkeypatch.setenv("LANGFLOW_LOAD_FLOWS_PATH", load_flows_dir) | |
monkeypatch.setenv("LANGFLOW_AUTO_LOGIN", "true") | |
from langflow.main import create_app | |
app = create_app() | |
return app, db_path | |
app, db_path = await asyncio.to_thread(init_app) | |
app.dependency_overrides[get_storage_service] = lambda: mock_storage_service | |
async with ( | |
LifespanManager(app, startup_timeout=None, shutdown_timeout=None) as manager, | |
AsyncClient(transport=ASGITransport(app=manager.app), base_url="http://testserver/") as client, | |
): | |
yield client | |
# app.dependency_overrides.clear() | |
monkeypatch.undo() | |
# clear the temp db | |
with suppress(FileNotFoundError): | |
db_path.unlink() | |
async def max_file_size_upload_fixture(monkeypatch): | |
monkeypatch.setenv("LANGFLOW_MAX_FILE_SIZE_UPLOAD", "1") | |
yield | |
monkeypatch.undo() | |
async def max_file_size_upload_10mb_fixture(monkeypatch): | |
monkeypatch.setenv("LANGFLOW_MAX_FILE_SIZE_UPLOAD", "10") | |
yield | |
monkeypatch.undo() | |
async def test_upload_file(files_client, created_api_key, flow): | |
headers = {"x-api-key": created_api_key.api_key} | |
response = await files_client.post( | |
f"api/v1/files/upload/{flow.id}", | |
files={"file": ("test.txt", b"test content")}, | |
headers=headers, | |
) | |
assert response.status_code == 201 | |
response_json = response.json() | |
assert response_json["flowId"] == str(flow.id) | |
# Check that the file_path matches the expected pattern | |
file_path_pattern = re.compile(rf"{flow.id}/\d{{4}}-\d{{2}}-\d{{2}}_\d{{2}}-\d{{2}}-\d{{2}}_test\.txt") | |
assert file_path_pattern.match(response_json["file_path"]) | |
async def test_download_file(files_client, created_api_key, flow): | |
headers = {"x-api-key": created_api_key.api_key} | |
response = await files_client.get(f"api/v1/files/download/{flow.id}/test.txt", headers=headers) | |
assert response.status_code == 200 | |
assert response.content == b"file content" | |
async def test_list_files(files_client, created_api_key, flow): | |
headers = {"x-api-key": created_api_key.api_key} | |
response = await files_client.get(f"api/v1/files/list/{flow.id}", headers=headers) | |
assert response.status_code == 200 | |
assert response.json() == {"files": ["file1.txt", "file2.jpg"]} | |
async def test_delete_file(files_client, created_api_key, flow): | |
headers = {"x-api-key": created_api_key.api_key} | |
response = await files_client.delete(f"api/v1/files/delete/{flow.id}/test.txt", headers=headers) | |
assert response.status_code == 200 | |
assert response.json() == {"message": "File test.txt deleted successfully"} | |
async def test_file_operations(client, created_api_key, flow): | |
headers = {"x-api-key": created_api_key.api_key} | |
flow_id = flow.id | |
file_name = "test.txt" | |
file_content = b"Hello, world!" | |
# Step 1: Upload the file | |
response = await client.post( | |
f"api/v1/files/upload/{flow_id}", | |
files={"file": (file_name, file_content)}, | |
headers=headers, | |
) | |
assert response.status_code == 201 | |
response_json = response.json() | |
assert response_json["flowId"] == str(flow_id) | |
# Check that the file_path matches the expected pattern | |
file_path_pattern = re.compile(rf"{flow_id}/\d{{4}}-\d{{2}}-\d{{2}}_\d{{2}}-\d{{2}}-\d{{2}}_{file_name}") | |
assert file_path_pattern.match(response_json["file_path"]) | |
# Extract the full file name with timestamp from the response | |
full_file_name = response_json["file_path"].split("/")[-1] | |
# Step 2: List files in the folder | |
response = await client.get(f"api/v1/files/list/{flow_id}", headers=headers) | |
assert response.status_code == 200 | |
assert full_file_name in response.json()["files"] | |
# Step 3: Download the file and verify its content | |
response = await client.get(f"api/v1/files/download/{flow_id}/{full_file_name}", headers=headers) | |
assert response.status_code == 200 | |
assert response.content == file_content | |
assert response.headers["content-type"] == "application/octet-stream" | |
# Step 4: Delete the file | |
response = await client.delete(f"api/v1/files/delete/{flow_id}/{full_file_name}", headers=headers) | |
assert response.status_code == 200 | |
assert response.json() == {"message": f"File {full_file_name} deleted successfully"} | |
# Verify that the file is indeed deleted | |
response = await client.get(f"api/v1/files/list/{flow_id}", headers=headers) | |
assert full_file_name not in response.json()["files"] | |
async def test_upload_file_size_limit(files_client, created_api_key, flow): | |
headers = {"x-api-key": created_api_key.api_key} | |
# Test file under the limit (500KB) | |
small_content = b"x" * (500 * 1024) | |
small_file = ("small_file.txt", small_content, "application/octet-stream") | |
headers["Content-Length"] = str(len(small_content)) | |
response = await files_client.post( | |
f"api/v1/files/upload/{flow.id}", | |
files={"file": small_file}, | |
headers=headers, | |
) | |
assert response.status_code == 201, f"Expected 201, got {response.status_code}: {response.json()}" | |
# Test file over the limit (1MB + 1KB) | |
large_content = b"x" * (1024 * 1024 + 1024) | |
bio = BytesIO(large_content) | |
headers["Content-Length"] = str(len(large_content)) | |
response = await files_client.post( | |
f"api/v1/files/upload/{flow.id}", | |
files={"file": ("large_file.txt", bio, "application/octet-stream")}, | |
headers=headers, | |
) | |
assert response.status_code == 413, f"Expected 413, got {response.status_code}: {response.json()}" | |
assert "Content size limit exceeded. Maximum allowed is 1MB and got 1.001MB." in response.json()["detail"] | |