Spaces:
Running
Running
File size: 3,035 Bytes
5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 5b120a4 37429b9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "marimo",
# "polars==1.29.0",
# "pyarrow==20.0.0",
# "pyiceberg==0.9.1",
# "sqlalchemy==2.0.40",
# ]
# ///
import marimo
# Version string stored in the notebook file (presumably the marimo release
# that generated it — confirm against marimo's file-format docs).
__generated_with = "0.13.7"
# Notebook application object: every cell below registers itself through
# `@app.cell`, and the `__main__` guard at the bottom calls `app.run()`.
app = marimo.App(width="full")
@app.cell
def _():
    # Shared imports for the notebook. Only the names listed in the return
    # tuple are handed on; `sqlalchemy` and `Path` are imported here but are
    # not returned.
    from pathlib import Path

    import marimo as mo
    import polars as pl
    import sqlalchemy
    from pyiceberg.partitioning import PartitionField, PartitionSpec
    from pyiceberg.transforms import IdentityTransform

    return IdentityTransform, PartitionField, PartitionSpec, mo, pl
@app.cell
def _():
    # Open (or create) the SQL-backed Iceberg catalog named "default", with
    # both its SQLite database and its data files rooted at `warehouse_path`.
    import os

    from pyiceberg.catalog import load_catalog

    warehouse_path = "warehouse"
    # SQLite creates the database file on demand but never its parent
    # directory, so make sure the warehouse directory exists first; otherwise
    # a fresh checkout fails with "unable to open database file".
    os.makedirs(warehouse_path, exist_ok=True)
    catalog = load_catalog(
        "default",
        **{
            'type': 'sql',
            "uri": f"sqlite:///{warehouse_path}/iceberg.db",
            "warehouse": f"file://{warehouse_path}",
        },
    )
    return (catalog,)
@app.cell
def _(pl):
    # Read the taxi CSV with polars and expose it as an Arrow table
    # (`df_taxi`) for the cells that follow.
    _csv_path = "yellow_tripdata_2015-01.csv"
    df_taxi = pl.read_csv(_csv_path).to_arrow()
    return (df_taxi,)
@app.cell
def _(df_taxi):
    # Count rows per `passenger_count` directly on the Arrow table, using the
    # "count_all" aggregation (which takes no input columns).
    _count_all = ([], "count_all")
    df_taxi.group_by("passenger_count").aggregate([_count_all])
    return
@app.cell
def _(IdentityTransform, PartitionField, PartitionSpec):
    # Build an identity-transform partition spec whose single field is named
    # "passenger_count" and points at source field id 3.
    # NOTE(review): `spec` is not returned from this cell and the table
    # creation cell does not pass a partition spec, so it currently looks
    # unused. Also confirm that field id 3 really is `passenger_count` in the
    # table schema before wiring it in.
    _passenger_field = PartitionField(
        source_id=3,
        field_id=1000,
        name="passenger_count",
        transform=IdentityTransform(),
    )
    spec = PartitionSpec(_passenger_field)
    return
@app.cell
def _(df_taxi):
    # Show the schema of the taxi table (`df_taxi.schema`) as the cell's
    # final expression.
    df_taxi.schema
    return
@app.cell
def _(catalog, df_taxi):
    # Ensure the "default" namespace exists, then create the "default.taxi"
    # table from the schema of `df_taxi` unless the table is already there.
    catalog.create_namespace_if_not_exists("default")
    _taxi_schema = df_taxi.schema
    table = catalog.create_table_if_not_exists("default.taxi", schema=_taxi_schema)
    return (table,)
@app.cell
def _(df_taxi, table):
    # Append the taxi data only while the table has no snapshot yet, so that
    # re-running the notebook does not append the same rows again.
    # Compare against None explicitly rather than relying on the truthiness
    # of the snapshot object.
    if table.current_snapshot() is None:
        table.append(df_taxi)
    return
@app.cell
def _(catalog):
    # Row count per `passenger_count`, read back from the "default.taxi"
    # table through polars and sorted by the grouping key.
    _key = "passenger_count"
    _taxi_table = catalog.load_table("default.taxi")
    _counts = _taxi_table.to_polars().group_by(_key).len().sort(_key)
    _counts.collect()
    return
@app.cell
def _(pl):
    # Same per-`passenger_count` row count, this time via `scan_csv` on the
    # raw CSV followed by `collect()`.
    _key = "passenger_count"
    _counts = pl.scan_csv("yellow_tripdata_2015-01.csv").group_by(_key).len().sort(_key)
    _counts.collect()
    return
@app.cell
def _(pl):
    # Same per-`passenger_count` row count, computed with `read_csv` on the
    # raw CSV.
    _key = "passenger_count"
    (
        pl.read_csv("yellow_tripdata_2015-01.csv")
        .group_by(_key)
        .len()
        .sort(_key)
    )
    return
# Markdown note (cell registered with hide_code=True): introduces the Parquet
# comparison done by the cells that follow.
@app.cell(hide_code=True)
def _(mo):
    mo.md(r"""The partition is great, but the comparison with `read_csv` is a bit unfair. Let's convert the `.csv` file to `.parquet` and also add a partition in polars with statistics. """)
    return
@app.cell
def _(pl):
    # Convert the CSV to "taxi.parquet", partitioned by `passenger_count`
    # and written with statistics enabled.
    _partition_columns = ["passenger_count"]
    pl.read_csv("yellow_tripdata_2015-01.csv").write_parquet(
        "taxi.parquet",
        partition_by=_partition_columns,
        statistics=True,
    )
    return
@app.cell
def _(pl):
    # Per-`passenger_count` row count via `scan_parquet` on "taxi.parquet",
    # followed by `collect()`.
    _key = "passenger_count"
    _counts = pl.scan_parquet("taxi.parquet").group_by(_key).len().sort(_key)
    _counts.collect()
    return
@app.cell
def _(pl):
    # Per-`passenger_count` row count computed with `read_parquet` on
    # "taxi.parquet".
    _key = "passenger_count"
    (
        pl.read_parquet("taxi.parquet")
        .group_by(_key)
        .len()
        .sort(_key)
    )
    return
@app.cell
def _():
    # Empty cell: it contains no code and returns nothing.
    return
# When the file is executed as a script, run the notebook app.
if __name__ == "__main__":
    app.run()
|