Spaces:
Running
Running
# Copyright DataStax, Inc. | |
# | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at | |
# | |
# http://www.apache.org/licenses/LICENSE-2.0 | |
# | |
# Unless required by applicable law or agreed to in writing, software | |
# distributed under the License is distributed on an "AS IS" BASIS, | |
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
# See the License for the specific language governing permissions and | |
# limitations under the License. | |
from __future__ import annotations | |
from abc import ABC, abstractmethod | |
from dataclasses import dataclass | |
from functools import reduce | |
from typing import ( | |
Any, | |
Dict, | |
Iterable, | |
List, | |
Optional, | |
) | |
from astrapy.constants import DocumentType, SortType, VectorType | |
from astrapy.results import ( | |
BulkWriteResult, | |
DeleteResult, | |
InsertManyResult, | |
InsertOneResult, | |
UpdateResult, | |
) | |
from astrapy.collection import AsyncCollection, Collection | |
def reduce_bulk_write_results(results: List[BulkWriteResult]) -> BulkWriteResult:
    """
    Collapse a list of bulk write results into a single cumulative one.

    Args:
        results: a list of BulkWriteResult instances.

    Returns:
        A BulkWriteResult object which summarizes the whole input list.
        For an empty list, this is the value of `BulkWriteResult.zero()`.
    """

    combined = BulkWriteResult.zero()
    for item in results:
        # Dictionaries are merged; on a key clash the later result wins.
        merged_api_results = {**combined.bulk_api_results, **item.bulk_api_results}
        # A deleted count of None on either side makes the total None as well.
        if combined.deleted_count is not None and item.deleted_count is not None:
            total_deleted = combined.deleted_count + item.deleted_count
        else:
            total_deleted = None
        combined = BulkWriteResult(
            bulk_api_results=merged_api_results,
            deleted_count=total_deleted,
            inserted_count=combined.inserted_count + item.inserted_count,
            matched_count=combined.matched_count + item.matched_count,
            modified_count=combined.modified_count + item.modified_count,
            upserted_count=combined.upserted_count + item.upserted_count,
            upserted_ids={**combined.upserted_ids, **item.upserted_ids},
        )
    return combined
class BaseOperation(ABC):
    """
    Base class for all operations amenable to be used
    in bulk writes on (sync) collections.

    This class is abstract: concrete operations must implement `execute`,
    and the base class itself cannot be instantiated.
    """

    @abstractmethod
    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, in milliseconds,
                for the operation.

        Returns:
            a BulkWriteResult describing the outcome of this operation.
        """
        ...
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class InsertOne(BaseOperation):
    """
    Represents an `insert_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
    """

    document: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]

    def __init__(
        self,
        document: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
    ) -> None:
        self.document = document
        self.vector = vector
        self.vectorize = vectorize

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `insert_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the `insert_one`
            result for the given index.
        """

        op_result: InsertOneResult = collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class InsertMany(BaseOperation):
    """
    Represents an `insert_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the list of documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
            NOTE: This feature is under current development.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Optional[Iterable[Optional[VectorType]]]
    vectorize: Optional[Iterable[Optional[str]]]
    ordered: bool
    chunk_size: Optional[int]
    concurrency: Optional[int]

    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Optional[Iterable[Optional[VectorType]]] = None,
        vectorize: Optional[Iterable[Optional[str]]] = None,
        ordered: bool = True,
        chunk_size: Optional[int] = None,
        concurrency: Optional[int] = None,
    ) -> None:
        self.documents = documents
        self.vectors = vectors
        self.vectorize = vectorize
        self.ordered = ordered
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `insert_many` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the `insert_many`
            result for the given index.
        """

        op_result: InsertManyResult = collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class UpdateOne(BaseOperation):
    """
    Represents an `update_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `update_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the `update_one`
            result for the given index.
        """

        op_result: UpdateResult = collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# option) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class UpdateMany(BaseOperation):
    """
    Represents an `update_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `update_many` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the `update_many`
            result for the given index.
        """

        op_result: UpdateResult = collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class ReplaceOne(BaseOperation):
    """
    Represents a `replace_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    replacement: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `replace_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the `replace_one`
            result for the given index.
        """

        op_result: UpdateResult = collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class DeleteOne(BaseOperation):
    """
    Represents a `delete_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]

    def __init__(
        self,
        filter: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
    ) -> None:
        self.filter = filter
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `delete_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the `delete_one`
            result for the given index.
        """

        op_result: DeleteResult = collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below and contributes the
# generated __repr__ and field-wise __eq__.
@dataclass
class DeleteMany(BaseOperation):
    """
    Represents a `delete_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: Dict[str, Any]

    def __init__(
        self,
        filter: Dict[str, Any],
    ) -> None:
        self.filter = filter

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `delete_many` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the `delete_many`
            result for the given index.
        """

        op_result: DeleteResult = collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncBaseOperation(ABC):
    """
    Base class for all operations amenable to be used
    in bulk writes on (async) collections.

    This class is abstract: concrete operations must implement the
    `execute` coroutine, and the base class itself cannot be instantiated.
    """

    @abstractmethod
    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, in milliseconds,
                for the operation.

        Returns:
            a BulkWriteResult describing the outcome of this operation.
        """
        ...
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class AsyncInsertOne(AsyncBaseOperation):
    """
    Represents an `insert_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
    """

    document: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]

    def __init__(
        self,
        document: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
    ) -> None:
        self.document = document
        self.vector = vector
        self.vectorize = vectorize

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `insert_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the awaited
            `insert_one` result for the given index.
        """

        op_result: InsertOneResult = await collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class AsyncInsertMany(AsyncBaseOperation):
    """
    Represents an `insert_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the list of documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
            NOTE: This feature is under current development.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Optional[Iterable[Optional[VectorType]]]
    vectorize: Optional[Iterable[Optional[str]]]
    ordered: bool
    chunk_size: Optional[int]
    concurrency: Optional[int]

    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Optional[Iterable[Optional[VectorType]]] = None,
        vectorize: Optional[Iterable[Optional[str]]] = None,
        ordered: bool = True,
        chunk_size: Optional[int] = None,
        concurrency: Optional[int] = None,
    ) -> None:
        self.documents = documents
        self.vectors = vectors
        self.vectorize = vectorize
        self.ordered = ordered
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `insert_many` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the awaited
            `insert_many` result for the given index.
        """

        op_result: InsertManyResult = await collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class AsyncUpdateOne(AsyncBaseOperation):
    """
    Represents an `update_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `update_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the awaited
            `update_one` result for the given index.
        """

        op_result: UpdateResult = await collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# option) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class AsyncUpdateMany(AsyncBaseOperation):
    """
    Represents an `update_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `update_many` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the awaited
            `update_many` result for the given index.
        """

        op_result: UpdateResult = await collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class AsyncReplaceOne(AsyncBaseOperation):
    """
    Represents a `replace_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    replacement: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `replace_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the awaited
            `replace_one` result for the given index.
        """

        op_result: UpdateResult = await collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below (needed for the keyword-only
# options) and contributes the generated __repr__ and field-wise __eq__.
@dataclass
class AsyncDeleteOne(AsyncBaseOperation):
    """
    Represents a `delete_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            NOTE: This feature is under current development.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]

    def __init__(
        self,
        filter: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
    ) -> None:
        self.filter = filter
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `delete_one` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the awaited
            `delete_one` result for the given index.
        """

        op_result: DeleteResult = await collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
# @dataclass keeps the explicit __init__ below and contributes the
# generated __repr__ and field-wise __eq__.
@dataclass
class AsyncDeleteMany(AsyncBaseOperation):
    """
    Represents a `delete_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: Dict[str, Any]

    def __init__(
        self,
        filter: Dict[str, Any],
    ) -> None:
        self.filter = filter

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index in the list of bulk operations.
            bulk_write_timeout_ms: an optional timeout, passed to the
                collection's `delete_many` as its `max_time_ms` parameter.

        Returns:
            the BulkWriteResult obtained by converting the awaited
            `delete_many` result for the given index.
        """

        op_result: DeleteResult = await collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)