Spaces:
Running
Running
from datetime import timedelta | |
from langchain_community.vectorstores import CouchbaseVectorStore | |
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store | |
from langflow.helpers.data import docs_to_data | |
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput | |
from langflow.schema import Data | |
class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
    """Langflow component wrapping LangChain's ``CouchbaseVectorStore``.

    The component connects to a Couchbase cluster with username/password
    authentication, optionally ingests the supplied documents, and runs a
    similarity search for the configured query.
    """

    display_name = "Couchbase"
    description = "Couchbase Vector Store with search capabilities"
    # Link to the vector store integration page (this component is not a document loader).
    documentation = "https://python.langchain.com/v0.1/docs/integrations/vectorstores/couchbase/"
    name = "Couchbase"
    icon = "Couchbase"

    inputs = [
        SecretStrInput(
            name="couchbase_connection_string",
            display_name="Couchbase Cluster connection string",
            required=True,
        ),
        StrInput(name="couchbase_username", display_name="Couchbase username", required=True),
        SecretStrInput(name="couchbase_password", display_name="Couchbase password", required=True),
        StrInput(name="bucket_name", display_name="Bucket Name", required=True),
        StrInput(name="scope_name", display_name="Scope Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    # NOTE(review): the decorator is imported by this file next to the base class
    # but was never applied; presumably it lets repeated calls reuse an already
    # built store instead of reconnecting and re-ingesting -- confirm against
    # langflow.base.vectorstores.model.
    @check_cached_vector_store
    def build_vector_store(self) -> CouchbaseVectorStore:
        """Connect to the cluster and return a ``CouchbaseVectorStore``.

        When ``ingest_data`` holds items, they are handed to
        ``CouchbaseVectorStore.from_documents``; otherwise the store is
        constructed directly over the configured bucket/scope/collection.

        Returns:
            The vector store bound to the configured collection and index.

        Raises:
            ImportError: If the Couchbase SDK is not installed.
            ValueError: If the cluster cannot be reached or is not ready
                within five seconds.
        """
        # Imported lazily so the component can be loaded without the optional SDK.
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            msg = "Failed to import Couchbase dependencies. Install it using `pip install langflow[couchbase] --pre`"
            raise ImportError(msg) from e

        try:
            auth = PasswordAuthenticator(self.couchbase_username, self.couchbase_password)
            options = ClusterOptions(auth)
            cluster = Cluster(self.couchbase_connection_string, options)
            cluster.wait_until_ready(timedelta(seconds=5))
        except Exception as e:
            # Any SDK failure is surfaced as a ValueError carrying the original cause.
            msg = f"Failed to connect to Couchbase: {e}"
            raise ValueError(msg) from e

        # Data objects are converted to LangChain documents; anything else is passed through as-is.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item
            for item in self.ingest_data or []
        ]

        # Keyword arguments shared by both construction paths.
        store_kwargs = {
            "cluster": cluster,
            "bucket_name": self.bucket_name,
            "scope_name": self.scope_name,
            "collection_name": self.collection_name,
            "embedding": self.embedding,
            "index_name": self.index_name,
        }

        if documents:
            return CouchbaseVectorStore.from_documents(documents=documents, **store_kwargs)
        return CouchbaseVectorStore(**store_kwargs)

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``search_query``.

        Returns:
            Up to ``number_of_results`` matches converted with ``docs_to_data``,
            or an empty list when the query is missing, not a string, or blank.
        """
        # The store is built even when no query is set, so ingestion still happens.
        vector_store = self.build_vector_store()

        query = self.search_query
        if not (query and isinstance(query, str) and query.strip()):
            return []

        docs = vector_store.similarity_search(
            query=query,
            k=self.number_of_results,
        )
        data = docs_to_data(docs)
        self.status = data
        return data