broadfield-dev commited on
Commit
fe71f92
·
verified ·
1 Parent(s): ba9e0a6

Update app_logic.py

Browse files
Files changed (1) hide show
  1. app_logic.py +86 -126
app_logic.py CHANGED
@@ -11,9 +11,8 @@ from huggingface_hub import (
11
  )
12
  import logging
13
  from pathlib import Path
14
- from PIL import Image # For type hinting from gr.Image(type="pil")
15
 
16
- # Attempt to import keylock_decode
17
  try:
18
  from keylock_decode import decode_from_image_pil
19
  KEYLOCK_DECODE_AVAILABLE = True
@@ -28,8 +27,7 @@ logging.basicConfig(
28
  )
29
  logger = logging.getLogger(__name__)
30
 
31
-
32
- # --- Helper Function to Get API Token (Unchanged from previous version) ---
33
  def _get_api_token(ui_token_from_textbox=None):
34
  env_token = os.getenv('HF_TOKEN')
35
  if env_token:
@@ -41,185 +39,147 @@ def _get_api_token(ui_token_from_textbox=None):
41
  logger.warning("HF API token not found in environment or UI textbox.")
42
  return None, "Error: Hugging Face API token not provided. Please enter it in the textbox or load it from an image."
43
 
44
- # --- Updated Function for KeyLock-Decode Integration with Debugging ---
45
  def load_token_from_image_and_set_env(image_pil_object: Image.Image, password: str):
46
  if not KEYLOCK_DECODE_AVAILABLE:
47
  return "Error: KeyLock-Decode library is not installed. This feature is disabled."
48
-
49
- if image_pil_object is None:
50
- return "Error: No image provided for decoding."
51
- if not password:
52
- return "Error: Password cannot be empty for image decoding."
53
-
54
  status_messages_display = []
55
-
56
- # --- DEBUGGING: Check image properties and save it ---
57
  debug_image_path_str = "Not saved (error or no image)."
58
  try:
59
  if image_pil_object:
60
- # Log original properties
61
  original_mode = getattr(image_pil_object, 'mode', 'N/A')
62
- original_format = getattr(image_pil_object, 'format', 'N/A') # Format might be None for PIL object not read from file
63
  original_size = getattr(image_pil_object, 'size', 'N/A')
64
  logger.info(f"Received PIL Image from Gradio. Mode: {original_mode}, Original Format: {original_format}, Size: {original_size}")
65
  status_messages_display.append(f"[DEBUG] Gradio provided PIL Image - Mode: {original_mode}, Original Format: {original_format}, Size: {original_size}")
66
-
67
- # Save the image received from Gradio to a temporary file for external checking
68
- # Ensure the temp directory is writable in the Hugging Face Space environment
69
- # /tmp/ is usually writable.
70
  temp_dir = Path(tempfile.gettempdir()) / "gradio_image_debug"
71
  temp_dir.mkdir(parents=True, exist_ok=True)
72
  debug_image_path = temp_dir / "image_from_gradio.png"
73
-
74
- # Save the image. If it's RGBA, try to keep it RGBA.
75
- # PNG is lossless, so this should be okay if the mode is preserved.
76
  image_pil_object.save(debug_image_path, "PNG")
77
  debug_image_path_str = str(debug_image_path)
78
- status_messages_display.append(f"**[DEBUG ACTION REQUIRED]** Image received from Gradio has been saved to: `{debug_image_path_str}` in the Space's temporary filesystem. Please try decoding this *specific file* using the standalone `keylock-decode` CLI to verify its integrity. E.g., `keylock-decode {debug_image_path_str} \"YOUR_PASSWORD\"`")
79
  logger.info(f"Debug image from Gradio saved to: {debug_image_path_str}")
80
  else:
81
- logger.warning("image_pil_object is None when trying to debug-save.")
82
- status_messages_display.append("[DEBUG] No PIL image object received to save for debugging.")
83
-
84
  except Exception as save_exc:
85
  logger.exception("Error during debug-saving of image from Gradio:")
86
- status_messages_display.append(f"[DEBUG] Error saving image for debugging: {str(save_exc)}. Path was: {debug_image_path_str}")
87
- # --- END DEBUGGING ---
88
-
89
  try:
90
- logger.info(f"Attempting to decode from image using KeyLock-Decode...")
91
- # Pass the original PIL object
92
- decoded_data, status_msgs_from_lib = decode_from_image_pil(
93
- image_pil_object, # Use the PIL object directly
94
- password,
95
- set_environment_variables=True
96
- )
97
-
98
  status_messages_display.extend(status_msgs_from_lib)
99
-
100
  if decoded_data:
101
  status_messages_display.append("\n**Decoded Data Summary (sensitive values masked):**")
102
  for key, value in decoded_data.items():
103
  display_value = '********' if any(k_word in key.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else value
104
  status_messages_display.append(f"- {key}: {display_value}")
105
-
106
  if os.getenv('HF_TOKEN'):
107
- status_messages_display.append(f"\n**SUCCESS: HF_TOKEN was found and has been set in the current process environment.** It will be prioritized for operations.")
108
  elif 'HF_TOKEN' in decoded_data:
109
- status_messages_display.append(f"\nWarning: HF_TOKEN was in decoded data but os.getenv('HF_TOKEN') is not picking it up. This is unexpected if 'set_environment_variables=True' worked.")
110
  else:
111
- status_messages_display.append("\nNote: HF_TOKEN was not specifically found in the decoded data from the image.")
112
- # else:
113
- # if not any("No payload data found" in msg for msg in status_msgs_from_lib) and not any("Failed to decrypt" in msg for msg in status_msgs_from_lib): # Avoid redundant messages
114
- # status_messages_display.append("No data was decoded, or the decoded data was empty (and no specific error from library).")
115
-
116
-
117
  except ValueError as e:
118
- logger.error(f"KeyLock-Decode ValueError: {e}")
119
  status_messages_display.append(f"**Decoding Error (e.g., bad password, corrupted data):** {e}")
120
  except Exception as e:
121
- logger.exception("An unexpected error occurred during image decoding with KeyLock-Decode:")
122
  status_messages_display.append(f"**An unexpected error occurred during decoding:** {str(e)}")
123
-
124
  return "\n".join(status_messages_display)
125
 
126
 
127
- # --- `parse_markdown` (Unchanged) ---
128
  def parse_markdown(markdown_input):
129
- """Parse markdown input to extract space details and file structure."""
 
 
 
130
  space_info = {"repo_name_md": "", "owner_md": "", "files": []}
131
- current_file = None
132
- file_content = []
133
- in_file_content = False
134
- in_code_block = False
 
 
 
 
135
 
136
  lines = markdown_input.strip().split("\n")
137
- for line_idx, line_content_orig in enumerate(lines):
138
- line_content_stripped = line_content_orig.strip()
139
 
140
- if in_file_content:
141
- if line_content_stripped.startswith("```"):
142
- if in_code_block:
143
- file_content.append(line_content_orig)
144
- in_code_block = False
145
- else:
146
- in_code_block = True
147
- file_content.append(line_content_orig)
148
- elif in_code_block:
149
- file_content.append(line_content_orig)
150
- elif not in_code_block:
151
- if line_content_stripped.startswith("### File:") or line_content_stripped.startswith("## File Structure") or line_content_stripped.startswith("# Space:"):
152
- if current_file and file_content:
153
- space_info["files"].append({"path": current_file, "content": "\n".join(file_content)})
154
- current_file = None
155
- file_content = []
156
- in_file_content = False
157
- else:
158
- file_content.append(line_content_orig)
159
 
160
- if line_content_stripped.startswith("# Space:"):
161
- if current_file and file_content:
162
- space_info["files"].append({"path": current_file, "content": "\n".join(file_content)})
163
- full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
164
- if "/" in full_space_name_md:
165
- space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
166
  else:
167
- space_info["repo_name_md"] = full_space_name_md
168
- current_file = None
169
- file_content = []
170
- in_file_content = False
171
- in_code_block = False
172
-
173
- elif line_content_stripped.startswith("## File Structure"):
174
- if current_file and file_content:
175
- space_info["files"].append({"path": current_file, "content": "\n".join(file_content)})
176
- current_file = None
177
- file_content = []
178
- in_file_content = False
179
- in_code_block = False
180
- continue
181
-
182
- elif line_content_stripped.startswith("### File:"):
183
- if current_file and file_content:
184
- space_info["files"].append({"path": current_file, "content": "\n".join(file_content)})
185
-
186
- current_file = line_content_stripped.replace("### File:", "").strip()
187
- file_content = []
188
- in_file_content = True
189
- in_code_block = False
190
-
191
- if current_file and file_content:
192
- space_info["files"].append({"path": current_file, "content": "\n".join(file_content)})
193
-
194
  space_info["files"] = [f for f in space_info["files"] if f.get("path")]
195
  return space_info
196
 
 
197
  # --- `_determine_repo_id` (Unchanged) ---
198
  def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
199
- if not space_name_ui:
200
- return None, "Error: Space Name cannot be empty."
201
- if "/" in space_name_ui:
202
- return None, "Error: Space Name should not contain '/'. Please use the Owner field for the namespace."
203
-
204
  final_owner = owner_ui; error_message = None
205
  if not final_owner:
206
  resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
207
  if token_err: return None, token_err
208
- if not resolved_api_token: return None, "Error: API token required for auto owner determination (internal check failed)."
209
  try:
210
  user_info = whoami(token=resolved_api_token)
211
- if user_info and 'name' in user_info:
212
- final_owner = user_info['name']
213
- logger.info(f"Determined owner: {final_owner} using API token.")
214
- else:
215
- error_message = "Error: Could not retrieve username from API token. Check permissions or specify Owner."
216
- except Exception as e:
217
- error_message = f"Error retrieving username from API token: {str(e)}. Specify Owner manually."
218
  if error_message: return None, error_message
219
  if not final_owner: return None, "Error: Owner could not be determined."
220
  return f"{final_owner}/{space_name_ui}", None
221
 
222
- # --- Core Functions: `create_space`, `view_space_files`, `update_space_file` (Unchanged from previous correct version) ---
223
  def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
224
  repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
225
  try:
@@ -228,7 +188,7 @@ def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, mar
228
  repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
229
  if err: return err
230
  repo_id_for_error_logging = repo_id
231
- space_info = parse_markdown(markdown_input)
232
  if not space_info["files"]: return "Error: No files found in markdown. Use '### File: path/to/file.ext'."
233
  with tempfile.TemporaryDirectory() as temp_dir:
234
  repo_local_path = Path(temp_dir) / "repo_upload_content"
@@ -237,7 +197,7 @@ def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, mar
237
  if not file_info.get("path"): continue
238
  file_path_abs = repo_local_path / file_info["path"]
239
  file_path_abs.parent.mkdir(parents=True, exist_ok=True)
240
- with open(file_path_abs, "w", encoding="utf-8") as f: f.write(file_info["content"])
241
  try:
242
  create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False)
243
  except Exception as e:
 
11
  )
12
  import logging
13
  from pathlib import Path
14
+ from PIL import Image
15
 
 
16
  try:
17
  from keylock_decode import decode_from_image_pil
18
  KEYLOCK_DECODE_AVAILABLE = True
 
27
  )
28
  logger = logging.getLogger(__name__)
29
 
30
+ # --- Helper Function to Get API Token (Unchanged) ---
 
31
  def _get_api_token(ui_token_from_textbox=None):
32
  env_token = os.getenv('HF_TOKEN')
33
  if env_token:
 
39
  logger.warning("HF API token not found in environment or UI textbox.")
40
  return None, "Error: Hugging Face API token not provided. Please enter it in the textbox or load it from an image."
41
 
42
+ # --- `load_token_from_image_and_set_env` (Unchanged from previous debug version) ---
43
  def load_token_from_image_and_set_env(image_pil_object: Image.Image, password: str):
44
  if not KEYLOCK_DECODE_AVAILABLE:
45
  return "Error: KeyLock-Decode library is not installed. This feature is disabled."
46
+ if image_pil_object is None: return "Error: No image provided for decoding."
47
+ if not password: return "Error: Password cannot be empty for image decoding."
 
 
 
 
48
  status_messages_display = []
 
 
49
  debug_image_path_str = "Not saved (error or no image)."
50
  try:
51
  if image_pil_object:
 
52
  original_mode = getattr(image_pil_object, 'mode', 'N/A')
53
+ original_format = getattr(image_pil_object, 'format', 'N/A')
54
  original_size = getattr(image_pil_object, 'size', 'N/A')
55
  logger.info(f"Received PIL Image from Gradio. Mode: {original_mode}, Original Format: {original_format}, Size: {original_size}")
56
  status_messages_display.append(f"[DEBUG] Gradio provided PIL Image - Mode: {original_mode}, Original Format: {original_format}, Size: {original_size}")
 
 
 
 
57
  temp_dir = Path(tempfile.gettempdir()) / "gradio_image_debug"
58
  temp_dir.mkdir(parents=True, exist_ok=True)
59
  debug_image_path = temp_dir / "image_from_gradio.png"
 
 
 
60
  image_pil_object.save(debug_image_path, "PNG")
61
  debug_image_path_str = str(debug_image_path)
62
+ status_messages_display.append(f"**[DEBUG ACTION REQUIRED]** Image from Gradio saved to: `{debug_image_path_str}`. Try decoding this file with the standalone `keylock-decode` CLI.")
63
  logger.info(f"Debug image from Gradio saved to: {debug_image_path_str}")
64
  else:
65
+ status_messages_display.append("[DEBUG] No PIL image object received for debug-saving.")
 
 
66
  except Exception as save_exc:
67
  logger.exception("Error during debug-saving of image from Gradio:")
68
+ status_messages_display.append(f"[DEBUG] Error saving image for debugging: {str(save_exc)}. Path: {debug_image_path_str}")
 
 
69
  try:
70
+ decoded_data, status_msgs_from_lib = decode_from_image_pil(image_pil_object, password, set_environment_variables=True)
 
 
 
 
 
 
 
71
  status_messages_display.extend(status_msgs_from_lib)
 
72
  if decoded_data:
73
  status_messages_display.append("\n**Decoded Data Summary (sensitive values masked):**")
74
  for key, value in decoded_data.items():
75
  display_value = '********' if any(k_word in key.upper() for k_word in ['TOKEN', 'KEY', 'SECRET', 'PASS']) else value
76
  status_messages_display.append(f"- {key}: {display_value}")
 
77
  if os.getenv('HF_TOKEN'):
78
+ status_messages_display.append(f"\n**SUCCESS: HF_TOKEN was found and set in environment.**")
79
  elif 'HF_TOKEN' in decoded_data:
80
+ status_messages_display.append(f"\nWarning: HF_TOKEN decoded but os.getenv('HF_TOKEN') not found.")
81
  else:
82
+ status_messages_display.append("\nNote: HF_TOKEN not specifically found in decoded image data.")
 
 
 
 
 
83
  except ValueError as e:
 
84
  status_messages_display.append(f"**Decoding Error (e.g., bad password, corrupted data):** {e}")
85
  except Exception as e:
 
86
  status_messages_display.append(f"**An unexpected error occurred during decoding:** {str(e)}")
 
87
  return "\n".join(status_messages_display)
88
 
89
 
90
+ # --- Updated `parse_markdown` function ---
91
  def parse_markdown(markdown_input):
92
+ """
93
+ Parse markdown input to extract space details and file structure.
94
+ Correctly handles code blocks by excluding the ``` fences.
95
+ """
96
  space_info = {"repo_name_md": "", "owner_md": "", "files": []}
97
+ current_file_path = None
98
+ current_file_content_lines = []
99
+
100
+ # State machine variables
101
+ # in_file_definition: True when we are after "### File: " and before the next "### File: " or end of input.
102
+ # in_code_block: True when we are between ``` and ```.
103
+ in_file_definition = False
104
+ in_code_block = False
105
 
106
  lines = markdown_input.strip().split("\n")
 
 
107
 
108
+ for line_content_orig in lines:
109
+ line_content_stripped = line_content_orig.strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
110
 
111
+ # Check for new file definition first
112
+ if line_content_stripped.startswith("### File:"):
113
+ # Save previous file's content if any
114
+ if current_file_path and in_file_definition:
115
+ space_info["files"].append({
116
+ "path": current_file_path,
117
+ "content": "\n".join(current_file_content_lines)
118
+ })
119
+ current_file_content_lines = [] # Reset for the new file
120
+
121
+ current_file_path = line_content_stripped.replace("### File:", "").strip()
122
+ in_file_definition = True
123
+ in_code_block = False # Reset code block state for a new file
124
+ continue # Move to next line, don't process "### File:" as content
125
+
126
+ # If we are not in any file definition, skip lines until we find one
127
+ if not in_file_definition:
128
+ if line_content_stripped.startswith("# Space:"): # Parse space name from markdown
129
+ full_space_name_md = line_content_stripped.replace("# Space:", "").strip()
130
+ if "/" in full_space_name_md:
131
+ space_info["owner_md"], space_info["repo_name_md"] = full_space_name_md.split("/", 1)
132
+ else:
133
+ space_info["repo_name_md"] = full_space_name_md
134
+ # Other top-level constructs like "## File Structure" can be ignored or parsed if needed
135
+ continue
136
+
137
+ # Now, we are definitely within a "### File: ..." block's content area
138
+
139
+ # Handle code block fences ```
140
+ if line_content_stripped.startswith("```"):
141
+ if in_code_block:
142
+ in_code_block = False # Exiting a code block
143
  else:
144
+ in_code_block = True # Entering a code block
145
+ # In either case, we *don't* append the ``` line itself to content
146
+ continue # Move to next line
147
+
148
+ # If we are inside a code block OR not (i.e., plain text lines for the file),
149
+ # append the original line (preserving leading/trailing whitespace for that line).
150
+ current_file_content_lines.append(line_content_orig)
151
+
152
+ # After loop, append the last file's content if any
153
+ if current_file_path and in_file_definition: # Ensure we were actually defining a file
154
+ space_info["files"].append({
155
+ "path": current_file_path,
156
+ "content": "\n".join(current_file_content_lines)
157
+ })
158
+
159
+ # Filter out any entries that might have been added with no path (shouldn't happen with current logic)
 
 
 
 
 
 
 
 
 
 
 
160
  space_info["files"] = [f for f in space_info["files"] if f.get("path")]
161
  return space_info
162
 
163
+
164
  # --- `_determine_repo_id` (Unchanged) ---
165
  def _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui):
166
+ if not space_name_ui: return None, "Error: Space Name cannot be empty."
167
+ if "/" in space_name_ui: return None, "Error: Space Name should not contain '/'. Use Owner field."
 
 
 
168
  final_owner = owner_ui; error_message = None
169
  if not final_owner:
170
  resolved_api_token, token_err = _get_api_token(ui_api_token_from_textbox)
171
  if token_err: return None, token_err
172
+ if not resolved_api_token: return None, "Error: API token required for auto owner determination."
173
  try:
174
  user_info = whoami(token=resolved_api_token)
175
+ if user_info and 'name' in user_info: final_owner = user_info['name']
176
+ else: error_message = "Error: Could not retrieve username. Check token/permissions or specify Owner."
177
+ except Exception as e: error_message = f"Error retrieving username: {str(e)}. Specify Owner."
 
 
 
 
178
  if error_message: return None, error_message
179
  if not final_owner: return None, "Error: Owner could not be determined."
180
  return f"{final_owner}/{space_name_ui}", None
181
 
182
+ # --- Core Functions: `create_space`, `view_space_files`, `update_space_file` (Unchanged) ---
183
  def create_space(ui_api_token_from_textbox, space_name_ui, owner_ui, sdk_ui, markdown_input):
184
  repo_id_for_error_logging = f"{owner_ui}/{space_name_ui}" if owner_ui else space_name_ui
185
  try:
 
188
  repo_id, err = _determine_repo_id(ui_api_token_from_textbox, space_name_ui, owner_ui)
189
  if err: return err
190
  repo_id_for_error_logging = repo_id
191
+ space_info = parse_markdown(markdown_input) # Uses the updated parser
192
  if not space_info["files"]: return "Error: No files found in markdown. Use '### File: path/to/file.ext'."
193
  with tempfile.TemporaryDirectory() as temp_dir:
194
  repo_local_path = Path(temp_dir) / "repo_upload_content"
 
197
  if not file_info.get("path"): continue
198
  file_path_abs = repo_local_path / file_info["path"]
199
  file_path_abs.parent.mkdir(parents=True, exist_ok=True)
200
+ with open(file_path_abs, "w", encoding="utf-8") as f: f.write(file_info["content"]) # Content is now clean
201
  try:
202
  create_repo(repo_id=repo_id, token=resolved_api_token, repo_type="space", space_sdk=sdk_ui, private=False)
203
  except Exception as e: