Spaces:
No application file
No application file
| from toolz import curried | |
| import uuid | |
| from weakref import WeakValueDictionary | |
| from typing import Union, Dict, Set, MutableMapping | |
| from typing import TypedDict, Final | |
| from altair.utils._importers import import_vegafusion | |
| from altair.utils.core import DataFrameLike | |
| from altair.utils.data import DataType, ToValuesReturnType, MaxRowsError | |
| from altair.vegalite.data import default_data_transformer | |
| # Temporary storage for dataframes that have been extracted | |
| # from charts by the vegafusion data transformer. Use a WeakValueDictionary | |
| # rather than a dict so that the Python interpreter is free to garbage | |
| # collect the stored DataFrames. | |
| extracted_inline_tables: MutableMapping[str, DataFrameLike] = WeakValueDictionary() | |
| # Special URL prefix that VegaFusion uses to denote that a | |
| # dataset in a Vega spec corresponds to an entry in the `inline_datasets` | |
| # kwarg of vf.runtime.pre_transform_spec(). | |
| VEGAFUSION_PREFIX: Final = "vegafusion+dataset://" | |
| class _ToVegaFusionReturnUrlDict(TypedDict): | |
| url: str | |
| def vegafusion_data_transformer( | |
| data: DataType, max_rows: int = 100000 | |
| ) -> Union[_ToVegaFusionReturnUrlDict, ToValuesReturnType]: | |
| """VegaFusion Data Transformer""" | |
| if hasattr(data, "__geo_interface__"): | |
| # Use default transformer for geo interface objects | |
| # # (e.g. a geopandas GeoDataFrame) | |
| return default_data_transformer(data) | |
| elif hasattr(data, "__dataframe__"): | |
| table_name = f"table_{uuid.uuid4()}".replace("-", "_") | |
| extracted_inline_tables[table_name] = data | |
| return {"url": VEGAFUSION_PREFIX + table_name} | |
| else: | |
| # Use default transformer if we don't recognize data type | |
| return default_data_transformer(data) | |
| def get_inline_table_names(vega_spec: dict) -> Set[str]: | |
| """Get a set of the inline datasets names in the provided Vega spec | |
| Inline datasets are encoded as URLs that start with the table:// | |
| prefix. | |
| Parameters | |
| ---------- | |
| vega_spec: dict | |
| A Vega specification dict | |
| Returns | |
| ------- | |
| set of str | |
| Set of the names of the inline datasets that are referenced | |
| in the specification. | |
| Examples | |
| -------- | |
| >>> spec = { | |
| ... "data": [ | |
| ... { | |
| ... "name": "foo", | |
| ... "url": "https://path/to/file.csv" | |
| ... }, | |
| ... { | |
| ... "name": "bar", | |
| ... "url": "vegafusion+dataset://inline_dataset_123" | |
| ... } | |
| ... ] | |
| ... } | |
| >>> get_inline_table_names(spec) | |
| {'inline_dataset_123'} | |
| """ | |
| table_names = set() | |
| # Process datasets | |
| for data in vega_spec.get("data", []): | |
| url = data.get("url", "") | |
| if url.startswith(VEGAFUSION_PREFIX): | |
| name = url[len(VEGAFUSION_PREFIX) :] | |
| table_names.add(name) | |
| # Recursively process child marks, which may have their own datasets | |
| for mark in vega_spec.get("marks", []): | |
| table_names.update(get_inline_table_names(mark)) | |
| return table_names | |
| def get_inline_tables(vega_spec: dict) -> Dict[str, DataFrameLike]: | |
| """Get the inline tables referenced by a Vega specification | |
| Note: This function should only be called on a Vega spec that corresponds | |
| to a chart that was processed by the vegafusion_data_transformer. | |
| Furthermore, this function may only be called once per spec because | |
| the returned dataframes are deleted from internal storage. | |
| Parameters | |
| ---------- | |
| vega_spec: dict | |
| A Vega specification dict | |
| Returns | |
| ------- | |
| dict from str to dataframe | |
| dict from inline dataset name to dataframe object | |
| """ | |
| table_names = get_inline_table_names(vega_spec) | |
| tables = {} | |
| for table_name in table_names: | |
| try: | |
| tables[table_name] = extracted_inline_tables.pop(table_name) | |
| except KeyError: | |
| # named dataset that was provided by the user | |
| pass | |
| return tables | |
| def compile_with_vegafusion(vegalite_spec: dict) -> dict: | |
| """Compile a Vega-Lite spec to Vega and pre-transform with VegaFusion | |
| Note: This function should only be called on a Vega-Lite spec | |
| that was generated with the "vegafusion" data transformer enabled. | |
| In particular, this spec may contain references to extract datasets | |
| using table:// prefixed URLs. | |
| Parameters | |
| ---------- | |
| vegalite_spec: dict | |
| A Vega-Lite spec that was generated from an Altair chart with | |
| the "vegafusion" data transformer enabled | |
| Returns | |
| ------- | |
| dict | |
| A Vega spec that has been pre-transformed by VegaFusion | |
| """ | |
| # Local import to avoid circular ImportError | |
| from altair import vegalite_compilers, data_transformers | |
| vf = import_vegafusion() | |
| # Compile Vega-Lite spec to Vega | |
| compiler = vegalite_compilers.get() | |
| if compiler is None: | |
| raise ValueError("No active vega-lite compiler plugin found") | |
| vega_spec = compiler(vegalite_spec) | |
| # Retrieve dict of inline tables referenced by the spec | |
| inline_tables = get_inline_tables(vega_spec) | |
| # Pre-evaluate transforms in vega spec with vegafusion | |
| row_limit = data_transformers.options.get("max_rows", None) | |
| transformed_vega_spec, warnings = vf.runtime.pre_transform_spec( | |
| vega_spec, | |
| vf.get_local_tz(), | |
| inline_datasets=inline_tables, | |
| row_limit=row_limit, | |
| ) | |
| # Check from row limit warning and convert to MaxRowsError | |
| for warning in warnings: | |
| if warning.get("type") == "RowLimitExceeded": | |
| raise MaxRowsError( | |
| "The number of dataset rows after filtering and aggregation exceeds\n" | |
| f"the current limit of {row_limit}. Try adding an aggregation to reduce\n" | |
| "the size of the dataset that must be loaded into the browser. Or, disable\n" | |
| "the limit by calling alt.data_transformers.disable_max_rows(). Note that\n" | |
| "disabling this limit may cause the browser to freeze or crash." | |
| ) | |
| return transformed_vega_spec | |
| def using_vegafusion() -> bool: | |
| """Check whether the vegafusion data transformer is enabled""" | |
| # Local import to avoid circular ImportError | |
| from altair import data_transformers | |
| return data_transformers.active == "vegafusion" | |