from dataclasses import dataclass import asyncio import numpy as np class UnlimitedSemaphore: """A context manager that allows unlimited access.""" async def __aenter__(self): pass async def __aexit__(self, exc_type, exc, tb): pass @dataclass class EmbeddingFunc: embedding_dim: int max_token_size: int func: callable concurrent_limit: int = 16 def __post_init__(self): if self.concurrent_limit != 0: self._semaphore = asyncio.Semaphore(self.concurrent_limit) else: self._semaphore = UnlimitedSemaphore() async def __call__(self, *args, **kwargs) -> np.ndarray: async with self._semaphore: return await self.func(*args, **kwargs)