Spaces:
Running
Running
| # /// script | |
| # requires-python = ">=3.13" | |
| # dependencies = [ | |
| # "marimo", | |
| # "polars==1.29.0", | |
| # "pyarrow==20.0.0", | |
| # "pyiceberg==0.9.1", | |
| # "sqlalchemy==2.0.40", | |
| # ] | |
| # /// | |
| import marimo | |
| __generated_with = "0.13.7" | |
| app = marimo.App(width="full") | |
| def _(): | |
| import marimo as mo | |
| import sqlalchemy | |
| import polars as pl | |
| from pathlib import Path | |
| from pyiceberg.partitioning import PartitionSpec, PartitionField | |
| from pyiceberg.transforms import IdentityTransform | |
| return IdentityTransform, PartitionField, PartitionSpec, mo, pl | |
| def _(): | |
| from pyiceberg.catalog import load_catalog | |
| warehouse_path = "warehouse" | |
| catalog = load_catalog( | |
| "default", | |
| **{ | |
| 'type': 'sql', | |
| "uri": f"sqlite:///{warehouse_path}/iceberg.db", | |
| "warehouse": f"file://{warehouse_path}", | |
| }, | |
| ) | |
| return (catalog,) | |
| def _(pl): | |
| df_taxi = pl.read_csv("yellow_tripdata_2015-01.csv.zip").to_arrow() | |
| return (df_taxi,) | |
| def _(df_taxi): | |
| df_taxi.group_by("passenger_count").aggregate([([], "count_all")]) | |
| return | |
| def _(IdentityTransform, PartitionField, PartitionSpec): | |
| spec = PartitionSpec( | |
| PartitionField(source_id=3, field_id=1000, name="passenger_count", transform=IdentityTransform()) | |
| ) | |
| return | |
| def _(df_taxi): | |
| df_taxi.schema | |
| return | |
| def _(catalog, df_taxi): | |
| catalog.create_namespace_if_not_exists("default") | |
| table = catalog.create_table_if_not_exists( | |
| "default.taxi", | |
| schema=df_taxi.schema, | |
| ) | |
| return (table,) | |
| def _(df_taxi, table): | |
| if not table.current_snapshot(): | |
| table.append(df_taxi) | |
| return | |
| def _(catalog): | |
| ( | |
| catalog | |
| .load_table("default.taxi") | |
| .to_polars() | |
| .group_by("passenger_count") | |
| .len() | |
| .sort("passenger_count") | |
| .collect() | |
| ) | |
| return | |
| def _(pl): | |
| pl.scan_csv("yellow_tripdata_2015-01.csv").group_by("passenger_count").len().sort("passenger_count").collect() | |
| return | |
| def _(pl): | |
| pl.read_csv("yellow_tripdata_2015-01.csv").group_by("passenger_count").len().sort("passenger_count") | |
| return | |
| def _(mo): | |
| mo.md(r"""The partition is great, but the comparison with `read_csv` is a bit unfair. Let's convert the `.csv` file to `.parquet` and also add a partition in polars with statistics. """) | |
| return | |
| def _(pl): | |
| pl.read_csv("yellow_tripdata_2015-01.csv").write_parquet("taxi.parquet", partition_by=["passenger_count"], statistics=True) | |
| return | |
| def _(pl): | |
| pl.scan_parquet("taxi.parquet").group_by("passenger_count").len().sort("passenger_count").collect() | |
| return | |
| def _(pl): | |
| pl.read_parquet("taxi.parquet").group_by("passenger_count").len().sort("passenger_count") | |
| return | |
| def _(): | |
| return | |
| if __name__ == "__main__": | |
| app.run() | |