Module iceberg

Iceberg sink implementation.

This code renders an IcebergSinkConnection into a dataflow that writes data to an Iceberg table. The dataflow consists of three operators:

       ┏━━━━━━━━━━━━━━┓
       ┃   persist    ┃
       ┃    source    ┃
       ┗━━━━━━┯━━━━━━━┛
              │ row data, the input to this module
              │
       ┏━━━━━━v━━━━━━━┓
       ┃     mint     ┃ (single worker)
       ┃    batch     ┃ loads/creates the Iceberg table,
       ┃ descriptions ┃ determines resume upper
       ┗━━━┯━━━━━┯━━━━┛
           │     │ batch descriptions (broadcast)
      rows │     ├─────────────────────────┐
           │     │                         │
       ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
       ┃    write     ┃───>│ S3 / object │ │
       ┃  data files  ┃    │   storage   │ │
       ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
              │ file metadata              │
              │                            │
       ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
       ┃           commit to               ┃ (single worker)
       ┃             iceberg               ┃
       ┗━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━┛
                     │
             ╭───────v───────╮
             │ Iceberg table │
             │  (snapshots)  │
             ╰───────────────╯

§Minting batch descriptions

The “mint batch descriptions” operator is responsible for generating time-based batch boundaries that group writes into Iceberg snapshots. It maintains a sliding window of future batch descriptions so that writers can start processing data even while earlier batches are still being written. Knowing the batch boundaries ahead of time is important because we need to be able to make the claim that the data files written for a given batch include all data up to the batch’s upper bound but nothing beyond it. This could be trivially achieved by waiting for all data up to a certain frontier to arrive, but that would prevent us from streaming writes out to object storage until the entire batch is complete, which would increase latency and reduce throughput.
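
As a rough illustration of the minting idea only (not this module’s operator), the sketch below rounds the observed frontier up to the next interval boundary and mints a few [lower, upper) descriptions past it. The Timestamp alias, the millisecond commit_interval, and mint_ahead are assumptions made for the example; the real logic lives in mint_batch_descriptions and keeps INITIAL_DESCRIPTIONS_TO_MINT descriptions in flight.

    // Hypothetical sketch: round the frontier up to the next interval boundary and
    // mint a handful of consecutive [lower, upper) batch descriptions past it.
    type Timestamp = u64; // stand-in for the real timestamp type

    fn mint_ahead(
        frontier: Timestamp,
        commit_interval: Timestamp,
        descriptions_to_mint: u64,
    ) -> Vec<(Timestamp, Timestamp)> {
        let first_upper = frontier.div_ceil(commit_interval).max(1) * commit_interval;
        (0..descriptions_to_mint)
            .map(|i| {
                let upper = first_upper + i * commit_interval;
                (upper - commit_interval, upper)
            })
            .collect()
    }

    fn main() {
        // With a 1000 ms interval and the frontier at 2500 ms, this mints
        // [2000, 3000), [3000, 4000) and [4000, 5000): writers can already stream
        // data for [2000, 3000) even though that batch is not yet complete.
        for (lower, upper) in mint_ahead(2_500, 1_000, 3) {
            println!("batch [{lower}, {upper})");
        }
    }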

§Writing data files

The “write data files” operator receives rows along with batch descriptions. It matches rows to batches by timestamp; if a batch description hasn’t arrived yet, rows are stashed until it does. This allows batches to be minted ahead of data arrival. The operator uses an Iceberg DeltaWriter to write Parquet data files (and position delete files if necessary) to object storage. It outputs metadata about the written files along with their batch descriptions for the commit operator to consume.
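
The following is a minimal sketch of that matching-and-stashing control flow, using stand-in types; the real operator works with actual rows, this module’s batch descriptions, and an Iceberg DeltaWriter rather than the printing shown here.

    use std::collections::BTreeMap;

    type Timestamp = u64; // stand-in for the real timestamp type

    /// A batch covers the half-open interval [lower, upper).
    #[derive(Clone, Copy)]
    struct BatchDescription {
        lower: Timestamp,
        upper: Timestamp,
    }

    #[derive(Default)]
    struct WriteStage {
        /// Known batch descriptions, keyed by lower bound.
        batches: BTreeMap<Timestamp, BatchDescription>,
        /// Rows whose batch description has not arrived yet.
        stashed: Vec<(Timestamp, String)>,
    }

    impl WriteStage {
        fn handle_description(&mut self, batch: BatchDescription) {
            self.batches.insert(batch.lower, batch);
            // Re-run anything that was waiting for a description.
            for (ts, row) in std::mem::take(&mut self.stashed) {
                self.handle_row(ts, row);
            }
        }

        fn handle_row(&mut self, ts: Timestamp, row: String) {
            // Find the batch whose interval contains `ts`, if it has been minted yet.
            let matching = self
                .batches
                .range(..=ts)
                .next_back()
                .map(|(_, b)| *b)
                .filter(|b| ts < b.upper);
            match matching {
                // Here the real operator hands the row to the DeltaWriter for the batch.
                Some(b) => println!("write {row:?} at {ts} into [{}, {})", b.lower, b.upper),
                None => self.stashed.push((ts, row)),
            }
        }
    }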

§Committing to Iceberg

The “commit to iceberg” operator receives metadata about written data files along with their batch descriptions. It groups files by batch and creates Iceberg snapshots that include all files for each batch. It updates the Iceberg table’s metadata to reflect the new snapshots, including updating the mz-frontier property to track progress.
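
A small sketch of the ordering aspect only, using hypothetical stand-ins: FileMeta and commit_snapshot are invented for the example, and the real operator commits through an Iceberg transaction and records the frontier in the table’s mz-frontier property rather than printing it.

    use std::collections::BTreeMap;

    type Timestamp = u64; // stand-in for the real timestamp type

    /// Stand-in for the metadata describing one written data file.
    struct FileMeta {
        path: String,
    }

    /// Hypothetical: commit one completed batch as a single Iceberg snapshot and
    /// record the batch upper as the new mz-frontier so restarts can resume from it.
    fn commit_snapshot(files: &[FileMeta], upper: Timestamp) {
        println!("snapshot of {} files, mz-frontier = {upper}", files.len());
        for file in files {
            println!("  includes {}", file.path);
        }
    }

    /// Commit completed batches in timestamp order: iterating a BTreeMap keyed by
    /// (lower, upper) already yields the batches in ascending order.
    fn commit_in_order(pending: BTreeMap<(Timestamp, Timestamp), Vec<FileMeta>>) {
        for ((_lower, upper), files) in pending {
            commit_snapshot(&files, upper);
        }
    }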

Structs§

AvroDataFile 🔒
A wrapper around Iceberg’s DataFile that implements Serialize and Deserialize. This is slightly complicated by the fact that Iceberg’s DataFile doesn’t implement these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively). The Avro ser(de) also requires the Iceberg schema to be provided, so we include that as well. It is distinctly possible that this is overkill, but it avoids re-implementing Iceberg’s serialization logic here. If at some point this becomes a serious overhead, we can revisit this decision. A sketch of this wrapper pattern appears after this list.
BoundedDataFile 🔒
A DataFile along with its associated batch description (lower and upper bounds).
BoundedDataFileSet 🔒
A set of DataFiles along with their associated batch descriptions.
SerializableDataFile 🔒
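
A sketch of the wrapper pattern behind AvroDataFile, with placeholder types: IcebergDataFile and the empty encode/decode bodies stand in for iceberg’s DataFile and its native Avro encoding, which the real struct uses.

    use serde::{Deserialize, Serialize};

    /// Placeholder for iceberg's `DataFile`, which does not implement serde traits itself.
    struct IcebergDataFile;

    /// What is actually serialized: the Avro-encoded data file plus the Iceberg schema
    /// (as JSON) that is needed to decode those bytes again.
    #[derive(Serialize, Deserialize)]
    struct AvroDataFile {
        avro_bytes: Vec<u8>,
        schema_json: String,
    }

    impl AvroDataFile {
        /// Stand-in for encoding via Iceberg's native Avro support.
        fn encode(_file: &IcebergDataFile, schema_json: String) -> Self {
            AvroDataFile { avro_bytes: Vec::new(), schema_json }
        }

        /// Stand-in for decoding back into a `DataFile` using the stored schema.
        fn decode(&self) -> IcebergDataFile {
            IcebergDataFile
        }
    }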

Constants§

DEFAULT_ARRAY_BUILDER_DATA_CAPACITY 🔒
Set the default buffer capacity for the string and binary array builders inside the ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate more memory.
DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY 🔒
Set the default capacity for the array builders inside the ArrowBuilder. This is the number of items each builder can hold before it needs to allocate more memory.
INITIAL_DESCRIPTIONS_TO_MINT 🔒
The number of batch descriptions to mint ahead of the observed frontier. This determines how many batches we have in-flight at any given time.
PARQUET_FILE_PREFIX 🔒
The prefix for Parquet files written by this sink.

Functions§

add_field_ids_to_arrow_schema 🔒
Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the Parquet metadata for schema evolution tracking. TODO: Support nested data types with proper field IDs.
build_schema_with_op_column 🔒
Build a new Arrow schema by adding an __op column to the existing schema.
commit_to_iceberg 🔒
Commit completed batches to Iceberg as snapshots. Batches are committed in timestamp order to ensure strong consistency guarantees downstream. Each snapshot includes the Materialize frontier in its metadata for resume support.
load_or_create_table 🔒
Load an existing Iceberg table or create it if it doesn’t exist.
merge_materialize_metadata_into_iceberg_schema 🔒
Merge Materialize extension metadata into Iceberg’s Arrow schema. This uses Iceberg’s data types (e.g. Utf8) and field IDs while preserving Materialize’s extension names for ArrowBuilder compatibility.
mint_batch_descriptions 🔒
Generate time-based batch boundaries for grouping writes into Iceberg snapshots. Batches are minted with configurable windows to balance write efficiency with latency. We maintain a sliding window of future batch descriptions so writers can start processing data even while earlier batches are still being written.
relation_desc_to_iceberg_schema 🔒
Convert a Materialize RelationDesc into Arrow and Iceberg schemas. Returns the Arrow schema (with field IDs) for writing Parquet files, and the Iceberg schema for table creation/validation.
reload_table 🔒
retrieve_upper_from_snapshots 🔒
Find the most recent Materialize frontier from Iceberg snapshots. We store the frontier in snapshot metadata to track where we left off after restarts. Snapshots with operation=“replace” (compactions) don’t have our metadata and are skipped. The input slice will be sorted by sequence number in descending order.
row_to_recordbatch 🔒
Convert a Materialize DiffPair into an Arrow RecordBatch with an __op column. The __op column indicates whether each row is an insert (1) or delete (-1), which the DeltaWriter uses to generate the appropriate Iceberg data/delete files. See the sketch after this list.
write_data_files 🔒
Write rows into Parquet data files bounded by batch descriptions. Rows are matched to batches by timestamp; if a batch description hasn’t arrived yet, rows are stashed until it does. This allows batches to be minted ahead of data arrival.
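
To make the field-ID and __op handling concrete, here is a hedged, standalone Arrow sketch rather than this module’s code: the column names other than __op, the field-ID values, the Int32 type chosen for __op, and the two example rows are assumptions for the example, while "PARQUET:field_id" is the metadata key the parquet crate reads when writing field IDs.

    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    fn main() -> Result<(), arrow::error::ArrowError> {
        // Field IDs ride along as field metadata; the parquet writer picks them up
        // from the "PARQUET:field_id" key and stores them in the Parquet schema.
        let field_id = |id: i32| HashMap::from([("PARQUET:field_id".to_string(), id.to_string())]);
        let key = Field::new("key", DataType::Utf8, true).with_metadata(field_id(1));
        // The extra __op column marks each row as an insert (1) or a delete (-1).
        let op = Field::new("__op", DataType::Int32, false).with_metadata(field_id(2));
        let schema = Arc::new(Schema::new(vec![key, op]));

        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec![Some("a"), Some("b")])),
                Arc::new(Int32Array::from(vec![1, -1])), // insert "a", delete "b"
            ],
        )?;
        assert_eq!(batch.num_rows(), 2);
        Ok(())
    }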

Type Aliases§

DeltaWriterType 🔒
ParquetWriterType 🔒