iceberg / app.py
koaning's picture
Update app.py
5b120a4 verified
raw
history blame
3.04 kB
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "marimo",
# "polars==1.29.0",
# "pyarrow==20.0.0",
# "pyiceberg==0.9.1",
# "sqlalchemy==2.0.40",
# ]
# ///
import marimo
__generated_with = "0.13.7"
app = marimo.App(width="full")
@app.cell
def _():
import marimo as mo
import sqlalchemy
import polars as pl
from pathlib import Path
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import IdentityTransform
return IdentityTransform, PartitionField, PartitionSpec, mo, pl
@app.cell
def _():
from pyiceberg.catalog import load_catalog
warehouse_path = "warehouse"
catalog = load_catalog(
"default",
**{
'type': 'sql',
"uri": f"sqlite:///{warehouse_path}/iceberg.db",
"warehouse": f"file://{warehouse_path}",
},
)
return (catalog,)
@app.cell
def _(pl):
df_taxi = pl.read_csv("yellow_tripdata_2015-01.csv").to_arrow()
return (df_taxi,)
@app.cell
def _(df_taxi):
df_taxi.group_by("passenger_count").aggregate([([], "count_all")])
return
@app.cell
def _(IdentityTransform, PartitionField, PartitionSpec):
spec = PartitionSpec(
PartitionField(source_id=3, field_id=1000, name="passenger_count", transform=IdentityTransform())
)
return
@app.cell
def _(df_taxi):
df_taxi.schema
return
@app.cell
def _(catalog, df_taxi):
catalog.create_namespace_if_not_exists("default")
table = catalog.create_table_if_not_exists(
"default.taxi",
schema=df_taxi.schema,
)
return (table,)
@app.cell
def _(df_taxi, table):
if not table.current_snapshot():
table.append(df_taxi)
return
@app.cell
def _(catalog):
(
catalog
.load_table("default.taxi")
.to_polars()
.group_by("passenger_count")
.len()
.sort("passenger_count")
.collect()
)
return
@app.cell
def _(pl):
pl.scan_csv("yellow_tripdata_2015-01.csv").group_by("passenger_count").len().sort("passenger_count").collect()
return
@app.cell
def _(pl):
pl.read_csv("yellow_tripdata_2015-01.csv").group_by("passenger_count").len().sort("passenger_count")
return
@app.cell(hide_code=True)
def _(mo):
mo.md(r"""The partition is great, but the comparison with `read_csv` is a bit unfair. Let's convert the `.csv` file to `.parquet` and also add a partition in polars with statistics. """)
return
@app.cell
def _(pl):
pl.read_csv("yellow_tripdata_2015-01.csv").write_parquet("taxi.parquet", partition_by=["passenger_count"], statistics=True)
return
@app.cell
def _(pl):
pl.scan_parquet("taxi.parquet").group_by("passenger_count").len().sort("passenger_count").collect()
return
@app.cell
def _(pl):
pl.read_parquet("taxi.parquet").group_by("passenger_count").len().sort("passenger_count")
return
@app.cell
def _():
return
if __name__ == "__main__":
app.run()