from typing import Any

from langchain.schema import Document
from langchain_elasticsearch import ElasticsearchStore
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    DataInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
    """Elasticsearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "Elasticsearch"
    description: str = "Elasticsearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/elasticsearch"
    name = "Elasticsearch"
    icon = "ElasticsearchStore"

    inputs = [
        StrInput(
            name="elasticsearch_url",
            display_name="Elasticsearch URL",
            value="http://localhost:9200",
            info="URL for self-managed Elasticsearch deployments (e.g., http://localhost:9200). "
            "Do not use with Elastic Cloud deployments, use Elastic Cloud ID instead.",
        ),
        SecretStrInput(
            name="cloud_id",
            display_name="Elastic Cloud ID",
            value="",
            info="Use this for Elastic Cloud deployments. Do not use together with 'Elasticsearch URL'.",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in Elasticsearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info="Enter a search query. Leave empty to retrieve all documents.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="",
            advanced=False,
            info=(
                "Elasticsearch username (e.g., 'elastic'). "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="",
            advanced=False,
            info=(
                "Elasticsearch password for the specified user. "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="Elastic API Key",
            value="",
            advanced=True,
            info="API Key for Elastic Cloud authentication. If used, 'username' and 'password' are not required.",
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> ElasticsearchStore:
        """Build the Elasticsearch Vector Store object.

        Connects via either 'elasticsearch_url' (self-managed) or 'cloud_id'
        (Elastic Cloud) and, when 'ingest_data' is provided, adds the prepared
        documents to the index.

        Returns:
            The configured ElasticsearchStore instance.

        Raises:
            ValueError: If both 'cloud_id' and 'elasticsearch_url' are set.
        """
        if self.cloud_id and self.elasticsearch_url:
            msg = (
                "Both 'cloud_id' and 'elasticsearch_url' provided. "
                "Please use only one based on your deployment (Cloud or Local)."
            )
            raise ValueError(msg)

        es_params = {
            "index_name": self.index_name,
            "embedding": self.embedding,
            # Empty strings are normalized to None so basic auth is only
            # attempted when credentials were actually supplied.
            "es_user": self.username or None,
            "es_password": self.password or None,
        }

        if self.cloud_id:
            es_params["es_cloud_id"] = self.cloud_id
        else:
            es_params["es_url"] = self.elasticsearch_url

        if self.api_key:
            # BUGFIX: ElasticsearchStore accepts the API key as 'es_api_key';
            # the previous 'api_key' keyword is not a constructor parameter and
            # raised TypeError, breaking API-key authentication entirely.
            es_params["es_api_key"] = self.api_key

        elasticsearch = ElasticsearchStore(**es_params)

        # If documents are provided, add them to the store.
        if self.ingest_data:
            documents = self._prepare_documents()
            if documents:
                elasticsearch.add_documents(documents)

        return elasticsearch

    def _prepare_documents(self) -> list[Document]:
        """Prepare LangChain Documents from the 'ingest_data' input.

        Returns:
            The converted documents.

        Raises:
            TypeError: If any ingest item is not a Data object.
        """
        documents = []
        for data in self.ingest_data:
            if isinstance(data, Data):
                documents.append(data.to_lc_document())
            else:
                error_message = "Vector Store Inputs must be Data objects."
                logger.error(error_message)
                raise TypeError(error_message)
        return documents

    def _add_documents_to_vector_store(self, vector_store: "ElasticsearchStore") -> None:
        """Add the prepared ingest documents to the given Vector Store.

        Skips the write (with a debug log) when there is nothing to ingest or
        no embedding is configured.
        """
        documents = self._prepare_documents()
        if documents and self.embedding:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            logger.debug("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search for similar documents, or retrieve all documents if no query is given.

        Args:
            query: The search text; falsy values trigger a match-all retrieval.

        Returns:
            A list of dicts with 'page_content', 'metadata' and 'score' keys.

        Raises:
            ValueError: If 'search_type' is unsupported or the query fails.
        """
        vector_store = self.build_vector_store()
        search_kwargs = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if query:
            search_type = self.search_type.lower()
            if search_type not in {"similarity", "mmr"}:
                msg = f"Invalid search type: {self.search_type}"
                logger.error(msg)
                raise ValueError(msg)
            try:
                if search_type == "similarity":
                    results = vector_store.similarity_search_with_score(query, **search_kwargs)
                else:  # "mmr"
                    # BUGFIX: max_marginal_relevance_search returns plain
                    # Documents (no scores) and does not accept
                    # 'score_threshold'; the old code passed search_kwargs and
                    # then unpacked (doc, score) pairs, which always raised.
                    # Normalize to (doc, 0.0) so the output schema matches the
                    # similarity branch (MMR provides no relevance scores).
                    docs = vector_store.max_marginal_relevance_search(query, k=self.number_of_results)
                    results = [(doc, 0.0) for doc in docs]
            except Exception as e:
                msg = (
                    "Error occurred while querying the Elasticsearch VectorStore;"
                    " there may be no data in the VectorStore."
                )
                logger.exception(msg)
                raise ValueError(msg) from e
            return [
                {"page_content": doc.page_content, "metadata": doc.metadata, "score": score}
                for doc, score in results
            ]

        results = self.get_all_documents(vector_store, **search_kwargs)
        return [{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results]

    def get_all_documents(self, vector_store: ElasticsearchStore, **kwargs) -> list[tuple[Document, float]]:
        """Retrieve all documents from the vector store via a match-all query.

        Args:
            vector_store: The store whose underlying Elasticsearch client is queried.
            **kwargs: Optional 'k' to cap the number of hits (defaults to
                'number_of_results').

        Returns:
            (Document, score) tuples built from each hit's '_source' and '_score'.
        """
        client = vector_store.client
        index_name = self.index_name

        query = {
            "query": {"match_all": {}},
            "size": kwargs.get("k", self.number_of_results),
        }

        response = client.search(index=index_name, body=query)

        results = []
        for hit in response["hits"]["hits"]:
            doc = Document(
                page_content=hit["_source"].get("text", ""),
                metadata=hit["_source"].get("metadata", {}),
            )
            score = hit["_score"]
            results.append((doc, score))

        return results

    def search_documents(self) -> list[Data]:
        """Search for documents in the vector store based on the search input.

        If no search input is provided, retrieve all documents.
        """
        results = self.search(self.search_input)
        retrieved_data = [
            Data(
                text=result["page_content"],
                file_path=result["metadata"].get("file_path", ""),
            )
            for result in results
        ]
        self.status = retrieved_data
        return retrieved_data

    def get_retriever_kwargs(self):
        """Get the keyword arguments for the retriever."""
        return {
            "search_type": self.search_type.lower(),
            "search_kwargs": {
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            },
        }