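"""Integration tests for the LangChain Pinecone vector store.

These tests talk to a live Pinecone index and expect the
``PINECONE_API_KEY`` and ``PINECONE_ENVIRONMENT`` environment variables
to be set (see ``reset_pinecone`` below).
"""
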
import importlib
import os
import uuid
from typing import List

import pinecone
import pytest

from langchain.docstore.document import Document
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores.pinecone import Pinecone

index_name = "langchain-test-index"  # name of the index
namespace_name = "langchain-test-namespace"  # name of the namespace
dimension = 1536  # dimension of the embeddings


def reset_pinecone() -> None:
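    """Reload the pinecone module and re-initialise the client from env vars."""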
    assert os.environ.get("PINECONE_API_KEY") is not None
    assert os.environ.get("PINECONE_ENVIRONMENT") is not None

    import pinecone

    importlib.reload(pinecone)

    pinecone.init(
        api_key=os.environ.get("PINECONE_API_KEY"),
        environment=os.environ.get("PINECONE_ENVIRONMENT"),
    )


class TestPinecone:
    index: pinecone.Index

    @classmethod
    def setup_class(cls) -> None:
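        """Create or recycle the test index and make sure it starts out empty."""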
        reset_pinecone()

        cls.index = pinecone.Index(index_name)

        if index_name in pinecone.list_indexes():
            index_stats = cls.index.describe_index_stats()
            if index_stats["dimension"] == dimension:
                # the dimension matches, so clear all vectors from every namespace
                for _namespace_name in index_stats["namespaces"].keys():
                    cls.index.delete(delete_all=True, namespace=_namespace_name)

            else:
                pinecone.delete_index(index_name)
                pinecone.create_index(name=index_name, dimension=dimension)
        else:
            pinecone.create_index(name=index_name, dimension=dimension)

        # ensure the index is empty
        index_stats = cls.index.describe_index_stats()
        assert index_stats["dimension"] == dimension
        if index_stats["namespaces"].get(namespace_name) is not None:
            assert index_stats["namespaces"][namespace_name]["vector_count"] == 0

    @classmethod
    def teardown_class(cls) -> None:
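        """Remove every vector from the index and re-initialise the client."""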
        index_stats = cls.index.describe_index_stats()
        for _namespace_name in index_stats["namespaces"].keys():
            cls.index.delete(delete_all=True, namespace=_namespace_name)

        reset_pinecone()

    @pytest.fixture(autouse=True)
    def setup(self) -> None:
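        """Start every test with an empty index and a freshly initialised client."""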
        # delete all the vectors in the index
        index_stats = self.index.describe_index_stats()
        for _namespace_name in index_stats["namespaces"].keys():
            self.index.delete(delete_all=True, namespace=_namespace_name)

        reset_pinecone()

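    # NOTE: the tests below assume `texts` and `embedding_openai` fixtures.
    # If they are not provided by a shared conftest.py, minimal versions
    # could look like this (OpenAIEmbeddings needs an OpenAI API key):
    @pytest.fixture
    def embedding_openai(self) -> OpenAIEmbeddings:
        return OpenAIEmbeddings()

    @pytest.fixture
    def texts(self) -> List[str]:
        return ["foo", "bar", "baz"]
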
    @pytest.mark.vcr()
    def test_from_texts(
        self, texts: List[str], embedding_openai: OpenAIEmbeddings
    ) -> None:
        """Test end to end construction and search."""
        unique_id = uuid.uuid4().hex
        needs = f"foobuu {unique_id} booo"
        texts.insert(0, needs)

        docsearch = Pinecone.from_texts(
            texts=texts,
            embedding=embedding_openai,
            index_name=index_name,
            namespace=namespace_name,
        )
        output = docsearch.similarity_search(unique_id, k=1, namespace=namespace_name)
        assert output == [Document(page_content=needs)]

    @pytest.mark.vcr()
    def test_from_texts_with_metadatas(
        self, texts: List[str], embedding_openai: OpenAIEmbeddings
    ) -> None:
        """Test end to end construction and search."""

        unique_id = uuid.uuid4().hex
        needs = f"foobuu {unique_id} booo"
        texts.insert(0, needs)

        metadatas = [{"page": i} for i in range(len(texts))]
        docsearch = Pinecone.from_texts(
            texts,
            embedding_openai,
            index_name=index_name,
            metadatas=metadatas,
            namespace=namespace_name,
        )
        output = docsearch.similarity_search(needs, k=1, namespace=namespace_name)

        # TODO: why is the metadata {"page": 0.0} instead of {"page": 0}?
        assert output == [Document(page_content=needs, metadata={"page": 0.0})]

    @pytest.mark.vcr()
    def test_from_texts_with_scores(self, embedding_openai: OpenAIEmbeddings) -> None:
        """Test end to end construction and search with scores and IDs."""
        texts = ["foo", "bar", "baz"]
        metadatas = [{"page": i} for i in range(len(texts))]
        docsearch = Pinecone.from_texts(
            texts,
            embedding_openai,
            index_name=index_name,
            metadatas=metadatas,
            namespace=namespace_name,
        )
        output = docsearch.similarity_search_with_score(
            "foo", k=3, namespace=namespace_name
        )
        docs = [o[0] for o in output]
        scores = [o[1] for o in output]
        sorted_documents = sorted(docs, key=lambda x: x.metadata["page"])

        # TODO: why is the metadata {"page": 0.0} instead of {"page": 0}, etc.?
        assert sorted_documents == [
            Document(page_content="foo", metadata={"page": 0.0}),
            Document(page_content="bar", metadata={"page": 1.0}),
            Document(page_content="baz", metadata={"page": 2.0}),
        ]
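        # results come back ordered by similarity, so "foo" should score highest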
        assert scores[0] > scores[1] > scores[2]

    def test_from_existing_index_with_namespaces(
        self, embedding_openai: OpenAIEmbeddings
    ) -> None:
        """Test that namespaces are properly handled."""
        # Create two indexes with the same name but different namespaces
        texts_1 = ["foo", "bar", "baz"]
        metadatas = [{"page": i} for i in range(len(texts_1))]
        Pinecone.from_texts(
            texts_1,
            embedding_openai,
            index_name=index_name,
            metadatas=metadatas,
            namespace=f"{index_name}-1",
        )

        texts_2 = ["foo2", "bar2", "baz2"]
        metadatas = [{"page": i} for i in range(len(texts_2))]

        Pinecone.from_texts(
            texts_2,
            embedding_openai,
            index_name=index_name,
            metadatas=metadatas,
            namespace=f"{index_name}-2",
        )

        # Search with namespace
        docsearch = Pinecone.from_existing_index(
            index_name=index_name,
            embedding=embedding_openai,
            namespace=f"{index_name}-1",
        )
        output = docsearch.similarity_search("foo", k=20, namespace=f"{index_name}-1")
        # check that we don't get results from the other namespace
        page_contents = sorted(set([o.page_content for o in output]))
        assert all(content in ["foo", "bar", "baz"] for content in page_contents)
        assert all(content not in ["foo2", "bar2", "baz2"] for content in page_contents)

    def test_add_documents_with_ids(
        self, texts: List[str], embedding_openai: OpenAIEmbeddings
    ) -> None:
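        """Test that texts upserted twice under different ids are added, not overwritten."""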
        ids = [uuid.uuid4().hex for _ in range(len(texts))]
        Pinecone.from_texts(
            texts=texts,
            ids=ids,
            embedding=embedding_openai,
            index_name=index_name,
            namespace=index_name,
        )
        index_stats = self.index.describe_index_stats()
        assert index_stats["namespaces"][index_name]["vector_count"] == len(texts)

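        # upsert the same texts again under new ids; they should be added, not overwritten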
        ids_1 = [uuid.uuid4().hex for _ in range(len(texts))]
        Pinecone.from_texts(
            texts=texts,
            ids=ids_1,
            embedding=embedding_openai,
            index_name=index_name,
            namespace=index_name,
        )
        index_stats = self.index.describe_index_stats()
        assert index_stats["namespaces"][index_name]["vector_count"] == len(texts) * 2