Spaces:
Running
Running
import json | |
from typing import Any | |
from langchain_community.vectorstores import OpenSearchVectorSearch | |
from loguru import logger | |
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store | |
from langflow.io import ( | |
BoolInput, | |
DataInput, | |
DropdownInput, | |
FloatInput, | |
HandleInput, | |
IntInput, | |
MultilineInput, | |
SecretStrInput, | |
StrInput, | |
) | |
from langflow.schema import Data | |
class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
    """OpenSearch Vector Store with advanced, customizable search capabilities.

    The component connects to an OpenSearch cluster through LangChain's
    ``OpenSearchVectorSearch``, optionally ingests ``Data`` objects into the
    configured index, and exposes three vector search modes ("similarity",
    "similarity_score_threshold", "mmr") plus a raw JSON "hybrid" query that is
    sent to the OpenSearch client unchanged.
    """

    display_name: str = "OpenSearch"
    description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/opensearch"
    name = "OpenSearch"
    icon = "OpenSearch"

    # Each input becomes an attribute of the same name on the component
    # instance (e.g. ``self.index_name``), which the methods below read.
    inputs = [
        StrInput(
            name="opensearch_url",
            display_name="OpenSearch URL",
            value="http://localhost:9200",
            info="URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in OpenSearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info=(
                "Enter a search query. Leave empty to retrieve all documents. "
                "If you need a more advanced search consider using Hybrid Search Query instead."
            ),
            value="",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "similarity_score_threshold", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="admin",
            advanced=True,
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="admin",
            advanced=True,
        ),
        BoolInput(
            name="use_ssl",
            display_name="Use SSL",
            value=True,
            advanced=True,
        ),
        BoolInput(
            name="verify_certs",
            display_name="Verify Certificates",
            value=False,
            advanced=True,
        ),
        MultilineInput(
            name="hybrid_search_query",
            display_name="Hybrid Search Query",
            value="",
            advanced=True,
            info=(
                "Provide a custom hybrid search query in JSON format. This allows you to combine "
                "vector similarity and keyword matching."
            ),
        ),
    ]

    # search() calls build_vector_store() on every invocation; without the
    # caching decorator each search would reconnect and ingest ``ingest_data``
    # again, duplicating documents in the index.
    # NOTE(review): the decorator is imported from langflow's vector store base
    # module and is presumed to cache the returned store per component
    # instance -- confirm against the installed Langflow version.
    @check_cached_vector_store
    def build_vector_store(self) -> OpenSearchVectorSearch:
        """Build the OpenSearch vector store and ingest any provided data.

        Returns:
            The connected ``OpenSearchVectorSearch`` instance.

        Raises:
            ImportError: If ``langchain_community`` cannot be imported.
            RuntimeError: If the store cannot be created or ingestion fails.
            TypeError: If ``ingest_data`` contains a non-``Data`` item.
        """
        try:
            from langchain_community.vectorstores import OpenSearchVectorSearch
        except ImportError as e:
            error_message = f"Failed to import required modules: {e}"
            logger.exception(error_message)
            raise ImportError(error_message) from e

        try:
            opensearch = OpenSearchVectorSearch(
                index_name=self.index_name,
                embedding_function=self.embedding,
                opensearch_url=self.opensearch_url,
                http_auth=(self.username, self.password),
                use_ssl=self.use_ssl,
                verify_certs=self.verify_certs,
                # Passed through to the underlying client as-is.
                ssl_assert_hostname=False,
                ssl_show_warn=False,
            )
        except Exception as e:
            error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        if self.ingest_data:
            self._add_documents_to_vector_store(opensearch)

        return opensearch

    def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
        """Convert ``ingest_data`` to LangChain documents and add them to the store.

        Args:
            vector_store: The store that receives the documents.

        Raises:
            TypeError: If an item of ``ingest_data`` is not a ``Data`` object.
            RuntimeError: If the store rejects the documents.
        """
        documents = []
        for _input in self.ingest_data or []:
            if not isinstance(_input, Data):
                error_message = f"Expected Data object, got {type(_input)}"
                logger.error(error_message)
                raise TypeError(error_message)
            documents.append(_input.to_lc_document())

        # Nothing is ingested without both documents and an embedding.
        if not documents or self.embedding is None:
            logger.debug("No documents to add to the Vector Store.")
            return

        logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
        try:
            vector_store.add_documents(documents)
        except Exception as e:
            error_message = f"Error adding documents to Vector Store: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

    def _hybrid_search(self, vector_store: "OpenSearchVectorSearch", raw_query: str) -> list[dict[str, Any]]:
        """Run a user-supplied JSON query directly against the OpenSearch client.

        Args:
            vector_store: Store whose ``client`` executes the query.
            raw_query: The query body as a JSON string.

        Returns:
            One ``{"page_content", "metadata"}`` dict per hit.

        Raises:
            ValueError: If ``raw_query`` is not valid JSON.
        """
        try:
            hybrid_query = json.loads(raw_query)
        except json.JSONDecodeError as e:
            error_message = f"Invalid hybrid search query JSON: {e}"
            logger.exception(error_message)
            raise ValueError(error_message) from e

        results = vector_store.client.search(index=self.index_name, body=hybrid_query)

        processed_results = []
        for hit in results.get("hits", {}).get("hits", []):
            # A hit may carry an explicit null for these keys; normalise to
            # empty dicts so callers can always use dict access on them.
            source = hit.get("_source") or {}
            metadata = source.get("metadata") or {}
            text = source.get("text", "")
            # The text field may itself be a nested object holding the text.
            if isinstance(text, dict):
                text = text.get("text", "")
            processed_results.append({"page_content": text, "metadata": metadata})
        return processed_results

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search the vector store with the configured search mode.

        A non-empty ``hybrid_search_query`` takes precedence over ``query`` and
        ``search_type``. Otherwise ``query`` (an empty string when ``None``) is
        passed to the search method selected by ``search_type``.

        Args:
            query: The text to search for.

        Returns:
            A list of dicts with ``page_content`` and ``metadata`` keys; the
            "similarity_score_threshold" mode also includes ``score``.

        Raises:
            RuntimeError: If building the store or running the search fails
                (including an invalid hybrid query).
            ValueError: If ``search_type`` is not a supported mode.
        """
        try:
            vector_store = self.build_vector_store()
            query = query or ""

            # Tolerate an unset (None) hybrid query instead of failing on .strip().
            hybrid_query_text = (self.hybrid_search_query or "").strip()
            if hybrid_query_text:
                return self._hybrid_search(vector_store, hybrid_query_text)

            search_kwargs: dict[str, Any] = {"k": self.number_of_results}
            search_type = self.search_type.lower()

            if search_type == "similarity":
                results = vector_store.similarity_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

            if search_type == "similarity_score_threshold":
                search_kwargs["score_threshold"] = self.search_score_threshold
                results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)
                return [
                    {
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                    }
                    for doc, score in results
                ]

            if search_type == "mmr":
                results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

        except Exception as e:
            error_message = f"Error during search: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        # Reached only when no branch above matched the search type; raised
        # outside the try block so it surfaces as ValueError, not RuntimeError.
        error_message = f"Error during search. Invalid search type: {self.search_type}"
        logger.error(error_message)
        raise ValueError(error_message)

    def search_documents(self) -> list[Data]:
        """Search using the component's ``search_input`` and return ``Data`` results.

        The stripped ``search_input`` (or ``None`` when it is empty) is passed
        to ``search``. Each result becomes a ``Data`` object carrying the
        result text and the ``file_path`` entry of its metadata (empty string
        when absent). The list is also stored in ``self.status``.

        Returns:
            The retrieved documents as ``Data`` objects.

        Raises:
            RuntimeError: If the search or result conversion fails.
        """
        try:
            query = self.search_input.strip() if self.search_input else None
            results = self.search(query)
            retrieved_data = [
                Data(
                    # Metadata may be missing or null on a result; fall back to {}.
                    file_path=(result.get("metadata") or {}).get("file_path", ""),
                    text=result["page_content"],
                )
                for result in results
            ]
        except Exception as e:
            error_message = f"Error during document search: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        self.status = retrieved_data
        return retrieved_data