Draken007's picture
Upload 7228 files
2a0bc63 verified
"""
Handling of key-value storage on a Cassandra table.
One row per partition; a multi-part partition key is serialized into a string.
"""
from typing import Any, Dict, List, Optional, cast
from warnings import warn
from cassandra.cluster import Session
from cassio.table.tables import ElasticCassandraTable
class KVCache:
    """
    Legacy key-value cache stored on a Cassandra table.

    This class is a rewriting of the KVCache created for use in LangChain
    integration, this time relying on the class-table-hierarchy
    (cassio.table.*). It mostly provides a translation layer between
    parameters and key names: every operation is delegated to an
    ``ElasticCassandraTable`` instance held in ``self.table``.

    Instantiating the class emits a ``DeprecationWarning``.

    NOTE(review): earlier documentation stated that additional kwargs are
    passed through to the table class; the constructor below accepts no
    extra keyword arguments - confirm whether that is still intended.
    """

    DEPRECATION_MESSAGE = (
        "Class `KVCache` is a legacy construct and "
        "will be deprecated in future versions of CassIO."
    )

    def __init__(
        self,
        table: str,
        keys: List[Any],
        session: Optional[Session] = None,
        keyspace: Optional[str] = None,
    ):
        """
        Create the cache and its backing ``ElasticCassandraTable``.

        Args:
            table: forwarded to ``ElasticCassandraTable`` as its third
                positional argument (presumably the table name).
            keys: names of the key columns; every item must be a ``str``.
                Each key is declared with type ``"TEXT"``.
            session: forwarded as-is to the table class.
            keyspace: forwarded as-is to the table class.

        Raises:
            TypeError: if any item of ``keys`` is not a string.
        """
        # stacklevel=2 attributes the warning to the code instantiating KVCache.
        warn(self.DEPRECATION_MESSAGE, DeprecationWarning, stacklevel=2)
        # For LangChain this is what we expect - no other uses are planned.
        # An explicit raise is used (rather than `assert`) so that the check
        # is still enforced when Python runs with -O.
        if not all(isinstance(k, str) for k in keys):
            raise TypeError("All items in `keys` must be strings.")
        p_k_type = ["TEXT"] * len(keys)

        self.table = ElasticCassandraTable(
            session,
            keyspace,
            table,
            keys=keys,
            primary_key_type=p_k_type,
        )

    def clear(self) -> None:
        """Clear the cache by delegating to the table's ``clear`` method."""
        self.table.clear()

    def put(
        self,
        key_dict: Dict[str, str],
        cache_value: str,
        ttl_seconds: Optional[int] = None,
    ) -> None:
        """
        Store ``cache_value`` under the key described by ``key_dict``.

        Args:
            key_dict: mapping of key name to key value; expanded into
                keyword arguments of the table's ``put``.
            cache_value: the value to store, written as ``body_blob``.
            ttl_seconds: forwarded unchanged to the table's ``put``.
        """
        self.table.put(body_blob=cache_value, ttl_seconds=ttl_seconds, **key_dict)

    def get(self, key_dict: Dict[str, Any]) -> Optional[str]:
        """
        Return the value stored under ``key_dict``.

        Returns:
            The ``body_blob`` field of the entry returned by the table,
            or ``None`` if the table returns no entry.
        """
        entry = self.table.get(**key_dict)
        if entry is None:
            return None
        return cast(str, entry["body_blob"])

    def delete(self, key_dict: Dict[str, Any]) -> None:
        """Will not complain if the row does not exist."""
        self.table.delete(**key_dict)