import json
from typing import Any

from langchain_community.vectorstores import OpenSearchVectorSearch
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    BoolInput,
    DataInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
    """OpenSearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "OpenSearch"
    description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/opensearch"
    name = "OpenSearch"
    icon = "OpenSearch"

    inputs = [
        StrInput(
            name="opensearch_url",
            display_name="OpenSearch URL",
            value="http://localhost:9200",
            info="URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in OpenSearch cluster.",
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
            info=(
                "Enter a search query. Leave empty to retrieve all documents. "
                "If you need a more advanced search consider using Hybrid Search Query instead."
            ),
            value="",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "similarity_score_threshold", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="admin",
            advanced=True,
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="admin",
            advanced=True,
        ),
        BoolInput(
            name="use_ssl",
            display_name="Use SSL",
            value=True,
            advanced=True,
        ),
        BoolInput(
            name="verify_certs",
            display_name="Verify Certificates",
            value=False,
            advanced=True,
        ),
        MultilineInput(
            name="hybrid_search_query",
            display_name="Hybrid Search Query",
            value="",
            advanced=True,
            info=(
                "Provide a custom hybrid search query in JSON format. This allows you to combine "
                "vector similarity and keyword matching."
            ),
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> OpenSearchVectorSearch:
        """Build the OpenSearch Vector Store object and ingest any supplied data.

        Returns:
            An ``OpenSearchVectorSearch`` bound to ``index_name`` on ``opensearch_url``.

        Raises:
            RuntimeError: If the client cannot be constructed or ingestion fails.
            TypeError: If ``ingest_data`` contains a non-``Data`` item.
        """
        # OpenSearchVectorSearch is imported at module level, so a missing
        # dependency already fails at import time; no lazy re-import is needed.
        try:
            opensearch = OpenSearchVectorSearch(
                index_name=self.index_name,
                embedding_function=self.embedding,
                opensearch_url=self.opensearch_url,
                http_auth=(self.username, self.password),
                use_ssl=self.use_ssl,
                verify_certs=self.verify_certs,
                ssl_assert_hostname=False,
                ssl_show_warn=False,
            )
        except Exception as e:
            error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        if self.ingest_data:
            self._add_documents_to_vector_store(opensearch)

        return opensearch

    def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
        """Convert ``ingest_data`` to LangChain documents and add them to ``vector_store``.

        Raises:
            TypeError: If an item of ``ingest_data`` is not a ``Data`` object.
            RuntimeError: If the vector store rejects the documents.
        """
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                error_message = f"Expected Data object, got {type(_input)}"
                logger.error(error_message)
                raise TypeError(error_message)

        if not documents:
            logger.debug("No documents to add to the Vector Store.")
            return

        if self.embedding is None:
            # Previously this case was logged as "no documents", hiding the fact
            # that supplied data was silently not ingested.
            logger.warning(f"Embedding is not set; skipping ingestion of {len(documents)} documents.")
            return

        logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
        try:
            vector_store.add_documents(documents)
        except Exception as e:
            error_message = f"Error adding documents to Vector Store: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

    def _run_hybrid_search(self, vector_store: "OpenSearchVectorSearch", raw_query: str) -> list[dict[str, Any]]:
        """Send a user-supplied JSON query body to the index and normalize the hits.

        Args:
            vector_store: Store whose low-level ``client`` executes the query.
            raw_query: JSON text used verbatim as the search request body.

        Returns:
            One ``{"page_content": str, "metadata": dict}`` entry per hit.

        Raises:
            ValueError: If ``raw_query`` is not valid JSON.
        """
        try:
            hybrid_query = json.loads(raw_query)
        except json.JSONDecodeError as e:
            error_message = f"Invalid hybrid search query JSON: {e}"
            logger.exception(error_message)
            raise ValueError(error_message) from e

        results = vector_store.client.search(index=self.index_name, body=hybrid_query)
        processed_results = []
        for hit in results.get("hits", {}).get("hits", []):
            source = hit.get("_source") or {}
            text = source.get("text", "")
            # Accept either a plain string or a nested {"text": ...} object.
            if isinstance(text, dict):
                text = text.get("text", "")
            processed_results.append(
                {
                    "page_content": text,
                    # A stored null metadata would otherwise break callers that call .get() on it.
                    "metadata": source.get("metadata") or {},
                }
            )
        return processed_results

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search the vector store.

        If ``hybrid_search_query`` is non-empty it is sent as a raw JSON request
        body and ``query`` is ignored. Otherwise ``query`` (``None`` is treated
        as ``""``) is run through the configured ``search_type``.

        NOTE(review): an empty query is passed to the similarity search as ``""``;
        it does not enumerate every document in the index, despite the
        ``search_input`` help text — confirm the intended behavior.

        Returns:
            Dicts with ``page_content`` and ``metadata`` keys (plus ``score`` for
            ``similarity_score_threshold``).

        Raises:
            RuntimeError: If building the store or running the search fails.
            ValueError: If ``search_type`` is not one of the supported options.
        """
        try:
            vector_store = self.build_vector_store()
            query = query or ""

            # Guard against None the same way search_input is guarded.
            raw_hybrid_query = (self.hybrid_search_query or "").strip()
            if raw_hybrid_query:
                return self._run_hybrid_search(vector_store, raw_hybrid_query)

            search_kwargs = {"k": self.number_of_results}
            search_type = self.search_type.lower()

            if search_type == "similarity":
                results = vector_store.similarity_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
            if search_type == "similarity_score_threshold":
                search_kwargs["score_threshold"] = self.search_score_threshold
                results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)
                return [
                    {
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                    }
                    for doc, score in results
                ]
            if search_type == "mmr":
                results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

        except Exception as e:
            error_message = f"Error during search: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        # Reached only when search_type matched none of the branches above.
        error_message = f"Error during search. Invalid search type: {self.search_type}"
        logger.error(error_message)
        raise ValueError(error_message)

    def search_documents(self) -> list[Data]:
        """Run ``search`` on ``search_input`` and wrap each hit in a ``Data`` object.

        A blank ``search_input`` is passed to ``search`` as ``None``.

        Returns:
            ``Data`` items carrying the hit text and its ``file_path`` metadata
            (``""`` when absent). The list is also stored on ``self.status``.

        Raises:
            RuntimeError: Wrapping any error raised while searching.
        """
        try:
            query = self.search_input.strip() if self.search_input else None
            results = self.search(query)
            retrieved_data = [
                Data(
                    file_path=(result.get("metadata") or {}).get("file_path", ""),
                    text=result["page_content"],
                )
                for result in results
            ]
        except Exception as e:
            error_message = f"Error during document search: {e}"
            logger.exception(error_message)
            raise RuntimeError(error_message) from e

        self.status = retrieved_data
        return retrieved_data