import os
import re
import tempfile
import shutil
import logging
from pathlib import Path

from huggingface_hub import (
    create_repo,
    upload_folder,
    list_repo_files,
    whoami,
    hf_hub_download,
    delete_file as hf_delete_file,
    HfApi,
)
from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation
# Import the general HTTP error from huggingface_hub.utils
from huggingface_hub.utils import HfHubHTTPError  # For catching specific HF HTTP errors

# Module-wide logging configuration.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)


# --- Helper Function to Get API Token ---
def _get_api_token(ui_token_from_textbox=None):
    """Resolve the HF API token and return ``(token, error_message)``.

    The ``HF_TOKEN`` environment variable wins over the UI textbox value;
    exactly one element of the returned pair is non-None.
    """
    token_from_env = os.getenv('HF_TOKEN')
    if token_from_env:
        logger.info("Using HF_TOKEN from environment variable.")
        return token_from_env, None
    if not ui_token_from_textbox:
        # Neither source supplied a token: report the failure to the caller.
        logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.")
        return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."
    logger.info("Using HF_TOKEN from UI textbox.")
    return ui_token_from_textbox.strip(), None


# --- Helper Function to Determine Repo ID ---
def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Build ``"owner/space"`` from the UI fields and return ``(repo_id, error)``.

    When the Owner field is blank, the owner is auto-detected from the API
    token via ``whoami()``. Exactly one element of the pair is non-None.
    """
    if not space_name_ui:
        return None, "Error: Space Name cannot be empty."
    if "/" in space_name_ui:
        return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."

    owner = owner_ui
    if not owner:
        # Owner left blank: derive it from the token's account.
        logger.info("Owner not specified, attempting to auto-detect from token.")
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, token_err
        if not resolved_api_token:
            return None, "Error: API token required for auto owner determination if Owner field is empty."
        try:
            user_info = whoami(token=resolved_api_token)
            if user_info and 'name' in user_info:
                owner = user_info['name']
                logger.info(f"Auto-detected owner: {owner}")
            else:
                failure = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
                logger.error(failure)
                return None, failure
        except Exception as e:
            logger.exception("Error retrieving username from token:")
            return None, f"Error retrieving username from token: {str(e)}. Specify Owner or check token."

    if not owner:
        return None, "Error: Owner could not be determined. Please specify it in the Owner field."

    repo_id = f"{owner}/{space_name_ui}"
    logger.info(f"Determined repo_id: {repo_id}")
    return repo_id, None
# --- Corrected Markdown Parsing ---
# Parses the *AI's output format* into a structured dict; it does not claim to
# represent the current state of a Space. app.py combines this with live state.
def parse_markdown(markdown_input):
    """Parse AI-generated Space markdown into a structured dict.

    Recognized structures:
      * ``# Space: owner/repo``                    -> ``owner_md`` / ``repo_name_md``
      * ``## File Structure`` + fenced code block  -> skipped entirely (layout only)
      * ``### File: path`` followed by fenced
        code block(s)                              -> one entry in ``files``

    Content lines are only collected inside ``` fences, or when a line is an
    explicit "[Binary file" / "[Error loading content:" / "[Binary or Skipped
    file]" marker. Returns ``{"repo_name_md", "owner_md", "files"}``; parsing
    problems are logged, never raised.
    """
    # Function-local logger keeps this block self-contained; within the module
    # it resolves to the same logger object as the module-level ``logger``.
    _log = logging.getLogger(__name__)

    space_info = {"repo_name_md": "", "owner_md": "", "files": []}
    current_file_path = None
    current_file_content_lines = []
    in_file_definition = False
    in_code_block = False
    file_parsing_errors = []  # collected parse problems, logged at the end

    raw_lines = markdown_input.strip().split("\n")

    # Gradio's Markdown widget sometimes prefixes headings with "# "; strip
    # that prefix only when the heading belongs to our own format.
    lines = []
    for line_content_orig in raw_lines:
        stripped = line_content_orig.strip()
        if stripped.startswith("# ") and (
            stripped.startswith("# ### File:")
            or stripped.startswith("# ## File Structure")
            or stripped.startswith("# # Space:")
        ):
            lines.append(stripped[2:])
        else:
            lines.append(line_content_orig)

    # BUG FIX: the previous implementation iterated with ``for i, ... in
    # enumerate(lines)`` and reassigned ``i`` to skip the File Structure
    # block — reassigning a for-loop variable does NOT skip iterations in
    # Python. A manual while-loop index makes the skip effective (and keeps
    # ``### File:`` decoys inside that block from being misparsed).
    idx = 0
    total = len(lines)
    while idx < total:
        line_content_orig = lines[idx]
        line_content_stripped = line_content_orig.strip()
        line_num = idx + 1

        # BUG FIX: restored the named group ``(?P<filename_line>...)`` — the
        # angle brackets had been lost, which is a regex syntax error and is
        # required by ``group("filename_line")`` below.
        file_match = re.match(r"### File:\s*(?P<filename_line>[^\n]+)", line_content_stripped)
        if file_match:
            # Flush the previous file before starting a new one.
            if current_file_path is not None and in_file_definition:
                content_to_save = "\n".join(current_file_content_lines).strip()
                space_info["files"].append({"path": current_file_path, "content": content_to_save})

            current_file_path = file_match.group("filename_line").strip()
            # Drop trailing descriptions like "(main application)" and quoting.
            current_file_path = re.split(r'\s*\(', current_file_path, maxsplit=1)[0].strip()
            current_file_path = current_file_path.strip('`\'"').strip()
            if not current_file_path:
                file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.")
                current_file_path = None
                in_file_definition = False
                idx += 1
                continue

            current_file_content_lines = []
            in_file_definition = True
            in_code_block = False
            _log.debug(f"Parsed file header: {current_file_path}")
            idx += 1
            continue

        if not in_file_definition:
            if line_content_stripped.startswith("# Space:"):
                full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
                if "/" in full_space_name_md:
                    owner_part, repo_part = full_space_name_md.split("/", 1)
                    space_info["owner_md"] = owner_part.strip()
                    space_info["repo_name_md"] = repo_part.strip()
                else:
                    space_info["repo_name_md"] = full_space_name_md
                _log.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}")
                idx += 1
                continue

            if line_content_stripped.startswith("## File Structure"):
                # Skip the fenced block illustrating the layout; it is not content.
                open_idx = idx + 1
                while open_idx < total and not lines[open_idx].strip().startswith("```"):
                    open_idx += 1
                if open_idx < total:
                    close_idx = open_idx + 1
                    while close_idx < total and not lines[close_idx].strip().startswith("```"):
                        close_idx += 1
                    if close_idx < total:
                        _log.debug(f"Skipping File Structure block from line {idx + 1} to {close_idx + 1}")
                        idx = close_idx + 1
                        continue
                idx += 1
                continue

            # Any other line outside a file definition is ignored.
            idx += 1
            continue

        # --- Inside a file definition ---
        if line_content_stripped.startswith("```"):
            # Fence lines toggle code-block state and are never content.
            in_code_block = not in_code_block
            _log.debug(f"Toggled code block to {in_code_block} at line {line_num}")
            idx += 1
            continue
        if in_code_block:
            # Keep the original line verbatim (leading/trailing whitespace matters).
            current_file_content_lines.append(line_content_orig)
        elif (line_content_stripped.startswith("[Binary file")
              or line_content_stripped.startswith("[Error loading content:")
              or line_content_stripped.startswith("[Binary or Skipped file]")):
            # Binary/error placeholders count as content even outside a fence.
            current_file_content_lines.append(line_content_orig)
            _log.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}")
        # Anything else between fences (descriptions, blank lines) is ignored.
        idx += 1

    # Flush the last file.
    if current_file_path is not None and in_file_definition:
        content_to_save = "\n".join(current_file_content_lines).strip()
        space_info["files"].append({"path": current_file_path, "content": content_to_save})

    # Final sanity pass: drop entries without a path, normalize names.
    space_info["files"] = [f for f in space_info["files"] if f.get("path")]
    space_info["owner_md"] = space_info["owner_md"].strip()
    space_info["repo_name_md"] = space_info["repo_name_md"].strip()

    if file_parsing_errors:
        _log.warning(f"Markdown parsing encountered errors: {file_parsing_errors}")
    _log.info(f"Parsed markdown. Found {len(space_info['files'])} files.")
    return space_info
# --- Function to Get Space SDK and Files ---
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Fetch SDK and file list for a Space.

    Returns ``(sdk, files, error)``:
      * ``sdk``   — Space SDK name, or None when unknown (fallback path / error);
      * ``files`` — list of repo-relative filenames (possibly empty);
      * ``error`` — None on full success, otherwise a human-readable
        warning/error string (a fallback success still sets a warning).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    sdk = None
    files = []
    error = None
    repo_id = None  # kept in outer scope so error paths can reference it
    logger.info(f"Attempting to get repo info for {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, [], token_err
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return None, [], err_repo_id
        repo_id_for_error_logging = repo_id  # Update logging name

        api = HfApi(token=resolved_api_token)
        # repo_info is preferred over a bare file listing because it also reports the SDK.
        repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=20)
        sdk = repo_info_obj.sdk
        files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename]
        if not files and repo_info_obj.siblings:
            # Empty repos exist, or listing may fail partially; continue with an empty list.
            logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(repo_info_obj.siblings)}")
        logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}")
    except HfHubHTTPError as e_http:
        # Catch specific HF HTTP errors first and map status codes to friendly messages.
        logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)."
        elif status_code in (401, 403):
            error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        else:
            error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
    except Exception as e:
        # repo_info failed for a non-HTTP reason: fall back to a plain file listing.
        logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}")
        error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback."
        try:
            # Re-resolve token and repo id for the fallback path.
            resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox)
            if token_err_fb:
                return None, [], f"{error}\nAPI Token Error during fallback: {token_err_fb}"
            repo_id_fb, err_repo_id_fb = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
            if err_repo_id_fb:
                return None, [], f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}"
            # BUG FIX: list_repo_files has no ``timeout`` keyword argument;
            # passing one raised TypeError and defeated the fallback entirely.
            files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space")
            # Fallback succeeded: downgrade the error to a warning about missing SDK info.
            error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback."
            logger.info(f"Fallback list_repo_files successful for {repo_id_fb}. Files found: {len(files)}")
        except HfHubHTTPError as e2_http:
            logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}")
            error_message_fb = str(e2_http)
            status_code_fb = e2_http.response.status_code if e2_http.response is not None else None
            if status_code_fb == 404:
                error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)."
            else:
                error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {error_message_fb}"
            files = []  # Ensure files list is empty on fallback error
        except Exception as e2:
            logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}")
            error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}"
            files = []  # Ensure files list is empty on fallback error

    # Success with an empty repo still surfaces an informational message.
    if not files and not error and (repo_id_for_error_logging is not None):
        error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}`."
    return sdk, files, error
# --- Function to list files ---
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
    """Return ``(files, error)`` for a Space, discarding the SDK from the full info call."""
    _sdk, files, err = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)
    return files, err


# --- Function to Fetch File Content from Hub ---
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
    """Download one file from a Space and return ``(content, error)``.

    ``content`` is the file's UTF-8 text on success (``error`` is None);
    otherwise ``content`` is None and ``error`` is a human-readable message
    (missing file, binary content, HTTP/auth failures, ...).
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return None, token_err
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return None, err_repo_id
        repo_id_for_error_logging = repo_id  # Update logging name

        if not file_path_in_repo:
            return None, "Error: File path cannot be empty."
        # Hub paths always use forward slashes.
        file_path_in_repo = file_path_in_repo.replace("\\", "/")

        # hf_hub_download caches locally.
        # BUG FIX: hf_hub_download has no ``timeout`` kwarg — the metadata
        # (HEAD) call timeout is ``etag_timeout``; ``timeout=20`` raised TypeError.
        downloaded_file_path = hf_hub_download(
            repo_id=repo_id,
            filename=file_path_in_repo,
            repo_type="space",
            token=resolved_api_token,
            local_dir_use_symlinks=False,  # Avoid symlink issues
            cache_dir=None,                # Use default cache dir
            etag_timeout=20,
        )
        content = Path(downloaded_file_path).read_text(encoding="utf-8")
        logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.")
        return content, None
    except FileNotFoundError:
        # hf_hub_download often translates a 404 into FileNotFoundError.
        logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except UnicodeDecodeError:
        # read_text failed: likely binary or non-UTF-8 text.
        logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.")
        return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
        if status_code in (401, 403):
            return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return None, f"Error fetching file content: {str(e)}"
# --- Create/Update Space from Staged Changes ---
# Takes a list of change operations (changeset) instead of markdown;
# designed to be called by handle_confirm_changes.
def apply_staged_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, changeset):
    """Apply a staged changeset to a Space and return a single status string.

    Supported change types: ``CREATE_SPACE``, ``CREATE_FILE``/``UPDATE_FILE``,
    ``DELETE_FILE``, ``SET_PRIVACY``, ``DELETE_SPACE``. File adds/updates are
    staged to a temp dir and pushed via ``upload_folder``; deletions go through
    a ``create_commit`` with ``CommitOperationDelete``; privacy/space-deletion
    actions run after the file commit. DELETE_SPACE is only honored for the
    currently loaded space as a safeguard.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    status_messages = []
    logger.info(f"Attempting to apply {len(changeset)} staged changes to {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            # BUG FIX: early error paths previously returned a *list* while the
            # success path returns a string; return strings consistently.
            return token_err
        # BUG FIX: arguments were passed as (token, owner_ui, space_name_ui),
        # but _determine_repo_id expects (token, space_name_ui, owner_ui) —
        # the swap produced a reversed "space/owner" repo id.
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return err_repo_id
        repo_id_for_error_logging = repo_id

        api = HfApi(token=resolved_api_token)

        # Handle Space creation first, if requested. The target repo is taken
        # from the UI fields (repo_id above); the AI's CREATE_SPACE command's
        # own repo_id is logged but the UI fields are the user's explicit target.
        create_space_op = next((c for c in changeset if c['type'] == 'CREATE_SPACE'), None)
        if create_space_op:
            logger.info(f"Detected CREATE_SPACE action for {create_space_op['repo_id']}. Proceeding with creation of {repo_id_for_error_logging} based on UI fields.")
            try:
                api.create_repo(
                    repo_id=repo_id,
                    repo_type="space",
                    space_sdk=create_space_op.get('sdk', 'gradio'),
                    private=create_space_op.get('private', False),
                    exist_ok=True,
                )
                status_messages.append(f"CREATE_SPACE: Successfully created or ensured space [{repo_id}](https://huggingface.co/spaces/{repo_id}) exists with SDK '{create_space_op.get('sdk', 'gradio')}' and private={create_space_op.get('private', False)}.")
                logger.info(f"Successfully created or ensured space {repo_id} exists.")
            except Exception as e:
                # Report and continue: subsequent operations may still succeed
                # when the repo already exists (exist_ok=True).
                status_messages.append(f"CREATE_SPACE Error: {e}")
                logger.error(f"Error creating space {repo_id}: {e}")

        # Stage file changes (Add/Update via upload_folder, Delete via commit ops).
        temp_dir = None
        operations = []        # CommitOperationDelete objects
        paths_to_upload = {}   # local temp path -> path in repo
        try:
            temp_dir = tempfile.TemporaryDirectory()
            repo_staging_path = Path(temp_dir.name) / "repo_staging_content"
            repo_staging_path.mkdir(exist_ok=True)

            # Always stage .gitattributes to ensure consistent line endings.
            gitattributes_path_local = repo_staging_path / ".gitattributes"
            with open(gitattributes_path_local, "w", encoding="utf-8") as f:
                f.write("* text=auto eol=lf\n")
            paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"

            for change in changeset:
                if change['type'] == 'UPDATE_FILE' or change['type'] == 'CREATE_FILE':
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append(f"Skipping {change['type']} operation: empty path.")
                        continue
                    content_to_write = change.get('content', '')
                    # Never write back binary/error placeholders produced at load time.
                    if (content_to_write.startswith("[Binary file")
                            or content_to_write.startswith("[Error loading content:")
                            or content_to_write.startswith("[Binary or Skipped file]")):
                        status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': Content is a binary/error placeholder.")
                        logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': Content is binary/error placeholder.")
                        continue
                    file_path_local = repo_staging_path / file_path_in_repo
                    file_path_local.parent.mkdir(parents=True, exist_ok=True)
                    try:
                        with open(file_path_local, "w", encoding="utf-8") as f:
                            f.write(content_to_write)
                        paths_to_upload[str(file_path_local)] = file_path_in_repo
                        logger.info(f"Staged file for {change['type']}: {file_path_in_repo}")
                    except Exception as file_write_error:
                        status_messages.append(f"Error staging file {file_path_in_repo} for {change['type']}: {file_write_error}")
                        logger.error(f"Error writing file {file_path_in_repo} during staging for {change['type']}: {file_write_error}")
                elif change['type'] == 'DELETE_FILE':
                    file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
                    if not file_path_in_repo:
                        status_messages.append("Skipping DELETE_FILE operation: empty path.")
                        continue
                    operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo))
                    logger.info(f"Added DELETE_FILE operation for: {file_path_in_repo}")
                # SET_PRIVACY and DELETE_SPACE are handled after the file commit.

            # Commit file changes: deletions first (create_commit), then
            # adds/updates (upload_folder) — two commits when both exist.
            if paths_to_upload or operations:
                logger.info(f"Committing file changes to {repo_id_for_error_logging}. Uploads: {len(paths_to_upload)}, Deletes: {len(operations)}")
                delete_operations = [op for op in operations if isinstance(op, CommitOperationDelete)]
                if delete_operations:
                    try:
                        commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files."
                        logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}")
                        api.create_commit(
                            repo_id=repo_id,
                            repo_type="space",
                            operations=delete_operations,
                            commit_message=commit_message_delete,
                        )
                        status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.")
                        logger.info("Delete commit successful.")
                    except HfHubHTTPError as e_http:
                        status_messages.append(f"File Deletion Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
                        logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}")
                    except Exception as e_delete_commit:
                        status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.")
                        logger.exception(f"Error during delete commit for {repo_id}:")
                if paths_to_upload:  # Includes at least .gitattributes
                    try:
                        commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}"
                        logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...")
                        upload_folder(
                            repo_id=repo_id,
                            folder_path=str(repo_staging_path),
                            path_in_repo=".",  # Upload to the repository root
                            token=resolved_api_token,
                            repo_type="space",
                            commit_message=commit_message_upload,
                            allow_patterns=["*"],  # Consider all staged files
                        )
                        status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.")
                        logger.info("Upload/Update commit successful.")
                    except HfHubHTTPError as e_http:
                        status_messages.append(f"File Upload/Update Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
                        logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}")
                    except Exception as e_upload:
                        status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.")
                        logger.exception(f"Error during upload_folder for {repo_id}:")
            else:
                status_messages.append("No file changes (create/update/delete) to commit.")
                logger.info("No file changes to commit.")
        finally:
            # Always clean up the temporary staging directory.
            if temp_dir:
                try:
                    temp_dir.cleanup()
                    logger.info("Cleaned up temporary staging directory.")
                except Exception as e:
                    logger.error(f"Error cleaning up temp dir: {e}")

        # Privacy and space-deletion actions run after the file commit (if any).
        for change in changeset:
            if change['type'] == 'SET_PRIVACY':
                try:
                    target_repo_id = change.get('repo_id', repo_id)  # Specified repo_id or current
                    if not target_repo_id:
                        status_messages.append("SET_PRIVACY Error: Target repo_id not specified.")
                        continue
                    api.update_repo_visibility(repo_id=target_repo_id, private=change['private'], repo_type='space')
                    status_messages.append(f"SET_PRIVACY: Successfully set `{target_repo_id}` to `private={change['private']}`.")
                    logger.info(f"Successfully set privacy for {target_repo_id} to {change['private']}.")
                except HfHubHTTPError as e_http:
                    status_messages.append(f"SET_PRIVACY Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check token/permissions.")
                    logger.error(f"HTTP error setting privacy for {target_repo_id}: {e_http}")
                except Exception as e:
                    status_messages.append(f"SET_PRIVACY Error: {str(e)}. Check logs.")
                    logger.exception(f"Error setting privacy for {target_repo_id}:")
            elif change['type'] == 'DELETE_SPACE':
                # Destructive: target falls back to the UI fields when not specified.
                delete_owner = change.get('owner') or owner_ui
                delete_space = change.get('space_name') or space_name_ui
                delete_repo_id = f"{delete_owner}/{delete_space}" if delete_owner and delete_space else repo_id
                if not delete_repo_id:
                    status_messages.append("DELETE_SPACE Error: Target repo_id not specified.")
                    continue
                # Safeguard: only the currently loaded space may be deleted.
                if delete_repo_id != repo_id:
                    status_messages.append(f"DELETE_SPACE Error: AI requested deletion of '{delete_repo_id}', but this action is only permitted for the currently loaded space '{repo_id}'. Action blocked.")
                    logger.warning(f"Blocked DELETE_SPACE action: requested '{delete_repo_id}', current '{repo_id}'.")
                    continue
                logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for {delete_repo_id}")
                try:
                    api.delete_repo(repo_id=delete_repo_id, repo_type='space')
                    status_messages.append(f"DELETE_SPACE: Successfully deleted space `{delete_repo_id}`.")
                    logger.info(f"Successfully deleted space {delete_repo_id}.")
                except HfHubHTTPError as e_http:
                    status_messages.append(f"DELETE_SPACE Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check token/permissions.")
                    logger.error(f"HTTP error deleting space {delete_repo_id}: {e_http}")
                except Exception as e:
                    status_messages.append(f"DELETE_SPACE Error: {str(e)}. Check logs.")
                    logger.exception(f"Error deleting space {delete_repo_id}:")
    except HfHubHTTPError as e_http:
        logger.error(f"Top-level HTTP error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        status_messages.append(f"API HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}")
    except Exception as e:
        logger.exception(f"Top-level error during apply_staged_changes for {repo_id_for_error_logging or 'unknown repo'}:")
        status_messages.append(f"An unexpected error occurred: {str(e)}")

    final_status = " | ".join(status_messages) if status_messages else "No operations were applied."
    logger.info(f"Finished applying staged changes. Final status: {final_status}")
    return final_status
# --- Delete Single File (Manual UI Trigger) ---
# Direct UI file deletion, distinct from the AI-driven workflow.
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
    """Delete one file from a Space via the Hub API; returns a status string.

    ``commit_message_ui`` overrides the default commit message when provided.
    Errors (missing file, auth failures, HTTP errors) are returned as
    human-readable strings, never raised.
    """
    repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
    repo_id = None
    logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}")
    try:
        resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
        if token_err:
            return f"API Token Error: {token_err}"
        repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
        if err_repo_id:
            return f"Repo ID Error: {err_repo_id}"
        repo_id_for_error_logging = repo_id  # Update logging name

        if not file_path_in_repo:
            return "Error: File path cannot be empty for deletion."
        # Clean path for Hub: no leading slash, forward slashes only.
        file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')

        # NOTE(review): essential files (.gitattributes, README.md) are not
        # protected here — anything selected in the dropdown can be deleted.
        effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI"

        # BUG FIX: delete_file has no ``timeout`` keyword argument;
        # passing ``timeout=20`` raised TypeError before the request was made.
        hf_delete_file(
            path_in_repo=file_path_in_repo,
            repo_id=repo_id,
            repo_type="space",
            token=resolved_api_token,
            commit_message=effective_commit_message,
        )
        logger.info(f"Successfully deleted file: {file_path_in_repo}")
        return f"Successfully deleted file: `{file_path_in_repo}`"
    except FileNotFoundError:
        # hf_delete_file may translate a 404 into FileNotFoundError.
        logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
        return f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
    except HfHubHTTPError as e_http:
        logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
        error_message = str(e_http)
        status_code = e_http.response.status_code if e_http.response is not None else None
        if status_code == 404:
            return f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)."
        if status_code in (401, 403):
            return f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
        return f"HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}"
    except Exception as e:
        logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
        return f"Error deleting file '{file_path_in_repo}': {str(e)}"
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/') # Clean path for Hub commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI" api = HfApi(token=resolved_api_token) # Use a temporary file to upload content safely tmp_file_path = None try: with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj: tmp_file_obj.write(file_content) tmp_file_path = tmp_file_obj.name # Upload the temporary file to the specified path in the repo api.upload_file( path_or_fileobj=tmp_file_path, path_in_repo=file_path_in_repo, repo_id=repo_id, repo_type="space", commit_message=commit_msg, timeout=20 # Added timeout ) logger.info(f"Successfully updated file: {file_path_in_repo}") return f"Successfully updated `{file_path_in_repo}`" finally: # Ensure the temporary file is removed if tmp_file_path and os.path.exists(tmp_file_path): os.remove(tmp_file_path) except FileNotFoundError: logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}") return f"Error: Local temporary file not found during upload for '{file_path_in_repo}'." except UnicodeDecodeError: # If read_text fails, it's likely binary or non-utf8 text logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}' during manual update.") return f"Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way." except HfHubHTTPError as e_http: logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}") error_message = str(e_http) status_code = e_http.response.status_code if e_http.response is not None else None if status_code == 404: return f"Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)." 
if status_code in (401, 403): return f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." return f"HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}" except Exception as e: logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:") return f"Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}" # --- Get Space Runtime Status --- def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui): repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui repo_id = None logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}") try: resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox) if token_err: return None, f"API Token Error: {token_err}" repo_id, err_repo_id = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui) if err_repo_id: return None, f"Repo ID Error: {err_repo_id}" repo_id_for_error_logging = repo_id # Update logging name api = HfApi(token=resolved_api_token) # Use get_space_runtime which provides details like stage, hardware, etc. # Added timeout for robustness runtime_info = api.get_space_runtime(repo_id=repo_id, timeout=20) logger.info(f"Received runtime info for {repo_id}. 
Stage: {runtime_info.stage}") # Structure the details for display status_details = { "stage": runtime_info.stage, "hardware": runtime_info.hardware, "requested_hardware": runtime_info.requested_hardware if hasattr(runtime_info, 'requested_hardware') else None, # requested_hardware might not always be present "error_message": None, # Populate this if stage is ERRORED "status": runtime_info.status if hasattr(runtime_info, 'status') else None, # Additional status field "full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#" # We can add more fields from runtime_info.raw if useful } # Check for specific error states or messages if runtime_info.stage == "ERRORED": error_content = None # Look for error details in various places within the raw data or the error attribute if hasattr(runtime_info, 'error') and runtime_info.error: error_content = str(runtime_info.error) # Check build/run specific error messages in raw data if 'build' in runtime_info.raw and isinstance(runtime_info.raw['build'], dict) and runtime_info.raw['build'].get('status') == 'error': error_content = f"Build Error: {runtime_info.raw['build'].get('message', error_content or 'Unknown build error')}" elif 'run' in runtime_info.raw and isinstance(runtime_info.raw['run'], dict) and runtime_info.raw['run'].get('status') == 'error': error_content = f"Runtime Error: {runtime_info.raw['run'].get('message', error_content or 'Unknown runtime error')}" elif 'message' in runtime_info.raw and isinstance(runtime_info.raw['message'], str) and ('error' in runtime_info.raw['message'].lower() or runtime_info.raw['message'].strip().endswith('!')): # Basic check for message indicative of error error_content = runtime_info.raw['message'] # Prioritize messages from raw data if they look like errors status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details." 
logger.info(f"Runtime status details for {repo_id}: {status_details}") return status_details, None except HfHubHTTPError as e_http: # Catch specific HF HTTP errors logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}") error_message = str(e_http) status_code = e_http.response.status_code if e_http.response is not None else None if status_code == 404: # A 404 could mean the space doesn't exist or doesn't have an active runtime state recorded return None, f"Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)." if status_code in (401, 403): return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions." return None, f"HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}" except Exception as e: logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:") return None, f"Error fetching runtime status: {str(e)}" # --- Function to set space privacy --- def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool): """Sets the privacy of a Hugging Face Space.""" logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error setting privacy: {err or 'Token not found'}") return f"Error getting token: {err or 'Token not found.'}" api = HfApi(token=token) api.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space') logger.info(f"Successfully set privacy for {repo_id} to {private}.") return f"Successfully set privacy for `{repo_id}` to `{private}`." 
except HfHubHTTPError as e_http: logger.error(f"HTTP error setting privacy for {repo_id}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' return f"HTTP Error ({status_code}) setting privacy for `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error setting privacy for {repo_id}:") return f"Error setting privacy for `{repo_id}`: {e}" # --- Function to delete an entire space --- def build_logic_delete_space(hf_api_key, owner, space_name): """Deletes an entire Hugging Face Space.""" repo_id = f"{owner}/{space_name}" logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.") try: token, err = _get_api_token(hf_api_key) if err or not token: logger.error(f"Token error deleting space: {err or 'Token not found'}") return f"Error getting token: {err or 'Token not found.'}" api = HfApi(token=token) api.delete_repo(repo_id=repo_id, repo_type='space') logger.warning(f"Successfully deleted space {repo_id}.") return f"Successfully deleted space `{repo_id}`." except HfHubHTTPError as e_http: logger.error(f"HTTP error deleting space {repo_id}: {e_http}") status_code = e_http.response.status_code if e_http.response else 'N/A' return f"HTTP Error ({status_code}) deleting space `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}" except Exception as e: logger.exception(f"Error deleting space {repo_id}:") return f"Error deleting space `{repo_id}`: {e}"