import os import re import tempfile import shutil import logging from pathlib import Path import time from huggingface_hub import ( create_repo, upload_folder, list_repo_files, whoami, hf_hub_download, delete_file as hf_delete_file, HfApi, create_pull_request as hf_create_pull_request, ) from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation from huggingface_hub.utils import HfHubHTTPError logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) logger = logging.getLogger(__name__) def hf_add_space_comment(owner_repo,token,discussion,comment): result = HfApi(token=token).comment_discussion( repo_id=owner_repo, discussion_num=discussion, comment=comment ) return result def _get_api_token(ui_token_from_textbox=None): env_token = os.getenv('HF_TOKEN') if ui_token_from_textbox: logger.debug("Using HF_TOKEN from UI textbox.") return ui_token_from_textbox.strip(), None if env_token: logger.debug("Using HF_TOKEN from environment variable.") return env_token, None logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.") return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var." def _determine_repo_id(ui_api_token_from_textbox, owner_ui, space_name_ui): if not space_name_ui: return None, "Error: Space Name cannot be empty." if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part." final_owner = owner_ui error_message = None if not final_owner: logger.info("Owner not specified, attempting to auto-detect from token.") resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return None, f"Error auto-detecting owner: {token_err}" if not resolved_api_token: return None, "Error: API token required for auto owner determination if Owner field is empty." try: user_info = whoami(token=resolved_api_token) if user_info and 'name' in user_info: final_owner = user_info['name'] logger.info(f"Auto-detected owner: {final_owner}") else: error_message = "Error: Could not retrieve username from token. Check token permissions or specify Owner." logger.error(error_message) except Exception as e: error_message = f"Error retrieving username from token: {str(e)}. Specify Owner or check token." logger.exception("Error retrieving username from token:") if error_message: return None, error_message if not final_owner: return None, "Error: Owner could not be determined. Please specify it in the Owner field." repo_id = f"{final_owner}/{space_name_ui}" logger.info(f"Determined repo_id: {repo_id}") return repo_id, None def parse_markdown(markdown_input): space_info = {"repo_name_md": "", "owner_md": "", "files": []} current_file_path = None current_file_content_lines = [] in_file_definition = False in_code_block = False file_parsing_errors = [] lines = markdown_input.strip().split("\n") cleaned_lines = [] for line_content_orig in lines: if line_content_orig.strip().startswith("# "): if line_content_orig.strip().startswith("# ### File:") or \ line_content_orig.strip().startswith("# ## File Structure") or \ line_content_orig.strip().startswith("# # Space:"): cleaned_lines.append(line_content_orig.strip()[2:]) else: cleaned_lines.append(line_content_orig) else: cleaned_lines.append(line_content_orig) lines = cleaned_lines for i, line_content_orig in enumerate(lines): line_content_stripped = line_content_orig.strip() line_num = i + 1 file_match = re.match(r"### File:\s*(?P[^\n]+)", line_content_stripped) if file_match: if current_file_path is not None and in_file_definition: content_to_save = "\n".join(current_file_content_lines).strip() space_info["files"].append({"path": current_file_path, "content": content_to_save}) filename_line = file_match.group("filename_line").strip() current_file_path = filename_line current_file_path = re.split(r'\s*\(', current_file_path, 1)[0].strip() current_file_path = current_file_path.strip('`\'"').strip() if not current_file_path: file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.") current_file_path = None in_file_definition = False continue current_file_content_lines = [] in_file_definition = True in_code_block = False logger.debug(f"Parsed file header: {current_file_path}") continue if not in_file_definition: if line_content_stripped.startswith("# Space:"): full_space_name_md = line_content_stripped.replace("# Space:", "").strip() if "/" in full_space_name_md: parts = full_space_name_md.split("/", 1) if len(parts) == 2: space_info["owner_md"], space_info["repo_name_md"] = parts[0].strip(), parts[1].strip() else: space_info["repo_name_md"] = full_space_name_md else: space_info["repo_name_md"] = full_space_name_md logger.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}") continue if line_content_stripped.startswith("## File Structure"): structure_block_start = i + 1 while structure_block_start < len(lines) and not lines[structure_block_start].strip().startswith("```"): structure_block_start += 1 if structure_block_start < len(lines) and lines[structure_block_start].strip().startswith("```"): structure_block_end = structure_block_start + 1 while structure_block_end < len(lines) and not lines[structure_block_end].strip().startswith("```"): structure_block_end += 1 if structure_block_end < len(lines) and lines[structure_block_end].strip().startswith("```"): logger.debug(f"Skipping File Structure block from line {i+1} to {structure_block_end+1}") i = structure_block_end continue continue if in_file_definition: if line_content_stripped.startswith("```"): in_code_block = not in_code_block logger.debug(f"Toggled code block to {in_code_block} at line {line_num}") continue if in_code_block: current_file_content_lines.append(line_content_orig) elif line_content_stripped.startswith("[Binary file") or line_content_stripped.startswith("[Error loading content:") or line_content_stripped.startswith("[Binary or Skipped file]"): current_file_content_lines.append(line_content_orig) logger.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}") else: pass if current_file_path is not None and in_file_definition: content_to_save = "\n".join(current_file_content_lines).strip() space_info["files"].append({"path": current_file_path, "content": content_to_save}) space_info["files"] = [f for f in space_info["files"] if f.get("path")] space_info["owner_md"] = space_info["owner_md"].strip() space_info["repo_name_md"] = space_info["repo_name_md"].strip() if file_parsing_errors: logger.warning(f"Markdown parsing encountered errors: {file_parsing_errors}") logger.info(f"Parsed markdown. Found {len(space_info['files'])} files.") return space_info def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui sdk = None files = [] error = None repo_id = None logger.info(f"Attempting to get repo info for {repo_id_for_error_logging}") try: resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return None, [], token_err repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui) if err_repo_id: return None, [], err_repo_id repo_id_for_error_logging = repo_id api = HfApi(token=resolved_api_token) repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=20) sdk = repo_info_obj.sdk files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename] if not files and repo_info_obj.siblings: logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(repo_info_obj.siblings)}") logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}") except HfHubHTTPError as e_http: logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}") error_message = str(e_http) status_code = e_http.response.status_code if e_http.response is not None else None if status_code == 404: error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)." elif status_code in (401,403): error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." else: error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}" except Exception as e: logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}") error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback." try: resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox) if token_err_fb: return None, [], f"{error}\nAPI Token Error during fallback: {token_err_fb}" repo_id_fb, err_repo_id_fb = _determine_repo_id(resolved_api_token_fb, owner_ui, space_name_ui) if err_repo_id_fb: return None, [], f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}" files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space", timeout=20) error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback." logger.info(f"Fallback list_repo_files successful for {repo_id_fb}. Files found: {len(files)}") except HfHubHTTPError as e2_http: logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}") error_message_fb = str(e2_http) status_code_fb = e2_http.response.status_code if e2_http.response is not None else None if status_code_fb == 404: error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)." else: error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {error_message_fb}" files = [] except Exception as e2: logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}") error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}" files = [] if not files and not error and (repo_id_for_error_logging is not None): error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}`." return sdk, files, error def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui): files, err = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)[1:] return files, err def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui repo_id = None logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}") try: resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return None, token_err repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui) if err_repo_id: return None, err_repo_id repo_id_for_error_logging = repo_id if not file_path_in_repo: return None, "Error: File path cannot be empty." file_path_in_repo = file_path_in_repo.replace("\\", "/") downloaded_file_path = hf_hub_download( repo_id=repo_id, filename=file_path_in_repo, repo_type="space", token=resolved_api_token, local_dir_use_symlinks=False, cache_dir=None, timeout=20 ) content = Path(downloaded_file_path).read_text(encoding="utf-8") logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.") return content, None except FileNotFoundError: logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}") return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)." except UnicodeDecodeError: logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.") return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display." except HfHubHTTPError as e_http: logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}") error_message = str(e_http) status_code = e_http.response.status_code if e_http.response is not None else None if status_code == 404: return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)." if status_code in (401, 403): return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}" except Exception as e: logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:") return None, f"Error fetching file content: {str(e)}" def apply_staged_file_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, file_changeset): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui repo_id = None status_messages = [] logger.info(f"Attempting to apply {len(file_changeset)} staged file changes to {repo_id_for_error_logging}") if not owner_ui or not space_name_ui: return "Error: Cannot apply file changes. Owner and Space Name must be provided." try: resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return f"API Token Error: {token_err}" repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui) if err_repo_id: return f"Repo ID Error: {err_repo_id}" repo_id_for_error_logging = repo_id api = HfApi(token=resolved_api_token) temp_dir = None paths_to_upload = {} delete_operations = [] try: temp_dir = tempfile.TemporaryDirectory() repo_staging_path = Path(temp_dir.name) / "repo_staging_content" repo_staging_path.mkdir(exist_ok=True) gitattributes_path_local = repo_staging_path / ".gitattributes" try: with open(gitattributes_path_local, "w", encoding="utf-8") as f: f.write("* text=auto eol=lf\n") paths_to_upload[str(gitattributes_path_local)] = ".gitattributes" except Exception as e: status_messages.append(f"Warning: Could not stage .gitattributes file: {e}") logger.warning(f"Could not stage .gitattributes: {e}") for change in file_changeset: if change['type'] == 'UPDATE_FILE' or change['type'] == 'CREATE_FILE': file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/') if not file_path_in_repo: status_messages.append(f"Skipping {change['type']} operation: empty path.") continue content_to_write = change.get('content', '') if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"): status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': Content is a binary/error placeholder.") logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': Content is binary/error placeholder.") continue file_path_local = repo_staging_path / file_path_in_repo file_path_local.parent.mkdir(parents=True, exist_ok=True) try: with open(file_path_local, "w", encoding="utf-8") as f: f.write(content_to_write) paths_to_upload[str(file_path_local)] = file_path_in_repo logger.debug(f"Staged file for {change['type']}: {file_path_in_repo}") except Exception as file_write_error: status_messages.append(f"Error staging file {file_path_in_repo} for {change['type']}: {file_write_error}") logger.error(f"Error writing file {file_path_in_repo} during staging for {change['type']}: {file_write_error}") elif change['type'] == 'DELETE_FILE': file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/') if not file_path_in_repo: status_messages.append(f"Skipping DELETE_FILE operation: empty path.") continue delete_operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo)) logger.debug(f"Added DELETE_FILE operation for: {file_path_in_repo}") if delete_operations: try: commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files." logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}") api.create_commit( repo_id=repo_id, repo_type="space", operations=delete_operations, commit_message=commit_message_delete, timeout=30 ) status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.") logger.info("Delete commit successful.") except HfHubHTTPError as e_http: status_messages.append(f"File Deletion HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.") logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}") except Exception as e_delete_commit: status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.") logger.exception(f"Error during delete commit for {repo_id}:") if paths_to_upload: try: commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}" logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...") upload_folder( repo_id=repo_id, folder_path=str(repo_staging_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=commit_message_upload, allow_patterns=["*"], timeout=120 ) status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.") logger.info("Upload/Update commit successful.") except HfHubHTTPError as e_http: status_messages.append(f"File Upload/Update HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.") logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}") except Exception as e_upload: status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.") logger.exception(f"Error during upload_folder for {repo_id}:") else: status_messages.append("No file changes (create/update/delete) to commit.") logger.info("No file changes to commit.") finally: if temp_dir: try: temp_dir.cleanup() logger.info("Cleaned up temporary staging directory.") except Exception as e: logger.error(f"Error cleaning up temp dir: {e}") except HfHubHTTPError as e_http: logger.error(f"Top-level HTTP error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}") status_messages.append(f"API HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}") except Exception as e: logger.exception(f"Top-level error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}:") status_messages.append(f"An unexpected error occurred during apply file changes: {str(e)}") final_status = " | ".join(status_messages) if status_messages else "No file operations were applied." logger.info(f"Finished applying staged file changes. Final status: {final_status}") return final_status def build_logic_create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input, private): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui logger.info(f"Attempting to create space: {repo_id_for_error_logging}") if not space_name_ui: return "Error: Space Name cannot be empty." if "/" in space_name_ui: return "Error: Space Name should not contain '/'." resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return f"API Token Error: {token_err}" final_owner = owner_ui if not final_owner: try: user_info = whoami(token=resolved_api_token) final_owner = user_info.get('name') if not final_owner: raise Exception("Could not find user name from token.") except Exception as e: return f"Error auto-detecting owner: {str(e)}. Specify Owner field." if not final_owner: return "Error: Owner could not be determined." repo_id = f"{final_owner}/{space_name_ui}" temp_dir = None try: api = HfApi(token=resolved_api_token) # Create the repository api.create_repo(repo_id=repo_id, repo_type="space", space_sdk=sdk_ui, private=private, exist_ok=False) logger.info(f"Successfully created empty space: {repo_id}") # Stage files from markdown for upload if markdown is provided if markdown_input: parsed_md = parse_markdown(markdown_input) files_to_upload = parsed_md.get("files", []) if files_to_upload: logger.info(f"Staging {len(files_to_upload)} files for upload after creation.") temp_dir = tempfile.TemporaryDirectory() repo_staging_path = Path(temp_dir.name) / "initial_content" repo_staging_path.mkdir(exist_ok=True) paths_to_upload = {} status_messages = [] # Add .gitattributes gitattributes_path_local = repo_staging_path / ".gitattributes" try: with open(gitattributes_path_local, "w", encoding="utf-8") as f: f.write("* text=auto eol=lf\n") paths_to_upload[str(gitattributes_path_local)] = ".gitattributes" except Exception as e: status_messages.append(f"Warning: Could not stage .gitattributes file: {e}") logger.warning(f"Could not stage .gitattributes: {e}") for file_info in files_to_upload: file_path_in_repo = file_info['path'].lstrip('/').replace(os.sep, '/') content_to_write = file_info.get('content', '') if not file_path_in_repo: status_messages.append(f"Skipping file creation: empty path.") continue if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"): status_messages.append(f"Skipping file '{file_path_in_repo}': Content is a binary/error placeholder.") logger.warning(f"Skipping file '{file_path_in_repo}': Content is binary/error placeholder.") continue file_path_local = repo_staging_path / file_path_in_repo file_path_local.parent.mkdir(parents=True, exist_ok=True) try: with open(file_path_local, "w", encoding="utf-8") as f: f.write(content_to_write) paths_to_upload[str(file_path_local)] = file_path_in_repo except Exception as file_write_error: status_messages.append(f"Error staging file {file_path_in_repo}: {file_write_error}") if paths_to_upload: try: commit_message_upload = "Initial files from AI Space Builder" upload_folder( repo_id=repo_id, folder_path=str(repo_staging_path), path_in_repo=".", token=resolved_api_token, repo_type="space", commit_message=commit_message_upload, allow_patterns=["*"], timeout=120 ) status_messages.append(f"Successfully uploaded initial {len(paths_to_upload)} files.") logger.info("Initial upload successful.") except HfHubHTTPError as e_http: status_messages.append(f"Initial Upload HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.") logger.error(f"HTTP error during initial upload for {repo_id}: {e_http}") except Exception as e_upload: status_messages.append(f"Initial Upload Error: {str(e_upload)}. Check logs.") logger.exception(f"Error during initial upload for {repo_id}:") else: status_messages = ["No files parsed from markdown for initial upload."] else: status_messages = ["No markdown provided for initial files."] final_status = "Successfully created space" if status_messages: final_status += " | " + " | ".join(status_messages) return final_status except HfHubHTTPError as e_http: logger.error(f"HTTP error creating space {repo_id_for_error_logging or 'unknown repo'}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' if status_code == 409: # Conflict - repo already exists return f"Create Space Error ({status_code}): Space '{repo_id_for_error_logging or 'unknown'}' already exists. Use 'Build / Update' instead." if status_code in (401, 403): return f"Create Space Error ({status_code}): Access denied or authentication required for '{repo_id_for_error_logging or 'unknown'}'. Check token permissions." return f"Create Space HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error creating space {repo_id_for_error_logging or 'unknown repo'}:") return f"Create Space Error: {str(e)}" finally: if temp_dir: try: temp_dir.cleanup() except Exception as e: logger.error(f"Error cleaning up temp dir: {e}") def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui repo_id = None logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}") try: resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return f"API Token Error: {token_err}" repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui) if err_repo_id: return f"Repo ID Error: {err_repo_id}" repo_id_for_error_logging = repo_id if not file_path_in_repo: return "Error: File path cannot be empty for deletion." file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/') effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI" hf_delete_file( path_in_repo=file_path_in_repo, repo_id=repo_id, repo_type="space", token=resolved_api_token, commit_message=effective_commit_message, timeout=20 ) logger.info(f"Successfully deleted file: {file_path_in_repo}") return f"Successfully deleted file: `{file_path_in_repo}`" except FileNotFoundError: logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}") return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)." except HfHubHTTPError as e_http: logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}") error_message = str(e_http) status_code = e_http.response.status_code if e_http.response is not None else None if status_code == 404: return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)." if status_code in (401, 403): return f"Delete Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." return f"Delete HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}" except Exception as e: logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:") return f"Delete Error deleting file '{file_path_in_repo}': {str(e)}" def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui repo_id = None logger.info(f"Attempting manual file update for '{file_path_in_repo}' in {repo_id_for_error_logging}") try: resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return f"API Token Error: {token_err}" repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui) if err_repo_id: return f"Repo ID Error: {err_repo_id}" repo_id_for_error_logging = repo_id if not file_path_in_repo: return "Update Error: File Path to update cannot be empty." file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/') commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI" api = HfApi(token=resolved_api_token) tmp_file_path = None try: with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj: tmp_file_obj.write(file_content) tmp_file_path = tmp_file_obj.name api.upload_file( path_or_fileobj=tmp_file_path, path_in_repo=file_path_in_repo, repo_id=repo_id, repo_type="space", commit_message=commit_msg, timeout=20 ) logger.info(f"Successfully updated file: {file_path_in_repo}") return f"Successfully updated `{file_path_in_repo}`" finally: if tmp_file_path and os.path.exists(tmp_file_path): os.remove(tmp_file_path) except FileNotFoundError: logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}") return f"Update Error: Local temporary file not found during upload for '{file_path_in_repo}'." except UnicodeDecodeError: logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}' during manual update.") return f"Update Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way." except HfHubHTTPError as e_http: logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}") error_message = str(e_http) status_code = e_http.response.status_code if e_http.response is not None else None if status_code == 404: return f"Update Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)." if status_code in (401, 403): return f"Update Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." return f"Update HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}" except Exception as e: logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:") return f"Update Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}" def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui repo_id = None logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}") try: resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return None, f"API Token Error: {token_err}" repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui) if err_repo_id: return None, f"Repo ID Error: {err_repo_id}" repo_id_for_error_logging = repo_id api = HfApi(token=resolved_api_token) runtime_info = api.get_space_runtime(repo_id=repo_id, timeout=20) logger.info(f"Received runtime info for {repo_id}. Stage: {runtime_info.stage}") status_details = { "stage": runtime_info.stage, "hardware": runtime_info.hardware, "requested_hardware": runtime_info.requested_hardware if hasattr(runtime_info, 'requested_hardware') else None, "error_message": None, "status": runtime_info.status if hasattr(runtime_info, 'status') else None, "full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#" } if runtime_info.stage == "ERRORED": error_content = None if hasattr(runtime_info, 'error') and runtime_info.error: error_content = str(runtime_info.error) if 'build' in runtime_info.raw and isinstance(runtime_info.raw['build'], dict) and runtime_info.raw['build'].get('status') == 'error': error_content = f"Build Error: {runtime_info.raw['build'].get('message', error_content or 'Unknown build error')}" elif 'run' in runtime_info.raw and isinstance(runtime_info.raw['run'], dict) and runtime_info.raw['run'].get('status') == 'error': error_content = f"Runtime Error: {runtime_info.raw['run'].get('message', error_content or 'Unknown runtime error')}" elif 'message' in runtime_info.raw and isinstance(runtime_info.raw['message'], str) and ('error' in runtime_info.raw['message'].lower() or runtime_info.raw['message'].strip().endswith('!')): error_content = runtime_info.raw['message'] status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details." logger.info(f"Runtime status details for {repo_id}: {status_details}") return status_details, None except HfHubHTTPError as e_http: logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}") error_message = str(e_http) status_code = e_http.response.status_code if e_http.response is not None else None if status_code == 404: return None, f"Status Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)." if status_code in (401, 403): return None, f"Status Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." return None, f"Status HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}" except Exception as e: logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:") return None, f"Status Error fetching runtime status: {str(e)}" def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool): logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error setting privacy: {err or 'Token not found'}") return f"Error getting token: {err or 'Token not found.'}" api = HfApi(token=token) api.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space') logger.info(f"Successfully set privacy for {repo_id} to {private}.") return f"Successfully set privacy for `{repo_id}` to `{private}`." except HfHubHTTPError as e_http: logger.error(f"HTTP error setting privacy for {repo_id}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' return f"HTTP Error ({status_code}) setting privacy for `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error setting privacy for {repo_id}:") return f"Error setting privacy for `{repo_id}`: {e}" def build_logic_delete_space(hf_api_key, owner, space_name): repo_id = f"{owner}/{space_name}" logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error deleting space: {err or 'Token not found'}") return f"Error getting token: {err or 'Token not found.'}" api = HfApi(token=token) api.delete_repo(repo_id=repo_id, repo_type='space') logger.warning(f"Successfully deleted space {repo_id}.") return f"Successfully deleted space `{repo_id}`." except HfHubHTTPError as e_http: logger.error(f"HTTP error deleting space {repo_id}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' return f"HTTP Error ({status_code}) deleting space `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error deleting space {repo_id}:") return f"Error deleting space `{repo_id}`: {e}" def build_logic_create_pull_request(hf_api_key, source_repo_id, target_repo_id, title, body=""): logger.info(f"Attempting to create PR from '{source_repo_id}' to '{target_repo_id}'. Title: '{title}'") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error creating PR: {err or 'Token not found'}") return f"Error getting token: {err or 'Token not found.'}" api = HfApi(token=token) pr_url = hf_create_pull_request( repo_id=target_repo_id, title=title, description=body, base_repo=source_repo_id, token=token, timeout=30 ) logger.info(f"Successfully created PR: {pr_url}") return f"Successfully created Pull Request: {pr_url}" except HfHubHTTPError as e_http: logger.error(f"HTTP error creating PR from {source_repo_id} to {target_repo_id}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' if status_code in (401, 403): return f"PR Error ({status_code}): Access denied or authentication required to create PR on '{target_repo_id}'. Check token permissions." if status_code == 404: return f"PR Error ({status_code}): Target repository '{target_repo_id}' not found." if e_http.response and isinstance(e_http.response.text, str) and 'already exists' in e_http.response.text: return f"PR Error: Pull Request already exists." return f"PR HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error creating PR from {source_repo_id} to {target_repo_id}:") return f"PR Error: {e}" def build_logic_add_comment(hf_api_key, repo_id, comment_text): logger.info(f"Attempting to add comment to '{repo_id}'. Text: '{comment_text[:50]}...'") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error adding comment: {err or 'Token not found'}") return f"Error getting token: {err or 'Token not found.'}" # Use the new public API method add_space_comment hf_add_space_comment( repo_id=repo_id, comment=comment_text, token=token, timeout=20 ) logger.info(f"Successfully added comment to {repo_id}.") # Note: hf_add_space_comment doesn't return the comment URL directly return f"Successfully added comment to `{repo_id}`." except HfHubHTTPError as e_http: logger.error(f"HTTP error adding comment to {repo_id}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' if status_code in (401, 403): return f"Comment Error ({status_code}): Access denied or authentication required to add comment on '{repo_id}'. Check token permissions." if status_code == 404: return f"Comment Error ({status_code}): Repository '{repo_id}' not found." return f"Comment HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error adding comment to {repo_id}:") return f"Comment Error: {e}" def build_logic_like_space(hf_api_key, repo_id): logger.info(f"Attempting to like space '{repo_id}'.") return "'Like system' not installed." def duplicate_space(hf_api_key, source_repo_id, target_repo_id, private: bool = False): """Duplicates a Hugging Face Space.""" logger.info(f"Attempting to duplicate '{source_repo_id}' to '{target_repo_id}' (private={private}).") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error duplicating space: {err or 'Token not found'}") return f"Error getting token: {err or 'Token not found.'}" if '/' in target_repo_id: target_owner, target_space_name = target_repo_id.split('/', 1) if not target_owner or not target_space_name or '/' in target_space_name: return f"Error: Invalid target repository ID format '{target_repo_id}'. Must be '/'." else: target_space_name = target_repo_id try: user_info = whoami(token=token) target_owner = user_info.get('name') if not target_owner: raise Exception("Could not determine owner from token.") target_repo_id = f"{target_owner}/{target_space_name}" except Exception as e: logger.error(f"Could not determine target owner from token: {e}") return f"Error: Target repository ID '{target_repo_id}' is missing owner, and owner could not be determined from token ({e}). Use '/' format or set the Owner field." api = HfApi(token=token) api.duplicate_repo( from_repo=source_repo_id, to_repo=target_repo_id, repo_type="space", token=token, private=private, exist_ok=True ) logger.info(f"Successfully duplicated space from {source_repo_id} to {target_repo_id}.") return f"Successfully duplicated space from `{source_repo_id}` to `{target_repo_id}`." except HfHubHTTPError as e_http: logger.error(f"HTTP error duplicating space from {source_repo_id} to {target_repo_id}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' return f"HTTP Error ({status_code}) duplicating space: {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error duplicating space from {source_repo_id} to {target_repo_id}:") return f"Error duplicating space: {e}" def list_user_spaces(hf_api_key, owner=None): """Lists spaces for the authenticated user or a specific owner/org.""" logger.info(f"Attempting to list spaces for owner: {owner or 'authenticated user'}.") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error listing spaces: {err or 'Token not found'}") return None, f"Error getting token: {err or 'Token not found.'}" effective_owner = owner if not effective_owner: try: user_info = whoami(token=token) effective_owner = user_info.get('name') if not effective_owner: raise Exception("Could not determine owner from token.") logger.info(f"Listing spaces for auto-detected owner: {effective_owner}") except Exception as e: logger.error(f"Could not determine owner from token for listing: {e}") return None, f"Error auto-detecting owner for listing: {e}. Please specify Owner field." api = HfApi(token=token) spaces = api.list_spaces(author=effective_owner) #datasets = api.list_datasets(author=effective_owner) #models = api.list_models(author=effective_owner) space_ids = [f"{r.author}/{r.id}" for r in spaces] logger.info(f"Successfully listed {len(space_ids)} spaces for {effective_owner}.") return space_ids, None except HfHubHTTPError as e_http: logger.error(f"HTTP error listing spaces for {owner or 'authenticated user'}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' if status_code == 404: return [], f"HTTP Error ({status_code}): Owner '{owner}' not found or has no accessible spaces." if status_code in (401, 403): return [], f"HTTP Error ({status_code}): Access denied or authentication required for listing spaces for '{owner}'. Check token permissions." return None, f"HTTP Error ({status_code}) listing spaces for '{owner or 'authenticated user'}': {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error listing spaces for {owner or 'authenticated user'}:") return None, f"Error listing spaces: {e}"