Iceberg sink implementation.
This code renders an IcebergSinkConnection into a dataflow that writes
data to an Iceberg table. The dataflow consists of three operators:
┏━━━━━━━━━━━━━━┓
┃   persist    ┃
┃    source    ┃
┗━━━━━━┯━━━━━━━┛
       │ row data, the input to this module
       │
┏━━━━━━v━━━━━━━┓
┃     mint     ┃ (single worker)
┃    batch     ┃ loads/creates the Iceberg table,
┃ descriptions ┃ determines resume upper
┗━━━━┯━━━━┯━━━━┛
     │    │ batch descriptions (broadcast)
rows │    ├─────────────────────────┐
     │    │                         │
┏━━━━v━━━━v━━━━┓    ╭─────────────╮ │
┃    write     ┃───>│ S3 / object │ │
┃  data files  ┃    │   storage   │ │
┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
       │ file metadata              │
       │                            │
┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
┃             commit to              ┃ (single worker)
┃              iceberg               ┃
┗━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━┛
               │
       ╭───────v───────╮
       │ Iceberg table │
       │  (snapshots)  │
       ╰───────────────╯
§Minting batch descriptions
The “mint batch descriptions” operator is responsible for generating
time-based batch boundaries that group writes into Iceberg snapshots.
It maintains a sliding window of future batch descriptions so that
writers can start processing data even while earlier batches are still being written.
Knowing the batch boundaries ahead of time is important because we need to
be able to claim that all data files written for a given batch
include all data up to the batch's upper timestamp but not beyond it.
This could be trivially achieved by waiting for all data to arrive up to a certain
frontier, but that would prevent us from streaming writes out to object storage
until the entire batch is complete, which would increase latency and reduce throughput.
§Writing data files
The “write data files” operator receives rows along with batch descriptions.
It matches rows to batches by timestamp; if a batch description hasn’t arrived yet,
rows are stashed until it does. This allows batches to be minted ahead of data arrival.
The operator uses an Iceberg DeltaWriter to write Parquet data files
(and position delete files if necessary) to object storage.
It outputs metadata about the written files along with their batch descriptions
for the commit operator to consume.
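The matching-and-stashing behavior can be pictured with a small sketch; the RowRouter type, u64 timestamps, and generic R rows below are illustrative stand-ins, not this operator's actual state:

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

// Illustrative stand-in for the write operator's state: rows are assigned to
// the batch whose [lower, upper) interval contains their timestamp; rows whose
// batch description hasn't arrived yet are stashed until it does.
struct RowRouter<R> {
    descriptions: Vec<BatchDescription>,
    stashed: Vec<(u64, R)>,
    ready: BTreeMap<BatchDescription, Vec<R>>,
}

impl<R> RowRouter<R> {
    fn new() -> Self {
        Self { descriptions: Vec::new(), stashed: Vec::new(), ready: BTreeMap::new() }
    }

    /// A new batch description arrived: record it and retry stashed rows.
    fn push_description(&mut self, desc: BatchDescription) {
        self.descriptions.push(desc);
        for (ts, row) in std::mem::take(&mut self.stashed) {
            self.push_row(ts, row);
        }
    }

    /// Route a row to its batch, or stash it if no description covers `ts` yet.
    fn push_row(&mut self, ts: u64, row: R) {
        match self.descriptions.iter().find(|d| d.lower <= ts && ts < d.upper) {
            Some(desc) => self.ready.entry(*desc).or_default().push(row),
            None => self.stashed.push((ts, row)),
        }
    }
}

fn main() {
    let mut router = RowRouter::new();
    router.push_row(1_500, "row arriving before its batch description");
    assert_eq!(router.stashed.len(), 1);
    router.push_description(BatchDescription { lower: 1_000, upper: 2_000 });
    assert!(router.stashed.is_empty());
}
```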
§Committing to Iceberg
The “commit to iceberg” operator receives metadata about written data files
along with their batch descriptions. It groups files by batch and creates
Iceberg snapshots that include all files for each batch. It updates the Iceberg
table’s metadata to reflect the new snapshots, including updating the
mz-frontier property to track progress.
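A hedged sketch of the ordering concern follows; the actual snapshot commit via the iceberg crate is elided behind a closure, and CompletedBatch plus the mz-frontier handling shown here are simplified assumptions:

```rust
use std::collections::BTreeMap;

// Simplified view of one finished batch: its bounds plus the object-store
// paths of the data files written for it.
struct CompletedBatch {
    lower: u64,
    upper: u64,
    data_files: Vec<String>,
}

/// Commit batches strictly in timestamp order, one Iceberg snapshot per batch.
/// The batch upper is what the real operator records as the `mz-frontier`
/// snapshot property so a restarted sink knows where to resume.
fn commit_in_order(
    batches: Vec<CompletedBatch>,
    mut commit_snapshot: impl FnMut(&[String], u64) -> Result<(), String>,
) -> Result<(), String> {
    // Sort by lower bound so snapshots land in timestamp order even if the
    // write operator finished them out of order.
    let mut ordered = BTreeMap::new();
    for batch in batches {
        ordered.insert(batch.lower, batch);
    }
    for (_, batch) in ordered {
        commit_snapshot(&batch.data_files, batch.upper)?;
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let batches = vec![
        CompletedBatch { lower: 2_000, upper: 3_000, data_files: vec!["b.parquet".into()] },
        CompletedBatch { lower: 1_000, upper: 2_000, data_files: vec!["a.parquet".into()] },
    ];
    commit_in_order(batches, |files, frontier| {
        // Stand-in for the real commit: print instead of writing a snapshot.
        println!("snapshot: files={files:?}, mz-frontier={frontier}");
        Ok(())
    })
}
```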
Structs§
- AvroDataFile 🔒 - A wrapper around Iceberg’s DataFile that implements Serialize and Deserialize. This is slightly complicated by the fact that Iceberg’s DataFile doesn’t implement these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively). The Avro ser(de) also requires the Iceberg schema to be provided, so we include that as well. It is distinctly possible that this is overkill, but it avoids re-implementing Iceberg’s serialization logic here. If at some point this becomes a serious overhead, we can revisit this decision. (A sketch of this wrapper pattern follows this list.)
- BoundedDataFile 🔒 - A DataFile along with its associated batch description (lower and upper bounds).
- BoundedDataFileSet 🔒 - A set of DataFiles along with their associated batch descriptions.
- SerializableDataFile 🔒
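The wrapper pattern behind AvroDataFile can be sketched as follows; the field layout and names here are assumptions for illustration, not the real struct (the actual Avro encoding comes from the iceberg crate):

```rust
use serde::{Deserialize, Serialize};

// Illustrative shape only: carry the DataFile as Avro-encoded bytes, together
// with the Iceberg schema (as JSON) needed to decode them, so the whole thing
// can derive serde's traits even though DataFile itself does not.
#[derive(Serialize, Deserialize)]
struct AvroDataFileSketch {
    /// The Iceberg table schema, JSON-encoded; required for the Avro ser(de).
    schema_json: String,
    /// The DataFile, Avro-encoded (hypothetically via the iceberg crate).
    data_file_avro: Vec<u8>,
}
```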
Constants§
- DEFAULT_ARRAY_BUILDER_DATA_CAPACITY 🔒 - Set the default buffer capacity for the string and binary array builders inside the ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate more memory.
- DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY 🔒 - Set the default capacity for the array builders inside the ArrowBuilder. This is the number of items each builder can hold before it needs to allocate more memory.
- INITIAL_DESCRIPTIONS_TO_MINT 🔒 - The number of batch descriptions to mint ahead of the observed frontier. This determines how many batches we have in-flight at any given time.
- PARQUET_FILE_PREFIX 🔒 - The prefix for Parquet files written by this sink.
Functions§
- add_field_ids_to_arrow_schema 🔒 - Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the Parquet metadata for schema evolution tracking. TODO: Support nested data types with proper field IDs.
- build_schema_with_op_column 🔒 - Build a new Arrow schema by adding an __op column to the existing schema.
- commit_to_iceberg 🔒 - Commit completed batches to Iceberg as snapshots. Batches are committed in timestamp order to ensure strong consistency guarantees downstream. Each snapshot includes the Materialize frontier in its metadata for resume support.
- load_or_create_table 🔒 - Load an existing Iceberg table or create it if it doesn’t exist.
- merge_materialize_metadata_into_iceberg_schema 🔒 - Merge Materialize extension metadata into Iceberg’s Arrow schema. This uses Iceberg’s data types (e.g. Utf8) and field IDs while preserving Materialize’s extension names for ArrowBuilder compatibility.
- mint_batch_descriptions 🔒 - Generate time-based batch boundaries for grouping writes into Iceberg snapshots. Batches are minted with configurable windows to balance write efficiency with latency. We maintain a sliding window of future batch descriptions so writers can start processing data even while earlier batches are still being written.
- relation_desc_to_iceberg_schema 🔒 - Convert a Materialize RelationDesc into Arrow and Iceberg schemas. Returns the Arrow schema (with field IDs) for writing Parquet files, and the Iceberg schema for table creation/validation.
- reload_table 🔒
- retrieve_upper_from_snapshots 🔒 - Find the most recent Materialize frontier from Iceberg snapshots. We store the frontier in snapshot metadata to track where we left off after restarts. Snapshots with operation=“replace” (compactions) don’t have our metadata and are skipped. The input slice will be sorted by sequence number in descending order.
- row_to_recordbatch 🔒 - Convert a Materialize DiffPair into an Arrow RecordBatch with an __op column. The __op column indicates whether each row is an insert (1) or delete (-1), which the DeltaWriter uses to generate the appropriate Iceberg data/delete files. (A rough sketch of this convention follows the list.)
- write_data_files 🔒 - Write rows into Parquet data files bounded by batch descriptions. Rows are matched to batches by timestamp; if a batch description hasn’t arrived yet, rows are stashed until it does. This allows batches to be minted ahead of data arrival.
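As a rough illustration of the __op convention (not this module’s row_to_recordbatch, which works over Materialize rows and schemas), a two-column RecordBatch with an __op column can be built with arrow like this:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Build a RecordBatch with a trailing `__op` column: 1 marks an insert and
/// -1 a delete, which a DeltaWriter-style consumer splits into data files and
/// position delete files respectively.
fn batch_with_op_column(keys: Vec<String>, ops: Vec<i32>) -> Result<RecordBatch, ArrowError> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("key", DataType::Utf8, false),
        Field::new("__op", DataType::Int32, false),
    ]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from_iter_values(keys)),
        Arc::new(Int32Array::from(ops)),
    ];
    RecordBatch::try_new(schema, columns)
}

fn main() -> Result<(), ArrowError> {
    // An update expressed as a delete of the old value plus an insert of the new one.
    let batch = batch_with_op_column(vec!["a".into(), "a".into()], vec![-1, 1])?;
    assert_eq!(batch.num_columns(), 2);
    assert_eq!(batch.num_rows(), 2);
    Ok(())
}
```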