Tai Truong
fix readme
d202ada
import json
from typing import Any
from langchain_community.vectorstores import OpenSearchVectorSearch
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
BoolInput,
DataInput,
DropdownInput,
FloatInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
"""OpenSearch Vector Store with advanced, customizable search capabilities."""
display_name: str = "OpenSearch"
description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
documentation = "https://python.langchain.com/docs/integrations/vectorstores/opensearch"
name = "OpenSearch"
icon = "OpenSearch"
inputs = [
StrInput(
name="opensearch_url",
display_name="OpenSearch URL",
value="http://localhost:9200",
info="URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).",
),
StrInput(
name="index_name",
display_name="Index Name",
value="langflow",
info="The index name where the vectors will be stored in OpenSearch cluster.",
),
MultilineInput(
name="search_input",
display_name="Search Input",
info=(
"Enter a search query. Leave empty to retrieve all documents. "
"If you need a more advanced search consider using Hybrid Search Query instead."
),
value="",
),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
DropdownInput(
name="search_type",
display_name="Search Type",
options=["similarity", "similarity_score_threshold", "mmr"],
value="similarity",
advanced=True,
),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
advanced=True,
value=4,
),
FloatInput(
name="search_score_threshold",
display_name="Search Score Threshold",
info="Minimum similarity score threshold for search results.",
value=0.0,
advanced=True,
),
StrInput(
name="username",
display_name="Username",
value="admin",
advanced=True,
),
SecretStrInput(
name="password",
display_name="Password",
value="admin",
advanced=True,
),
BoolInput(
name="use_ssl",
display_name="Use SSL",
value=True,
advanced=True,
),
BoolInput(
name="verify_certs",
display_name="Verify Certificates",
value=False,
advanced=True,
),
MultilineInput(
name="hybrid_search_query",
display_name="Hybrid Search Query",
value="",
advanced=True,
info=(
"Provide a custom hybrid search query in JSON format. This allows you to combine "
"vector similarity and keyword matching."
),
),
]
@check_cached_vector_store
def build_vector_store(self) -> OpenSearchVectorSearch:
"""Builds the OpenSearch Vector Store object."""
try:
from langchain_community.vectorstores import OpenSearchVectorSearch
except ImportError as e:
error_message = f"Failed to import required modules: {e}"
logger.exception(error_message)
raise ImportError(error_message) from e
try:
opensearch = OpenSearchVectorSearch(
index_name=self.index_name,
embedding_function=self.embedding,
opensearch_url=self.opensearch_url,
http_auth=(self.username, self.password),
use_ssl=self.use_ssl,
verify_certs=self.verify_certs,
ssl_assert_hostname=False,
ssl_show_warn=False,
)
except Exception as e:
error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
logger.exception(error_message)
raise RuntimeError(error_message) from e
if self.ingest_data:
self._add_documents_to_vector_store(opensearch)
return opensearch
def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
"""Adds documents to the Vector Store."""
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())
else:
error_message = f"Expected Data object, got {type(_input)}"
logger.error(error_message)
raise TypeError(error_message)
if documents and self.embedding is not None:
logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
try:
vector_store.add_documents(documents)
except Exception as e:
error_message = f"Error adding documents to Vector Store: {e}"
logger.exception(error_message)
raise RuntimeError(error_message) from e
else:
logger.debug("No documents to add to the Vector Store.")
def search(self, query: str | None = None) -> list[dict[str, Any]]:
"""Search for similar documents in the vector store or retrieve all documents if no query is provided."""
try:
vector_store = self.build_vector_store()
query = query or ""
if self.hybrid_search_query.strip():
try:
hybrid_query = json.loads(self.hybrid_search_query)
except json.JSONDecodeError as e:
error_message = f"Invalid hybrid search query JSON: {e}"
logger.exception(error_message)
raise ValueError(error_message) from e
results = vector_store.client.search(index=self.index_name, body=hybrid_query)
processed_results = []
for hit in results.get("hits", {}).get("hits", []):
source = hit.get("_source", {})
text = source.get("text", "")
metadata = source.get("metadata", {})
if isinstance(text, dict):
text = text.get("text", "")
processed_results.append(
{
"page_content": text,
"metadata": metadata,
}
)
return processed_results
search_kwargs = {"k": self.number_of_results}
search_type = self.search_type.lower()
if search_type == "similarity":
results = vector_store.similarity_search(query, **search_kwargs)
return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
if search_type == "similarity_score_threshold":
search_kwargs["score_threshold"] = self.search_score_threshold
results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)
return [
{
"page_content": doc.page_content,
"metadata": doc.metadata,
"score": score,
}
for doc, score in results
]
if search_type == "mmr":
results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
except Exception as e:
error_message = f"Error during search: {e}"
logger.exception(error_message)
raise RuntimeError(error_message) from e
error_message = f"Error during search. Invalid search type: {self.search_type}"
logger.error(error_message)
raise ValueError(error_message)
def search_documents(self) -> list[Data]:
"""Search for documents in the vector store based on the search input.
If no search input is provided, retrieve all documents.
"""
try:
query = self.search_input.strip() if self.search_input else None
results = self.search(query)
retrieved_data = [
Data(
file_path=result["metadata"].get("file_path", ""),
text=result["page_content"],
)
for result in results
]
except Exception as e:
error_message = f"Error during document search: {e}"
logger.exception(error_message)
raise RuntimeError(error_message) from e
self.status = retrieved_data
return retrieved_data