# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
import json
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union, TYPE_CHECKING

from astrapy.core.db import (
    AstraDBCollection,
    AsyncAstraDBCollection,
)
from astrapy.core.defaults import MAX_INSERT_NUM_DOCUMENTS
from astrapy.exceptions import (
    BulkWriteException,
    CollectionNotFoundException,
    CumulativeOperationException,
    DataAPIFaultyResponseException,
    DataAPIResponseException,
    DeleteManyException,
    InsertManyException,
    MultiCallTimeoutManager,
    TooManyDocumentsToCountException,
    UpdateManyException,
    recast_method_sync,
    recast_method_async,
    base_timeout_info,
)
from astrapy.constants import (
    DocumentType,
    FilterType,
    ProjectionType,
    ReturnDocument,
    SortType,
    VectorType,
    normalize_optional_projection,
)
from astrapy.database import AsyncDatabase, Database
from astrapy.results import (
    DeleteResult,
    InsertManyResult,
    InsertOneResult,
    UpdateResult,
    BulkWriteResult,
)
from astrapy.cursors import AsyncCursor, Cursor
from astrapy.info import CollectionInfo, CollectionOptions

if TYPE_CHECKING:
    from astrapy.operations import AsyncBaseOperation, BaseOperation

logger = logging.getLogger(__name__)
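
# Client-side defaults, used when `ordered=False` and no explicit `concurrency`
# is passed: how many chunk requests insert_many issues in parallel and,
# judging by its name, the analogous default for bulk-write operations.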
DEFAULT_INSERT_MANY_CONCURRENCY = 20
DEFAULT_BULK_WRITE_CONCURRENCY = 10


def _prepare_update_info(statuses: List[Dict[str, Any]]) -> Dict[str, Any]:
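    """
    Reduce the `status` parts of one or more API responses into a single,
    Mongo-style `update_info` dictionary. An illustrative doctest, derived
    from the reduction logic below:

    >>> _prepare_update_info([{"matchedCount": 1, "modifiedCount": 1}])
    {'n': 1, 'updatedExisting': True, 'ok': 1.0, 'nModified': 1}
    """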
    reduced_status = {
        "matchedCount": sum(
            status["matchedCount"] for status in statuses if "matchedCount" in status
        ),
        "modifiedCount": sum(
            status["modifiedCount"] for status in statuses if "modifiedCount" in status
        ),
        "upsertedId": [
            status["upsertedId"] for status in statuses if "upsertedId" in status
        ],
    }
    if reduced_status["upsertedId"]:
        if len(reduced_status["upsertedId"]) == 1:
            ups_dict = {"upserted": reduced_status["upsertedId"][0]}
        else:
            ups_dict = {"upserteds": reduced_status["upsertedId"]}
    else:
        ups_dict = {}
    return {
        **{
            "n": reduced_status["matchedCount"] + len(reduced_status["upsertedId"]),
            "updatedExisting": reduced_status["modifiedCount"] > 0,
            "ok": 1.0,
            "nModified": reduced_status["modifiedCount"],
        },
        **ups_dict,
    }


def _collate_vector_to_sort(
    sort: Optional[SortType],
    vector: Optional[VectorType],
    vectorize: Optional[str],
) -> Optional[SortType]:
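    """
    Turn the mutually-exclusive `sort`/`vector`/`vectorize` arguments into
    a single effective sort clause, e.g. (per the branching logic below):

    >>> _collate_vector_to_sort(None, [0.1, 0.2], None)
    {'$vector': [0.1, 0.2]}
    """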
    _vsort: Dict[str, Any]
    if vector is None:
        if vectorize is None:
            return sort
        else:
            _vsort = {"$vectorize": vectorize}
            if sort is None:
                return _vsort
            else:
                raise ValueError(
                    "The `vectorize` and `sort` clauses are mutually exclusive."
                )
    else:
        if vectorize is None:
            _vsort = {"$vector": vector}
            if sort is None:
                return _vsort
            else:
                raise ValueError(
                    "The `vector` and `sort` clauses are mutually exclusive."
                )
        else:
            raise ValueError(
                "The `vector` and `vectorize` parameters cannot be passed at the same time."
            )


def _is_vector_sort(sort: Optional[SortType]) -> bool:
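    """Tell whether a sort clause requests a vector search ($vector/$vectorize)."""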
    if sort is None:
        return False
    else:
        return "$vector" in sort or "$vectorize" in sort


def _collate_vector_to_document(
    document0: DocumentType, vector: Optional[VectorType], vectorize: Optional[str]
) -> DocumentType:
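    """
    Merge a separately-provided vector (or vectorize string) into a document,
    erroring out if the corresponding field is already present, e.g.:

    >>> _collate_vector_to_document({"tag": "a"}, [1, 2], None)
    {'tag': 'a', '$vector': [1, 2]}
    """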
    if vector is None:
        if vectorize is None:
            return document0
        else:
            if "$vectorize" in document0:
                raise ValueError(
                    "Cannot specify the `vectorize` separately for a document with "
                    "its '$vectorize' field already."
                )
            else:
                return {
                    **document0,
                    **{"$vectorize": vectorize},
                }
    else:
        if vectorize is None:
            if "$vector" in document0:
                raise ValueError(
                    "Cannot specify the `vector` separately for a document with "
                    "its '$vector' field already."
                )
            else:
                return {
                    **document0,
                    **{"$vector": vector},
                }
        else:
            raise ValueError(
                "The `vector` and `vectorize` parameters cannot be passed at the same time."
            )


def _collate_vectors_to_documents(
    documents: Iterable[DocumentType],
    vectors: Optional[Iterable[Optional[VectorType]]],
    vectorize: Optional[Iterable[Optional[str]]],
) -> List[DocumentType]:
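    """
    Apply `_collate_vector_to_document` element-wise to a list of documents,
    after checking that the lengths match, e.g.:

    >>> _collate_vectors_to_documents([{"a": 1}, {"b": 2}], [[1, 2], None], None)
    [{'a': 1, '$vector': [1, 2]}, {'b': 2}]
    """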
    if vectors is None and vectorize is None:
        return list(documents)
    else:
        _documents = list(documents)
        _ndocs = len(_documents)
        _vectors = list(vectors) if vectors else [None] * _ndocs
        _vectorize = list(vectorize) if vectorize else [None] * _ndocs
        if _ndocs != len(_vectors):
            raise ValueError(
                "The `documents` and `vectors` parameters must have the same length"
            )
        elif _ndocs != len(_vectorize):
            raise ValueError(
                "The `documents` and `vectorize` parameters must have the same length"
            )
        return [
            _collate_vector_to_document(_doc, _vec, _vecize)
            for _doc, _vec, _vecize in zip(_documents, _vectors, _vectorize)
        ]


class Collection:
    """
    A Data API collection, the main object to interact with the Data API,
    especially for DML operations (inserting, finding, updating documents).
    This class has a synchronous interface.

    A Collection is spawned from a Database object, from which it inherits
    the details on how to reach the API server (endpoint, authentication token).

    Args:
        database: a Database object, instantiated earlier. This represents
            the database the collection belongs to.
        name: the collection name. This parameter should match an existing
            collection on the database.
        namespace: this is the namespace to which the collection belongs.
            If not specified, the database's working namespace is used.
        caller_name: name of the application, or framework, on behalf of which
            the Data API calls are performed. This ends up in the request user-agent.
        caller_version: version of the caller.

    Examples:
        >>> from astrapy import DataAPIClient, Collection
        >>> my_client = DataAPIClient("AstraCS:...")
        >>> my_db = my_client.get_database_by_api_endpoint(
        ...     "https://01234567-....apps.astra.datastax.com"
        ... )
        >>> my_coll_1 = Collection(database=my_db, name="my_collection")
        >>> my_coll_2 = my_db.create_collection(
        ...     "my_v_collection",
        ...     dimension=3,
        ...     metric="cosine",
        ... )
        >>> my_coll_3a = my_db.get_collection("my_already_existing_collection")
        >>> my_coll_3b = my_db.my_already_existing_collection
        >>> my_coll_3c = my_db["my_already_existing_collection"]

    Note:
        creating an instance of Collection does not trigger actual creation
        of the collection on the database. The latter should have been created
        beforehand, e.g. through the `create_collection` method of a Database.
    """

    def __init__(
        self,
        database: Database,
        name: str,
        *,
        namespace: Optional[str] = None,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> None:
        self._astra_db_collection: AstraDBCollection = AstraDBCollection(
            collection_name=name,
            astra_db=database._astra_db,
            namespace=namespace,
            caller_name=caller_name,
            caller_version=caller_version,
        )
        # this comes after the above, lets AstraDBCollection resolve namespace
        self._database = database._copy(
            namespace=self._astra_db_collection.astra_db.namespace
        )

    def __repr__(self) -> str:
        return (
            f'{self.__class__.__name__}(name="{self.name}", '
            f'namespace="{self.namespace}", database={self.database})'
        )

    def __eq__(self, other: Any) -> bool:
        if isinstance(other, Collection):
            return self._astra_db_collection == other._astra_db_collection
        else:
            return False

    def __call__(self, *pargs: Any, **kwargs: Any) -> None:
        raise TypeError(
            f"'{self.__class__.__name__}' object is not callable. If you "
            f"meant to call the '{self.name}' method on a "
            f"'{self.database.__class__.__name__}' object "
            "it is failing because no such method exists."
        )

    def _copy(
        self,
        *,
        database: Optional[Database] = None,
        name: Optional[str] = None,
        namespace: Optional[str] = None,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> Collection:
        return Collection(
            database=database or self.database._copy(),
            name=name or self.name,
            namespace=namespace or self.namespace,
            caller_name=caller_name or self._astra_db_collection.caller_name,
            caller_version=caller_version or self._astra_db_collection.caller_version,
        )

    def with_options(
        self,
        *,
        name: Optional[str] = None,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> Collection:
        """
        Create a clone of this collection with some changed attributes.

        Args:
            name: the name of the collection. This parameter is useful to
                quickly spawn Collection instances each pointing to a different
                collection existing in the same namespace.
            caller_name: name of the application, or framework, on behalf of which
                the Data API calls are performed. This ends up in the request user-agent.
            caller_version: version of the caller.

        Returns:
            a new Collection instance.

        Example:
            >>> my_other_coll = my_coll.with_options(
            ...     name="the_other_coll",
            ...     caller_name="caller_identity",
            ... )
        """
        return self._copy(
            name=name,
            caller_name=caller_name,
            caller_version=caller_version,
        )

    def to_async(
        self,
        *,
        database: Optional[AsyncDatabase] = None,
        name: Optional[str] = None,
        namespace: Optional[str] = None,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> AsyncCollection:
        """
        Create an AsyncCollection from this one. Save for the arguments
        explicitly provided as overrides, everything else is kept identical
        to this collection in the copy (the database is converted into
        an async object).

        Args:
            database: an AsyncDatabase object, instantiated earlier.
                This represents the database the new collection belongs to.
            name: the collection name. This parameter should match an existing
                collection on the database.
            namespace: this is the namespace to which the collection belongs.
                If not specified, the database's working namespace is used.
            caller_name: name of the application, or framework, on behalf of which
                the Data API calls are performed. This ends up in the request user-agent.
            caller_version: version of the caller.

        Returns:
            the new copy, an AsyncCollection instance.

        Example:
            >>> asyncio.run(my_coll.to_async().count_documents({}, upper_bound=100))
            77
        """
        return AsyncCollection(
            database=database or self.database.to_async(),
            name=name or self.name,
            namespace=namespace or self.namespace,
            caller_name=caller_name or self._astra_db_collection.caller_name,
            caller_version=caller_version or self._astra_db_collection.caller_version,
        )

    def set_caller(
        self,
        caller_name: Optional[str] = None,
        caller_version: Optional[str] = None,
    ) -> None:
        """
        Set a new identity for the application/framework on behalf of which
        the Data API calls are performed (the "caller").

        Args:
            caller_name: name of the application, or framework, on behalf of which
                the Data API calls are performed. This ends up in the request user-agent.
            caller_version: version of the caller.

        Example:
            >>> my_coll.set_caller(caller_name="the_caller", caller_version="0.1.0")
        """
        logger.info(f"setting caller to {caller_name}/{caller_version}")
        self._astra_db_collection.set_caller(
            caller_name=caller_name,
            caller_version=caller_version,
        )

    def options(self, *, max_time_ms: Optional[int] = None) -> CollectionOptions:
        """
        Get the collection options, i.e. its configuration as read from the database.

        The method issues a request to the Data API each time it is invoked,
        without caching mechanisms: this ensures up-to-date information
        for usages such as real-time collection validation by the application.

        Args:
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            a CollectionOptions instance describing the collection.
            (See also the database `list_collections` method.)

        Example:
            >>> my_coll.options()
            CollectionOptions(vector=CollectionVectorOptions(dimension=3, metric='cosine'))
        """
        logger.info(f"getting collections in search of '{self.name}'")
        self_descriptors = [
            coll_desc
            for coll_desc in self.database.list_collections(max_time_ms=max_time_ms)
            if coll_desc.name == self.name
        ]
        logger.info(f"finished getting collections in search of '{self.name}'")
        if self_descriptors:
            return self_descriptors[0].options  # type: ignore[no-any-return]
        else:
            raise CollectionNotFoundException(
                text=f"Collection {self.namespace}.{self.name} not found.",
                namespace=self.namespace,
                collection_name=self.name,
            )

    def info(self) -> CollectionInfo:
        """
        Information on the collection (name, location, database), in the
        form of a CollectionInfo object.

        Not to be confused with the collection `options` method (related
        to the collection internal configuration).

        Example:
            >>> my_coll.info().database_info.region
            'eu-west-1'
            >>> my_coll.info().full_name
            'default_keyspace.my_v_collection'

        Note:
            the returned CollectionInfo wraps, among other things,
            the database information: as such, calling this method
            triggers the same-named method of a Database object (which, in turn,
            performs a HTTP request to the DevOps API).
            See the documentation for `Database.info()` for more details.
        """
        return CollectionInfo(
            database_info=self.database.info(),
            namespace=self.namespace,
            name=self.name,
            full_name=self.full_name,
        )

    @property
    def database(self) -> Database:
        """
        a Database object, the database this collection belongs to.

        Example:
            >>> my_coll.database.name
            'the_application_database'
        """
        return self._database

    @property
    def namespace(self) -> str:
        """
        The namespace this collection is in.

        Example:
            >>> my_coll.namespace
            'default_keyspace'
        """
        return self.database.namespace

    @property
    def name(self) -> str:
        """
        The name of this collection.

        Example:
            >>> my_coll.name
            'my_v_collection'
        """
        # type hint added as for some reason the typechecker gets lost
        return self._astra_db_collection.collection_name  # type: ignore[no-any-return, has-type]

    @property
    def full_name(self) -> str:
        """
        The fully-qualified collection name within the database,
        in the form "namespace.collection_name".

        Example:
            >>> my_coll.full_name
            'default_keyspace.my_v_collection'
        """
        return f"{self.namespace}.{self.name}"

    def insert_one(
        self,
        document: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        max_time_ms: Optional[int] = None,
    ) -> InsertOneResult:
        """
        Insert a single document in the collection in an atomic operation.

        Args:
            document: the dictionary expressing the document to insert.
                The `_id` field of the document can be left out, in which
                case it will be created automatically.
            vector: a vector (a list of numbers appropriate for the collection)
                for the document. Passing this parameter is equivalent to
                providing the vector in the "$vector" field of the document itself,
                however the two are mutually exclusive.
            vectorize: a string to be made into a vector, if such a service
                is configured for the collection. Passing this parameter is
                equivalent to providing a `$vectorize` field in the document itself,
                however the two are mutually exclusive.
                Moreover, this parameter cannot coexist with `vector`.
                NOTE: This feature is under current development.
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            an InsertOneResult object.

        Examples:
            >>> my_coll.count_documents({}, upper_bound=10)
            0
            >>> my_coll.insert_one(
            ...     {
            ...         "age": 30,
            ...         "name": "Smith",
            ...         "food": ["pear", "peach"],
            ...         "likes_fruit": True,
            ...     },
            ... )
            InsertOneResult(raw_results=..., inserted_id='ed4587a4-...-...-...')
            >>> my_coll.insert_one({"_id": "user-123", "age": 50, "name": "Maccio"})
            InsertOneResult(raw_results=..., inserted_id='user-123')
            >>> my_coll.count_documents({}, upper_bound=10)
            2
            >>> my_coll.insert_one({"tag": "v"}, vector=[10, 11])
            InsertOneResult(...)

        Note:
            If an `_id` is explicitly provided, which corresponds to a document
            that exists already in the collection, an error is raised and
            the insertion fails.
        """
        _document = _collate_vector_to_document(document, vector, vectorize)
        logger.info(f"inserting one document in '{self.name}'")
        io_response = self._astra_db_collection.insert_one(
            _document,
            timeout_info=base_timeout_info(max_time_ms),
        )
        logger.info(f"finished inserting one document in '{self.name}'")
        if "insertedIds" in io_response.get("status", {}):
            if io_response["status"]["insertedIds"]:
                inserted_id = io_response["status"]["insertedIds"][0]
                return InsertOneResult(
                    raw_results=[io_response],
                    inserted_id=inserted_id,
                )
            else:
                raise DataAPIFaultyResponseException(
                    text="Faulty response from insert_one API command.",
                    raw_response=io_response,
                )
        else:
            raise DataAPIFaultyResponseException(
                text="Faulty response from insert_one API command.",
                raw_response=io_response,
            )

    def insert_many(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Optional[Iterable[Optional[VectorType]]] = None,
        vectorize: Optional[Iterable[Optional[str]]] = None,
        ordered: bool = True,
        chunk_size: Optional[int] = None,
        concurrency: Optional[int] = None,
        max_time_ms: Optional[int] = None,
    ) -> InsertManyResult:
        """
        Insert a list of documents into the collection.
        This is not an atomic operation.

        Args:
            documents: an iterable of dictionaries, each a document to insert.
                Documents may specify their `_id` field or leave it out, in which
                case it will be added automatically.
            vectors: an optional list of vectors (as many vectors as the provided
                documents) to associate to the documents when inserting.
                Each vector is added to the corresponding document prior to
                insertion on database. The list can be a mixture of None and vectors,
                in which case some documents will not have a vector, unless it is
                specified in their "$vector" field already.
                Passing vectors this way is indeed equivalent to the "$vector" field
                of the documents, however the two are mutually exclusive.
            vectorize: an optional list of strings to be made into as many vectors
                (one per document), if such a service is configured for the collection.
                Passing this parameter is equivalent to providing a `$vectorize`
                field in the documents themselves, however the two are mutually exclusive.
                For any given document, this parameter cannot coexist with the
                corresponding `vector` entry.
                NOTE: This feature is under current development.
            ordered: if True (default), the insertions are processed sequentially.
                If False, they can occur in arbitrary order and possibly concurrently.
            chunk_size: how many documents to include in a single API request.
                Exceeding the server maximum allowed value results in an error.
                Leave it unspecified (recommended) to use the system default.
            concurrency: maximum number of concurrent requests to the API at
                a given time. It cannot be more than one for ordered insertions.
            max_time_ms: a timeout, in milliseconds, for the operation.

        Returns:
            an InsertManyResult object.

        Examples:
            >>> my_coll.count_documents({}, upper_bound=10)
            0
            >>> my_coll.insert_many([{"a": 10}, {"a": 5}, {"b": [True, False, False]}])
            InsertManyResult(raw_results=..., inserted_ids=['184bb06f-...', '...', '...'])
            >>> my_coll.count_documents({}, upper_bound=100)
            3
            >>> my_coll.insert_many(
            ...     [{"seq": i} for i in range(50)],
            ...     ordered=False,
            ...     concurrency=5,
            ... )
            InsertManyResult(raw_results=..., inserted_ids=[... ...])
            >>> my_coll.count_documents({}, upper_bound=100)
            53
            >>> # The following are three equivalent statements:
            >>> my_coll.insert_many(
            ...     [{"tag": "a"}, {"tag": "b"}],
            ...     vectors=[[1, 2], [3, 4]],
            ... )
            InsertManyResult(...)
            >>> my_coll.insert_many(
            ...     [{"tag": "a", "$vector": [1, 2]}, {"tag": "b"}],
            ...     vectors=[None, [3, 4]],
            ... )
            InsertManyResult(...)
            >>> my_coll.insert_many(
            ...     [
            ...         {"tag": "a", "$vector": [1, 2]},
            ...         {"tag": "b", "$vector": [3, 4]},
            ...     ]
            ... )
            InsertManyResult(...)

        Note:
            Unordered insertions are executed with some degree of concurrency,
            so it is usually better to prefer this mode unless the order in the
            document sequence is important.

        Note:
            A failure mode for this command is related to certain faulty documents
            found among those to insert: a document may have an `_id` already
            present on the collection, or its vector dimension may not
            match the collection setting.
            For an ordered insertion, the method will raise an exception at
            the first such faulty document -- nevertheless, all documents processed
            until then will end up being written to the database.
            For unordered insertions, if the error stems from faulty documents
            the insertion proceeds until exhausting the input documents: then,
            an exception is raised -- and all insertable documents will have been
            written to the database, including those "after" the troublesome ones.
            If, on the other hand, there are errors not related to individual
            documents (such as a network connectivity error), the whole
            `insert_many` operation will stop in mid-way, an exception will be raised,
            and only a certain amount of the input documents will
            have made their way to the database.
        """
        if concurrency is None:
            if ordered:
                _concurrency = 1
            else:
                _concurrency = DEFAULT_INSERT_MANY_CONCURRENCY
        else:
            _concurrency = concurrency
        if _concurrency > 1 and ordered:
            raise ValueError("Cannot run ordered insert_many concurrently.")
        if chunk_size is None:
            _chunk_size = MAX_INSERT_NUM_DOCUMENTS
        else:
            _chunk_size = chunk_size
        _documents = _collate_vectors_to_documents(documents, vectors, vectorize)
        logger.info(f"inserting {len(_documents)} documents in '{self.name}'")
        raw_results: List[Dict[str, Any]] = []
        timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms)
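        # the timeout manager tracks the time spent across the several HTTP
        # requests below, so that each subsequent chunk call only gets the
        # remaining portion of the overall `max_time_ms` budget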
        if ordered:
            options = {"ordered": True}
            inserted_ids: List[Any] = []
            for i in range(0, len(_documents), _chunk_size):
                logger.info(f"inserting a chunk of documents in '{self.name}'")
                chunk_response = self._astra_db_collection.insert_many(
                    documents=_documents[i : i + _chunk_size],
                    options=options,
                    partial_failures_allowed=True,
                    timeout_info=timeout_manager.remaining_timeout_info(),
                )
                logger.info(f"finished inserting a chunk of documents in '{self.name}'")
                # accumulate the results in this call
                chunk_inserted_ids = (chunk_response.get("status") or {}).get(
                    "insertedIds", []
                )
                inserted_ids += chunk_inserted_ids
                raw_results += [chunk_response]
                # if errors, quit early
                if chunk_response.get("errors", []):
                    partial_result = InsertManyResult(
                        raw_results=raw_results,
                        inserted_ids=inserted_ids,
                    )
                    raise InsertManyException.from_response(
                        command=None,
                        raw_response=chunk_response,
                        partial_result=partial_result,
                    )
            # return
            full_result = InsertManyResult(
                raw_results=raw_results,
                inserted_ids=inserted_ids,
            )
            logger.info(
                f"finished inserting {len(_documents)} documents in '{self.name}'"
            )
            return full_result
        else:
            # unordered: concurrent or not, do all of them and parse the results
            options = {"ordered": False}
            if _concurrency > 1:
                with ThreadPoolExecutor(max_workers=_concurrency) as executor:

                    def _chunk_insertor(
                        document_chunk: List[Dict[str, Any]]
                    ) -> Dict[str, Any]:
                        logger.info(f"inserting a chunk of documents in '{self.name}'")
                        im_response = self._astra_db_collection.insert_many(
                            documents=document_chunk,
                            options=options,
                            partial_failures_allowed=True,
                            timeout_info=timeout_manager.remaining_timeout_info(),
                        )
                        logger.info(
                            f"finished inserting a chunk of documents in '{self.name}'"
                        )
                        return im_response

                    raw_results = list(
                        executor.map(
                            _chunk_insertor,
                            (
                                _documents[i : i + _chunk_size]
                                for i in range(0, len(_documents), _chunk_size)
                            ),
                        )
                    )
            else:
                for i in range(0, len(_documents), _chunk_size):
                    logger.info(f"inserting a chunk of documents in '{self.name}'")
                    raw_results.append(
                        self._astra_db_collection.insert_many(
                            _documents[i : i + _chunk_size],
                            options=options,
                            partial_failures_allowed=True,
                            timeout_info=timeout_manager.remaining_timeout_info(),
                        )
                    )
                    logger.info(
                        f"finished inserting a chunk of documents in '{self.name}'"
                    )
            # recast raw_results
            inserted_ids = [
                inserted_id
                for chunk_response in raw_results
                for inserted_id in (chunk_response.get("status") or {}).get(
                    "insertedIds", []
                )
            ]
            # check-raise
            if any(
                [chunk_response.get("errors", []) for chunk_response in raw_results]
            ):
                partial_result = InsertManyResult(
                    raw_results=raw_results,
                    inserted_ids=inserted_ids,
                )
                raise InsertManyException.from_responses(
                    commands=[None for _ in raw_results],
                    raw_responses=raw_results,
                    partial_result=partial_result,
                )
            # return
            full_result = InsertManyResult(
                raw_results=raw_results,
                inserted_ids=inserted_ids,
            )
            logger.info(
                f"finished inserting {len(_documents)} documents in '{self.name}'"
            )
            return full_result

    def find(
        self,
        filter: Optional[FilterType] = None,
        *,
        projection: Optional[ProjectionType] = None,
        skip: Optional[int] = None,
        limit: Optional[int] = None,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        include_similarity: Optional[bool] = None,
        sort: Optional[SortType] = None,
        max_time_ms: Optional[int] = None,
    ) -> Cursor:
        """
        Find documents on the collection, matching a certain provided filter.

        The method returns a Cursor that can then be iterated over. Depending
        on the method call pattern, the iteration over all documents can reflect
        collection mutations that occurred since the `find` method was called, or not.
        In cases where the cursor reflects mutations in real-time, it will iterate
        over the documents in an approximate way (i.e. exhibiting occasional skipped
        or duplicate documents). This happens when making use of the `sort`
        option in a non-vector-search manner.

        Args:
            filter: a predicate expressed as a dictionary according to the
                Data API filter syntax. Examples are:
                    {}
                    {"name": "John"}
                    {"price": {"$lt": 100}}
                    {"$and": [{"name": "John"}, {"price": {"$lt": 100}}]}
                See the Data API documentation for the full set of operators.
            projection: used to select a subset of fields in the documents being
                returned. The projection can be: an iterable over the field names
                to return; a dictionary {field_name: True} to positively select
                certain fields; or a dictionary {field_name: False} if one wants
                to discard some fields from the response.
                The default is to return the whole documents.
            skip: with this integer parameter, what would be the first `skip`
                documents returned by the query are discarded, and the results
                start from the (skip+1)-th document.
                This parameter can be used only in conjunction with an explicit
                `sort` criterion of the ascending/descending type (i.e. it cannot
                be used when not sorting, nor with vector-based ANN search).
            limit: this (integer) parameter sets a limit over how many documents
                are returned. Once `limit` is reached (or the cursor is exhausted
                for lack of matching documents), nothing more is returned.
            vector: a suitable vector, i.e. a list of float numbers of the appropriate
                dimensionality, to perform vector search (i.e. ANN,
                or "approximate nearest-neighbours" search).
                When running similarity search on a collection, no other sorting
                criteria can be specified. Moreover, there is an upper bound
                to the number of documents that can be returned. For details,
                see the Note about upper bounds and the Data API documentation.
            vectorize: a string to be made into a vector to perform vector search.
                This can be supplied in (exclusive) alternative to `vector`,
                provided such a service is configured for the collection,
                and achieves the same effect.
                NOTE: This feature is under current development.
            include_similarity: a boolean to request the numeric value of the
                similarity to be returned as an added "$similarity" key in each
                returned document. Can only be used for vector ANN search, i.e.
                when either `vector` is supplied or the `sort` parameter has the
                shape {"$vector": ...}.
            sort: with this dictionary parameter one can control the order
                the documents are returned. See the Note about sorting, as well as
                the one about upper bounds, for details.
            max_time_ms: a timeout, in milliseconds, for each single one
                of the underlying HTTP requests used to fetch documents as the
                cursor is iterated over.

        Returns:
            a Cursor object representing iterations over the matching documents
            (see the Cursor object for how to use it. The simplest thing is to
            run a for loop: `for document in collection.find(...):`).

        Examples:
            >>> filter = {"seq": {"$exists": True}}
            >>> for doc in my_coll.find(filter, projection={"seq": True}, limit=5):
            ...     print(doc["seq"])
            ...
            37
            35
            10
            36
            27
            >>> cursor1 = my_coll.find(
            ...     {},
            ...     limit=4,
            ...     sort={"seq": astrapy.constants.SortDocuments.DESCENDING},
            ... )
            >>> [doc["_id"] for doc in cursor1]
            ['97e85f81-...', '1581efe4-...', '...', '...']
            >>> cursor2 = my_coll.find({}, limit=3)
            >>> cursor2.distinct("seq")
            [37, 35, 10]
            >>> my_coll.insert_many([
            ...     {"tag": "A", "$vector": [4, 5]},
            ...     {"tag": "B", "$vector": [3, 4]},
            ...     {"tag": "C", "$vector": [3, 2]},
            ...     {"tag": "D", "$vector": [4, 1]},
            ...     {"tag": "E", "$vector": [2, 5]},
            ... ])
            >>> ann_tags = [
            ...     document["tag"]
            ...     for document in my_coll.find(
            ...         {},
            ...         limit=3,
            ...         vector=[3, 3],
            ...     )
            ... ]
            >>> ann_tags
            ['A', 'B', 'C']
            >>> # (assuming the collection has metric VectorMetric.COSINE)

        Note:
            The following are example values for the `sort` parameter.
            When no particular order is required:
                sort={}  # (default when parameter not provided)
            When sorting by a certain value in ascending/descending order:
                sort={"field": SortDocuments.ASCENDING}
                sort={"field": SortDocuments.DESCENDING}
            When sorting first by "field" and then by "subfield"
            (while modern Python versions preserve the order of dictionaries,
            it is suggested for clarity to employ a `collections.OrderedDict`
            in these cases):
                sort={
                    "field": SortDocuments.ASCENDING,
                    "subfield": SortDocuments.ASCENDING,
                }
            When running a vector similarity (ANN) search:
                sort={"$vector": [0.4, 0.15, -0.5]}

        Note:
            Some combinations of arguments impose an implicit upper bound on the
            number of documents that are returned by the Data API. More specifically:
            (a) Vector ANN searches cannot return more than a number of documents
            that at the time of writing is set to 1000 items.
            (b) When using a sort criterion of the ascending/descending type,
            the Data API will return a smaller number of documents, set to 20
            at the time of writing, and stop there. The returned documents are
            the top results across the whole collection according to the requested
            criterion.
            These provisions should be kept in mind even when subsequently running
            a command such as `.distinct()` on a cursor.

        Note:
            When not specifying sorting criteria at all (by vector or otherwise),
            the cursor can scroll through an arbitrary number of documents as
            the Data API and the client periodically exchange new chunks of documents.
            It should be noted that the behavior of the cursor in the case documents
            have been added/removed after the `find` was started depends on database
            internals and it is not guaranteed, nor excluded, that such "real-time"
            changes in the data would be picked up by the cursor.
        """
        _sort = _collate_vector_to_sort(sort, vector, vectorize)
        if include_similarity is not None and not _is_vector_sort(_sort):
            raise ValueError(
                "Cannot use `include_similarity` when not searching through `vector`."
            )
        return (
            Cursor(
                collection=self,
                filter=filter,
                projection=projection,
                max_time_ms=max_time_ms,
                overall_max_time_ms=None,
            )
            .skip(skip)
            .limit(limit)
            .sort(_sort)
            .include_similarity(include_similarity)
        )

    def find_one(
        self,
        filter: Optional[FilterType] = None,
        *,
        projection: Optional[ProjectionType] = None,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        include_similarity: Optional[bool] = None,
        sort: Optional[SortType] = None,
        max_time_ms: Optional[int] = None,
    ) -> Union[DocumentType, None]:
        """
        Run a search, returning the first document in the collection that matches
        provided filters, if any is found.

        Args:
            filter: a predicate expressed as a dictionary according to the
                Data API filter syntax. Examples are:
                    {}
                    {"name": "John"}
                    {"price": {"$lt": 100}}
                    {"$and": [{"name": "John"}, {"price": {"$lt": 100}}]}
                See the Data API documentation for the full set of operators.
            projection: used to select a subset of fields in the documents being
                returned. The projection can be: an iterable over the field names
                to return; a dictionary {field_name: True} to positively select
                certain fields; or a dictionary {field_name: False} if one wants
                to discard some fields from the response.
                The default is to return the whole documents.
            vector: a suitable vector, i.e. a list of float numbers of the appropriate
                dimensionality, to perform vector search (i.e. ANN,
                or "approximate nearest-neighbours" search), extracting the most
                similar document in the collection matching the filter.
                This parameter cannot be used together with `sort`.
                See the `find` method for more details on this parameter.
            vectorize: a string to be made into a vector to perform vector search.
                This can be supplied in (exclusive) alternative to `vector`,
                provided such a service is configured for the collection,
                and achieves the same effect.
                NOTE: This feature is under current development.
            include_similarity: a boolean to request the numeric value of the
                similarity to be returned as an added "$similarity" key in the
                returned document. Can only be used for vector ANN search, i.e.
                when either `vector` is supplied or the `sort` parameter has the
                shape {"$vector": ...}.
            sort: with this dictionary parameter one can control the order
                the documents are returned. See the Note about sorting for details.
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            a dictionary expressing the required document, otherwise None.

        Examples:
            >>> my_coll.find_one({})
            {'_id': '68d1e515-...', 'seq': 37}
            >>> my_coll.find_one({"seq": 10})
            {'_id': 'd560e217-...', 'seq': 10}
            >>> my_coll.find_one({"seq": 1011})
            >>> # (returns None for no matches)
            >>> my_coll.find_one({}, projection={"seq": False})
            {'_id': '68d1e515-...'}
            >>> my_coll.find_one(
            ...     {},
            ...     sort={"seq": astrapy.constants.SortDocuments.DESCENDING},
            ... )
            {'_id': '97e85f81-...', 'seq': 69}
            >>> my_coll.find_one({}, vector=[1, 0])
            {'_id': '...', 'tag': 'D', '$vector': [4.0, 1.0]}

        Note:
            See the `find` method for more details on the accepted parameters
            (whereas `skip` and `limit` are not valid parameters for `find_one`).
        """
        fo_cursor = self.find(
            filter=filter,
            projection=projection,
            skip=None,
            limit=1,
            vector=vector,
            vectorize=vectorize,
            include_similarity=include_similarity,
            sort=sort,
            max_time_ms=max_time_ms,
        )
        try:
            document = fo_cursor.__next__()
            return document  # type: ignore[no-any-return]
        except StopIteration:
            return None

    def distinct(
        self,
        key: str,
        *,
        filter: Optional[FilterType] = None,
        max_time_ms: Optional[int] = None,
    ) -> List[Any]:
        """
        Return a list of the unique values of `key` across the documents
        in the collection that match the provided filter.

        Args:
            key: the name of the field whose value is inspected across documents.
                Keys can use dot-notation to descend to deeper document levels.
                Example of acceptable `key` values:
                    "field"
                    "field.subfield"
                    "field.3"
                    "field.3.subfield"
                If lists are encountered and no numeric index is specified,
                all items in the list are visited.
            filter: a predicate expressed as a dictionary according to the
                Data API filter syntax. Examples are:
                    {}
                    {"name": "John"}
                    {"price": {"$lt": 100}}
                    {"$and": [{"name": "John"}, {"price": {"$lt": 100}}]}
                See the Data API documentation for the full set of operators.
            max_time_ms: a timeout, in milliseconds, for the operation.

        Returns:
            a list of all different values for `key` found across the documents
            that match the filter. The result list has no repeated items.

        Example:
            >>> my_coll.insert_many(
            ...     [
            ...         {"name": "Marco", "food": ["apple", "orange"], "city": "Helsinki"},
            ...         {"name": "Emma", "food": {"likes_fruit": True, "allergies": []}},
            ...     ]
            ... )
            InsertManyResult(raw_results=..., inserted_ids=['c5b99f37-...', 'd6416321-...'])
            >>> my_coll.distinct("name")
            ['Marco', 'Emma']
            >>> my_coll.distinct("city")
            ['Helsinki']
            >>> my_coll.distinct("food")
            ['apple', 'orange', {'likes_fruit': True, 'allergies': []}]
            >>> my_coll.distinct("food.1")
            ['orange']
            >>> my_coll.distinct("food.allergies")
            []
            >>> my_coll.distinct("food.likes_fruit")
            [True]

        Note:
            It must be kept in mind that `distinct` is a client-side operation,
            which effectively browses all required documents using the logic
            of the `find` method and collects the unique values found for `key`.
            As such, there may be performance, latency and ultimately
            billing implications if the amount of matching documents is large.

        Note:
            For details on the behaviour of "distinct" in conjunction with
            real-time changes in the collection contents, see the
            Note of the `find` command.
        """
        f_cursor = Cursor(
            collection=self,
            filter=filter,
            projection={key: True},
            max_time_ms=None,
            overall_max_time_ms=max_time_ms,
        )
        return f_cursor.distinct(key)  # type: ignore[no-any-return]

    def count_documents(
        self,
        filter: Dict[str, Any],
        *,
        upper_bound: int,
        max_time_ms: Optional[int] = None,
    ) -> int:
        """
        Count the documents in the collection matching the specified filter.

        Args:
            filter: a predicate expressed as a dictionary according to the
                Data API filter syntax. Examples are:
                    {}
                    {"name": "John"}
                    {"price": {"$lt": 100}}
                    {"$and": [{"name": "John"}, {"price": {"$lt": 100}}]}
                See the Data API documentation for the full set of operators.
            upper_bound: a required ceiling on the result of the count operation.
                If the actual number of documents exceeds this value,
                an exception will be raised.
                Furthermore, if the actual number of documents exceeds the maximum
                count that the Data API can reach (regardless of upper_bound),
                an exception will be raised.
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            the exact count of matching documents.

        Example:
            >>> my_coll.insert_many([{"seq": i} for i in range(20)])
            InsertManyResult(...)
            >>> my_coll.count_documents({}, upper_bound=100)
            20
            >>> my_coll.count_documents({"seq": {"$gt": 15}}, upper_bound=100)
            4
            >>> my_coll.count_documents({}, upper_bound=10)
            Traceback (most recent call last):
                ... ...
            astrapy.exceptions.TooManyDocumentsToCountException

        Note:
            Count operations are expensive: for this reason, the best practice
            is to provide a reasonable `upper_bound` according to the caller
            expectations. Moreover, indiscriminate usage of count operations
            for sizeable amounts of documents (i.e. in the thousands and more)
            is discouraged in favor of alternative application-specific solutions.
            Keep in mind that the Data API has a hard upper limit on the amount
            of documents it will count, and that an exception will be thrown
            by this method if this limit is encountered.
        """
        logger.info("calling count_documents")
        cd_response = self._astra_db_collection.count_documents(
            filter=filter,
            timeout_info=base_timeout_info(max_time_ms),
        )
        logger.info("finished calling count_documents")
        if "count" in cd_response.get("status", {}):
            count: int = cd_response["status"]["count"]
            if cd_response["status"].get("moreData", False):
                raise TooManyDocumentsToCountException(
                    text=f"Document count exceeds {count}, the maximum allowed by the server",
                    server_max_count_exceeded=True,
                )
            else:
                if count > upper_bound:
                    raise TooManyDocumentsToCountException(
                        text="Document count exceeds required upper bound",
                        server_max_count_exceeded=False,
                    )
                else:
                    return count
        else:
            raise DataAPIFaultyResponseException(
                text="Faulty response from count_documents API command.",
                raw_response=cd_response,
            )

    def estimated_document_count(
        self,
        *,
        max_time_ms: Optional[int] = None,
    ) -> int:
        """
        Query the API server for an estimate of the document count in the collection.

        Contrary to `count_documents`, this method has no filtering parameters.

        Args:
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            a server-provided estimate count of the documents in the collection.

        Example:
            >>> my_coll.estimated_document_count()
            35700
        """
        ed_response = self.command(
            {"estimatedDocumentCount": {}},
            max_time_ms=max_time_ms,
        )
        if "count" in ed_response.get("status", {}):
            count: int = ed_response["status"]["count"]
            return count
        else:
            raise DataAPIFaultyResponseException(
                text="Faulty response from estimated_document_count API command.",
                raw_response=ed_response,
            )

    def find_one_and_replace(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        projection: Optional[ProjectionType] = None,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
        return_document: str = ReturnDocument.BEFORE,
        max_time_ms: Optional[int] = None,
    ) -> Union[DocumentType, None]:
        """
        Find a document on the collection and replace it entirely with a new one,
        optionally inserting a new one if no match is found.

        Args:
            filter: a predicate expressed as a dictionary according to the
                Data API filter syntax. Examples are:
                    {}
                    {"name": "John"}
                    {"price": {"$lt": 100}}
                    {"$and": [{"name": "John"}, {"price": {"$lt": 100}}]}
                See the Data API documentation for the full set of operators.
            replacement: the new document to write into the collection.
            projection: used to select a subset of fields in the document being
                returned. The projection can be: an iterable over the field names
                to return; a dictionary {field_name: True} to positively select
                certain fields; or a dictionary {field_name: False} if one wants
                to discard some fields from the response.
                The default is to return the whole documents.
            vector: a suitable vector, i.e. a list of float numbers of the appropriate
                dimensionality, to use vector search (i.e. ANN,
                or "approximate nearest-neighbours" search), as the sorting criterion.
                In this way, the matched document (if any) will be the one
                that is most similar to the provided vector.
                This parameter cannot be used together with `sort`.
                See the `find` method for more details on this parameter.
            vectorize: a string to be made into a vector to perform vector search.
                This can be supplied in (exclusive) alternative to `vector`,
                provided such a service is configured for the collection,
                and achieves the same effect.
                NOTE: This feature is under current development.
            sort: with this dictionary parameter one can control the sorting
                order of the documents matching the filter, effectively
                determining what document will come first and hence be the
                replaced one. See the `find` method for more on sorting.
            upsert: this parameter controls the behavior in absence of matches.
                If True, `replacement` is inserted as a new document
                if no matches are found on the collection. If False,
                the operation silently does nothing in case of no matches.
            return_document: a flag controlling what document is returned:
                if set to `ReturnDocument.BEFORE`, or the string "before",
                the document found on database is returned; if set to
                `ReturnDocument.AFTER`, or the string "after", the new
                document is returned. The default is "before".
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            A document (or a projection thereof, as required), either the one
            before the replace operation or the one after that.
            Alternatively, the method returns None to represent
            that no matching document was found, or that no replacement
            was inserted (depending on the `return_document` parameter).

        Example:
            >>> my_coll.insert_one({"_id": "rule1", "text": "all animals are equal"})
            InsertOneResult(...)
            >>> my_coll.find_one_and_replace(
            ...     {"_id": "rule1"},
            ...     {"text": "some animals are more equal!"},
            ... )
            {'_id': 'rule1', 'text': 'all animals are equal'}
            >>> my_coll.find_one_and_replace(
            ...     {"text": "some animals are more equal!"},
            ...     {"text": "and the pigs are the rulers"},
            ...     return_document=astrapy.constants.ReturnDocument.AFTER,
            ... )
            {'_id': 'rule1', 'text': 'and the pigs are the rulers'}
            >>> my_coll.find_one_and_replace(
            ...     {"_id": "rule2"},
            ...     {"text": "F=ma^2"},
            ...     return_document=astrapy.constants.ReturnDocument.AFTER,
            ... )
            >>> # (returns None for no matches)
            >>> my_coll.find_one_and_replace(
            ...     {"_id": "rule2"},
            ...     {"text": "F=ma"},
            ...     upsert=True,
            ...     return_document=astrapy.constants.ReturnDocument.AFTER,
            ...     projection={"_id": False},
            ... )
            {'text': 'F=ma'}
        """
        _sort = _collate_vector_to_sort(sort, vector, vectorize)
        options = {
            "returnDocument": return_document,
            "upsert": upsert,
        }
        logger.info(f"calling find_one_and_replace on '{self.name}'")
        fo_response = self._astra_db_collection.find_one_and_replace(
            replacement=replacement,
            filter=filter,
            projection=normalize_optional_projection(projection),
            sort=_sort,
            options=options,
            timeout_info=base_timeout_info(max_time_ms),
        )
        logger.info(f"finished calling find_one_and_replace on '{self.name}'")
        if "document" in fo_response.get("data", {}):
            ret_document = fo_response.get("data", {}).get("document")
            if ret_document is None:
                return None
            else:
                return ret_document  # type: ignore[no-any-return]
        else:
            raise DataAPIFaultyResponseException(
                text="Faulty response from find_one_and_replace API command.",
                raw_response=fo_response,
            )

    def replace_one(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
        max_time_ms: Optional[int] = None,
    ) -> UpdateResult:
        """
        Replace a single document on the collection with a new one,
        optionally inserting a new one if no match is found.

        Args:
            filter: a predicate expressed as a dictionary according to the
                Data API filter syntax. Examples are:
                    {}
                    {"name": "John"}
                    {"price": {"$lt": 100}}
                    {"$and": [{"name": "John"}, {"price": {"$lt": 100}}]}
                See the Data API documentation for the full set of operators.
            replacement: the new document to write into the collection.
            vector: a suitable vector, i.e. a list of float numbers of the appropriate
                dimensionality, to use vector search (i.e. ANN,
                or "approximate nearest-neighbours" search), as the sorting criterion.
                In this way, the matched document (if any) will be the one
                that is most similar to the provided vector.
                This parameter cannot be used together with `sort`.
                See the `find` method for more details on this parameter.
            vectorize: a string to be made into a vector to perform vector search.
                This can be supplied in (exclusive) alternative to `vector`,
                provided such a service is configured for the collection,
                and achieves the same effect.
                NOTE: This feature is under current development.
            sort: with this dictionary parameter one can control the sorting
                order of the documents matching the filter, effectively
                determining what document will come first and hence be the
                replaced one. See the `find` method for more on sorting.
            upsert: this parameter controls the behavior in absence of matches.
                If True, `replacement` is inserted as a new document
                if no matches are found on the collection. If False,
                the operation silently does nothing in case of no matches.
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            an UpdateResult object summarizing the outcome of the replace operation.

        Example:
            >>> my_coll.insert_one({"Marco": "Polo"})
            InsertOneResult(...)
            >>> my_coll.replace_one({"Marco": {"$exists": True}}, {"Buda": "Pest"})
            UpdateResult(raw_results=..., update_info={'n': 1, 'updatedExisting': True, 'ok': 1.0, 'nModified': 1})
            >>> my_coll.find_one({"Buda": "Pest"})
            {'_id': '8424905a-...', 'Buda': 'Pest'}
            >>> my_coll.replace_one({"Mirco": {"$exists": True}}, {"Oh": "yeah?"})
            UpdateResult(raw_results=..., update_info={'n': 0, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0})
            >>> my_coll.replace_one({"Mirco": {"$exists": True}}, {"Oh": "yeah?"}, upsert=True)
            UpdateResult(raw_results=..., update_info={'n': 1, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0, 'upserted': '931b47d6-...'})
        """
        _sort = _collate_vector_to_sort(sort, vector, vectorize)
        options = {
            "upsert": upsert,
        }
        logger.info(f"calling find_one_and_replace on '{self.name}'")
        fo_response = self._astra_db_collection.find_one_and_replace(
            replacement=replacement,
            filter=filter,
            sort=_sort,
            options=options,
            timeout_info=base_timeout_info(max_time_ms),
        )
        logger.info(f"finished calling find_one_and_replace on '{self.name}'")
        if "document" in fo_response.get("data", {}):
            fo_status = fo_response.get("status") or {}
            _update_info = _prepare_update_info([fo_status])
            return UpdateResult(
                raw_results=[fo_response],
                update_info=_update_info,
            )
        else:
            raise DataAPIFaultyResponseException(
                text="Faulty response from find_one_and_replace API command.",
                raw_response=fo_response,
            )
def find_one_and_update( | |
self, | |
filter: Dict[str, Any], | |
update: Dict[str, Any], | |
*, | |
projection: Optional[ProjectionType] = None, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
upsert: bool = False, | |
return_document: str = ReturnDocument.BEFORE, | |
max_time_ms: Optional[int] = None, | |
) -> Union[DocumentType, None]: | |
""" | |
Find a document on the collection and update it as requested, | |
optionally inserting a new one if no match is found. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
update: the update prescription to apply to the document, expressed | |
as a dictionary as per Data API syntax. Examples are: | |
{"$set": {"field": "value}} | |
{"$inc": {"counter": 10}} | |
{"$unset": {"field": ""}} | |
See the Data API documentation for the full syntax. | |
projection: used to select a subset of fields in the document being | |
returned. The projection can be: an iterable over the field names | |
to return; a dictionary {field_name: True} to positively select | |
certain fields; or a dictionary {field_name: False} if one wants | |
to discard some fields from the response. | |
The default is to return the whole documents. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
updated one. See the `find` method for more on sorting. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, a new document (resulting from applying the `update` | |
to an empty document) is inserted if no matches are found on | |
the collection. If False, the operation silently does nothing | |
in case of no matches. | |
return_document: a flag controlling what document is returned: | |
if set to `ReturnDocument.BEFORE`, or the string "before", | |
the document found on database is returned; if set to | |
`ReturnDocument.AFTER`, or the string "after", the new | |
document is returned. The default is "before". | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
A document (or a projection thereof, as required), either the one | |
before the replace operation or the one after that. | |
Alternatively, the method returns None to represent | |
that no matching document was found, or that no update | |
was applied (depending on the `return_document` parameter). | |
Example: | |
>>> my_coll.insert_one({"Marco": "Polo"}) | |
InsertOneResult(...) | |
>>> my_coll.find_one_and_update( | |
... {"Marco": {"$exists": True}}, | |
... {"$set": {"title": "Mr."}}, | |
... ) | |
{'_id': 'a80106f2-...', 'Marco': 'Polo'} | |
>>> my_coll.find_one_and_update( | |
... {"title": "Mr."}, | |
... {"$inc": {"rank": 3}}, | |
... projection=["title", "rank"], | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
{'_id': 'a80106f2-...', 'title': 'Mr.', 'rank': 3} | |
>>> my_coll.find_one_and_update( | |
... {"name": "Johnny"}, | |
... {"$set": {"rank": 0}}, | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
>>> # (returns None for no matches) | |
>>> my_coll.find_one_and_update( | |
... {"name": "Johnny"}, | |
... {"$set": {"rank": 0}}, | |
... upsert=True, | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
{'_id': 'cb4ef2ab-...', 'name': 'Johnny', 'rank': 0} | |
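A sketch of driving the match through vector search instead (assuming
a vector-enabled collection; the vector and field names here are
illustrative and the output is shown schematically):
>>> my_coll.find_one_and_update(
... {},
... {"$set": {"best_match": True}},
... vector=[0.1, 0.2],
... return_document=astrapy.constants.ReturnDocument.AFTER,
... )
{'_id': '...', 'best_match': True}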
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
options = { | |
"returnDocument": return_document, | |
"upsert": upsert, | |
} | |
logger.info(f"calling find_one_and_update on '{self.name}'") | |
fo_response = self._astra_db_collection.find_one_and_update( | |
update=update, | |
filter=filter, | |
projection=normalize_optional_projection(projection), | |
sort=_sort, | |
options=options, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_update on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
ret_document = fo_response.get("data", {}).get("document") | |
if ret_document is None: | |
return None | |
else: | |
return ret_document # type: ignore[no-any-return] | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_update API command.", | |
raw_response=fo_response, | |
) | |
def update_one( | |
self, | |
filter: Dict[str, Any], | |
update: Dict[str, Any], | |
*, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
upsert: bool = False, | |
max_time_ms: Optional[int] = None, | |
) -> UpdateResult: | |
""" | |
Update a single document on the collection as requested, | |
optionally inserting a new one if no match is found. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
update: the update prescription to apply to the document, expressed | |
as a dictionary as per Data API syntax. Examples are: | |
{"$set": {"field": "value}} | |
{"$inc": {"counter": 10}} | |
{"$unset": {"field": ""}} | |
See the Data API documentation for the full syntax. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
updated one. See the `find` method for more on sorting. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, a new document (resulting from applying the `update` | |
to an empty document) is inserted if no matches are found on | |
the collection. If False, the operation silently does nothing | |
in case of no matches. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
an UpdateResult object summarizing the outcome of the update operation. | |
Example: | |
>>> my_coll.insert_one({"Marco": "Polo"}) | |
InsertOneResult(...) | |
>>> my_coll.update_one({"Marco": {"$exists": True}}, {"$inc": {"rank": 3}}) | |
UpdateResult(raw_results=..., update_info={'n': 1, 'updatedExisting': True, 'ok': 1.0, 'nModified': 1}) | |
>>> my_coll.update_one({"Mirko": {"$exists": True}}, {"$inc": {"rank": 3}}) | |
UpdateResult(raw_results=..., update_info={'n': 0, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0}) | |
>>> my_coll.update_one({"Mirko": {"$exists": True}}, {"$inc": {"rank": 3}}, upsert=True) | |
UpdateResult(raw_results=..., update_info={'n': 1, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0, 'upserted': '2a45ff60-...'}) | |
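A sketch of a vector-driven update, marking the document closest to a
query vector (assuming a vector-enabled collection; the vector values
are purely illustrative):
>>> my_coll.update_one(
... {},
... {"$set": {"closest": True}},
... vector=[0.4, 0.3],
... )
UpdateResult(raw_results=..., update_info={'n': 1, 'updatedExisting': True, 'ok': 1.0, 'nModified': 1})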
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
options = { | |
"upsert": upsert, | |
} | |
logger.info(f"calling find_one_and_update on '{self.name}'") | |
fo_response = self._astra_db_collection.find_one_and_update( | |
update=update, | |
sort=_sort, | |
filter=filter, | |
options=options, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_update on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
fo_status = fo_response.get("status") or {} | |
_update_info = _prepare_update_info([fo_status]) | |
return UpdateResult( | |
raw_results=[fo_response], | |
update_info=_update_info, | |
) | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_update API command.", | |
raw_response=fo_response, | |
) | |
def update_many( | |
self, | |
filter: Dict[str, Any], | |
update: Dict[str, Any], | |
*, | |
upsert: bool = False, | |
max_time_ms: Optional[int] = None, | |
) -> UpdateResult: | |
""" | |
Apply an update operation to all documents matching a condition,
optionally inserting one document in absence of matches.
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
update: the update prescription to apply to the documents, expressed | |
as a dictionary as per Data API syntax. Examples are: | |
{"$set": {"field": "value}} | |
{"$inc": {"counter": 10}} | |
{"$unset": {"field": ""}} | |
See the Data API documentation for the full syntax. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, a single new document (resulting from applying `update` | |
to an empty document) is inserted if no matches are found on | |
the collection. If False, the operation silently does nothing | |
in case of no matches. | |
max_time_ms: a timeout, in milliseconds, for the operation. | |
Returns: | |
an UpdateResult object summarizing the outcome of the update operation. | |
Example: | |
>>> my_coll.insert_many([{"c": "red"}, {"c": "green"}, {"c": "blue"}]) | |
InsertManyResult(...) | |
>>> my_coll.update_many({"c": {"$ne": "green"}}, {"$set": {"nongreen": True}}) | |
UpdateResult(raw_results=..., update_info={'n': 2, 'updatedExisting': True, 'ok': 1.0, 'nModified': 2}) | |
>>> my_coll.update_many({"c": "orange"}, {"$set": {"is_also_fruit": True}}) | |
UpdateResult(raw_results=..., update_info={'n': 0, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0}) | |
>>> my_coll.update_many( | |
... {"c": "orange"}, | |
... {"$set": {"is_also_fruit": True}}, | |
... upsert=True, | |
... ) | |
UpdateResult(raw_results=..., update_info={'n': 1, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0, 'upserted': '46643050-...'}) | |
Note: | |
Similarly to the case of `find` (see its docstring for more details), | |
running this command while, at the same time, another process is | |
inserting new documents which match the filter of the `update_many` | |
can result in an unpredictable fraction of these documents being updated. | |
In other words, it cannot be easily predicted whether a given | |
newly-inserted document will be picked up by the update_many command or not. | |
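As a minimal sketch of guarding against partial application, one can
catch the exception raised by this method and inspect the partial
result it carries (the filter and update shown are illustrative):
>>> try:
... my_coll.update_many({"c": {"$exists": True}}, {"$inc": {"hits": 1}})
... except astrapy.exceptions.UpdateManyException as err:
... print("partial update info:", err.partial_result.update_info)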
""" | |
base_options = { | |
"upsert": upsert, | |
} | |
page_state_options: Dict[str, str] = {} | |
um_responses: List[Dict[str, Any]] = [] | |
um_statuses: List[Dict[str, Any]] = [] | |
must_proceed = True | |
timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) | |
logger.info(f"starting update_many on '{self.name}'") | |
while must_proceed: | |
options = {**base_options, **page_state_options} | |
logger.info(f"calling update_many on '{self.name}'") | |
this_um_response = self._astra_db_collection.update_many( | |
update=update, | |
filter=filter, | |
options=options, | |
timeout_info=timeout_manager.remaining_timeout_info(), | |
) | |
logger.info(f"finished calling update_many on '{self.name}'") | |
this_um_status = this_um_response.get("status") or {} | |
# | |
# if errors, quit early | |
if this_um_response.get("errors", []): | |
partial_update_info = _prepare_update_info(um_statuses) | |
partial_result = UpdateResult( | |
raw_results=um_responses, | |
update_info=partial_update_info, | |
) | |
all_um_responses = um_responses + [this_um_response] | |
raise UpdateManyException.from_responses( | |
commands=[None for _ in all_um_responses], | |
raw_responses=all_um_responses, | |
partial_result=partial_result, | |
) | |
else: | |
if "status" not in this_um_response: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from update_many API command.", | |
raw_response=this_um_response, | |
) | |
um_responses.append(this_um_response) | |
um_statuses.append(this_um_status) | |
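# pagination: a "nextPageState" in the response status signals
# that more matching documents remain for a further API call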
next_page_state = this_um_status.get("nextPageState") | |
if next_page_state is not None: | |
must_proceed = True | |
page_state_options = {"pageState": next_page_state} | |
else: | |
must_proceed = False | |
page_state_options = {} | |
update_info = _prepare_update_info(um_statuses) | |
logger.info(f"finished update_many on '{self.name}'") | |
return UpdateResult( | |
raw_results=um_responses, | |
update_info=update_info, | |
) | |
def find_one_and_delete( | |
self, | |
filter: Dict[str, Any], | |
*, | |
projection: Optional[ProjectionType] = None, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
max_time_ms: Optional[int] = None, | |
) -> Union[DocumentType, None]: | |
""" | |
Find a document in the collection and delete it. The deleted document, | |
however, is the return value of the method. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
projection: used to select a subset of fields in the document being | |
returned. The projection can be: an iterable over the field names | |
to return; a dictionary {field_name: True} to positively select | |
certain fields; or a dictionary {field_name: False} if one wants | |
to discard some fields from the response. | |
Note that the `_id` field will be returned with the document | |
in any case, regardless of what the provided `projection` requires. | |
The default is to return the whole documents. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
deleted one. See the `find` method for more on sorting. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
Either the document (or a projection thereof, as requested), or None | |
if no matches were found in the first place. | |
Example: | |
>>> my_coll.insert_many( | |
... [ | |
... {"species": "swan", "class": "Aves"}, | |
... {"species": "frog", "class": "Amphibia"}, | |
... ], | |
... ) | |
InsertManyResult(...) | |
>>> my_coll.find_one_and_delete( | |
... {"species": {"$ne": "frog"}}, | |
... projection=["species"], | |
... ) | |
{'_id': '5997fb48-...', 'species': 'swan'} | |
>>> my_coll.find_one_and_delete({"species": {"$ne": "frog"}}) | |
>>> # (returns None for no matches) | |
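A sketch of deleting the best vector match (assuming a vector-enabled
collection; the query vector is illustrative and the output schematic):
>>> my_coll.find_one_and_delete({}, vector=[0.5, 0.2], projection=["species"])
{'_id': '...', 'species': 'swan'}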
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
_projection = normalize_optional_projection(projection) | |
logger.info(f"calling find_one_and_delete on '{self.name}'") | |
fo_response = self._astra_db_collection.find_one_and_delete( | |
sort=_sort, | |
filter=filter, | |
projection=_projection, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_delete on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
document = fo_response["data"]["document"] | |
return document # type: ignore[no-any-return] | |
else: | |
deleted_count = fo_response.get("status", {}).get("deletedCount") | |
if deleted_count == 0: | |
return None | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_delete API command.", | |
raw_response=fo_response, | |
) | |
def delete_one( | |
self, | |
filter: Dict[str, Any], | |
*, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
max_time_ms: Optional[int] = None, | |
) -> DeleteResult: | |
""" | |
Delete one document matching a provided filter. | |
This method never deletes more than a single document, regardless | |
of the number of matches to the provided filters. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
deleted one. See the `find` method for more on sorting. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a DeleteResult object summarizing the outcome of the delete operation. | |
Example: | |
>>> my_coll.insert_many([{"seq": 1}, {"seq": 0}, {"seq": 2}]) | |
InsertManyResult(...) | |
>>> my_coll.delete_one({"seq": 1}) | |
DeleteResult(raw_results=..., deleted_count=1) | |
>>> my_coll.distinct("seq") | |
[0, 2] | |
>>> my_coll.delete_one( | |
... {"seq": {"$exists": True}}, | |
... sort={"seq": astrapy.constants.SortDocuments.DESCENDING}, | |
... ) | |
DeleteResult(raw_results=..., deleted_count=1) | |
>>> my_coll.distinct("seq") | |
[0] | |
>>> my_coll.delete_one({"seq": 2}) | |
DeleteResult(raw_results=..., deleted_count=0) | |
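A sketch of deleting the single closest document by vector search
(assuming a vector-enabled collection; the vector is illustrative):
>>> my_coll.delete_one({}, vector=[0.9, 0.1])
DeleteResult(raw_results=..., deleted_count=1)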
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
logger.info(f"calling delete_one_by_predicate on '{self.name}'") | |
do_response = self._astra_db_collection.delete_one_by_predicate( | |
filter=filter, timeout_info=base_timeout_info(max_time_ms), sort=_sort | |
) | |
logger.info(f"finished calling delete_one_by_predicate on '{self.name}'") | |
if "deletedCount" in do_response.get("status", {}): | |
deleted_count = do_response["status"]["deletedCount"] | |
if deleted_count == -1: | |
return DeleteResult( | |
deleted_count=None, | |
raw_results=[do_response], | |
) | |
else: | |
# expected a non-negative integer: | |
return DeleteResult( | |
deleted_count=deleted_count, | |
raw_results=[do_response], | |
) | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from delete_one API command.", | |
raw_response=do_response, | |
) | |
def delete_many( | |
self, | |
filter: Dict[str, Any], | |
*, | |
max_time_ms: Optional[int] = None, | |
) -> DeleteResult: | |
""" | |
Delete all documents matching a provided filter. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
The `delete_many` method does not accept an empty filter: see | |
`delete_all` to completely erase all contents of a collection.
max_time_ms: a timeout, in milliseconds, for the operation. | |
Returns: | |
a DeleteResult object summarizing the outcome of the delete operation. | |
Example: | |
>>> my_coll.insert_many([{"seq": 1}, {"seq": 0}, {"seq": 2}]) | |
InsertManyResult(...) | |
>>> my_coll.delete_many({"seq": {"$lte": 1}}) | |
DeleteResult(raw_results=..., deleted_count=2) | |
>>> my_coll.distinct("seq") | |
[2] | |
>>> my_coll.delete_many({"seq": {"$lte": 1}}) | |
DeleteResult(raw_results=..., deleted_count=0) | |
Note: | |
This operation is not atomic. Depending on the amount of matching | |
documents, it can keep running (in a blocking way) for a macroscopic | |
time. In that case, new documents that are meanwhile inserted | |
(e.g. from another process/application) will be deleted during | |
the execution of this method call until the collection is devoid | |
of matches. | |
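As a minimal sketch of recovering the count of deletions performed
before a midway failure (the exception carries the partial result
accumulated so far; the filter is illustrative):
>>> try:
... my_coll.delete_many({"seq": {"$gte": 0}})
... except astrapy.exceptions.DeleteManyException as err:
... print("deleted so far:", err.partial_result.deleted_count)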
""" | |
if not filter: | |
raise ValueError( | |
"The `filter` parameter to method `delete_many` cannot be " | |
"empty. In order to completely clear the contents of a " | |
"collection, please use the `delete_all` method." | |
) | |
dm_responses: List[Dict[str, Any]] = [] | |
deleted_count = 0 | |
must_proceed = True | |
timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) | |
logger.info(f"starting delete_many on '{self.name}'") | |
while must_proceed: | |
logger.info(f"calling delete_many on '{self.name}'") | |
this_dm_response = self._astra_db_collection.delete_many( | |
filter=filter, | |
skip_error_check=True, | |
timeout_info=timeout_manager.remaining_timeout_info(), | |
) | |
logger.info(f"finished calling delete_many on '{self.name}'") | |
# if errors, quit early | |
if this_dm_response.get("errors", []): | |
partial_result = DeleteResult( | |
deleted_count=deleted_count, | |
raw_results=dm_responses, | |
) | |
all_dm_responses = dm_responses + [this_dm_response] | |
raise DeleteManyException.from_responses( | |
commands=[None for _ in all_dm_responses], | |
raw_responses=all_dm_responses, | |
partial_result=partial_result, | |
) | |
else: | |
this_dc = this_dm_response.get("status", {}).get("deletedCount") | |
if this_dc is None or this_dc < 0: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from delete_many API command.", | |
raw_response=this_dm_response, | |
) | |
dm_responses.append(this_dm_response) | |
deleted_count += this_dc | |
must_proceed = this_dm_response.get("status", {}).get("moreData", False) | |
logger.info(f"finished delete_many on '{self.name}'") | |
return DeleteResult( | |
deleted_count=deleted_count, | |
raw_results=dm_responses, | |
) | |
def delete_all(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: | |
""" | |
Delete all documents in a collection. | |
Args: | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a dictionary of the form {"ok": 1} to signal successful deletion. | |
Example: | |
>>> my_coll.distinct("seq") | |
[2, 1, 0] | |
>>> my_coll.count_documents({}, upper_bound=100) | |
4 | |
>>> my_coll.delete_all() | |
{'ok': 1} | |
>>> my_coll.count_documents({}, upper_bound=100) | |
0 | |
Note: | |
Use with caution. | |
""" | |
logger.info(f"calling unfiltered delete_many on '{self.name}'") | |
dm_response = self._astra_db_collection.delete_many( | |
filter={}, timeout_info=base_timeout_info(max_time_ms) | |
) | |
logger.info(f"finished calling unfiltered delete_many on '{self.name}'") | |
deleted_count = dm_response["status"]["deletedCount"] | |
if deleted_count == -1: | |
return {"ok": 1} | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from delete_many API command.", | |
raw_response=dm_response, | |
) | |
def bulk_write( | |
self, | |
requests: Iterable[BaseOperation], | |
*, | |
ordered: bool = True, | |
concurrency: Optional[int] = None, | |
max_time_ms: Optional[int] = None, | |
) -> BulkWriteResult: | |
""" | |
Execute an arbitrary amount of operations such as inserts, updates, deletes | |
either sequentially or concurrently. | |
This method does not execute atomically, i.e. individual operations are | |
each performed in the same way as the corresponding collection method, | |
and each one is a different and unrelated database mutation. | |
Args: | |
requests: an iterable over concrete subclasses of `BaseOperation`, | |
such as `InsertMany` or `ReplaceOne`. Each such object | |
represents an operation ready to be executed on a collection, | |
and is instantiated by passing the same parameters as one | |
would the corresponding collection method. | |
ordered: whether to launch the `requests` one after the other or | |
in arbitrary order, possibly in a concurrent fashion. For | |
performance reasons, `ordered=False` should be preferred | |
when compatible with the needs of the application flow. | |
concurrency: maximum number of concurrent operations executing at | |
a given time. It cannot be more than one for ordered bulk writes. | |
max_time_ms: a timeout, in milliseconds, for the whole bulk write. | |
Remember that, if the method call times out, then there's no | |
guarantee about what portion of the bulk write has been received | |
and successfully executed by the Data API. | |
Returns: | |
A single BulkWriteResult summarizing the whole list of requested | |
operations. The keys in the map attributes of BulkWriteResult | |
(when present) are the integer indices of the corresponding operation | |
in the `requests` iterable. | |
Example: | |
>>> from astrapy.operations import InsertMany, ReplaceOne | |
>>> op1 = InsertMany([{"a": 1}, {"a": 2}]) | |
>>> op2 = ReplaceOne({"z": 9}, replacement={"z": 9, "replaced": True}, upsert=True) | |
>>> my_coll.bulk_write([op1, op2]) | |
BulkWriteResult(bulk_api_results={0: ..., 1: ...}, deleted_count=0, inserted_count=3, matched_count=0, modified_count=0, upserted_count=1, upserted_ids={1: '2addd676-...'}) | |
>>> my_coll.count_documents({}, upper_bound=100) | |
3 | |
>>> my_coll.distinct("replaced") | |
[True] | |
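A sketch of an unordered, concurrent bulk write (assuming the sibling
operation classes `InsertOne` and `DeleteMany` from the same module;
the operations and concurrency value are illustrative):
>>> from astrapy.operations import InsertOne, DeleteMany
>>> op3 = InsertOne({"z": 10})
>>> op4 = DeleteMany({"w": {"$exists": True}})
>>> my_coll.bulk_write([op3, op4], ordered=False, concurrency=4)
BulkWriteResult(...)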
""" | |
# lazy importing here against circular-import error | |
from astrapy.operations import reduce_bulk_write_results | |
if concurrency is None: | |
if ordered: | |
_concurrency = 1 | |
else: | |
_concurrency = DEFAULT_BULK_WRITE_CONCURRENCY | |
else: | |
_concurrency = concurrency | |
if _concurrency > 1 and ordered: | |
raise ValueError("Cannot run ordered bulk_write concurrently.") | |
timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) | |
logger.info(f"startng a bulk write on '{self.name}'") | |
if ordered: | |
bulk_write_results: List[BulkWriteResult] = [] | |
for operation_i, operation in enumerate(requests): | |
try: | |
this_bw_result = operation.execute( | |
self, | |
index_in_bulk_write=operation_i, | |
bulk_write_timeout_ms=timeout_manager.remaining_timeout_ms(), | |
) | |
bulk_write_results.append(this_bw_result) | |
except CumulativeOperationException as exc: | |
partial_result = exc.partial_result | |
partial_bw_result = reduce_bulk_write_results( | |
bulk_write_results | |
+ [ | |
partial_result.to_bulk_write_result( | |
index_in_bulk_write=operation_i | |
) | |
] | |
) | |
dar_exception = exc.data_api_response_exception() | |
raise BulkWriteException( | |
text=dar_exception.text, | |
error_descriptors=dar_exception.error_descriptors, | |
detailed_error_descriptors=dar_exception.detailed_error_descriptors, | |
partial_result=partial_bw_result, | |
exceptions=[dar_exception], | |
) | |
except DataAPIResponseException as exc: | |
# the cumulative exceptions, with their | |
# partially-done-info, are handled above: | |
# here it's just one-shot d.a.r. exceptions | |
partial_bw_result = reduce_bulk_write_results(bulk_write_results) | |
dar_exception = exc.data_api_response_exception() | |
raise BulkWriteException( | |
text=dar_exception.text, | |
error_descriptors=dar_exception.error_descriptors, | |
detailed_error_descriptors=dar_exception.detailed_error_descriptors, | |
partial_result=partial_bw_result, | |
exceptions=[dar_exception], | |
) | |
full_bw_result = reduce_bulk_write_results(bulk_write_results) | |
logger.info(f"finished a bulk write on '{self.name}'") | |
return full_bw_result | |
else: | |
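# helper returning a (result, exception) pair, so that concurrent
# execution can collect failures without aborting sibling operations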
def _execute_as_either( | |
operation: BaseOperation, operation_i: int | |
) -> Tuple[Optional[BulkWriteResult], Optional[DataAPIResponseException]]: | |
try: | |
ex_result = operation.execute( | |
self, | |
index_in_bulk_write=operation_i, | |
bulk_write_timeout_ms=timeout_manager.remaining_timeout_ms(), | |
) | |
return (ex_result, None) | |
except DataAPIResponseException as exc: | |
return (None, exc) | |
with ThreadPoolExecutor(max_workers=_concurrency) as executor: | |
bulk_write_either_futures = [ | |
executor.submit( | |
_execute_as_either, | |
operation, | |
operation_i, | |
) | |
for operation_i, operation in enumerate(requests) | |
] | |
bulk_write_either_results = [ | |
bulk_write_either_future.result() | |
for bulk_write_either_future in bulk_write_either_futures | |
] | |
# regroup | |
bulk_write_successes = [ | |
bwr for bwr, _ in bulk_write_either_results if bwr | |
] | |
bulk_write_failures = [ | |
bwf for _, bwf in bulk_write_either_results if bwf | |
] | |
if bulk_write_failures: | |
# extract and cumulate | |
partial_results_from_failures = [ | |
failure.partial_result.to_bulk_write_result( | |
index_in_bulk_write=operation_i | |
) | |
for failure in bulk_write_failures | |
if isinstance(failure, CumulativeOperationException) | |
] | |
partial_bw_result = reduce_bulk_write_results( | |
bulk_write_successes + partial_results_from_failures | |
) | |
# raise and recast the first exception | |
all_dar_exceptions = [ | |
bw_failure.data_api_response_exception() | |
for bw_failure in bulk_write_failures | |
] | |
dar_exception = all_dar_exceptions[0] | |
raise BulkWriteException( | |
text=dar_exception.text, | |
error_descriptors=dar_exception.error_descriptors, | |
detailed_error_descriptors=dar_exception.detailed_error_descriptors, | |
partial_result=partial_bw_result, | |
exceptions=all_dar_exceptions, | |
) | |
else: | |
logger.info(f"finished a bulk write on '{self.name}'") | |
return reduce_bulk_write_results(bulk_write_successes) | |
def drop(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: | |
""" | |
Drop the collection, i.e. delete it from the database along with | |
all the documents it contains. | |
Args: | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Remember there is no guarantee that a request that has
timed out was not in fact honored.
Returns: | |
a dictionary of the form {"ok": 1} to signal successful deletion. | |
Example: | |
>>> my_coll.find_one({}) | |
{'_id': '...', 'a': 100} | |
>>> my_coll.drop() | |
{'ok': 1} | |
>>> my_coll.find_one({}) | |
Traceback (most recent call last): | |
... ... | |
astrapy.exceptions.DataAPIResponseException: Collection does not exist, collection name: my_collection | |
Note: | |
Use with caution. | |
Note: | |
Once the method succeeds, methods on this object can still be invoked: | |
however, this hardly makes sense as the underlying actual collection | |
is no more. | |
It is the responsibility of the developer to design a correct flow
which avoids using a deceased collection any further.
""" | |
logger.info(f"dropping collection '{self.name}' (self)") | |
drop_result = self.database.drop_collection(self, max_time_ms=max_time_ms) | |
logger.info(f"finished dropping collection '{self.name}' (self)") | |
return drop_result # type: ignore[no-any-return] | |
def command( | |
self, | |
body: Dict[str, Any], | |
*, | |
max_time_ms: Optional[int] = None, | |
) -> Dict[str, Any]: | |
""" | |
Send a POST request to the Data API for this collection with | |
an arbitrary, caller-provided payload. | |
Args: | |
body: a JSON-serializable dictionary, the payload of the request. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a dictionary with the response of the HTTP request. | |
Example: | |
>>> my_coll.command({"countDocuments": {}}) | |
{'status': {'count': 123}} | |
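A sketch with a different payload (any JSON body the Data API accepts
for collections can be sent; the command and its response are shown
schematically here):
>>> my_coll.command({"findOne": {"filter": {"seq": 2}}})
{'data': {'document': {'_id': '...', 'seq': 2}}}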
""" | |
logger.info(f"calling command on '{self.name}'") | |
command_result = self.database.command( | |
body=body, | |
namespace=self.namespace, | |
collection_name=self.name, | |
max_time_ms=max_time_ms, | |
) | |
logger.info(f"finished calling command on '{self.name}'") | |
return command_result # type: ignore[no-any-return] | |
class AsyncCollection: | |
""" | |
A Data API collection, the main object to interact with the Data API,
especially for DML operations such as inserting, finding, updating
and deleting documents.
This class has an asynchronous interface.
An AsyncCollection is spawned from an AsyncDatabase object, from which it
inherits the details on how to reach the API server (endpoint,
authentication token).
Args: | |
database: a Database object, instantiated earlier. This represents | |
the database the collection belongs to. | |
name: the collection name. This parameter should match an existing | |
collection on the database. | |
namespace: this is the namespace to which the collection belongs. | |
If not specified, the database's working namespace is used. | |
caller_name: name of the application, or framework, on behalf of which | |
the Data API calls are performed. This ends up in the request user-agent. | |
caller_version: version of the caller. | |
Examples: | |
>>> from astrapy import DataAPIClient, AsyncCollection | |
>>> my_client = DataAPIClient("AstraCS:...")
>>> my_async_db = my_client.get_async_database_by_api_endpoint( | |
... "https://01234567-....apps.astra.datastax.com" | |
... ) | |
>>> my_async_coll_1 = AsyncCollection(database=my_async_db, name="my_collection") | |
>>> my_async_coll_2 = asyncio.run(my_async_db.create_collection(
... "my_v_collection", | |
... dimension=3, | |
... metric="cosine", | |
... )) | |
>>> my_async_coll_3a = asyncio.run(my_async_db.get_collection( | |
... "my_already_existing_collection", | |
... )) | |
>>> my_async_coll_3b = my_async_db.my_already_existing_collection | |
>>> my_async_coll_3c = my_async_db["my_already_existing_collection"] | |
Note: | |
creating an instance of AsyncCollection does not trigger actual creation
of the collection on the database. The latter should have been created
beforehand, e.g. through the `create_collection` method of an AsyncDatabase.
""" | |
def __init__( | |
self, | |
database: AsyncDatabase, | |
name: str, | |
*, | |
namespace: Optional[str] = None, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> None: | |
self._astra_db_collection: AsyncAstraDBCollection = AsyncAstraDBCollection( | |
collection_name=name, | |
astra_db=database._astra_db, | |
namespace=namespace, | |
caller_name=caller_name, | |
caller_version=caller_version, | |
) | |
# this comes after the above, lets AstraDBCollection resolve namespace | |
self._database = database._copy( | |
namespace=self._astra_db_collection.astra_db.namespace | |
) | |
def __repr__(self) -> str: | |
return ( | |
f'{self.__class__.__name__}(name="{self.name}", ' | |
f'namespace="{self.namespace}", database={self.database})' | |
) | |
def __eq__(self, other: Any) -> bool: | |
if isinstance(other, AsyncCollection): | |
return self._astra_db_collection == other._astra_db_collection | |
else: | |
return False | |
def __call__(self, *pargs: Any, **kwargs: Any) -> None: | |
raise TypeError( | |
f"'{self.__class__.__name__}' object is not callable. If you " | |
f"meant to call the '{self.name}' method on a " | |
f"'{self.database.__class__.__name__}' object " | |
"it is failing because no such method exists." | |
) | |
def _copy( | |
self, | |
*, | |
database: Optional[AsyncDatabase] = None, | |
name: Optional[str] = None, | |
namespace: Optional[str] = None, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> AsyncCollection: | |
return AsyncCollection( | |
database=database or self.database._copy(), | |
name=name or self.name, | |
namespace=namespace or self.namespace, | |
caller_name=caller_name or self._astra_db_collection.caller_name, | |
caller_version=caller_version or self._astra_db_collection.caller_version, | |
) | |
def with_options( | |
self, | |
*, | |
name: Optional[str] = None, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> AsyncCollection: | |
""" | |
Create a clone of this collection with some changed attributes. | |
Args: | |
name: the name of the collection. This parameter is useful to | |
quickly spawn AsyncCollection instances each pointing to a different | |
collection existing in the same namespace. | |
caller_name: name of the application, or framework, on behalf of which | |
the Data API calls are performed. This ends up in the request user-agent. | |
caller_version: version of the caller. | |
Returns: | |
a new AsyncCollection instance. | |
Example: | |
>>> my_other_async_coll = my_async_coll.with_options( | |
... name="the_other_coll", | |
... caller_name="caller_identity", | |
... ) | |
""" | |
return self._copy( | |
name=name, | |
caller_name=caller_name, | |
caller_version=caller_version, | |
) | |
def to_sync( | |
self, | |
*, | |
database: Optional[Database] = None, | |
name: Optional[str] = None, | |
namespace: Optional[str] = None, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> Collection: | |
""" | |
Create a Collection from this one. Save for the arguments | |
explicitly provided as overrides, everything else is kept identical | |
to this collection in the copy (the database is converted into | |
a sync object). | |
Args: | |
database: a Database object, instantiated earlier. | |
This represents the database the new collection belongs to. | |
name: the collection name. This parameter should match an existing | |
collection on the database. | |
namespace: this is the namespace to which the collection belongs. | |
If not specified, the database's working namespace is used. | |
caller_name: name of the application, or framework, on behalf of which | |
the Data API calls are performed. This ends up in the request user-agent. | |
caller_version: version of the caller. | |
Returns: | |
the new copy, a Collection instance. | |
Example: | |
>>> my_async_coll.to_sync().count_documents({}, upper_bound=100) | |
77 | |
""" | |
return Collection( | |
database=database or self.database.to_sync(), | |
name=name or self.name, | |
namespace=namespace or self.namespace, | |
caller_name=caller_name or self._astra_db_collection.caller_name, | |
caller_version=caller_version or self._astra_db_collection.caller_version, | |
) | |
def set_caller( | |
self, | |
caller_name: Optional[str] = None, | |
caller_version: Optional[str] = None, | |
) -> None: | |
""" | |
Set a new identity for the application/framework on behalf of which | |
the Data API calls are performed (the "caller"). | |
Args: | |
caller_name: name of the application, or framework, on behalf of which | |
the Data API calls are performed. This ends up in the request user-agent. | |
caller_version: version of the caller. | |
Example: | |
>>> my_coll.set_caller(caller_name="the_caller", caller_version="0.1.0") | |
""" | |
logger.info(f"setting caller to {caller_name}/{caller_version}") | |
self._astra_db_collection.set_caller( | |
caller_name=caller_name, | |
caller_version=caller_version, | |
) | |
async def options(self, *, max_time_ms: Optional[int] = None) -> CollectionOptions: | |
""" | |
Get the collection options, i.e. its configuration as read from the database. | |
The method issues a request to the Data API each time it is invoked,
without caching mechanisms: this ensures up-to-date information | |
for usages such as real-time collection validation by the application. | |
Args: | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a CollectionOptions instance describing the collection. | |
(See also the database `list_collections` method.) | |
Example: | |
>>> asyncio.run(my_async_coll.options()) | |
CollectionOptions(vector=CollectionVectorOptions(dimension=3, metric='cosine')) | |
""" | |
logger.info(f"getting collections in search of '{self.name}'") | |
self_descriptors = [ | |
coll_desc | |
async for coll_desc in self.database.list_collections( | |
max_time_ms=max_time_ms | |
) | |
if coll_desc.name == self.name | |
] | |
logger.info(f"finished getting collections in search of '{self.name}'") | |
if self_descriptors: | |
return self_descriptors[0].options # type: ignore[no-any-return] | |
else: | |
raise CollectionNotFoundException( | |
text=f"Collection {self.namespace}.{self.name} not found.", | |
namespace=self.namespace, | |
collection_name=self.name, | |
) | |
def info(self) -> CollectionInfo: | |
""" | |
Information on the collection (name, location, database), in the | |
form of a CollectionInfo object. | |
Not to be confused with the collection `options` method (related | |
to the collection internal configuration). | |
Example: | |
>>> my_async_coll.info().database_info.region | |
'us-east1' | |
>>> my_async_coll.info().full_name | |
'default_keyspace.my_v_collection' | |
Note: | |
the returned CollectionInfo wraps, among other things, | |
the database information: as such, calling this method | |
triggers the same-named method of a Database object (which, in turn, | |
performs a HTTP request to the DevOps API). | |
See the documentation for `Database.info()` for more details. | |
""" | |
return CollectionInfo( | |
database_info=self.database.info(), | |
namespace=self.namespace, | |
name=self.name, | |
full_name=self.full_name, | |
) | |
@property
def database(self) -> AsyncDatabase:
""" | |
a Database object, the database this collection belongs to. | |
Example: | |
>>> my_async_coll.database.name | |
'quicktest' | |
""" | |
return self._database | |
@property
def namespace(self) -> str:
""" | |
The namespace this collection is in. | |
Example: | |
>>> my_async_coll.namespace
'default_keyspace' | |
""" | |
return self.database.namespace | |
@property
def name(self) -> str:
""" | |
The name of this collection. | |
Example: | |
>>> my_async_coll.name | |
'my_v_collection' | |
""" | |
# type hint added as for some reason the typechecker gets lost | |
return self._astra_db_collection.collection_name # type: ignore[no-any-return, has-type] | |
@property
def full_name(self) -> str:
""" | |
The fully-qualified collection name within the database, | |
in the form "namespace.collection_name". | |
Example: | |
>>> my_async_coll.full_name | |
'default_keyspace.my_v_collection' | |
""" | |
return f"{self.namespace}.{self.name}" | |
async def insert_one( | |
self, | |
document: DocumentType, | |
*, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
max_time_ms: Optional[int] = None, | |
) -> InsertOneResult: | |
""" | |
Insert a single document in the collection in an atomic operation. | |
Args: | |
document: the dictionary expressing the document to insert. | |
The `_id` field of the document can be left out, in which | |
case it will be created automatically. | |
vector: a vector (a list of numbers appropriate for the collection) | |
for the document. Passing this parameter is equivalent to | |
providing the vector in the "$vector" field of the document itself, | |
however the two are mutually exclusive. | |
vectorize: a string to be made into a vector, if such a service | |
is configured for the collection. Passing this parameter is | |
equivalent to providing a `$vectorize` field in the document itself, | |
however the two are mutually exclusive. | |
Moreover, this parameter cannot coexist with `vector`. | |
NOTE: This feature is under current development. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
an InsertOneResult object. | |
Example: | |
>>> async def write_and_count(acol: AsyncCollection) -> None: | |
... count0 = await acol.count_documents({}, upper_bound=10) | |
... print("count0", count0) | |
... await acol.insert_one( | |
... { | |
... "age": 30, | |
... "name": "Smith", | |
... "food": ["pear", "peach"], | |
... "likes_fruit": True, | |
... }, | |
... ) | |
... await acol.insert_one({"_id": "user-123", "age": 50, "name": "Maccio"}) | |
... count1 = await acol.count_documents({}, upper_bound=10) | |
... print("count1", count1) | |
... | |
>>> asyncio.run(write_and_count(my_async_coll)) | |
count0 0 | |
count1 2 | |
>>> asyncio.run(my_async_coll.insert_one({"tag": v"}, vector=[10, 11])) | |
InsertOneResult(...) | |
Note: | |
If an `_id` is explicitly provided, which corresponds to a document | |
that exists already in the collection, an error is raised and | |
the insertion fails. | |
""" | |
_document = _collate_vector_to_document(document, vector, vectorize) | |
logger.info(f"inserting one document in '{self.name}'") | |
io_response = await self._astra_db_collection.insert_one( | |
_document, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished inserting one document in '{self.name}'") | |
if "insertedIds" in io_response.get("status", {}): | |
if io_response["status"]["insertedIds"]: | |
inserted_id = io_response["status"]["insertedIds"][0] | |
return InsertOneResult( | |
raw_results=[io_response], | |
inserted_id=inserted_id, | |
) | |
else: | |
raise ValueError(
"Could not complete an insert_one operation. "
f"(gotten '{json.dumps(io_response)}')"
)
else:
raise ValueError(
"Could not complete an insert_one operation. "
f"(gotten '{json.dumps(io_response)}')"
)
async def insert_many( | |
self, | |
documents: Iterable[DocumentType], | |
*, | |
vectors: Optional[Iterable[Optional[VectorType]]] = None, | |
vectorize: Optional[Iterable[Optional[str]]] = None, | |
ordered: bool = True, | |
chunk_size: Optional[int] = None, | |
concurrency: Optional[int] = None, | |
max_time_ms: Optional[int] = None, | |
) -> InsertManyResult: | |
""" | |
Insert a list of documents into the collection. | |
This is not an atomic operation. | |
Args: | |
documents: an iterable of dictionaries, each a document to insert. | |
Documents may specify their `_id` field or leave it out, in which | |
case it will be added automatically. | |
vectors: an optional list of vectors (as many vectors as the provided | |
documents) to associate to the documents when inserting. | |
Each vector is added to the corresponding document prior to | |
insertion on database. The list can be a mixture of None and vectors, | |
in which case some documents will not have a vector, unless it is | |
specified in their "$vector" field already. | |
Passing vectors this way is indeed equivalent to the "$vector" field | |
of the documents, however the two are mutually exclusive. | |
vectorize: an optional list of strings to be made into as many vectors | |
(one per document), if such a service is configured for the collection. | |
Passing this parameter is equivalent to providing a `$vectorize` | |
field in the documents themselves, however the two are mutually exclusive. | |
For any given document, this parameter cannot coexist with the | |
corresponding `vector` entry. | |
NOTE: This feature is under current development. | |
ordered: if True (default), the insertions are processed sequentially. | |
If False, they can occur in arbitrary order and possibly concurrently. | |
chunk_size: how many documents to include in a single API request. | |
Exceeding the server maximum allowed value results in an error. | |
Leave it unspecified (recommended) to use the system default. | |
concurrency: maximum number of concurrent requests to the API at | |
a given time. It cannot be more than one for ordered insertions. | |
max_time_ms: a timeout, in milliseconds, for the operation. | |
Returns: | |
an InsertManyResult object. | |
Examples: | |
>>> async def write_and_count(acol: AsyncCollection) -> None: | |
... count0 = await acol.count_documents({}, upper_bound=10) | |
... print("count0", count0) | |
... im_result1 = await acol.insert_many( | |
... [ | |
... {"a": 10}, | |
... {"a": 5}, | |
... {"b": [True, False, False]}, | |
... ], | |
... ) | |
... print("inserted1", im_result1.inserted_ids) | |
... count1 = await acol.count_documents({}, upper_bound=100) | |
... print("count1", count1) | |
... await acol.insert_many( | |
... [{"seq": i} for i in range(50)], | |
... ordered=False, | |
... concurrency=5, | |
... ) | |
... count2 = await acol.count_documents({}, upper_bound=100) | |
... print("count2", count2) | |
... | |
>>> asyncio.run(write_and_count(my_async_coll)) | |
count0 0 | |
inserted1 ['e3c2a684-...', '1de4949f-...', '167dacc3-...'] | |
count1 3 | |
count2 53 | |
# The following are three equivalent statements: | |
>>> asyncio.run(my_async_coll.insert_many( | |
... [{"tag": "a"}, {"tag": "b"}], | |
... vectors=[[1, 2], [3, 4]], | |
... )) | |
InsertManyResult(...) | |
>>> asyncio.run(my_async_coll.insert_many( | |
... [{"tag": "a", "$vector": [1, 2]}, {"tag": "b"}], | |
... vectors=[None, [3, 4]], | |
... )) | |
InsertManyResult(...) | |
>>> asyncio.run(my_async_coll.insert_many( | |
... [ | |
... {"tag": "a", "$vector": [1, 2]}, | |
... {"tag": "b", "$vector": [3, 4]}, | |
... ] | |
... )) | |
InsertManyResult(...) | |
Note: | |
Unordered insertions are executed with some degree of concurrency, | |
so it is usually better to prefer this mode unless the order in the | |
document sequence is important. | |
Note: | |
A failure mode for this command is related to certain faulty documents | |
found among those to insert: a document may have an `_id` already
present on the collection, or its vector dimension may not | |
match the collection setting. | |
For an ordered insertion, the method will raise an exception at | |
the first such faulty document -- nevertheless, all documents processed | |
until then will end up being written to the database. | |
For unordered insertions, if the error stems from faulty documents | |
the insertion proceeds until exhausting the input documents: then, | |
an exception is raised -- and all insertable documents will have been | |
written to the database, including those "after" the troublesome ones. | |
If, on the other hand, there are errors not related to individual | |
documents (such as a network connectivity error), the whole | |
`insert_many` operation will stop in mid-way, an exception will be raised, | |
and only a certain amount of the input documents will | |
have made their way to the database. | |
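As a minimal sketch of detecting such partial failures, catch the
exception and inspect its partial result (the duplicate `_id` and the
printed output are illustrative):
>>> async def careful_insert(acol: AsyncCollection) -> None:
... try:
... await acol.insert_many(
... [{"_id": "dup"}, {"_id": "dup"}],
... ordered=False,
... )
... except astrapy.exceptions.InsertManyException as err:
... print("written anyway:", err.partial_result.inserted_ids)
...
>>> asyncio.run(careful_insert(my_async_coll))
written anyway: ['dup']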
""" | |
if concurrency is None: | |
if ordered: | |
_concurrency = 1 | |
else: | |
_concurrency = DEFAULT_INSERT_MANY_CONCURRENCY | |
else: | |
_concurrency = concurrency | |
if _concurrency > 1 and ordered: | |
raise ValueError("Cannot run ordered insert_many concurrently.") | |
if chunk_size is None: | |
_chunk_size = MAX_INSERT_NUM_DOCUMENTS | |
else: | |
_chunk_size = chunk_size | |
_documents = _collate_vectors_to_documents(documents, vectors, vectorize) | |
logger.info(f"inserting {len(_documents)} documents in '{self.name}'") | |
raw_results: List[Dict[str, Any]] = [] | |
timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) | |
if ordered: | |
options = {"ordered": True} | |
inserted_ids: List[Any] = [] | |
for i in range(0, len(_documents), _chunk_size): | |
logger.info(f"inserting a chunk of documents in '{self.name}'") | |
chunk_response = await self._astra_db_collection.insert_many( | |
documents=_documents[i : i + _chunk_size], | |
options=options, | |
partial_failures_allowed=True, | |
timeout_info=timeout_manager.remaining_timeout_info(), | |
) | |
logger.info(f"finished inserting a chunk of documents in '{self.name}'") | |
# accumulate the results in this call | |
chunk_inserted_ids = (chunk_response.get("status") or {}).get( | |
"insertedIds", [] | |
) | |
inserted_ids += chunk_inserted_ids | |
raw_results += [chunk_response] | |
# if errors, quit early | |
if chunk_response.get("errors", []): | |
partial_result = InsertManyResult( | |
raw_results=raw_results, | |
inserted_ids=inserted_ids, | |
) | |
raise InsertManyException.from_response( | |
command=None, | |
raw_response=chunk_response, | |
partial_result=partial_result, | |
) | |
# return | |
full_result = InsertManyResult( | |
raw_results=raw_results, | |
inserted_ids=inserted_ids, | |
) | |
logger.info( | |
f"finished inserting {len(_documents)} documents in '{self.name}'" | |
) | |
return full_result | |
else: | |
# unordered: concurrent or not, do all of them and parse the results | |
options = {"ordered": False} | |
sem = asyncio.Semaphore(_concurrency) | |
async def concurrent_insert_chunk( | |
document_chunk: List[DocumentType], | |
) -> Dict[str, Any]: | |
async with sem: | |
logger.info(f"inserting a chunk of documents in '{self.name}'") | |
im_response = await self._astra_db_collection.insert_many( | |
document_chunk, | |
options=options, | |
partial_failures_allowed=True, | |
timeout_info=timeout_manager.remaining_timeout_info(), | |
) | |
logger.info( | |
f"finished inserting a chunk of documents in '{self.name}'" | |
) | |
return im_response | |
if _concurrency > 1: | |
tasks = [ | |
asyncio.create_task( | |
concurrent_insert_chunk(_documents[i : i + _chunk_size]) | |
) | |
for i in range(0, len(_documents), _chunk_size) | |
] | |
raw_results = await asyncio.gather(*tasks) | |
else: | |
raw_results = [ | |
await concurrent_insert_chunk(_documents[i : i + _chunk_size]) | |
for i in range(0, len(_documents), _chunk_size) | |
] | |
# recast raw_results | |
inserted_ids = [ | |
inserted_id | |
for chunk_response in raw_results | |
for inserted_id in (chunk_response.get("status") or {}).get( | |
"insertedIds", [] | |
) | |
] | |
# check-raise | |
if any( | |
[chunk_response.get("errors", []) for chunk_response in raw_results] | |
): | |
partial_result = InsertManyResult( | |
raw_results=raw_results, | |
inserted_ids=inserted_ids, | |
) | |
raise InsertManyException.from_responses( | |
commands=[None for _ in raw_results], | |
raw_responses=raw_results, | |
partial_result=partial_result, | |
) | |
# return | |
full_result = InsertManyResult( | |
raw_results=raw_results, | |
inserted_ids=inserted_ids, | |
) | |
logger.info( | |
f"finished inserting {len(_documents)} documents in '{self.name}'" | |
) | |
return full_result | |
def find( | |
self, | |
filter: Optional[FilterType] = None, | |
*, | |
projection: Optional[ProjectionType] = None, | |
skip: Optional[int] = None, | |
limit: Optional[int] = None, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
include_similarity: Optional[bool] = None, | |
sort: Optional[SortType] = None, | |
max_time_ms: Optional[int] = None, | |
) -> AsyncCursor: | |
""" | |
Find documents on the collection, matching a certain provided filter. | |
The method returns an AsyncCursor that can then be iterated over. Depending
on the method call pattern, the iteration over all documents can reflect | |
collection mutations occurred since the `find` method was called, or not. | |
In cases where the cursor reflects mutations in real-time, it will iterate
over the documents in an approximate way (i.e. exhibiting occasional skipped
or duplicate documents). This happens when making use of the `sort` | |
option in a non-vector-search manner. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
projection: used to select a subset of fields in the documents being | |
returned. The projection can be: an iterable over the field names | |
to return; a dictionary {field_name: True} to positively select | |
certain fields; or a dictionary {field_name: False} if one wants | |
to discard some fields from the response. | |
The default is to return the whole documents. | |
skip: with this integer parameter, what would be the first `skip` | |
documents returned by the query are discarded, and the results | |
start from the (skip+1)-th document. | |
This parameter can be used only in conjunction with an explicit | |
`sort` criterion of the ascending/descending type (i.e. it cannot | |
be used when not sorting, nor with vector-based ANN search). | |
limit: this (integer) parameter sets a limit over how many documents | |
are returned. Once `limit` is reached (or the cursor is exhausted | |
for lack of matching documents), nothing more is returned. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to perform vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search). | |
When running similarity search on a collection, no other sorting | |
criteria can be specified. Moreover, there is an upper bound | |
to the number of documents that can be returned. For details, | |
see the Note about upper bounds and the Data API documentation. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
include_similarity: a boolean to request the numeric value of the | |
similarity to be returned as an added "$similarity" key in each | |
returned document. Can only be used for vector ANN search, i.e. | |
when either `vector` is supplied or the `sort` parameter has the | |
shape {"$vector": ...}. | |
sort: with this dictionary parameter one can control the order | |
the documents are returned. See the Note about sorting, as well as | |
the one about upper bounds, for details. | |
max_time_ms: a timeout, in milliseconds, for each single one | |
of the underlying HTTP requests used to fetch documents as the | |
cursor is iterated over. | |
Returns: | |
an AsyncCursor object representing iterations over the matching documents | |
(see the AsyncCursor object for how to use it. The simplest thing is to
run an async for loop: `async for document in collection.find(...):`).
Examples: | |
>>> async def run_finds(acol: AsyncCollection) -> None: | |
... filter = {"seq": {"$exists": True}} | |
... print("find results 1:") | |
... async for doc in acol.find(filter, projection={"seq": True}, limit=5): | |
... print(doc["seq"]) | |
... async_cursor1 = acol.find( | |
... {}, | |
... limit=4, | |
... sort={"seq": astrapy.constants.SortDocuments.DESCENDING}, | |
... ) | |
... ids = [doc["_id"] async for doc in async_cursor1] | |
... print("find results 2:", ids) | |
... async_cursor2 = acol.find({}, limit=3) | |
... seqs = await async_cursor2.distinct("seq") | |
... print("distinct results 3:", seqs) | |
... | |
>>> asyncio.run(run_finds(my_async_coll)) | |
find results 1: | |
48 | |
35 | |
7 | |
11 | |
13 | |
find results 2: ['d656cd9d-...', '479c7ce8-...', '96dc87fd-...', '83f0a21f-...'] | |
distinct results 3: [48, 35, 7] | |
>>> async def run_vector_finds(acol: AsyncCollection) -> List[str]: | |
... await acol.insert_many([ | |
... {"tag": "A", "$vector": [4, 5]}, | |
... {"tag": "B", "$vector": [3, 4]}, | |
... {"tag": "C", "$vector": [3, 2]}, | |
... {"tag": "D", "$vector": [4, 1]}, | |
... {"tag": "E", "$vector": [2, 5]}, | |
... ]) | |
... ann_tags = [ | |
... document["tag"] | |
... async for document in acol.find( | |
... {}, | |
... limit=3, | |
... vector=[3, 3], | |
... ) | |
... ] | |
... return ann_tags | |
... | |
>>> asyncio.run(run_vector_finds(my_async_coll)) | |
['A', 'B', 'C'] | |
# (assuming the collection has metric VectorMetric.COSINE) | |
Note: | |
The following are example values for the `sort` parameter. | |
When no particular order is required: | |
sort={} | |
When sorting by a certain value in ascending/descending order: | |
sort={"field": SortDocuments.ASCENDING} | |
sort={"field": SortDocuments.DESCENDING} | |
When sorting first by "field" and then by "subfield" | |
(while modern Python versions preserve the order of dictionaries, | |
it is suggested for clarity to employ a `collections.OrderedDict` | |
in these cases): | |
sort={ | |
"field": SortDocuments.ASCENDING, | |
"subfield": SortDocuments.ASCENDING, | |
} | |
When running a vector similarity (ANN) search: | |
sort={"$vector": [0.4, 0.15, -0.5]} | |
Note: | |
Some combinations of arguments impose an implicit upper bound on the | |
number of documents that are returned by the Data API. More specifically: | |
(a) Vector ANN searches cannot return more than a number of documents | |
that at the time of writing is set to 1000 items. | |
(b) When using a sort criterion of the ascending/descending type, | |
the Data API will return a smaller number of documents, set to 20 | |
at the time of writing, and stop there. The returned documents are | |
the top results across the whole collection according to the requested | |
criterion. | |
These provisions should be kept in mind even when subsequently running | |
a command such as `.distinct()` on a cursor. | |
Note: | |
When not specifying sorting criteria at all (by vector or otherwise), | |
the cursor can scroll through an arbitrary number of documents as | |
the Data API and the client periodically exchange new chunks of documents. | |
It should be noted that the behavior of the cursor, in case documents | |
are added/removed after the `find` was started, depends on database | |
internals and it is not guaranteed, nor excluded, that such "real-time" | |
changes in the data would be picked up by the cursor. | |
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
if include_similarity is not None and not _is_vector_sort(_sort): | |
raise ValueError( | |
"Cannot use `include_similarity` when not searching through `vector`." | |
) | |
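# Note: constructing the cursor performs no HTTP request by itself; | |
# documents are only fetched, page by page, once iteration begins. | |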
return ( | |
AsyncCursor( | |
collection=self, | |
filter=filter, | |
projection=projection, | |
max_time_ms=max_time_ms, | |
overall_max_time_ms=None, | |
) | |
.skip(skip) | |
.limit(limit) | |
.sort(_sort) | |
.include_similarity(include_similarity) | |
) | |
async def find_one( | |
self, | |
filter: Optional[FilterType] = None, | |
*, | |
projection: Optional[ProjectionType] = None, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
include_similarity: Optional[bool] = None, | |
sort: Optional[SortType] = None, | |
max_time_ms: Optional[int] = None, | |
) -> Union[DocumentType, None]: | |
""" | |
Run a search, returning the first document in the collection that matches | |
provided filters, if any is found. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
projection: used to select a subset of fields in the documents being | |
returned. The projection can be: an iterable over the field names | |
to return; a dictionary {field_name: True} to positively select | |
certain fields; or a dictionary {field_name: False} if one wants | |
to discard some fields from the response. | |
The default is to return the whole documents. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to perform vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), extracting the most | |
similar document in the collection matching the filter. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
include_similarity: a boolean to request the numeric value of the | |
similarity to be returned as an added "$similarity" key in the | |
returned document. Can only be used for vector ANN search, i.e. | |
when either `vector` is supplied or the `sort` parameter has the | |
shape {"$vector": ...}. | |
sort: with this dictionary parameter one can control the order | |
the documents are returned. See the Note about sorting for details. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a dictionary expressing the required document, otherwise None. | |
Example: | |
>>> async def demo_find_one(acol: AsyncCollection) -> None: | |
... print("Count:", await acol.count_documents({}, upper_bound=100)) | |
... result0 = await acol.find_one({}) | |
... print("result0", result0) | |
... result1 = await acol.find_one({"seq": 10}) | |
... print("result1", result1) | |
... result2 = await acol.find_one({"seq": 1011}) | |
... print("result2", result2) | |
... result3 = await acol.find_one({}, projection={"seq": False}) | |
... print("result3", result3) | |
... result4 = await acol.find_one( | |
... {}, | |
... sort={"seq": astrapy.constants.SortDocuments.DESCENDING}, | |
... ) | |
... print("result4", result4) | |
... | |
>>> | |
>>> asyncio.run(demo_find_one(my_async_coll)) | |
Count: 50 | |
result0 {'_id': '479c7ce8-...', 'seq': 48} | |
result1 {'_id': '93e992c4-...', 'seq': 10} | |
result2 None | |
result3 {'_id': '479c7ce8-...'} | |
result4 {'_id': 'd656cd9d-...', 'seq': 49} | |
>>> asyncio.run(my_async_coll.find_one({}, vector=[1, 0])) | |
{'_id': '...', 'tag': 'D', '$vector': [4.0, 1.0]} | |
Note: | |
See the `find` method for more details on the accepted parameters | |
(whereas `skip` and `limit` are not valid parameters for `find_one`). | |
""" | |
fo_cursor = self.find( | |
filter=filter, | |
projection=projection, | |
skip=None, | |
limit=1, | |
vector=vector, | |
vectorize=vectorize, | |
include_similarity=include_similarity, | |
sort=sort, | |
max_time_ms=max_time_ms, | |
) | |
try: | |
document = await fo_cursor.__anext__() | |
return document # type: ignore[no-any-return] | |
except StopAsyncIteration: | |
return None | |
async def distinct( | |
self, | |
key: str, | |
*, | |
filter: Optional[FilterType] = None, | |
max_time_ms: Optional[int] = None, | |
) -> List[Any]: | |
""" | |
Return a list of the unique values of `key` across the documents | |
in the collection that match the provided filter. | |
Args: | |
key: the name of the field whose value is inspected across documents. | |
Keys can use dot-notation to descend to deeper document levels. | |
Example of acceptable `key` values: | |
"field" | |
"field.subfield" | |
"field.3" | |
"field.3.subfield" | |
If lists are encountered and no numeric index is specified, | |
all items in the list are visited. | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
max_time_ms: a timeout, in milliseconds, for the operation. | |
Returns: | |
a list of all different values for `key` found across the documents | |
that match the filter. The result list has no repeated items. | |
Example: | |
>>> async def run_distinct(acol: AsyncCollection) -> None: | |
... await acol.insert_many( | |
... [ | |
... {"name": "Marco", "food": ["apple", "orange"], "city": "Helsinki"}, | |
... {"name": "Emma", "food": {"likes_fruit": True, "allergies": []}}, | |
... ] | |
... ) | |
... distinct0 = await acol.distinct("name") | |
... print("distinct('name')", distinct0) | |
... distinct1 = await acol.distinct("city") | |
... print("distinct('city')", distinct1) | |
... distinct2 = await acol.distinct("food") | |
... print("distinct('food')", distinct2) | |
... distinct3 = await acol.distinct("food.1") | |
... print("distinct('food.1')", distinct3) | |
... distinct4 = await acol.distinct("food.allergies") | |
... print("distinct('food.allergies')", distinct4) | |
... distinct5 = await acol.distinct("food.likes_fruit") | |
... print("distinct('food.likes_fruit')", distinct5) | |
... | |
>>> asyncio.run(run_distinct(my_async_coll)) | |
distinct('name') ['Emma', 'Marco'] | |
distinct('city') ['Helsinki'] | |
distinct('food') [{'likes_fruit': True, 'allergies': []}, 'apple', 'orange'] | |
distinct('food.1') ['orange'] | |
distinct('food.allergies') [] | |
distinct('food.likes_fruit') [True] | |
Note: | |
It must be kept in mind that `distinct` is a client-side operation, | |
which effectively browses all required documents using the logic | |
of the `find` method and collects the unique values found for `key`. | |
As such, there may be performance, latency and ultimately | |
billing implications if the amount of matching documents is large. | |
Note: | |
For details on the behaviour of "distinct" in conjunction with | |
real-time changes in the collection contents, see the | |
Note of the `find` command. | |
""" | |
f_cursor = AsyncCursor( | |
collection=self, | |
filter=filter, | |
projection={key: True}, | |
max_time_ms=None, | |
overall_max_time_ms=max_time_ms, | |
) | |
return await f_cursor.distinct(key) # type: ignore[no-any-return] | |
async def count_documents( | |
self, | |
filter: Dict[str, Any], | |
*, | |
upper_bound: int, | |
max_time_ms: Optional[int] = None, | |
) -> int: | |
""" | |
Count the documents in the collection matching the specified filter. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
upper_bound: a required ceiling on the result of the count operation. | |
If the actual number of documents exceeds this value, | |
an exception will be raised. | |
Furthermore, if the actual number of documents exceeds the maximum | |
count that the Data API can reach (regardless of upper_bound), | |
an exception will be raised. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
the exact count of matching documents. | |
Example: | |
>>> async def do_count_docs(acol: AsyncCollection) -> None: | |
... await acol.insert_many([{"seq": i} for i in range(20)]) | |
... count0 = await acol.count_documents({}, upper_bound=100) | |
... print("count0", count0) | |
... count1 = await acol.count_documents({"seq": {"$gt": 15}}, upper_bound=100) | |
... print("count1", count1) | |
... count2 = await acol.count_documents({}, upper_bound=10) | |
... print("count2", count2) | |
... | |
>>> asyncio.run(do_count_docs(my_async_coll)) | |
count0 20 | |
count1 4 | |
Traceback (most recent call last): | |
... ... | |
astrapy.exceptions.TooManyDocumentsToCountException | |
Note: | |
Count operations are expensive: for this reason, the best practice | |
is to provide a reasonable `upper_bound` according to the caller | |
expectations. Moreover, indiscriminate usage of count operations | |
for sizeable amounts of documents (i.e. in the thousands and more) | |
is discouraged in favor of alternative application-specific solutions. | |
Keep in mind that the Data API has a hard upper limit on the amount | |
of documents it will count, and that an exception will be thrown | |
by this method if this limit is encountered. | |
""" | |
logger.info("calling count_documents") | |
cd_response = await self._astra_db_collection.count_documents( | |
filter=filter, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info("finished calling count_documents") | |
if "count" in cd_response.get("status", {}): | |
count: int = cd_response["status"]["count"] | |
if cd_response["status"].get("moreData", False): | |
raise TooManyDocumentsToCountException( | |
text=f"Document count exceeds {count}, the maximum allowed by the server", | |
server_max_count_exceeded=True, | |
) | |
else: | |
if count > upper_bound: | |
raise TooManyDocumentsToCountException( | |
text="Document count exceeds required upper bound", | |
server_max_count_exceeded=False, | |
) | |
else: | |
return count | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from count_documents API command.", | |
raw_response=cd_response, | |
) | |
async def estimated_document_count( | |
self, | |
*, | |
max_time_ms: Optional[int] = None, | |
) -> int: | |
""" | |
Query the API server for an estimate of the document count in the collection. | |
Contrary to `count_documents`, this method has no filtering parameters. | |
Args: | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a server-provided estimate count of the documents in the collection. | |
Example: | |
>>> asyncio.run(my_async_coll.estimated_document_count()) | |
35700 | |
""" | |
ed_response = await self.command( | |
{"estimatedDocumentCount": {}}, | |
max_time_ms=max_time_ms, | |
) | |
if "count" in ed_response.get("status", {}): | |
count: int = ed_response["status"]["count"] | |
return count | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from estimated_document_count API command.", | |
raw_response=ed_response, | |
) | |
async def find_one_and_replace( | |
self, | |
filter: Dict[str, Any], | |
replacement: DocumentType, | |
*, | |
projection: Optional[ProjectionType] = None, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
upsert: bool = False, | |
return_document: str = ReturnDocument.BEFORE, | |
max_time_ms: Optional[int] = None, | |
) -> Union[DocumentType, None]: | |
""" | |
Find a document on the collection and replace it entirely with a new one, | |
optionally inserting a new one if no match is found. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
replacement: the new document to write into the collection. | |
projection: used to select a subset of fields in the document being | |
returned. The projection can be: an iterable over the field names | |
to return; a dictionary {field_name: True} to positively select | |
certain fields; or a dictionary {field_name: False} if one wants | |
to discard some fields from the response. | |
The default is to return the whole documents. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
replaced one. See the `find` method for more on sorting. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, `replacement` is inserted as a new document | |
if no matches are found on the collection. If False, | |
the operation silently does nothing in case of no matches. | |
return_document: a flag controlling what document is returned: | |
if set to `ReturnDocument.BEFORE`, or the string "before", | |
the document found on database is returned; if set to | |
`ReturnDocument.AFTER`, or the string "after", the new | |
document is returned. The default is "before". | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
A document, either the one before the replace operation or the | |
one after that. Alternatively, the method returns None to represent | |
that no matching document was found, or that no replacement | |
was inserted (depending on the `return_document` parameter). | |
Example: | |
>>> async def do_find_one_and_replace(acol: AsyncCollection) -> None: | |
... await acol.insert_one({"_id": "rule1", "text": "all animals are equal"}) | |
... result0 = await acol.find_one_and_replace( | |
... {"_id": "rule1"}, | |
... {"text": "some animals are more equal!"}, | |
... ) | |
... print("result0", result0) | |
... result1 = await acol.find_one_and_replace( | |
... {"text": "some animals are more equal!"}, | |
... {"text": "and the pigs are the rulers"}, | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
... print("result1", result1) | |
... result2 = await acol.find_one_and_replace( | |
... {"_id": "rule2"}, | |
... {"text": "F=ma^2"}, | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
... print("result2", result2) | |
... result3 = await acol.find_one_and_replace( | |
... {"_id": "rule2"}, | |
... {"text": "F=ma"}, | |
... upsert=True, | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... projection={"_id": False}, | |
... ) | |
... print("result3", result3) | |
... | |
>>> asyncio.run(do_find_one_and_replace(my_async_coll)) | |
result0 {'_id': 'rule1', 'text': 'all animals are equal'} | |
result1 {'_id': 'rule1', 'text': 'and the pigs are the rulers'} | |
result2 None | |
result3 {'text': 'F=ma'} | |
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
options = { | |
"returnDocument": return_document, | |
"upsert": upsert, | |
} | |
logger.info(f"calling find_one_and_replace on '{self.name}'") | |
fo_response = await self._astra_db_collection.find_one_and_replace( | |
replacement=replacement, | |
filter=filter, | |
projection=normalize_optional_projection(projection), | |
sort=_sort, | |
options=options, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_replace on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
ret_document = fo_response.get("data", {}).get("document") | |
if ret_document is None: | |
return None | |
else: | |
return ret_document # type: ignore[no-any-return] | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_replace API command.", | |
raw_response=fo_response, | |
) | |
async def replace_one( | |
self, | |
filter: Dict[str, Any], | |
replacement: DocumentType, | |
*, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
upsert: bool = False, | |
max_time_ms: Optional[int] = None, | |
) -> UpdateResult: | |
""" | |
Replace a single document on the collection with a new one, | |
optionally inserting a new one if no match is found. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
replacement: the new document to write into the collection. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
replaced one. See the `find` method for more on sorting. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, `replacement` is inserted as a new document | |
if no matches are found on the collection. If False, | |
the operation silently does nothing in case of no matches. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
an UpdateResult object summarizing the outcome of the replace operation. | |
Example: | |
>>> async def do_replace_one(acol: AsyncCollection) -> None: | |
... await acol.insert_one({"Marco": "Polo"}) | |
... result0 = await acol.replace_one( | |
... {"Marco": {"$exists": True}}, | |
... {"Buda": "Pest"}, | |
... ) | |
... print("result0.update_info", result0.update_info) | |
... doc1 = await acol.find_one({"Buda": "Pest"}) | |
... print("doc1", doc1) | |
... result1 = await acol.replace_one( | |
... {"Mirco": {"$exists": True}}, | |
... {"Oh": "yeah?"}, | |
... ) | |
... print("result1.update_info", result1.update_info) | |
... result2 = await acol.replace_one( | |
... {"Mirco": {"$exists": True}}, | |
... {"Oh": "yeah?"}, | |
... upsert=True, | |
... ) | |
... print("result2.update_info", result2.update_info) | |
... | |
>>> asyncio.run(do_replace_one(my_async_coll)) | |
result0.update_info {'n': 1, 'updatedExisting': True, 'ok': 1.0, 'nModified': 1} | |
doc1 {'_id': '6e669a5a-...', 'Buda': 'Pest'} | |
result1.update_info {'n': 0, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0} | |
result2.update_info {'n': 1, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0, 'upserted': '30e34e00-...'} | |
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
options = { | |
"upsert": upsert, | |
} | |
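# replace_one is implemented through the same findOneAndReplace API | |
# command; its response status is then recast into an UpdateResult. | |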
logger.info(f"calling find_one_and_replace on '{self.name}'") | |
fo_response = await self._astra_db_collection.find_one_and_replace( | |
replacement=replacement, | |
filter=filter, | |
sort=_sort, | |
options=options, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_replace on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
fo_status = fo_response.get("status") or {} | |
_update_info = _prepare_update_info([fo_status]) | |
return UpdateResult( | |
raw_results=[fo_response], | |
update_info=_update_info, | |
) | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_replace API command.", | |
raw_response=fo_response, | |
) | |
async def find_one_and_update( | |
self, | |
filter: Dict[str, Any], | |
update: Dict[str, Any], | |
*, | |
projection: Optional[ProjectionType] = None, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
upsert: bool = False, | |
return_document: str = ReturnDocument.BEFORE, | |
max_time_ms: Optional[int] = None, | |
) -> Union[DocumentType, None]: | |
""" | |
Find a document on the collection and update it as requested, | |
optionally inserting a new one if no match is found. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
update: the update prescription to apply to the document, expressed | |
as a dictionary as per Data API syntax. Examples are: | |
{"$set": {"field": "value}} | |
{"$inc": {"counter": 10}} | |
{"$unset": {"field": ""}} | |
See the Data API documentation for the full syntax. | |
projection: used to select a subset of fields in the document being | |
returned. The projection can be: an iterable over the field names | |
to return; a dictionary {field_name: True} to positively select | |
certain fields; or a dictionary {field_name: False} if one wants | |
to discard some fields from the response. | |
The default is to return the whole documents. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
updated one. See the `find` method for more on sorting. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, a new document (resulting from applying the `update` | |
to an empty document) is inserted if no matches are found on | |
the collection. If False, the operation silently does nothing | |
in case of no matches. | |
return_document: a flag controlling what document is returned: | |
if set to `ReturnDocument.BEFORE`, or the string "before", | |
the document found on database is returned; if set to | |
`ReturnDocument.AFTER`, or the string "after", the new | |
document is returned. The default is "before". | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
A document (or a projection thereof, as required), either the one | |
before the update operation or the one after that. | |
Alternatively, the method returns None to represent | |
that no matching document was found, or that no update | |
was applied (depending on the `return_document` parameter). | |
Example: | |
>>> async def do_find_one_and_update(acol: AsyncCollection) -> None: | |
... await acol.insert_one({"Marco": "Polo"}) | |
... result0 = await acol.find_one_and_update( | |
... {"Marco": {"$exists": True}}, | |
... {"$set": {"title": "Mr."}}, | |
... ) | |
... print("result0", result0) | |
... result1 = await acol.find_one_and_update( | |
... {"title": "Mr."}, | |
... {"$inc": {"rank": 3}}, | |
... projection=["title", "rank"], | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
... print("result1", result1) | |
... result2 = await acol.find_one_and_update( | |
... {"name": "Johnny"}, | |
... {"$set": {"rank": 0}}, | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
... print("result2", result2) | |
... result3 = await acol.find_one_and_update( | |
... {"name": "Johnny"}, | |
... {"$set": {"rank": 0}}, | |
... upsert=True, | |
... return_document=astrapy.constants.ReturnDocument.AFTER, | |
... ) | |
... print("result3", result3) | |
... | |
>>> asyncio.run(do_find_one_and_update(my_async_coll)) | |
result0 {'_id': 'f7c936d3-b0a0-45eb-a676-e2829662a57c', 'Marco': 'Polo'} | |
result1 {'_id': 'f7c936d3-b0a0-45eb-a676-e2829662a57c', 'title': 'Mr.', 'rank': 3} | |
result2 None | |
result3 {'_id': 'db3d678d-14d4-4caa-82d2-d5fb77dab7ec', 'name': 'Johnny', 'rank': 0} | |
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
options = { | |
"returnDocument": return_document, | |
"upsert": upsert, | |
} | |
logger.info(f"calling find_one_and_update on '{self.name}'") | |
fo_response = await self._astra_db_collection.find_one_and_update( | |
update=update, | |
filter=filter, | |
projection=normalize_optional_projection(projection), | |
sort=_sort, | |
options=options, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_update on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
ret_document = fo_response.get("data", {}).get("document") | |
if ret_document is None: | |
return None | |
else: | |
return ret_document # type: ignore[no-any-return] | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_update API command.", | |
raw_response=fo_response, | |
) | |
async def update_one( | |
self, | |
filter: Dict[str, Any], | |
update: Dict[str, Any], | |
*, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
upsert: bool = False, | |
max_time_ms: Optional[int] = None, | |
) -> UpdateResult: | |
""" | |
Update a single document on the collection as requested, | |
optionally inserting a new one if no match is found. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
update: the update prescription to apply to the document, expressed | |
as a dictionary as per Data API syntax. Examples are: | |
{"$set": {"field": "value}} | |
{"$inc": {"counter": 10}} | |
{"$unset": {"field": ""}} | |
See the Data API documentation for the full syntax. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
updated one. See the `find` method for more on sorting. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, a new document (resulting from applying the `update` | |
to an empty document) is inserted if no matches are found on | |
the collection. If False, the operation silently does nothing | |
in case of no matches. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
an UpdateResult object summarizing the outcome of the update operation. | |
Example: | |
>>> async def do_update_one(acol: AsyncCollection) -> None: | |
... await acol.insert_one({"Marco": "Polo"}) | |
... result0 = await acol.update_one( | |
... {"Marco": {"$exists": True}}, | |
... {"$inc": {"rank": 3}}, | |
... ) | |
... print("result0.update_info", result0.update_info) | |
... result1 = await acol.update_one( | |
... {"Mirko": {"$exists": True}}, | |
... {"$inc": {"rank": 3}}, | |
... ) | |
... print("result1.update_info", result1.update_info) | |
... result2 = await acol.update_one( | |
... {"Mirko": {"$exists": True}}, | |
... {"$inc": {"rank": 3}}, | |
... upsert=True, | |
... ) | |
... print("result2.update_info", result2.update_info) | |
... | |
>>> asyncio.run(do_update_one(my_async_coll)) | |
result0.update_info {'n': 1, 'updatedExisting': True, 'ok': 1.0, 'nModified': 1} | |
result1.update_info {'n': 0, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0} | |
result2.update_info {'n': 1, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0, 'upserted': '75748092-...'} | |
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
options = { | |
"upsert": upsert, | |
} | |
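# update_one is implemented through the findOneAndUpdate API command; | |
# its response status is then recast into an UpdateResult. | |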
logger.info(f"calling find_one_and_update on '{self.name}'") | |
fo_response = await self._astra_db_collection.find_one_and_update( | |
update=update, | |
sort=_sort, | |
filter=filter, | |
options=options, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_update on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
fo_status = fo_response.get("status") or {} | |
_update_info = _prepare_update_info([fo_status]) | |
return UpdateResult( | |
raw_results=[fo_response], | |
update_info=_update_info, | |
) | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_update API command.", | |
raw_response=fo_response, | |
) | |
async def update_many( | |
self, | |
filter: Dict[str, Any], | |
update: Dict[str, Any], | |
*, | |
upsert: bool = False, | |
max_time_ms: Optional[int] = None, | |
) -> UpdateResult: | |
""" | |
Apply an update operation to all documents matching a condition, | |
optionally inserting one document in absence of matches. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
update: the update prescription to apply to the documents, expressed | |
as a dictionary as per Data API syntax. Examples are: | |
{"$set": {"field": "value}} | |
{"$inc": {"counter": 10}} | |
{"$unset": {"field": ""}} | |
See the Data API documentation for the full syntax. | |
upsert: this parameter controls the behavior in absence of matches. | |
If True, a single new document (resulting from applying `update` | |
to an empty document) is inserted if no matches are found on | |
the collection. If False, the operation silently does nothing | |
in case of no matches. | |
max_time_ms: a timeout, in milliseconds, for the operation. | |
Returns: | |
an UpdateResult object summarizing the outcome of the update operation. | |
Example: | |
>>> async def do_update_many(acol: AsyncCollection) -> None: | |
... await acol.insert_many([{"c": "red"}, {"c": "green"}, {"c": "blue"}]) | |
... result0 = await acol.update_many( | |
... {"c": {"$ne": "green"}}, | |
... {"$set": {"nongreen": True}}, | |
... ) | |
... print("result0.update_info", result0.update_info) | |
... result1 = await acol.update_many( | |
... {"c": "orange"}, | |
... {"$set": {"is_also_fruit": True}}, | |
... ) | |
... print("result1.update_info", result1.update_info) | |
... result2 = await acol.update_many( | |
... {"c": "orange"}, | |
... {"$set": {"is_also_fruit": True}}, | |
... upsert=True, | |
... ) | |
... print("result2.update_info", result2.update_info) | |
... | |
>>> asyncio.run(do_update_many(my_async_coll)) | |
result0.update_info {'n': 2, 'updatedExisting': True, 'ok': 1.0, 'nModified': 2} | |
result1.update_info {'n': 0, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0} | |
result2.update_info {'n': 1, 'updatedExisting': False, 'ok': 1.0, 'nModified': 0, 'upserted': '79ffd5a3-ab99-4dff-a2a5-4aaa0e59e854'} | |
Note: | |
Similarly to the case of `find` (see its docstring for more details), | |
running this command while, at the same time, another process is | |
inserting new documents which match the filter of the `update_many` | |
can result in an unpredictable fraction of these documents being updated. | |
In other words, it cannot be easily predicted whether a given | |
newly-inserted document will be picked up by the update_many command or not. | |
""" | |
base_options = { | |
"upsert": upsert, | |
} | |
page_state_options: Dict[str, str] = {} | |
um_responses: List[Dict[str, Any]] = [] | |
um_statuses: List[Dict[str, Any]] = [] | |
must_proceed = True | |
timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) | |
logger.info(f"starting update_many on '{self.name}'") | |
while must_proceed: | |
options = {**base_options, **page_state_options} | |
logger.info(f"calling update_many on '{self.name}'") | |
this_um_response = await self._astra_db_collection.update_many( | |
update=update, | |
filter=filter, | |
options=options, | |
timeout_info=timeout_manager.remaining_timeout_info(), | |
) | |
logger.info(f"finished calling update_many on '{self.name}'") | |
this_um_status = this_um_response.get("status") or {} | |
# | |
# if errors, quit early | |
if this_um_response.get("errors", []): | |
partial_update_info = _prepare_update_info(um_statuses) | |
partial_result = UpdateResult( | |
raw_results=um_responses, | |
update_info=partial_update_info, | |
) | |
all_um_responses = um_responses + [this_um_response] | |
raise UpdateManyException.from_responses( | |
commands=[None for _ in all_um_responses], | |
raw_responses=all_um_responses, | |
partial_result=partial_result, | |
) | |
else: | |
if "status" not in this_um_response: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from update_many API command.", | |
raw_response=this_um_response, | |
) | |
um_responses.append(this_um_response) | |
um_statuses.append(this_um_status) | |
next_page_state = this_um_status.get("nextPageState") | |
if next_page_state is not None: | |
must_proceed = True | |
page_state_options = {"pageState": next_page_state} | |
else: | |
must_proceed = False | |
page_state_options = {} | |
update_info = _prepare_update_info(um_statuses) | |
logger.info(f"finished update_many on '{self.name}'") | |
return UpdateResult( | |
raw_results=um_responses, | |
update_info=update_info, | |
) | |
async def find_one_and_delete( | |
self, | |
filter: Dict[str, Any], | |
*, | |
projection: Optional[ProjectionType] = None, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
max_time_ms: Optional[int] = None, | |
) -> Union[DocumentType, None]: | |
""" | |
Find a document in the collection and delete it. The deleted document, | |
however, is the return value of the method. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
projection: used to select a subset of fields in the document being | |
returned. The projection can be: an iterable over the field names | |
to return; a dictionary {field_name: True} to positively select | |
certain fields; or a dictionary {field_name: False} if one wants | |
to discard some fields from the response. | |
Note that the `_id` field will be returned with the document | |
in any case, regardless of what the provided `projection` requires. | |
The default is to return the whole documents. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
deleted one. See the `find` method for more on sorting. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
Either the document (or a projection thereof, as requested), or None | |
if no matches were found in the first place. | |
Example: | |
>>> async def do_find_one_and_delete(acol: AsyncCollection) -> None: | |
... await acol.insert_many( | |
... [ | |
... {"species": "swan", "class": "Aves"}, | |
... {"species": "frog", "class": "Amphibia"}, | |
... ], | |
... ) | |
... delete_result0 = await acol.find_one_and_delete( | |
... {"species": {"$ne": "frog"}}, | |
... projection=["species"], | |
... ) | |
... print("delete_result0", delete_result0) | |
... delete_result1 = await acol.find_one_and_delete( | |
... {"species": {"$ne": "frog"}}, | |
... ) | |
... print("delete_result1", delete_result1) | |
... | |
>>> asyncio.run(do_find_one_and_delete(my_async_coll)) | |
delete_result0 {'_id': 'f335cd0f-...', 'species': 'swan'} | |
delete_result1 None | |
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
_projection = normalize_optional_projection(projection) | |
logger.info(f"calling find_one_and_delete on '{self.name}'") | |
fo_response = await self._astra_db_collection.find_one_and_delete( | |
sort=_sort, | |
filter=filter, | |
projection=_projection, | |
timeout_info=base_timeout_info(max_time_ms), | |
) | |
logger.info(f"finished calling find_one_and_delete on '{self.name}'") | |
if "document" in fo_response.get("data", {}): | |
document = fo_response["data"]["document"] | |
return document # type: ignore[no-any-return] | |
else: | |
deleted_count = fo_response.get("status", {}).get("deletedCount") | |
if deleted_count == 0: | |
return None | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from find_one_and_delete API command.", | |
raw_response=fo_response, | |
) | |
async def delete_one( | |
self, | |
filter: Dict[str, Any], | |
*, | |
vector: Optional[VectorType] = None, | |
vectorize: Optional[str] = None, | |
sort: Optional[SortType] = None, | |
max_time_ms: Optional[int] = None, | |
) -> DeleteResult: | |
""" | |
Delete one document matching a provided filter. | |
This method never deletes more than a single document, regardless | |
of the number of matches to the provided filters. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
vector: a suitable vector, i.e. a list of float numbers of the appropriate | |
dimensionality, to use vector search (i.e. ANN, | |
or "approximate nearest-neighbours" search), as the sorting criterion. | |
In this way, the matched document (if any) will be the one | |
that is most similar to the provided vector. | |
This parameter cannot be used together with `sort`. | |
See the `find` method for more details on this parameter. | |
vectorize: a string to be made into a vector to perform vector search. | |
This can be supplied in (exclusive) alternative to `vector`, | |
provided such a service is configured for the collection, | |
and achieves the same effect. | |
NOTE: This feature is under current development. | |
sort: with this dictionary parameter one can control the sorting | |
order of the documents matching the filter, effectively | |
determining what document will come first and hence be the | |
deleted one. See the `find` method for more on sorting. | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a DeleteResult object summarizing the outcome of the delete operation. | |
Example: | |
>>> my_coll.insert_many([{"seq": 1}, {"seq": 0}, {"seq": 2}]) | |
InsertManyResult(...) | |
>>> my_coll.delete_one({"seq": 1}) | |
DeleteResult(raw_results=..., deleted_count=1) | |
>>> my_coll.distinct("seq") | |
[0, 2] | |
>>> my_coll.delete_one( | |
... {"seq": {"$exists": True}}, | |
... sort={"seq": astrapy.constants.SortDocuments.DESCENDING}, | |
... ) | |
DeleteResult(raw_results=..., deleted_count=1) | |
>>> my_coll.distinct("seq") | |
[0] | |
>>> my_coll.delete_one({"seq": 2}) | |
DeleteResult(raw_results=..., deleted_count=0) | |
""" | |
_sort = _collate_vector_to_sort(sort, vector, vectorize) | |
logger.info(f"calling delete_one_by_predicate on '{self.name}'") | |
do_response = await self._astra_db_collection.delete_one_by_predicate( | |
filter=filter, | |
timeout_info=base_timeout_info(max_time_ms), | |
sort=_sort, | |
) | |
logger.info(f"finished calling delete_one_by_predicate on '{self.name}'") | |
if "deletedCount" in do_response.get("status", {}): | |
deleted_count = do_response["status"]["deletedCount"] | |
if deleted_count == -1: | |
return DeleteResult( | |
deleted_count=None, | |
raw_results=[do_response], | |
) | |
else: | |
# expected a non-negative integer: | |
return DeleteResult( | |
deleted_count=deleted_count, | |
raw_results=[do_response], | |
) | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from delete_one API command.", | |
raw_response=do_response, | |
) | |
async def delete_many( | |
self, | |
filter: Dict[str, Any], | |
*, | |
max_time_ms: Optional[int] = None, | |
) -> DeleteResult: | |
""" | |
Delete all documents matching a provided filter. | |
Args: | |
filter: a predicate expressed as a dictionary according to the | |
Data API filter syntax. Examples are: | |
{} | |
{"name": "John"} | |
{"price": {"$le": 100}} | |
{"$and": [{"name": "John"}, {"price": {"$le": 100}}]} | |
See the Data API documentation for the full set of operators. | |
The `delete_many` method does not accept an empty filter: see | |
`delete_all` to completely erase all contents of a collection. | |
max_time_ms: a timeout, in milliseconds, for the operation. | |
Returns: | |
a DeleteResult object summarizing the outcome of the delete operation. | |
Example: | |
>>> async def do_delete_many(acol: AsyncCollection) -> None: | |
... await acol.insert_many([{"seq": 1}, {"seq": 0}, {"seq": 2}]) | |
... delete_result0 = await acol.delete_many({"seq": {"$lte": 1}}) | |
... print("delete_result0.deleted_count", delete_result0.deleted_count) | |
... distinct1 = await acol.distinct("seq") | |
... print("distinct1", distinct1) | |
... delete_result2 = await acol.delete_many({"seq": {"$lte": 1}}) | |
... print("delete_result2.deleted_count", delete_result2.deleted_count) | |
... | |
>>> asyncio.run(do_delete_many(my_async_coll)) | |
delete_result0.deleted_count 2 | |
distinct1 [2] | |
delete_result2.deleted_count 0 | |
Note: | |
This operation is not atomic. Depending on the amount of matching | |
documents, it can keep running (in a blocking way) for a macroscopic | |
time. In that case, new documents that are meanwhile inserted | |
(e.g. from another process/application) will be deleted during | |
the execution of this method call until the collection is devoid | |
of matches. | |
""" | |
if not filter: | |
raise ValueError( | |
"The `filter` parameter to method `delete_many` cannot be " | |
"empty. In order to completely clear the contents of a " | |
"collection, please use the `delete_all` method." | |
) | |
dm_responses: List[Dict[str, Any]] = [] | |
deleted_count = 0 | |
must_proceed = True | |
timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) | |
logger.info(f"starting delete_many on '{self.name}'") | |
while must_proceed: | |
logger.info(f"calling delete_many on '{self.name}'") | |
this_dm_response = await self._astra_db_collection.delete_many( | |
filter=filter, | |
skip_error_check=True, | |
timeout_info=timeout_manager.remaining_timeout_info(), | |
) | |
logger.info(f"finished calling delete_many on '{self.name}'") | |
# if errors, quit early | |
if this_dm_response.get("errors", []): | |
partial_result = DeleteResult( | |
deleted_count=deleted_count, | |
raw_results=dm_responses, | |
) | |
all_dm_responses = dm_responses + [this_dm_response] | |
raise DeleteManyException.from_responses( | |
commands=[None for _ in all_dm_responses], | |
raw_responses=all_dm_responses, | |
partial_result=partial_result, | |
) | |
else: | |
this_dc = this_dm_response.get("status", {}).get("deletedCount") | |
if this_dc is None or this_dc < 0: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from delete_many API command.", | |
raw_response=this_dm_response, | |
) | |
dm_responses.append(this_dm_response) | |
deleted_count += this_dc | |
must_proceed = this_dm_response.get("status", {}).get("moreData", False) | |
logger.info(f"finished delete_many on '{self.name}'") | |
return DeleteResult( | |
deleted_count=deleted_count, | |
raw_results=dm_responses, | |
) | |
async def delete_all(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]: | |
""" | |
Delete all documents in a collection. | |
Args: | |
max_time_ms: a timeout, in milliseconds, for the underlying HTTP request. | |
Returns: | |
a dictionary of the form {"ok": 1} to signal successful deletion. | |
Example: | |
>>> async def do_delete_all(acol: AsyncCollection) -> None: | |
... distinct0 = await acol.distinct("seq") | |
... print("distinct0", distinct0) | |
... count1 = await acol.count_documents({}, upper_bound=100) | |
... print("count1", count1) | |
... delete_result2 = await acol.delete_all() | |
... print("delete_result2", delete_result2) | |
... count3 = await acol.count_documents({}, upper_bound=100) | |
... print("count3", count3) | |
... | |
>>> asyncio.run(do_delete_all(my_async_coll)) | |
distinct0 [4, 2, 3, 0, 1] | |
count1 5 | |
delete_result2 {'ok': 1} | |
count3 0 | |
Note: | |
Use with caution. | |
Note: | |
Unlike the `drop` method, this operation leaves the collection | |
itself in existence: once emptied, it can keep being used for | |
subsequent insertions and queries. | |
""" | |
logger.info(f"calling unfiltered delete_many on '{self.name}'") | |
dm_response = await self._astra_db_collection.delete_many( | |
filter={}, timeout_info=base_timeout_info(max_time_ms) | |
) | |
logger.info(f"finished calling unfiltered delete_many on '{self.name}'") | |
deleted_count = dm_response["status"]["deletedCount"] | |
if deleted_count == -1: | |
return {"ok": 1} | |
else: | |
raise DataAPIFaultyResponseException( | |
text="Faulty response from delete_many API command.", | |
raw_response=dm_response, | |
) | |
async def bulk_write( | |
self, | |
requests: Iterable[AsyncBaseOperation], | |
*, | |
ordered: bool = True, | |
concurrency: Optional[int] = None, | |
max_time_ms: Optional[int] = None, | |
) -> BulkWriteResult: | |
""" | |
Execute an arbitrary number of operations, such as inserts, updates | |
and deletes, either sequentially or concurrently. | |
This method does not execute atomically, i.e. individual operations are | |
each performed in the same way as the corresponding collection method, | |
and each one is a different and unrelated database mutation. | |
Args: | |
requests: an iterable over concrete subclasses of `AsyncBaseOperation`, | |
such as `AsyncInsertMany` or `AsyncReplaceOne`. Each such object | |
represents an operation ready to be executed on a collection, | |
and is instantiated by passing the same parameters as one | |
would the corresponding collection method. | |
ordered: whether to launch the `requests` one after the other or | |
in arbitrary order, possibly in a concurrent fashion. For | |
performance reasons, `ordered=False` should be preferred | |
when compatible with the needs of the application flow. | |
concurrency: maximum number of concurrent operations executing at | |
a given time. It cannot be more than one for ordered bulk writes. | |
max_time_ms: a timeout, in milliseconds, for the whole bulk write. | |
Remember that, if the method call times out, then there's no | |
guarantee about what portion of the bulk write has been received | |
and successfully executed by the Data API. | |
Returns: | |
A single BulkWriteResult summarizing the whole list of requested | |
operations. The keys in the map attributes of BulkWriteResult | |
(when present) are the integer indices of the corresponding operation | |
in the `requests` iterable. | |
Example: | |
>>> from astrapy.operations import AsyncInsertMany, AsyncReplaceOne, AsyncBaseOperation | |
>>> from astrapy.results import BulkWriteResult | |
>>> | |
>>> async def do_bulk_write( | |
... acol: AsyncCollection, | |
... async_operations: List[AsyncBaseOperation], | |
... ) -> BulkWriteResult: | |
... bw_result = await acol.bulk_write(async_operations) | |
... count0 = await acol.count_documents({}, upper_bound=100) | |
... print("count0", count0) | |
... distinct0 = await acol.distinct("replaced") | |
... print("distinct0", distinct0) | |
... return bw_result | |
... | |
>>> op1 = AsyncInsertMany([{"a": 1}, {"a": 2}]) | |
>>> op2 = AsyncReplaceOne( | |
... {"z": 9}, | |
... replacement={"z": 9, "replaced": True}, | |
... upsert=True, | |
... ) | |
>>> result = asyncio.run(do_bulk_write(my_async_coll, [op1, op2])) | |
count0 3 | |
distinct0 [True] | |
>>> print("result", result) | |
result BulkWriteResult(bulk_api_results={0: ..., 1: ...}, deleted_count=0, inserted_count=3, matched_count=0, modified_count=0, upserted_count=1, upserted_ids={1: 'ccd0a800-...'}) | |
""" | |
# lazy importing here against circular-import error | |
from astrapy.operations import reduce_bulk_write_results | |
if concurrency is None: | |
if ordered: | |
_concurrency = 1 | |
else: | |
_concurrency = DEFAULT_BULK_WRITE_CONCURRENCY | |
else: | |
_concurrency = concurrency | |
if _concurrency > 1 and ordered: | |
raise ValueError("Cannot run ordered bulk_write concurrently.") | |
timeout_manager = MultiCallTimeoutManager(overall_max_time_ms=max_time_ms) | |
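# Ordered bulk writes run the operations strictly one after the other, | |
# stopping at the first failure; unordered ones fan the operations out | |
# as concurrent tasks bounded by a semaphore (see below). | |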
logger.info(f"startng a bulk write on '{self.name}'") | |
if ordered: | |
bulk_write_results: List[BulkWriteResult] = [] | |
for operation_i, operation in enumerate(requests): | |
try: | |
this_bw_result = await operation.execute( | |
self, | |
index_in_bulk_write=operation_i, | |
bulk_write_timeout_ms=timeout_manager.remaining_timeout_ms(), | |
) | |
bulk_write_results.append(this_bw_result) | |
except CumulativeOperationException as exc: | |
partial_result = exc.partial_result | |
partial_bw_result = reduce_bulk_write_results( | |
bulk_write_results | |
+ [ | |
partial_result.to_bulk_write_result( | |
index_in_bulk_write=operation_i | |
) | |
] | |
) | |
dar_exception = exc.data_api_response_exception() | |
raise BulkWriteException( | |
text=dar_exception.text, | |
error_descriptors=dar_exception.error_descriptors, | |
detailed_error_descriptors=dar_exception.detailed_error_descriptors, | |
partial_result=partial_bw_result, | |
exceptions=[dar_exception], | |
) | |
except DataAPIResponseException as exc: | |
# the cumulative exceptions, with their | |
# partially-done-info, are handled above: | |
# here it's just one-shot d.a.r. exceptions | |
                    partial_bw_result = reduce_bulk_write_results(bulk_write_results)
                    dar_exception = exc.data_api_response_exception()
                    raise BulkWriteException(
                        text=dar_exception.text,
                        error_descriptors=dar_exception.error_descriptors,
                        detailed_error_descriptors=dar_exception.detailed_error_descriptors,
                        partial_result=partial_bw_result,
                        exceptions=[dar_exception],
                    )
            full_bw_result = reduce_bulk_write_results(bulk_write_results)
            logger.info(f"finished a bulk write on '{self.name}'")
            return full_bw_result
        else:
            sem = asyncio.Semaphore(_concurrency)
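            # the semaphore caps the number of operations concurrently
            # in flight at _concurrency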

            async def _concurrent_execute_as_either(
                operation: AsyncBaseOperation, operation_i: int
            ) -> Tuple[Optional[BulkWriteResult], Optional[DataAPIResponseException]]:
                async with sem:
                    try:
                        ex_result = await operation.execute(
                            self,
                            index_in_bulk_write=operation_i,
                            bulk_write_timeout_ms=timeout_manager.remaining_timeout_ms(),
                        )
                        return (ex_result, None)
                    except DataAPIResponseException as exc:
                        return (None, exc)

            tasks = [
                asyncio.create_task(
                    _concurrent_execute_as_either(operation, operation_i)
                )
                for operation_i, operation in enumerate(requests)
            ]
            bulk_write_either_results = await asyncio.gather(*tasks)
            # regroup, pairing each failure with its operation's index in
            # `requests` (asyncio.gather preserves the order of its tasks)
            bulk_write_successes = [bwr for bwr, _ in bulk_write_either_results if bwr]
            bulk_write_failures = [
                (operation_i, bwf)
                for operation_i, (_, bwf) in enumerate(bulk_write_either_results)
                if bwf
            ]
            if bulk_write_failures:
                # extract and accumulate the partial results of the failures
                partial_results_from_failures = [
                    failure.partial_result.to_bulk_write_result(
                        index_in_bulk_write=operation_i
                    )
                    for operation_i, failure in bulk_write_failures
                    if isinstance(failure, CumulativeOperationException)
                ]
                partial_bw_result = reduce_bulk_write_results(
                    bulk_write_successes + partial_results_from_failures
                )
                # recast and raise the first exception, carrying all of them
                all_dar_exceptions = [
                    bw_failure.data_api_response_exception()
                    for _, bw_failure in bulk_write_failures
                ]
                dar_exception = all_dar_exceptions[0]
                raise BulkWriteException(
                    text=dar_exception.text,
                    error_descriptors=dar_exception.error_descriptors,
                    detailed_error_descriptors=dar_exception.detailed_error_descriptors,
                    partial_result=partial_bw_result,
                    exceptions=all_dar_exceptions,
                )
            else:
                logger.info(f"finished a bulk write on '{self.name}'")
                return reduce_bulk_write_results(bulk_write_successes)

    async def drop(self, *, max_time_ms: Optional[int] = None) -> Dict[str, Any]:
        """
        Drop the collection, i.e. delete it from the database along with
        all the documents it contains.

        Args:
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.
                Remember there is no guarantee that a request that has
                timed out is not in fact honored.

        Returns:
            a dictionary of the form {"ok": 1} to signal successful deletion.

        Example:
            >>> async def drop_and_check(acol: AsyncCollection) -> None:
            ...     doc0 = await acol.find_one({})
            ...     print("doc0", doc0)
            ...     drop_result = await acol.drop()
            ...     print("drop_result", drop_result)
            ...     doc1 = await acol.find_one({})
            ...
            >>> asyncio.run(drop_and_check(my_async_coll))
            doc0 {'_id': '...', 'z': -10}
            drop_result {'ok': 1}
            Traceback (most recent call last):
                ... ...
            astrapy.exceptions.DataAPIResponseException: Collection does not exist, collection name: my_collection
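
            Should the collection be needed afterwards, it has to be re-created,
            e.g. through the parent database (a sketch; `my_async_db` is assumed
            to be the AsyncDatabase this collection belongs to):
            >>> new_coll = asyncio.run(my_async_db.create_collection("my_collection"))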

        Note:
            Use with caution.

        Note:
            Once the method succeeds, methods on this object can still be invoked:
            however, this hardly makes sense as the underlying collection
            no longer exists.
            It is the responsibility of the developer to design a correct flow
            that avoids any further use of a dropped collection.
""" | |
logger.info(f"dropping collection '{self.name}' (self)") | |
drop_result = await self.database.drop_collection(self, max_time_ms=max_time_ms) | |
logger.info(f"finished dropping collection '{self.name}' (self)") | |
return drop_result # type: ignore[no-any-return] | |

    async def command(
        self,
        body: Dict[str, Any],
        *,
        max_time_ms: Optional[int] = None,
    ) -> Dict[str, Any]:
        """
        Send a POST request to the Data API for this collection with
        an arbitrary, caller-provided payload.

        Args:
            body: a JSON-serializable dictionary, the payload of the request.
            max_time_ms: a timeout, in milliseconds, for the underlying HTTP request.

        Returns:
            a dictionary with the response of the HTTP request.

        Example:
            >>> asyncio.run(my_async_coll.command({"countDocuments": {}}))
            {'status': {'count': 123}}
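
            Any raw Data API command can be sent this way; for instance, a
            findOne (a sketch: the filter and the shown response are just
            illustrative):
            >>> asyncio.run(my_async_coll.command({"findOne": {"filter": {"a": 1}}}))
            {'data': {'document': {'_id': '...', 'a': 1}}}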
""" | |
logger.info(f"calling command on '{self.name}'") | |
command_result = await self.database.command( | |
body=body, | |
namespace=self.namespace, | |
collection_name=self.name, | |
max_time_ms=max_time_ms, | |
) | |
logger.info(f"finished calling command on '{self.name}'") | |
return command_result # type: ignore[no-any-return] | |