
Module iceberg


Iceberg sink implementation.

This code renders an IcebergSinkConnection into a dataflow that writes data to an Iceberg table. The dataflow consists of three operators:

       ┏━━━━━━━━━━━━━━┓
       ┃   persist    ┃
       ┃    source    ┃
       ┗━━━━━━┯━━━━━━━┛
              │ row data, the input to this module
              │
       ┏━━━━━━v━━━━━━━┓
       ┃     mint     ┃ (single worker)
       ┃    batch     ┃ loads/creates the Iceberg table,
       ┃ descriptions ┃ determines resume upper
       ┗━━━┯━━━━━┯━━━━┛
           │     │ batch descriptions (broadcast)
      rows │     ├─────────────────────────┐
           │     │                         │
       ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
       ┃    write     ┃───>│ S3 / object │ │
       ┃  data files  ┃    │   storage   │ │
       ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
              │ file metadata              │
              │                            │
       ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
       ┃           commit to                ┃ (single worker)
       ┃             iceberg                ┃
       ┗━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━┛
                     │
             ╭───────v───────╮
             │ Iceberg table │
             │  (snapshots)  │
             ╰───────────────╯

§Minting batch descriptions

The “mint batch descriptions” operator is responsible for generating time-based batch boundaries that group writes into Iceberg snapshots. It maintains a sliding window of future batch descriptions so that writers can start processing data even while earlier batches are still being written. Knowing the batch boundaries ahead of time is important because we need to be able to claim that the data files written for a given batch include all data up to the batch’s upper bound but nothing beyond it. This could be trivially achieved by waiting for all data up to a given frontier to arrive, but that would prevent us from streaming writes out to object storage until the entire batch is complete, increasing latency and reducing throughput.
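As a rough sketch of the minting logic described above, the following self-contained snippet mints a fixed number of window-aligned batch descriptions ahead of the observed frontier. All names here (`BatchDescription`, `mint_ahead`, plain `u64` timestamps) are hypothetical simplifications; the real operator works with timely frontiers and configurable windows.

```rust
/// A half-open interval [lower, upper) of timestamps covered by one
/// batch. (Hypothetical simplification: the real batch descriptions
/// use timely frontiers, not plain u64 timestamps.)
#[derive(Debug, Clone, PartialEq)]
struct BatchDescription {
    lower: u64,
    upper: u64,
}

/// Mint `count` consecutive batch descriptions starting at or before
/// the observed frontier, each `window` ticks wide and aligned to
/// window boundaries.
fn mint_ahead(frontier: u64, window: u64, count: usize) -> Vec<BatchDescription> {
    // Align the first batch's lower bound down to the window boundary
    // containing the frontier, so every bound is a window multiple.
    let mut lower = (frontier / window) * window;
    (0..count)
        .map(|_| {
            let desc = BatchDescription { lower, upper: lower + window };
            lower += window;
            desc
        })
        .collect()
}

fn main() {
    // With frontier 1_250 and 1_000-tick windows, the minted batches
    // cover [1000, 2000), [2000, 3000), [3000, 4000).
    for b in mint_ahead(1_250, 1_000, 3) {
        println!("[{}, {})", b.lower, b.upper);
    }
}
```

Because descriptions are minted ahead of the frontier, writers downstream can begin streaming files for a batch before all of its data has arrived.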

§Writing data files

The “write data files” operator receives rows along with batch descriptions. It matches rows to batches by timestamp; if a batch description hasn’t arrived yet, rows are stashed until it does. This allows batches to be minted ahead of data arrival. The operator uses an Iceberg DeltaWriter to write Parquet data files (and position delete files if necessary) to object storage. It outputs metadata about the written files along with their batch descriptions for the commit operator to consume.
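The matching-and-stashing behavior can be sketched with plain standard-library types. The names (`Batch`, `route_rows`) and the use of `String` rows are hypothetical; the real operator feeds matched rows into an Iceberg DeltaWriter rather than returning a map.

```rust
use std::collections::BTreeMap;

/// Hypothetical, simplified batch description: a half-open interval
/// [lower, upper) of timestamps covered by one batch.
struct Batch {
    lower: u64,
    upper: u64,
}

/// Route each (timestamp, row) pair to the batch whose interval
/// contains it, keyed by the batch's lower bound. Rows for which no
/// batch description has arrived yet are stashed for later.
fn route_rows(
    batches: &[Batch],
    rows: Vec<(u64, String)>,
    stash: &mut Vec<(u64, String)>,
) -> BTreeMap<u64, Vec<String>> {
    let mut routed: BTreeMap<u64, Vec<String>> = BTreeMap::new();
    for (ts, row) in rows {
        match batches.iter().find(|b| b.lower <= ts && ts < b.upper) {
            Some(b) => routed.entry(b.lower).or_default().push(row),
            None => stash.push((ts, row)),
        }
    }
    routed
}

fn main() {
    let batches = vec![Batch { lower: 1_000, upper: 2_000 }];
    let mut stash = Vec::new();
    let routed = route_rows(
        &batches,
        vec![(1_500, "a".to_string()), (2_500, "b".to_string())],
        &mut stash,
    );
    // Row "a" lands in batch [1000, 2000); row "b" has no batch yet
    // and is stashed until its description arrives.
    println!("routed = {:?}, stashed = {:?}", routed, stash);
}
```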

§Committing to Iceberg

The “commit to iceberg” operator receives metadata about written data files along with their batch descriptions. It groups files by batch and creates Iceberg snapshots that include all files for each batch. It updates the Iceberg table’s metadata to reflect the new snapshots, including updating the mz-frontier property to track progress.
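The grouping step can be sketched as follows. `FileMeta` and `plan_commits` are hypothetical simplifications; the real operator turns each group into an Iceberg snapshot via the catalog API and writes the frontier into the snapshot's `mz-frontier` property.

```rust
use std::collections::BTreeMap;

/// Hypothetical file metadata emitted by the write operator: the
/// object-storage path of a data file plus the upper bound of the
/// batch it belongs to.
struct FileMeta {
    batch_upper: u64,
    path: String,
}

/// Group written files by batch and emit one commit per batch in
/// ascending timestamp order. Each commit would become one Iceberg
/// snapshot containing all files for that batch, with the frontier
/// (here, batch_upper) recorded in the snapshot metadata.
fn plan_commits(files: Vec<FileMeta>) -> Vec<(u64, Vec<String>)> {
    let mut by_batch: BTreeMap<u64, Vec<String>> = BTreeMap::new();
    for f in files {
        by_batch.entry(f.batch_upper).or_default().push(f.path);
    }
    // BTreeMap iterates in ascending key order, so commits are
    // applied oldest batch first, preserving timestamp order.
    by_batch.into_iter().collect()
}

fn main() {
    let commits = plan_commits(vec![
        FileMeta { batch_upper: 2_000, path: "data/b.parquet".into() },
        FileMeta { batch_upper: 1_000, path: "data/a.parquet".into() },
    ]);
    // The batch ending at 1000 is committed before the one at 2000.
    println!("{:?}", commits);
}
```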

Structs§

AppendEnvelopeHandler 🔒
AvroDataFile 🔒
A wrapper around Iceberg’s DataFile that implements Serialize and Deserialize. This is slightly complicated by the fact that Iceberg’s DataFile doesn’t implement these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively). The avro ser(de) also requires the Iceberg schema to be provided, so we include that as well. It is distinctly possible that this is overkill, but it avoids re-implementing Iceberg’s serialization logic here. If at some point this becomes a serious overhead, we can revisit this decision.
BoundedDataFile 🔒
A DataFile along with its associated batch description (lower and upper bounds).
BoundedDataFileSet 🔒
A set of DataFiles along with their associated batch descriptions.
SerializableDataFile 🔒
UpsertEnvelopeHandler 🔒
WriterContext 🔒
Shared state produced by the async setup in write_data_files that both envelope handlers need to construct Parquet writers.

Constants§

DEFAULT_ARRAY_BUILDER_DATA_CAPACITY 🔒
The default buffer capacity for the string and binary array builders inside the ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate more memory.
DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY 🔒
The default capacity for the array builders inside the ArrowBuilder. This is the number of items each builder can hold before it needs to allocate more memory.
ICEBERG_UINT64_DECIMAL_PRECISION 🔒
The precision needed to store all UInt64 values in a Decimal128. UInt64 max value is 18,446,744,073,709,551,615 which has 20 digits.
INITIAL_DESCRIPTIONS_TO_MINT 🔒
The number of batch descriptions to mint ahead of the observed frontier. This determines how many batches we have in-flight at any given time.
PARQUET_FILE_PREFIX 🔒
The prefix for Parquet files written by this sink.

Traits§

EnvelopeHandler 🔒
Envelope-specific logic for writing Iceberg data files.

Functions§

add_field_ids_recursive 🔒
Recursively add field IDs to a field and all its nested children.
add_field_ids_to_arrow_schema 🔒
Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the Parquet metadata for schema evolution tracking. Field IDs are assigned recursively to all nested fields (structs, lists, maps) using a depth-first, pre-order traversal.
add_field_ids_to_datatype 🔒
Add field IDs to nested fields within a DataType.
build_schema_with_append_columns 🔒
Build a new Arrow schema by appending _mz_diff (Int32) and _mz_timestamp (Int64) columns. These are user-visible Iceberg columns written in append mode. Parquet field IDs are assigned sequentially after the existing maximum field ID so the extended schema can be converted to a valid Iceberg schema via arrow_schema_to_schema.
build_schema_with_op_column 🔒
Build a new Arrow schema by adding an __op column to the existing schema.
commit_to_iceberg 🔒
Commit completed batches to Iceberg as snapshots. Batches are committed in timestamp order to ensure strong consistency guarantees downstream. Each snapshot includes the Materialize frontier in its metadata for resume support.
equality_ids_for_indices 🔒
Resolve Materialize key column indexes to Iceberg top-level field IDs.
iceberg_type_overrides 🔒
Type overrides for Iceberg-compatible Arrow schemas.
load_or_create_table 🔒
Load an existing Iceberg table or create it if it doesn’t exist.
merge_field_metadata_recursive 🔒
Recursively merge Materialize extension metadata into an Iceberg field.
merge_materialize_metadata_into_iceberg_schema 🔒
Merge Materialize extension metadata into Iceberg’s Arrow schema. This uses Iceberg’s data types (e.g. Utf8) and field IDs while preserving Materialize’s extension names for ArrowBuilder compatibility. Handles nested types (structs, lists, maps) recursively.
mint_batch_descriptions 🔒
Generate time-based batch boundaries for grouping writes into Iceberg snapshots. Batches are minted with configurable windows to balance write efficiency with latency. We maintain a sliding window of future batch descriptions so writers can start processing data even while earlier batches are still being written.
relation_desc_to_iceberg_schema 🔒
Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
reload_table 🔒
retrieve_upper_from_snapshots 🔒
Find the most recent Materialize frontier from Iceberg snapshots. We store the frontier in snapshot metadata to track where we left off after restarts. Snapshots with operation=“replace” (compactions) don’t carry our metadata and are skipped. The input slice is expected to be sorted by sequence number in descending order.
try_commit_batch 🔒
Attempt a single commit of a batch of data files to an Iceberg table. On conflict or failure, reloads the table and returns a retryable error. On success, returns the updated table state.
write_data_files 🔒
Construct the envelope-specific closures that write_data_files needs.
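The resume-upper recovery described for retrieve_upper_from_snapshots can be sketched as follows. The `Snapshot` struct and `retrieve_upper` function are hypothetical stand-ins; the real code inspects snapshot summaries loaded from the Iceberg catalog.

```rust
/// Hypothetical, simplified view of an Iceberg snapshot summary.
struct Snapshot {
    operation: &'static str,
    mz_frontier: Option<u64>,
}

/// Find the most recent Materialize frontier. Compaction snapshots
/// (operation == "replace") carry no Materialize metadata and are
/// skipped. Assumes `snapshots` is sorted by sequence number in
/// descending order, newest first.
fn retrieve_upper(snapshots: &[Snapshot]) -> Option<u64> {
    snapshots
        .iter()
        .find(|s| s.operation != "replace")
        .and_then(|s| s.mz_frontier)
}

fn main() {
    let snapshots = vec![
        // Newest snapshot: a compaction with no Materialize metadata.
        Snapshot { operation: "replace", mz_frontier: None },
        // Next-newest: a sink commit carrying the stored frontier.
        Snapshot { operation: "append", mz_frontier: Some(42) },
    ];
    println!("resume upper = {:?}", retrieve_upper(&snapshots));
}
```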