|
from toolz import curried |
|
import uuid |
|
from weakref import WeakValueDictionary |
|
|
|
from typing import Union, Dict, Set, MutableMapping |
|
|
|
from typing import TypedDict, Final |
|
|
|
from altair.utils._importers import import_vegafusion |
|
from altair.utils.core import _DataFrameLike |
|
from altair.utils.data import _DataType, _ToValuesReturnType, MaxRowsError |
|
from altair.vegalite.data import default_data_transformer |
|
|
|
|
|
|
|
|
|
|
|
# Registry of dataframes that vegafusion_data_transformer has pulled out of a
# chart, keyed by the generated table name. Values are held through weak
# references (WeakValueDictionary), so an entry disappears on its own once the
# dataframe is no longer referenced anywhere else.
extracted_inline_tables: MutableMapping[str, _DataFrameLike] = WeakValueDictionary()
|
|
|
|
|
|
|
|
|
# URL prefix that marks a dataset URL as a reference to an entry in
# extracted_inline_tables; the table name is whatever follows the prefix.
VEGAFUSION_PREFIX: Final = "vegafusion+dataset://"
|
|
|
|
|
class _ToVegaFusionReturnUrlDict(TypedDict):
    """Dict returned by vegafusion_data_transformer for extracted dataframes."""

    # Built as VEGAFUSION_PREFIX followed by the generated table name.
    url: str
|
|
|
|
|
@curried.curry
def vegafusion_data_transformer(
    data: _DataType, max_rows: int = 100000
) -> Union[_ToVegaFusionReturnUrlDict, _ToValuesReturnType]:
    """Data transformer that passes dataframes to VegaFusion by reference.

    Objects that expose ``__dataframe__`` (and not ``__geo_interface__``) are
    stored in ``extracted_inline_tables`` under a freshly generated table name
    and replaced by a ``{"url": ...}`` dict pointing at that name. Everything
    else is delegated to ``default_data_transformer``.

    Parameters
    ----------
    data
        The chart data to transform.
    max_rows: int
        Accepted as part of the signature but not read by this function.

    Returns
    -------
    dict
        Either a ``{"url": VEGAFUSION_PREFIX + table_name}`` reference or
        whatever ``default_data_transformer`` returns for ``data``.
    """
    # Geo-interface objects take priority over the dataframe check; they and
    # any unrecognised data types are handled by the default transformer.
    is_geo = hasattr(data, "__geo_interface__")
    if is_geo or not hasattr(data, "__dataframe__"):
        return default_data_transformer(data)

    # Build a unique table name; dashes in the UUID are replaced by underscores.
    unique_suffix = str(uuid.uuid4()).replace("-", "_")
    table_name = "table_" + unique_suffix

    extracted_inline_tables[table_name] = data
    return {"url": VEGAFUSION_PREFIX + table_name}
|
|
|
|
|
def get_inline_table_names(vega_spec: dict) -> Set[str]:
    """Get a set of the inline datasets names in the provided Vega spec

    Inline datasets are encoded as URLs that start with the
    ``vegafusion+dataset://`` prefix (``VEGAFUSION_PREFIX``). Both the
    top-level ``data`` entries and the ``data`` entries of nested ``marks``
    are searched.

    Parameters
    ----------
    vega_spec: dict
        A Vega specification dict

    Returns
    -------
    set of str
        Set of the names of the inline datasets that are referenced
        in the specification.

    Examples
    --------
    >>> spec = {
    ...     "data": [
    ...         {
    ...             "name": "foo",
    ...             "url": "https://path/to/file.csv"
    ...         },
    ...         {
    ...             "name": "bar",
    ...             "url": "vegafusion+dataset://inline_dataset_123"
    ...         }
    ...     ]
    ... }
    >>> get_inline_table_names(spec)
    {'inline_dataset_123'}
    """
    table_names: Set[str] = set()

    # Datasets declared directly on this spec (or on this group mark).
    for data in vega_spec.get("data", []):
        url = data.get("url", "")
        # A Vega dataset ``url`` is not necessarily a plain string (it may be
        # a signal reference object); such entries cannot name an inline
        # table, so skip them instead of failing on ``startswith``.
        if isinstance(url, str) and url.startswith(VEGAFUSION_PREFIX):
            table_names.add(url[len(VEGAFUSION_PREFIX) :])

    # Marks may carry their own "data" and "marks" entries, so recurse.
    for mark in vega_spec.get("marks", []):
        table_names.update(get_inline_table_names(mark))

    return table_names
|
|
|
|
|
def get_inline_tables(vega_spec: dict) -> Dict[str, _DataFrameLike]:
    """Collect the extracted dataframes that a Vega specification refers to

    Note: Only call this on a Vega spec that belongs to a chart which was
    processed by the vegafusion_data_transformer. Each matching dataframe
    is removed from ``extracted_inline_tables`` as it is collected, so a
    second call for the same spec will not find it there again.

    Parameters
    ----------
    vega_spec: dict
        A Vega specification dict

    Returns
    -------
    dict from str to dataframe
        dict from inline dataset name to dataframe object
    """
    found = {}
    for name in get_inline_table_names(vega_spec):
        try:
            frame = extracted_inline_tables.pop(name)
        except KeyError:
            # The spec references a name with no stored table; leave it out
            # of the result rather than failing.
            continue
        found[name] = frame
    return found
|
|
|
|
|
def compile_with_vegafusion(vegalite_spec: dict) -> dict:
    """Compile a Vega-Lite spec to Vega and pre-transform it with VegaFusion

    Note: Only call this on a Vega-Lite spec that was generated with the
    "vegafusion" data transformer enabled. Such a spec may reference
    extracted datasets through URLs that start with ``VEGAFUSION_PREFIX``.

    Parameters
    ----------
    vegalite_spec: dict
        A Vega-Lite spec that was generated from an Altair chart with
        the "vegafusion" data transformer enabled

    Returns
    -------
    dict
        A Vega spec that has been pre-transformed by VegaFusion

    Raises
    ------
    ValueError
        If no Vega-Lite compiler plugin is active.
    MaxRowsError
        If VegaFusion reports a "RowLimitExceeded" warning.
    """
    # Resolved at call time rather than at module import time.
    from altair import vegalite_compilers, data_transformers

    vf = import_vegafusion()

    # Vega-Lite -> Vega, using whichever compiler plugin is active.
    active_compiler = vegalite_compilers.get()
    if active_compiler is None:
        raise ValueError("No active vega-lite compiler plugin found")
    vega_spec = active_compiler(vegalite_spec)

    # Pull the dataframes this spec refers to out of the registry.
    datasets = get_inline_tables(vega_spec)

    # The row limit comes from the data transformer options; None if unset.
    row_limit = data_transformers.options.get("max_rows", None)
    pre_transformed = vf.runtime.pre_transform_spec(
        vega_spec,
        vf.get_local_tz(),
        inline_datasets=datasets,
        row_limit=row_limit,
    )
    transformed_vega_spec, warnings = pre_transformed

    # Turn a row-limit warning from VegaFusion into a hard error.
    limit_exceeded = any(
        warning.get("type") == "RowLimitExceeded" for warning in warnings
    )
    if limit_exceeded:
        raise MaxRowsError(
            "The number of dataset rows after filtering and aggregation exceeds\n"
            f"the current limit of {row_limit}. Try adding an aggregation to reduce\n"
            "the size of the dataset that must be loaded into the browser. Or, disable\n"
            "the limit by calling alt.data_transformers.disable_max_rows(). Note that\n"
            "disabling this limit may cause the browser to freeze or crash."
        )

    return transformed_vega_spec
|
|
|
|
|
def using_vegafusion() -> bool:
    """Check whether the vegafusion data transformer is enabled"""
    # Resolved at call time rather than at module import time.
    from altair import data_transformers

    active_name = data_transformers.active
    return active_name == "vegafusion"
|
|