import importlib.util
import os

from joblib import Parallel, delayed

from openhands.core.config import LLMConfig
from openhands.core.logger import openhands_logger as logger

try:
    # check that the optional dependencies we need later are available, using importlib
    if importlib.util.find_spec('chromadb') is None:
        raise ImportError(
            'chromadb is not available. Please install it using `poetry install --with llama-index`'
        )
    if (
        importlib.util.find_spec(
            'llama_index.core.indices.vector_store.retrievers.retriever'
        )
        is None
        or importlib.util.find_spec('llama_index.core.indices.vector_store.base')
        is None
    ):
        raise ImportError(
            'llama_index is not available. Please install it using `poetry install --with llama-index`'
        )

    from llama_index.core import Document, VectorStoreIndex
    from llama_index.core.base.embeddings.base import BaseEmbedding
    from llama_index.core.ingestion import IngestionPipeline
    from llama_index.core.schema import TextNode

    LLAMA_INDEX_AVAILABLE = True
except ImportError:
    LLAMA_INDEX_AVAILABLE = False

# Define supported embedding models
SUPPORTED_OLLAMA_EMBED_MODELS = [
    'llama2',
    'mxbai-embed-large',
    'nomic-embed-text',
    'all-minilm',
    'stable-code',
    'bge-m3',
    'bge-large',
    'paraphrase-multilingual',
    'snowflake-arctic-embed',
]


def check_llama_index():
    """Utility function to check the availability of llama_index.

    Raises:
        ImportError: If llama_index is not available.
    """
    if not LLAMA_INDEX_AVAILABLE:
        raise ImportError(
            'llama_index and its dependencies are not installed. '
            'To use memory features, please run: poetry install --with llama-index.'
        )


class EmbeddingsLoader:
    """Loader for embedding model initialization."""

    @staticmethod
    def get_embedding_model(strategy: str, llm_config: LLMConfig) -> 'BaseEmbedding':
        """Initialize and return the appropriate embedding model based on the strategy.

        Parameters:
        - strategy: The embedding strategy to use.
        - llm_config: Configuration for the LLM.

        Returns:
        - An instance of the selected embedding model or None.
        """
        if strategy in SUPPORTED_OLLAMA_EMBED_MODELS:
            from llama_index.embeddings.ollama import OllamaEmbedding

            return OllamaEmbedding(
                model_name=strategy,
                base_url=llm_config.embedding_base_url,
                ollama_additional_kwargs={'mirostat': 0},
            )
        elif strategy == 'openai':
            from llama_index.embeddings.openai import OpenAIEmbedding

            return OpenAIEmbedding(
                model='text-embedding-ada-002',
                api_key=llm_config.api_key.get_secret_value()
                if llm_config.api_key
                else None,
            )
        elif strategy == 'azureopenai':
            from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

            return AzureOpenAIEmbedding(
                model='text-embedding-ada-002',
                deployment_name=llm_config.embedding_deployment_name,
                # unwrap the SecretStr, as in the OpenAI branch above
                api_key=llm_config.api_key.get_secret_value()
                if llm_config.api_key
                else None,
                azure_endpoint=llm_config.base_url,
                api_version=llm_config.api_version,
            )
        elif strategy == 'voyage':
            from llama_index.embeddings.voyageai import VoyageEmbedding

            return VoyageEmbedding(
                model_name='voyage-code-3',
            )
        elif (strategy is not None) and (strategy.lower() == 'none'):
            # TODO: this works, but it is not elegant. The motivation is that
            # when the agent in use does not rely on embeddings, there is no
            # reason to initialize an embedding model at all.
            return None
        else:
            from llama_index.embeddings.huggingface import HuggingFaceEmbedding

            # for local embeddings, we need torch
            import torch

            # choose the best device
            # first determine what is available: CUDA, MPS, or CPU
            if torch.cuda.is_available():
                device = 'cuda'
            elif torch.backends.mps.is_available() and torch.backends.mps.is_built():
                device = 'mps'
            else:
                device = 'cpu'
                os.environ['CUDA_VISIBLE_DEVICES'] = ''
                os.environ['PYTORCH_FORCE_CPU'] = (
                    '1'  # try to force CPU to avoid errors
                )
                # override CUDA availability
                torch.cuda.is_available = lambda: False

            # disable MPS to avoid errors
            if device != 'mps' and hasattr(torch.backends, 'mps'):
                torch.backends.mps.is_available = lambda: False
                torch.backends.mps.is_built = lambda: False

            # the device being used
            logger.debug(f'Using device for embeddings: {device}')

            # initialize the local embedding model only after the device
            # overrides, so that loading it respects the forced-CPU settings
            local_embed_model = HuggingFaceEmbedding(
                model_name='BAAI/bge-small-en-v1.5'
            )

            return local_embed_model
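

# Illustrative usage (a sketch; the config values below are assumptions for
# the example, not defaults of this module):
#
#     config = LLMConfig(embedding_base_url='http://localhost:11434')
#     embed_model = EmbeddingsLoader.get_embedding_model('nomic-embed-text', config)
#     vector = embed_model.get_text_embedding('hello world')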

# --------------------------------------------------------------------------
# Utility functions to run pipelines, split out for profiling
# --------------------------------------------------------------------------


def run_pipeline(
    embed_model: 'BaseEmbedding', documents: list['Document'], num_workers: int
) -> list['TextNode']:
    """Run a pipeline embedding documents."""
    # set up a pipeline with the transformations to make
    pipeline = IngestionPipeline(
        transformations=[
            embed_model,
        ],
    )

    # run the pipeline with num_workers
    nodes = pipeline.run(
        documents=documents, show_progress=True, num_workers=num_workers
    )
    return nodes
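
# Illustrative usage (a sketch, assuming the llama-index extras are installed
# and `embed_model` was obtained from EmbeddingsLoader above):
#
#     docs = [Document(text='first chunk'), Document(text='second chunk')]
#     nodes = run_pipeline(embed_model, docs, num_workers=2)
#     # each returned TextNode carries its vector in node.embedding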


def insert_batch_docs(
    index: 'VectorStoreIndex', documents: list['Document'], num_workers: int
) -> list['TextNode']:
    """Run the document indexing in parallel."""
    results = Parallel(n_jobs=num_workers, backend='threading')(
        delayed(index.insert)(doc) for doc in documents
    )
    return results
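

# A minimal end-to-end sketch (not part of the original module). It assumes a
# local Ollama server at the URL below and that the llama-index extras are
# installed; the model name and URL are illustrative, not project defaults.
if __name__ == '__main__':
    check_llama_index()
    demo_config = LLMConfig(embedding_base_url='http://localhost:11434')
    demo_model = EmbeddingsLoader.get_embedding_model('nomic-embed-text', demo_config)
    demo_docs = [Document(text='OpenHands embeds documents for memory features.')]
    demo_nodes = run_pipeline(demo_model, demo_docs, num_workers=1)
    logger.debug(f'Embedded {len(demo_nodes)} node(s)')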