Tai Truong
fix readme
d202ada
from typing import TYPE_CHECKING
from langchain_community.vectorstores import Vectara
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, MessageTextInput, SecretStrInput, StrInput
from langflow.schema import Data
if TYPE_CHECKING:
from langchain_community.vectorstores import Vectara
class VectaraVectorStoreComponent(LCVectorStoreComponent):
"""Vectara Vector Store with search capabilities."""
display_name: str = "Vectara"
description: str = "Vectara Vector Store with search capabilities"
documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/vectara"
name = "Vectara"
icon = "Vectara"
inputs = [
StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
HandleInput(
name="embedding",
display_name="Embedding",
input_types=["Embeddings"],
),
HandleInput(
name="ingest_data",
display_name="Ingest Data",
input_types=["Document", "Data"],
is_list=True,
),
MessageTextInput(
name="search_query",
display_name="Search Query",
),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
value=4,
advanced=True,
),
]
@check_cached_vector_store
def build_vector_store(self) -> "Vectara":
"""Builds the Vectara object."""
try:
from langchain_community.vectorstores import Vectara
except ImportError as e:
msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
raise ImportError(msg) from e
vectara = Vectara(
vectara_customer_id=self.vectara_customer_id,
vectara_corpus_id=self.vectara_corpus_id,
vectara_api_key=self.vectara_api_key,
)
self._add_documents_to_vector_store(vectara)
return vectara
def _add_documents_to_vector_store(self, vector_store: "Vectara") -> None:
"""Adds documents to the Vector Store."""
if not self.ingest_data:
self.status = "No documents to add to Vectara"
return
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())
else:
documents.append(_input)
if documents:
logger.debug(f"Adding {len(documents)} documents to Vectara.")
vector_store.add_documents(documents)
self.status = f"Added {len(documents)} documents to Vectara"
else:
logger.debug("No documents to add to Vectara.")
self.status = "No valid documents to add to Vectara"
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
docs = vector_store.similarity_search(
query=self.search_query,
k=self.number_of_results,
)
data = docs_to_data(docs)
self.status = f"Found {len(data)} results for the query: {self.search_query}"
return data
self.status = "No search query provided"
return []