ArneBinder committed on
Commit
904cd26
·
verified ·
1 Parent(s): 12537b9

delete outdated files

Browse files
Files changed (1) hide show
  1. vector_store.py +0 -301
vector_store.py DELETED
@@ -1,301 +0,0 @@
1
- import abc
2
- import json
3
- import os
4
- from typing import Any, Generic, List, Optional, Tuple, TypeVar
5
-
6
- import numpy as np
7
- from qdrant_client import QdrantClient
8
- from qdrant_client.models import Distance, PointStruct, VectorParams
9
-
10
# Payload type: a JSON-serializable dictionary stored alongside each embedding.
T = TypeVar("T", bound=dict[str, Any])
# Embedding type; fixed by the concrete store (e.g. a list of floats).
E = TypeVar("E")


class VectorStore(Generic[T, E], abc.ABC):
    """Abstract base class for a vector store.

    A vector store is a key-value store that maps an ID to a vector embedding and a payload. The
    payload can be any JSON-serializable object, e.g. a dictionary.
    """

    # File names used by save_to_directory() / load_from_directory().
    INDEX_FILE = "vectors_index.json"
    EMBEDDINGS_FILE = "vectors_data.npy"
    PAYLOADS_FILE = "vectors_payloads.json"

    @abc.abstractmethod
    def _add(self, embedding: E, payload: T, emb_id: str) -> None:
        """Save an embedding with payload for a given ID."""
        pass

    @abc.abstractmethod
    def _get(self, emb_id: str) -> Optional[E]:
        """Get the embedding for a given ID."""
        pass

    @abc.abstractmethod
    def clear(self) -> None:
        """Clear the store."""
        pass

    def _get_emb_id(self, emb_id: Optional[str] = None, payload: Optional[T] = None) -> str:
        """Return `emb_id` if given, otherwise derive an ID from `payload`.

        The derived ID is the JSON serialization of the payload with sorted keys, so two
        payloads with the same content map to the same ID regardless of key order.

        Raises:
            ValueError: If neither `emb_id` nor `payload` is provided.
        """
        if emb_id is not None:
            return emb_id
        if payload is None:
            raise ValueError("Either emb_id or payload must be provided.")
        return json.dumps(payload, sort_keys=True)

    def add(self, embedding: E, payload: T, emb_id: Optional[str] = None) -> None:
        """Store `embedding` and `payload`; the ID is derived from the payload if not given."""
        # Use the shared helper so add() and get() always agree on how IDs are derived.
        resolved_id = self._get_emb_id(emb_id=emb_id, payload=payload)
        self._add(embedding=embedding, payload=payload, emb_id=resolved_id)

    def get(self, emb_id: Optional[str] = None, payload: Optional[T] = None) -> Optional[E]:
        """Return the embedding for the given ID (or payload), or None if it is not stored."""
        return self._get(emb_id=self._get_emb_id(emb_id=emb_id, payload=payload))

    def has(self, emb_id: Optional[str] = None, payload: Optional[T] = None) -> bool:
        """Return True if an embedding is stored for the given ID (or payload)."""
        return self.get(emb_id=emb_id, payload=payload) is not None

    @abc.abstractmethod
    def _retrieve_similar(
        self, ref_id: str, top_k: Optional[int] = None, min_similarity: Optional[float] = None
    ) -> List[Tuple[str, T, float]]:
        """Retrieve IDs, payloads and the respective similarity scores with respect to the
        reference entry.

        Args:
            ref_id: The ID of the reference entry.
            top_k: If provided, only the top-k most similar entries will be returned.
            min_similarity: If provided, only entries with a similarity score greater or equal to
                this value will be returned.

        Returns:
            A list of tuples consisting of the ID, the payload and the similarity score, sorted
            by similarity score in descending order.
        """
        pass

    def retrieve_similar(
        self, ref_id: Optional[str] = None, ref_payload: Optional[T] = None, **kwargs
    ) -> List[Tuple[str, T, float]]:
        """Retrieve entries similar to the reference entry given by ID or payload.

        In the case that the reference entry is not in the store itself, an empty list will be
        returned. Additional keyword arguments are forwarded to `_retrieve_similar`.

        Raises:
            ValueError: If neither `ref_id` nor `ref_payload` is provided.
        """
        # Resolve the ID once and reuse it for both the existence check and the query.
        resolved_id = self._get_emb_id(emb_id=ref_id, payload=ref_payload)
        if not self.has(emb_id=resolved_id):
            return []
        return self._retrieve_similar(ref_id=resolved_id, **kwargs)

    @abc.abstractmethod
    def __len__(self) -> int:
        pass

    def _add_from_directory(self, directory: str) -> None:
        """Read index, embeddings and payloads from `directory` and add them to the store.

        Raises:
            ValueError: If the three files do not contain the same number of entries.
        """
        with open(os.path.join(directory, self.INDEX_FILE), "r", encoding="utf-8") as f:
            index = json.load(f)
        embeddings_np = np.load(os.path.join(directory, self.EMBEDDINGS_FILE))
        with open(os.path.join(directory, self.PAYLOADS_FILE), "r", encoding="utf-8") as f:
            payloads = json.load(f)
        # zip() would silently drop trailing entries, so reject inconsistent files up front.
        if not (len(index) == len(embeddings_np) == len(payloads)):
            raise ValueError(
                f"Inconsistent vector store files in '{directory}': {len(index)} ids, "
                f"{len(embeddings_np)} embeddings, {len(payloads)} payloads."
            )
        for emb_id, emb, payload in zip(index, embeddings_np, payloads):
            self._add(emb_id=emb_id, payload=payload, embedding=emb.tolist())

    @abc.abstractmethod
    def as_indices_vectors_payloads(self) -> Tuple[List[str], np.ndarray, List[T]]:
        """Return a tuple of indices, vectors and payloads."""
        pass

    def _save_to_directory(self, directory: str) -> None:
        """Write embeddings, payloads and index to their files inside `directory`."""
        indices, vectors, payloads = self.as_indices_vectors_payloads()
        np.save(os.path.join(directory, self.EMBEDDINGS_FILE), vectors)
        with open(os.path.join(directory, self.PAYLOADS_FILE), "w", encoding="utf-8") as f:
            json.dump(payloads, f)
        with open(os.path.join(directory, self.INDEX_FILE), "w", encoding="utf-8") as f:
            json.dump(indices, f)

    def save_to_directory(self, directory: str) -> None:
        """Save the vector store to a directory."""
        os.makedirs(directory, exist_ok=True)
        self._save_to_directory(directory)

    def load_from_directory(self, directory: str, replace: bool = False) -> None:
        """Load the vector store from a directory.

        If `replace` is True, the current content of the store will be replaced.
        """
        if replace:
            self.clear()
        self._add_from_directory(directory)
126
-
127
-
128
def vector_norm(vector: List[float]) -> float:
    """Return the Euclidean (L2) norm of `vector`.

    An empty vector has norm 0.0. `math.hypot` is used instead of squaring each component
    by hand so that very large components do not overflow while being squared.
    """
    import math

    return math.hypot(*vector)
130
-
131
-
132
def cosine_similarity(a: List[float], b: List[float]) -> float:
    """Return the cosine similarity of the vectors `a` and `b`.

    Args:
        a: First vector.
        b: Second vector; must have the same length as `a`.

    Returns:
        The dot product of `a` and `b` divided by the product of their norms. If either
        vector has zero norm the similarity is undefined and 0.0 is returned instead of
        raising a ZeroDivisionError.

    Raises:
        ValueError: If the vectors differ in length.
    """
    # zip() would silently ignore the surplus components while the norms still use them.
    if len(a) != len(b):
        raise ValueError(f"Vectors must have the same length, got {len(a)} and {len(b)}.")
    norm_product = vector_norm(a) * vector_norm(b)
    if norm_product == 0:
        return 0.0
    return sum(x * y for x, y in zip(a, b)) / norm_product
134
-
135
-
136
class SimpleVectorStore(VectorStore[T, List[float]]):
    """Simple in-memory vector store using a dictionary.

    Similarity scores are memoized per (entry ID, reference ID) pair; the memo is purged for an
    ID whenever that entry is overwritten or deleted.
    """

    def __init__(self):
        self.vectors: dict[str, List[float]] = {}
        self.payloads: dict[str, T] = {}
        # Memoized similarity scores, keyed by (emb_id, ref_id).
        self._cache: dict[Tuple[str, str], float] = {}
        self._sim = cosine_similarity

    def _invalidate_cache(self, emb_id: str) -> None:
        """Drop every cached score that involves `emb_id` on either side of the pair."""
        self._cache = {
            pair: score for pair, score in self._cache.items() if emb_id not in pair
        }

    def _add(self, embedding: List[float], payload: T, emb_id: str) -> None:
        """Store (or overwrite) the embedding and payload for `emb_id`."""
        if emb_id in self.vectors:
            # Scores computed from the previous vector would otherwise be served stale.
            self._invalidate_cache(emb_id)
        self.vectors[emb_id] = embedding
        self.payloads[emb_id] = payload

    def _get(self, emb_id: str) -> Optional[List[float]]:
        """Return the embedding stored for `emb_id`, or None if there is none."""
        return self.vectors.get(emb_id)

    def delete(self, emb_id: str) -> None:
        """Remove the entry for `emb_id`, if present, along with its cached scores."""
        if emb_id in self.vectors:
            del self.vectors[emb_id]
            del self.payloads[emb_id]
            self._invalidate_cache(emb_id)

    def clear(self) -> None:
        """Remove all entries and cached scores."""
        self.vectors.clear()
        self._cache.clear()
        self.payloads.clear()

    def __len__(self) -> int:
        return len(self.vectors)

    def _retrieve_similar(
        self, ref_id: str, top_k: Optional[int] = None, min_similarity: Optional[float] = None
    ) -> List[Tuple[str, T, float]]:
        """Return (ID, payload, score) for all entries, most similar to `ref_id` first.

        The reference entry itself is part of the result.

        Args:
            ref_id: The ID of the reference entry.
            top_k: If provided, only the first `top_k` entries are returned.
            min_similarity: If provided, only entries with a score greater or equal to this
                value are returned.

        Raises:
            ValueError: If no embedding is stored for `ref_id`.
        """
        ref_embedding = self.get(emb_id=ref_id)
        if ref_embedding is None:
            raise ValueError(f"Reference embedding '{ref_id}' not found.")

        # calculate (or look up) the similarity to all embeddings
        similarities = {}
        for emb_id, embedding in self.vectors.items():
            cache_key = (emb_id, ref_id)
            if cache_key not in self._cache:
                self._cache[cache_key] = self._sim(ref_embedding, embedding)
            similarities[emb_id] = self._cache[cache_key]

        # sort by similarity, best match first
        similar_entries = sorted(similarities.items(), key=lambda x: x[1], reverse=True)

        if min_similarity is not None:
            similar_entries = [
                (emb_id, sim) for emb_id, sim in similar_entries if sim >= min_similarity
            ]
        if top_k is not None:
            similar_entries = similar_entries[:top_k]

        return [(emb_id, self.payloads[emb_id], sim) for emb_id, sim in similar_entries]

    # Persistence is inherited: the base class writes whatever as_indices_vectors_payloads()
    # returns, which is exactly the data a dedicated override here used to assemble by hand.

    def as_indices_vectors_payloads(self) -> Tuple[List[str], np.ndarray, List[T]]:
        """Return IDs, embeddings (as one array) and payloads in matching order."""
        indices = list(self.vectors.keys())
        embeddings_np = np.array(list(self.vectors.values()))
        payloads = [self.payloads[idx] for idx in indices]
        return indices, embeddings_np, payloads
209
-
210
-
211
class QdrantVectorStore(VectorStore[T, List[float]]):
    """Vector store using Qdrant as a backend.

    Qdrant points are addressed by integer IDs, so the store keeps two in-memory dictionaries
    that translate between the string embedding IDs used by callers and the point IDs.
    """

    COLLECTION_NAME = "ADUs"
    # Number of results requested from Qdrant when the caller does not pass `top_k`.
    MAX_LIMIT = 100

    def __init__(
        self,
        location: str = ":memory:",
        vector_size: int = 768,
        distance: Distance = Distance.COSINE,
    ):
        """Create the client and the backing collection.

        Args:
            location: Passed to `QdrantClient`; defaults to an in-memory instance.
            vector_size: Dimensionality of the stored vectors.
            distance: Distance function configured for the collection.
        """
        self.client = QdrantClient(location=location)
        self.emb_id2point_id = {}
        self.point_id2emb_id = {}
        # Kept so that clear() can recreate the collection with identical settings.
        self._vectors_config = VectorParams(size=vector_size, distance=distance)
        self.client.create_collection(
            collection_name=self.COLLECTION_NAME,
            vectors_config=self._vectors_config,
        )

    def __len__(self) -> int:
        return self.client.get_collection(collection_name=self.COLLECTION_NAME).points_count

    def _add(self, emb_id: str, payload: T, embedding: List[float]) -> None:
        """Insert the entry, or overwrite the existing point if `emb_id` is already known."""
        if emb_id in self.emb_id2point_id:
            # update existing entry
            point_id = self.emb_id2point_id[emb_id]
        else:
            # we use the length of the emb_id2point_id dict as the index,
            # because we assume that, even when we delete an entry from
            # the store, we do not delete it from the index
            point_id = len(self.emb_id2point_id)
            self.emb_id2point_id[emb_id] = point_id
            self.point_id2emb_id[point_id] = emb_id

        self.client.upsert(
            collection_name=self.COLLECTION_NAME,
            points=[PointStruct(id=point_id, vector=embedding, payload=payload)],
        )

    def _get(self, emb_id: str) -> Optional[List[float]]:
        """Return the vector stored for `emb_id`, or None if the ID is unknown.

        Raises:
            ValueError: If the backend returns more than one point for the ID.
        """
        if emb_id not in self.emb_id2point_id:
            return None
        points = self.client.retrieve(
            collection_name=self.COLLECTION_NAME,
            ids=[self.emb_id2point_id[emb_id]],
            with_vectors=True,
        )
        if len(points) == 0:
            return None
        elif len(points) == 1:
            return points[0].vector
        else:
            raise ValueError(f"Multiple points found for ID '{emb_id}'.")

    def _retrieve_similar(
        self, ref_id: str, top_k: Optional[int] = None, min_similarity: Optional[float] = None
    ) -> List[Tuple[str, T, float]]:
        """Return (ID, payload, score) for the entries recommended for `ref_id`.

        Args:
            ref_id: The ID of the reference entry; must be known to the store.
            top_k: Maximum number of entries to return; `MAX_LIMIT` is used when None.
            min_similarity: If provided, passed to Qdrant as the score threshold.
        """
        # `top_k or MAX_LIMIT` would turn an explicit top_k=0 into "up to MAX_LIMIT results".
        limit = self.MAX_LIMIT if top_k is None else top_k
        if limit <= 0:
            return []
        similar_entries = self.client.recommend(
            collection_name=self.COLLECTION_NAME,
            positive=[self.emb_id2point_id[ref_id]],
            limit=limit,
            score_threshold=min_similarity,
        )
        return [
            (self.point_id2emb_id[entry.id], entry.payload, entry.score)
            for entry in similar_entries
        ]

    def clear(self) -> None:
        """Drop and recreate the collection and reset the ID mappings."""
        # NOTE(review): the configuration captured in __init__ is reused here instead of
        # reading `.vectors_config` from get_collection(); the collection info object does not
        # appear to expose that attribute - confirm against the installed qdrant-client.
        self.client.delete_collection(collection_name=self.COLLECTION_NAME)
        self.client.create_collection(
            collection_name=self.COLLECTION_NAME,
            vectors_config=self._vectors_config,
        )
        self.emb_id2point_id.clear()
        self.point_id2emb_id.clear()

    def as_indices_vectors_payloads(self) -> Tuple[List[str], np.ndarray, List[T]]:
        """Return IDs, vectors (as one array) and payloads of all points in matching order."""
        num_entries = len(self)
        if num_entries == 0:
            # Nothing to fetch; avoids asking the backend for a page of size zero.
            return [], np.array([]), []
        # scroll() returns the fetched records together with the offset of the next page;
        # the offset is not needed because the whole collection is requested at once.
        records, _next_offset = self.client.scroll(
            collection_name=self.COLLECTION_NAME, with_vectors=True, limit=num_entries
        )
        vectors_np = np.array([point.vector for point in records])
        payloads = [point.payload for point in records]
        emb_ids = [self.point_id2emb_id[point.id] for point in records]
        return emb_ids, vectors_np, payloads