Spaces:
Running
Running
""" | |
Handling of key-value storage on a Cassandra table.
One row per partition; a multi-column partition key is serialized into a single string.
""" | |
from typing import Any, Dict, List, Optional, cast | |
from warnings import warn | |
from cassandra.cluster import Session | |
from cassio.table.tables import ElasticCassandraTable | |
class KVCache:
    """
    Legacy key-value cache facade, kept for the LangChain integration.

    This is a rewrite of the earlier KVCache on top of the class-table
    hierarchy (cassio.table.*). It acts as a thin translation layer: key
    dictionaries and cached values are mapped onto keyword arguments of an
    `ElasticCassandraTable`, with the cached value travelling under the
    `body_blob` name.

    Creating an instance emits a `DeprecationWarning`.
    """

    DEPRECATION_MESSAGE = (
        "Class `KVCache` is a legacy construct and "
        "will be deprecated in future versions of CassIO."
    )

    def __init__(
        self,
        table: str,
        keys: List[Any],
        session: Optional[Session] = None,
        keyspace: Optional[str] = None,
    ):
        """
        Set up the backing table for the cache.

        Args:
            table: name of the table, forwarded to `ElasticCassandraTable`.
            keys: names of the key fields; every item must be a `str`.
            session: forwarded unchanged to the table class (may be None).
            keyspace: forwarded unchanged to the table class (may be None).
        """
        # stacklevel=2 attributes the warning to the code instantiating KVCache.
        warn(self.DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)
        # Only string key names are expected (LangChain usage); nothing else
        # is planned to be supported.
        assert not any(not isinstance(key_name, str) for key_name in keys)
        self.table = ElasticCassandraTable(
            session,
            keyspace,
            table,
            keys=keys,
            # one "TEXT" type declaration per key name
            primary_key_type=["TEXT"] * len(keys),
        )

    def clear(self) -> None:
        """Clear the cache by delegating to the backing table's `clear`."""
        self.table.clear()

    def put(
        self,
        key_dict: Dict[str, str],
        cache_value: str,
        ttl_seconds: Optional[int] = None,
    ) -> None:
        """
        Store `cache_value` under the key described by `key_dict`.

        The items of `key_dict` are expanded into keyword arguments of the
        table's `put`, next to `body_blob` (the value) and `ttl_seconds`.
        """
        self.table.put(
            body_blob=cache_value,
            ttl_seconds=ttl_seconds,
            **key_dict,
        )

    def get(self, key_dict: Dict[str, Any]) -> Optional[str]:
        """
        Look up the value stored under `key_dict`.

        Returns the `body_blob` entry of the row returned by the table,
        or None when the table returns no row.
        """
        row = self.table.get(**key_dict)
        return None if row is None else cast(str, row["body_blob"])

    def delete(self, key_dict: Dict[str, Any]) -> None:
        """Delete the entry for `key_dict`; a missing row is not an error."""
        self.table.delete(**key_dict)