Iceberg sink implementation.
This code renders an IcebergSinkConnection into a dataflow that writes
data to an Iceberg table. The dataflow consists of three operators:
┏━━━━━━━━━━━━━━┓
┃ persist ┃
┃ source ┃
┗━━━━━━┯━━━━━━━┛
│ row data, the input to this module
│
┏━━━━━━v━━━━━━━┓
┃ mint ┃ (single worker)
┃ batch ┃ loads/creates the Iceberg table,
┃ descriptions ┃ determines resume upper
┗━━━┯━━━━━┯━━━━┛
│ │ batch descriptions (broadcast)
rows │ ├─────────────────────────┐
│ │ │
┏━━━v━━━━━v━━━━┓ ╭─────────────╮ │
┃ write ┃───>│ S3 / object │ │
┃ data files ┃ │ storage │ │
┗━━━━━━┯━━━━━━━┛ ╰─────────────╯ │
│ file metadata │
│ │
┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
┃ commit to ┃ (single worker)
┃ iceberg ┃
┗━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━┛
│
╭───────v───────╮
│ Iceberg table │
│ (snapshots) │
╰───────────────╯

§Minting batch descriptions
The “mint batch descriptions” operator is responsible for generating
time-based batch boundaries that group writes into Iceberg snapshots.
It maintains a sliding window of future batch descriptions so that
writers can start processing data even while earlier batches are still being written.
Knowing the batch boundaries ahead of time is important because we need to
be able to claim that all data files written for a given batch include all
data up to the batch's upper bound but not beyond it.
This could be trivially achieved by waiting for all data to arrive up to a certain
frontier, but that would prevent us from streaming writes out to object storage
until the entire batch is complete, which would increase latency and reduce throughput.
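As a rough illustration of the sliding-window idea, minting ahead of the frontier amounts to laying out consecutive half-open intervals aligned to a fixed window size. The names and types below are hypothetical stand-ins, not the sink's actual types:

```rust
/// Hypothetical stand-in for a minted batch description: a half-open
/// interval [lower, upper) of timestamps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// Mint `count` consecutive batch descriptions covering the frontier,
/// with boundaries aligned to multiples of `window`.
fn mint_ahead(frontier: u64, window: u64, count: usize) -> Vec<BatchDescription> {
    // Align the first batch's lower bound to the window containing the frontier.
    let mut lower = frontier - (frontier % window);
    (0..count)
        .map(|_| {
            let desc = BatchDescription { lower, upper: lower + window };
            lower = desc.upper;
            desc
        })
        .collect()
}

fn main() {
    // With a frontier of 1500 and 1000-tick windows, the next three batches
    // cover [1000, 2000), [2000, 3000), and [3000, 4000).
    let batches = mint_ahead(1500, 1000, 3);
    assert_eq!(batches[0], BatchDescription { lower: 1000, upper: 2000 });
    assert_eq!(batches[2], BatchDescription { lower: 3000, upper: 4000 });
    println!("{batches:?}");
}
```

Because the descriptions exist before their data does, downstream writers can open Parquet writers for a batch as soon as its description arrives, rather than waiting for the batch's frontier to close.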
§Writing data files
The “write data files” operator receives rows along with batch descriptions.
It matches rows to batches by timestamp; if a batch description hasn’t arrived yet,
rows are stashed until it does. This allows batches to be minted ahead of data arrival.
The operator uses an Iceberg DeltaWriter to write Parquet data files
(and position delete files if necessary) to object storage.
It outputs metadata about the written files along with their batch descriptions
for the commit operator to consume.
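The routing described above — match each row to a batch by timestamp, stash it if no description has arrived yet — can be sketched as follows. The row representation and names are simplified stand-ins for illustration only:

```rust
/// Simplified stand-in: a batch covers the half-open timestamp
/// interval [lower, upper).
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// Route each row (timestamp, payload) to the batch whose interval
/// contains its timestamp; rows whose batch description hasn't arrived
/// yet are stashed for later.
fn route_rows<'a>(
    rows: Vec<(u64, &'a str)>,
    batches: &[BatchDescription],
) -> (Vec<(usize, &'a str)>, Vec<(u64, &'a str)>) {
    let mut matched = Vec::new(); // (batch index, payload)
    let mut stashed = Vec::new(); // rows awaiting a batch description
    for (ts, payload) in rows {
        match batches.iter().position(|b| b.lower <= ts && ts < b.upper) {
            Some(idx) => matched.push((idx, payload)),
            None => stashed.push((ts, payload)),
        }
    }
    (matched, stashed)
}

fn main() {
    let batches = vec![BatchDescription { lower: 0, upper: 100 }];
    let rows = vec![(50, "a"), (150, "b")];
    let (matched, stashed) = route_rows(rows, &batches);
    // "a" falls in [0, 100); "b" waits until the [100, 200) description arrives.
    assert_eq!(matched, vec![(0, "a")]);
    assert_eq!(stashed, vec![(150, "b")]);
}
```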
§Committing to Iceberg
The “commit to iceberg” operator receives metadata about written data files
along with their batch descriptions. It groups files by batch and creates
Iceberg snapshots that include all files for each batch. It updates the Iceberg
table’s metadata to reflect the new snapshots, including updating the
mz-frontier property to track progress.
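The grouping-and-ordering step can be sketched with a BTreeMap keyed by each batch's upper bound, which yields batches oldest-first regardless of file arrival order. This is an illustrative sketch, not the operator's actual code; the real operator commits each group as an Iceberg snapshot and records the frontier in its metadata:

```rust
use std::collections::BTreeMap;

/// Group written files by their batch's upper bound and emit commit
/// groups oldest-first. (u64, String) stands in for the real file
/// metadata carried alongside each batch description.
fn group_commits(files: Vec<(u64, String)>) -> Vec<(u64, Vec<String>)> {
    let mut by_batch: BTreeMap<u64, Vec<String>> = BTreeMap::new();
    for (batch_upper, file) in files {
        by_batch.entry(batch_upper).or_default().push(file);
    }
    // BTreeMap iterates keys in ascending order, so batches come out in
    // timestamp order even if files arrived out of order.
    by_batch.into_iter().collect()
}

fn main() {
    let files = vec![
        (2000, "f3.parquet".to_string()),
        (1000, "f1.parquet".to_string()),
        (1000, "f2.parquet".to_string()),
    ];
    let commits = group_commits(files);
    // The [.., 1000) batch is committed first, with both of its files.
    assert_eq!(commits[0].0, 1000);
    assert_eq!(commits[0].1.len(), 2);
    assert_eq!(commits[1], (2000, vec!["f3.parquet".to_string()]));
}
```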
Structs§

- AppendEnvelopeHandler 🔒
- AvroDataFile 🔒 - A wrapper around Iceberg's DataFile that implements Serialize and Deserialize. This is slightly complicated by the fact that Iceberg's DataFile doesn't implement these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively). The Avro ser/de also requires the Iceberg schema to be provided, so we include that as well. It is distinctly possible that this is overkill, but it avoids re-implementing Iceberg's serialization logic here. If at some point this becomes a serious overhead, we can revisit this decision.
- BoundedDataFile 🔒 - A DataFile along with its associated batch description (lower and upper bounds).
- BoundedDataFileSet 🔒 - A set of DataFiles along with their associated batch descriptions.
- SerializableDataFile 🔒
- UpsertEnvelopeHandler 🔒
- WriterContext 🔒 - Shared state produced by the async setup in write_data_files that both envelope handlers need to construct Parquet writers.
Constants§

- DEFAULT_ARRAY_BUILDER_DATA_CAPACITY 🔒 - Set the default buffer capacity for the string and binary array builders inside the ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate more memory.
- DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY 🔒 - Set the default capacity for the array builders inside the ArrowBuilder. This is the number of items each builder can hold before it needs to allocate more memory.
- ICEBERG_UINT64_DECIMAL_PRECISION 🔒 - The precision needed to store all UInt64 values in a Decimal128. UInt64's max value is 18,446,744,073,709,551,615, which has 20 digits.
- INITIAL_DESCRIPTIONS_TO_MINT 🔒 - The number of batch descriptions to mint ahead of the observed frontier. This determines how many batches we have in flight at any given time.
- PARQUET_FILE_PREFIX 🔒 - The prefix for Parquet files written by this sink.
Traits§

- EnvelopeHandler 🔒 - Envelope-specific logic for writing Iceberg data files.
Functions§

- add_field_ids_recursive 🔒 - Recursively add field IDs to a field and all its nested children.
- add_field_ids_to_arrow_schema 🔒 - Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the Parquet metadata for schema evolution tracking. Field IDs are assigned recursively to all nested fields (structs, lists, maps) using a depth-first, pre-order traversal.
- add_field_ids_to_datatype 🔒 - Add field IDs to nested fields within a DataType.
- build_schema_with_append_columns 🔒 - Build a new Arrow schema by appending _mz_diff (Int32) and _mz_timestamp (Int64) columns. These are user-visible Iceberg columns written in append mode. Parquet field IDs are assigned sequentially after the existing maximum field ID so the extended schema can be converted to a valid Iceberg schema via arrow_schema_to_schema.
- build_schema_with_op_column 🔒 - Build a new Arrow schema by adding an __op column to the existing schema.
- commit_to_iceberg 🔒 - Commit completed batches to Iceberg as snapshots. Batches are committed in timestamp order to ensure strong consistency guarantees downstream. Each snapshot includes the Materialize frontier in its metadata for resume support.
- equality_ids_for_indices 🔒 - Resolve Materialize key column indexes to Iceberg top-level field IDs.
- iceberg_type_overrides 🔒 - Type overrides for Iceberg-compatible Arrow schemas.
- load_or_create_table 🔒 - Load an existing Iceberg table or create it if it doesn't exist.
- merge_field_metadata_recursive 🔒 - Recursively merge Materialize extension metadata into an Iceberg field.
- merge_materialize_metadata_into_iceberg_schema 🔒 - Merge Materialize extension metadata into Iceberg's Arrow schema. This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving Materialize's extension names for ArrowBuilder compatibility. Handles nested types (structs, lists, maps) recursively.
- mint_batch_descriptions 🔒 - Generate time-based batch boundaries for grouping writes into Iceberg snapshots. Batches are minted with configurable windows to balance write efficiency with latency. We maintain a sliding window of future batch descriptions so writers can start processing data even while earlier batches are still being written.
- relation_desc_to_iceberg_schema 🔒 - Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
- reload_table 🔒
- retrieve_upper_from_snapshots 🔒 - Find the most recent Materialize frontier from Iceberg snapshots. We store the frontier in snapshot metadata to track where we left off after restarts. Snapshots with operation="replace" (compactions) don't have our metadata and are skipped. The input slice will be sorted by sequence number in descending order.
- try_commit_batch 🔒 - Attempt a single commit of a batch of data files to an Iceberg table. On conflict or failure, reloads the table and returns a retryable error. On success, returns the updated table state.
- write_data_files 🔒 - Construct the envelope-specific closures that write_data_files needs.
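The resume logic described for retrieve_upper_from_snapshots — scan snapshots newest-first, skip compaction ("replace") snapshots, and take the first frontier found — could be sketched like this, where SnapshotSummary and its fields are illustrative stand-ins rather than the sink's actual types:

```rust
/// Hypothetical stand-in for the per-snapshot metadata the sink inspects.
struct SnapshotSummary {
    operation: &'static str,
    /// Stand-in for the "mz-frontier" property; compaction ("replace")
    /// snapshots don't carry it.
    mz_frontier: Option<u64>,
}

/// Given snapshots sorted by sequence number in descending order, return
/// the most recent Materialize frontier, skipping compaction snapshots.
fn retrieve_upper(snapshots: &[SnapshotSummary]) -> Option<u64> {
    snapshots
        .iter()
        .filter(|s| s.operation != "replace")
        .find_map(|s| s.mz_frontier)
}

fn main() {
    let snapshots = [
        SnapshotSummary { operation: "replace", mz_frontier: None },
        SnapshotSummary { operation: "append", mz_frontier: Some(4200) },
        SnapshotSummary { operation: "append", mz_frontier: Some(1000) },
    ];
    // The newest non-compaction snapshot wins.
    assert_eq!(retrieve_upper(&snapshots), Some(4200));
}
```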