// mz_storage/sink/iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders a [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. `SinkRender::render_sink` hands the sink a stream
14//! of arrangement batches keyed on the sink key (the upstream arrangement's
15//! trace reader is already dropped, so the spine is free to compact as
16//! batches flow). A small `walk_sink_arrangement` operator consumes that
17//! stream and emits one `DiffPair` per `(key, timestamp)` update into the
18//! pipeline below.
19//!
20//! ```text
21//!        ┏━━━━━━━━━━━━━━┓
22//!        ┃   persist    ┃
23//!        ┃    source    ┃
24//!        ┗━━━━━━┯━━━━━━━┛
25//!               │ stream of arrangement batches (trace reader dropped)
26//!               │
27//!        ┏━━━━━━v━━━━━━━┓
28//!        ┃    walk      ┃
29//!        ┃ arrangement  ┃ yields individual DiffPairs per (key, timestamp)
30//!        ┗━━━━━━┯━━━━━━━┛
31//!               │ (Option<Row>, DiffPair<Row>) rows
32//!               │
33//!        ┏━━━━━━v━━━━━━━┓
34//!        ┃     mint     ┃ (single worker)
35//!        ┃    batch     ┃ loads/creates the Iceberg table,
36//!        ┃ descriptions ┃ determines resume upper
37//!        ┗━━━┯━━━━━┯━━━━┛
38//!            │     │ batch descriptions (broadcast)
39//!       rows │     ├─────────────────────────┐
40//!            │     │                         │
41//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
42//!        ┃    write     ┃───>│ S3 / object │ │
43//!        ┃  data files  ┃    │   storage   │ │
44//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
45//!               │ file metadata              │
46//!               │                            │
47//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
48//!        ┃           commit to                ┃ (single worker)
49//!        ┃             iceberg                ┃
50//!        ┗━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━┛
51//!                      │
52//!              ╭───────v───────╮
53//!              │ Iceberg table │
54//!              │  (snapshots)  │
55//!              ╰───────────────╯
56//! ```
57//! # Minting batch descriptions
58//! The "mint batch descriptions" operator is responsible for generating
59//! time-based batch boundaries that group writes into Iceberg snapshots.
60//! It maintains a sliding window of future batch descriptions so that
61//! writers can start processing data even while earlier batches are still being written.
62//! Knowing the batch boundaries ahead of time is important because we need to
63//! be able to make the claim that all data files written for a given batch
64//! include all data up to the upper `t` but not beyond it.
65//! This could be trivially achieved by waiting for all data to arrive up to a certain
66//! frontier, but that would prevent us from streaming writes out to object storage
67//! until the entire batch is complete, which would increase latency and reduce throughput.
68//!
69//! # Writing data files
70//! The "write data files" operator receives rows along with batch descriptions.
71//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
72//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
73//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
74//! (and position delete files if necessary) to object storage.
75//! It outputs metadata about the written files along with their batch descriptions
76//! for the commit operator to consume.
77//!
78//! # Committing to Iceberg
79//! The "commit to iceberg" operator receives metadata about written data files
80//! along with their batch descriptions. It groups files by batch and creates
81//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
82//! table's metadata to reflect the new snapshots, including updating the
83//! `mz-frontier` property to track progress.
84
85use std::cmp::Ordering;
86use std::collections::{BTreeMap, VecDeque};
87use std::convert::Infallible;
88use std::future::Future;
89use std::time::Instant;
90use std::{cell::RefCell, rc::Rc, sync::Arc};
91
92use anyhow::{Context, anyhow};
93use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
94use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
95use differential_dataflow::lattice::Lattice;
96use differential_dataflow::{AsCollection, Hashable, VecCollection};
97use futures::StreamExt;
98use iceberg::ErrorKind;
99use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
100use iceberg::spec::{
101    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
102    write_data_files_to_avro,
103};
104use iceberg::spec::{Schema, SchemaRef};
105use iceberg::table::Table;
106use iceberg::transaction::{ApplyTransactionAction, Transaction};
107use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
108use iceberg::writer::base_writer::equality_delete_writer::{
109    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
110};
111use iceberg::writer::base_writer::position_delete_writer::{
112    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
113};
114use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
115use iceberg::writer::file_writer::ParquetWriterBuilder;
116use iceberg::writer::file_writer::location_generator::{
117    DefaultFileNameGenerator, DefaultLocationGenerator,
118};
119use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
120use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
121use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
122use itertools::Itertools;
123use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
124use mz_interchange::avro::DiffPair;
125use mz_interchange::envelopes::for_each_diff_pair;
126use mz_ore::cast::CastFrom;
127use mz_ore::error::ErrorExt;
128use mz_ore::future::InTask;
129use mz_ore::result::ResultExt;
130use mz_ore::retry::{Retry, RetryResult};
131use mz_persist_client::Diagnostics;
132use mz_persist_client::write::WriteHandle;
133use mz_persist_types::codec_impls::UnitSchema;
134use mz_repr::{Diff, GlobalId, Row, Timestamp};
135use mz_storage_types::StorageDiff;
136use mz_storage_types::configuration::StorageConfiguration;
137use mz_storage_types::controller::CollectionMetadata;
138use mz_storage_types::errors::DataflowError;
139use mz_storage_types::sinks::{
140    IcebergSinkConnection, SinkEnvelope, StorageSinkDesc, iceberg_type_overrides,
141};
142use mz_storage_types::sources::SourceData;
143use mz_timely_util::antichain::AntichainExt;
144use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
145use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
146use parquet::file::properties::WriterProperties;
147use serde::{Deserialize, Serialize};
148use timely::PartialOrder;
149use timely::container::CapacityContainerBuilder;
150use timely::dataflow::StreamVec;
151use timely::dataflow::channels::pact::{Exchange, Pipeline};
152use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
153use timely::dataflow::operators::{CapabilitySet, Concatenate};
154use timely::progress::{Antichain, Timestamp as _};
155use tracing::debug;
156
157use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
158use crate::metrics::sink::iceberg::IcebergSinkMetrics;
159use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
160use crate::statistics::SinkStatistics;
161use crate::storage_state::StorageState;
162
/// Default item capacity for the array builders inside the [`ArrowBuilder`]: the
/// number of items each builder can hold before it needs to allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Default buffer capacity for the string and binary array builders inside the
/// [`ArrowBuilder`]: the number of bytes each builder can hold before it needs to
/// allocate more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// The prefix for Parquet files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
/// many batches we have in-flight at any given time. Per the module docs, minting descriptions
/// ahead of the data lets writers start streaming files before a batch's upper is reached.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
176
/// Shared state produced by the async setup in [`write_data_files`] that both
/// envelope handlers need to construct Parquet writers.
struct WriterContext {
    /// Arrow schema for data columns, with Materialize extension metadata merged in.
    arrow_schema: Arc<ArrowSchema>,
    /// Iceberg table schema, used to configure Parquet writers.
    current_schema: Arc<Schema>,
    /// File I/O for writing Parquet files to object storage.
    file_io: iceberg::io::FileIO,
    /// Generates file paths under the table's data directory.
    location_generator: DefaultLocationGenerator,
    /// Generates unique file names with a per-worker UUID suffix.
    file_name_generator: DefaultFileNameGenerator,
    /// Parquet writer configuration handed to every `ParquetWriterBuilder`
    /// (data, position-delete, and equality-delete writers alike).
    writer_properties: WriterProperties,
}
192
/// Envelope-specific logic for writing Iceberg data files.
trait EnvelopeHandler: Send {
    /// Construct from the shared writer context after async setup completes.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self>
    where
        Self: Sized;

    /// Create an [`IcebergWriter`] for a new batch.
    ///
    /// `is_snapshot` is true for the initial "snapshot" batch (lower == as_of), which
    /// contains all pre-existing data and can be very large. Implementations may use
    /// this to disable memory-intensive optimisations like seen-rows deduplication.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;

    /// Convert a single update (a [`DiffPair`] at timestamp `ts`) into an Arrow
    /// [`RecordBatch`] in the shape expected by the writer this handler creates.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
}
213
/// Envelope handler for upsert-style sinks: emits data files plus position and
/// equality delete files via a
/// [`DeltaWriter`](iceberg::writer::combined_writer::delta_writer::DeltaWriter),
/// using the sink key columns as the equality-delete columns.
struct UpsertEnvelopeHandler {
    ctx: WriterContext,
    /// Iceberg field IDs of the key columns, used for equality delete files.
    equality_ids: Vec<i32>,
    /// Iceberg schema for position delete files.
    pos_schema: Arc<Schema>,
    /// Iceberg schema for equality delete files (projected to key columns only).
    eq_schema: Arc<Schema>,
    /// Configuration for the equality delete writer (projected schema + column IDs).
    eq_config: EqualityDeleteWriterConfig,
    /// Arrow schema with an appended `__op` column that the
    /// [`DeltaWriter`](iceberg::writer::combined_writer::delta_writer::DeltaWriter)
    /// uses to distinguish inserts (+1) from deletes (-1).
    schema_with_op: Arc<ArrowSchema>,
}
229
impl EnvelopeHandler for UpsertEnvelopeHandler {
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Upsert requires a key: the key column indices determine which Iceberg
        // fields participate in equality deletes.
        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
            return Err(anyhow::anyhow!(
                "Iceberg sink requires key columns for equality deletes"
            ));
        };

        // Translate key column indices into Iceberg field IDs.
        let equality_ids = equality_ids_for_indices(
            ctx.current_schema.as_ref(),
            materialize_arrow_schema.as_ref(),
            equality_indices,
        )?;

        // Position delete files use the Arrow schema dictated by the writer
        // config; convert it to an Iceberg schema once up front.
        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let pos_schema = Arc::new(
            arrow_schema_to_schema(&pos_arrow_schema)
                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
        );

        // Equality delete files carry only the key columns; the config projects
        // the full table schema down to those columns.
        let eq_config =
            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
                .context("Failed to create EqualityDeleteWriterConfig")?;
        let eq_schema = Arc::new(
            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
        );

        // Batches handed to the DeltaWriter carry an extra `__op` column; see
        // `row_to_batch` below.
        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));

        Ok(Self {
            ctx,
            equality_ids,
            pos_schema,
            eq_schema,
            eq_config,
            schema_with_op,
        })
    }

    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Data files: a Parquet writer over the table schema, wrapped in a
        // rolling writer so output is split across files of bounded size.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

        // NOTE(review): the rolling writers for the two delete-file builders
        // below receive the table schema (`current_schema`) while their Parquet
        // writers use `pos_schema`/`eq_schema` — presumably the rolling writer
        // only needs the table schema for file layout; confirm against the
        // iceberg-rust API.

        // Position delete files.
        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
        let pos_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.pos_schema),
        );
        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            pos_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let pos_delete_writer_builder =
            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

        // Equality delete files (key columns only).
        let eq_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.eq_schema),
        );
        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            eq_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let eq_delete_writer_builder =
            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());

        let mut builder = DeltaWriterBuilder::new(
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            self.equality_ids.clone(),
        );

        builder = if is_snapshot {
            // Snapshot batches only produce inserts, so disable seen_rows tracking to save memory.
            builder.with_max_seen_rows(0)
        } else {
            // For incremental batches, keep all "seen" rows. Do not evict any rows.
            // The DeltaWriter issues an equality delete if we update (or delete) a row outside the "seen" cache.
            // But equality deletes only apply to prior snapshots (lower sequence number).
            //
            // i.e. The DeltaWriter assumes that rows outside the "seen" cache come from prior snapshots.
            //
            // If we insert a row a=foo during this snapshot and then evict it from the "seen" cache,
            // a subsequent update a=bar (also during this snapshot) will lead to:
            //   1. Equality delete for a=foo (does nothing because a=foo is from this snapshot, not a prior snapshot)
            //   2. Insert a=bar
            // Because the deletion does nothing, we have a=foo and a=bar in the same snapshot.
            builder.with_max_seen_rows(usize::MAX)
        };

        Ok(Box::new(
            builder
                .build(None)
                .await
                .context("Failed to create DeltaWriter")?,
        ))
    }

    /// The `__op` column indicates whether each row is an insert (+1) or delete (-1),
    /// which the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
    /// A retraction (`before`) always precedes its replacement (`after`) in the batch;
    /// the timestamp is not encoded in upsert output.
    fn row_to_batch(
        &self,
        diff_pair: DiffPair<Row>,
        _ts: Timestamp,
    ) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.ctx.arrow_schema),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        // One `__op` entry per emitted row, aligned with the builder's rows.
        let mut op_values = Vec::new();

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add delete row to builder")?;
            op_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add insert row to builder")?;
            op_values.push(1i32);
        }

        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the `__op` column so the batch matches `schema_with_op`.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(op_values)));

        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
            .context("Failed to create batch with op column")
    }
}
392
/// Envelope handler for append-only sinks: every change is written as a plain
/// data row (no delete files), with explicit `_mz_diff`/`_mz_timestamp` columns.
struct AppendEnvelopeHandler {
    ctx: WriterContext,
    /// Arrow schema with only user columns (no `_mz_diff`/`_mz_timestamp`), used by
    /// [`ArrowBuilder`] to serialize row data before the extra columns are appended.
    user_schema_for_append: Arc<ArrowSchema>,
}
399
impl EnvelopeHandler for AppendEnvelopeHandler {
    fn new(
        ctx: WriterContext,
        _connection: &IcebergSinkConnection,
        _materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // arrow_schema already includes _mz_diff + _mz_timestamp (added in render_sink); strip
        // the last two fields so ArrowBuilder only processes the user columns.
        let n = ctx.arrow_schema.fields().len().saturating_sub(2);
        let user_schema_for_append =
            Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));

        Ok(Self {
            ctx,
            user_schema_for_append,
        })
    }

    async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Append-only output never deletes, so a plain data-file writer is enough:
        // no DeltaWriter, no delete files, and `_is_snapshot` needs no special casing.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        // Roll to a new file once the default size limit is reached.
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        Ok(Box::new(
            DataFileWriterBuilder::new(data_rolling_writer)
                .build(None)
                .await
                .context("Failed to create DataFileWriter")?,
        ))
    }

    /// Every change is written as a plain data row: the `before` half (if present) gets
    /// `_mz_diff = -1` and the `after` half gets `_mz_diff = +1`. Both carry the same `_mz_timestamp`.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
        // Serialize only the user columns; _mz_diff/_mz_timestamp are appended below.
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.user_schema_for_append),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        let mut diff_values: Vec<i32> = Vec::new();
        // The `_mz_timestamp` column is Int64; timestamps beyond i64::MAX are clamped
        // rather than rejected.
        let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add before row to builder")?;
            diff_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add after row to builder")?;
            diff_values.push(1i32);
        }

        // One `_mz_diff` and `_mz_timestamp` entry per emitted row.
        let n = diff_values.len();
        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(diff_values)));
        columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));

        RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
            .context("Failed to create append record batch")
    }
}
479
480/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
481/// Parquet metadata for schema evolution tracking. Field IDs are assigned
482/// recursively to all nested fields (structs, lists, maps) using a depth-first,
483/// pre-order traversal.
484fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
485    let mut next_field_id = 1i32;
486    let fields: Vec<Field> = schema
487        .fields()
488        .iter()
489        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
490        .collect();
491    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
492}
493
494/// Recursively add field IDs to a field and all its nested children.
495fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
496    let current_id = *next_id;
497    *next_id += 1;
498
499    let mut metadata = field.metadata().clone();
500    metadata.insert(
501        PARQUET_FIELD_ID_META_KEY.to_string(),
502        current_id.to_string(),
503    );
504
505    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
506
507    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
508}
509
510/// Add field IDs to nested fields within a DataType.
511fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
512    match data_type {
513        DataType::Struct(fields) => {
514            let new_fields: Vec<Field> = fields
515                .iter()
516                .map(|f| add_field_ids_recursive(f, next_id))
517                .collect();
518            DataType::Struct(new_fields.into())
519        }
520        DataType::List(element_field) => {
521            let new_element = add_field_ids_recursive(element_field, next_id);
522            DataType::List(Arc::new(new_element))
523        }
524        DataType::LargeList(element_field) => {
525            let new_element = add_field_ids_recursive(element_field, next_id);
526            DataType::LargeList(Arc::new(new_element))
527        }
528        DataType::Map(entries_field, sorted) => {
529            let new_entries = add_field_ids_recursive(entries_field, next_id);
530            DataType::Map(Arc::new(new_entries), *sorted)
531        }
532        _ => data_type.clone(),
533    }
534}
535
536/// Merge Materialize extension metadata into Iceberg's Arrow schema.
537/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
538/// Materialize's extension names for ArrowBuilder compatibility.
539/// Handles nested types (structs, lists, maps) recursively.
540fn merge_materialize_metadata_into_iceberg_schema(
541    materialize_arrow_schema: &ArrowSchema,
542    iceberg_schema: &Schema,
543) -> anyhow::Result<ArrowSchema> {
544    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
545    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
546        .context("Failed to convert Iceberg schema to Arrow schema")?;
547
548    // Now merge in the Materialize extension metadata
549    let fields: Vec<Field> = iceberg_arrow_schema
550        .fields()
551        .iter()
552        .map(|iceberg_field| {
553            // Find the corresponding Materialize field by name to get extension metadata
554            let mz_field = materialize_arrow_schema
555                .field_with_name(iceberg_field.name())
556                .with_context(|| {
557                    format!(
558                        "Field '{}' not found in Materialize schema",
559                        iceberg_field.name()
560                    )
561                })?;
562
563            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
564        })
565        .collect::<anyhow::Result<Vec<_>>>()?;
566
567    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
568}
569
/// Recursively merge Materialize extension metadata into an Iceberg field.
///
/// `mz_field` is `None` when the Iceberg schema has nesting with no named
/// counterpart on the Materialize side; the Iceberg metadata is then kept
/// as-is and recursion continues without extension names. A type-shape
/// mismatch between the two schemas is an error.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    // Start with Iceberg field's metadata (which includes field IDs)
    let mut metadata = iceberg_field.metadata().clone();

    // Add Materialize extension name if available
    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    // Recursively process nested types
    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Struct children are matched by name, not by position.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            // The Iceberg side's `sorted` flag wins; only the entries field's
            // metadata is merged from the Materialize schema.
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
667
668async fn reload_table(
669    catalog: &dyn Catalog,
670    namespace: String,
671    table_name: String,
672    current_table: Table,
673) -> anyhow::Result<Table> {
674    let namespace_ident = NamespaceIdent::new(namespace.clone());
675    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
676    let current_schema = current_table.metadata().current_schema_id();
677    let current_partition_spec = current_table.metadata().default_partition_spec_id();
678
679    match catalog.load_table(&table_ident).await {
680        Ok(table) => {
681            let reloaded_schema = table.metadata().current_schema_id();
682            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
683            if reloaded_schema != current_schema {
684                return Err(anyhow::anyhow!(
685                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
686                    table_name,
687                    current_schema,
688                    reloaded_schema
689                ));
690            }
691
692            if reloaded_partition_spec != current_partition_spec {
693                return Err(anyhow::anyhow!(
694                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
695                    table_name,
696                    current_partition_spec,
697                    reloaded_partition_spec
698                ));
699            }
700
701            Ok(table)
702        }
703        Err(err) => Err(err).context("Failed to reload Iceberg table"),
704    }
705}
706
/// Attempt a single commit of a batch of data files to an Iceberg table.
/// On conflict or failure, reloads the table and returns a retryable error.
/// On success, returns the updated table state.
///
/// The returned `Table` is always the freshest table state available (the
/// committed table on success, otherwise the input or a reloaded one), so the
/// caller can feed it back into the next retry attempt. `FatalErr` is reserved
/// for fencing violations where another writer has overtaken this sink.
async fn try_commit_batch(
    mut table: Table,
    snapshot_properties: Vec<(String, String)>,
    data_files: Vec<DataFile>,
    delete_files: Vec<DataFile>,
    catalog: &dyn Catalog,
    conn_namespace: &str,
    conn_table: &str,
    sink_version: u64,
    frontier: &Antichain<Timestamp>,
    batch_lower: &Antichain<Timestamp>,
    batch_upper: &Antichain<Timestamp>,
    metrics: &IcebergSinkMetrics,
) -> (Table, RetryResult<(), anyhow::Error>) {
    // Stage a row-delta action carrying the snapshot properties (which embed
    // our frontier/version bookkeeping). Duplicate checking is disabled; we
    // rely on our own frontier tracking for exactly-once semantics.
    let tx = Transaction::new(&table);
    let mut action = tx
        .row_delta()
        .set_snapshot_properties(snapshot_properties.into_iter().collect())
        .with_check_duplicate(false);

    // Even an empty batch produces a snapshot (properties-only commit); files
    // are only attached when there is actual data.
    if !data_files.is_empty() || !delete_files.is_empty() {
        action = action
            .add_data_files(data_files)
            .add_delete_files(delete_files);
    }

    let tx = match action
        .apply(tx)
        .context("Failed to apply data file addition to iceberg table transaction")
    {
        Ok(tx) => tx,
        Err(e) => {
            // Applying can fail if our table state is stale; refresh it so the
            // caller's retry starts from current metadata. If even the reload
            // fails, surface that error (still retryable).
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(reload_err) => {
                    return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
                }
            }
            return (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Failed to apply data file addition to iceberg table transaction: {}",
                    e
                )),
            );
        }
    };

    let new_table = tx.commit(catalog).await;
    match new_table {
        Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
            // Someone else committed concurrently. Reload and inspect their
            // snapshots to decide whether this is a benign conflict (retry)
            // or a fencing violation (fatal).
            metrics.commit_conflicts.inc();
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            // Recover the most recently committed Materialize frontier and
            // sink version from the reloaded table's snapshot properties.
            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let last = retrieve_upper_from_snapshots(&mut snapshots);
            let last = match last {
                Ok(val) => val,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            // Check if another writer has advanced the frontier beyond ours (fencing check)
            if let Some((last_frontier, last_version)) = last {
                // A strictly newer sink version means we have been replaced.
                if last_version > sink_version {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer \
                             with version {}. Current sink version: {}. \
                             Frontiers may be out of sync, aborting to avoid data loss.",
                            conn_table,
                            last_version,
                            sink_version,
                        )),
                    );
                }
                // If the committed frontier is at or past ours, our batch is
                // no longer the next one to apply — abort rather than risk
                // duplicating or dropping data.
                if PartialOrder::less_equal(frontier, &last_frontier) {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer. \
                             Current frontier: {:?}, last frontier: {:?}.",
                            conn_table,
                            frontier,
                            last_frontier,
                        )),
                    );
                }
            }

            // Benign conflict (e.g. a compaction): retry against the reloaded
            // table state.
            (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Commit conflict detected when committing batch [{}, {}) \
                     to Iceberg table '{}.{}'. Retrying...",
                    batch_lower.pretty(),
                    batch_upper.pretty(),
                    conn_namespace,
                    conn_table
                )),
            )
        }
        Err(e) => {
            // Non-conflict commit failure: count it and let the caller retry
            // with the (possibly stale) table state we still hold.
            metrics.commit_failures.inc();
            (table, RetryResult::RetryableErr(anyhow!(e)))
        }
        Ok(new_table) => (new_table, RetryResult::Ok(())),
    }
}
840
841/// Load an existing Iceberg table or create it if it doesn't exist.
842async fn load_or_create_table(
843    catalog: &dyn Catalog,
844    namespace: String,
845    table_name: String,
846    schema: &Schema,
847) -> anyhow::Result<iceberg::table::Table> {
848    let namespace_ident = NamespaceIdent::new(namespace.clone());
849    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
850
851    // Try to load the table first
852    match catalog.load_table(&table_ident).await {
853        Ok(table) => {
854            // Table exists, return it
855            // TODO: Add proper schema evolution/validation to ensure compatibility
856            Ok(table)
857        }
858        Err(err) => {
859            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
860                || err
861                    .message()
862                    .contains("Tried to load a table that does not exist")
863            {
864                // Table doesn't exist, create it
865                // Note: location is not specified, letting the catalog determine the default location
866                // based on its warehouse configuration
867                let table_creation = TableCreation::builder()
868                    .name(table_name.clone())
869                    .schema(schema.clone())
870                    // Use unpartitioned spec by default
871                    // TODO: Consider making partition spec configurable
872                    // .partition_spec(UnboundPartitionSpec::builder().build())
873                    .build();
874
875                catalog
876                    .create_table(&namespace_ident, table_creation)
877                    .await
878                    .with_context(|| {
879                        format!(
880                            "Failed to create Iceberg table '{}' in namespace '{}'",
881                            table_name, namespace
882                        )
883                    })
884            } else {
885                // Some other error occurred
886                Err(err).context("Failed to load Iceberg table")
887            }
888        }
889    }
890}
891
892/// Find the most recent Materialize frontier from Iceberg snapshots.
893/// We store the frontier in snapshot metadata to track where we left off after restarts.
894/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
895/// The input slice will be sorted by sequence number in descending order.
896fn retrieve_upper_from_snapshots(
897    snapshots: &mut [Arc<Snapshot>],
898) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
899    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
900
901    for snapshot in snapshots {
902        let props = &snapshot.summary().additional_properties;
903        if let (Some(frontier_json), Some(sink_version_str)) =
904            (props.get("mz-frontier"), props.get("mz-sink-version"))
905        {
906            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
907                .context("Failed to deserialize frontier from snapshot properties")?;
908            let frontier = Antichain::from_iter(frontier);
909
910            let sink_version = sink_version_str
911                .parse::<u64>()
912                .context("Failed to parse mz-sink-version from snapshot properties")?;
913
914            return Ok(Some((frontier, sink_version)));
915        }
916        if snapshot.summary().operation.as_str() != "replace" {
917            // This is a bad heuristic, but we have no real other way to identify compactions
918            // right now other than assume they will be the only operation writing "replace" operations.
919            // That means if we find a snapshot with some other operation, but no mz-frontier, we are in an
920            // inconsistent state and have to error out.
921            anyhow::bail!(
922                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
923                snapshot.snapshot_id(),
924                snapshot.summary().operation.as_str(),
925            );
926        }
927    }
928
929    Ok(None)
930}
931
932/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
933///
934/// Returns a tuple of:
935/// - The Arrow schema (with field IDs and Iceberg-compatible types) for writing Parquet files
936/// - The Iceberg schema for table creation/validation
937///
938/// Iceberg doesn't support unsigned integer types, so we use `iceberg_type_overrides`
939/// to map them to compatible types (e.g., UInt64 -> Decimal128(20,0)). The ArrowBuilder
940/// handles the cross-type conversion (Datum::UInt64 -> Decimal128Builder) automatically.
941fn relation_desc_to_iceberg_schema(
942    desc: &mz_repr::RelationDesc,
943) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
944    let arrow_schema =
945        mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
946            .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
947
948    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
949
950    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
951        .context("Failed to convert Arrow schema to Iceberg schema")?;
952
953    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
954}
955
956/// Resolve Materialize key column indexes to Iceberg top-level field IDs.
957///
958/// Iceberg field IDs are assigned recursively, so a top-level column's field ID
959/// is not necessarily `column_index + 1` once nested fields are present.
960fn equality_ids_for_indices(
961    current_schema: &Schema,
962    materialize_arrow_schema: &ArrowSchema,
963    equality_indices: &[usize],
964) -> anyhow::Result<Vec<i32>> {
965    let top_level_fields = current_schema.as_struct();
966
967    equality_indices
968        .iter()
969        .map(|index| {
970            let mz_field = materialize_arrow_schema
971                .fields()
972                .get(*index)
973                .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
974            let field_name = mz_field.name();
975            let iceberg_field = top_level_fields
976                .field_by_name(field_name)
977                .with_context(|| {
978                    format!(
979                        "Equality delete key column '{}' not found in Iceberg table schema",
980                        field_name
981                    )
982                })?;
983            Ok(iceberg_field.id)
984        })
985        .collect()
986}
987
988/// Build a new Arrow schema by adding an __op column to the existing schema.
989fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
990    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
991    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
992    ArrowSchema::new(fields)
993}
994
995/// Build a new Arrow schema by appending `_mz_diff` (Int32) and `_mz_timestamp` (Int64) columns.
996/// These are user-visible Iceberg columns written in append mode. Parquet field IDs are
997/// assigned sequentially after the existing maximum field ID so the extended schema can
998/// be converted to a valid Iceberg schema via `arrow_schema_to_schema`.
999#[allow(clippy::disallowed_types)]
1000fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1001    use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1002    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1003    fields.push(Arc::new(Field::new(
1004        ICEBERG_APPEND_DIFF_COLUMN,
1005        DataType::Int32,
1006        false,
1007    )));
1008    fields.push(Arc::new(Field::new(
1009        ICEBERG_APPEND_TIMESTAMP_COLUMN,
1010        DataType::Int64,
1011        false,
1012    )));
1013
1014    add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1015}
1016
/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
/// Batches are minted with configurable windows to balance write efficiency with latency.
/// We maintain a sliding window of future batch descriptions so writers can start
/// processing data even while earlier batches are still being written.
///
/// Returns, in order: the pass-through data collection, the stream of minted
/// `(lower, upper)` batch descriptions, a data-less stream whose frontier
/// signals "table is ready", a health-status stream, and the operator's
/// shutdown button.
fn mint_batch_descriptions<'scope, D>(
    name: String,
    sink_id: GlobalId,
    input: VecCollection<'scope, Timestamp, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<'scope, Timestamp, D, Diff>,
    StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    StreamVec<'scope, Timestamp, Infallible>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
)
where
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    // Exactly one worker (chosen by hashing the sink id) talks to the catalog
    // and mints batch descriptions; all others only forward data.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input =
        builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, StreamVec<'scope, Timestamp, Rc<anyhow::Error>>) =
        builder.build_fallible(move |caps| {
        Box::pin(async move {
            // One capability set per output: table-ready signal, pass-through
            // data, and batch descriptions. Data is always emitted with the
            // input's own capabilities, so the data capset is dropped up front.
            let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
            *data_capset = CapabilitySet::new();

            // Passive workers hold no capabilities and simply forward data.
            if !is_active_worker {
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                *table_ready_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;
            debug!(
                ?sink_id,
                %name_for_logging,
                namespace = %connection.namespace,
                table = %connection.table,
                "iceberg mint loaded/created table"
            );

            // Dropping the table-ready capability advances that output's
            // frontier, signaling downstream that the table now exists.
            *table_ready_capset = CapabilitySet::new();

            // Recover where a previous incarnation of this sink left off from
            // the table's snapshot metadata.
            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };
            debug!(
                ?sink_id,
                %name_for_logging,
                resume_upper = %resume_upper.pretty(),
                resume_version,
                as_of = %as_of.pretty(),
                "iceberg mint resume position loaded"
            );

            // The input has overcompacted if
            let overcompacted =
                // ..we have made some progress in the past
                *resume_upper != [Timestamp::minimum()] &&
                // ..but the since frontier is now beyond that
                PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                // This would normally be an assertion but because it can happen after a
                // Materialize backup/restore we log an error so that it appears on Sentry but
                // leaves the rest of the objects in the cluster unaffected.
                return Err(anyhow::anyhow!("{err}"));
            };

            // A newer sink version already committed to this table: we are fenced.
            if resume_version > sink_version {
                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
            }

            let mut initialized = false;
            let mut observed_frontier;
            // Largest data timestamp seen before initialization; used to
            // synthesize an upper if the input closes early (see below).
            let mut max_seen_ts: Option<Timestamp> = None;
            // Track minted batches to maintain a sliding window of open batch descriptions.
            // This is needed to know when to retire old batches and mint new ones.
            // Its "sortedness" is derived from the monotonicity of batch descriptions,
            // and the fact that we only ever push new descriptions to the back and pop from the front.
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            if !initialized {
                                for (_, ts, _) in data.iter() {
                                    match max_seen_ts.as_mut() {
                                        Some(max) => {
                                            if max.less_than(ts) {
                                                *max = ts.clone();
                                            }
                                        }
                                        None => {
                                            max_seen_ts = Some(ts.clone());
                                        }
                                    }
                                }
                            }
                            // Data is forwarded unchanged under the input's capability.
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    if observed_frontier.is_empty() {
                        // Bounded inputs can close (frontier becomes empty) before we finish
                        // initialization. For example, a loadgen source configured for a finite
                        // dataset may emit all rows at time t and then immediately close. If we
                        // saw any data, synthesize an upper one tick past the maximum timestamp
                        // so we can mint a snapshot batch and commit it.
                        if let Some(max_ts) = max_seen_ts.as_ref() {
                            let synthesized_upper =
                                Antichain::from_elem(max_ts.step_forward());
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                max_seen_ts = %max_ts,
                                synthesized_upper = %synthesized_upper.pretty(),
                                "iceberg mint input closed before initialization; using max seen ts"
                            );
                            observed_frontier = synthesized_upper;
                        } else {
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                "iceberg mint input closed before initialization with no data"
                            );
                            // Input stream closed before initialization completed and no data arrived.
                            return Ok(());
                        }
                    }

                    // We only start minting after we've reached as_of and resume_upper to avoid
                    // minting batches that would be immediately skipped.
                    if PartialOrder::less_than(&observed_frontier, &resume_upper)
                        || PartialOrder::less_than(&observed_frontier, &as_of)
                    {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();

                    // Create a catch-up batch from the later of resume_upper or as_of to current frontier.
                    // We use the later of the two because:
                    // - For fresh sinks: resume_upper = minimum, as_of = actual timestamp, data starts at as_of
                    // - For resuming: as_of <= resume_upper (enforced by overcompaction check), data starts at resume_upper
                    let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
                        as_of.clone()
                    } else {
                        resume_upper.clone()
                    };

                    if batch_lower == current_upper {
                        // Snapshot! as_of is exactly at the frontier. We still need to mint
                        // a batch to create the snapshot, so we step the upper forward by one.
                        current_upper = Antichain::from_elem(current_upper_ts.step_forward());
                    }

                    let batch_description = (batch_lower.clone(), current_upper.clone());
                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        batch_lower = %batch_lower.pretty(),
                        current_upper = %current_upper.pretty(),
                        "iceberg mint initializing (catch-up batch)"
                    );
                    debug!(
                        "{}: creating catch-up batch [{}, {})",
                        name_for_logging,
                        batch_lower.pretty(),
                        current_upper.pretty()
                    );
                    batch_descriptions.push(batch_description);
                    // Mint initial future batch descriptions at configurable intervals
                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
                        let duration_millis = commit_interval.as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(
                            u64::try_from(duration_millis)
                                .expect("commit interval too large for u64"),
                        );
                        let desired_batch_upper = Antichain::from_elem(
                            current_upper_ts.step_forward_by(&duration_ts),
                        );

                        let batch_description =
                            (current_upper.clone(), desired_batch_upper.clone());
                        debug!(
                            "{}: minting future batch {}/{} [{}, {})",
                            name_for_logging,
                            i,
                            INITIAL_DESCRIPTIONS_TO_MINT,
                            current_upper.pretty(),
                            desired_batch_upper.pretty()
                        );
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    // Holding a capability at the newest minted upper lets us
                    // still emit future descriptions at or beyond that time.
                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    if observed_frontier.is_empty() {
                        // We're done!
                        return Ok(());
                    }
                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
                    // and mint a new future batch to keep the pipeline full
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        // New batch: [newest minted upper, newest + commit_interval).
                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(commit_interval.as_millis()
                            .try_into()
                            .expect("commit interval too large for u64"));
                        let new_upper = Antichain::from_elem(newest_upper
                            .as_option()
                            .unwrap()
                            .step_forward_by(&duration_ts));

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    // Any error from the operator becomes a halting health status for this sink.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        table_ready_stream,
        statuses,
        button.press_on_drop(),
    )
}
1341
/// An Iceberg [`DataFile`] paired with the [`Schema`] required to (de)serialize it.
/// Serde support is delegated to [`AvroDataFile`] via the `try_from`/`into` attributes.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    /// The wrapped Iceberg data file descriptor.
    pub data_file: DataFile,
    /// The Iceberg schema used for the Avro round-trip of `data_file`.
    pub schema: Schema,
}
1348
/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
/// The avro ser(de) also requires the Iceberg schema to be provided, so we include that as well.
/// It is distinctly possible that this is overkill, but it avoids re-implementing
/// Iceberg's serialization logic here.
/// If at some point this becomes a serious overhead, we can revisit this decision.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    /// DataFile serialized as Avro bytes via `write_data_files_to_avro`
    pub data_file: Vec<u8>,
    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
    pub schema: Vec<u8>,
}
1362
1363impl From<SerializableDataFile> for AvroDataFile {
1364    fn from(value: SerializableDataFile) -> Self {
1365        let mut data_file = Vec::new();
1366        write_data_files_to_avro(
1367            &mut data_file,
1368            [value.data_file],
1369            &StructType::new(vec![]),
1370            FormatVersion::V2,
1371        )
1372        .expect("serialization into buffer");
1373        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1374        AvroDataFile { data_file, schema }
1375    }
1376}
1377
1378impl TryFrom<AvroDataFile> for SerializableDataFile {
1379    type Error = String;
1380
1381    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1382        let schema: Schema = serde_json::from_slice(&value.schema)
1383            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1384        let data_files = read_data_files_from_avro(
1385            &mut &*value.data_file,
1386            &schema,
1387            0,
1388            &StructType::new(vec![]),
1389            FormatVersion::V2,
1390        )
1391        .map_err_to_string_with_causes()?;
1392        let Some(data_file) = data_files.into_iter().next() else {
1393            return Err("No DataFile found in Avro data".into());
1394        };
1395        Ok(SerializableDataFile { data_file, schema })
1396    }
1397}
1398
/// A DataFile along with its associated batch description (lower and upper bounds).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    /// The written file plus the schema needed to (de)serialize it.
    pub data_file: SerializableDataFile,
    /// The `[lower, upper)` batch description this file was written for.
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
1405
1406impl BoundedDataFile {
1407    pub fn new(
1408        file: DataFile,
1409        schema: Schema,
1410        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1411    ) -> Self {
1412        Self {
1413            data_file: SerializableDataFile {
1414                data_file: file,
1415                schema,
1416            },
1417            batch_desc,
1418        }
1419    }
1420
1421    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
1422        &self.batch_desc
1423    }
1424
1425    pub fn data_file(&self) -> &DataFile {
1426        &self.data_file.data_file
1427    }
1428
1429    pub fn into_data_file(self) -> DataFile {
1430        self.data_file.data_file
1431    }
1432}
1433
/// A set of DataFiles along with their associated batch descriptions.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    /// The accumulated bounded files; empty by default.
    pub data_files: Vec<BoundedDataFile>,
}
1439
/// Write rows into Parquet data files bounded by batch descriptions.
///
/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
///
/// Returns the stream of written [`BoundedDataFile`]s, a stream of health status
/// messages produced if the operator fails, and a button that shuts the operator
/// down when dropped.
fn write_data_files<'scope, H: EnvelopeHandler + 'static>(
    name: String,
    input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
    as_of: Antichain<Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    StreamVec<'scope, Timestamp, BoundedDataFile>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = input.scope();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // The table-ready stream carries no data (its item type is `Infallible`);
    // it only gates startup of this operator.
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
    // Batch descriptions are minted on a single worker (see the module docs), so
    // broadcast them to all workers. This input is connected to the output, so the
    // output frontier is held back by the batch description frontier.
    let mut batch_desc_input =
        builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // Drain the table-ready input to completion; once it closes, the
            // upstream operator has created/loaded the table.
            while let Some(_) = table_ready_input.next().await {
                // Wait for table to be ready
            }
            let table = catalog
                .load_table(&table_ident)
                .await
                .with_context(|| {
                    format!(
                        "Failed to load Iceberg table '{}.{}' in write_data_files operator",
                        connection.namespace, connection.table
                    )
                })?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            // Merge Materialize extension metadata into the Iceberg schema.
            // We need extension metadata for ArrowBuilder to work correctly (it uses
            // extension names to know how to handle different types like records vs arrays).
            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
            // No clear way to detect this properly right now, so we use heuristics.
            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            // Add a unique suffix to avoid filename collisions across restarts and workers
            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let writer_properties = WriterProperties::new();

            let ctx = WriterContext {
                arrow_schema,
                current_schema: Arc::clone(&current_schema),
                file_io,
                location_generator,
                file_name_generator,
                writer_properties,
            };
            let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;

            // Rows can arrive before their batch description due to dataflow parallelism.
            // Stash them until we know which batch they belong to.
            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            // Track batches currently being written. When a row arrives, we check if it belongs
            // to an in-flight batch. When frontiers advance to a batch's upper, we close the
            // writer and emit its data files downstream.
            // Antichains don't implement Ord, so we use a HashMap with tuple keys instead.
            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                Box<dyn IcebergWriter>,
            > = std::collections::HashMap::new();

            // Observed frontiers of the two inputs, plus the values they had the
            // last time we checked for closeable batches, so we only re-scan
            // `in_flight_batches` when something actually advanced.
            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Track the minimum batch lower bound to prune data that's already committed
            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;

            // An empty frontier means an input is finished; loop until both are.
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                // Staged-message counts are flushed to `statistics` in chunks
                // rather than per row.
                let mut staged_messages_since_flush: u64 = 0;
                // Wake up as soon as either input has something for us.
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;

                                // Track the minimum batch lower bound (first batch received)
                                if min_batch_lower.is_none() {
                                    min_batch_lower = Some(lower.clone());
                                    debug!(
                                        "{}: set min_batch_lower to {}",
                                        name_for_logging,
                                        lower.pretty()
                                    );

                                    // Prune any stashed rows that arrived before min_batch_lower (already committed)
                                    let to_remove: Vec<_> = stashed_rows
                                        .keys()
                                        .filter(|ts| {
                                            let ts_antichain = Antichain::from_elem((*ts).clone());
                                            PartialOrder::less_than(&ts_antichain, lower)
                                        })
                                        .cloned()
                                        .collect();

                                    if !to_remove.is_empty() {
                                        let mut removed_count = 0;
                                        for ts in to_remove {
                                            if let Some(rows) = stashed_rows.remove(&ts) {
                                                removed_count += rows.len();
                                                for _ in &rows {
                                                    metrics.stashed_rows.dec();
                                                }
                                            }
                                        }
                                        debug!(
                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
                                            name_for_logging,
                                            removed_count
                                        );
                                    }
                                }

                                // Disable seen_rows tracking for snapshot batch to save memory
                                let is_snapshot = lower == &as_of;
                                debug!(
                                    "{}: received batch description [{}, {}), snapshot={}",
                                    name_for_logging,
                                    lower.pretty(),
                                    upper.pretty(),
                                    is_snapshot
                                );
                                let mut batch_writer =
                                    handler.create_writer(is_snapshot).await?;
                                // Drain any stashed rows that belong to this batch
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                let mut drained_count = 0;
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    // A row belongs to the batch when lower <= ts < upper.
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            drained_count += rows.len();
                                            for (_row, diff_pair) in rows {
                                                metrics.stashed_rows.dec();
                                                let record_batch = handler.row_to_batch(
                                                    diff_pair.clone(),
                                                    row_ts.clone(),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                batch_writer.write(record_batch).await?;
                                                staged_messages_since_flush += 1;
                                                if staged_messages_since_flush >= 10_000 {
                                                    statistics.inc_messages_staged_by(
                                                        staged_messages_since_flush,
                                                    );
                                                    staged_messages_since_flush = 0;
                                                }
                                            }
                                        }
                                    }
                                }
                                if drained_count > 0 {
                                    debug!(
                                        "{}: drained {} stashed rows into batch [{}, {})",
                                        name_for_logging,
                                        drained_count,
                                        lower.pretty(),
                                        upper.pretty()
                                    );
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), batch_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            // Aggregate per-timestamp counts so logging below is
                            // one line per timestamp rather than one per row.
                            let mut dropped_per_time = BTreeMap::new();
                            let mut stashed_per_time = BTreeMap::new();
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                // Try writing the row to any in-flight batch it belongs to...
                                for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = handler.row_to_batch(
                                            diff_pair.clone(),
                                            row_ts.clone(),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        batch_writer.write(record_batch).await?;
                                        staged_messages_since_flush += 1;
                                        if staged_messages_since_flush >= 10_000 {
                                            statistics.inc_messages_staged_by(
                                                staged_messages_since_flush,
                                            );
                                            staged_messages_since_flush = 0;
                                        }
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    // Drop data that's before the first batch we received (already committed)
                                    if let Some(ref min_lower) = min_batch_lower {
                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
                                            dropped_per_time
                                                .entry(ts_antichain.into_option().unwrap())
                                                .and_modify(|c| *c += 1)
                                                .or_insert(1);
                                            continue;
                                        }
                                    }

                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    metrics.stashed_rows.inc();
                                    entry.push((row, diff_pair));
                                }
                            }

                            for (ts, count) in dropped_per_time {
                                debug!(
                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
                                    name_for_logging, count, ts
                                );
                            }

                            for (ts, count) in stashed_per_time {
                                debug!(
                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
                                    name_for_logging, count, ts
                                );
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }
                // Flush any staged-message count left over from the chunked updates above.
                if staged_messages_since_flush > 0 {
                    statistics.inc_messages_staged_by(staged_messages_since_flush);
                }

                // Check if frontiers have advanced, which may unlock batches ready to close
                if PartialOrder::less_than(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    // Close batches whose upper is now in the past
                    // Upper bounds are exclusive, so we check if upper is less_equal to the frontier.
                    // Remember: a frontier at x means all timestamps less than x have been observed.
                    // Or, in other words we still might yet see timestamps at [x, infinity). X itself will
                    // be covered by the _next_ batches lower inclusive bound, so we can safely close the batch if its upper is <= x.
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_equal(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        debug!(
                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
                            name_for_logging,
                            ready_batches.len(),
                            batch_description_frontier.pretty(),
                            input_frontier.pretty()
                        );
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut batch_writer) in ready_batches {
                            let close_started_at = Instant::now();
                            let data_files = batch_writer.close().await;
                            metrics
                                .writer_close_duration_seconds
                                .observe(close_started_at.elapsed().as_secs_f64());
                            let data_files = data_files.context("Failed to close batch writer")?;
                            debug!(
                                "{}: closed batch [{}, {}), wrote {} files",
                                name_for_logging,
                                desc.0.pretty(),
                                desc.1.pretty(),
                                data_files.len()
                            );
                            for data_file in data_files {
                                match data_file.content_type() {
                                    iceberg::spec::DataContentType::Data => {
                                        metrics.data_files_written.inc();
                                    }
                                    iceberg::spec::DataContentType::PositionDeletes
                                    | iceberg::spec::DataContentType::EqualityDeletes => {
                                        metrics.delete_files_written.inc();
                                    }
                                }
                                statistics.inc_messages_staged_by(data_file.record_count());
                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            // Remember the furthest upper among closed batches so the
                            // capability can be released up to it below.
                            max_upper = max_upper.join(&desc.1);
                        }

                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    // Surface any operator error as a halting health status message.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}
1844
#[cfg(test)]
mod tests {
    use super::*;
    use iceberg::spec::{PrimitiveType, Type};
    use mz_repr::SqlScalarType;
    use mz_storage_types::sinks::ICEBERG_UINT64_DECIMAL_PRECISION;

    #[mz_ore::test]
    fn test_iceberg_type_overrides() {
        // Unsigned integers widen to the next signed Arrow type.
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt16).unwrap().0,
            DataType::Int32
        );
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt32).unwrap().0,
            DataType::Int64
        );

        // UInt64 and MzTimestamp don't fit in Int64, so both map to a
        // 20-digit decimal.
        let expected_decimal = DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0);
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt64).unwrap().0,
            expected_decimal
        );
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::MzTimestamp).unwrap().0,
            expected_decimal
        );

        // Types without an override return None and use the default mapping.
        for ty in [
            SqlScalarType::Int32,
            SqlScalarType::String,
            SqlScalarType::Bool,
        ] {
            assert!(iceberg_type_overrides(&ty).is_none());
        }
    }

    #[mz_ore::test]
    fn test_iceberg_schema_with_nested_uint64() {
        // A UInt64 nested inside a list must also pick up the decimal
        // override, since iceberg_type_overrides applies recursively.
        let desc = mz_repr::RelationDesc::builder()
            .with_column(
                "items",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt64),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let schema =
            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
                .expect("schema conversion should succeed");

        match schema.field(0).data_type() {
            DataType::List(element) => assert_eq!(
                element.data_type(),
                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            ),
            _ => panic!("Expected List type"),
        }
    }

    #[mz_ore::test]
    fn test_iceberg_interval_override() {
        // Intervals have no direct Iceberg counterpart; they become strings.
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::Interval).unwrap().0,
            DataType::LargeUtf8
        );

        // End-to-end schema conversion with an interval column.
        let desc = mz_repr::RelationDesc::builder()
            .with_column("id", SqlScalarType::Int32.nullable(false))
            .with_column("dur", SqlScalarType::Interval.nullable(true))
            .finish();
        let (arrow_schema, iceberg_schema) =
            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");

        // Arrow side uses LargeUtf8; Iceberg side uses the String primitive.
        assert_eq!(arrow_schema.field(1).data_type(), &DataType::LargeUtf8);
        let dur_field = iceberg_schema
            .as_struct()
            .field_by_name("dur")
            .expect("field should exist");
        assert_eq!(*dur_field.field_type, Type::Primitive(PrimitiveType::String));
    }

    #[mz_ore::test]
    fn test_iceberg_range_schema() {
        // Ranges are represented as struct types in the Iceberg schema.
        let desc = mz_repr::RelationDesc::builder()
            .with_column("id", SqlScalarType::Int32.nullable(false))
            .with_column(
                "r",
                SqlScalarType::Range {
                    element_type: Box::new(SqlScalarType::Int32),
                }
                .nullable(true),
            )
            .finish();

        let (_arrow_schema, iceberg_schema) =
            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");

        let range_field = iceberg_schema
            .as_struct()
            .field_by_name("r")
            .expect("field should exist");
        assert!(
            matches!(&*range_field.field_type, Type::Struct(_)),
            "range should be struct, got: {:?}",
            range_field.field_type
        );
    }

    #[mz_ore::test]
    fn equality_ids_follow_iceberg_field_ids() {
        // The leading map column expands into extra nested field ids, so the
        // Iceberg id assigned to `key_col` is not simply its positional index.
        let entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let arrow_schema = ArrowSchema::new(vec![
            Field::new("attrs", DataType::Map(Arc::new(entries), false), true),
            Field::new("key_col", DataType::Int32, false),
        ]);
        let arrow_schema = add_field_ids_to_arrow_schema(arrow_schema);
        let iceberg_schema =
            arrow_schema_to_schema(&arrow_schema).expect("schema conversion should succeed");

        let equality_ids = equality_ids_for_indices(&iceberg_schema, &arrow_schema, &[1])
            .expect("field lookup should succeed");

        let key_col_id = iceberg_schema
            .as_struct()
            .field_by_name("key_col")
            .expect("top-level field should exist")
            .id;
        assert_eq!(equality_ids, vec![key_col_id]);
        // A purely positional assignment would have produced id 2.
        assert_ne!(key_col_id, 2);
    }
}
2001
/// Commit completed batches to Iceberg as snapshots.
/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
/// Each snapshot includes the Materialize frontier in its metadata for resume support.
///
/// This operator is single-worker: the worker chosen by hashing `sink_id`
/// performs all commits, while every other worker clears its shared
/// `write_frontier` and returns immediately. After each successful commit the
/// active worker also advances the sink's persist shard upper to the committed
/// frontier so that progress is observable across restarts.
///
/// Returns a stream of [`HealthStatusMessage`]s (operator errors surface as
/// halting statuses) and a button that shuts the operator down when pressed.
fn commit_to_iceberg<'scope>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: StreamVec<'scope, Timestamp, BoundedDataFile>,
    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    // Commits must be totally ordered, so exchange all input to the single
    // worker selected by hashing the sink id.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let name_for_logging = format!("{sink_id}-commit-to-iceberg");

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                // Non-active workers report an empty frontier so they never
                // hold back the sink's overall write frontier.
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // The table-ready stream carries no data (its item type is
            // `Infallible`); we only wait for it to close, which signals that
            // the minter has finished setting up the table.
            while let Some(_) = table_ready_input.next().await {
                // Wait for table to be ready
            }
            let mut table = catalog.load_table(&table_ident).await.with_context(|| {
                format!(
                    "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
                    connection.namespace, connection.table
                )
            })?;

            // Accumulates, per batch description `(lower, upper)`, the data
            // files received so far for that batch.
            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Run until both inputs have closed (i.e. their frontiers are empty).
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                // Wake as soon as either input has pending work.
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let prev = batch_descriptions
                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
                                if let Some(prev) = prev {
                                    // Each description must arrive exactly once;
                                    // a duplicate indicates an upstream bug.
                                    anyhow::bail!(
                                        "Duplicate batch description received \
                                         in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                // Drain all currently-available data file events synchronously.
                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                // Collect batches whose data files have all arrived.
                // The writer emits all data files for a batch at a capability <= the batch's
                // lower bound, then downgrades its capability to the batch's upper bound.
                // So once the input frontier advances past lower, we know the writer has
                // finished emitting files for this batch and dropped its capability.
                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
                    .cloned()
                    .collect();

                // Commit batches in timestamp order to maintain consistency
                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    // Partition the batch's files into data vs. delete files;
                    // the commit path treats the two kinds differently.
                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    // Track totals for committed statistics
                    let mut total_messages: u64 = 0;
                    let mut total_bytes: u64 = 0;
                    for file in file_set.data_files {
                        total_messages += file.data_file().record_count();
                        total_bytes += file.data_file().file_size_in_bytes();
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        data_files = data_files.len(),
                        delete_files = delete_files.len(),
                        total_messages,
                        total_bytes,
                        "iceberg commit applying batch"
                    );

                    let instant = Instant::now();

                    // Store the batch's upper in the snapshot metadata so a
                    // restarted sink can determine where to resume from.
                    let frontier = batch.1.clone();
                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;
                    let snapshot_properties = vec![
                        ("mz-sink-id".to_string(), sink_id.to_string()),
                        ("mz-frontier".to_string(), frontier_json),
                        ("mz-sink-version".to_string(), sink_version.to_string()),
                    ];

                    // Retry the commit up to 5 times, threading the (possibly
                    // refreshed) table handle through each attempt as state.
                    let (table_state, commit_result) = Retry::default()
                        .max_tries(5)
                        .retry_async_with_state(table, |_, table| {
                            let snapshot_properties = snapshot_properties.clone();
                            let data_files = data_files.clone();
                            let delete_files = delete_files.clone();
                            let metrics = Arc::clone(&metrics);
                            let catalog = Arc::clone(&catalog);
                            let conn_namespace = connection.namespace.clone();
                            let conn_table = connection.table.clone();
                            let frontier = frontier.clone();
                            let batch_lower = batch.0.clone();
                            let batch_upper = batch.1.clone();
                            async move {
                                try_commit_batch(
                                    table,
                                    snapshot_properties,
                                    data_files,
                                    delete_files,
                                    catalog.as_ref(),
                                    &conn_namespace,
                                    &conn_table,
                                    sink_version,
                                    &frontier,
                                    &batch_lower,
                                    &batch_upper,
                                    &metrics,
                                )
                                .await
                            }
                        })
                        .await;
                    let commit_result = commit_result.with_context(|| {
                        format!(
                            "failed to commit batch to Iceberg table '{}.{}'",
                            connection.namespace, connection.table
                        )
                    });
                    table = table_state;
                    // Record the commit duration before propagating any error.
                    let duration = instant.elapsed();
                    metrics
                        .commit_duration_seconds
                        .observe(duration.as_secs_f64());
                    commit_result?;

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        total_messages,
                        total_bytes,
                        ?duration,
                        "iceberg commit applied batch"
                    );

                    metrics.snapshots_committed.inc();
                    statistics.inc_messages_committed_by(total_messages);
                    statistics.inc_bytes_committed_by(total_bytes);

                    // Advance the persist shard's upper to the committed
                    // frontier, retrying when another writer moved the upper
                    // concurrently (compare-and-append mismatch).
                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            // The frontier has already been advanced as far as necessary.
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    // Publish the new frontier so the controller sees progress.
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    // Surface any operator error as a halting health status downstream.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}
2283
impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
    /// Returns the indices of the sink key columns within the value relation,
    /// if this sink was declared with an explicit key.
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    /// Returns the upstream relation's key column indices, if known.
    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    /// Renders the Iceberg sink dataflow: walk the sink arrangement, mint
    /// batch descriptions, write data files, and commit them as snapshots
    /// (see the module docs for the full pipeline diagram).
    ///
    /// Returns the combined health status stream of all operators plus one
    /// shutdown button per operator.
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        batches: SinkBatchStream<'scope>,
        key_is_synthetic: bool,
        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
        let scope = batches.scope();

        let (input, walker_button) = walk_sink_arrangement(
            format!("{sink_id}-iceberg-walker"),
            batches,
            sink_id,
            sink.from,
            key_is_synthetic,
        );

        // Future that opens a persist write handle on the sink's shard; it is
        // awaited inside the commit operator, not here.
        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        // Shared frontier the commit operator advances; registering it in
        // `sink_write_frontiers` exposes this sink's progress to the rest of
        // the storage layer.
        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        // Immediately-invoked closure so `?` can be used; any schema error is
        // rendered as a halting health status in the `Err` arm below.
        let (arrow_schema_with_ids, iceberg_schema) =
            match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
                let (arrow_schema_with_ids, iceberg_schema) =
                    relation_desc_to_iceberg_schema(&sink.from_desc)?;

                Ok(if sink.envelope == SinkEnvelope::Append {
                    // For append mode, extend the Arrow and Iceberg schemas with the user-visible
                    // `_mz_diff` and `_mz_timestamp` columns. The minter uses `iceberg_schema` to create
                    // the Iceberg table, and `write_data_files` uses `arrow_schema_with_ids` when
                    // merging metadata. Both must include these columns before any operator starts.
                    let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
                    let extended_iceberg = Arc::new(
                        arrow_schema_to_schema(&extended_arrow)
                            .context("Failed to build Iceberg schema with append columns")?,
                    );
                    (extended_arrow, extended_iceberg)
                } else {
                    (arrow_schema_with_ids, iceberg_schema)
                })
            })() {
                Ok(schemas) => schemas,
                Err(err) => {
                    // Schema conversion failed: report a halting status and
                    // skip rendering the rest of the pipeline entirely.
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(scope);
                    return (error_stream, vec![]);
                }
            };

        let metrics = Arc::new(
            storage_state
                .metrics
                .get_iceberg_sink_metrics(sink_id, scope.index()),
        );

        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
            mint_batch_descriptions(
                format!("{sink_id}-iceberg-mint"),
                sink_id,
                input,
                sink,
                connection_for_minter,
                storage_state.storage_configuration.clone(),
                Arc::clone(&iceberg_schema),
            );

        // Select the envelope handler statically; both arms are otherwise
        // identical invocations of `write_data_files`.
        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = match sink.envelope {
            SinkEnvelope::Upsert => write_data_files::<UpsertEnvelopeHandler>(
                format!("{sink_id}-write-data-files"),
                minted_input,
                batch_descriptions.clone(),
                table_ready.clone(),
                sink.as_of.clone(),
                connection_for_writer,
                storage_state.storage_configuration.clone(),
                Arc::new(arrow_schema_with_ids.clone()),
                Arc::clone(&metrics),
                statistics.clone(),
            ),
            SinkEnvelope::Append => write_data_files::<AppendEnvelopeHandler>(
                format!("{sink_id}-write-data-files"),
                minted_input,
                batch_descriptions.clone(),
                table_ready.clone(),
                sink.as_of.clone(),
                connection_for_writer,
                storage_state.storage_configuration.clone(),
                Arc::new(arrow_schema_with_ids.clone()),
                Arc::clone(&metrics),
                statistics.clone(),
            ),
            SinkEnvelope::Debezium => {
                unreachable!("Iceberg sink only supports Upsert and Append envelopes")
            }
        };

        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            datafiles,
            batch_descriptions,
            table_ready,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
            Arc::clone(&metrics),
            statistics,
        );

        // Emit a steady "running" status alongside the operators' statuses.
        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (
            statuses,
            vec![walker_button, mint_button, write_button, commit_button],
        )
    }
}
2461
2462/// Walks each arrangement batch and emits a stream of individual
2463/// `(key, DiffPair)` records that feeds the rest of the Iceberg sink pipeline.
2464///
2465/// Tracks per-`(key, timestamp)` group sizes and rate-limits a warning when a
2466/// non-synthetic key has more than one `DiffPair`. When `key_is_synthetic` the
2467/// arrangement's hash-based key is stripped before emission.
2468fn walk_sink_arrangement<'scope>(
2469    name: String,
2470    batches: SinkBatchStream<'scope>,
2471    sink_id: GlobalId,
2472    from_id: GlobalId,
2473    key_is_synthetic: bool,
2474) -> (
2475    VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
2476    PressOnDropButton,
2477) {
2478    let mut builder = OperatorBuilder::new(name, batches.scope());
2479    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
2480    let mut input = builder.new_input_for(batches, Pipeline, &output);
2481
2482    let button = builder.build(move |_caps| async move {
2483        let mut pk_warner = (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));
2484
2485        while let Some(event) = input.next().await {
2486            if let Event::Data(cap, mut batches) = event {
2487                for batch in batches.drain(..) {
2488                    for_each_diff_pair(&batch, |key, time, diff_pair| {
2489                        if let Some(warner) = pk_warner.as_mut() {
2490                            warner.observe(key, time);
2491                        }
2492                        // The arrangement key is only used downstream for grouping and PK checks;
2493                        // `write_data_files` discards it on both the stash and drain paths. Emit
2494                        // None unconditionally to avoid per-`DiffPair` `Row` clones on this hot path.
2495                        output.give(&cap, ((None, diff_pair), time, Diff::ONE));
2496                    });
2497                    // Flush after each batch so the final `(key, time)` group of the walk is
2498                    // resolved immediately — a PK violation in the last group is otherwise held
2499                    // until more data arrives or the operator shuts down.
2500                    if let Some(warner) = pk_warner.as_mut() {
2501                        warner.flush();
2502                    }
2503                }
2504            }
2505        }
2506    });
2507
2508    (stream.as_collection(), button.press_on_drop())
2509}