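# NOTE(review): the three lines below appear to be stray VCS metadata
# (author / commit message / commit hash) pasted into the module; kept as comments.
# Tai Truong
# fix readme
# d202ada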
from typing import Any
from langchain.schema import Document
from langchain_elasticsearch import ElasticsearchStore
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
DataInput,
DropdownInput,
FloatInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
    """Elasticsearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "Elasticsearch"
    description: str = "Elasticsearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/elasticsearch"
    name = "Elasticsearch"
    icon = "ElasticsearchStore"

    inputs = [
        StrInput(
            name="elasticsearch_url",
            display_name="Elasticsearch URL",
            value="http://localhost:9200",
            info="URL for self-managed Elasticsearch deployments (e.g., http://localhost:9200). "
            "Do not use with Elastic Cloud deployments, use Elastic Cloud ID instead.",
        ),
        SecretStrInput(
            name="cloud_id",
            display_name="Elastic Cloud ID",
            value="",
            info="Use this for Elastic Cloud deployments. Do not use together with 'Elasticsearch URL'.",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in Elasticsearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info="Enter a search query. Leave empty to retrieve all documents.",
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="",
            advanced=False,
            info=(
                "Elasticsearch username (e.g., 'elastic'). "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="",
            advanced=False,
            info=(
                "Elasticsearch password for the specified user. "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="Elastic API Key",
            value="",
            advanced=True,
            info="API Key for Elastic Cloud authentication. If used, 'username' and 'password' are not required.",
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> ElasticsearchStore:
        """Build the Elasticsearch vector store and ingest any provided data.

        The connection target is ``cloud_id`` when set, otherwise
        ``elasticsearch_url``. Empty username/password inputs are passed as ``None``;
        ``api_key`` is only forwarded when non-empty.

        Returns:
            The configured ``ElasticsearchStore``.

        Raises:
            ValueError: If both ``cloud_id`` and ``elasticsearch_url`` are set.
            TypeError: If an ``ingest_data`` item is not a ``Data`` object.
        """
        # ``elasticsearch_url`` has a localhost default, so Cloud users must clear it
        # explicitly; refusing ambiguous configuration avoids silently picking one.
        if self.cloud_id and self.elasticsearch_url:
            msg = (
                "Both 'cloud_id' and 'elasticsearch_url' provided. "
                "Please use only one based on your deployment (Cloud or Local)."
            )
            raise ValueError(msg)

        es_params = {
            "index_name": self.index_name,
            "embedding": self.embedding,
            "es_user": self.username or None,
            "es_password": self.password or None,
        }

        if self.cloud_id:
            es_params["es_cloud_id"] = self.cloud_id
        else:
            es_params["es_url"] = self.elasticsearch_url

        if self.api_key:
            es_params["api_key"] = self.api_key

        elasticsearch = ElasticsearchStore(**es_params)

        # If documents are provided, add them to the store.
        if self.ingest_data:
            documents = self._prepare_documents()
            if documents:
                elasticsearch.add_documents(documents)

        return elasticsearch

    def _prepare_documents(self) -> list[Document]:
        """Convert ``ingest_data`` items into LangChain documents.

        Raises:
            TypeError: If any item is not a ``Data`` object.
        """
        documents = []
        for data in self.ingest_data:
            if isinstance(data, Data):
                documents.append(data.to_lc_document())
            else:
                error_message = "Vector Store Inputs must be Data objects."
                logger.error(error_message)
                raise TypeError(error_message)
        return documents

    def _add_documents_to_vector_store(self, vector_store: ElasticsearchStore) -> None:
        """Add the prepared ingest documents to ``vector_store``.

        Nothing is added (only a debug log is emitted) when there are no documents
        or no embedding is configured.
        """
        documents = self._prepare_documents()
        if documents and self.embedding:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            logger.debug("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search the vector store, or list indexed documents when no query is given.

        Args:
            query: Text to search for. When falsy (``None`` or ``""``), up to
                ``number_of_results`` documents are fetched with a ``match_all`` query.

        Returns:
            One dict per hit with ``page_content``, ``metadata`` and ``score`` keys.
            ``score`` is ``None`` for MMR results, which carry no score.

        Raises:
            ValueError: If ``search_type`` is unsupported or the query fails.
        """
        vector_store = self.build_vector_store()
        search_kwargs = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if query:
            scored = self._query_vector_store(vector_store, query, search_kwargs)
        else:
            scored = self.get_all_documents(vector_store, **search_kwargs)

        return [{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in scored]

    def _query_vector_store(
        self,
        vector_store: ElasticsearchStore,
        query: str,
        search_kwargs: dict[str, Any],
    ) -> list[tuple[Document, float | None]]:
        """Run a similarity or MMR query and return ``(document, score)`` pairs.

        Raises:
            ValueError: If ``search_type`` is unsupported or the store raises.
        """
        search_type = self.search_type.lower()
        if search_type not in {"similarity", "mmr"}:
            msg = f"Invalid search type: {self.search_type}"
            logger.error(msg)
            raise ValueError(msg)

        try:
            if search_type == "similarity":
                scored = vector_store.similarity_search_with_score(query, **search_kwargs)
            else:
                # MMR search returns bare Documents (no scores), so they cannot be
                # unpacked as (doc, score) pairs; pair each with ``None`` instead.
                docs = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                scored = [(doc, None) for doc in docs]
        except Exception as e:
            # Any backend failure (empty index, auth, connectivity, ...) ends up here.
            msg = (
                "Error occurred while querying the Elasticsearch VectorStore; "
                "the index may be empty or unreachable."
            )
            logger.exception(msg)
            raise ValueError(msg) from e

        # NOTE(review): ``score_threshold`` is forwarded as a kwarg above, but the
        # store may ignore it — confirm against the installed langchain_elasticsearch.
        # Filtering here enforces the input's documented "minimum score" contract
        # (and is a no-op if the store already filtered).
        threshold = self.search_score_threshold
        if search_type == "similarity" and threshold:
            scored = [(doc, score) for doc, score in scored if score >= threshold]
        return scored

    def get_all_documents(self, vector_store: ElasticsearchStore, **kwargs) -> list[tuple[Document, float]]:
        """Fetch documents from the index with a raw ``match_all`` query.

        Args:
            vector_store: Store whose underlying ``client`` issues the query.
            **kwargs: Only ``k`` is read; it caps the number of hits
                (defaults to ``number_of_results``).

        Returns:
            ``(Document, score)`` pairs built from each hit's ``text`` and
            ``metadata`` source fields.
        """
        query = {
            "query": {"match_all": {}},
            "size": kwargs.get("k", self.number_of_results),
        }
        response = vector_store.client.search(index=self.index_name, body=query)

        results = []
        for hit in response["hits"]["hits"]:
            # Hits may omit ``_source`` (e.g. when source is disabled); fall back to empty.
            source = hit.get("_source") or {}
            doc = Document(
                page_content=source.get("text", ""),
                metadata=source.get("metadata", {}),
            )
            results.append((doc, hit["_score"]))
        return results

    def search_documents(self) -> list[Data]:
        """Search for documents in the vector store based on the search input.

        If no search input is provided, retrieve all documents. Only the text and
        the ``file_path`` metadata entry are kept on each returned ``Data``.
        """
        results = self.search(self.search_input)
        retrieved_data = [
            Data(
                text=result["page_content"],
                file_path=result["metadata"].get("file_path", ""),
            )
            for result in results
        ]
        self.status = retrieved_data
        return retrieved_data

    def get_retriever_kwargs(self):
        """Return the ``search_type`` and ``search_kwargs`` used to build a retriever."""
        return {
            "search_type": self.search_type.lower(),
            "search_kwargs": {
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            },
        }