Spaces:
Runtime error
Runtime error
import json | |
import elasticache_auto_discovery | |
from pymemcache.client.hash import HashClient | |
from inference.core.env import ELASTICACHE_ENDPOINT | |
from inference.core.logger import logger | |
nodes = elasticache_auto_discovery.discover(ELASTICACHE_ENDPOINT) | |
# set up memcache | |
nodes = map(lambda x: (x[1], int(x[2])), nodes) | |
memcache_client = HashClient(nodes) | |
def trackUsage(endpoint, actor, n=1): | |
"""Tracks the usage of an endpoint by an actor. | |
This function increments the usage count for a given endpoint by an actor. | |
It also handles initialization if the count does not exist. | |
Args: | |
endpoint (str): The endpoint being accessed. | |
actor (str): The actor accessing the endpoint. | |
n (int, optional): The number of times the endpoint was accessed. Defaults to 1. | |
Returns: | |
None: This function does not return anything but updates the memcache client. | |
""" | |
# count an inference | |
try: | |
job = endpoint + "endpoint:::actor" + actor | |
current_infers = memcache_client.incr(job, n) | |
if current_infers is None: # not yet set; initialize at 1 | |
memcache_client.set(job, n) | |
current_infers = n | |
# store key | |
job_keys = memcache_client.get("JOB_KEYS") | |
if job_keys is None: | |
memcache_client.add("JOB_KEYS", json.dumps([job])) | |
else: | |
decoded = json.loads(job_keys) | |
decoded.append(job) | |
decoded = list(set(decoded)) | |
memcache_client.set("JOB_KEYS", json.dumps(decoded)) | |
actor_keys = memcache_client.get("ACTOR_KEYS") | |
if actor_keys is None: | |
ak = {} | |
ak[actor] = n | |
memcache_client.add("ACTOR_KEYS", json.dumps(ak)) | |
else: | |
decoded = json.loads(actor_keys) | |
if actor in actor_keys: | |
actor_keys[actor] += n | |
else: | |
actor_keys[actor] = n | |
memcache_client.set("ACTOR_KEYS", json.dumps(actor_keys)) | |
except Exception as e: | |
logger.debug("WARNING: there was an error in counting this inference") | |
logger.debug(e) | |