Spaces:
Running
Running
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store | |
from langflow.helpers.data import docs_to_data | |
from langflow.io import ( | |
BoolInput, | |
DataInput, | |
DictInput, | |
DropdownInput, | |
FloatInput, | |
HandleInput, | |
IntInput, | |
MultilineInput, | |
SecretStrInput, | |
StrInput, | |
) | |
from langflow.schema import Data | |
class MilvusVectorStoreComponent(LCVectorStoreComponent): | |
"""Milvus vector store with search capabilities.""" | |
display_name: str = "Milvus" | |
description: str = "Milvus vector store with search capabilities" | |
documentation = "https://python.langchain.com/docs/integrations/vectorstores/milvus" | |
name = "Milvus" | |
icon = "Milvus" | |
inputs = [ | |
StrInput(name="collection_name", display_name="Collection Name", value="langflow"), | |
StrInput(name="collection_description", display_name="Collection Description", value=""), | |
StrInput( | |
name="uri", | |
display_name="Connection URI", | |
value="http://localhost:19530", | |
), | |
SecretStrInput( | |
name="password", | |
display_name="Token", | |
value="", | |
info="Ignore this field if no token is required to make connection.", | |
), | |
DictInput(name="connection_args", display_name="Other Connection Arguments", advanced=True), | |
StrInput(name="primary_field", display_name="Primary Field Name", value="pk"), | |
StrInput(name="text_field", display_name="Text Field Name", value="text"), | |
StrInput(name="vector_field", display_name="Vector Field Name", value="vector"), | |
DropdownInput( | |
name="consistency_level", | |
display_name="Consistencey Level", | |
options=["Bounded", "Session", "Strong", "Eventual"], | |
value="Session", | |
advanced=True, | |
), | |
DictInput(name="index_params", display_name="Index Parameters", advanced=True), | |
DictInput(name="search_params", display_name="Search Parameters", advanced=True), | |
BoolInput(name="drop_old", display_name="Drop Old Collection", value=False, advanced=True), | |
FloatInput(name="timeout", display_name="Timeout", advanced=True), | |
MultilineInput(name="search_query", display_name="Search Query"), | |
DataInput( | |
name="ingest_data", | |
display_name="Ingest Data", | |
is_list=True, | |
), | |
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]), | |
IntInput( | |
name="number_of_results", | |
display_name="Number of Results", | |
info="Number of results to return.", | |
value=4, | |
advanced=True, | |
), | |
] | |
def build_vector_store(self): | |
try: | |
from langchain_milvus.vectorstores import Milvus as LangchainMilvus | |
except ImportError as e: | |
msg = "Could not import Milvus integration package. Please install it with `pip install langchain-milvus`." | |
raise ImportError(msg) from e | |
self.connection_args.update(uri=self.uri, token=self.password) | |
milvus_store = LangchainMilvus( | |
embedding_function=self.embedding, | |
collection_name=self.collection_name, | |
collection_description=self.collection_description, | |
connection_args=self.connection_args, | |
consistency_level=self.consistency_level, | |
index_params=self.index_params, | |
search_params=self.search_params, | |
drop_old=self.drop_old, | |
auto_id=True, | |
primary_field=self.primary_field, | |
text_field=self.text_field, | |
vector_field=self.vector_field, | |
timeout=self.timeout, | |
) | |
documents = [] | |
for _input in self.ingest_data or []: | |
if isinstance(_input, Data): | |
documents.append(_input.to_lc_document()) | |
else: | |
documents.append(_input) | |
if documents: | |
milvus_store.add_documents(documents) | |
return milvus_store | |
def search_documents(self) -> list[Data]: | |
vector_store = self.build_vector_store() | |
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip(): | |
docs = vector_store.similarity_search( | |
query=self.search_query, | |
k=self.number_of_results, | |
) | |
data = docs_to_data(docs) | |
self.status = data | |
return data | |
return [] | |