OpkaGames's picture
Upload folder using huggingface_hub
870ab6b
from toolz import curried
import uuid
from weakref import WeakValueDictionary
from typing import Union, Dict, Set, MutableMapping
from typing import TypedDict, Final
from altair.utils._importers import import_vegafusion
from altair.utils.core import _DataFrameLike
from altair.utils.data import _DataType, _ToValuesReturnType, MaxRowsError
from altair.vegalite.data import default_data_transformer
# Temporary storage for dataframes that have been extracted
# from charts by the vegafusion data transformer. Use a WeakValueDictionary
# rather than a dict so that the Python interpreter is free to garbage
# collect the stored DataFrames.
extracted_inline_tables: MutableMapping[str, _DataFrameLike] = WeakValueDictionary()
# Special URL prefix that VegaFusion uses to denote that a
# dataset in a Vega spec corresponds to an entry in the `inline_datasets`
# kwarg of vf.runtime.pre_transform_spec().
VEGAFUSION_PREFIX: Final = "vegafusion+dataset://"
class _ToVegaFusionReturnUrlDict(TypedDict):
url: str
@curried.curry
def vegafusion_data_transformer(
data: _DataType, max_rows: int = 100000
) -> Union[_ToVegaFusionReturnUrlDict, _ToValuesReturnType]:
"""VegaFusion Data Transformer"""
if hasattr(data, "__geo_interface__"):
# Use default transformer for geo interface objects
# # (e.g. a geopandas GeoDataFrame)
return default_data_transformer(data)
elif hasattr(data, "__dataframe__"):
table_name = f"table_{uuid.uuid4()}".replace("-", "_")
extracted_inline_tables[table_name] = data
return {"url": VEGAFUSION_PREFIX + table_name}
else:
# Use default transformer if we don't recognize data type
return default_data_transformer(data)
def get_inline_table_names(vega_spec: dict) -> Set[str]:
"""Get a set of the inline datasets names in the provided Vega spec
Inline datasets are encoded as URLs that start with the table://
prefix.
Parameters
----------
vega_spec: dict
A Vega specification dict
Returns
-------
set of str
Set of the names of the inline datasets that are referenced
in the specification.
Examples
--------
>>> spec = {
... "data": [
... {
... "name": "foo",
... "url": "https://path/to/file.csv"
... },
... {
... "name": "bar",
... "url": "vegafusion+dataset://inline_dataset_123"
... }
... ]
... }
>>> get_inline_table_names(spec)
{'inline_dataset_123'}
"""
table_names = set()
# Process datasets
for data in vega_spec.get("data", []):
url = data.get("url", "")
if url.startswith(VEGAFUSION_PREFIX):
name = url[len(VEGAFUSION_PREFIX) :]
table_names.add(name)
# Recursively process child marks, which may have their own datasets
for mark in vega_spec.get("marks", []):
table_names.update(get_inline_table_names(mark))
return table_names
def get_inline_tables(vega_spec: dict) -> Dict[str, _DataFrameLike]:
"""Get the inline tables referenced by a Vega specification
Note: This function should only be called on a Vega spec that corresponds
to a chart that was processed by the vegafusion_data_transformer.
Furthermore, this function may only be called once per spec because
the returned dataframes are deleted from internal storage.
Parameters
----------
vega_spec: dict
A Vega specification dict
Returns
-------
dict from str to dataframe
dict from inline dataset name to dataframe object
"""
table_names = get_inline_table_names(vega_spec)
tables = {}
for table_name in table_names:
try:
tables[table_name] = extracted_inline_tables.pop(table_name)
except KeyError:
# named dataset that was provided by the user
pass
return tables
def compile_with_vegafusion(vegalite_spec: dict) -> dict:
"""Compile a Vega-Lite spec to Vega and pre-transform with VegaFusion
Note: This function should only be called on a Vega-Lite spec
that was generated with the "vegafusion" data transformer enabled.
In particular, this spec may contain references to extract datasets
using table:// prefixed URLs.
Parameters
----------
vegalite_spec: dict
A Vega-Lite spec that was generated from an Altair chart with
the "vegafusion" data transformer enabled
Returns
-------
dict
A Vega spec that has been pre-transformed by VegaFusion
"""
# Local import to avoid circular ImportError
from altair import vegalite_compilers, data_transformers
vf = import_vegafusion()
# Compile Vega-Lite spec to Vega
compiler = vegalite_compilers.get()
if compiler is None:
raise ValueError("No active vega-lite compiler plugin found")
vega_spec = compiler(vegalite_spec)
# Retrieve dict of inline tables referenced by the spec
inline_tables = get_inline_tables(vega_spec)
# Pre-evaluate transforms in vega spec with vegafusion
row_limit = data_transformers.options.get("max_rows", None)
transformed_vega_spec, warnings = vf.runtime.pre_transform_spec(
vega_spec,
vf.get_local_tz(),
inline_datasets=inline_tables,
row_limit=row_limit,
)
# Check from row limit warning and convert to MaxRowsError
for warning in warnings:
if warning.get("type") == "RowLimitExceeded":
raise MaxRowsError(
"The number of dataset rows after filtering and aggregation exceeds\n"
f"the current limit of {row_limit}. Try adding an aggregation to reduce\n"
"the size of the dataset that must be loaded into the browser. Or, disable\n"
"the limit by calling alt.data_transformers.disable_max_rows(). Note that\n"
"disabling this limit may cause the browser to freeze or crash."
)
return transformed_vega_spec
def using_vegafusion() -> bool:
"""Check whether the vegafusion data transfomer is enabled"""
# Local import to avoid circular ImportError
from altair import data_transformers
return data_transformers.active == "vegafusion"