# Spaces:
# Running
# Running
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "marimo",
# "polars==1.29.0",
# "pyarrow==20.0.0",
# "pyiceberg==0.9.1",
# "sqlalchemy==2.0.40",
# ]
# ///
import marimo

__generated_with = "0.13.7"
# Notebook application object, rendered at full page width.
app = marimo.App(width="full")
@app.cell
def _():
    """Import the libraries shared by the other cells."""
    import marimo as mo
    import polars as pl

    # Not referenced directly. Presumably imported so a missing SQLAlchemy install
    # (needed for the `'type': 'sql'` catalog) surfaces early -- confirm intent.
    import sqlalchemy  # noqa: F401
    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.transforms import IdentityTransform

    return IdentityTransform, PartitionField, PartitionSpec, mo, pl
@app.cell
def _():
    """Open (or create) the local SQLite-backed Iceberg catalog.

    Returns:
        catalog: the pyiceberg catalog named "default".
    """
    import pathlib

    from pyiceberg.catalog import load_catalog

    warehouse_path = "warehouse"
    _warehouse_dir = pathlib.Path(warehouse_path).resolve()
    # SQLite will not create missing parent directories for iceberg.db, so make
    # sure the warehouse directory exists before the catalog opens the database.
    _warehouse_dir.mkdir(parents=True, exist_ok=True)
    catalog = load_catalog(
        "default",
        **{
            "type": "sql",
            "uri": f"sqlite:///{_warehouse_dir / 'iceberg.db'}",
            # as_uri() builds a well-formed file:///absolute/path URI; a bare
            # "file://warehouse" would place the directory name in the host slot.
            "warehouse": _warehouse_dir.as_uri(),
        },
    )
    return (catalog,)
@app.cell
def _(pl):
    """Load the January 2015 yellow-taxi trips as a pyarrow Table.

    The data is read with polars and converted with ``to_arrow()``. The resulting
    Arrow table supplies the Iceberg table schema and the rows appended to it.
    """
    df_taxi = pl.read_csv("yellow_tripdata_2015-01.csv").to_arrow()
    return (df_taxi,)
@app.cell
def _(df_taxi):
    """Show the row count per passenger_count, computed directly on the Arrow table.

    The result is sorted by passenger_count, like the polars and Iceberg queries in
    the other cells, so the outputs can be compared side by side.
    """
    df_taxi.group_by("passenger_count").aggregate([([], "count_all")]).sort_by(
        "passenger_count"
    )
    return
@app.cell
def _(IdentityTransform, PartitionField, PartitionSpec):
    """Build an identity PartitionSpec on passenger_count."""
    # NOTE(review): `spec` is neither returned from this cell nor passed to
    # create_table, so it currently has no effect. Also confirm that source_id=3
    # really is the field id of passenger_count in the created table's schema.
    spec = PartitionSpec(
        PartitionField(
            source_id=3,
            field_id=1000,
            name="passenger_count",
            transform=IdentityTransform(),
        )
    )
    return
@app.cell
def _(df_taxi):
    """Display the Arrow schema later used to create the Iceberg table."""
    df_taxi.schema
    return
@app.cell
def _(catalog, df_taxi):
    """Create (or open) the ``default.taxi`` Iceberg table, partitioned by passenger_count.

    Returns:
        table: the pyiceberg Table for ``default.taxi``.
    """
    catalog.create_namespace_if_not_exists("default")
    table = catalog.create_table_if_not_exists(
        "default.taxi",
        schema=df_taxi.schema,
    )
    # Add the identity partition by column name via spec evolution, so no source
    # field id has to be hard-coded. This is skipped when the table (for example,
    # one left over from an earlier run) already has a partition spec.
    if table.spec().is_unpartitioned():
        with table.update_spec() as _update:
            _update.add_identity("passenger_count")
    return (table,)
@app.cell
def _(df_taxi, table):
    """Load the taxi rows into the Iceberg table once.

    Rows are appended only while the table has no snapshot yet, so re-running the
    notebook does not append the same data a second time.
    """
    if table.current_snapshot() is None:
        table.append(df_taxi)
    return
@app.cell
def _(catalog):
    """Count rows per passenger_count, reading the Iceberg table through polars."""
    # NOTE(review): this cell depends only on `catalog`, so marimo's dataflow does
    # not order it after the append cell -- confirm the data is loaded first.
    (
        catalog.load_table("default.taxi")
        .to_polars()
        .group_by("passenger_count")
        .len()
        .sort("passenger_count")
        .collect()
    )
    return
@app.cell
def _(pl):
    """Count rows per passenger_count lazily from the raw CSV (scan, then collect)."""
    (
        pl.scan_csv("yellow_tripdata_2015-01.csv")
        .group_by("passenger_count")
        .len()
        .sort("passenger_count")
        .collect()
    )
    return
@app.cell
def _(pl):
    """Count rows per passenger_count eagerly, reading the whole CSV first."""
    (
        pl.read_csv("yellow_tripdata_2015-01.csv")
        .group_by("passenger_count")
        .len()
        .sort("passenger_count")
    )
    return
@app.cell
def _(mo):
    """Explain the switch from CSV to a partitioned parquet dataset."""
    mo.md(r"""The partition is great, but the comparison with `read_csv` is a bit unfair. Let's convert the `.csv` file to `.parquet` and also add a partition in polars with statistics. """)
    return
@app.cell
def _(pl):
    """Write the CSV data as parquet, partitioned by passenger_count, with statistics.

    Because ``partition_by`` is given, "taxi.parquet" is expected to be a
    partitioned dataset (a directory) rather than a single file -- see the polars
    ``write_parquet`` docs.
    """
    pl.read_csv("yellow_tripdata_2015-01.csv").write_parquet(
        "taxi.parquet",
        partition_by=["passenger_count"],
        statistics=True,
    )
    return
@app.cell
def _(pl):
    """Count rows per passenger_count lazily from the parquet dataset "taxi.parquet"."""
    # NOTE(review): this cell depends only on `pl`, so marimo's dataflow does not
    # order it after the cell that writes taxi.parquet -- confirm it exists first.
    (
        pl.scan_parquet("taxi.parquet")
        .group_by("passenger_count")
        .len()
        .sort("passenger_count")
        .collect()
    )
    return
@app.cell
def _(pl):
    """Count rows per passenger_count eagerly from the parquet dataset "taxi.parquet"."""
    (
        pl.read_parquet("taxi.parquet")
        .group_by("passenger_count")
        .len()
        .sort("passenger_count")
    )
    return
@app.cell
def _():
    """Empty cell."""
    return
# Allow `python <this file>` to run the notebook as a plain script.
if __name__ == "__main__":
    app.run()