Blazgo commited on
Commit
4e8cc0a
·
verified ·
1 Parent(s): 2a94399

Fix long queue waits with mechanism to prevent running duplicate jobs

Browse files

This PR introduces a mechanism to prevent running duplicate model merge jobs. It tracks active jobs by hashing the YAML configuration and checks if a new job matches an existing one. If a duplicate is detected, the user is prompted to either continue with the new job (canceling the old one) or abort the operation. A duplicate may be caused by a user losing connection to the Space, and the job may be stuck in queue. The next job they place may not finish because the previous job is still stuck. **They may have to wait up to 6 hours (but usually ~2-4) for the Space to restart.**

### Key Changes:
* Active Job Tracking: Jobs are tracked by their YAML configuration hash.
* Duplicate Detection: Before starting a new job, the system checks for duplicates.
* User Prompt: If a duplicate is detected, the user can choose to cancel the old job and continue with the new one.

### Why This Change is Needed:
* Prevents multiple identical merge jobs from running simultaneously, saving resources and avoiding long queues and delays.

Files changed (1) hide show
  1. app.py +33 -3
app.py CHANGED
@@ -1,5 +1,6 @@
1
  import os
2
  import pathlib
 
3
  import random
4
  import string
5
  import tempfile
@@ -113,12 +114,40 @@ examples = [[str(f)] for f in pathlib.Path("examples").glob("*.yaml")]
113
  COMMUNITY_HF_TOKEN = os.getenv("COMMUNITY_HF_TOKEN")
114
 
115
 
 
 
 
 
 
 
 
116
  def merge(yaml_config: str, hf_token: str, repo_name: str) -> Iterable[List[Log]]:
117
  runner = LogsViewRunner()
118
 
119
  if not yaml_config:
120
  yield runner.log("Empty yaml, pick an example below", level="ERROR")
121
  return
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
122
  try:
123
  merge_config = MergeConfiguration.model_validate(yaml.safe_load(yaml_config))
124
  except Exception as e:
@@ -154,7 +183,6 @@ def merge(yaml_config: str, hf_token: str, repo_name: str) -> Iterable[List[Log]
154
  if not repo_name:
155
  yield runner.log("No repo name provided. Generating a random one.")
156
  repo_name = f"mergekit-{merge_config.merge_method}"
157
- # Make repo_name "unique" (no need to be extra careful on uniqueness)
158
  repo_name += "-" + "".join(random.choices(string.ascii_lowercase, k=7))
159
  repo_name = repo_name.replace("/", "-").strip("-")
160
 
@@ -169,8 +197,7 @@ def merge(yaml_config: str, hf_token: str, repo_name: str) -> Iterable[List[Log]
169
  yield runner.log(f"Error creating repo {e}", level="ERROR")
170
  return
171
 
172
- # Set tmp HF_HOME to avoid filling up disk Space
173
- tmp_env = os.environ.copy() # taken from https://stackoverflow.com/a/4453495
174
  tmp_env["HF_HOME"] = f"{tmpdirname}/.cache"
175
  full_cli = cli + f" --lora-merge-cache {tmpdirname}/.lora_cache"
176
  yield from runner.run_command(full_cli.split(), cwd=merged_path, env=tmp_env)
@@ -188,6 +215,9 @@ def merge(yaml_config: str, hf_token: str, repo_name: str) -> Iterable[List[Log]
188
  )
189
  yield runner.log(f"Model successfully uploaded to HF: {repo_url.repo_id}")
190
 
 
 
 
191
  # This is workaround. As the space always getting stuck.
192
  def _restart_space():
193
  huggingface_hub.HfApi().restart_space(repo_id="arcee-ai/mergekit-gui", token=COMMUNITY_HF_TOKEN, factory_reboot=False)
 
1
  import os
2
  import pathlib
3
+ import hashlib
4
  import random
5
  import string
6
  import tempfile
 
114
  COMMUNITY_HF_TOKEN = os.getenv("COMMUNITY_HF_TOKEN")
115
 
116
 
117
+ # A dictionary to store active jobs and their respective job IDs (which will be used to track them)
118
+ active_jobs = {}
119
+
120
+ def get_yaml_hash(yaml_config: str) -> str:
121
+ """Generate a hash for the YAML config to detect duplicates."""
122
+ return hashlib.sha256(yaml_config.encode("utf-8")).hexdigest()
123
+
124
  def merge(yaml_config: str, hf_token: str, repo_name: str) -> Iterable[List[Log]]:
125
  runner = LogsViewRunner()
126
 
127
  if not yaml_config:
128
  yield runner.log("Empty yaml, pick an example below", level="ERROR")
129
  return
130
+
131
+ yaml_hash = get_yaml_hash(yaml_config)
132
+
133
+ # Check if this YAML job is already running
134
+ if yaml_hash in active_jobs:
135
+ old_job_id = active_jobs[yaml_hash]
136
+ yield runner.log(f"Duplicate job detected! An identical job is already running with Job ID: {old_job_id}.", level="WARNING")
137
+ user_input = yield gradio.inputs.Button(label="Continue with new job", info="Do you want to cancel the old job and continue with the new one?")
138
+
139
+ if user_input == "Continue with new job":
140
+ # Cancel the old job and remove it from active jobs
141
+ runner.log(f"Cancelling the old job with Job ID: {old_job_id}")
142
+ # This part assumes you have the ability to cancel the previous job if needed
143
+ # In real implementation, you'd stop the old task/process here
144
+ active_jobs.pop(yaml_hash) # Remove the old job from the active jobs list
145
+ else:
146
+ # If user chooses not to continue, exit
147
+ yield runner.log("Duplicate job detected. Operation aborted.", level="ERROR")
148
+ return
149
+
150
+ # Proceed with the merge
151
  try:
152
  merge_config = MergeConfiguration.model_validate(yaml.safe_load(yaml_config))
153
  except Exception as e:
 
183
  if not repo_name:
184
  yield runner.log("No repo name provided. Generating a random one.")
185
  repo_name = f"mergekit-{merge_config.merge_method}"
 
186
  repo_name += "-" + "".join(random.choices(string.ascii_lowercase, k=7))
187
  repo_name = repo_name.replace("/", "-").strip("-")
188
 
 
197
  yield runner.log(f"Error creating repo {e}", level="ERROR")
198
  return
199
 
200
+ tmp_env = os.environ.copy()
 
201
  tmp_env["HF_HOME"] = f"{tmpdirname}/.cache"
202
  full_cli = cli + f" --lora-merge-cache {tmpdirname}/.lora_cache"
203
  yield from runner.run_command(full_cli.split(), cwd=merged_path, env=tmp_env)
 
215
  )
216
  yield runner.log(f"Model successfully uploaded to HF: {repo_url.repo_id}")
217
 
218
+ # Track this YAML as an active job
219
+ active_jobs[yaml_hash] = "new_job_id"
220
+
221
  # This is workaround. As the space always getting stuck.
222
  def _restart_space():
223
  huggingface_hub.HfApi().restart_space(repo_id="arcee-ai/mergekit-gui", token=COMMUNITY_HF_TOKEN, factory_reboot=False)