Spaces:
Running
Running
import numpy as np | |
from langchain_pinecone import Pinecone | |
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store | |
from langflow.helpers.data import docs_to_data | |
from langflow.io import DataInput, DropdownInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput | |
from langflow.schema import Data | |
class PineconeVectorStoreComponent(LCVectorStoreComponent): | |
display_name = "Pinecone" | |
description = "Pinecone Vector Store with search capabilities" | |
documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/pinecone/" | |
name = "Pinecone" | |
icon = "Pinecone" | |
inputs = [ | |
StrInput(name="index_name", display_name="Index Name", required=True), | |
StrInput(name="namespace", display_name="Namespace", info="Namespace for the index."), | |
DropdownInput( | |
name="distance_strategy", | |
display_name="Distance Strategy", | |
options=["Cosine", "Euclidean", "Dot Product"], | |
value="Cosine", | |
advanced=True, | |
), | |
SecretStrInput(name="pinecone_api_key", display_name="Pinecone API Key", required=True), | |
StrInput( | |
name="text_key", | |
display_name="Text Key", | |
info="Key in the record to use as text.", | |
value="text", | |
advanced=True, | |
), | |
MultilineInput(name="search_query", display_name="Search Query"), | |
DataInput( | |
name="ingest_data", | |
display_name="Ingest Data", | |
is_list=True, | |
), | |
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]), | |
IntInput( | |
name="number_of_results", | |
display_name="Number of Results", | |
info="Number of results to return.", | |
value=4, | |
advanced=True, | |
), | |
] | |
def build_vector_store(self) -> Pinecone: | |
"""Build and return a Pinecone vector store instance.""" | |
try: | |
from langchain_pinecone._utilities import DistanceStrategy | |
# Wrap the embedding model to ensure float32 output | |
wrapped_embeddings = Float32Embeddings(self.embedding) | |
# Convert distance strategy | |
distance_strategy = self.distance_strategy.replace(" ", "_").upper() | |
distance_strategy = DistanceStrategy[distance_strategy] | |
# Initialize Pinecone instance with wrapped embeddings | |
pinecone = Pinecone( | |
index_name=self.index_name, | |
embedding=wrapped_embeddings, # Use wrapped embeddings | |
text_key=self.text_key, | |
namespace=self.namespace, | |
distance_strategy=distance_strategy, | |
pinecone_api_key=self.pinecone_api_key, | |
) | |
except Exception as e: | |
error_msg = "Error building Pinecone vector store" | |
raise ValueError(error_msg) from e | |
else: | |
# Process documents if any | |
documents = [] | |
if self.ingest_data: | |
for doc in self.ingest_data: | |
if isinstance(doc, Data): | |
documents.append(doc.to_lc_document()) | |
else: | |
documents.append(doc) | |
if documents: | |
pinecone.add_documents(documents) | |
return pinecone | |
def search_documents(self) -> list[Data]: | |
"""Search documents in the vector store.""" | |
try: | |
if not self.search_query or not isinstance(self.search_query, str) or not self.search_query.strip(): | |
return [] | |
vector_store = self.build_vector_store() | |
docs = vector_store.similarity_search( | |
query=self.search_query, | |
k=self.number_of_results, | |
) | |
except Exception as e: | |
error_msg = "Error searching documents" | |
raise ValueError(error_msg) from e | |
else: | |
data = docs_to_data(docs) | |
self.status = data | |
return data | |
class Float32Embeddings: | |
"""Wrapper class to ensure float32 embeddings.""" | |
def __init__(self, base_embeddings): | |
self.base_embeddings = base_embeddings | |
def embed_documents(self, texts): | |
embeddings = self.base_embeddings.embed_documents(texts) | |
if isinstance(embeddings, np.ndarray): | |
return [[self._force_float32(x) for x in vec] for vec in embeddings] | |
return [[self._force_float32(x) for x in vec] for vec in embeddings] | |
def embed_query(self, text): | |
embedding = self.base_embeddings.embed_query(text) | |
if isinstance(embedding, np.ndarray): | |
return [self._force_float32(x) for x in embedding] | |
return [self._force_float32(x) for x in embedding] | |
def _force_float32(self, value): | |
"""Convert any numeric type to Python float.""" | |
return float(np.float32(value)) | |