Spaces:
Running
Running
from datetime import datetime, timezone | |
import pytest | |
from httpx import AsyncClient | |
from langflow.services.auth.utils import create_super_user, get_password_hash | |
from langflow.services.database.models.user import UserUpdate | |
from langflow.services.database.models.user.model import User | |
from langflow.services.database.utils import async_session_getter | |
from langflow.services.deps import get_db_service, get_settings_service | |
from sqlmodel import select | |
async def super_user(client): # noqa: ARG001 | |
settings_manager = get_settings_service() | |
auth_settings = settings_manager.auth_settings | |
async with async_session_getter(get_db_service()) as db: | |
return await create_super_user( | |
db=db, | |
username=auth_settings.SUPERUSER, | |
password=auth_settings.SUPERUSER_PASSWORD, | |
) | |
async def super_user_headers( | |
client: AsyncClient, | |
super_user, # noqa: ARG001 | |
): | |
settings_service = get_settings_service() | |
auth_settings = settings_service.auth_settings | |
login_data = { | |
"username": auth_settings.SUPERUSER, | |
"password": auth_settings.SUPERUSER_PASSWORD, | |
} | |
response = await client.post("api/v1/login", data=login_data) | |
assert response.status_code == 200 | |
tokens = response.json() | |
a_token = tokens["access_token"] | |
return {"Authorization": f"Bearer {a_token}"} | |
async def deactivated_user(client): # noqa: ARG001 | |
async with async_session_getter(get_db_service()) as session: | |
user = User( | |
username="deactivateduser", | |
password=get_password_hash("testpassword"), | |
is_active=False, | |
is_superuser=False, | |
last_login_at=datetime.now(tz=timezone.utc), | |
) | |
session.add(user) | |
await session.commit() | |
await session.refresh(user) | |
return user | |
async def test_user_waiting_for_approval(client): | |
username = "waitingforapproval" | |
password = "testpassword" # noqa: S105 | |
# Debug: Check if the user already exists | |
async with async_session_getter(get_db_service()) as session: | |
stmt = select(User).where(User.username == username) | |
existing_user = (await session.exec(stmt)).first() | |
if existing_user: | |
pytest.fail( | |
f"User {username} already exists before the test. Database URL: {get_db_service().database_url}" | |
) | |
# Create a user that is not active and has never logged in | |
async with async_session_getter(get_db_service()) as session: | |
user = User( | |
username=username, | |
password=get_password_hash(password), | |
is_active=False, | |
last_login_at=None, | |
) | |
session.add(user) | |
await session.commit() | |
login_data = {"username": "waitingforapproval", "password": "testpassword"} | |
response = await client.post("api/v1/login", data=login_data) | |
assert response.status_code == 400 | |
assert response.json()["detail"] == "Waiting for approval" | |
# Debug: Check if the user still exists after the test | |
async with async_session_getter(get_db_service()) as session: | |
stmt = select(User).where(User.username == username) | |
existing_user = (await session.exec(stmt)).first() | |
if existing_user: | |
pass | |
else: | |
pytest.fail(f"User {username} does not exist after the test. This is unexpected.") | |
async def test_deactivated_user_cannot_login(client: AsyncClient, deactivated_user): | |
login_data = {"username": deactivated_user.username, "password": "testpassword"} | |
response = await client.post("api/v1/login", data=login_data) | |
assert response.status_code == 401, response.json() | |
assert response.json()["detail"] == "Inactive user", response.text | |
async def test_deactivated_user_cannot_access(client: AsyncClient, logged_in_headers): | |
# Assuming the headers for deactivated_user | |
response = await client.get("api/v1/users/", headers=logged_in_headers) | |
assert response.status_code == 403, response.status_code | |
assert response.json()["detail"] == "The user doesn't have enough privileges", response.text | |
async def test_data_consistency_after_update(client: AsyncClient, active_user, logged_in_headers, super_user_headers): | |
user_id = active_user.id | |
update_data = UserUpdate(is_active=False) | |
response = await client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=super_user_headers) | |
assert response.status_code == 200, response.json() | |
# Fetch the updated user from the database | |
response = await client.get("api/v1/users/whoami", headers=logged_in_headers) | |
assert response.status_code == 401, response.json() | |
assert response.json()["detail"] == "User not found or is inactive." | |
async def test_data_consistency_after_delete(client: AsyncClient, test_user, super_user_headers): | |
user_id = test_user.get("id") | |
response = await client.delete(f"/api/v1/users/{user_id}", headers=super_user_headers) | |
assert response.status_code == 200, response.json() | |
# Attempt to fetch the deleted user from the database | |
response = await client.get("api/v1/users/", headers=super_user_headers) | |
assert response.status_code == 200 | |
assert all(user["id"] != user_id for user in response.json()["users"]) | |
async def test_inactive_user(client: AsyncClient): | |
# Create a user that is not active and has a last_login_at value | |
async with async_session_getter(get_db_service()) as session: | |
user = User( | |
username="inactiveuser", | |
password=get_password_hash("testpassword"), | |
is_active=False, | |
last_login_at=datetime(2023, 1, 1, 0, 0, 0, tzinfo=timezone.utc), | |
) | |
session.add(user) | |
await session.commit() | |
login_data = {"username": "inactiveuser", "password": "testpassword"} | |
response = await client.post("api/v1/login", data=login_data) | |
assert response.status_code == 401 | |
assert response.json()["detail"] == "Inactive user" | |
def test_add_user(test_user): | |
assert test_user["username"] == "testuser" | |
async def test_read_all_users(client: AsyncClient, super_user_headers): | |
response = await client.get("api/v1/users/", headers=super_user_headers) | |
assert response.status_code == 200, response.json() | |
assert isinstance(response.json()["users"], list) | |
async def test_normal_user_cant_read_all_users(client: AsyncClient, logged_in_headers): | |
response = await client.get("api/v1/users/", headers=logged_in_headers) | |
assert response.status_code == 403, response.json() | |
assert response.json() == {"detail": "The user doesn't have enough privileges"} | |
async def test_patch_user(client: AsyncClient, active_user, logged_in_headers): | |
user_id = active_user.id | |
update_data = UserUpdate( | |
username="newname", | |
) | |
response = await client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=logged_in_headers) | |
assert response.status_code == 200, response.json() | |
update_data = UserUpdate( | |
profile_image="new_image", | |
) | |
response = await client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=logged_in_headers) | |
assert response.status_code == 200, response.json() | |
async def test_patch_reset_password(client: AsyncClient, active_user, logged_in_headers): | |
user_id = active_user.id | |
update_data = UserUpdate( | |
password="newpassword", # noqa: S106 | |
) | |
response = await client.patch( | |
f"/api/v1/users/{user_id}/reset-password", | |
json=update_data.model_dump(), | |
headers=logged_in_headers, | |
) | |
assert response.status_code == 200, response.json() | |
# Now we need to test if the new password works | |
login_data = {"username": active_user.username, "password": "newpassword"} | |
response = await client.post("api/v1/login", data=login_data) | |
assert response.status_code == 200 | |
async def test_patch_user_wrong_id(client: AsyncClient, logged_in_headers): | |
user_id = "wrong_id" | |
update_data = UserUpdate( | |
username="newname", | |
) | |
response = await client.patch(f"/api/v1/users/{user_id}", json=update_data.model_dump(), headers=logged_in_headers) | |
assert response.status_code == 422, response.json() | |
json_response = response.json() | |
detail = json_response["detail"] | |
error = detail[0] | |
assert error["loc"] == ["path", "user_id"] | |
assert error["type"] == "uuid_parsing" | |
async def test_delete_user(client: AsyncClient, test_user, super_user_headers): | |
user_id = test_user["id"] | |
response = await client.delete(f"/api/v1/users/{user_id}", headers=super_user_headers) | |
assert response.status_code == 200 | |
assert response.json() == {"detail": "User deleted"} | |
async def test_delete_user_wrong_id(client: AsyncClient, super_user_headers): | |
user_id = "wrong_id" | |
response = await client.delete(f"/api/v1/users/{user_id}", headers=super_user_headers) | |
assert response.status_code == 422 | |
json_response = response.json() | |
detail = json_response["detail"] | |
error = detail[0] | |
assert error["loc"] == ["path", "user_id"] | |
assert error["type"] == "uuid_parsing" | |
async def test_normal_user_cant_delete_user(client: AsyncClient, test_user, logged_in_headers): | |
user_id = test_user["id"] | |
response = await client.delete(f"/api/v1/users/{user_id}", headers=logged_in_headers) | |
assert response.status_code == 403 | |
assert response.json() == {"detail": "The user doesn't have enough privileges"} | |