Draken007's picture
Upload 7228 files
2a0bc63 verified
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import reduce
from typing import (
Any,
Dict,
Iterable,
List,
Optional,
)
from astrapy.constants import DocumentType, SortType, VectorType
from astrapy.results import (
BulkWriteResult,
DeleteResult,
InsertManyResult,
InsertOneResult,
UpdateResult,
)
from astrapy.collection import AsyncCollection, Collection
def reduce_bulk_write_results(results: List[BulkWriteResult]) -> BulkWriteResult:
"""
Reduce a list of bulk write results into a single one.
Args:
results: a list of BulkWriteResult instances.
Returns:
A new BulkWRiteResult object which summarized the whole input list.
"""
zero = BulkWriteResult.zero()
def _sum_results(r1: BulkWriteResult, r2: BulkWriteResult) -> BulkWriteResult:
bulk_api_results = {**r1.bulk_api_results, **r2.bulk_api_results}
if r1.deleted_count is None or r2.deleted_count is None:
deleted_count = None
else:
deleted_count = r1.deleted_count + r2.deleted_count
inserted_count = r1.inserted_count + r2.inserted_count
matched_count = r1.matched_count + r2.matched_count
modified_count = r1.modified_count + r2.modified_count
upserted_count = r1.upserted_count + r2.upserted_count
upserted_ids = {**r1.upserted_ids, **r2.upserted_ids}
return BulkWriteResult(
bulk_api_results=bulk_api_results,
deleted_count=deleted_count,
inserted_count=inserted_count,
matched_count=matched_count,
modified_count=modified_count,
upserted_count=upserted_count,
upserted_ids=upserted_ids,
)
return reduce(_sum_results, results, zero)
class BaseOperation(ABC):
"""
Base class for all operations amenable to be used
in bulk writes on (sync) collections.
"""
@abstractmethod
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult: ...
@dataclass
class InsertOne(BaseOperation):
"""
Represents an `insert_one` operation on a (sync) collection.
See the documentation on the collection method for more information.
Attributes:
document: the document to insert.
vector: an optional suitable vector to enrich the document at insertion.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
"""
document: DocumentType
vector: Optional[VectorType]
vectorize: Optional[str]
def __init__(
self,
document: DocumentType,
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
) -> None:
self.document = document
self.vector = vector
self.vectorize = vectorize
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: InsertOneResult = collection.insert_one(
document=self.document,
vector=self.vector,
vectorize=self.vectorize,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class InsertMany(BaseOperation):
"""
Represents an `insert_many` operation on a (sync) collection.
See the documentation on the collection method for more information.
Attributes:
documents: the list document to insert.
vectors: an optional list of vectors to enrich the documents at insertion.
vectorize: an optional list of texts achieving the same effect as `vectors`
except through an embedding service, if one is configured for the collection.
NOTE: This feature is under current development.
ordered: whether the inserts should be done in sequence.
chunk_size: how many documents to include in a single API request.
Exceeding the server maximum allowed value results in an error.
Leave it unspecified (recommended) to use the system default.
concurrency: maximum number of concurrent requests to the API at
a given time. It cannot be more than one for ordered insertions.
"""
documents: Iterable[DocumentType]
vectors: Optional[Iterable[Optional[VectorType]]]
vectorize: Optional[Iterable[Optional[str]]]
ordered: bool
chunk_size: Optional[int]
concurrency: Optional[int]
def __init__(
self,
documents: Iterable[DocumentType],
*,
vectors: Optional[Iterable[Optional[VectorType]]] = None,
vectorize: Optional[Iterable[Optional[str]]] = None,
ordered: bool = True,
chunk_size: Optional[int] = None,
concurrency: Optional[int] = None,
) -> None:
self.documents = documents
self.ordered = ordered
self.vectors = vectors
self.vectorize = vectorize
self.chunk_size = chunk_size
self.concurrency = concurrency
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: InsertManyResult = collection.insert_many(
documents=self.documents,
vectors=self.vectors,
vectorize=self.vectorize,
ordered=self.ordered,
chunk_size=self.chunk_size,
concurrency=self.concurrency,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class UpdateOne(BaseOperation):
"""
Represents an `update_one` operation on a (sync) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select a target document.
update: an update prescription to apply to the document.
vector: a vector of numbers to use for ANN (vector-search) sorting.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
sort: controls ordering of results, hence which document is affected.
upsert: controls what to do when no documents are found.
"""
filter: Dict[str, Any]
update: Dict[str, Any]
vector: Optional[VectorType]
vectorize: Optional[str]
sort: Optional[SortType]
upsert: bool
def __init__(
self,
filter: Dict[str, Any],
update: Dict[str, Any],
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
sort: Optional[SortType] = None,
upsert: bool = False,
) -> None:
self.filter = filter
self.update = update
self.vector = vector
self.vectorize = vectorize
self.sort = sort
self.upsert = upsert
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: UpdateResult = collection.update_one(
filter=self.filter,
update=self.update,
vector=self.vector,
vectorize=self.vectorize,
sort=self.sort,
upsert=self.upsert,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class UpdateMany(BaseOperation):
"""
Represents an `update_many` operation on a (sync) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select target documents.
update: an update prescription to apply to the documents.
upsert: controls what to do when no documents are found.
"""
filter: Dict[str, Any]
update: Dict[str, Any]
upsert: bool
def __init__(
self,
filter: Dict[str, Any],
update: Dict[str, Any],
*,
upsert: bool = False,
) -> None:
self.filter = filter
self.update = update
self.upsert = upsert
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: UpdateResult = collection.update_many(
filter=self.filter,
update=self.update,
upsert=self.upsert,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class ReplaceOne(BaseOperation):
"""
Represents a `replace_one` operation on a (sync) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select a target document.
replacement: the replacement document.
vector: a vector of numbers to use for ANN (vector-search) sorting.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
sort: controls ordering of results, hence which document is affected.
upsert: controls what to do when no documents are found.
"""
filter: Dict[str, Any]
replacement: DocumentType
vector: Optional[VectorType]
vectorize: Optional[str]
sort: Optional[SortType]
upsert: bool
def __init__(
self,
filter: Dict[str, Any],
replacement: DocumentType,
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
sort: Optional[SortType] = None,
upsert: bool = False,
) -> None:
self.filter = filter
self.replacement = replacement
self.vector = vector
self.vectorize = vectorize
self.sort = sort
self.upsert = upsert
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: UpdateResult = collection.replace_one(
filter=self.filter,
replacement=self.replacement,
vector=self.vector,
vectorize=self.vectorize,
sort=self.sort,
upsert=self.upsert,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class DeleteOne(BaseOperation):
"""
Represents a `delete_one` operation on a (sync) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select a target document.
vector: a vector of numbers to use for ANN (vector-search) sorting.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
sort: controls ordering of results, hence which document is affected.
"""
filter: Dict[str, Any]
vector: Optional[VectorType]
vectorize: Optional[str]
sort: Optional[SortType]
def __init__(
self,
filter: Dict[str, Any],
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
sort: Optional[SortType] = None,
) -> None:
self.filter = filter
self.vector = vector
self.vectorize = vectorize
self.sort = sort
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: DeleteResult = collection.delete_one(
filter=self.filter,
vector=self.vector,
vectorize=self.vectorize,
sort=self.sort,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class DeleteMany(BaseOperation):
"""
Represents a `delete_many` operation on a (sync) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select target documents.
"""
filter: Dict[str, Any]
def __init__(
self,
filter: Dict[str, Any],
) -> None:
self.filter = filter
def execute(
self,
collection: Collection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: DeleteResult = collection.delete_many(
filter=self.filter, max_time_ms=bulk_write_timeout_ms
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncBaseOperation(ABC):
"""
Base class for all operations amenable to be used
in bulk writes on (async) collections.
"""
@abstractmethod
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult: ...
@dataclass
class AsyncInsertOne(AsyncBaseOperation):
"""
Represents an `insert_one` operation on a (async) collection.
See the documentation on the collection method for more information.
Attributes:
document: the document to insert.
vector: an optional suitable vector to enrich the document at insertion.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
"""
document: DocumentType
vector: Optional[VectorType]
vectorize: Optional[str]
def __init__(
self,
document: DocumentType,
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
) -> None:
self.document = document
self.vector = vector
self.vectorize = vectorize
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: InsertOneResult = await collection.insert_one(
document=self.document,
vector=self.vector,
vectorize=self.vectorize,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncInsertMany(AsyncBaseOperation):
"""
Represents an `insert_many` operation on a (async) collection.
See the documentation on the collection method for more information.
Attributes:
documents: the list document to insert.
vectors: an optional list of vectors to enrich the documents at insertion.
vectorize: an optional list of texts achieving the same effect as `vectors`
except through an embedding service, if one is configured for the collection.
NOTE: This feature is under current development.
ordered: whether the inserts should be done in sequence.
chunk_size: how many documents to include in a single API request.
Exceeding the server maximum allowed value results in an error.
Leave it unspecified (recommended) to use the system default.
concurrency: maximum number of concurrent requests to the API at
a given time. It cannot be more than one for ordered insertions.
"""
documents: Iterable[DocumentType]
vectors: Optional[Iterable[Optional[VectorType]]]
vectorize: Optional[Iterable[Optional[str]]]
ordered: bool
chunk_size: Optional[int]
concurrency: Optional[int]
def __init__(
self,
documents: Iterable[DocumentType],
*,
vectors: Optional[Iterable[Optional[VectorType]]] = None,
vectorize: Optional[Iterable[Optional[str]]] = None,
ordered: bool = True,
chunk_size: Optional[int] = None,
concurrency: Optional[int] = None,
) -> None:
self.documents = documents
self.vectors = vectors
self.vectorize = vectorize
self.ordered = ordered
self.chunk_size = chunk_size
self.concurrency = concurrency
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: InsertManyResult = await collection.insert_many(
documents=self.documents,
vectors=self.vectors,
vectorize=self.vectorize,
ordered=self.ordered,
chunk_size=self.chunk_size,
concurrency=self.concurrency,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncUpdateOne(AsyncBaseOperation):
"""
Represents an `update_one` operation on a (async) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select a target document.
update: an update prescription to apply to the document.
vector: a vector of numbers to use for ANN (vector-search) sorting.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
sort: controls ordering of results, hence which document is affected.
upsert: controls what to do when no documents are found.
"""
filter: Dict[str, Any]
update: Dict[str, Any]
vector: Optional[VectorType]
vectorize: Optional[str]
sort: Optional[SortType]
upsert: bool
def __init__(
self,
filter: Dict[str, Any],
update: Dict[str, Any],
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
sort: Optional[SortType] = None,
upsert: bool = False,
) -> None:
self.filter = filter
self.update = update
self.vector = vector
self.vectorize = vectorize
self.sort = sort
self.upsert = upsert
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: UpdateResult = await collection.update_one(
filter=self.filter,
update=self.update,
vector=self.vector,
vectorize=self.vectorize,
sort=self.sort,
upsert=self.upsert,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncUpdateMany(AsyncBaseOperation):
"""
Represents an `update_many` operation on a (async) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select target documents.
update: an update prescription to apply to the documents.
upsert: controls what to do when no documents are found.
"""
filter: Dict[str, Any]
update: Dict[str, Any]
upsert: bool
def __init__(
self,
filter: Dict[str, Any],
update: Dict[str, Any],
*,
upsert: bool = False,
) -> None:
self.filter = filter
self.update = update
self.upsert = upsert
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: UpdateResult = await collection.update_many(
filter=self.filter,
update=self.update,
upsert=self.upsert,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncReplaceOne(AsyncBaseOperation):
"""
Represents a `replace_one` operation on a (async) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select a target document.
replacement: the replacement document.
vector: a vector of numbers to use for ANN (vector-search) sorting.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
sort: controls ordering of results, hence which document is affected.
upsert: controls what to do when no documents are found.
"""
filter: Dict[str, Any]
replacement: DocumentType
vector: Optional[VectorType]
vectorize: Optional[str]
sort: Optional[SortType]
upsert: bool
def __init__(
self,
filter: Dict[str, Any],
replacement: DocumentType,
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
sort: Optional[SortType] = None,
upsert: bool = False,
) -> None:
self.filter = filter
self.replacement = replacement
self.vector = vector
self.vectorize = vectorize
self.sort = sort
self.upsert = upsert
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: UpdateResult = await collection.replace_one(
filter=self.filter,
replacement=self.replacement,
vector=self.vector,
vectorize=self.vectorize,
sort=self.sort,
upsert=self.upsert,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncDeleteOne(AsyncBaseOperation):
"""
Represents a `delete_one` operation on a (async) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select a target document.
vector: a vector of numbers to use for ANN (vector-search) sorting.
vectorize: a string to be made into a vector, with the same result as the
`vector` attribute, through an embedding service, assuming one is
configured for the collection.
NOTE: This feature is under current development.
sort: controls ordering of results, hence which document is affected.
"""
filter: Dict[str, Any]
vector: Optional[VectorType]
vectorize: Optional[str]
sort: Optional[SortType]
def __init__(
self,
filter: Dict[str, Any],
*,
vector: Optional[VectorType] = None,
vectorize: Optional[str] = None,
sort: Optional[SortType] = None,
) -> None:
self.filter = filter
self.vector = vector
self.vectorize = vectorize
self.sort = sort
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: DeleteResult = await collection.delete_one(
filter=self.filter,
vector=self.vector,
vectorize=self.vectorize,
sort=self.sort,
max_time_ms=bulk_write_timeout_ms,
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncDeleteMany(AsyncBaseOperation):
"""
Represents a `delete_many` operation on a (async) collection.
See the documentation on the collection method for more information.
Attributes:
filter: a filter condition to select target documents.
"""
filter: Dict[str, Any]
def __init__(
self,
filter: Dict[str, Any],
) -> None:
self.filter = filter
async def execute(
self,
collection: AsyncCollection,
index_in_bulk_write: int,
bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
"""
Execute this operation against a collection as part of a bulk write.
Args:
collection: the collection this write targets.
insert_in_bulk_write: the index in the list of bulkoperations
"""
op_result: DeleteResult = await collection.delete_many(
filter=self.filter, max_time_ms=bulk_write_timeout_ms
)
return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)