broadfield-dev's picture
Update build_logic.py
adc8c7b verified
import os
import re
import tempfile
import shutil
import logging
from pathlib import Path
import time
from huggingface_hub import (
create_repo,
upload_folder,
list_repo_files,
whoami,
hf_hub_download,
delete_file as hf_delete_file,
HfApi,
create_pull_request as hf_create_pull_request,
)
from huggingface_hub.hf_api import CommitOperationDelete, CommitOperationAdd, CommitOperation
from huggingface_hub.utils import HfHubHTTPError
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
def hf_add_space_comment(owner_repo,token,discussion,comment):
result = HfApi(token=token).comment_discussion(
repo_id=owner_repo,
discussion_num=discussion,
comment=comment
)
return result
def _get_api_token(ui_token_from_textbox=None):
env_token = os.getenv('HF_TOKEN')
if ui_token_from_textbox:
logger.debug("Using HF_TOKEN from UI textbox.")
return ui_token_from_textbox.strip(), None
if env_token:
logger.debug("Using HF_TOKEN from environment variable.")
return env_token, None
logger.warning("Hugging Face API token not provided in UI or HF_TOKEN env var.")
return None, "Error: Hugging Face API token not provided in UI or HF_TOKEN env var."
def _determine_repo_id(ui_api_token_from_textbox, owner_ui, space_name_ui):
if not space_name_ui: return None, "Error: Space Name cannot be empty."
if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field for the owner part."
final_owner = owner_ui
error_message = None
if not final_owner:
logger.info("Owner not specified, attempting to auto-detect from token.")
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, f"Error auto-detecting owner: {token_err}"
if not resolved_api_token: return None, "Error: API token required for auto owner determination if Owner field is empty."
try:
user_info = whoami(token=resolved_api_token)
if user_info and 'name' in user_info:
final_owner = user_info['name']
logger.info(f"Auto-detected owner: {final_owner}")
else:
error_message = "Error: Could not retrieve username from token. Check token permissions or specify Owner."
logger.error(error_message)
except Exception as e:
error_message = f"Error retrieving username from token: {str(e)}. Specify Owner or check token."
logger.exception("Error retrieving username from token:")
if error_message: return None, error_message
if not final_owner: return None, "Error: Owner could not be determined. Please specify it in the Owner field."
repo_id = f"{final_owner}/{space_name_ui}"
logger.info(f"Determined repo_id: {repo_id}")
return repo_id, None
def parse_markdown(markdown_input):
space_info = {"repo_name_md": "", "owner_md": "", "files": []}
current_file_path = None
current_file_content_lines = []
in_file_definition = False
in_code_block = False
file_parsing_errors = []
lines = markdown_input.strip().split("\n")
cleaned_lines = []
for line_content_orig in lines:
if line_content_orig.strip().startswith("# "):
if line_content_orig.strip().startswith("# ### File:") or \
line_content_orig.strip().startswith("# ## File Structure") or \
line_content_orig.strip().startswith("# # Space:"):
cleaned_lines.append(line_content_orig.strip()[2:])
else:
cleaned_lines.append(line_content_orig)
else:
cleaned_lines.append(line_content_orig)
lines = cleaned_lines
for i, line_content_orig in enumerate(lines):
line_content_stripped = line_content_orig.strip()
line_num = i + 1
file_match = re.match(r"### File:\s*(?P<filename_line>[^\n]+)", line_content_stripped)
if file_match:
if current_file_path is not None and in_file_definition:
content_to_save = "\n".join(current_file_content_lines).strip()
space_info["files"].append({"path": current_file_path, "content": content_to_save})
filename_line = file_match.group("filename_line").strip()
current_file_path = filename_line
current_file_path = re.split(r'\s*\(', current_file_path, 1)[0].strip()
current_file_path = current_file_path.strip('`\'"').strip()
if not current_file_path:
file_parsing_errors.append(f"Line {line_num}: Found '### File:' but filename is empty or invalid.")
current_file_path = None
in_file_definition = False
continue
current_file_content_lines = []
in_file_definition = True
in_code_block = False
logger.debug(f"Parsed file header: {current_file_path}")
continue
if not in_file_definition:
if line_content_stripped.startswith("# Space:"):
full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
if "/" in full_space_name_md:
parts = full_space_name_md.split("/", 1)
if len(parts) == 2:
space_info["owner_md"], space_info["repo_name_md"] = parts[0].strip(), parts[1].strip()
else:
space_info["repo_name_md"] = full_space_name_md
else:
space_info["repo_name_md"] = full_space_name_md
logger.debug(f"Parsed space header: {space_info['owner_md']}/{space_info['repo_name_md']}")
continue
if line_content_stripped.startswith("## File Structure"):
structure_block_start = i + 1
while structure_block_start < len(lines) and not lines[structure_block_start].strip().startswith("```"):
structure_block_start += 1
if structure_block_start < len(lines) and lines[structure_block_start].strip().startswith("```"):
structure_block_end = structure_block_start + 1
while structure_block_end < len(lines) and not lines[structure_block_end].strip().startswith("```"):
structure_block_end += 1
if structure_block_end < len(lines) and lines[structure_block_end].strip().startswith("```"):
logger.debug(f"Skipping File Structure block from line {i+1} to {structure_block_end+1}")
i = structure_block_end
continue
continue
if in_file_definition:
if line_content_stripped.startswith("```"):
in_code_block = not in_code_block
logger.debug(f"Toggled code block to {in_code_block} at line {line_num}")
continue
if in_code_block:
current_file_content_lines.append(line_content_orig)
elif line_content_stripped.startswith("[Binary file") or line_content_stripped.startswith("[Error loading content:") or line_content_stripped.startswith("[Binary or Skipped file]"):
current_file_content_lines.append(line_content_orig)
logger.debug(f"Parsed binary/error marker for {current_file_path} at line {line_num}")
else:
pass
if current_file_path is not None and in_file_definition:
content_to_save = "\n".join(current_file_content_lines).strip()
space_info["files"].append({"path": current_file_path, "content": content_to_save})
space_info["files"] = [f for f in space_info["files"] if f.get("path")]
space_info["owner_md"] = space_info["owner_md"].strip()
space_info["repo_name_md"] = space_info["repo_name_md"].strip()
if file_parsing_errors:
logger.warning(f"Markdown parsing encountered errors: {file_parsing_errors}")
logger.info(f"Parsed markdown. Found {len(space_info['files'])} files.")
return space_info
def get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
sdk = None
files = []
error = None
repo_id = None
logger.info(f"Attempting to get repo info for {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, [], token_err
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return None, [], err_repo_id
repo_id_for_error_logging = repo_id
api = HfApi(token=resolved_api_token)
repo_info_obj = api.repo_info(repo_id=repo_id, repo_type="space", timeout=20)
sdk = repo_info_obj.sdk
files = [sibling.rfilename for sibling in repo_info_obj.siblings if sibling.rfilename]
if not files and repo_info_obj.siblings:
logger.warning(f"Repo {repo_id} has siblings but no rfilenames extracted. Total siblings: {len(repo_info_obj.siblings)}")
logger.info(f"Successfully got repo info for {repo_id}. SDK: {sdk}, Files found: {len(files)}")
except HfHubHTTPError as e_http:
logger.error(f"HTTP error getting repo info for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found (404)."
elif status_code in (401,403):
error = f"Access denied for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
else:
error = f"HTTP Error {status_code or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
except Exception as e:
logger.warning(f"Could not get full repo_info for {repo_id_for_error_logging or 'unknown repo'}, attempting list_repo_files fallback: {e}")
error = f"Error retrieving Space info for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. Attempting file list fallback."
try:
resolved_api_token_fb, token_err_fb = _get_api_token(ui_api_token_from_textbox)
if token_err_fb: return None, [], f"{error}\nAPI Token Error during fallback: {token_err_fb}"
repo_id_fb, err_repo_id_fb = _determine_repo_id(resolved_api_token_fb, owner_ui, space_name_ui)
if err_repo_id_fb: return None, [], f"{error}\nRepo ID Error during fallback: {err_repo_id_fb}"
files = list_repo_files(repo_id=repo_id_fb, token=resolved_api_token_fb, repo_type="space", timeout=20)
error = f"Warning: Could not fetch full Space info (SDK etc.) for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}. File list loaded via fallback."
logger.info(f"Fallback list_repo_files successful for {repo_id_fb}. Files found: {len(files)}")
except HfHubHTTPError as e2_http:
logger.error(f"HTTP error during fallback list_repo_files for {repo_id_for_error_logging or 'unknown repo'}: {e2_http}")
error_message_fb = str(e2_http)
status_code_fb = e2_http.response.status_code if e2_http.response is not None else None
if status_code_fb == 404:
error = f"Space '{repo_id_for_error_logging or 'unknown repo'}' not found during fallback (404)."
else:
error = f"HTTP Error {status_code_fb or 'unknown'} for '{repo_id_for_error_logging or 'unknown repo'}' during fallback: {error_message_fb}"
files = []
except Exception as e2:
logger.exception(f"Error listing files for {repo_id_for_error_logging or 'unknown repo'} during fallback: {e2}")
error = f"{error}\nError listing files during fallback for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e2)}"
files = []
if not files and not error and (repo_id_for_error_logging is not None):
error = f"No files found in Space `{repo_id_for_error_logging or 'unknown repo'}`."
return sdk, files, error
def list_space_files_for_browsing(ui_api_token_from_textbox, space_name_ui, owner_ui):
files, err = get_space_repository_info(ui_api_token_from_textbox, space_name_ui, owner_ui)[1:]
return files, err
def get_space_file_content(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Attempting to get content for file '{file_path_in_repo}' from {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, token_err
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return None, err_repo_id
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return None, "Error: File path cannot be empty."
file_path_in_repo = file_path_in_repo.replace("\\", "/")
downloaded_file_path = hf_hub_download(
repo_id=repo_id,
filename=file_path_in_repo,
repo_type="space",
token=resolved_api_token,
local_dir_use_symlinks=False,
cache_dir=None,
timeout=20
)
content = Path(downloaded_file_path).read_text(encoding="utf-8")
logger.info(f"Successfully downloaded and read content for '{file_path_in_repo}'.")
return content, None
except FileNotFoundError:
logger.error(f"FileNotFoundError for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
except UnicodeDecodeError:
logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}'. Likely binary.")
return None, f"Error: File '{file_path_in_repo}' is not valid UTF-8 text. Cannot display."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error fetching file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return None, f"Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
if status_code in (401, 403):
return None, f"Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return None, f"HTTP Error {status_code or 'unknown'} fetching file '{file_path_in_repo}': {error_message}"
except Exception as e:
logger.exception(f"Error fetching file content for {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
return None, f"Error fetching file content: {str(e)}"
def apply_staged_file_changes(ui_api_token_from_textbox, owner_ui, space_name_ui, file_changeset):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
status_messages = []
logger.info(f"Attempting to apply {len(file_changeset)} staged file changes to {repo_id_for_error_logging}")
if not owner_ui or not space_name_ui:
return "Error: Cannot apply file changes. Owner and Space Name must be provided."
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
api = HfApi(token=resolved_api_token)
temp_dir = None
paths_to_upload = {}
delete_operations = []
try:
temp_dir = tempfile.TemporaryDirectory()
repo_staging_path = Path(temp_dir.name) / "repo_staging_content"
repo_staging_path.mkdir(exist_ok=True)
gitattributes_path_local = repo_staging_path / ".gitattributes"
try:
with open(gitattributes_path_local, "w", encoding="utf-8") as f:
f.write("* text=auto eol=lf\n")
paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"
except Exception as e:
status_messages.append(f"Warning: Could not stage .gitattributes file: {e}")
logger.warning(f"Could not stage .gitattributes: {e}")
for change in file_changeset:
if change['type'] == 'UPDATE_FILE' or change['type'] == 'CREATE_FILE':
file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
if not file_path_in_repo:
status_messages.append(f"Skipping {change['type']} operation: empty path.")
continue
content_to_write = change.get('content', '')
if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"):
status_messages.append(f"Skipping {change['type']} for '{file_path_in_repo}': Content is a binary/error placeholder.")
logger.warning(f"Skipping {change['type']} operation for '{file_path_in_repo}': Content is binary/error placeholder.")
continue
file_path_local = repo_staging_path / file_path_in_repo
file_path_local.parent.mkdir(parents=True, exist_ok=True)
try:
with open(file_path_local, "w", encoding="utf-8") as f:
f.write(content_to_write)
paths_to_upload[str(file_path_local)] = file_path_in_repo
logger.debug(f"Staged file for {change['type']}: {file_path_in_repo}")
except Exception as file_write_error:
status_messages.append(f"Error staging file {file_path_in_repo} for {change['type']}: {file_write_error}")
logger.error(f"Error writing file {file_path_in_repo} during staging for {change['type']}: {file_write_error}")
elif change['type'] == 'DELETE_FILE':
file_path_in_repo = change['path'].lstrip('/').replace(os.sep, '/')
if not file_path_in_repo:
status_messages.append(f"Skipping DELETE_FILE operation: empty path.")
continue
delete_operations.append(CommitOperationDelete(path_in_repo=file_path_in_repo))
logger.debug(f"Added DELETE_FILE operation for: {file_path_in_repo}")
if delete_operations:
try:
commit_message_delete = f"AI Space Builder: Deleted {len(delete_operations)} files."
logger.info(f"Performing delete commit for {repo_id_for_error_logging}: {commit_message_delete}")
api.create_commit(
repo_id=repo_id,
repo_type="space",
operations=delete_operations,
commit_message=commit_message_delete,
timeout=30
)
status_messages.append(f"File Deletions: Successfully committed {len(delete_operations)} deletions.")
logger.info("Delete commit successful.")
except HfHubHTTPError as e_http:
status_messages.append(f"File Deletion HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
logger.error(f"HTTP error during delete commit for {repo_id}: {e_http}")
except Exception as e_delete_commit:
status_messages.append(f"File Deletion Error: {str(e_delete_commit)}. Check logs.")
logger.exception(f"Error during delete commit for {repo_id}:")
if paths_to_upload:
try:
commit_message_upload = f"AI Space Builder: Updated Space content for {repo_id}"
logger.info(f"Uploading staged files from {str(repo_staging_path)} to {repo_id}...")
upload_folder(
repo_id=repo_id,
folder_path=str(repo_staging_path),
path_in_repo=".",
token=resolved_api_token,
repo_type="space",
commit_message=commit_message_upload,
allow_patterns=["*"],
timeout=120
)
status_messages.append(f"File Uploads/Updates: Successfully uploaded/updated {len(paths_to_upload)} files.")
logger.info("Upload/Update commit successful.")
except HfHubHTTPError as e_http:
status_messages.append(f"File Upload/Update HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
logger.error(f"HTTP error during upload_folder for {repo_id}: {e_http}")
except Exception as e_upload:
status_messages.append(f"File Upload/Update Error: {str(e_upload)}. Check logs.")
logger.exception(f"Error during upload_folder for {repo_id}:")
else:
status_messages.append("No file changes (create/update/delete) to commit.")
logger.info("No file changes to commit.")
finally:
if temp_dir:
try:
temp_dir.cleanup()
logger.info("Cleaned up temporary staging directory.")
except Exception as e:
logger.error(f"Error cleaning up temp dir: {e}")
except HfHubHTTPError as e_http:
logger.error(f"Top-level HTTP error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
status_messages.append(f"API HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}")
except Exception as e:
logger.exception(f"Top-level error during apply_staged_file_changes for {repo_id_for_error_logging or 'unknown repo'}:")
status_messages.append(f"An unexpected error occurred during apply file changes: {str(e)}")
final_status = " | ".join(status_messages) if status_messages else "No file operations were applied."
logger.info(f"Finished applying staged file changes. Final status: {final_status}")
return final_status
def build_logic_create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input, private):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
logger.info(f"Attempting to create space: {repo_id_for_error_logging}")
if not space_name_ui: return "Error: Space Name cannot be empty."
if "/" in space_name_ui: return "Error: Space Name should not contain '/'."
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return f"API Token Error: {token_err}"
final_owner = owner_ui
if not final_owner:
try:
user_info = whoami(token=resolved_api_token)
final_owner = user_info.get('name')
if not final_owner: raise Exception("Could not find user name from token.")
except Exception as e:
return f"Error auto-detecting owner: {str(e)}. Specify Owner field."
if not final_owner: return "Error: Owner could not be determined."
repo_id = f"{final_owner}/{space_name_ui}"
temp_dir = None
try:
api = HfApi(token=resolved_api_token)
# Create the repository
api.create_repo(repo_id=repo_id, repo_type="space", space_sdk=sdk_ui, private=private, exist_ok=False)
logger.info(f"Successfully created empty space: {repo_id}")
# Stage files from markdown for upload if markdown is provided
if markdown_input:
parsed_md = parse_markdown(markdown_input)
files_to_upload = parsed_md.get("files", [])
if files_to_upload:
logger.info(f"Staging {len(files_to_upload)} files for upload after creation.")
temp_dir = tempfile.TemporaryDirectory()
repo_staging_path = Path(temp_dir.name) / "initial_content"
repo_staging_path.mkdir(exist_ok=True)
paths_to_upload = {}
status_messages = []
# Add .gitattributes
gitattributes_path_local = repo_staging_path / ".gitattributes"
try:
with open(gitattributes_path_local, "w", encoding="utf-8") as f:
f.write("* text=auto eol=lf\n")
paths_to_upload[str(gitattributes_path_local)] = ".gitattributes"
except Exception as e:
status_messages.append(f"Warning: Could not stage .gitattributes file: {e}")
logger.warning(f"Could not stage .gitattributes: {e}")
for file_info in files_to_upload:
file_path_in_repo = file_info['path'].lstrip('/').replace(os.sep, '/')
content_to_write = file_info.get('content', '')
if not file_path_in_repo:
status_messages.append(f"Skipping file creation: empty path.")
continue
if content_to_write.startswith("[Binary file") or content_to_write.startswith("[Error loading content:") or content_to_write.startswith("[Binary or Skipped file]"):
status_messages.append(f"Skipping file '{file_path_in_repo}': Content is a binary/error placeholder.")
logger.warning(f"Skipping file '{file_path_in_repo}': Content is binary/error placeholder.")
continue
file_path_local = repo_staging_path / file_path_in_repo
file_path_local.parent.mkdir(parents=True, exist_ok=True)
try:
with open(file_path_local, "w", encoding="utf-8") as f:
f.write(content_to_write)
paths_to_upload[str(file_path_local)] = file_path_in_repo
except Exception as file_write_error:
status_messages.append(f"Error staging file {file_path_in_repo}: {file_write_error}")
if paths_to_upload:
try:
commit_message_upload = "Initial files from AI Space Builder"
upload_folder(
repo_id=repo_id,
folder_path=str(repo_staging_path),
path_in_repo=".",
token=resolved_api_token,
repo_type="space",
commit_message=commit_message_upload,
allow_patterns=["*"],
timeout=120
)
status_messages.append(f"Successfully uploaded initial {len(paths_to_upload)} files.")
logger.info("Initial upload successful.")
except HfHubHTTPError as e_http:
status_messages.append(f"Initial Upload HTTP Error ({e_http.response.status_code if e_http.response else 'N/A'}): {e_http.response.text if e_http.response else str(e_http)}. Check logs.")
logger.error(f"HTTP error during initial upload for {repo_id}: {e_http}")
except Exception as e_upload:
status_messages.append(f"Initial Upload Error: {str(e_upload)}. Check logs.")
logger.exception(f"Error during initial upload for {repo_id}:")
else:
status_messages = ["No files parsed from markdown for initial upload."]
else:
status_messages = ["No markdown provided for initial files."]
final_status = "Successfully created space"
if status_messages:
final_status += " | " + " | ".join(status_messages)
return final_status
except HfHubHTTPError as e_http:
logger.error(f"HTTP error creating space {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
if status_code == 409: # Conflict - repo already exists
return f"Create Space Error ({status_code}): Space '{repo_id_for_error_logging or 'unknown'}' already exists. Use 'Build / Update' instead."
if status_code in (401, 403):
return f"Create Space Error ({status_code}): Access denied or authentication required for '{repo_id_for_error_logging or 'unknown'}'. Check token permissions."
return f"Create Space HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error creating space {repo_id_for_error_logging or 'unknown repo'}:")
return f"Create Space Error: {str(e)}"
finally:
if temp_dir:
try:
temp_dir.cleanup()
except Exception as e:
logger.error(f"Error cleaning up temp dir: {e}")
def delete_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, commit_message_ui=None):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Attempting manual file deletion for '{file_path_in_repo}' from {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return "Error: File path cannot be empty for deletion."
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
effective_commit_message = commit_message_ui or f"Deleted file: {file_path_in_repo} via AI Space Editor UI"
hf_delete_file(
path_in_repo=file_path_in_repo,
repo_id=repo_id,
repo_type="space",
token=resolved_api_token,
commit_message=effective_commit_message,
timeout=20
)
logger.info(f"Successfully deleted file: {file_path_in_repo}")
return f"Successfully deleted file: `{file_path_in_repo}`"
except FileNotFoundError:
logger.error(f"FileNotFoundError during manual delete for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' (404)."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return f"Delete Error: File '{file_path_in_repo}' not found in Space '{repo_id_for_error_logging or 'unknown repo'}' for deletion (404)."
if status_code in (401, 403):
return f"Delete Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return f"Delete HTTP Error {status_code or 'unknown'} deleting file '{file_path_in_repo}': {error_message}"
except Exception as e:
logger.exception(f"Error deleting file {file_path_in_repo} from {repo_id_for_error_logging or 'unknown repo'}:")
return f"Delete Error deleting file '{file_path_in_repo}': {str(e)}"
def update_space_file(ui_api_token_from_textbox, space_name_ui, owner_ui, file_path_in_repo, file_content, commit_message_ui):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Attempting manual file update for '{file_path_in_repo}' in {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
if not file_path_in_repo: return "Update Error: File Path to update cannot be empty."
file_path_in_repo = file_path_in_repo.lstrip('/').replace(os.sep, '/')
commit_msg = commit_message_ui or f"Update {file_path_in_repo} via AI Space Editor UI"
api = HfApi(token=resolved_api_token)
tmp_file_path = None
try:
with tempfile.NamedTemporaryFile(mode='w', delete=False, encoding='utf-8') as tmp_file_obj:
tmp_file_obj.write(file_content)
tmp_file_path = tmp_file_obj.name
api.upload_file(
path_or_fileobj=tmp_file_path,
path_in_repo=file_path_in_repo,
repo_id=repo_id,
repo_type="space",
commit_message=commit_msg,
timeout=20
)
logger.info(f"Successfully updated file: {file_path_in_repo}")
return f"Successfully updated `{file_path_in_repo}`"
finally:
if tmp_file_path and os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
except FileNotFoundError:
logger.error(f"FileNotFoundError during manual update for '{file_path_in_repo}' in {repo_id_for_error_logging or 'unknown'}")
return f"Update Error: Local temporary file not found during upload for '{file_path_in_repo}'."
except UnicodeDecodeError:
logger.warning(f"UnicodeDecodeError for '{file_path_in_repo}' during manual update.")
return f"Update Error: Content for '{file_path_in_repo}' is not valid UTF-8 text. Cannot edit this way."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return f"Update Error: Space '{repo_id_for_error_logging or 'unknown repo'}' or file '{file_path_in_repo}' not found (404)."
if status_code in (401, 403):
return f"Update Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return f"Update HTTP Error {status_code or 'unknown'} updating file '{file_path_in_repo}': {error_message}"
except Exception as e:
logger.exception(f"Error in update_space_file for {repo_id_for_error_logging or 'unknown repo'}, file {file_path_in_repo}:")
return f"Update Error updating file for `{repo_id_for_error_logging or 'unknown repo'}`: {str(e)}"
def get_space_runtime_status(ui_api_token_from_textbox, space_name_ui, owner_ui):
repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
repo_id = None
logger.info(f"Fetching runtime status for Space: {repo_id_for_error_logging}")
try:
resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
if token_err: return None, f"API Token Error: {token_err}"
repo_id, err_repo_id = _determine_repo_id(resolved_api_token, owner_ui, space_name_ui)
if err_repo_id: return None, f"Repo ID Error: {err_repo_id}"
repo_id_for_error_logging = repo_id
api = HfApi(token=resolved_api_token)
runtime_info = api.get_space_runtime(repo_id=repo_id, timeout=20)
logger.info(f"Received runtime info for {repo_id}. Stage: {runtime_info.stage}")
status_details = {
"stage": runtime_info.stage,
"hardware": runtime_info.hardware,
"requested_hardware": runtime_info.requested_hardware if hasattr(runtime_info, 'requested_hardware') else None,
"error_message": None,
"status": runtime_info.status if hasattr(runtime_info, 'status') else None,
"full_log_link": f"https://huggingface.co/spaces/{repo_id}/logs" if repo_id else "#"
}
if runtime_info.stage == "ERRORED":
error_content = None
if hasattr(runtime_info, 'error') and runtime_info.error: error_content = str(runtime_info.error)
if 'build' in runtime_info.raw and isinstance(runtime_info.raw['build'], dict) and runtime_info.raw['build'].get('status') == 'error':
error_content = f"Build Error: {runtime_info.raw['build'].get('message', error_content or 'Unknown build error')}"
elif 'run' in runtime_info.raw and isinstance(runtime_info.raw['run'], dict) and runtime_info.raw['run'].get('status') == 'error':
error_content = f"Runtime Error: {runtime_info.raw['run'].get('message', error_content or 'Unknown runtime error')}"
elif 'message' in runtime_info.raw and isinstance(runtime_info.raw['message'], str) and ('error' in runtime_info.raw['message'].lower() or runtime_info.raw['message'].strip().endswith('!')):
error_content = runtime_info.raw['message']
status_details["error_message"] = error_content if error_content else "Space is in an errored state. Check logs for details."
logger.info(f"Runtime status details for {repo_id}: {status_details}")
return status_details, None
except HfHubHTTPError as e_http:
logger.error(f"HTTP error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}: {e_http}")
error_message = str(e_http)
status_code = e_http.response.status_code if e_http.response is not None else None
if status_code == 404:
return None, f"Status Error: Space '{repo_id_for_error_logging or 'unknown repo'}' not found or has no active runtime status (404)."
if status_code in (401, 403):
return None, f"Status Error: Access denied or authentication required for '{repo_id_for_error_logging or 'unknown repo'}' ({status_code}). Check token permissions."
return None, f"Status HTTP Error {status_code or 'unknown'} fetching runtime status for '{repo_id_for_error_logging or 'unknown repo'}': {error_message}"
except Exception as e:
logger.exception(f"Error fetching runtime status for {repo_id_for_error_logging or 'unknown repo'}:")
return None, f"Status Error fetching runtime status: {str(e)}"
def build_logic_set_space_privacy(hf_api_key, repo_id, private: bool):
logger.info(f"Attempting to set privacy for '{repo_id}' to {private}.")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error setting privacy: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
api.update_repo_visibility(repo_id=repo_id, private=private, repo_type='space')
logger.info(f"Successfully set privacy for {repo_id} to {private}.")
return f"Successfully set privacy for `{repo_id}` to `{private}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error setting privacy for {repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
return f"HTTP Error ({status_code}) setting privacy for `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error setting privacy for {repo_id}:")
return f"Error setting privacy for `{repo_id}`: {e}"
def build_logic_delete_space(hf_api_key, owner, space_name):
repo_id = f"{owner}/{space_name}"
logger.warning(f"Attempting DESTRUCTIVE DELETE_SPACE action for '{repo_id}'.")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error deleting space: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
api.delete_repo(repo_id=repo_id, repo_type='space')
logger.warning(f"Successfully deleted space {repo_id}.")
return f"Successfully deleted space `{repo_id}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error deleting space {repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
return f"HTTP Error ({status_code}) deleting space `{repo_id}`: {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error deleting space {repo_id}:")
return f"Error deleting space `{repo_id}`: {e}"
def build_logic_create_pull_request(hf_api_key, source_repo_id, target_repo_id, title, body=""):
logger.info(f"Attempting to create PR from '{source_repo_id}' to '{target_repo_id}'. Title: '{title}'")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error creating PR: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
api = HfApi(token=token)
pr_url = hf_create_pull_request(
repo_id=target_repo_id,
title=title,
description=body,
base_repo=source_repo_id,
token=token,
timeout=30
)
logger.info(f"Successfully created PR: {pr_url}")
return f"Successfully created Pull Request: {pr_url}"
except HfHubHTTPError as e_http:
logger.error(f"HTTP error creating PR from {source_repo_id} to {target_repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
if status_code in (401, 403):
return f"PR Error ({status_code}): Access denied or authentication required to create PR on '{target_repo_id}'. Check token permissions."
if status_code == 404:
return f"PR Error ({status_code}): Target repository '{target_repo_id}' not found."
if e_http.response and isinstance(e_http.response.text, str) and 'already exists' in e_http.response.text:
return f"PR Error: Pull Request already exists."
return f"PR HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error creating PR from {source_repo_id} to {target_repo_id}:")
return f"PR Error: {e}"
def build_logic_add_comment(hf_api_key, repo_id, comment_text):
logger.info(f"Attempting to add comment to '{repo_id}'. Text: '{comment_text[:50]}...'")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error adding comment: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
# Use the new public API method add_space_comment
hf_add_space_comment(
repo_id=repo_id,
comment=comment_text,
token=token,
timeout=20
)
logger.info(f"Successfully added comment to {repo_id}.")
# Note: hf_add_space_comment doesn't return the comment URL directly
return f"Successfully added comment to `{repo_id}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error adding comment to {repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
if status_code in (401, 403):
return f"Comment Error ({status_code}): Access denied or authentication required to add comment on '{repo_id}'. Check token permissions."
if status_code == 404:
return f"Comment Error ({status_code}): Repository '{repo_id}' not found."
return f"Comment HTTP Error ({status_code}): {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error adding comment to {repo_id}:")
return f"Comment Error: {e}"
def build_logic_like_space(hf_api_key, repo_id):
logger.info(f"Attempting to like space '{repo_id}'.")
return "'Like system' not installed."
def duplicate_space(hf_api_key, source_repo_id, target_repo_id, private: bool = False):
"""Duplicates a Hugging Face Space."""
logger.info(f"Attempting to duplicate '{source_repo_id}' to '{target_repo_id}' (private={private}).")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error duplicating space: {err or 'Token not found'}")
return f"Error getting token: {err or 'Token not found.'}"
if '/' in target_repo_id:
target_owner, target_space_name = target_repo_id.split('/', 1)
if not target_owner or not target_space_name or '/' in target_space_name:
return f"Error: Invalid target repository ID format '{target_repo_id}'. Must be '<owner>/<space_name>'."
else:
target_space_name = target_repo_id
try:
user_info = whoami(token=token)
target_owner = user_info.get('name')
if not target_owner: raise Exception("Could not determine owner from token.")
target_repo_id = f"{target_owner}/{target_space_name}"
except Exception as e:
logger.error(f"Could not determine target owner from token: {e}")
return f"Error: Target repository ID '{target_repo_id}' is missing owner, and owner could not be determined from token ({e}). Use '<owner>/<space_name>' format or set the Owner field."
api = HfApi(token=token)
api.duplicate_repo(
from_repo=source_repo_id,
to_repo=target_repo_id,
repo_type="space",
token=token,
private=private,
exist_ok=True
)
logger.info(f"Successfully duplicated space from {source_repo_id} to {target_repo_id}.")
return f"Successfully duplicated space from `{source_repo_id}` to `{target_repo_id}`."
except HfHubHTTPError as e_http:
logger.error(f"HTTP error duplicating space from {source_repo_id} to {target_repo_id}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
return f"HTTP Error ({status_code}) duplicating space: {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error duplicating space from {source_repo_id} to {target_repo_id}:")
return f"Error duplicating space: {e}"
def list_user_spaces(hf_api_key, owner=None):
"""Lists spaces for the authenticated user or a specific owner/org."""
logger.info(f"Attempting to list spaces for owner: {owner or 'authenticated user'}.")
try:
token, err = _get_api_token(hf_api_key)
if err or not token:
logger.error(f"Token error listing spaces: {err or 'Token not found'}")
return None, f"Error getting token: {err or 'Token not found.'}"
effective_owner = owner
if not effective_owner:
try:
user_info = whoami(token=token)
effective_owner = user_info.get('name')
if not effective_owner: raise Exception("Could not determine owner from token.")
logger.info(f"Listing spaces for auto-detected owner: {effective_owner}")
except Exception as e:
logger.error(f"Could not determine owner from token for listing: {e}")
return None, f"Error auto-detecting owner for listing: {e}. Please specify Owner field."
api = HfApi(token=token)
spaces = api.list_spaces(author=effective_owner)
#datasets = api.list_datasets(author=effective_owner)
#models = api.list_models(author=effective_owner)
space_ids = [f"{r.author}/{r.id}" for r in spaces]
logger.info(f"Successfully listed {len(space_ids)} spaces for {effective_owner}.")
return space_ids, None
except HfHubHTTPError as e_http:
logger.error(f"HTTP error listing spaces for {owner or 'authenticated user'}: {e_http}")
status_code = e_http.response.status_code if e_http.response else 'N/A'
if status_code == 404:
return [], f"HTTP Error ({status_code}): Owner '{owner}' not found or has no accessible spaces."
if status_code in (401, 403):
return [], f"HTTP Error ({status_code}): Access denied or authentication required for listing spaces for '{owner}'. Check token permissions."
return None, f"HTTP Error ({status_code}) listing spaces for '{owner or 'authenticated user'}': {e_http.response.text if e_http.response else str(e_http)}"
except Exception as e:
logger.exception(f"Error listing spaces for {owner or 'authenticated user'}:")
return None, f"Error listing spaces: {e}"