# Embeddings_Create.py
# Description: Functions for creating and managing embeddings in ChromaDB with Llama.cpp/OpenAI/Transformers
#
# Imports:
import logging
import time
from functools import wraps
from threading import Lock, Timer
from typing import List
#
# 3rd-Party Imports:
import requests
from transformers import AutoTokenizer, AutoModel
import torch
#
# Local Imports:
from App_Function_Libraries.LLM_API_Calls import get_openai_embeddings
from App_Function_Libraries.Utils.Utils import load_comprehensive_config
#
#######################################################################################################################
#
# Functions:
# FIXME - Add all globals to summarize.py
loaded_config = load_comprehensive_config()
embedding_provider = loaded_config['Embeddings']['embedding_provider']
embedding_model = loaded_config['Embeddings']['embedding_model']
embedding_api_url = loaded_config['Embeddings']['embedding_api_url']
embedding_api_key = loaded_config['Embeddings']['embedding_api_key']

# Embedding chunking settings
chunk_size = loaded_config['Embeddings']['chunk_size']
overlap = loaded_config['Embeddings']['overlap']
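
# Illustrative [Embeddings] config section this module expects. The keys match
# the lookups above; the values here are examples only, not defaults shipped
# with the project:
#
#   [Embeddings]
#   embedding_provider = openai
#   embedding_model = text-embedding-3-small
#   embedding_api_url = http://localhost:8080/v1/embeddings
#   embedding_api_key = <your-key-here>
#   chunk_size = 400
#   overlap = 100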

# FIXME - Add logging

class HuggingFaceEmbedder:
    """Lazily loads a Hugging Face model and unloads it after a period of inactivity."""

    def __init__(self, model_name, timeout_seconds=120):  # Default timeout of 2 minutes
        self.model_name = model_name
        self.tokenizer = None
        self.model = None
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.timeout_seconds = timeout_seconds
        self.last_used_time = 0
        self.unload_timer = None

    def load_model(self):
        if self.model is None:
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            self.model = AutoModel.from_pretrained(self.model_name)
            self.model.to(self.device)
        self.last_used_time = time.time()
        self.reset_timer()

    def unload_model(self):
        if self.model is not None:
            del self.model
            del self.tokenizer
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            self.model = None
            self.tokenizer = None
        if self.unload_timer:
            self.unload_timer.cancel()

    def reset_timer(self):
        # Restart the idle countdown; the model is unloaded once it expires
        if self.unload_timer:
            self.unload_timer.cancel()
        self.unload_timer = Timer(self.timeout_seconds, self.unload_model)
        self.unload_timer.start()

    def create_embeddings(self, texts):
        self.load_model()
        inputs = self.tokenizer(texts, return_tensors="pt", padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = self.model(**inputs)
        # Mean-pool the last hidden state to get one fixed-size vector per text
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings.cpu().numpy()
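
# Minimal usage sketch for HuggingFaceEmbedder (the model name below is
# illustrative; any AutoModel-compatible encoder from the Hub should work):
#
#   embedder = HuggingFaceEmbedder("sentence-transformers/all-MiniLM-L6-v2", timeout_seconds=60)
#   vectors = embedder.create_embeddings(["first passage", "second passage"])
#   print(vectors.shape)  # (2, hidden_size) numpy array
#   embedder.unload_model()  # or let the idle timer free the memory after 60s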

# Global variable to hold the embedder
huggingface_embedder = None

class RateLimiter:
    """Decorator class: allow at most max_calls within any rolling window of `period` seconds."""

    def __init__(self, max_calls, period):
        self.max_calls = max_calls
        self.period = period
        self.calls = []
        self.lock = Lock()

    def __call__(self, func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with self.lock:
                now = time.time()
                # Drop timestamps that have aged out of the rolling window
                self.calls = [call for call in self.calls if call > now - self.period]
                if len(self.calls) >= self.max_calls:
                    # Sleep until the oldest call falls outside the window
                    sleep_time = self.calls[0] - (now - self.period)
                    time.sleep(sleep_time)
                self.calls.append(time.time())
            return func(*args, **kwargs)
        return wrapper
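
# Usage sketch for RateLimiter: cap a hypothetical API helper at 3 calls per
# second (the function below is illustrative, not part of this module):
#
#   @RateLimiter(max_calls=3, period=1.0)
#   def post_embedding_request(payload):
#       return requests.post(embedding_api_url, json=payload).json()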

def exponential_backoff(max_retries=5, base_delay=1):
    """Decorator factory: retry a failing call, doubling the delay after each attempt."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logging.warning(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds. Error: {str(e)}")
                    time.sleep(delay)
        return wrapper
    return decorator
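
# The two decorators compose naturally: rate-limit every attempt, and retry
# with backoff when an attempt fails. A sketch with illustrative limits (not
# values mandated by any particular API):
#
#   @exponential_backoff(max_retries=3, base_delay=2)
#   @RateLimiter(max_calls=10, period=60)
#   def fetch_remote_embeddings(texts):
#       ...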

# FIXME - refactor/setup to use config file & perform chunking
# Adjust these values based on API limits
def create_embeddings_batch(texts: List[str], provider: str, model: str, api_url: str,
                            timeout_seconds: int = 300) -> List[List[float]]:
    global huggingface_embedder
    if provider.lower() == 'huggingface':
        # Reuse the cached embedder unless a different model was requested
        if huggingface_embedder is None or huggingface_embedder.model_name != model:
            if huggingface_embedder is not None:
                huggingface_embedder.unload_model()
            huggingface_embedder = HuggingFaceEmbedder(model, timeout_seconds)
        embeddings = huggingface_embedder.create_embeddings(texts).tolist()
        return embeddings
    elif provider.lower() == 'openai':
        logging.debug(f"Creating embeddings for {len(texts)} texts using OpenAI API")
        return [create_openai_embedding(text, model) for text in texts]
    elif provider.lower() == 'local':
        response = requests.post(
            api_url,
            json={"texts": texts, "model": model},
            headers={"Authorization": f"Bearer {embedding_api_key}"}
        )
        if response.status_code == 200:
            return response.json()['embeddings']
        else:
            raise Exception(f"Error from local API: {response.text}")
    else:
        raise ValueError(f"Unsupported embedding provider: {provider}")

def create_embedding(text: str, provider: str, model: str, api_url: str) -> List[float]:
    return create_embeddings_batch([text], provider, model, api_url)[0]
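
# End-to-end sketch tying the pieces together, driven by the values loaded
# from the config file above (provider, model, and URL all come from there):
#
#   vector = create_embedding("What is ChromaDB?", embedding_provider,
#                             embedding_model, embedding_api_url)
#   vectors = create_embeddings_batch(["chunk one", "chunk two"],
#                                     embedding_provider, embedding_model,
#                                     embedding_api_url)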

# FIXME
def create_stella_embeddings(text: str) -> List[float]:
    if embedding_provider == 'local':
        # Load the model and tokenizer (note: reloaded on every call)
        tokenizer = AutoTokenizer.from_pretrained("dunzhang/stella_en_400M_v5")
        model = AutoModel.from_pretrained("dunzhang/stella_en_400M_v5")
        # Tokenize and encode the text
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
        # Generate embeddings
        with torch.no_grad():
            outputs = model(**inputs)
        # Use the mean of the last hidden state as the sentence embedding
        embeddings = outputs.last_hidden_state.mean(dim=1)
        return embeddings[0].tolist()  # Convert to list for consistency
    elif embedding_provider == 'openai':
        return get_openai_embeddings(text, embedding_model)
    else:
        raise ValueError(f"Unsupported embedding provider: {embedding_provider}")

def create_openai_embedding(text: str, model: str) -> List[float]:
    embedding = get_openai_embeddings(text, model)
    return embedding

#
# End of File.
#######################################################################################################################