"""Langflow component exposing a LangChain Elasticsearch vector store."""
from typing import Any

from langchain.schema import Document
from langchain_elasticsearch import ElasticsearchStore
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    DataInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data
class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
    """Elasticsearch Vector Store with advanced, customizable search capabilities.

    Connects either to a self-managed Elasticsearch cluster (via URL) or to
    Elastic Cloud (via Cloud ID), optionally ingests `Data` inputs as documents,
    and exposes similarity / MMR search over the stored vectors.
    """

    display_name: str = "Elasticsearch"
    # Fixed duplicated word ("with with") in the user-facing description.
    description: str = "Elasticsearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/elasticsearch"
    name = "Elasticsearch"
    icon = "ElasticsearchStore"

    inputs = [
        StrInput(
            name="elasticsearch_url",
            display_name="Elasticsearch URL",
            value="http://localhost:9200",
            info="URL for self-managed Elasticsearch deployments (e.g., http://localhost:9200). "
            "Do not use with Elastic Cloud deployments, use Elastic Cloud ID instead.",
        ),
        SecretStrInput(
            name="cloud_id",
            display_name="Elastic Cloud ID",
            value="",
            info="Use this for Elastic Cloud deployments. Do not use together with 'Elasticsearch URL'.",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in Elasticsearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info="Enter a search query. Leave empty to retrieve all documents.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="",
            advanced=False,
            info=(
                "Elasticsearch username (e.g., 'elastic'). "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="",
            advanced=False,
            info=(
                "Elasticsearch password for the specified user. "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="Elastic API Key",
            value="",
            advanced=True,
            info="API Key for Elastic Cloud authentication. If used, 'username' and 'password' are not required.",
        ),
    ]

    # NOTE(review): check_cached_vector_store was imported but never applied;
    # applying it matches the convention of other langflow vector-store components.
    @check_cached_vector_store
    def build_vector_store(self) -> ElasticsearchStore:
        """Build (and optionally populate) the Elasticsearch Vector Store object.

        Returns:
            The configured ElasticsearchStore instance.

        Raises:
            ValueError: If both 'cloud_id' and 'elasticsearch_url' are provided.
        """
        if self.cloud_id and self.elasticsearch_url:
            msg = (
                "Both 'cloud_id' and 'elasticsearch_url' provided. "
                "Please use only one based on your deployment (Cloud or Local)."
            )
            raise ValueError(msg)

        es_params = {
            "index_name": self.index_name,
            "embedding": self.embedding,
            # Empty strings become None so the client does not send blank credentials.
            "es_user": self.username or None,
            "es_password": self.password or None,
        }
        # Cloud ID and URL are mutually exclusive (validated above).
        if self.cloud_id:
            es_params["es_cloud_id"] = self.cloud_id
        else:
            es_params["es_url"] = self.elasticsearch_url
        if self.api_key:
            es_params["api_key"] = self.api_key

        elasticsearch = ElasticsearchStore(**es_params)

        # If documents are provided, add them to the store. Delegating to the
        # helper (instead of calling add_documents directly) reuses its
        # embedding guard and debug logging.
        if self.ingest_data:
            self._add_documents_to_vector_store(elasticsearch)
        return elasticsearch

    def _prepare_documents(self) -> list[Document]:
        """Convert the `ingest_data` inputs into LangChain Documents.

        Raises:
            TypeError: If any input is not a Data object.
        """
        documents = []
        for data in self.ingest_data:
            if isinstance(data, Data):
                documents.append(data.to_lc_document())
            else:
                error_message = "Vector Store Inputs must be Data objects."
                logger.error(error_message)
                raise TypeError(error_message)
        return documents

    def _add_documents_to_vector_store(self, vector_store: "ElasticsearchStore") -> None:
        """Add the prepared documents to the Vector Store (no-op without an embedding)."""
        documents = self._prepare_documents()
        if documents and self.embedding:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            logger.debug("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search for similar documents, or retrieve all documents if no query is provided.

        Returns:
            A list of dicts with 'page_content', 'metadata' and 'score' keys.

        Raises:
            ValueError: If the search type is invalid or the query fails.
        """
        vector_store = self.build_vector_store()
        search_kwargs = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if query:
            search_type = self.search_type.lower()
            if search_type not in {"similarity", "mmr"}:
                msg = f"Invalid search type: {self.search_type}"
                logger.error(msg)
                raise ValueError(msg)
            try:
                if search_type == "similarity":
                    results = vector_store.similarity_search_with_score(query, **search_kwargs)
                else:
                    # max_marginal_relevance_search returns plain Documents
                    # (no scores) and does not accept score_threshold; the
                    # original code unpacked (doc, score) pairs from it, which
                    # raised on every MMR search. Pair each hit with a neutral
                    # 0.0 score to keep the result shape uniform.
                    docs = vector_store.max_marginal_relevance_search(query, k=self.number_of_results)
                    results = [(doc, 0.0) for doc in docs]
            except Exception as e:
                # Reworded: the previous message asserted "no Data in the
                # VectorStore" for *any* failure, which was misleading.
                msg = (
                    "Error occurred while querying the Elasticsearch VectorStore. "
                    "Ensure the index exists and contains documents."
                )
                logger.exception(msg)
                raise ValueError(msg) from e
            return [
                {"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results
            ]

        # No query: return every document in the index (up to k).
        results = self.get_all_documents(vector_store, **search_kwargs)
        return [{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results]

    def get_all_documents(self, vector_store: ElasticsearchStore, **kwargs) -> list[tuple[Document, float]]:
        """Retrieve up to `k` documents from the index using a match_all query."""
        client = vector_store.client
        body = {
            "query": {"match_all": {}},
            "size": kwargs.get("k", self.number_of_results),
        }
        response = client.search(index=self.index_name, body=body)
        return [
            (
                Document(
                    page_content=hit["_source"].get("text", ""),
                    metadata=hit["_source"].get("metadata", {}),
                ),
                hit["_score"],
            )
            for hit in response["hits"]["hits"]
        ]

    def search_documents(self) -> list[Data]:
        """Search based on `search_input`; retrieve all documents when it is empty."""
        results = self.search(self.search_input)
        retrieved_data = [
            Data(
                text=result["page_content"],
                file_path=result["metadata"].get("file_path", ""),
            )
            for result in results
        ]
        self.status = retrieved_data
        return retrieved_data

    def get_retriever_kwargs(self):
        """Get the keyword arguments for the retriever."""
        return {
            "search_type": self.search_type.lower(),
            "search_kwargs": {
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            },
        }