Skip to main content

mz_storage/sink/
iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders an [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. `SinkRender::render_sink` hands the sink a stream
14//! of arrangement batches keyed on the sink key (the upstream arrangement's
15//! trace reader is already dropped, so the spine is free to compact as
16//! batches flow). A small `walk_sink_arrangement` operator consumes that
17//! stream and emits one `DiffPair` per `(key, timestamp)` update into the
18//! pipeline below.
19//!
20//! ```text
21//!        ┏━━━━━━━━━━━━━━┓
22//!        ┃   persist    ┃
23//!        ┃    source    ┃
24//!        ┗━━━━━━┯━━━━━━━┛
25//!               │ stream of arrangement batches (trace reader dropped)
26//!               │
27//!        ┏━━━━━━v━━━━━━━┓
28//!        ┃    walk      ┃
29//!        ┃ arrangement  ┃ yields individual DiffPairs per (key, timestamp)
30//!        ┗━━━━━━┯━━━━━━━┛
31//!               │ (Option<Row>, DiffPair<Row>) rows
32//!               │
33//!        ┏━━━━━━v━━━━━━━┓
34//!        ┃     mint     ┃ (single worker)
35//!        ┃    batch     ┃ loads/creates the Iceberg table,
36//!        ┃ descriptions ┃ determines resume upper
37//!        ┗━━━┯━━━━━┯━━━━┛
38//!            │     │ batch descriptions (broadcast)
39//!       rows │     ├─────────────────────────┐
40//!            │     │                         │
41//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
42//!        ┃    write     ┃───>│ S3 / object │ │
43//!        ┃  data files  ┃    │   storage   │ │
44//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
45//!               │ file metadata              │
46//!               │                            │
47//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
48//!        ┃           commit to                ┃ (single worker)
49//!        ┃             iceberg                ┃
50//!        ┗━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━┛
51//!                      │
52//!              ╭───────v───────╮
53//!              │ Iceberg table │
54//!              │  (snapshots)  │
55//!              ╰───────────────╯
56//! ```
57//! # Minting batch descriptions
58//! The "mint batch descriptions" operator is responsible for generating
59//! time-based batch boundaries that group writes into Iceberg snapshots.
60//! It maintains a sliding window of future batch descriptions so that
61//! writers can start processing data even while earlier batches are still being written.
62//! Knowing the batch boundaries ahead of time is important because we need to
63//! be able to make the claim that all data files written for a given batch
64//! include all data up to the upper `t` but not beyond it.
65//! This could be trivially achieved by waiting for all data to arrive up to a certain
66//! frontier, but that would prevent us from streaming writes out to object storage
67//! until the entire batch is complete, which would increase latency and reduce throughput.
68//!
69//! # Writing data files
70//! The "write data files" operator receives rows along with batch descriptions.
71//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
72//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
73//! For the upsert envelope the operator uses an Iceberg `DeltaWriter` to write Parquet data files
74//! (and position/equality delete files if necessary) to object storage; the append envelope writes data files only.
75//! It outputs metadata about the written files along with their batch descriptions
76//! for the commit operator to consume.
77//!
78//! # Committing to Iceberg
79//! The "commit to iceberg" operator receives metadata about written data files
80//! along with their batch descriptions. It groups files by batch and creates
81//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
82//! table's metadata to reflect the new snapshots, including updating the
83//! `mz-frontier` property to track progress.
84
85use std::cmp::Ordering;
86use std::collections::{BTreeMap, VecDeque};
87use std::convert::Infallible;
88use std::future::Future;
89use std::time::Instant;
90use std::{cell::RefCell, rc::Rc, sync::Arc};
91
92use anyhow::{Context, anyhow};
93use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
94use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
95use differential_dataflow::lattice::Lattice;
96use differential_dataflow::{AsCollection, Hashable, VecCollection};
97use futures::StreamExt;
98use iceberg::ErrorKind;
99use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
100use iceberg::spec::{
101    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
102    write_data_files_to_avro,
103};
104use iceberg::spec::{Schema, SchemaRef};
105use iceberg::table::Table;
106use iceberg::transaction::{ApplyTransactionAction, Transaction};
107use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
108use iceberg::writer::base_writer::equality_delete_writer::{
109    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
110};
111use iceberg::writer::base_writer::position_delete_writer::{
112    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
113};
114use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
115use iceberg::writer::file_writer::ParquetWriterBuilder;
116use iceberg::writer::file_writer::location_generator::{
117    DefaultFileNameGenerator, DefaultLocationGenerator,
118};
119use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
120use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
121use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
122use itertools::Itertools;
123use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
124use mz_interchange::avro::DiffPair;
125use mz_interchange::envelopes::for_each_diff_pair;
126use mz_ore::cast::CastFrom;
127use mz_ore::error::ErrorExt;
128use mz_ore::future::InTask;
129use mz_ore::result::ResultExt;
130use mz_ore::retry::{Retry, RetryResult};
131use mz_persist_client::Diagnostics;
132use mz_persist_client::write::WriteHandle;
133use mz_persist_types::codec_impls::UnitSchema;
134use mz_repr::{Diff, GlobalId, Row, Timestamp};
135use mz_storage_types::StorageDiff;
136use mz_storage_types::configuration::StorageConfiguration;
137use mz_storage_types::controller::CollectionMetadata;
138use mz_storage_types::errors::DataflowError;
139use mz_storage_types::sinks::{
140    IcebergSinkConnection, SinkEnvelope, StorageSinkDesc, iceberg_type_overrides,
141};
142use mz_storage_types::sources::SourceData;
143use mz_timely_util::antichain::AntichainExt;
144use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
145use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
146use parquet::file::properties::WriterProperties;
147use serde::{Deserialize, Serialize};
148use timely::PartialOrder;
149use timely::container::CapacityContainerBuilder;
150use timely::dataflow::StreamVec;
151use timely::dataflow::channels::pact::{Exchange, Pipeline};
152use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
153use timely::dataflow::operators::{CapabilitySet, Concatenate};
154use timely::progress::{Antichain, Timestamp as _};
155use tracing::debug;
156
157use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
158use crate::metrics::sink::iceberg::IcebergSinkMetrics;
159use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
160use crate::statistics::SinkStatistics;
161use crate::storage_state::StorageState;
162
/// Initial item capacity handed to each array builder inside the `ArrowBuilder`:
/// the number of items a builder can hold before it has to allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Initial byte capacity handed to the string and binary array builders inside the
/// `ArrowBuilder`: the number of bytes a builder can hold before it has to allocate
/// more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// File name prefix for the Parquet files this sink writes.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// How many batch descriptions are minted ahead of the observed frontier, which
/// in turn bounds how many batches are in flight at any given time.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
176
177/// Shared state produced by the async setup in [`write_data_files`] that both
178/// envelope handlers need to construct Parquet writers.
179struct WriterContext {
180    /// Arrow schema for data columns, with Materialize extension metadata merged in.
181    arrow_schema: Arc<ArrowSchema>,
182    /// Iceberg table schema, used to configure Parquet writers.
183    current_schema: Arc<Schema>,
184    /// File I/O for writing Parquet files to object storage.
185    file_io: iceberg::io::FileIO,
186    /// Generates file paths under the table's data directory.
187    location_generator: DefaultLocationGenerator,
188    /// Generates unique file names with a per-worker UUID suffix.
189    file_name_generator: DefaultFileNameGenerator,
190    writer_properties: WriterProperties,
191}
192
193/// Envelope-specific logic for writing Iceberg data files.
194trait EnvelopeHandler: Send {
195    /// Construct from the shared writer context after async setup completes.
196    fn new(
197        ctx: WriterContext,
198        connection: &IcebergSinkConnection,
199        materialize_arrow_schema: &Arc<ArrowSchema>,
200    ) -> anyhow::Result<Self>
201    where
202        Self: Sized;
203
204    /// Create an [`IcebergWriter`] for a new batch.
205    ///
206    /// `is_snapshot` is true for the initial "snapshot" batch (lower == as_of), which
207    /// contains all pre-existing data and can be very large. Implementations may use
208    /// this to disable memory-intensive optimisations like seen-rows deduplication.
209    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;
210
211    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
212}
213
214struct UpsertEnvelopeHandler {
215    ctx: WriterContext,
216    /// Iceberg field IDs of the key columns, used for equality delete files.
217    equality_ids: Vec<i32>,
218    /// Iceberg schema for position delete files.
219    pos_schema: Arc<Schema>,
220    /// Iceberg schema for equality delete files (projected to key columns only).
221    eq_schema: Arc<Schema>,
222    /// Configuration for the equality delete writer (projected schema + column IDs).
223    eq_config: EqualityDeleteWriterConfig,
224    /// Arrow schema with an appended `__op` column that the
225    /// [`DeltaWriter`](iceberg::writer::combined_writer::delta_writer::DeltaWriter)
226    /// uses to distinguish inserts (+1) from deletes (-1).
227    schema_with_op: Arc<ArrowSchema>,
228}
229
230impl EnvelopeHandler for UpsertEnvelopeHandler {
231    fn new(
232        ctx: WriterContext,
233        connection: &IcebergSinkConnection,
234        materialize_arrow_schema: &Arc<ArrowSchema>,
235    ) -> anyhow::Result<Self> {
236        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
237            return Err(anyhow::anyhow!(
238                "Iceberg sink requires key columns for equality deletes"
239            ));
240        };
241
242        let equality_ids = equality_ids_for_indices(
243            ctx.current_schema.as_ref(),
244            materialize_arrow_schema.as_ref(),
245            equality_indices,
246        )?;
247
248        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
249        let pos_schema = Arc::new(
250            arrow_schema_to_schema(&pos_arrow_schema)
251                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
252        );
253
254        let eq_config =
255            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
256                .context("Failed to create EqualityDeleteWriterConfig")?;
257        let eq_schema = Arc::new(
258            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
259                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
260        );
261
262        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));
263
264        Ok(Self {
265            ctx,
266            equality_ids,
267            pos_schema,
268            eq_schema,
269            eq_config,
270            schema_with_op,
271        })
272    }
273
274    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
275        let data_parquet_writer = ParquetWriterBuilder::new(
276            self.ctx.writer_properties.clone(),
277            Arc::clone(&self.ctx.current_schema),
278        )
279        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
280        .context("Arrow schema validation failed")?;
281        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
282            data_parquet_writer,
283            Arc::clone(&self.ctx.current_schema),
284            self.ctx.file_io.clone(),
285            self.ctx.location_generator.clone(),
286            self.ctx.file_name_generator.clone(),
287        );
288        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);
289
290        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
291        let pos_parquet_writer = ParquetWriterBuilder::new(
292            self.ctx.writer_properties.clone(),
293            Arc::clone(&self.pos_schema),
294        );
295        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
296            pos_parquet_writer,
297            Arc::clone(&self.ctx.current_schema),
298            self.ctx.file_io.clone(),
299            self.ctx.location_generator.clone(),
300            self.ctx.file_name_generator.clone(),
301        );
302        let pos_delete_writer_builder =
303            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);
304
305        let eq_parquet_writer = ParquetWriterBuilder::new(
306            self.ctx.writer_properties.clone(),
307            Arc::clone(&self.eq_schema),
308        );
309        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
310            eq_parquet_writer,
311            Arc::clone(&self.ctx.current_schema),
312            self.ctx.file_io.clone(),
313            self.ctx.location_generator.clone(),
314            self.ctx.file_name_generator.clone(),
315        );
316        let eq_delete_writer_builder =
317            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());
318
319        let mut builder = DeltaWriterBuilder::new(
320            data_writer_builder,
321            pos_delete_writer_builder,
322            eq_delete_writer_builder,
323            self.equality_ids.clone(),
324        );
325
326        builder = if is_snapshot {
327            // Snapshot batches only produce inserts, so disable seen_rows tracking to save memory.
328            builder.with_max_seen_rows(0)
329        } else {
330            // For incremental batches, keep all "seen" rows. Do not evict any rows.
331            // The DeltaWriter issues an equality delete if we update (or delete) a row outside the "seen" cache.
332            // But equality deletes only apply to prior snapshots (lower sequence number).
333            //
334            // i.e. The DeltaWriter assumes that rows outside the "seen" cache come from prior snapshots.
335            //
336            // If we insert a row a=foo during this snapshot and then evict it from the "seen" cache,
337            // a subsequent update a=bar (also during this snapshot) will lead to:
338            //   1. Equality delete for a=foo (does nothing because a=foo is from this snapshot, not a prior snapshot)
339            //   2. Insert a=bar
340            // Because the deletion does nothing, we have a=foo and a=bar in the same snapshot.
341            builder.with_max_seen_rows(usize::MAX)
342        };
343
344        Ok(Box::new(
345            builder
346                .build(None)
347                .await
348                .context("Failed to create DeltaWriter")?,
349        ))
350    }
351
352    /// The `__op` column indicates whether each row is an insert (+1) or delete (-1),
353    /// which the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
354    fn row_to_batch(
355        &self,
356        diff_pair: DiffPair<Row>,
357        _ts: Timestamp,
358    ) -> anyhow::Result<RecordBatch> {
359        let mut builder = ArrowBuilder::new_with_schema(
360            Arc::clone(&self.ctx.arrow_schema),
361            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
362            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
363        )
364        .context("Failed to create builder")?;
365
366        let mut op_values = Vec::new();
367
368        if let Some(before) = diff_pair.before {
369            builder
370                .add_row(&before)
371                .context("Failed to add delete row to builder")?;
372            op_values.push(-1i32);
373        }
374        if let Some(after) = diff_pair.after {
375            builder
376                .add_row(&after)
377                .context("Failed to add insert row to builder")?;
378            op_values.push(1i32);
379        }
380
381        let batch = builder
382            .to_record_batch()
383            .context("Failed to create record batch")?;
384
385        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
386        columns.push(Arc::new(Int32Array::from(op_values)));
387
388        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
389            .context("Failed to create batch with op column")
390    }
391}
392
393struct AppendEnvelopeHandler {
394    ctx: WriterContext,
395    /// Arrow schema with only user columns (no `_mz_diff`/`_mz_timestamp`), used by
396    /// [`ArrowBuilder`] to serialize row data before the extra columns are appended.
397    user_schema_for_append: Arc<ArrowSchema>,
398}
399
400impl EnvelopeHandler for AppendEnvelopeHandler {
401    fn new(
402        ctx: WriterContext,
403        _connection: &IcebergSinkConnection,
404        _materialize_arrow_schema: &Arc<ArrowSchema>,
405    ) -> anyhow::Result<Self> {
406        // arrow_schema already includes _mz_diff + _mz_timestamp (added in render_sink); strip
407        // the last two fields so ArrowBuilder only processes the user columns.
408        let n = ctx.arrow_schema.fields().len().saturating_sub(2);
409        let user_schema_for_append =
410            Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));
411
412        Ok(Self {
413            ctx,
414            user_schema_for_append,
415        })
416    }
417
418    async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
419        let data_parquet_writer = ParquetWriterBuilder::new(
420            self.ctx.writer_properties.clone(),
421            Arc::clone(&self.ctx.current_schema),
422        )
423        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
424        .context("Arrow schema validation failed")?;
425        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
426            data_parquet_writer,
427            Arc::clone(&self.ctx.current_schema),
428            self.ctx.file_io.clone(),
429            self.ctx.location_generator.clone(),
430            self.ctx.file_name_generator.clone(),
431        );
432        Ok(Box::new(
433            DataFileWriterBuilder::new(data_rolling_writer)
434                .build(None)
435                .await
436                .context("Failed to create DataFileWriter")?,
437        ))
438    }
439
440    /// Every change is written as a plain data row: the `before` half (if present) gets
441    /// `_mz_diff = -1` and the `after` half gets `_mz_diff = +1`. Both carry the same `_mz_timestamp`.
442    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
443        let mut builder = ArrowBuilder::new_with_schema(
444            Arc::clone(&self.user_schema_for_append),
445            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
446            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
447        )
448        .context("Failed to create builder")?;
449
450        let mut diff_values: Vec<i32> = Vec::new();
451        let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);
452
453        if let Some(before) = diff_pair.before {
454            builder
455                .add_row(&before)
456                .context("Failed to add before row to builder")?;
457            diff_values.push(-1i32);
458        }
459        if let Some(after) = diff_pair.after {
460            builder
461                .add_row(&after)
462                .context("Failed to add after row to builder")?;
463            diff_values.push(1i32);
464        }
465
466        let n = diff_values.len();
467        let batch = builder
468            .to_record_batch()
469            .context("Failed to create record batch")?;
470
471        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
472        columns.push(Arc::new(Int32Array::from(diff_values)));
473        columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));
474
475        RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
476            .context("Failed to create append record batch")
477    }
478}
479
480/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
481/// Parquet metadata for schema evolution tracking. Field IDs are assigned
482/// recursively to all nested fields (structs, lists, maps) using a depth-first,
483/// pre-order traversal.
484fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
485    let mut next_field_id = 1i32;
486    let fields: Vec<Field> = schema
487        .fields()
488        .iter()
489        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
490        .collect();
491    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
492}
493
494/// Recursively add field IDs to a field and all its nested children.
495fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
496    let current_id = *next_id;
497    *next_id += 1;
498
499    let mut metadata = field.metadata().clone();
500    metadata.insert(
501        PARQUET_FIELD_ID_META_KEY.to_string(),
502        current_id.to_string(),
503    );
504
505    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
506
507    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
508}
509
510/// Add field IDs to nested fields within a DataType.
511fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
512    match data_type {
513        DataType::Struct(fields) => {
514            let new_fields: Vec<Field> = fields
515                .iter()
516                .map(|f| add_field_ids_recursive(f, next_id))
517                .collect();
518            DataType::Struct(new_fields.into())
519        }
520        DataType::List(element_field) => {
521            let new_element = add_field_ids_recursive(element_field, next_id);
522            DataType::List(Arc::new(new_element))
523        }
524        DataType::LargeList(element_field) => {
525            let new_element = add_field_ids_recursive(element_field, next_id);
526            DataType::LargeList(Arc::new(new_element))
527        }
528        DataType::Map(entries_field, sorted) => {
529            let new_entries = add_field_ids_recursive(entries_field, next_id);
530            DataType::Map(Arc::new(new_entries), *sorted)
531        }
532        _ => data_type.clone(),
533    }
534}
535
536/// Merge Materialize extension metadata into Iceberg's Arrow schema.
537/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
538/// Materialize's extension names for ArrowBuilder compatibility.
539/// Handles nested types (structs, lists, maps) recursively.
540fn merge_materialize_metadata_into_iceberg_schema(
541    materialize_arrow_schema: &ArrowSchema,
542    iceberg_schema: &Schema,
543) -> anyhow::Result<ArrowSchema> {
544    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
545    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
546        .context("Failed to convert Iceberg schema to Arrow schema")?;
547
548    // Now merge in the Materialize extension metadata
549    let fields: Vec<Field> = iceberg_arrow_schema
550        .fields()
551        .iter()
552        .map(|iceberg_field| {
553            // Find the corresponding Materialize field by name to get extension metadata
554            let mz_field = materialize_arrow_schema
555                .field_with_name(iceberg_field.name())
556                .with_context(|| {
557                    format!(
558                        "Field '{}' not found in Materialize schema",
559                        iceberg_field.name()
560                    )
561                })?;
562
563            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
564        })
565        .collect::<anyhow::Result<Vec<_>>>()?;
566
567    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
568}
569
570/// Recursively merge Materialize extension metadata into an Iceberg field.
571fn merge_field_metadata_recursive(
572    iceberg_field: &Field,
573    mz_field: Option<&Field>,
574) -> anyhow::Result<Field> {
575    // Start with Iceberg field's metadata (which includes field IDs)
576    let mut metadata = iceberg_field.metadata().clone();
577
578    // Add Materialize extension name if available
579    if let Some(mz_f) = mz_field {
580        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
581            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
582        }
583    }
584
585    // Recursively process nested types
586    let new_data_type = match iceberg_field.data_type() {
587        DataType::Struct(iceberg_fields) => {
588            let mz_struct_fields = match mz_field {
589                Some(f) => match f.data_type() {
590                    DataType::Struct(fields) => Some(fields),
591                    other => anyhow::bail!(
592                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
593                        iceberg_field.name(),
594                        other
595                    ),
596                },
597                None => None,
598            };
599
600            let new_fields: Vec<Field> = iceberg_fields
601                .iter()
602                .map(|iceberg_inner| {
603                    let mz_inner = mz_struct_fields.and_then(|fields| {
604                        fields.iter().find(|f| f.name() == iceberg_inner.name())
605                    });
606                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
607                })
608                .collect::<anyhow::Result<Vec<_>>>()?;
609
610            DataType::Struct(new_fields.into())
611        }
612        DataType::List(iceberg_element) => {
613            let mz_element = match mz_field {
614                Some(f) => match f.data_type() {
615                    DataType::List(element) => Some(element.as_ref()),
616                    other => anyhow::bail!(
617                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
618                        iceberg_field.name(),
619                        other
620                    ),
621                },
622                None => None,
623            };
624            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
625            DataType::List(Arc::new(new_element))
626        }
627        DataType::LargeList(iceberg_element) => {
628            let mz_element = match mz_field {
629                Some(f) => match f.data_type() {
630                    DataType::LargeList(element) => Some(element.as_ref()),
631                    other => anyhow::bail!(
632                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
633                        iceberg_field.name(),
634                        other
635                    ),
636                },
637                None => None,
638            };
639            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
640            DataType::LargeList(Arc::new(new_element))
641        }
642        DataType::Map(iceberg_entries, sorted) => {
643            let mz_entries = match mz_field {
644                Some(f) => match f.data_type() {
645                    DataType::Map(entries, _) => Some(entries.as_ref()),
646                    other => anyhow::bail!(
647                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
648                        iceberg_field.name(),
649                        other
650                    ),
651                },
652                None => None,
653            };
654            // The Iceberg arrow representation names map fields differently from
655            // Materialize (`key_value`/`key`/`value` vs `entries`/`keys`/`values`),
656            // so name-based matching on the entries struct would drop the value
657            // field's extension metadata. Merge the entries struct positionally.
658            let new_entries = match mz_entries {
659                Some(mz_entries) => merge_map_entries_metadata(iceberg_entries, mz_entries)?,
660                None => iceberg_entries.as_ref().clone(),
661            };
662            DataType::Map(Arc::new(new_entries), *sorted)
663        }
664        other => other.clone(),
665    };
666
667    Ok(Field::new(
668        iceberg_field.name(),
669        new_data_type,
670        iceberg_field.is_nullable(),
671    )
672    .with_metadata(metadata))
673}
674
675/// Merge metadata into a Map's entries struct, matching key/value positionally.
676///
677/// Iceberg's arrow representation names map fields `key_value`/`key`/`value`,
678/// while Materialize uses `entries`/`keys`/`values`. Name-based matching would
679/// drop the materialize extension metadata for the value field, which then
680/// causes `ArrowBuilder` to fail with "Field 'value' missing extension metadata".
681///
682/// Positional matching is safe because the Arrow spec defines Map structurally,
683/// not by field name: `List<entries: Struct<key: K, value: V>>` with exactly
684/// two struct children — key first, value second — and the names are only
685/// conventional. See `Map` in apache/arrow `format/Schema.fbs`:
686/// <https://github.com/apache/arrow/blob/main/format/Schema.fbs> — "The names
687/// of the child fields may be respectively 'entries', 'key', and 'value', but
688/// this is not enforced."
689///
690/// Future cleanup: we could instead align Materialize's arrow map field names
691/// with the Parquet/Iceberg convention (`key_value`/`key`/`value`) in
692/// `mz_arrow_util::builder::scalar_to_arrow_datatype_impl` and drop this
693/// positional helper. That would also affect `COPY TO S3 ... FORMAT = 'parquet'`
694/// output schemas, so we'd need to confirm no downstream consumers depend on
695/// the current `entries`/`keys`/`values` names before flipping.
696fn merge_map_entries_metadata(
697    iceberg_entries: &Field,
698    mz_entries: &Field,
699) -> anyhow::Result<Field> {
700    let mut metadata = iceberg_entries.metadata().clone();
701    if let Some(extension_name) = mz_entries.metadata().get(ARROW_EXTENSION_NAME_KEY) {
702        metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
703    }
704
705    let iceberg_fields = match iceberg_entries.data_type() {
706        DataType::Struct(fields) => fields,
707        other => anyhow::bail!(
708            "Iceberg map entries field '{}' is not a Struct: {:?}",
709            iceberg_entries.name(),
710            other
711        ),
712    };
713    let mz_fields = match mz_entries.data_type() {
714        DataType::Struct(fields) => fields,
715        other => anyhow::bail!(
716            "Materialize map entries field '{}' is not a Struct: {:?}",
717            mz_entries.name(),
718            other
719        ),
720    };
721
722    let new_fields: Vec<Field> = iceberg_fields
723        .iter()
724        .enumerate()
725        .map(|(idx, iceberg_inner)| {
726            let mz_inner = mz_fields.get(idx).map(|f| f.as_ref());
727            merge_field_metadata_recursive(iceberg_inner, mz_inner)
728        })
729        .collect::<anyhow::Result<Vec<_>>>()?;
730
731    Ok(Field::new(
732        iceberg_entries.name(),
733        DataType::Struct(new_fields.into()),
734        iceberg_entries.is_nullable(),
735    )
736    .with_metadata(metadata))
737}
738
739async fn reload_table(
740    catalog: &dyn Catalog,
741    namespace: String,
742    table_name: String,
743    current_table: Table,
744) -> anyhow::Result<Table> {
745    let namespace_ident = NamespaceIdent::new(namespace.clone());
746    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
747    let current_schema = current_table.metadata().current_schema_id();
748    let current_partition_spec = current_table.metadata().default_partition_spec_id();
749
750    match catalog.load_table(&table_ident).await {
751        Ok(table) => {
752            let reloaded_schema = table.metadata().current_schema_id();
753            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
754            if reloaded_schema != current_schema {
755                return Err(anyhow::anyhow!(
756                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
757                    table_name,
758                    current_schema,
759                    reloaded_schema
760                ));
761            }
762
763            if reloaded_partition_spec != current_partition_spec {
764                return Err(anyhow::anyhow!(
765                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
766                    table_name,
767                    current_partition_spec,
768                    reloaded_partition_spec
769                ));
770            }
771
772            Ok(table)
773        }
774        Err(err) => Err(err).context("Failed to reload Iceberg table"),
775    }
776}
777
778/// Attempt a single commit of a batch of data files to an Iceberg table.
779/// On conflict or failure, reloads the table and returns a retryable error.
780/// On success, returns the updated table state.
781async fn try_commit_batch(
782    mut table: Table,
783    snapshot_properties: Vec<(String, String)>,
784    data_files: Vec<DataFile>,
785    delete_files: Vec<DataFile>,
786    catalog: &dyn Catalog,
787    conn_namespace: &str,
788    conn_table: &str,
789    sink_version: u64,
790    frontier: &Antichain<Timestamp>,
791    batch_lower: &Antichain<Timestamp>,
792    batch_upper: &Antichain<Timestamp>,
793    metrics: &IcebergSinkMetrics,
794) -> (Table, RetryResult<(), anyhow::Error>) {
795    let tx = Transaction::new(&table);
796    let mut action = tx
797        .row_delta()
798        .set_snapshot_properties(snapshot_properties.into_iter().collect())
799        .with_check_duplicate(false);
800
801    if !data_files.is_empty() || !delete_files.is_empty() {
802        action = action
803            .add_data_files(data_files)
804            .add_delete_files(delete_files);
805    }
806
807    let tx = match action
808        .apply(tx)
809        .context("Failed to apply data file addition to iceberg table transaction")
810    {
811        Ok(tx) => tx,
812        Err(e) => {
813            match reload_table(
814                catalog,
815                conn_namespace.to_string(),
816                conn_table.to_string(),
817                table.clone(),
818            )
819            .await
820            {
821                Ok(reloaded) => table = reloaded,
822                Err(reload_err) => {
823                    return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
824                }
825            }
826            return (
827                table,
828                RetryResult::RetryableErr(anyhow!(
829                    "Failed to apply data file addition to iceberg table transaction: {}",
830                    e
831                )),
832            );
833        }
834    };
835
836    let new_table = tx.commit(catalog).await;
837    match new_table {
838        Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
839            metrics.commit_conflicts.inc();
840            match reload_table(
841                catalog,
842                conn_namespace.to_string(),
843                conn_table.to_string(),
844                table.clone(),
845            )
846            .await
847            {
848                Ok(reloaded) => table = reloaded,
849                Err(e) => {
850                    return (table, RetryResult::RetryableErr(anyhow!(e)));
851                }
852            };
853
854            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
855            let last = retrieve_upper_from_snapshots(&mut snapshots);
856            let last = match last {
857                Ok(val) => val,
858                Err(e) => {
859                    return (table, RetryResult::RetryableErr(anyhow!(e)));
860                }
861            };
862
863            // Check if another writer has advanced the frontier beyond ours (fencing check)
864            if let Some((last_frontier, last_version)) = last {
865                if last_version > sink_version {
866                    return (
867                        table,
868                        RetryResult::FatalErr(anyhow!(
869                            "Iceberg table '{}' has been modified by another writer \
870                             with version {}. Current sink version: {}. \
871                             Frontiers may be out of sync, aborting to avoid data loss.",
872                            conn_table,
873                            last_version,
874                            sink_version,
875                        )),
876                    );
877                }
878                if PartialOrder::less_equal(frontier, &last_frontier) {
879                    return (
880                        table,
881                        RetryResult::FatalErr(anyhow!(
882                            "Iceberg table '{}' has been modified by another writer. \
883                             Current frontier: {:?}, last frontier: {:?}.",
884                            conn_table,
885                            frontier,
886                            last_frontier,
887                        )),
888                    );
889                }
890            }
891
892            (
893                table,
894                RetryResult::RetryableErr(anyhow!(
895                    "Commit conflict detected when committing batch [{}, {}) \
896                     to Iceberg table '{}.{}'. Retrying...",
897                    batch_lower.pretty(),
898                    batch_upper.pretty(),
899                    conn_namespace,
900                    conn_table
901                )),
902            )
903        }
904        Err(e) => {
905            metrics.commit_failures.inc();
906            (table, RetryResult::RetryableErr(anyhow!(e)))
907        }
908        Ok(new_table) => (new_table, RetryResult::Ok(())),
909    }
910}
911
912/// Load an existing Iceberg table or create it if it doesn't exist.
913async fn load_or_create_table(
914    catalog: &dyn Catalog,
915    namespace: String,
916    table_name: String,
917    schema: &Schema,
918) -> anyhow::Result<iceberg::table::Table> {
919    let namespace_ident = NamespaceIdent::new(namespace.clone());
920    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
921
922    // Try to load the table first
923    match catalog.load_table(&table_ident).await {
924        Ok(table) => {
925            // Table exists, return it
926            // TODO: Add proper schema evolution/validation to ensure compatibility
927            Ok(table)
928        }
929        Err(err) => {
930            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
931                || err
932                    .message()
933                    .contains("Tried to load a table that does not exist")
934            {
935                // Table doesn't exist, create it
936                // Note: location is not specified, letting the catalog determine the default location
937                // based on its warehouse configuration
938                let table_creation = TableCreation::builder()
939                    .name(table_name.clone())
940                    .schema(schema.clone())
941                    // Use unpartitioned spec by default
942                    // TODO: Consider making partition spec configurable
943                    // .partition_spec(UnboundPartitionSpec::builder().build())
944                    .build();
945
946                catalog
947                    .create_table(&namespace_ident, table_creation)
948                    .await
949                    .with_context(|| {
950                        format!(
951                            "Failed to create Iceberg table '{}' in namespace '{}'",
952                            table_name, namespace
953                        )
954                    })
955            } else {
956                // Some other error occurred
957                Err(err).context("Failed to load Iceberg table")
958            }
959        }
960    }
961}
962
963/// Find the most recent Materialize frontier from Iceberg snapshots.
964/// We store the frontier in snapshot metadata to track where we left off after restarts.
965/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
966/// The input slice will be sorted by sequence number in descending order.
967fn retrieve_upper_from_snapshots(
968    snapshots: &mut [Arc<Snapshot>],
969) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
970    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
971
972    for snapshot in snapshots {
973        let props = &snapshot.summary().additional_properties;
974        if let (Some(frontier_json), Some(sink_version_str)) =
975            (props.get("mz-frontier"), props.get("mz-sink-version"))
976        {
977            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
978                .context("Failed to deserialize frontier from snapshot properties")?;
979            let frontier = Antichain::from_iter(frontier);
980
981            let sink_version = sink_version_str
982                .parse::<u64>()
983                .context("Failed to parse mz-sink-version from snapshot properties")?;
984
985            return Ok(Some((frontier, sink_version)));
986        }
987        if snapshot.summary().operation.as_str() != "replace" {
988            // This is a bad heuristic, but we have no real other way to identify compactions
989            // right now other than assume they will be the only operation writing "replace" operations.
990            // That means if we find a snapshot with some other operation, but no mz-frontier, we are in an
991            // inconsistent state and have to error out.
992            anyhow::bail!(
993                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
994                snapshot.snapshot_id(),
995                snapshot.summary().operation.as_str(),
996            );
997        }
998    }
999
1000    Ok(None)
1001}
1002
1003/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
1004///
1005/// Returns a tuple of:
1006/// - The Arrow schema (with field IDs and Iceberg-compatible types) for writing Parquet files
1007/// - The Iceberg schema for table creation/validation
1008///
1009/// Iceberg doesn't support unsigned integer types, so we use `iceberg_type_overrides`
1010/// to map them to compatible types (e.g., UInt64 -> Decimal128(20,0)). The ArrowBuilder
1011/// handles the cross-type conversion (Datum::UInt64 -> Decimal128Builder) automatically.
1012fn relation_desc_to_iceberg_schema(
1013    desc: &mz_repr::RelationDesc,
1014) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
1015    let arrow_schema =
1016        mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
1017            .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
1018
1019    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
1020
1021    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
1022        .context("Failed to convert Arrow schema to Iceberg schema")?;
1023
1024    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
1025}
1026
1027/// Resolve Materialize key column indexes to Iceberg top-level field IDs.
1028///
1029/// Iceberg field IDs are assigned recursively, so a top-level column's field ID
1030/// is not necessarily `column_index + 1` once nested fields are present.
1031fn equality_ids_for_indices(
1032    current_schema: &Schema,
1033    materialize_arrow_schema: &ArrowSchema,
1034    equality_indices: &[usize],
1035) -> anyhow::Result<Vec<i32>> {
1036    let top_level_fields = current_schema.as_struct();
1037
1038    equality_indices
1039        .iter()
1040        .map(|index| {
1041            let mz_field = materialize_arrow_schema
1042                .fields()
1043                .get(*index)
1044                .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
1045            let field_name = mz_field.name();
1046            let iceberg_field = top_level_fields
1047                .field_by_name(field_name)
1048                .with_context(|| {
1049                    format!(
1050                        "Equality delete key column '{}' not found in Iceberg table schema",
1051                        field_name
1052                    )
1053                })?;
1054            Ok(iceberg_field.id)
1055        })
1056        .collect()
1057}
1058
1059/// Build a new Arrow schema by adding an __op column to the existing schema.
1060fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
1061    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1062    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
1063    ArrowSchema::new(fields)
1064}
1065
1066/// Build a new Arrow schema by appending `_mz_diff` (Int32) and `_mz_timestamp` (Int64) columns.
1067/// These are user-visible Iceberg columns written in append mode. Parquet field IDs are
1068/// assigned sequentially after the existing maximum field ID so the extended schema can
1069/// be converted to a valid Iceberg schema via `arrow_schema_to_schema`.
1070#[allow(clippy::disallowed_types)]
1071fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1072    use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1073    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1074    fields.push(Arc::new(Field::new(
1075        ICEBERG_APPEND_DIFF_COLUMN,
1076        DataType::Int32,
1077        false,
1078    )));
1079    fields.push(Arc::new(Field::new(
1080        ICEBERG_APPEND_TIMESTAMP_COLUMN,
1081        DataType::Int64,
1082        false,
1083    )));
1084
1085    add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1086}
1087
1088/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
1089/// Batches are minted with configurable windows to balance write efficiency with latency.
1090/// We maintain a sliding window of future batch descriptions so writers can start
1091/// processing data even while earlier batches are still being written.
1092fn mint_batch_descriptions<'scope, D>(
1093    name: String,
1094    sink_id: GlobalId,
1095    input: VecCollection<'scope, Timestamp, D, Diff>,
1096    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1097    connection: IcebergSinkConnection,
1098    storage_configuration: StorageConfiguration,
1099    initial_schema: SchemaRef,
1100) -> (
1101    VecCollection<'scope, Timestamp, D, Diff>,
1102    StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1103    StreamVec<'scope, Timestamp, Infallible>,
1104    StreamVec<'scope, Timestamp, HealthStatusMessage>,
1105    PressOnDropButton,
1106)
1107where
1108    D: Clone + 'static,
1109{
1110    let scope = input.scope();
1111    let name_for_error = name.clone();
1112    let name_for_logging = name.clone();
1113    let mut builder = OperatorBuilder::new(name, scope.clone());
1114    let sink_version = sink.version;
1115
1116    let hashed_id = sink_id.hashed();
1117    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1118    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1119    let (output, output_stream) = builder.new_output();
1120    let (batch_desc_output, batch_desc_stream) =
1121        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1122    let mut input =
1123        builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);
1124
1125    let as_of = sink.as_of.clone();
1126    let commit_interval = sink
1127        .commit_interval
1128        .expect("the planner should have enforced this")
1129        .clone();
1130
1131    let (button, errors): (_, StreamVec<'scope, Timestamp, Rc<anyhow::Error>>) =
1132        builder.build_fallible(move |caps| {
1133        Box::pin(async move {
1134            let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
1135            *data_capset = CapabilitySet::new();
1136
1137            if !is_active_worker {
1138                *capset = CapabilitySet::new();
1139                *data_capset = CapabilitySet::new();
1140                *table_ready_capset = CapabilitySet::new();
1141                while let Some(event) = input.next().await {
1142                    match event {
1143                        Event::Data([output_cap, _], mut data) => {
1144                            output.give_container(&output_cap, &mut data);
1145                        }
1146                        Event::Progress(_) => {}
1147                    }
1148                }
1149                return Ok(());
1150            }
1151
1152            let catalog = connection
1153                .catalog_connection
1154                .connect(&storage_configuration, InTask::Yes)
1155                .await
1156                .with_context(|| {
1157                    format!(
1158                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1159                        connection.catalog_connection.uri, connection.namespace, connection.table
1160                    )
1161                })?;
1162
1163            let table = load_or_create_table(
1164                catalog.as_ref(),
1165                connection.namespace.clone(),
1166                connection.table.clone(),
1167                initial_schema.as_ref(),
1168            )
1169            .await?;
1170            debug!(
1171                ?sink_id,
1172                %name_for_logging,
1173                namespace = %connection.namespace,
1174                table = %connection.table,
1175                "iceberg mint loaded/created table"
1176            );
1177
1178            *table_ready_capset = CapabilitySet::new();
1179
1180            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1181            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
1182            let (resume_upper, resume_version) = match resume {
1183                Some((f, v)) => (f, v),
1184                None => (Antichain::from_elem(Timestamp::minimum()), 0),
1185            };
1186            debug!(
1187                ?sink_id,
1188                %name_for_logging,
1189                resume_upper = %resume_upper.pretty(),
1190                resume_version,
1191                as_of = %as_of.pretty(),
1192                "iceberg mint resume position loaded"
1193            );
1194
1195            // The input has overcompacted if
1196            let overcompacted =
1197                // ..we have made some progress in the past
1198                *resume_upper != [Timestamp::minimum()] &&
1199                // ..but the since frontier is now beyond that
1200                PartialOrder::less_than(&resume_upper, &as_of);
1201
1202            if overcompacted {
1203                let err = format!(
1204                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
1205                    as_of.pretty(),
1206                    resume_upper.pretty()
1207                );
1208                // This would normally be an assertion but because it can happen after a
1209                // Materialize backup/restore we log an error so that it appears on Sentry but
1210                // leaves the rest of the objects in the cluster unaffected.
1211                return Err(anyhow::anyhow!("{err}"));
1212            };
1213
1214            if resume_version > sink_version {
1215                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
1216            }
1217
1218            let mut initialized = false;
1219            let mut observed_frontier;
1220            let mut max_seen_ts: Option<Timestamp> = None;
1221            // Track minted batches to maintain a sliding window of open batch descriptions.
1222            // This is needed to know when to retire old batches and mint new ones.
1223            // It's "sortedness" is derived from the monotonicity of batch descriptions,
1224            // and the fact that we only ever push new descriptions to the back and pop from the front.
1225            let mut minted_batches = VecDeque::new();
1226            loop {
1227                if let Some(event) = input.next().await {
1228                    match event {
1229                        Event::Data([output_cap, _], mut data) => {
1230                            if !initialized {
1231                                for (_, ts, _) in data.iter() {
1232                                    match max_seen_ts.as_mut() {
1233                                        Some(max) => {
1234                                            if max.less_than(ts) {
1235                                                *max = ts.clone();
1236                                            }
1237                                        }
1238                                        None => {
1239                                            max_seen_ts = Some(ts.clone());
1240                                        }
1241                                    }
1242                                }
1243                            }
1244                            output.give_container(&output_cap, &mut data);
1245                            continue;
1246                        }
1247                        Event::Progress(frontier) => {
1248                            observed_frontier = frontier;
1249                        }
1250                    }
1251                } else {
1252                    return Ok(());
1253                }
1254
1255                if !initialized {
1256                    if observed_frontier.is_empty() {
1257                        // Bounded inputs can close (frontier becomes empty) before we finish
1258                        // initialization. For example, a loadgen source configured for a finite
1259                        // dataset may emit all rows at time t and then immediately close. If we
1260                        // saw any data, synthesize an upper one tick past the maximum timestamp
1261                        // so we can mint a snapshot batch and commit it.
1262                        if let Some(max_ts) = max_seen_ts.as_ref() {
1263                            let synthesized_upper =
1264                                Antichain::from_elem(max_ts.step_forward());
1265                            debug!(
1266                                ?sink_id,
1267                                %name_for_logging,
1268                                max_seen_ts = %max_ts,
1269                                synthesized_upper = %synthesized_upper.pretty(),
1270                                "iceberg mint input closed before initialization; using max seen ts"
1271                            );
1272                            observed_frontier = synthesized_upper;
1273                        } else {
1274                            debug!(
1275                                ?sink_id,
1276                                %name_for_logging,
1277                                "iceberg mint input closed before initialization with no data"
1278                            );
1279                            // Input stream closed before initialization completed and no data arrived.
1280                            return Ok(());
1281                        }
1282                    }
1283
1284                    // We only start minting after we've reached as_of and resume_upper to avoid
1285                    // minting batches that would be immediately skipped.
1286                    if PartialOrder::less_than(&observed_frontier, &resume_upper)
1287                        || PartialOrder::less_than(&observed_frontier, &as_of)
1288                    {
1289                        continue;
1290                    }
1291
1292                    let mut batch_descriptions = vec![];
1293                    let mut current_upper = observed_frontier.clone();
1294                    let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
1295
1296                    // Create a catch-up batch from the later of resume_upper or as_of to current frontier.
1297                    // We use the later of the two because:
1298                    // - For fresh sinks: resume_upper = minimum, as_of = actual timestamp, data starts at as_of
1299                    // - For resuming: as_of <= resume_upper (enforced by overcompaction check), data starts at resume_upper
1300                    let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
1301                        as_of.clone()
1302                    } else {
1303                        resume_upper.clone()
1304                    };
1305
1306                    if batch_lower == current_upper {
1307                        // Snapshot! as_of is exactly at the frontier. We still need to mint
1308                        // a batch to create the snapshot, so we step the upper forward by one.
1309                        current_upper = Antichain::from_elem(current_upper_ts.step_forward());
1310                    }
1311
1312                    let batch_description = (batch_lower.clone(), current_upper.clone());
1313                    debug!(
1314                        ?sink_id,
1315                        %name_for_logging,
1316                        batch_lower = %batch_lower.pretty(),
1317                        current_upper = %current_upper.pretty(),
1318                        "iceberg mint initializing (catch-up batch)"
1319                    );
1320                    debug!(
1321                        "{}: creating catch-up batch [{}, {})",
1322                        name_for_logging,
1323                        batch_lower.pretty(),
1324                        current_upper.pretty()
1325                    );
1326                    batch_descriptions.push(batch_description);
1327                    // Mint initial future batch descriptions at configurable intervals
1328                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
1329                        let duration_millis = commit_interval.as_millis()
1330                            .checked_mul(u128::from(i))
1331                            .expect("commit interval multiplication overflow");
1332                        let duration_ts = Timestamp::new(
1333                            u64::try_from(duration_millis)
1334                                .expect("commit interval too large for u64"),
1335                        );
1336                        let desired_batch_upper = Antichain::from_elem(
1337                            current_upper_ts.step_forward_by(&duration_ts),
1338                        );
1339
1340                        let batch_description =
1341                            (current_upper.clone(), desired_batch_upper.clone());
1342                        debug!(
1343                            "{}: minting future batch {}/{} [{}, {})",
1344                            name_for_logging,
1345                            i,
1346                            INITIAL_DESCRIPTIONS_TO_MINT,
1347                            current_upper.pretty(),
1348                            desired_batch_upper.pretty()
1349                        );
1350                        current_upper = batch_description.1.clone();
1351                        batch_descriptions.push(batch_description);
1352                    }
1353
1354                    minted_batches.extend(batch_descriptions.clone());
1355
1356                    for desc in batch_descriptions {
1357                        batch_desc_output.give(&capset[0], desc);
1358                    }
1359
1360                    capset.downgrade(current_upper);
1361
1362                    initialized = true;
1363                } else {
1364                    if observed_frontier.is_empty() {
1365                        // We're done!
1366                        return Ok(());
1367                    }
1368                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
1369                    // and mint a new future batch to keep the pipeline full
1370                    while let Some(oldest_desc) = minted_batches.front() {
1371                        let oldest_upper = &oldest_desc.1;
1372                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
1373                            break;
1374                        }
1375
1376                        let newest_upper = minted_batches.back().unwrap().1.clone();
1377                        let new_lower = newest_upper.clone();
1378                        let duration_ts = Timestamp::new(commit_interval.as_millis()
1379                            .try_into()
1380                            .expect("commit interval too large for u64"));
1381                        let new_upper = Antichain::from_elem(newest_upper
1382                            .as_option()
1383                            .unwrap()
1384                            .step_forward_by(&duration_ts));
1385
1386                        let new_batch_description = (new_lower.clone(), new_upper.clone());
1387                        minted_batches.pop_front();
1388                        minted_batches.push_back(new_batch_description.clone());
1389
1390                        batch_desc_output.give(&capset[0], new_batch_description);
1391
1392                        capset.downgrade(new_upper);
1393                    }
1394                }
1395            }
1396        })
1397    });
1398
1399    let statuses = errors.map(|error| HealthStatusMessage {
1400        id: None,
1401        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1402        namespace: StatusNamespace::Iceberg,
1403    });
1404    (
1405        output_stream.as_collection(),
1406        batch_desc_stream,
1407        table_ready_stream,
1408        statuses,
1409        button.press_on_drop(),
1410    )
1411}
1412
1413#[derive(Clone, Debug, Serialize, Deserialize)]
1414#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
1415struct SerializableDataFile {
1416    pub data_file: DataFile,
1417    pub schema: Schema,
1418}
1419
1420/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
1421/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
1422/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
1423/// The avro ser(de) also requires the Iceberg schema to be provided, so we include that as well.
1424/// It is distinctly possible that this is overkill, but it avoids re-implementing
1425/// Iceberg's serialization logic here.
1426/// If at some point this becomes a serious overhead, we can revisit this decision.
1427#[derive(Clone, Debug, Serialize, Deserialize)]
1428struct AvroDataFile {
1429    pub data_file: Vec<u8>,
1430    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
1431    pub schema: Vec<u8>,
1432}
1433
1434impl From<SerializableDataFile> for AvroDataFile {
1435    fn from(value: SerializableDataFile) -> Self {
1436        let mut data_file = Vec::new();
1437        write_data_files_to_avro(
1438            &mut data_file,
1439            [value.data_file],
1440            &StructType::new(vec![]),
1441            FormatVersion::V2,
1442        )
1443        .expect("serialization into buffer");
1444        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1445        AvroDataFile { data_file, schema }
1446    }
1447}
1448
1449impl TryFrom<AvroDataFile> for SerializableDataFile {
1450    type Error = String;
1451
1452    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1453        let schema: Schema = serde_json::from_slice(&value.schema)
1454            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1455        let data_files = read_data_files_from_avro(
1456            &mut &*value.data_file,
1457            &schema,
1458            0,
1459            &StructType::new(vec![]),
1460            FormatVersion::V2,
1461        )
1462        .map_err_to_string_with_causes()?;
1463        let Some(data_file) = data_files.into_iter().next() else {
1464            return Err("No DataFile found in Avro data".into());
1465        };
1466        Ok(SerializableDataFile { data_file, schema })
1467    }
1468}
1469
1470/// A DataFile along with its associated batch description (lower and upper bounds).
1471#[derive(Clone, Debug, Serialize, Deserialize)]
1472struct BoundedDataFile {
1473    pub data_file: SerializableDataFile,
1474    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1475}
1476
1477impl BoundedDataFile {
1478    pub fn new(
1479        file: DataFile,
1480        schema: Schema,
1481        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1482    ) -> Self {
1483        Self {
1484            data_file: SerializableDataFile {
1485                data_file: file,
1486                schema,
1487            },
1488            batch_desc,
1489        }
1490    }
1491
1492    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
1493        &self.batch_desc
1494    }
1495
1496    pub fn data_file(&self) -> &DataFile {
1497        &self.data_file.data_file
1498    }
1499
1500    pub fn into_data_file(self) -> DataFile {
1501        self.data_file.data_file
1502    }
1503}
1504
1505/// A set of DataFiles along with their associated batch descriptions.
1506#[derive(Clone, Debug, Default)]
1507struct BoundedDataFileSet {
1508    pub data_files: Vec<BoundedDataFile>,
1509}
1510
1511/// Construct the envelope-specific closures that [`write_data_files`] needs.
1512///
1513/// Write rows into Parquet data files bounded by batch descriptions.
1514/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
1515/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
1516fn write_data_files<'scope, H: EnvelopeHandler + 'static>(
1517    name: String,
1518    input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
1519    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1520    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
1521    as_of: Antichain<Timestamp>,
1522    connection: IcebergSinkConnection,
1523    storage_configuration: StorageConfiguration,
1524    materialize_arrow_schema: Arc<ArrowSchema>,
1525    metrics: Arc<IcebergSinkMetrics>,
1526    statistics: SinkStatistics,
1527) -> (
1528    StreamVec<'scope, Timestamp, BoundedDataFile>,
1529    StreamVec<'scope, Timestamp, HealthStatusMessage>,
1530    PressOnDropButton,
1531) {
1532    let scope = input.scope();
1533    let name_for_logging = name.clone();
1534    let mut builder = OperatorBuilder::new(name, scope.clone());
1535
1536    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1537
1538    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1539    let mut batch_desc_input =
1540        builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
1541    let mut input = builder.new_disconnected_input(input.inner, Pipeline);
1542
1543    let (button, errors) = builder.build_fallible(move |caps| {
1544        Box::pin(async move {
1545            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
1546            let catalog = connection
1547                .catalog_connection
1548                .connect(&storage_configuration, InTask::Yes)
1549                .await
1550                .with_context(|| {
1551                    format!(
1552                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1553                        connection.catalog_connection.uri, connection.namespace, connection.table
1554                    )
1555                })?;
1556
1557            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1558            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1559            while let Some(_) = table_ready_input.next().await {
1560                // Wait for table to be ready
1561            }
1562            let table = catalog
1563                .load_table(&table_ident)
1564                .await
1565                .with_context(|| {
1566                    format!(
1567                        "Failed to load Iceberg table '{}.{}' in write_data_files operator",
1568                        connection.namespace, connection.table
1569                    )
1570                })?;
1571
1572            let table_metadata = table.metadata().clone();
1573            let current_schema = Arc::clone(table_metadata.current_schema());
1574
1575            // Merge Materialize extension metadata into the Iceberg schema.
1576            // We need extension metadata for ArrowBuilder to work correctly (it uses
1577            // extension names to know how to handle different types like records vs arrays).
1578            let arrow_schema = Arc::new(
1579                merge_materialize_metadata_into_iceberg_schema(
1580                    materialize_arrow_schema.as_ref(),
1581                    current_schema.as_ref(),
1582                )
1583                .context("Failed to merge Materialize metadata into Iceberg schema")?,
1584            );
1585
1586            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
1587            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
1588            // No clear way to detect this properly right now, so we use heuristics.
1589            let location = table_metadata.location();
1590            let corrected_location = match location.rsplit_once("/metadata/") {
1591                Some((a, b)) if b.ends_with(".metadata.json") => a,
1592                _ => location,
1593            };
1594
1595            let data_location = format!("{}/data", corrected_location);
1596            let location_generator = DefaultLocationGenerator::with_data_location(data_location);
1597
1598            // Add a unique suffix to avoid filename collisions across restarts and workers
1599            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
1600            let file_name_generator = DefaultFileNameGenerator::new(
1601                PARQUET_FILE_PREFIX.to_string(),
1602                Some(unique_suffix),
1603                iceberg::spec::DataFileFormat::Parquet,
1604            );
1605
1606            let file_io = table.file_io().clone();
1607
1608            let writer_properties = WriterProperties::new();
1609
1610            let ctx = WriterContext {
1611                arrow_schema,
1612                current_schema: Arc::clone(&current_schema),
1613                file_io,
1614                location_generator,
1615                file_name_generator,
1616                writer_properties,
1617            };
1618            let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;
1619
1620            // Rows can arrive before their batch description due to dataflow parallelism.
1621            // Stash them until we know which batch they belong to.
1622            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1623                BTreeMap::new();
1624
1625            // Track batches currently being written. When a row arrives, we check if it belongs
1626            // to an in-flight batch. When frontiers advance to a batch's upper, we close the
1627            // writer and emit its data files downstream.
1628            // Antichains don't implement Ord, so we use a HashMap with tuple keys instead.
1629            #[allow(clippy::disallowed_types)]
1630            let mut in_flight_batches: std::collections::HashMap<
1631                (Antichain<Timestamp>, Antichain<Timestamp>),
1632                Box<dyn IcebergWriter>,
1633            > = std::collections::HashMap::new();
1634
1635            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1636            let mut processed_batch_description_frontier =
1637                Antichain::from_elem(Timestamp::minimum());
1638            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1639            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1640
1641            // Track the minimum batch lower bound to prune data that's already committed
1642            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;
1643
1644            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1645                let mut staged_messages_since_flush: u64 = 0;
1646                tokio::select! {
1647                    _ = batch_desc_input.ready() => {},
1648                    _ = input.ready() => {}
1649                }
1650
1651                while let Some(event) = batch_desc_input.next_sync() {
1652                    match event {
1653                        Event::Data(_cap, data) => {
1654                            for batch_desc in data {
1655                                let (lower, upper) = &batch_desc;
1656
1657                                // Track the minimum batch lower bound (first batch received)
1658                                if min_batch_lower.is_none() {
1659                                    min_batch_lower = Some(lower.clone());
1660                                    debug!(
1661                                        "{}: set min_batch_lower to {}",
1662                                        name_for_logging,
1663                                        lower.pretty()
1664                                    );
1665
1666                                    // Prune any stashed rows that arrived before min_batch_lower (already committed)
1667                                    let to_remove: Vec<_> = stashed_rows
1668                                        .keys()
1669                                        .filter(|ts| {
1670                                            let ts_antichain = Antichain::from_elem((*ts).clone());
1671                                            PartialOrder::less_than(&ts_antichain, lower)
1672                                        })
1673                                        .cloned()
1674                                        .collect();
1675
1676                                    if !to_remove.is_empty() {
1677                                        let mut removed_count = 0;
1678                                        for ts in to_remove {
1679                                            if let Some(rows) = stashed_rows.remove(&ts) {
1680                                                removed_count += rows.len();
1681                                                for _ in &rows {
1682                                                    metrics.stashed_rows.dec();
1683                                                }
1684                                            }
1685                                        }
1686                                        debug!(
1687                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
1688                                            name_for_logging,
1689                                            removed_count
1690                                        );
1691                                    }
1692                                }
1693
1694                                // Disable seen_rows tracking for snapshot batch to save memory
1695                                let is_snapshot = lower == &as_of;
1696                                debug!(
1697                                    "{}: received batch description [{}, {}), snapshot={}",
1698                                    name_for_logging,
1699                                    lower.pretty(),
1700                                    upper.pretty(),
1701                                    is_snapshot
1702                                );
1703                                let mut batch_writer =
1704                                    handler.create_writer(is_snapshot).await?;
1705                                // Drain any stashed rows that belong to this batch
1706                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1707                                let mut drained_count = 0;
1708                                for row_ts in row_ts_keys {
1709                                    let ts = Antichain::from_elem(row_ts.clone());
1710                                    if PartialOrder::less_equal(lower, &ts)
1711                                        && PartialOrder::less_than(&ts, upper)
1712                                    {
1713                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
1714                                            drained_count += rows.len();
1715                                            for (_row, diff_pair) in rows {
1716                                                metrics.stashed_rows.dec();
1717                                                let record_batch = handler.row_to_batch(
1718                                                    diff_pair.clone(),
1719                                                    row_ts.clone(),
1720                                                )
1721                                                .context("failed to convert row to recordbatch")?;
1722                                                batch_writer.write(record_batch).await?;
1723                                                staged_messages_since_flush += 1;
1724                                                if staged_messages_since_flush >= 10_000 {
1725                                                    statistics.inc_messages_staged_by(
1726                                                        staged_messages_since_flush,
1727                                                    );
1728                                                    staged_messages_since_flush = 0;
1729                                                }
1730                                            }
1731                                        }
1732                                    }
1733                                }
1734                                if drained_count > 0 {
1735                                    debug!(
1736                                        "{}: drained {} stashed rows into batch [{}, {})",
1737                                        name_for_logging,
1738                                        drained_count,
1739                                        lower.pretty(),
1740                                        upper.pretty()
1741                                    );
1742                                }
1743                                let prev =
1744                                    in_flight_batches.insert(batch_desc.clone(), batch_writer);
1745                                if prev.is_some() {
1746                                    anyhow::bail!(
1747                                        "Duplicate batch description received for description {:?}",
1748                                        batch_desc
1749                                    );
1750                                }
1751                            }
1752                        }
1753                        Event::Progress(frontier) => {
1754                            batch_description_frontier = frontier;
1755                        }
1756                    }
1757                }
1758
1759                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1760                for event in ready_events {
1761                    match event {
1762                        Event::Data(_cap, data) => {
1763                            let mut dropped_per_time = BTreeMap::new();
1764                            let mut stashed_per_time = BTreeMap::new();
1765                            for ((row, diff_pair), ts, _diff) in data {
1766                                let row_ts = ts.clone();
1767                                let ts_antichain = Antichain::from_elem(row_ts.clone());
1768                                let mut written = false;
1769                                // Try writing the row to any in-flight batch it belongs to...
1770                                for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
1771                                    let (lower, upper) = batch_desc;
1772                                    if PartialOrder::less_equal(lower, &ts_antichain)
1773                                        && PartialOrder::less_than(&ts_antichain, upper)
1774                                    {
1775                                        let record_batch = handler.row_to_batch(
1776                                            diff_pair.clone(),
1777                                            row_ts.clone(),
1778                                        )
1779                                        .context("failed to convert row to recordbatch")?;
1780                                        batch_writer.write(record_batch).await?;
1781                                        staged_messages_since_flush += 1;
1782                                        if staged_messages_since_flush >= 10_000 {
1783                                            statistics.inc_messages_staged_by(
1784                                                staged_messages_since_flush,
1785                                            );
1786                                            staged_messages_since_flush = 0;
1787                                        }
1788                                        written = true;
1789                                        break;
1790                                    }
1791                                }
1792                                if !written {
1793                                    // Drop data that's before the first batch we received (already committed)
1794                                    if let Some(ref min_lower) = min_batch_lower {
1795                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
1796                                            dropped_per_time
1797                                                .entry(ts_antichain.into_option().unwrap())
1798                                                .and_modify(|c| *c += 1)
1799                                                .or_insert(1);
1800                                            continue;
1801                                        }
1802                                    }
1803
1804                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
1805                                    let entry = stashed_rows.entry(row_ts).or_default();
1806                                    metrics.stashed_rows.inc();
1807                                    entry.push((row, diff_pair));
1808                                }
1809                            }
1810
1811                            for (ts, count) in dropped_per_time {
1812                                debug!(
1813                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
1814                                    name_for_logging, count, ts
1815                                );
1816                            }
1817
1818                            for (ts, count) in stashed_per_time {
1819                                debug!(
1820                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
1821                                    name_for_logging, count, ts
1822                                );
1823                            }
1824                        }
1825                        Event::Progress(frontier) => {
1826                            input_frontier = frontier;
1827                        }
1828                    }
1829                }
1830                if staged_messages_since_flush > 0 {
1831                    statistics.inc_messages_staged_by(staged_messages_since_flush);
1832                }
1833
1834                // Check if frontiers have advanced, which may unlock batches ready to close
1835                if PartialOrder::less_than(
1836                    &processed_batch_description_frontier,
1837                    &batch_description_frontier,
1838                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1839                {
1840                    // Close batches whose upper is now in the past
1841                    // Upper bounds are exclusive, so we check if upper is less_equal to the frontier.
1842                    // Remember: a frontier at x means all timestamps less than x have been observed.
1843                    // Or, in other words we still might yet see timestamps at [x, infinity). X itself will
1844                    // be covered by the _next_ batches lower inclusive bound, so we can safely close the batch if its upper is <= x.
1845                    let ready_batches: Vec<_> = in_flight_batches
1846                        .extract_if(|(lower, upper), _| {
1847                            PartialOrder::less_than(lower, &batch_description_frontier)
1848                                && PartialOrder::less_equal(upper, &input_frontier)
1849                        })
1850                        .collect();
1851
1852                    if !ready_batches.is_empty() {
1853                        debug!(
1854                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
1855                            name_for_logging,
1856                            ready_batches.len(),
1857                            batch_description_frontier.pretty(),
1858                            input_frontier.pretty()
1859                        );
1860                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1861                        for (desc, mut batch_writer) in ready_batches {
1862                            let close_started_at = Instant::now();
1863                            let data_files = batch_writer.close().await;
1864                            metrics
1865                                .writer_close_duration_seconds
1866                                .observe(close_started_at.elapsed().as_secs_f64());
1867                            let data_files = data_files.context("Failed to close batch writer")?;
1868                            debug!(
1869                                "{}: closed batch [{}, {}), wrote {} files",
1870                                name_for_logging,
1871                                desc.0.pretty(),
1872                                desc.1.pretty(),
1873                                data_files.len()
1874                            );
1875                            for data_file in data_files {
1876                                match data_file.content_type() {
1877                                    iceberg::spec::DataContentType::Data => {
1878                                        metrics.data_files_written.inc();
1879                                    }
1880                                    iceberg::spec::DataContentType::PositionDeletes
1881                                    | iceberg::spec::DataContentType::EqualityDeletes => {
1882                                        metrics.delete_files_written.inc();
1883                                    }
1884                                }
1885                                statistics.inc_messages_staged_by(data_file.record_count());
1886                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1887                                let file = BoundedDataFile::new(
1888                                    data_file,
1889                                    current_schema.as_ref().clone(),
1890                                    desc.clone(),
1891                                );
1892                                output.give(&capset[0], file);
1893                            }
1894
1895                            max_upper = max_upper.join(&desc.1);
1896                        }
1897
1898                        capset.downgrade(max_upper);
1899                    }
1900                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
1901                    processed_input_frontier.clone_from(&input_frontier);
1902                }
1903            }
1904            Ok(())
1905        })
1906    });
1907
1908    let statuses = errors.map(|error| HealthStatusMessage {
1909        id: None,
1910        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1911        namespace: StatusNamespace::Iceberg,
1912    });
1913    (output_stream, statuses, button.press_on_drop())
1914}
1915
1916#[cfg(test)]
1917mod tests {
1918    use super::*;
1919    use iceberg::spec::{PrimitiveType, Type};
1920    use mz_repr::SqlScalarType;
1921    use mz_storage_types::sinks::ICEBERG_UINT64_DECIMAL_PRECISION;
1922
1923    #[mz_ore::test]
1924    fn test_iceberg_type_overrides() {
1925        // UInt16 should override to Int32
1926        let result = iceberg_type_overrides(&SqlScalarType::UInt16);
1927        assert_eq!(result.unwrap().0, DataType::Int32);
1928
1929        // UInt32 should override to Int64
1930        let result = iceberg_type_overrides(&SqlScalarType::UInt32);
1931        assert_eq!(result.unwrap().0, DataType::Int64);
1932
1933        // UInt64 should override to Decimal128(20, 0)
1934        let result = iceberg_type_overrides(&SqlScalarType::UInt64);
1935        assert_eq!(
1936            result.unwrap().0,
1937            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
1938        );
1939
1940        // MzTimestamp should override to Decimal128(20, 0)
1941        let result = iceberg_type_overrides(&SqlScalarType::MzTimestamp);
1942        assert_eq!(
1943            result.unwrap().0,
1944            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
1945        );
1946
1947        // Other types should return None (use default)
1948        assert!(iceberg_type_overrides(&SqlScalarType::Int32).is_none());
1949        assert!(iceberg_type_overrides(&SqlScalarType::String).is_none());
1950        assert!(iceberg_type_overrides(&SqlScalarType::Bool).is_none());
1951    }
1952
1953    #[mz_ore::test]
1954    fn test_iceberg_schema_with_nested_uint64() {
1955        // Test that desc_to_schema_with_overrides handles nested UInt64
1956        // by using iceberg_type_overrides which applies recursively
1957        let desc = mz_repr::RelationDesc::builder()
1958            .with_column(
1959                "items",
1960                SqlScalarType::List {
1961                    element_type: Box::new(SqlScalarType::UInt64),
1962                    custom_id: None,
1963                }
1964                .nullable(true),
1965            )
1966            .finish();
1967
1968        let schema =
1969            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
1970                .expect("schema conversion should succeed");
1971
1972        // The inner element should be Decimal128, not UInt64
1973        if let DataType::List(field) = schema.field(0).data_type() {
1974            assert_eq!(
1975                field.data_type(),
1976                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
1977            );
1978        } else {
1979            panic!("Expected List type");
1980        }
1981    }
1982
1983    #[mz_ore::test]
1984    fn test_iceberg_interval_override() {
1985        // Interval should override to LargeUtf8 (string) for Iceberg
1986        let result = iceberg_type_overrides(&SqlScalarType::Interval);
1987        assert_eq!(result.unwrap().0, DataType::LargeUtf8);
1988
1989        // Test full schema conversion with interval column
1990        let desc = mz_repr::RelationDesc::builder()
1991            .with_column("id", SqlScalarType::Int32.nullable(false))
1992            .with_column("dur", SqlScalarType::Interval.nullable(true))
1993            .finish();
1994
1995        let (arrow_schema, iceberg_schema) =
1996            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");
1997
1998        // Arrow schema should have LargeUtf8 for interval
1999        assert_eq!(arrow_schema.field(1).data_type(), &DataType::LargeUtf8);
2000
2001        // Iceberg schema should have String type
2002        let field = iceberg_schema
2003            .as_struct()
2004            .field_by_name("dur")
2005            .expect("field should exist");
2006        assert_eq!(*field.field_type, Type::Primitive(PrimitiveType::String));
2007    }
2008
2009    #[mz_ore::test]
2010    fn test_iceberg_range_schema() {
2011        // Test full schema conversion with range column
2012        let desc = mz_repr::RelationDesc::builder()
2013            .with_column("id", SqlScalarType::Int32.nullable(false))
2014            .with_column(
2015                "r",
2016                SqlScalarType::Range {
2017                    element_type: Box::new(SqlScalarType::Int32),
2018                }
2019                .nullable(true),
2020            )
2021            .finish();
2022
2023        let (_arrow_schema, iceberg_schema) =
2024            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");
2025
2026        // Iceberg schema should have a struct type for the range
2027        let field = iceberg_schema
2028            .as_struct()
2029            .field_by_name("r")
2030            .expect("field should exist");
2031        assert!(
2032            matches!(&*field.field_type, Type::Struct(_)),
2033            "range should be struct, got: {:?}",
2034            field.field_type
2035        );
2036    }
2037
2038    #[mz_ore::test]
2039    fn equality_ids_follow_iceberg_field_ids() {
2040        let map_entries = Field::new(
2041            "entries",
2042            DataType::Struct(
2043                vec![
2044                    Field::new("key", DataType::Utf8, false),
2045                    Field::new("value", DataType::Utf8, true),
2046                ]
2047                .into(),
2048            ),
2049            false,
2050        );
2051        let materialize_arrow_schema = ArrowSchema::new(vec![
2052            Field::new("attrs", DataType::Map(Arc::new(map_entries), false), true),
2053            Field::new("key_col", DataType::Int32, false),
2054        ]);
2055        let materialize_arrow_schema = add_field_ids_to_arrow_schema(materialize_arrow_schema);
2056        let iceberg_schema = arrow_schema_to_schema(&materialize_arrow_schema)
2057            .expect("schema conversion should succeed");
2058
2059        let equality_ids =
2060            equality_ids_for_indices(&iceberg_schema, &materialize_arrow_schema, &[1])
2061                .expect("field lookup should succeed");
2062
2063        let expected_id = iceberg_schema
2064            .as_struct()
2065            .field_by_name("key_col")
2066            .expect("top-level field should exist")
2067            .id;
2068        assert_eq!(equality_ids, vec![expected_id]);
2069        assert_ne!(expected_id, 2);
2070    }
2071
2072    /// Regression test: iceberg-rust names map fields `key_value`/`key`/`value`
2073    /// while Materialize uses `entries`/`keys`/`values`. The schema merge must
2074    /// still copy the value field's extension metadata across so ArrowBuilder
2075    /// can build the inner builder.
2076    #[mz_ore::test]
2077    #[allow(clippy::disallowed_types)]
2078    fn merge_map_entries_preserves_value_extension_metadata() {
2079        use std::collections::HashMap;
2080
2081        let mz_value_metadata = HashMap::from([(
2082            ARROW_EXTENSION_NAME_KEY.to_string(),
2083            "materialize.v1.string".to_string(),
2084        )]);
2085        let mz_entries = Field::new(
2086            "entries",
2087            DataType::Struct(
2088                vec![
2089                    Field::new("keys", DataType::Utf8, false),
2090                    Field::new("values", DataType::Utf8, true).with_metadata(mz_value_metadata),
2091                ]
2092                .into(),
2093            ),
2094            false,
2095        );
2096        let mz_map = Field::new("m", DataType::Map(Arc::new(mz_entries), false), true)
2097            .with_metadata(HashMap::from([(
2098                ARROW_EXTENSION_NAME_KEY.to_string(),
2099                "materialize.v1.map".to_string(),
2100            )]));
2101
2102        let iceberg_entries = Field::new(
2103            "key_value",
2104            DataType::Struct(
2105                vec![
2106                    Field::new("key", DataType::Utf8, false),
2107                    Field::new("value", DataType::Utf8, true),
2108                ]
2109                .into(),
2110            ),
2111            false,
2112        );
2113        let iceberg_map = Field::new("m", DataType::Map(Arc::new(iceberg_entries), false), true);
2114
2115        let merged = merge_field_metadata_recursive(&iceberg_map, Some(&mz_map))
2116            .expect("merge should succeed");
2117
2118        let entries = match merged.data_type() {
2119            DataType::Map(entries, _) => entries.as_ref(),
2120            other => panic!("expected Map, got {other:?}"),
2121        };
2122        let entry_fields = match entries.data_type() {
2123            DataType::Struct(fields) => fields,
2124            other => panic!("expected Struct, got {other:?}"),
2125        };
2126        // Iceberg naming must be preserved on the merged schema...
2127        assert_eq!(entry_fields[0].name(), "key");
2128        assert_eq!(entry_fields[1].name(), "value");
2129        // ...and the materialize extension must have been copied positionally
2130        // to the value field even though its name didn't match `values`.
2131        assert_eq!(
2132            entry_fields[1].metadata().get(ARROW_EXTENSION_NAME_KEY),
2133            Some(&"materialize.v1.string".to_string()),
2134        );
2135    }
2136}
2137
/// Commit completed batches to Iceberg as snapshots.
/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
/// Each snapshot includes the Materialize frontier in its metadata for resume support.
///
/// Only one worker (selected by hashing `sink_id` over the number of peers) does
/// any work; both data inputs are exchanged to that worker. Every other worker
/// clears `write_frontier` and returns immediately.
///
/// On the active worker the operator:
/// 1. connects to the Iceberg catalog and awaits `write_handle`,
/// 2. drains `table_ready_stream` until it closes, then loads the table,
/// 3. collects batch descriptions from `batch_desc_input` and groups the data
///    files arriving on `batch_input` under their batch description,
/// 4. commits every batch whose lower bound is strictly behind the data-file
///    input frontier via `try_commit_batch` (retried up to 5 tries), tagging the
///    snapshot with the `mz-sink-id`, `mz-frontier` and `mz-sink-version`
///    properties,
/// 5. after each successful commit, advances the upper of `write_handle` to the
///    batch's upper by appending an empty batch, and stores that upper in
///    `write_frontier`.
///
/// The loop ends once both input frontiers are empty.
///
/// # Returns
///
/// A stream of [`HealthStatusMessage`]s (each error raised by the operator is
/// reported as a halting status in the `Iceberg` namespace) and a button that
/// is pressed when dropped.
fn commit_to_iceberg<'scope>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: StreamVec<'scope, Timestamp, BoundedDataFile>,
    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    // Exactly one worker is "active"; the exchange pacts below route every
    // record to it by using the same hash for all records.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let name_for_logging = format!("{sink_id}-commit-to-iceberg");

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                // Inactive workers never commit anything, so they report an
                // empty write frontier and exit.
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // The stream's item type is `Infallible`, so it can never carry data;
            // this loop simply blocks until the input is closed.
            while let Some(_) = table_ready_input.next().await {
                // Wait for table to be ready
            }
            let mut table = catalog.load_table(&table_ident).await.with_context(|| {
                format!(
                    "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
                    connection.namespace, connection.table
                )
            })?;

            // Pending batches, keyed by their `(lower, upper)` description, each
            // holding the data files received for it so far.
            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Run until both inputs have reached the empty frontier.
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                // Wake up as soon as either input has something to process.
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                // Batch descriptions are drained first, before any data files.
                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                // NOTE(review): if a data file for a batch were ever
                                // processed before its description, the entry created
                                // by `or_default` in the data-file loop would make this
                                // `insert` report a duplicate. Presumably the writer
                                // can only emit files after the description exists —
                                // confirm.
                                let prev = batch_descriptions
                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received \
                                         in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                // Drain every data-file event that is ready right now.
                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                // Group each file under the batch description it carries.
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                // Collect batches whose data files have all arrived.
                // The writer emits all data files for a batch at a capability <= the batch's
                // lower bound, then downgrades its capability to the batch's upper bound.
                // So once the input frontier advances past lower, we know the writer has
                // finished emitting files for this batch and dropped its capability.
                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
                    .cloned()
                    .collect();

                // Commit batches in timestamp order to maintain consistency
                // Descriptions that the partial order cannot rank against each
                // other compare as `Equal` here.
                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    // `batch` was just read from the map's keys, so it is present.
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    // Split the batch's files into data files and delete files
                    // (position and equality deletes).
                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    // Track totals for committed statistics
                    let mut total_messages: u64 = 0;
                    let mut total_bytes: u64 = 0;
                    for file in file_set.data_files {
                        total_messages += file.data_file().record_count();
                        total_bytes += file.data_file().file_size_in_bytes();
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        data_files = data_files.len(),
                        delete_files = delete_files.len(),
                        total_messages,
                        total_bytes,
                        "iceberg commit applying batch"
                    );

                    let instant = Instant::now();

                    // The batch's upper is the frontier recorded in the snapshot
                    // and later published through `write_frontier`.
                    let frontier = batch.1.clone();
                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;
                    let snapshot_properties = vec![
                        ("mz-sink-id".to_string(), sink_id.to_string()),
                        ("mz-frontier".to_string(), frontier_json),
                        ("mz-sink-version".to_string(), sink_version.to_string()),
                    ];

                    // Try the commit up to 5 times. The table is threaded through
                    // the retries as state and handed back alongside the result.
                    let (table_state, commit_result) = Retry::default()
                        .max_tries(5)
                        .retry_async_with_state(table, |_, table| {
                            // Each attempt gets its own copies of the inputs,
                            // since the future below takes ownership of them.
                            let snapshot_properties = snapshot_properties.clone();
                            let data_files = data_files.clone();
                            let delete_files = delete_files.clone();
                            let metrics = Arc::clone(&metrics);
                            let catalog = Arc::clone(&catalog);
                            let conn_namespace = connection.namespace.clone();
                            let conn_table = connection.table.clone();
                            let frontier = frontier.clone();
                            let batch_lower = batch.0.clone();
                            let batch_upper = batch.1.clone();
                            async move {
                                try_commit_batch(
                                    table,
                                    snapshot_properties,
                                    data_files,
                                    delete_files,
                                    catalog.as_ref(),
                                    &conn_namespace,
                                    &conn_table,
                                    sink_version,
                                    &frontier,
                                    &batch_lower,
                                    &batch_upper,
                                    &metrics,
                                )
                                .await
                            }
                        })
                        .await;
                    let commit_result = commit_result.with_context(|| {
                        format!(
                            "failed to commit batch to Iceberg table '{}.{}'",
                            connection.namespace, connection.table
                        )
                    });
                    // Keep the table state and record the commit duration before
                    // propagating a failure, so the metric covers failed commits too.
                    table = table_state;
                    let duration = instant.elapsed();
                    metrics
                        .commit_duration_seconds
                        .observe(duration.as_secs_f64());
                    commit_result?;

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        total_messages,
                        total_bytes,
                        ?duration,
                        "iceberg commit applied batch"
                    );

                    metrics.snapshots_committed.inc();
                    statistics.inc_messages_committed_by(total_messages);
                    statistics.inc_bytes_committed_by(total_bytes);

                    // Advance the upper of `write_handle` to the committed frontier
                    // by appending an empty batch. On a mismatch, retry from the
                    // upper the mismatch reports.
                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            // The frontier has already been advanced as far as necessary.
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    // Publish the committed frontier through the shared cell.
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    // Every error surfaced by the operator becomes a halting health status.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}
2419
2420impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
2421    fn get_key_indices(&self) -> Option<&[usize]> {
2422        self.key_desc_and_indices
2423            .as_ref()
2424            .map(|(_, indices)| indices.as_slice())
2425    }
2426
2427    fn get_relation_key_indices(&self) -> Option<&[usize]> {
2428        self.relation_key_indices.as_deref()
2429    }
2430
2431    fn render_sink(
2432        &self,
2433        storage_state: &mut StorageState,
2434        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
2435        sink_id: GlobalId,
2436        batches: SinkBatchStream<'scope>,
2437        key_is_synthetic: bool,
2438        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
2439    ) -> (
2440        StreamVec<'scope, Timestamp, HealthStatusMessage>,
2441        Vec<PressOnDropButton>,
2442    ) {
2443        let scope = batches.scope();
2444
2445        let (input, walker_button) = walk_sink_arrangement(
2446            format!("{sink_id}-iceberg-walker"),
2447            batches,
2448            sink_id,
2449            sink.from,
2450            key_is_synthetic,
2451        );
2452
2453        let write_handle = {
2454            let persist = Arc::clone(&storage_state.persist_clients);
2455            let shard_meta = sink.to_storage_metadata.clone();
2456            async move {
2457                let client = persist.open(shard_meta.persist_location).await?;
2458                let handle = client
2459                    .open_writer(
2460                        shard_meta.data_shard,
2461                        Arc::new(shard_meta.relation_desc),
2462                        Arc::new(UnitSchema),
2463                        Diagnostics::from_purpose("sink handle"),
2464                    )
2465                    .await?;
2466                Ok(handle)
2467            }
2468        };
2469
2470        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
2471        storage_state
2472            .sink_write_frontiers
2473            .insert(sink_id, Rc::clone(&write_frontier));
2474
2475        let (arrow_schema_with_ids, iceberg_schema) =
2476            match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
2477                let (arrow_schema_with_ids, iceberg_schema) =
2478                    relation_desc_to_iceberg_schema(&sink.from_desc)?;
2479
2480                Ok(if sink.envelope == SinkEnvelope::Append {
2481                    // For append mode, extend the Arrow and Iceberg schemas with the user-visible
2482                    // `_mz_diff` and `_mz_timestamp` columns. The minter uses `iceberg_schema` to create
2483                    // the Iceberg table, and `write_data_files` uses `arrow_schema_with_ids` when
2484                    // merging metadata. Both must include these columns before any operator starts.
2485                    let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
2486                    let extended_iceberg = Arc::new(
2487                        arrow_schema_to_schema(&extended_arrow)
2488                            .context("Failed to build Iceberg schema with append columns")?,
2489                    );
2490                    (extended_arrow, extended_iceberg)
2491                } else {
2492                    (arrow_schema_with_ids, iceberg_schema)
2493                })
2494            })() {
2495                Ok(schemas) => schemas,
2496                Err(err) => {
2497                    let error_stream = std::iter::once(HealthStatusMessage {
2498                        id: None,
2499                        update: HealthStatusUpdate::halting(
2500                            format!("{}", err.display_with_causes()),
2501                            None,
2502                        ),
2503                        namespace: StatusNamespace::Iceberg,
2504                    })
2505                    .to_stream(scope);
2506                    return (error_stream, vec![]);
2507                }
2508            };
2509
2510        let metrics = Arc::new(
2511            storage_state
2512                .metrics
2513                .get_iceberg_sink_metrics(sink_id, scope.index()),
2514        );
2515
2516        let statistics = storage_state
2517            .aggregated_statistics
2518            .get_sink(&sink_id)
2519            .expect("statistics initialized")
2520            .clone();
2521
2522        let connection_for_minter = self.clone();
2523        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
2524            mint_batch_descriptions(
2525                format!("{sink_id}-iceberg-mint"),
2526                sink_id,
2527                input,
2528                sink,
2529                connection_for_minter,
2530                storage_state.storage_configuration.clone(),
2531                Arc::clone(&iceberg_schema),
2532            );
2533
2534        let connection_for_writer = self.clone();
2535        let (datafiles, write_status, write_button) = match sink.envelope {
2536            SinkEnvelope::Upsert => write_data_files::<UpsertEnvelopeHandler>(
2537                format!("{sink_id}-write-data-files"),
2538                minted_input,
2539                batch_descriptions.clone(),
2540                table_ready.clone(),
2541                sink.as_of.clone(),
2542                connection_for_writer,
2543                storage_state.storage_configuration.clone(),
2544                Arc::new(arrow_schema_with_ids.clone()),
2545                Arc::clone(&metrics),
2546                statistics.clone(),
2547            ),
2548            SinkEnvelope::Append => write_data_files::<AppendEnvelopeHandler>(
2549                format!("{sink_id}-write-data-files"),
2550                minted_input,
2551                batch_descriptions.clone(),
2552                table_ready.clone(),
2553                sink.as_of.clone(),
2554                connection_for_writer,
2555                storage_state.storage_configuration.clone(),
2556                Arc::new(arrow_schema_with_ids.clone()),
2557                Arc::clone(&metrics),
2558                statistics.clone(),
2559            ),
2560            SinkEnvelope::Debezium => {
2561                unreachable!("Iceberg sink only supports Upsert and Append envelopes")
2562            }
2563        };
2564
2565        let connection_for_committer = self.clone();
2566        let (commit_status, commit_button) = commit_to_iceberg(
2567            format!("{sink_id}-commit-to-iceberg"),
2568            sink_id,
2569            sink.version,
2570            datafiles,
2571            batch_descriptions,
2572            table_ready,
2573            Rc::clone(&write_frontier),
2574            connection_for_committer,
2575            storage_state.storage_configuration.clone(),
2576            write_handle,
2577            Arc::clone(&metrics),
2578            statistics,
2579        );
2580
2581        let running_status = Some(HealthStatusMessage {
2582            id: None,
2583            update: HealthStatusUpdate::running(),
2584            namespace: StatusNamespace::Iceberg,
2585        })
2586        .to_stream(scope);
2587
2588        let statuses =
2589            scope.concatenate([running_status, mint_status, write_status, commit_status]);
2590
2591        (
2592            statuses,
2593            vec![walker_button, mint_button, write_button, commit_button],
2594        )
2595    }
2596}
2597
2598/// Walks each arrangement batch and emits a stream of individual
2599/// `(key, DiffPair)` records that feeds the rest of the Iceberg sink pipeline.
2600///
2601/// Tracks per-`(key, timestamp)` group sizes and rate-limits a warning when a
2602/// non-synthetic key has more than one `DiffPair`. When `key_is_synthetic` the
2603/// arrangement's hash-based key is stripped before emission.
2604fn walk_sink_arrangement<'scope>(
2605    name: String,
2606    batches: SinkBatchStream<'scope>,
2607    sink_id: GlobalId,
2608    from_id: GlobalId,
2609    key_is_synthetic: bool,
2610) -> (
2611    VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
2612    PressOnDropButton,
2613) {
2614    let mut builder = OperatorBuilder::new(name, batches.scope());
2615    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
2616    let mut input = builder.new_input_for(batches, Pipeline, &output);
2617
2618    let button = builder.build(move |_caps| async move {
2619        let mut pk_warner = (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));
2620
2621        while let Some(event) = input.next().await {
2622            if let Event::Data(cap, mut batches) = event {
2623                for batch in batches.drain(..) {
2624                    for_each_diff_pair(&batch, |key, time, diff_pair| {
2625                        if let Some(warner) = pk_warner.as_mut() {
2626                            warner.observe(key, time);
2627                        }
2628                        // The arrangement key is only used downstream for grouping and PK checks;
2629                        // `write_data_files` discards it on both the stash and drain paths. Emit
2630                        // None unconditionally to avoid per-`DiffPair` `Row` clones on this hot path.
2631                        output.give(&cap, ((None, diff_pair), time, Diff::ONE));
2632                    });
2633                    // Flush after each batch so the final `(key, time)` group of the walk is
2634                    // resolved immediately — a PK violation in the last group is otherwise held
2635                    // until more data arrives or the operator shuts down.
2636                    if let Some(warner) = pk_warner.as_mut() {
2637                        warner.flush();
2638                    }
2639                }
2640            }
2641        }
2642    });
2643
2644    (stream.as_collection(), button.press_on_drop())
2645}