import importlib.util
import os

from joblib import Parallel, delayed

from openhands.core.config import LLMConfig
from openhands.core.logger import openhands_logger as logger

try:
    # check that the optional dependencies we need later are available
    if importlib.util.find_spec('chromadb') is None:
        raise ImportError(
            'chromadb is not available. Please install it using poetry install --with llama-index'
        )

    if (
        importlib.util.find_spec(
            'llama_index.core.indices.vector_store.retrievers.retriever'
        )
        is None
        or importlib.util.find_spec('llama_index.core.indices.vector_store.base')
        is None
    ):
        raise ImportError(
            'llama_index is not available. Please install it using poetry install --with llama-index'
        )

    from llama_index.core import Document, VectorStoreIndex
    from llama_index.core.base.embeddings.base import BaseEmbedding
    from llama_index.core.ingestion import IngestionPipeline
    from llama_index.core.schema import TextNode

    LLAMA_INDEX_AVAILABLE = True

except ImportError:
    LLAMA_INDEX_AVAILABLE = False

# Define supported embedding models
SUPPORTED_OLLAMA_EMBED_MODELS = [
    'llama2',
    'mxbai-embed-large',
    'nomic-embed-text',
    'all-minilm',
    'stable-code',
    'bge-m3',
    'bge-large',
    'paraphrase-multilingual',
    'snowflake-arctic-embed',
]


def check_llama_index():
    """Utility function to check the availability of llama_index.



    Raises:

        ImportError: If llama_index is not available.

    """
    if not LLAMA_INDEX_AVAILABLE:
        raise ImportError(
            'llama_index and its dependencies are not installed. '
            'To use memory features, please run: poetry install --with llama-index.'
        )
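
# Illustrative usage (a sketch; `llm_config` stands in for a real LLMConfig):
#
#     check_llama_index()  # raises ImportError when the extras are missing
#     embed_model = EmbeddingsLoader.get_embedding_model('local', llm_config)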


class EmbeddingsLoader:
    """Loader for embedding model initialization."""

    @staticmethod
    def get_embedding_model(
        strategy: str, llm_config: LLMConfig
    ) -> 'BaseEmbedding | None':
        """Initialize and return the appropriate embedding model based on the strategy.



        Parameters:

        - strategy: The embedding strategy to use.

        - llm_config: Configuration for the LLM.



        Returns:

        - An instance of the selected embedding model or None.

        """

        if strategy in SUPPORTED_OLLAMA_EMBED_MODELS:
            from llama_index.embeddings.ollama import OllamaEmbedding

            return OllamaEmbedding(
                model_name=strategy,
                base_url=llm_config.embedding_base_url,
                ollama_additional_kwargs={'mirostat': 0},
            )
        elif strategy == 'openai':
            from llama_index.embeddings.openai import OpenAIEmbedding

            return OpenAIEmbedding(
                model='text-embedding-ada-002',
                api_key=llm_config.api_key.get_secret_value()
                if llm_config.api_key
                else None,
            )
        elif strategy == 'azureopenai':
            from llama_index.embeddings.azure_openai import AzureOpenAIEmbedding

            return AzureOpenAIEmbedding(
                model='text-embedding-ada-002',
                deployment_name=llm_config.embedding_deployment_name,
                api_key=llm_config.api_key.get_secret_value()
                if llm_config.api_key
                else None,
                azure_endpoint=llm_config.base_url,
                api_version=llm_config.api_version,
            )
        elif strategy == 'voyage':
            from llama_index.embeddings.voyageai import VoyageEmbedding

            return VoyageEmbedding(
                model_name='voyage-code-3',
            )
        elif (strategy is not None) and (strategy.lower() == 'none'):
            # TODO: this works but is not elegant. When the agent in use does
            # not rely on embeddings, there is no reason to initialize an
            # embedding model at all.
            return None
        else:
            from llama_index.embeddings.huggingface import HuggingFaceEmbedding

            # local embeddings need torch; select the device before the model
            # is instantiated, since it binds to a device at creation time
            import torch

            # choose the best available device: CUDA, then MPS, then CPU
            if torch.cuda.is_available():
                device = 'cuda'
            elif torch.backends.mps.is_available() and torch.backends.mps.is_built():
                device = 'mps'
            else:
                device = 'cpu'
                os.environ['CUDA_VISIBLE_DEVICES'] = ''
                os.environ['PYTORCH_FORCE_CPU'] = '1'  # try to force CPU to avoid errors

                # override CUDA availability so downstream checks agree
                torch.cuda.is_available = lambda: False

            # disable MPS to avoid errors on partially supported setups;
            # patch is_built with a callable so later calls keep working
            if device != 'mps' and hasattr(torch.backends, 'mps'):
                torch.backends.mps.is_available = lambda: False
                torch.backends.mps.is_built = lambda: False

            logger.debug(f'Using device for embeddings: {device}')

            # initialize the local embedding model on the chosen device
            return HuggingFaceEmbedding(
                model_name='BAAI/bge-small-en-v1.5', device=device
            )
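
# Illustrative strategy selection (a sketch; the config values shown are
# assumptions, not required settings):
#
#     config = LLMConfig(embedding_base_url='http://localhost:11434')
#     model = EmbeddingsLoader.get_embedding_model('nomic-embed-text', config)
#     # 'nomic-embed-text' appears in SUPPORTED_OLLAMA_EMBED_MODELS, so this
#     # resolves to an OllamaEmbedding pointed at the configured base URL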


# --------------------------------------------------------------------------
# Utility functions to run pipelines, split out for profiling
# --------------------------------------------------------------------------
def run_pipeline(
    embed_model: 'BaseEmbedding', documents: list['Document'], num_workers: int
) -> list['TextNode']:
    """Run a pipeline embedding documents."""

    # set up a pipeline with the transformations to make
    pipeline = IngestionPipeline(
        transformations=[
            embed_model,
        ],
    )

    # run the pipeline with num_workers
    nodes = pipeline.run(
        documents=documents, show_progress=True, num_workers=num_workers
    )
    return nodes
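
# Illustrative usage of run_pipeline (a sketch; `model` is assumed to come
# from EmbeddingsLoader.get_embedding_model):
#
#     docs = [Document(text='hello world'), Document(text='embeddings demo')]
#     nodes = run_pipeline(embed_model=model, documents=docs, num_workers=2)
#     # each returned TextNode carries its vector in node.embedding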


def insert_batch_docs(
    index: 'VectorStoreIndex', documents: list['Document'], num_workers: int
) -> list['TextNode']:
    """Run the document indexing in parallel."""
    results = Parallel(n_jobs=num_workers, backend='threading')(
        delayed(index.insert)(doc) for doc in documents
    )
    return results
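

if __name__ == '__main__':
    # Minimal smoke test (a sketch, not part of the module's API). Assumes
    # the llama-index extras are installed, LLMConfig is constructible with
    # defaults, and the local BAAI/bge-small-en-v1.5 weights can be fetched.
    check_llama_index()

    config = LLMConfig()
    model = EmbeddingsLoader.get_embedding_model('local', config)

    docs = [Document(text='hello world'), Document(text='embeddings demo')]
    nodes = run_pipeline(embed_model=model, documents=docs, num_workers=1)
    logger.debug(f'Embedded {len(nodes)} nodes')

    # build an empty in-memory index, then insert the documents in parallel
    index = VectorStoreIndex(nodes=[], embed_model=model)
    insert_batch_docs(index=index, documents=docs, num_workers=2)
    logger.debug('Inserted documents into the index')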