|
import os |
|
import re |
|
import tempfile |
|
import shutil |
|
import logging |
|
from pathlib import Path |
|
import time |
|
|
|
from huggingface_hub import ( |
|
create_repo, |
|
upload_folder, |
|
list_repo_files, |
|
whoami, |
|
hf_hub_download, |
|
delete_file as hf_delete_file, |
|
HfApi, |
|
create_pull_request as hf_create_pull_request, |
|
) |
|
from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation |
|
from huggingface_hub.utils import HfHubHTTPError |
|
|
|
# Configure process-wide logging once, at import time: INFO and above, with
# the timestamp, logger name and level in front of every message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
def hf_add_space_comment(owner_repo, token, discussion, comment):
    """Post a comment on a discussion (or pull request) of a Space.

    Args:
        owner_repo: Repository id in ``owner/name`` form.
        token: Hugging Face API token used to authenticate the call.
        discussion: Number of the discussion / pull request to comment on.
        comment: Text of the comment to post.

    Returns:
        The value returned by ``HfApi.comment_discussion``.
    """
    # comment_discussion addresses a model repo unless repo_type is given;
    # every other Hub call in this module targets Spaces, so be explicit.
    return HfApi(token=token).comment_discussion(
        repo_id=owner_repo,
        discussion_num=discussion,
        comment=comment,
        repo_type="space",
    )
|
|
|
def _get_api_token(ui_token_from_textbox=None):
    """Resolve the Hugging Face API token to use for a request.

    The token typed into the UI wins; otherwise the ``HF_TOKEN`` environment
    variable is used. Both candidates are stripped of surrounding whitespace,
    and a candidate that is empty after stripping is treated as absent, so a
    textbox containing only blanks falls back to the environment instead of
    yielding an empty token.

    Args:
        ui_token_from_textbox: Token string from the UI, or None.

    Returns:
        ``(token, None)`` on success, or ``(None, error_message)`` when no
        usable token is available.
    """
    ui_token = (ui_token_from_textbox or "").strip()
    if ui_token:
        logger.debug("Using HF_TOKEN from UI textbox.")
        return ui_token, None

    env_token = (os.getenv('HF_TOKEN') or "").strip()
    if env_token:
        logger.debug("Using HF_TOKEN from environment variable.")
        return env_token, None

    logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.")
    return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."
|
|
|
def _determine_repo_id(ui_api_token_from_textbox, owner_ui, space_name_ui):
    """Build the ``owner/space`` repo id, looking the owner up when it is blank.

    Args:
        ui_api_token_from_textbox: Token forwarded to ``_get_api_token`` when
            the owner has to be auto-detected via ``whoami``.
        owner_ui: Owner (user or organisation) from the UI; may be empty.
        space_name_ui: Space name from the UI; must be non-empty and must not
            contain ``/``.

    Returns:
        ``(repo_id, None)`` on success, or ``(None, error_message)``.
    """
    if not space_name_ui:
        return None, "Error: Space Name cannot be empty."
    if "/" in space_name_ui:
        return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."

    owner = owner_ui
    if not owner:
        logger.info("Owner not specified, attempting to auto-detect from token.")
        api_token, token_problem = _get_api_token(ui_api_token_from_textbox)
        if token_problem:
            return None, f"Error auto-detecting owner: {token_problem}"
        if not api_token:
            return None, "Error: API token required for auto owner determination if Owner field is empty."

        try:
            account = whoami(token=api_token)
            if account and 'name' in account:
                owner = account['name']
                logger.info(f"Auto-detected owner: {owner}")
            else:
                failure = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
                logger.error(failure)
                return None, failure
        except Exception as e:
            logger.exception("Error retrieving username from token:")
            return None, f"Error retrieving username from token: {str(e)}. Specify Owner or check token."

    if not owner:
        return None, "Error: Owner could not be determined. Please specify it in the Owner field."

    repo_id = f"{owner}/{space_name_ui}"
    logger.info(f"Determined repo_id: {repo_id}")
    return repo_id, None
|
|
|
def _find_structure_block_end(lines, header_index, file_header_re):
    """Return the index of the closing fence of the block after a File Structure header.

    Scans forward from ``header_index`` for an opening code fence and then its
    closing fence. Returns None when a ``### File:`` header is met before any
    opening fence (the section has no fenced block of its own) or when either
    fence is missing.
    """
    total = len(lines)
    start = header_index + 1
    while start < total:
        candidate = lines[start].strip()
        if candidate.startswith("```"):
            break
        if file_header_re.match(candidate):
            return None
        start += 1
    else:
        return None

    end = start + 1
    while end < total and not lines[end].strip().startswith("```"):
        end += 1
    return end if end < total else None


def parse_markdown(markdown_input):
    """Parse the Space markdown format into repo metadata and file contents.

    Recognised elements:

    * ``# Space: owner/name`` (or just ``name``) sets ``owner_md`` and
      ``repo_name_md``; only honoured outside a file section.
    * ``## File Structure`` followed by a fenced block; the block is skipped.
    * ``### File: path`` starts a file section. Anything from the first ``(``
      on is dropped from the path, as are surrounding backticks and quotes.
      Inside a section, lines between code fences become the file content;
      outside fences only binary/error placeholder lines are kept.

    Headers prefixed with ``# `` (commented out) are accepted as well.

    Args:
        markdown_input: The markdown text. None or an empty string yields an
            empty result. ``\\r\\n`` line endings are normalised to ``\\n``.

    Returns:
        A dict ``{"repo_name_md": str, "owner_md": str, "files": [...]}``
        where each file is ``{"path": str, "content": str}``.
    """
    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    file_parsing_errors = []

    file_header_re = re.compile(r"### File:\s*(?P<filename_line>[^\n]+)")
    commented_headers = ("# ### File:", "# ## File Structure", "# # Space:")
    placeholder_prefixes = ("[Binary file", "[Error loading content:", "[Binary or Skipped file]")

    # Tolerate missing input and keep stray carriage returns out of file content.
    text = (markdown_input or "").replace("\r\n", "\n")

    # Un-comment recognised headers that were emitted as "# <header>".
    lines = []
    for raw_line in text.strip().split("\n"):
        stripped = raw_line.strip()
        lines.append(stripped[2:] if stripped.startswith(commented_headers) else raw_line)

    current_file_path = None
    current_file_content_lines = []
    in_file_definition = False
    in_code_block = False
    skip_through = -1  # index of the last line of a File Structure block being skipped

    for i, line_content_orig in enumerate(lines):
        if i <= skip_through:
            continue

        line_content_stripped = line_content_orig.strip()
        line_num = i + 1

        file_match = file_header_re.match(line_content_stripped)
        if file_match:
            # A new header closes the file section that was open, if any.
            if current_file_path is not None and in_file_definition:
                space_info["files"].append(
                    {"path": current_file_path, "content": "\n".join(current_file_content_lines).strip()}
                )

            candidate_path = file_match.group("filename_line").strip()
            candidate_path = re.split(r'\s*\(', candidate_path, maxsplit=1)[0].strip()
            candidate_path = candidate_path.strip('`\'"').strip()

            if not candidate_path:
                file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.")
                current_file_path = None
                in_file_definition = False
                continue

            current_file_path = candidate_path
            current_file_content_lines = []
            in_file_definition = True
            in_code_block = False
            logger.debug(f"Parsed file header: {current_file_path}")
            continue

        if not in_file_definition:
            if line_content_stripped.startswith("# Space:"):
                full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
                owner_part, slash, name_part = full_space_name_md.partition("/")
                if slash:
                    space_info["owner_md"] = owner_part.strip()
                    space_info["repo_name_md"] = name_part.strip()
                else:
                    space_info["repo_name_md"] = full_space_name_md
                logger.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}")
            elif line_content_stripped.startswith("## File Structure"):
                structure_block_end = _find_structure_block_end(lines, i, file_header_re)
                if structure_block_end is not None:
                    logger.debug(f"Skipping File Structure block from line {i+1} to {structure_block_end+1}")
                    # Reassigning the loop variable would not skip anything,
                    # so remember how far to skip instead.
                    skip_through = structure_block_end
            # Everything else outside a file section is ignored.
            continue

        if line_content_stripped.startswith("```"):
            in_code_block = not in_code_block
            logger.debug(f"Toggled code block to {in_code_block} at line {line_num}")
            continue

        if in_code_block:
            current_file_content_lines.append(line_content_orig)
        elif line_content_stripped.startswith(placeholder_prefixes):
            current_file_content_lines.append(line_content_orig)
            logger.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}")

    # Flush the file section still open at end of input.
    if current_file_path is not None and in_file_definition:
        space_info["files"].append(
            {"path": current_file_path, "content": "\n".join(current_file_content_lines).strip()}
        )

    space_info["files"] = [f for f in space_info["files"] if f.get("path")]
    space_info["owner_md"] = space_info["owner_md"].strip()
    space_info["repo_name_md"] = space_info["repo_name_md"].strip()

    if file_parsing_errors:
        logger.warning(f"Markdown parsing encountered errors: {file_parsing_errors}")

    logger.info(f"Parsed markdown. Found {len(space_info['files'])} files.")
    return space_info
|
|
|
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch the SDK and the file list of a Space.

    ``HfApi.repo_info`` is tried first. If it fails with anything other than
    an HTTP error, the file list alone is fetched through ``list_repo_files``
    and the returned error carries a warning that the SDK is unknown.

    Args:
        ui_api_token_from_textbox: Token from the UI (may be empty).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner of the Space; auto-detected from the token when empty.

    Returns:
        ``(sdk, files, error)``: ``sdk`` is None when unknown, ``files`` is a
        list of repo-relative paths, ``error`` is None or a message string
        (which may be only a warning when the fallback succeeded).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    sdk = None
    files = []
    error = None

    logger.info(f"Attempting to get repo info for {repo_id_for_error_logging}")

    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, [], token_err

        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return None, [], err_repo_id
        repo_id_for_error_logging = repo_id

        api = HfApi(token=resolved_api_token)
        repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=20)
        sdk = repo_info_obj.sdk
        files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename]

        if not files and repo_info_obj.siblings:
            logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(repo_info_obj.siblings)}")

        logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}")

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        status_code = e_http.response.status_code if e_http.response is not None else None

        if status_code == 404:
            error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)."
        elif status_code in (401, 403):
            error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        else:
            error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {str(e_http)}"

    except Exception as e:
        logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}")
        error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback."

        try:
            resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox)
            if token_err_fb:
                return None, [], f"{error}\nAPI Token Error during fallback: {token_err_fb}"
            repo_id_fb, err_repo_id_fb = _determine_repo_id(resolved_api_token_fb, owner_ui, space_name_ui)
            if err_repo_id_fb:
                return None, [], f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}"

            # list_repo_files takes no `timeout` argument in current
            # huggingface_hub; passing one made this fallback always fail.
            files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space")
            error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback."
            logger.info(f"Fallback list_repo_files successful for {repo_id_fb}. Files found: {len(files)}")

        except HfHubHTTPError as e2_http:
            logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}")
            status_code_fb = e2_http.response.status_code if e2_http.response is not None else None
            if status_code_fb == 404:
                error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)."
            else:
                error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {str(e2_http)}"
            files = []

        except Exception as e2:
            logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}")
            error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}"
            files = []

    # A reachable but empty Space is reported as an error message too.
    if not files and not error and (repo_id_for_error_logging is not None):
        error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}`."

    return sdk, files, error
|
|
|
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Return ``(files, error)`` for a Space, discarding the SDK.

    Thin wrapper over ``get_space_repository_info`` for callers that only
    need the file list.
    """
    _sdk, file_list, error = get_space_repository_info(
        ui_api_token_from_textbox, space_name_ui, owner_ui
    )
    return file_list, error
|
|
|
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
    """Download one file from a Space and return its text.

    Args:
        ui_api_token_from_textbox: Token from the UI (may be empty).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner of the Space; auto-detected from the token when empty.
        file_path_in_repo: Path of the file inside the repo; backslashes are
            converted to forward slashes.

    Returns:
        ``(content, None)`` on success, or ``(None, error_message)``. Files
        that are not valid UTF-8 are reported as an error.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, token_err
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return None, err_repo_id
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return None, "Error: File path cannot be empty."
        file_path_in_repo = file_path_in_repo.replace("\\", "/")

        # hf_hub_download has no `timeout` parameter (passing one raises
        # TypeError); `etag_timeout` bounds the metadata request instead.
        # `local_dir_use_symlinks` was dropped: it only matters together with
        # `local_dir`, which is not used here.
        downloaded_file_path = hf_hub_download(
            repo_id=repo_id,
            filename=file_path_in_repo,
            repo_type="space",
            token=resolved_api_token,
            cache_dir=None,
            etag_timeout=20,
        )
        content = Path(downloaded_file_path).read_text(encoding="utf-8")
        logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.")
        return content, None
    except FileNotFoundError:
        logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except UnicodeDecodeError:
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.")
        return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
        if status_code in (401, 403):
            return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {str(e_http)}"
    except Exception as e:
        logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching file content: {str(e)}"
|
|
|
def apply_staged_file_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, file_changeset):
    """Apply a batch of staged create/update/delete changes to a Space.

    Deletions are sent as one commit via ``HfApi.create_commit``; creations
    and updates are written to a temporary staging folder (together with a
    ``.gitattributes`` enforcing LF line endings) and pushed with
    ``upload_folder`` as a second commit.

    Args:
        ui_api_token_from_textbox: Token from the UI (may be empty).
        owner_ui: Owner of the Space; required.
        space_name_ui: Space name without the owner part; required.
        file_changeset: Iterable of dicts with ``type`` (``CREATE_FILE``,
            ``UPDATE_FILE`` or ``DELETE_FILE``), ``path`` and, for
            create/update, ``content``. None is treated as empty. Entries of
            any other type are ignored.

    Returns:
        A human-readable status string; individual failures are reported in
        it rather than raised.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    status_messages = []
    file_changeset = file_changeset or []
    placeholder_prefixes = ("[Binary file", "[Error loading content:", "[Binary or Skipped file]")

    def describe_http_error(http_error):
        # requests.Response is falsy for 4xx/5xx statuses, so the response
        # must be compared against None, not tested for truthiness.
        response = http_error.response
        if response is None:
            return 'N/A', str(http_error)
        return response.status_code, response.text

    logger.info(f"Attempting to apply {len(file_changeset)} staged file changes to {repo_id_for_error_logging}")

    if not owner_ui or not space_name_ui:
        return "Error: Cannot apply file changes. Owner and Space Name must be provided."

    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"

        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        api = HfApi(token=resolved_api_token)

        temp_dir = None
        paths_to_upload = {}
        delete_operations = []

        try:
            temp_dir = tempfile.TemporaryDirectory()
            repo_staging_path = Path(temp_dir.name) / "repo_staging_content"
            repo_staging_path.mkdir(exist_ok=True)

            gitattributes_path_local = repo_staging_path / ".gitattributes"
            try:
                with open(gitattributes_path_local, "w", encoding="utf-8") as f:
                    f.write("* text=auto eol=lf\n")
                paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"
            except Exception as e:
                status_messages.append(f"Warning: Could not stage .gitattributes file: {e}")
                logger.warning(f"Could not stage .gitattributes: {e}")

            for change in file_changeset:
                change_type = change['type']

                if change_type in ('UPDATE_FILE', 'CREATE_FILE'):
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append(f"Skipping {change_type} operation: empty path.")
                        continue

                    # Paths come from generated/user text: never let ".."
                    # segments write outside the staging directory.
                    if ".." in file_path_in_repo.split("/"):
                        status_messages.append(f"Skipping {change_type} for '{file_path_in_repo}': path escapes the repository root.")
                        logger.warning(f"Skipping {change_type} operation for '{file_path_in_repo}': path escapes the repository root.")
                        continue

                    content_to_write = change.get('content') or ''
                    if content_to_write.startswith(placeholder_prefixes):
                        status_messages.append(f"Skipping {change_type} for '{file_path_in_repo}': Content is a binary/error placeholder.")
                        logger.warning(f"Skipping {change_type} operation for '{file_path_in_repo}': Content is binary/error placeholder.")
                        continue

                    file_path_local = repo_staging_path / file_path_in_repo
                    try:
                        file_path_local.parent.mkdir(parents=True, exist_ok=True)
                        with open(file_path_local, "w", encoding="utf-8") as f:
                            f.write(content_to_write)
                        paths_to_upload[str(file_path_local)] = file_path_in_repo
                        logger.debug(f"Staged file for {change_type}: {file_path_in_repo}")
                    except Exception as file_write_error:
                        status_messages.append(f"Error staging file {file_path_in_repo} for {change_type}: {file_write_error}")
                        logger.error(f"Error writing file {file_path_in_repo} during staging for {change_type}: {file_write_error}")

                elif change_type == 'DELETE_FILE':
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append("Skipping DELETE_FILE operation: empty path.")
                        continue
                    delete_operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo))
                    logger.debug(f"Added DELETE_FILE operation for: {file_path_in_repo}")

            if delete_operations:
                try:
                    commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files."
                    logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}")
                    # create_commit accepts no `timeout` argument.
                    api.create_commit(
                        repo_id=repo_id,
                        repo_type="space",
                        operations=delete_operations,
                        commit_message=commit_message_delete,
                    )
                    status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.")
                    logger.info("Delete commit successful.")
                except HfHubHTTPError as e_http:
                    status, detail = describe_http_error(e_http)
                    status_messages.append(f"File Deletion HTTP Error ({status}): {detail}. Check logs.")
                    logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}")
                except Exception as e_delete_commit:
                    status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.")
                    logger.exception(f"Error during delete commit for {repo_id}:")

            if paths_to_upload:
                try:
                    commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}"
                    logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...")
                    # upload_folder accepts no `timeout` argument.
                    upload_folder(
                        repo_id=repo_id,
                        folder_path=str(repo_staging_path),
                        path_in_repo=".",
                        token=resolved_api_token,
                        repo_type="space",
                        commit_message=commit_message_upload,
                        allow_patterns=["*"],
                    )
                    status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.")
                    logger.info("Upload/Update commit successful.")
                except HfHubHTTPError as e_http:
                    status, detail = describe_http_error(e_http)
                    status_messages.append(f"File Upload/Update HTTP Error ({status}): {detail}. Check logs.")
                    logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}")
                except Exception as e_upload:
                    status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.")
                    logger.exception(f"Error during upload_folder for {repo_id}:")
            else:
                status_messages.append("No file changes (create/update/delete) to commit.")
                logger.info("No file changes to commit.")

        finally:
            # Cleanup is best-effort: a failure here must not mask the result.
            if temp_dir:
                try:
                    temp_dir.cleanup()
                    logger.info("Cleaned up temporary staging directory.")
                except Exception as e:
                    logger.error(f"Error cleaning up temp dir: {e}")

    except HfHubHTTPError as e_http:
        logger.error(f"Top-level HTTP error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        status, detail = describe_http_error(e_http)
        status_messages.append(f"API HTTP Error ({status}): {detail}")
    except Exception as e:
        logger.exception(f"Top-level error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}:")
        status_messages.append(f"An unexpected error occurred during apply file changes: {str(e)}")

    final_status = " | ".join(status_messages) if status_messages else "No file operations were applied."
    logger.info(f"Finished applying staged file changes. Final status: {final_status}")
    return final_status
|
|
|
def build_logic_create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input, private):
    """Create a new Space and optionally upload initial files parsed from markdown.

    Args:
        ui_api_token_from_textbox: Token from the UI (may be empty).
        space_name_ui: Name of the Space to create; must not contain ``/``.
        owner_ui: Owner of the Space; looked up with ``whoami`` when empty.
        sdk_ui: Value passed as ``space_sdk`` to ``create_repo``.
        markdown_input: Optional markdown in the format understood by
            ``parse_markdown``; its files become the initial commit.
        private: Whether the Space is created as private.

    Returns:
        A human-readable status string. Creation fails if the Space already
        exists (``exist_ok=False``); upload problems after a successful
        creation are appended to the success message.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Attempting to create space: {repo_id_for_error_logging}")

    if not space_name_ui:
        return "Error: Space Name cannot be empty."
    if "/" in space_name_ui:
        return "Error: Space Name should not contain '/'."

    resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
    if token_err:
        return f"API Token Error: {token_err}"

    final_owner = owner_ui
    if not final_owner:
        try:
            user_info = whoami(token=resolved_api_token)
            final_owner = user_info.get('name')
            if not final_owner:
                raise Exception("Could not find user name from token.")
        except Exception as e:
            return f"Error auto-detecting owner: {str(e)}. Specify Owner field."

    if not final_owner:
        return "Error: Owner could not be determined."

    repo_id = f"{final_owner}/{space_name_ui}"
    # Report the resolved id (including an auto-detected owner) in errors.
    repo_id_for_error_logging = repo_id
    placeholder_prefixes = ("[Binary file", "[Error loading content:", "[Binary or Skipped file]")
    temp_dir = None

    try:
        api = HfApi(token=resolved_api_token)

        api.create_repo(repo_id=repo_id, repo_type="space", space_sdk=sdk_ui, private=private, exist_ok=False)
        logger.info(f"Successfully created empty space: {repo_id}")

        if markdown_input:
            parsed_md = parse_markdown(markdown_input)
            files_to_upload = parsed_md.get("files", [])
            if files_to_upload:
                logger.info(f"Staging {len(files_to_upload)} files for upload after creation.")
                temp_dir = tempfile.TemporaryDirectory()
                repo_staging_path = Path(temp_dir.name) / "initial_content"
                repo_staging_path.mkdir(exist_ok=True)
                paths_to_upload = {}
                status_messages = []

                gitattributes_path_local = repo_staging_path / ".gitattributes"
                try:
                    with open(gitattributes_path_local, "w", encoding="utf-8") as f:
                        f.write("* text=auto eol=lf\n")
                    paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"
                except Exception as e:
                    status_messages.append(f"Warning: Could not stage .gitattributes file: {e}")
                    logger.warning(f"Could not stage .gitattributes: {e}")

                for file_info in files_to_upload:
                    file_path_in_repo = file_info['path'].lstrip('/').replace(os.sep, '/')
                    content_to_write = file_info.get('content') or ''

                    if not file_path_in_repo:
                        status_messages.append("Skipping file creation: empty path.")
                        continue

                    # Paths come from parsed markdown: never let ".."
                    # segments write outside the staging directory.
                    if ".." in file_path_in_repo.split("/"):
                        status_messages.append(f"Skipping file '{file_path_in_repo}': path escapes the repository root.")
                        logger.warning(f"Skipping file '{file_path_in_repo}': path escapes the repository root.")
                        continue

                    if content_to_write.startswith(placeholder_prefixes):
                        status_messages.append(f"Skipping file '{file_path_in_repo}': Content is a binary/error placeholder.")
                        logger.warning(f"Skipping file '{file_path_in_repo}': Content is binary/error placeholder.")
                        continue

                    file_path_local = repo_staging_path / file_path_in_repo
                    try:
                        file_path_local.parent.mkdir(parents=True, exist_ok=True)
                        with open(file_path_local, "w", encoding="utf-8") as f:
                            f.write(content_to_write)
                        paths_to_upload[str(file_path_local)] = file_path_in_repo
                    except Exception as file_write_error:
                        status_messages.append(f"Error staging file {file_path_in_repo}: {file_write_error}")

                if paths_to_upload:
                    try:
                        commit_message_upload = "Initial files from AI Space Builder"
                        # upload_folder accepts no `timeout` argument.
                        upload_folder(
                            repo_id=repo_id,
                            folder_path=str(repo_staging_path),
                            path_in_repo=".",
                            token=resolved_api_token,
                            repo_type="space",
                            commit_message=commit_message_upload,
                            allow_patterns=["*"],
                        )
                        status_messages.append(f"Successfully uploaded initial {len(paths_to_upload)} files.")
                        logger.info("Initial upload successful.")
                    except HfHubHTTPError as e_http:
                        # requests.Response is falsy for 4xx/5xx, so compare with None.
                        response = e_http.response
                        status = response.status_code if response is not None else 'N/A'
                        detail = response.text if response is not None else str(e_http)
                        status_messages.append(f"Initial Upload HTTP Error ({status}): {detail}. Check logs.")
                        logger.error(f"HTTP error during initial upload for {repo_id}: {e_http}")
                    except Exception as e_upload:
                        status_messages.append(f"Initial Upload Error: {str(e_upload)}. Check logs.")
                        logger.exception(f"Error during initial upload for {repo_id}:")
            else:
                status_messages = ["No files parsed from markdown for initial upload."]
        else:
            status_messages = ["No markdown provided for initial files."]

        final_status = "Successfully created space"
        if status_messages:
            final_status += " | " + " | ".join(status_messages)

        return final_status

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error creating space {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        # A truthiness test on the response would be False for every error
        # status, making the 409/401/403 branches below unreachable.
        response = e_http.response
        status_code = response.status_code if response is not None else 'N/A'
        if status_code == 409:
            return f"Create Space Error ({status_code}): Space '{repo_id_for_error_logging or 'unknown'}' already exists. Use 'Build / Update' instead."
        if status_code in (401, 403):
            return f"Create Space Error ({status_code}): Access denied or authentication required for '{repo_id_for_error_logging or 'unknown'}'. Check token permissions."
        return f"Create Space HTTP Error ({status_code}): {response.text if response is not None else str(e_http)}"

    except Exception as e:
        logger.exception(f"Error creating space {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Create Space Error: {str(e)}"
    finally:
        # Cleanup is best-effort: a failure here must not mask the result.
        if temp_dir:
            try:
                temp_dir.cleanup()
            except Exception as e:
                logger.error(f"Error cleaning up temp dir: {e}")
|
|
|
|
|
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
    """Delete a single file from a Space in its own commit.

    Args:
        ui_api_token_from_textbox: Token from the UI (may be empty).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner of the Space; auto-detected from the token when empty.
        file_path_in_repo: Path of the file inside the repo; leading slashes
            are removed and OS separators converted to ``/``.
        commit_message_ui: Optional commit message; a default mentioning the
            file is used when empty.

    Returns:
        A human-readable success or error string.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return "Error: File path cannot be empty for deletion."
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')

        effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI"

        # delete_file accepts no `timeout` argument; passing one raised
        # TypeError before any request was made.
        hf_delete_file(
            path_in_repo=file_path_in_repo,
            repo_id=repo_id,
            repo_type="space",
            token=resolved_api_token,
            commit_message=effective_commit_message,
        )
        logger.info(f"Successfully deleted file: {file_path_in_repo}")
        return f"Successfully deleted file: `{file_path_in_repo}`"

    except FileNotFoundError:
        logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        status_code = e_http.response.status_code if e_http.response is not None else None

        if status_code == 404:
            return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)."
        if status_code in (401, 403):
            return f"Delete Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"Delete HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {str(e_http)}"
    except Exception as e:
        logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Delete Error deleting file '{file_path_in_repo}': {str(e)}"
|
|
|
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
    """Create or overwrite a single text file in a Space.

    The content is written to a local temporary file as UTF-8 and pushed with
    ``HfApi.upload_file``; the temporary file is removed afterwards.

    Args:
        ui_api_token_from_textbox: Token from the UI (may be empty).
        space_name_ui: Space name without the owner part.
        owner_ui: Owner of the Space; auto-detected from the token when empty.
        file_path_in_repo: Destination path inside the repo; leading slashes
            are removed and OS separators converted to ``/``.
        file_content: New text content of the file.
        commit_message_ui: Commit message; a default mentioning the file is
            used when empty.

    Returns:
        A human-readable success or error string.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Attempting manual file update for '{file_path_in_repo}' in {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        if not file_path_in_repo:
            return "Update Error: File Path to update cannot be empty."
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
        commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI"

        api = HfApi(token=resolved_api_token)

        tmp_file_path = None
        try:
            # delete=False so the file can be re-opened by path for the
            # upload; it is removed explicitly in the finally clause.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj:
                tmp_file_obj.write(file_content)
                tmp_file_path = tmp_file_obj.name

            # upload_file accepts no `timeout` argument; passing one raised
            # TypeError before any request was made.
            api.upload_file(
                path_or_fileobj=tmp_file_path,
                path_in_repo=file_path_in_repo,
                repo_id=repo_id,
                repo_type="space",
                commit_message=commit_msg,
            )
            logger.info(f"Successfully updated file: {file_path_in_repo}")
            return f"Successfully updated `{file_path_in_repo}`"
        finally:
            if tmp_file_path and os.path.exists(tmp_file_path):
                os.remove(tmp_file_path)

    except FileNotFoundError:
        logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return f"Update Error: Local temporary file not found during upload for '{file_path_in_repo}'."
    except UnicodeDecodeError:
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}' during manual update.")
        return f"Update Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}")
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Update Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)."
        if status_code in (401, 403):
            return f"Update Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"Update HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {str(e_http)}"
    except Exception as e:
        logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:")
        return f"Update Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}"
|
|
|
def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch the runtime status of a Space and summarise it as a dict.

    Args:
        ui_api_token_from_textbox: Token typed into the UI. Resolved through
            ``_get_api_token``, which falls back to the ``HF_TOKEN``
            environment variable.
        space_name_ui: Space name without the owner part.
        owner_ui: Owner (user or organisation). When empty,
            ``_determine_repo_id`` tries to detect it from the token.

    Returns:
        A ``(status_details, error)`` tuple. On success ``error`` is ``None``
        and ``status_details`` is a dict with the keys ``stage``,
        ``hardware``, ``requested_hardware``, ``error_message``, ``status``
        and ``full_log_link``. On failure ``status_details`` is ``None`` and
        ``error`` is a human-readable message. Exceptions are caught and
        reported through the second tuple element.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}")

    # Stage names that mean the Space is broken. "ERRORED" is kept for
    # backward compatibility; the *_ERROR names are the documented
    # SpaceStage values.
    error_stages = ("ERRORED", "BUILD_ERROR", "RUNTIME_ERROR", "CONFIG_ERROR")

    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, f"API Token Error: {token_err}"

        repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
        if err_repo_id:
            return None, f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id

        api = HfApi(token=resolved_api_token)

        # get_space_runtime() accepts only repo_id (plus token); passing an
        # unsupported ``timeout`` keyword raised TypeError on every call.
        runtime_info = api.get_space_runtime(repo_id=repo_id)
        logger.info(f"Received runtime info for {repo_id}. Stage: {runtime_info.stage}")

        status_details = {
            "stage": runtime_info.stage,
            "hardware": runtime_info.hardware,
            "requested_hardware": getattr(runtime_info, 'requested_hardware', None),
            "error_message": None,
            "status": getattr(runtime_info, 'status', None),
            "full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#"
        }

        if runtime_info.stage in error_stages:
            # Guard against a missing or non-dict raw payload.
            raw = getattr(runtime_info, 'raw', None)
            if not isinstance(raw, dict):
                raw = {}

            error_content = None
            runtime_error = getattr(runtime_info, 'error', None)
            if runtime_error:
                error_content = str(runtime_error)

            build_info = raw.get('build')
            run_info = raw.get('run')
            raw_message = raw.get('message')

            if isinstance(build_info, dict) and build_info.get('status') == 'error':
                error_content = f"Build Error: {build_info.get('message', error_content or 'Unknown build error')}"
            elif isinstance(run_info, dict) and run_info.get('status') == 'error':
                error_content = f"Runtime Error: {run_info.get('message', error_content or 'Unknown runtime error')}"
            elif isinstance(raw_message, str) and ('error' in raw_message.lower() or raw_message.strip().endswith('!')):
                error_content = raw_message

            # NOTE(review): the Hub runtime payload appears to carry the
            # failure text under "errorMessage"; used only as a last resort.
            # Confirm against the API response.
            if not error_content and isinstance(raw.get('errorMessage'), str):
                error_content = raw['errorMessage']

            status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details."

        logger.info(f"Runtime status details for {repo_id}: {status_details}")
        return status_details, None

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None

        if status_code == 404:
            return None, f"Status Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)."
        if status_code in (401, 403):
            return None, f"Status Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"Status HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"

    except Exception as e:
        logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Status Error fetching runtime status: {str(e)}"
|
|
|
def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool):
    """Set the visibility of a Space.

    Args:
        hf_api_key: Token from the UI. Resolved through ``_get_api_token``,
            which falls back to the ``HF_TOKEN`` environment variable.
        repo_id: Full Space id in ``owner/name`` form.
        private: ``True`` to make the Space private, ``False`` for public.

    Returns:
        A status string describing success or the failure reason.
        Exceptions are caught and converted into the returned message.
    """
    logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error setting privacy: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"

        api = HfApi(token=token)
        # update_repo_visibility is deprecated in huggingface_hub in favour of
        # update_repo_settings. Keep using it where it exists and fall back
        # to the replacement on releases that no longer ship it.
        if hasattr(api, "update_repo_visibility"):
            api.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space')
        else:
            api.update_repo_settings(repo_id=repo_id, private=private, repo_type='space')

        logger.info(f"Successfully set privacy for {repo_id} to {private}.")
        return f"Successfully set privacy for `{repo_id}` to `{private}`."

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error setting privacy for {repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response is falsy for
        # every 4xx/5xx status, so a truthiness test hides the real details.
        response = getattr(e_http, "response", None)
        if response is not None:
            status_code = response.status_code
            detail = response.text
        else:
            status_code = 'N/A'
            detail = str(e_http)
        return f"HTTP Error ({status_code}) setting privacy for `{repo_id}`: {detail}"

    except Exception as e:
        logger.exception(f"Error setting privacy for {repo_id}:")
        return f"Error setting privacy for `{repo_id}`: {e}"
|
|
|
def build_logic_delete_space(hf_api_key, owner, space_name):
    """Permanently delete a Space.

    Args:
        hf_api_key: Token from the UI. Resolved through ``_get_api_token``,
            which falls back to the ``HF_TOKEN`` environment variable.
        owner: Owner (user or organisation) of the Space.
        space_name: Space name without the owner part.

    Returns:
        A status string describing success or the failure reason.
        Exceptions are caught and converted into the returned message.
    """
    # Refuse to build a malformed repo id such as "/name" or "owner/None"
    # before attempting a destructive call.
    if not owner or not space_name:
        return "Error: Both owner and space name are required to delete a space."

    repo_id = f"{owner}/{space_name}"
    logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error deleting space: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"

        api = HfApi(token=token)
        api.delete_repo(repo_id=repo_id, repo_type='space')

        logger.warning(f"Successfully deleted space {repo_id}.")
        return f"Successfully deleted space `{repo_id}`."

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error deleting space {repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response is falsy for
        # every 4xx/5xx status, so a truthiness test hides the real details.
        response = getattr(e_http, "response", None)
        if response is not None:
            status_code = response.status_code
            detail = response.text
        else:
            status_code = 'N/A'
            detail = str(e_http)
        return f"HTTP Error ({status_code}) deleting space `{repo_id}`: {detail}"

    except Exception as e:
        logger.exception(f"Error deleting space {repo_id}:")
        return f"Error deleting space `{repo_id}`: {e}"
|
|
|
|
|
def build_logic_create_pull_request(hf_api_key, source_repo_id, target_repo_id, title, body=""):
    """Open a Pull Request on the target Space.

    The Hub's ``create_pull_request`` operates on a single repository and has
    no cross-repository base, so ``source_repo_id`` is used only in log
    messages.

    Args:
        hf_api_key: Token from the UI. Resolved through ``_get_api_token``,
            which falls back to the ``HF_TOKEN`` environment variable.
        source_repo_id: Repository the changes originate from (logged only).
        target_repo_id: Repository on which the Pull Request is opened.
        title: Pull Request title.
        body: Optional Pull Request description.

    Returns:
        A status string describing success (including the PR URL when the
        returned object exposes one) or the failure reason. Exceptions are
        caught and converted into the returned message.
    """
    logger.info(f"Attempting to create PR from '{source_repo_id}' to '{target_repo_id}'. Title: '{title}'")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error creating PR: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"

        # create_pull_request() accepts neither ``base_repo`` nor ``timeout``;
        # passing them raised TypeError on every call.
        # NOTE(review): repo_type="space" assumes the target is a Space, as
        # with the other calls in this module. Without it the Hub treats the
        # id as a model repo.
        pull_request = hf_create_pull_request(
            repo_id=target_repo_id,
            title=title,
            description=body,
            repo_type="space",
            token=token,
        )
        # The call returns a discussion object; prefer its URL for display.
        pr_url = getattr(pull_request, "url", pull_request)

        logger.info(f"Successfully created PR: {pr_url}")
        return f"Successfully created Pull Request: {pr_url}"

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error creating PR from {source_repo_id} to {target_repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response is falsy for
        # every 4xx/5xx status, so a truthiness test hides the real details.
        response = getattr(e_http, "response", None)
        status_code = response.status_code if response is not None else 'N/A'
        response_text = response.text if response is not None else None

        if status_code in (401, 403):
            return f"PR Error ({status_code}): Access denied or authentication required to create PR on '{target_repo_id}'. Check token permissions."
        if status_code == 404:
            return f"PR Error ({status_code}): Target repository '{target_repo_id}' not found."
        if isinstance(response_text, str) and 'already exists' in response_text:
            return "PR Error: Pull Request already exists."
        return f"PR HTTP Error ({status_code}): {response_text if response is not None else str(e_http)}"

    except Exception as e:
        logger.exception(f"Error creating PR from {source_repo_id} to {target_repo_id}:")
        return f"PR Error: {e}"
|
|
|
|
|
def build_logic_add_comment(hf_api_key, repo_id, comment_text, discussion_num=None):
    """Post a comment on an existing discussion or Pull Request of a Space.

    Args:
        hf_api_key: Token from the UI. Resolved through ``_get_api_token``,
            which falls back to the ``HF_TOKEN`` environment variable.
        repo_id: Full Space id in ``owner/name`` form.
        comment_text: Comment body; must not be empty.
        discussion_num: Number of the discussion / Pull Request to comment
            on. The Hub API cannot post a comment without it, so the call is
            rejected with an error message when it is omitted.

    Returns:
        A status string describing success or the failure reason.
        Exceptions are caught and converted into the returned message.
    """
    # Validate before any network work so bad input fails fast and clearly.
    if comment_text is None or not str(comment_text).strip():
        return "Comment Error: Comment text cannot be empty."
    if discussion_num is None:
        return f"Comment Error: A discussion number is required to add a comment to `{repo_id}`."

    comment_text = str(comment_text)
    logger.info(f"Attempting to add comment to '{repo_id}'. Text: '{comment_text[:50]}...'")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error adding comment: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"

        # Call the Hub API directly with repo_type="space": without an
        # explicit repo type the Hub treats the id as a model repository.
        HfApi(token=token).comment_discussion(
            repo_id=repo_id,
            discussion_num=int(discussion_num),
            comment=comment_text,
            repo_type="space",
        )

        logger.info(f"Successfully added comment to {repo_id}.")
        return f"Successfully added comment to `{repo_id}`."

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error adding comment to {repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response is falsy for
        # every 4xx/5xx status, so a truthiness test hides the real details.
        response = getattr(e_http, "response", None)
        status_code = response.status_code if response is not None else 'N/A'

        if status_code in (401, 403):
            return f"Comment Error ({status_code}): Access denied or authentication required to add comment on '{repo_id}'. Check token permissions."
        if status_code == 404:
            return f"Comment Error ({status_code}): Repository '{repo_id}' not found."
        return f"Comment HTTP Error ({status_code}): {response.text if response is not None else str(e_http)}"

    except Exception as e:
        logger.exception(f"Error adding comment to {repo_id}:")
        return f"Comment Error: {e}"
|
|
|
|
|
def build_logic_like_space(hf_api_key, repo_id):
    """Placeholder for the "like" action on a Space.

    No Hub call is made: the attempt is logged and a fixed message saying
    the feature is unavailable is returned. ``hf_api_key`` is accepted but
    not used.
    """
    attempt_note = f"Attempting to like space '{repo_id}'."
    logger.info(attempt_note)
    unavailable_message = "'Like system' not installed."
    return unavailable_message
|
|
|
|
|
def duplicate_space(hf_api_key, source_repo_id, target_repo_id, private: bool = False):
    """Duplicate a Hugging Face Space.

    Args:
        hf_api_key: Token from the UI. Resolved through ``_get_api_token``,
            which falls back to the ``HF_TOKEN`` environment variable.
        source_repo_id: Id of the Space to copy.
        target_repo_id: Destination, either ``owner/name`` or a bare name.
            For a bare name the owner is looked up with ``whoami``.
        private: Whether the new Space should be private.

    Returns:
        A status string describing success or the failure reason.
        Exceptions are caught and converted into the returned message.
    """
    logger.info(f"Attempting to duplicate '{source_repo_id}' to '{target_repo_id}' (private={private}).")
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error duplicating space: {err or 'Token not found'}")
            return f"Error getting token: {err or 'Token not found.'}"

        if '/' in target_repo_id:
            target_owner, target_space_name = target_repo_id.split('/', 1)
            if not target_owner or not target_space_name or '/' in target_space_name:
                return f"Error: Invalid target repository ID format '{target_repo_id}'. Must be '<owner>/<space_name>'."
        else:
            # Bare name: resolve the owner from the token.
            target_space_name = target_repo_id
            try:
                user_info = whoami(token=token)
                target_owner = user_info.get('name')
                if not target_owner:
                    raise Exception("Could not determine owner from token.")
                target_repo_id = f"{target_owner}/{target_space_name}"
            except Exception as e:
                logger.error(f"Could not determine target owner from token: {e}")
                return f"Error: Target repository ID '{target_repo_id}' is missing owner, and owner could not be determined from token ({e}). Use '<owner>/<space_name>' format or set the Owner field."

        api = HfApi(token=token)
        # The documented HfApi method is duplicate_space(from_id, to_id, ...);
        # the previous duplicate_repo(from_repo=..., to_repo=...) call does
        # not match it.
        api.duplicate_space(
            from_id=source_repo_id,
            to_id=target_repo_id,
            private=private,
            exist_ok=True,
        )

        logger.info(f"Successfully duplicated space from {source_repo_id} to {target_repo_id}.")
        return f"Successfully duplicated space from `{source_repo_id}` to `{target_repo_id}`."

    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error duplicating space from {source_repo_id} to {target_repo_id}: {e_http}")
        # Compare against None explicitly: a requests.Response is falsy for
        # every 4xx/5xx status, so a truthiness test hides the real details.
        response = getattr(e_http, "response", None)
        if response is not None:
            status_code = response.status_code
            detail = response.text
        else:
            status_code = 'N/A'
            detail = str(e_http)
        return f"HTTP Error ({status_code}) duplicating space: {detail}"

    except Exception as e:
        logger.exception(f"Error duplicating space from {source_repo_id} to {target_repo_id}:")
        return f"Error duplicating space: {e}"
|
|
|
def list_user_spaces(hf_api_key, owner=None):
    """List Spaces for the authenticated user or a specific owner/org.

    Args:
        hf_api_key: Token from the UI. Resolved through ``_get_api_token``,
            which falls back to the ``HF_TOKEN`` environment variable.
        owner: User or organisation whose Spaces are listed. When empty, the
            owner is looked up with ``whoami``.

    Returns:
        A ``(space_ids, error)`` tuple. On success ``space_ids`` is a list of
        ``owner/name`` strings and ``error`` is ``None``. On a 401/403/404
        response ``space_ids`` is an empty list; on any other failure it is
        ``None``. ``error`` then holds a human-readable message. Exceptions
        are caught and reported through the second tuple element.
    """
    logger.info(f"Attempting to list spaces for owner: {owner or 'authenticated user'}.")
    # Bound before the try block so the error handlers can always name it.
    effective_owner = owner
    try:
        token, err = _get_api_token(hf_api_key)
        if err or not token:
            logger.error(f"Token error listing spaces: {err or 'Token not found'}")
            return None, f"Error getting token: {err or 'Token not found.'}"

        if not effective_owner:
            try:
                user_info = whoami(token=token)
                effective_owner = user_info.get('name')
                if not effective_owner:
                    raise Exception("Could not determine owner from token.")
                logger.info(f"Listing spaces for auto-detected owner: {effective_owner}")
            except Exception as e:
                logger.error(f"Could not determine owner from token for listing: {e}")
                return None, f"Error auto-detecting owner for listing: {e}. Please specify Owner field."

        api = HfApi(token=token)
        # SpaceInfo.id is already the full "owner/name" id; prefixing the
        # author again produced "owner/owner/name".
        space_ids = [space.id for space in api.list_spaces(author=effective_owner)]

        logger.info(f"Successfully listed {len(space_ids)} spaces for {effective_owner}.")
        return space_ids, None

    except HfHubHTTPError as e_http:
        owner_label = effective_owner or 'authenticated user'
        logger.error(f"HTTP error listing spaces for {owner_label}: {e_http}")
        # Compare against None explicitly: a requests.Response is falsy for
        # every 4xx/5xx status, so a truthiness test hides the real details.
        response = getattr(e_http, "response", None)
        status_code = response.status_code if response is not None else 'N/A'

        if status_code == 404:
            return [], f"HTTP Error ({status_code}): Owner '{owner_label}' not found or has no accessible spaces."
        if status_code in (401, 403):
            return [], f"HTTP Error ({status_code}): Access denied or authentication required for listing spaces for '{owner_label}'. Check token permissions."
        return None, f"HTTP Error ({status_code}) listing spaces for '{owner_label}': {response.text if response is not None else str(e_http)}"

    except Exception as e:
        logger.exception(f"Error listing spaces for {effective_owner or 'authenticated user'}:")
        return None, f"Error listing spaces: {e}"