// mz_storage/sink/iceberg.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders a [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. The dataflow consists of three operators:
14//!
15//! ```text
16//!        ┏━━━━━━━━━━━━━━┓
17//!        ┃   persist    ┃
18//!        ┃    source    ┃
19//!        ┗━━━━━━┯━━━━━━━┛
20//!               │ row data, the input to this module
21//!               │
22//!        ┏━━━━━━v━━━━━━━┓
23//!        ┃     mint     ┃ (single worker)
24//!        ┃    batch     ┃ loads/creates the Iceberg table,
25//!        ┃ descriptions ┃ determines resume upper
26//!        ┗━━━┯━━━━━┯━━━━┛
27//!            │     │ batch descriptions (broadcast)
28//!       rows │     ├─────────────────────────┐
29//!            │     │                         │
30//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
31//!        ┃    write     ┃───>│ S3 / object │ │
32//!        ┃  data files  ┃    │   storage   │ │
33//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
34//!               │ file metadata              │
35//!               │                            │
36//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
37//!        ┃           commit to                ┃ (single worker)
38//!        ┃             iceberg                ┃
39//!        ┗━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━┛
40//!                      │
41//!              ╭───────v───────╮
42//!              │ Iceberg table │
43//!              │  (snapshots)  │
44//!              ╰───────────────╯
45//! ```
46//! # Minting batch descriptions
47//! The "mint batch descriptions" operator is responsible for generating
48//! time-based batch boundaries that group writes into Iceberg snapshots.
49//! It maintains a sliding window of future batch descriptions so that
50//! writers can start processing data even while earlier batches are still being written.
51//! Knowing the batch boundaries ahead of time is important because we need to
52//! be able to make the claim that all data files written for a given batch
53//! include all data up to the upper `t` but not beyond it.
54//! This could be trivially achieved by waiting for all data to arrive up to a certain
55//! frontier, but that would prevent us from streaming writes out to object storage
56//! until the entire batch is complete, which would increase latency and reduce throughput.
57//!
58//! # Writing data files
59//! The "write data files" operator receives rows along with batch descriptions.
60//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
61//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
62//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
63//! (and position delete files if necessary) to object storage.
64//! It outputs metadata about the written files along with their batch descriptions
65//! for the commit operator to consume.
66//!
67//! # Committing to Iceberg
68//! The "commit to iceberg" operator receives metadata about written data files
69//! along with their batch descriptions. It groups files by batch and creates
70//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
71//! table's metadata to reflect the new snapshots, including updating the
72//! `mz-frontier` property to track progress.
73
74use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::convert::Infallible;
77use std::future::Future;
78use std::time::Instant;
79use std::{cell::RefCell, rc::Rc, sync::Arc};
80
81use anyhow::{Context, anyhow};
82use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
83use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
84use differential_dataflow::lattice::Lattice;
85use differential_dataflow::{AsCollection, Hashable, VecCollection};
86use futures::StreamExt;
87use iceberg::ErrorKind;
88use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
89use iceberg::spec::{
90    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
91    write_data_files_to_avro,
92};
93use iceberg::spec::{Schema, SchemaRef};
94use iceberg::table::Table;
95use iceberg::transaction::{ApplyTransactionAction, Transaction};
96use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
97use iceberg::writer::base_writer::equality_delete_writer::{
98    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
99};
100use iceberg::writer::base_writer::position_delete_writer::{
101    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
102};
103use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
104use iceberg::writer::file_writer::ParquetWriterBuilder;
105use iceberg::writer::file_writer::location_generator::{
106    DefaultFileNameGenerator, DefaultLocationGenerator,
107};
108use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
109use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
110use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
111use itertools::Itertools;
112use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
113use mz_interchange::avro::DiffPair;
114use mz_ore::cast::CastFrom;
115use mz_ore::error::ErrorExt;
116use mz_ore::future::InTask;
117use mz_ore::result::ResultExt;
118use mz_ore::retry::{Retry, RetryResult};
119use mz_persist_client::Diagnostics;
120use mz_persist_client::write::WriteHandle;
121use mz_persist_types::codec_impls::UnitSchema;
122use mz_repr::{Diff, GlobalId, Row, Timestamp};
123use mz_storage_types::StorageDiff;
124use mz_storage_types::configuration::StorageConfiguration;
125use mz_storage_types::controller::CollectionMetadata;
126use mz_storage_types::errors::DataflowError;
127use mz_storage_types::sinks::{IcebergSinkConnection, SinkEnvelope, StorageSinkDesc};
128use mz_storage_types::sources::SourceData;
129use mz_timely_util::antichain::AntichainExt;
130use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
131use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
132use parquet::file::properties::WriterProperties;
133use serde::{Deserialize, Serialize};
134use timely::PartialOrder;
135use timely::container::CapacityContainerBuilder;
136use timely::dataflow::StreamVec;
137use timely::dataflow::channels::pact::{Exchange, Pipeline};
138use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
139use timely::dataflow::operators::{CapabilitySet, Concatenate};
140use timely::progress::{Antichain, Timestamp as _};
141use tracing::debug;
142
143use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
144use crate::metrics::sink::iceberg::IcebergSinkMetrics;
145use crate::render::sinks::SinkRender;
146use crate::statistics::SinkStatistics;
147use crate::storage_state::StorageState;
148
/// Set the default capacity for the array builders inside the ArrowBuilder. This is the
/// number of items each builder can hold before it needs to allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Set the default buffer capacity for the string and binary array builders inside the
/// ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate
/// more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// The prefix for Parquet files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
/// many batches we have in-flight at any given time. Minting ahead lets writers start streaming
/// out later batches while earlier ones are still being written.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
162
/// Shared state produced by the async setup in [`write_data_files`] that both
/// envelope handlers need to construct Parquet writers.
struct WriterContext {
    /// Arrow schema for data columns, with Materialize extension metadata merged in.
    arrow_schema: Arc<ArrowSchema>,
    /// Iceberg table schema, used to configure Parquet writers.
    current_schema: Arc<Schema>,
    /// File I/O for writing Parquet files to object storage.
    file_io: iceberg::io::FileIO,
    /// Generates file paths under the table's data directory.
    location_generator: DefaultLocationGenerator,
    /// Generates unique file names with a per-worker UUID suffix.
    file_name_generator: DefaultFileNameGenerator,
    /// Parquet writer settings shared by every Parquet writer this handler builds.
    writer_properties: WriterProperties,
}
178
/// Envelope-specific logic for writing Iceberg data files.
trait EnvelopeHandler: Send {
    /// Construct from the shared writer context after async setup completes.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self>
    where
        Self: Sized;

    /// Create an [`IcebergWriter`] for a new batch.
    ///
    /// `is_snapshot` is true for the initial "snapshot" batch (lower == as_of), which
    /// contains all pre-existing data and can be very large. Implementations may use
    /// this to disable memory-intensive optimisations like seen-rows deduplication.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;

    /// Convert a single change (`before`/`after` pair) at timestamp `ts` into an Arrow
    /// [`RecordBatch`] laid out for the writer returned by
    /// [`EnvelopeHandler::create_writer`].
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
}
199
/// Envelope handler that writes changes through a `DeltaWriter`, producing data
/// files plus position/equality delete files keyed on the sink's key columns.
struct UpsertEnvelopeHandler {
    /// Shared writer state produced by the async setup in `write_data_files`.
    ctx: WriterContext,
    /// Iceberg field IDs of the key columns, used for equality delete files.
    equality_ids: Vec<i32>,
    /// Iceberg schema for position delete files.
    pos_schema: Arc<Schema>,
    /// Iceberg schema for equality delete files (projected to key columns only).
    eq_schema: Arc<Schema>,
    /// Configuration for the equality delete writer (projected schema + column IDs).
    eq_config: EqualityDeleteWriterConfig,
    /// Arrow schema with an appended `__op` column that the
    /// [`DeltaWriter`](iceberg::writer::combined_writer::delta_writer::DeltaWriter)
    /// uses to distinguish inserts (+1) from deletes (-1).
    schema_with_op: Arc<ArrowSchema>,
}
215
impl EnvelopeHandler for UpsertEnvelopeHandler {
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Upsert needs a key: the key columns become the equality-delete columns.
        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
            return Err(anyhow::anyhow!(
                "Iceberg sink requires key columns for equality deletes"
            ));
        };

        // Map the key column indices to Iceberg field IDs in the table schema.
        let equality_ids = equality_ids_for_indices(
            ctx.current_schema.as_ref(),
            materialize_arrow_schema.as_ref(),
            equality_indices,
        )?;

        // Position delete files use the fixed schema supplied by the writer config;
        // convert it to an Iceberg schema so a Parquet writer can be built for it.
        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let pos_schema = Arc::new(
            arrow_schema_to_schema(&pos_arrow_schema)
                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
        );

        // Equality delete files contain only the key columns: the config projects the
        // table schema down to `equality_ids`.
        let eq_config =
            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
                .context("Failed to create EqualityDeleteWriterConfig")?;
        let eq_schema = Arc::new(
            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
        );

        // Data schema plus the `__op` column consumed by the DeltaWriter.
        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));

        Ok(Self {
            ctx,
            equality_ids,
            pos_schema,
            eq_schema,
            eq_config,
            schema_with_op,
        })
    }

    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Data file writer: rolls to a new Parquet file at the default size threshold.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

        // Position delete writer: records (file, row position) tombstones.
        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
        let pos_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.pos_schema),
        );
        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            pos_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let pos_delete_writer_builder =
            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

        // Equality delete writer: records deletes by key column values.
        let eq_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.eq_schema),
        );
        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            eq_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let eq_delete_writer_builder =
            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());

        // The DeltaWriter routes inserts/deletes to the three writers above.
        let mut builder = DeltaWriterBuilder::new(
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            self.equality_ids.clone(),
        );

        if is_snapshot {
            // Snapshot batches can be huge; disable the seen-rows tracking so the
            // writer does not keep an unbounded in-memory set (see trait docs).
            builder = builder.with_max_seen_rows(0);
        }

        Ok(Box::new(
            builder
                .build(None)
                .await
                .context("Failed to create DeltaWriter")?,
        ))
    }

    /// The `__op` column indicates whether each row is an insert (+1) or delete (-1),
    /// which the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
    fn row_to_batch(
        &self,
        diff_pair: DiffPair<Row>,
        _ts: Timestamp,
    ) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.ctx.arrow_schema),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        // One op value per row appended to the builder, in the same order.
        let mut op_values = Vec::new();

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add delete row to builder")?;
            op_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add insert row to builder")?;
            op_values.push(1i32);
        }

        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the `__op` column to the data columns.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(op_values)));

        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
            .context("Failed to create batch with op column")
    }
}
364
/// Envelope handler that appends every change as a plain data row, carrying
/// explicit `_mz_diff` and `_mz_timestamp` columns instead of applying deletes.
struct AppendEnvelopeHandler {
    /// Shared writer state produced by the async setup in `write_data_files`.
    ctx: WriterContext,
    /// Arrow schema with only user columns (no `_mz_diff`/`_mz_timestamp`), used by
    /// [`ArrowBuilder`] to serialize row data before the extra columns are appended.
    user_schema_for_append: Arc<ArrowSchema>,
}
371
impl EnvelopeHandler for AppendEnvelopeHandler {
    fn new(
        ctx: WriterContext,
        _connection: &IcebergSinkConnection,
        _materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // arrow_schema already includes _mz_diff + _mz_timestamp (added in render_sink); strip
        // the last two fields so ArrowBuilder only processes the user columns.
        let n = ctx.arrow_schema.fields().len().saturating_sub(2);
        let user_schema_for_append =
            Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));

        Ok(Self {
            ctx,
            user_schema_for_append,
        })
    }

    async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Append-only: a plain data file writer suffices, no delete files needed.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        Ok(Box::new(
            DataFileWriterBuilder::new(data_rolling_writer)
                .build(None)
                .await
                .context("Failed to create DataFileWriter")?,
        ))
    }

    /// Every change is written as a plain data row: the `before` half (if present) gets
    /// `_mz_diff = -1` and the `after` half gets `_mz_diff = +1`. Both carry the same `_mz_timestamp`.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.user_schema_for_append),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        // One diff value per row appended to the builder, in the same order.
        let mut diff_values: Vec<i32> = Vec::new();
        // Timestamps beyond i64::MAX saturate rather than error.
        let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add before row to builder")?;
            diff_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add after row to builder")?;
            diff_values.push(1i32);
        }

        let n = diff_values.len();
        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append _mz_diff and _mz_timestamp to the user columns; the combined
        // layout matches ctx.arrow_schema (user columns + the two extras).
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(diff_values)));
        columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));

        RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
            .context("Failed to create append record batch")
    }
}
451
/// The precision needed to store all UInt64 values in a Decimal128.
/// UInt64 max value is 18,446,744,073,709,551,615 which has 20 digits,
/// so a Decimal128 with precision 20 (scale 0) can hold every u64.
const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
455
456/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
457/// Parquet metadata for schema evolution tracking. Field IDs are assigned
458/// recursively to all nested fields (structs, lists, maps) using a depth-first,
459/// pre-order traversal.
460fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
461    let mut next_field_id = 1i32;
462    let fields: Vec<Field> = schema
463        .fields()
464        .iter()
465        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
466        .collect();
467    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
468}
469
470/// Recursively add field IDs to a field and all its nested children.
471fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
472    let current_id = *next_id;
473    *next_id += 1;
474
475    let mut metadata = field.metadata().clone();
476    metadata.insert(
477        PARQUET_FIELD_ID_META_KEY.to_string(),
478        current_id.to_string(),
479    );
480
481    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
482
483    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
484}
485
486/// Add field IDs to nested fields within a DataType.
487fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
488    match data_type {
489        DataType::Struct(fields) => {
490            let new_fields: Vec<Field> = fields
491                .iter()
492                .map(|f| add_field_ids_recursive(f, next_id))
493                .collect();
494            DataType::Struct(new_fields.into())
495        }
496        DataType::List(element_field) => {
497            let new_element = add_field_ids_recursive(element_field, next_id);
498            DataType::List(Arc::new(new_element))
499        }
500        DataType::LargeList(element_field) => {
501            let new_element = add_field_ids_recursive(element_field, next_id);
502            DataType::LargeList(Arc::new(new_element))
503        }
504        DataType::Map(entries_field, sorted) => {
505            let new_entries = add_field_ids_recursive(entries_field, next_id);
506            DataType::Map(Arc::new(new_entries), *sorted)
507        }
508        _ => data_type.clone(),
509    }
510}
511
512/// Merge Materialize extension metadata into Iceberg's Arrow schema.
513/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
514/// Materialize's extension names for ArrowBuilder compatibility.
515/// Handles nested types (structs, lists, maps) recursively.
516fn merge_materialize_metadata_into_iceberg_schema(
517    materialize_arrow_schema: &ArrowSchema,
518    iceberg_schema: &Schema,
519) -> anyhow::Result<ArrowSchema> {
520    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
521    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
522        .context("Failed to convert Iceberg schema to Arrow schema")?;
523
524    // Now merge in the Materialize extension metadata
525    let fields: Vec<Field> = iceberg_arrow_schema
526        .fields()
527        .iter()
528        .map(|iceberg_field| {
529            // Find the corresponding Materialize field by name to get extension metadata
530            let mz_field = materialize_arrow_schema
531                .field_with_name(iceberg_field.name())
532                .with_context(|| {
533                    format!(
534                        "Field '{}' not found in Materialize schema",
535                        iceberg_field.name()
536                    )
537                })?;
538
539            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
540        })
541        .collect::<anyhow::Result<Vec<_>>>()?;
542
543    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
544}
545
/// Recursively merge Materialize extension metadata into an Iceberg field.
///
/// The Iceberg field supplies the data type, nullability, and field-ID
/// metadata; the Materialize field (when present) contributes only its
/// extension-name metadata. Errors if both fields exist but their container
/// kinds (struct/list/large-list/map) disagree.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    // Start with Iceberg field's metadata (which includes field IDs)
    let mut metadata = iceberg_field.metadata().clone();

    // Add Materialize extension name if available
    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    // Recursively process nested types
    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Struct children are matched by name; a child missing on the
            // Materialize side simply contributes no extension metadata.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        // Leaf types: keep the Iceberg type as-is.
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
643
644async fn reload_table(
645    catalog: &dyn Catalog,
646    namespace: String,
647    table_name: String,
648    current_table: Table,
649) -> anyhow::Result<Table> {
650    let namespace_ident = NamespaceIdent::new(namespace.clone());
651    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
652    let current_schema = current_table.metadata().current_schema_id();
653    let current_partition_spec = current_table.metadata().default_partition_spec_id();
654
655    match catalog.load_table(&table_ident).await {
656        Ok(table) => {
657            let reloaded_schema = table.metadata().current_schema_id();
658            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
659            if reloaded_schema != current_schema {
660                return Err(anyhow::anyhow!(
661                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
662                    table_name,
663                    current_schema,
664                    reloaded_schema
665                ));
666            }
667
668            if reloaded_partition_spec != current_partition_spec {
669                return Err(anyhow::anyhow!(
670                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
671                    table_name,
672                    current_partition_spec,
673                    reloaded_partition_spec
674                ));
675            }
676
677            Ok(table)
678        }
679        Err(err) => Err(err).context("Failed to reload Iceberg table"),
680    }
681}
682
/// Attempt a single commit of a batch of data files to an Iceberg table.
/// On conflict or failure, reloads the table and returns a retryable error.
/// On success, returns the updated table state.
///
/// The returned `Table` is the freshest state we have (reloaded on conflict and
/// apply-failure paths), so the caller can feed it directly into the next attempt.
async fn try_commit_batch(
    mut table: Table,
    snapshot_properties: Vec<(String, String)>,
    data_files: Vec<DataFile>,
    delete_files: Vec<DataFile>,
    catalog: &dyn Catalog,
    conn_namespace: &str,
    conn_table: &str,
    sink_version: u64,
    frontier: &Antichain<Timestamp>,
    batch_lower: &Antichain<Timestamp>,
    batch_upper: &Antichain<Timestamp>,
    metrics: &IcebergSinkMetrics,
) -> (Table, RetryResult<(), anyhow::Error>) {
    let tx = Transaction::new(&table);
    // NOTE(review): duplicate checking is disabled — presumably because retries
    // after a conflict may legitimately re-add the same files; confirm.
    let mut action = tx
        .row_delta()
        .set_snapshot_properties(snapshot_properties.into_iter().collect())
        .with_check_duplicate(false);

    // An empty batch still commits a snapshot carrying the provided properties;
    // files are only attached when present.
    if !data_files.is_empty() || !delete_files.is_empty() {
        action = action
            .add_data_files(data_files)
            .add_delete_files(delete_files);
    }

    // If applying the action fails, refresh our view of the table before asking
    // the caller to retry.
    let tx = match action
        .apply(tx)
        .context("Failed to apply data file addition to iceberg table transaction")
    {
        Ok(tx) => tx,
        Err(e) => {
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(reload_err) => {
                    return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
                }
            }
            return (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Failed to apply data file addition to iceberg table transaction: {}",
                    e
                )),
            );
        }
    };

    // Commit the transaction. Catalog conflicts get the special handling below;
    // other errors are generically retryable.
    let new_table = tx.commit(catalog).await;
    match new_table {
        Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
            metrics.commit_conflicts.inc();
            // Another writer committed concurrently: reload and inspect its
            // snapshot metadata to decide between retrying and fencing ourselves off.
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let last = retrieve_upper_from_snapshots(&mut snapshots);
            let last = match last {
                Ok(val) => val,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            // Check if another writer has advanced the frontier beyond ours (fencing check)
            if let Some((last_frontier, last_version)) = last {
                if last_version > sink_version {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer \
                             with version {}. Current sink version: {}. \
                             Frontiers may be out of sync, aborting to avoid data loss.",
                            conn_table,
                            last_version,
                            sink_version,
                        )),
                    );
                }
                if PartialOrder::less_equal(frontier, &last_frontier) {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer. \
                             Current frontier: {:?}, last frontier: {:?}.",
                            conn_table,
                            frontier,
                            last_frontier,
                        )),
                    );
                }
            }

            // No fencing condition triggered; surface a retryable conflict error.
            (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Commit conflict detected when committing batch [{}, {}) \
                     to Iceberg table '{}.{}'. Retrying...",
                    batch_lower.pretty(),
                    batch_upper.pretty(),
                    conn_namespace,
                    conn_table
                )),
            )
        }
        Err(e) => {
            // Non-conflict commit failure: retry with our current (not reloaded) table state.
            metrics.commit_failures.inc();
            (table, RetryResult::RetryableErr(anyhow!(e)))
        }
        Ok(new_table) => (new_table, RetryResult::Ok(())),
    }
}
816
817/// Load an existing Iceberg table or create it if it doesn't exist.
818async fn load_or_create_table(
819    catalog: &dyn Catalog,
820    namespace: String,
821    table_name: String,
822    schema: &Schema,
823) -> anyhow::Result<iceberg::table::Table> {
824    let namespace_ident = NamespaceIdent::new(namespace.clone());
825    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
826
827    // Try to load the table first
828    match catalog.load_table(&table_ident).await {
829        Ok(table) => {
830            // Table exists, return it
831            // TODO: Add proper schema evolution/validation to ensure compatibility
832            Ok(table)
833        }
834        Err(err) => {
835            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
836                || err
837                    .message()
838                    .contains("Tried to load a table that does not exist")
839            {
840                // Table doesn't exist, create it
841                // Note: location is not specified, letting the catalog determine the default location
842                // based on its warehouse configuration
843                let table_creation = TableCreation::builder()
844                    .name(table_name.clone())
845                    .schema(schema.clone())
846                    // Use unpartitioned spec by default
847                    // TODO: Consider making partition spec configurable
848                    // .partition_spec(UnboundPartitionSpec::builder().build())
849                    .build();
850
851                catalog
852                    .create_table(&namespace_ident, table_creation)
853                    .await
854                    .with_context(|| {
855                        format!(
856                            "Failed to create Iceberg table '{}' in namespace '{}'",
857                            table_name, namespace
858                        )
859                    })
860            } else {
861                // Some other error occurred
862                Err(err).context("Failed to load Iceberg table")
863            }
864        }
865    }
866}
867
868/// Find the most recent Materialize frontier from Iceberg snapshots.
869/// We store the frontier in snapshot metadata to track where we left off after restarts.
870/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
871/// The input slice will be sorted by sequence number in descending order.
872fn retrieve_upper_from_snapshots(
873    snapshots: &mut [Arc<Snapshot>],
874) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
875    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
876
877    for snapshot in snapshots {
878        let props = &snapshot.summary().additional_properties;
879        if let (Some(frontier_json), Some(sink_version_str)) =
880            (props.get("mz-frontier"), props.get("mz-sink-version"))
881        {
882            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
883                .context("Failed to deserialize frontier from snapshot properties")?;
884            let frontier = Antichain::from_iter(frontier);
885
886            let sink_version = sink_version_str
887                .parse::<u64>()
888                .context("Failed to parse mz-sink-version from snapshot properties")?;
889
890            return Ok(Some((frontier, sink_version)));
891        }
892        if snapshot.summary().operation.as_str() != "replace" {
893            // This is a bad heuristic, but we have no real other way to identify compactions
894            // right now other than assume they will be the only operation writing "replace" operations.
895            // That means if we find a snapshot with some other operation, but no mz-frontier, we are in an
896            // inconsistent state and have to error out.
897            anyhow::bail!(
898                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
899                snapshot.snapshot_id(),
900                snapshot.summary().operation.as_str(),
901            );
902        }
903    }
904
905    Ok(None)
906}
907
/// Type overrides for Iceberg-compatible Arrow schemas.
///
/// Iceberg doesn't support unsigned integer types, so we map them to compatible types:
/// - UInt16 -> Int32
/// - UInt32 -> Int64
/// - UInt64 -> Decimal128(20, 0)
/// - MzTimestamp (which uses UInt64) -> Decimal128(20, 0)
///
/// Returns the replacement Arrow type together with a tag naming the original
/// Materialize type (e.g. "uint2"), or `None` when the type needs no override.
fn iceberg_type_overrides(scalar_type: &mz_repr::SqlScalarType) -> Option<(DataType, String)> {
    use mz_repr::SqlScalarType;
    match scalar_type {
        SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
        SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
        SqlScalarType::UInt64 => Some((
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
            "uint8".to_string(),
        )),
        // MzTimestamp is backed by a u64, so it gets the same decimal mapping.
        SqlScalarType::MzTimestamp => Some((
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
            "mz_timestamp".to_string(),
        )),
        _ => None,
    }
}
931
932/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
933///
934/// Returns a tuple of:
935/// - The Arrow schema (with field IDs and Iceberg-compatible types) for writing Parquet files
936/// - The Iceberg schema for table creation/validation
937///
938/// Iceberg doesn't support unsigned integer types, so we use `iceberg_type_overrides`
939/// to map them to compatible types (e.g., UInt64 -> Decimal128(20,0)). The ArrowBuilder
940/// handles the cross-type conversion (Datum::UInt64 -> Decimal128Builder) automatically.
941fn relation_desc_to_iceberg_schema(
942    desc: &mz_repr::RelationDesc,
943) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
944    let arrow_schema =
945        mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
946            .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
947
948    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
949
950    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
951        .context("Failed to convert Arrow schema to Iceberg schema")?;
952
953    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
954}
955
956/// Resolve Materialize key column indexes to Iceberg top-level field IDs.
957///
958/// Iceberg field IDs are assigned recursively, so a top-level column's field ID
959/// is not necessarily `column_index + 1` once nested fields are present.
960fn equality_ids_for_indices(
961    current_schema: &Schema,
962    materialize_arrow_schema: &ArrowSchema,
963    equality_indices: &[usize],
964) -> anyhow::Result<Vec<i32>> {
965    let top_level_fields = current_schema.as_struct();
966
967    equality_indices
968        .iter()
969        .map(|index| {
970            let mz_field = materialize_arrow_schema
971                .fields()
972                .get(*index)
973                .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
974            let field_name = mz_field.name();
975            let iceberg_field = top_level_fields
976                .field_by_name(field_name)
977                .with_context(|| {
978                    format!(
979                        "Equality delete key column '{}' not found in Iceberg table schema",
980                        field_name
981                    )
982                })?;
983            Ok(iceberg_field.id)
984        })
985        .collect()
986}
987
988/// Build a new Arrow schema by adding an __op column to the existing schema.
989fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
990    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
991    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
992    ArrowSchema::new(fields)
993}
994
995/// Build a new Arrow schema by appending `_mz_diff` (Int32) and `_mz_timestamp` (Int64) columns.
996/// These are user-visible Iceberg columns written in append mode. Parquet field IDs are
997/// assigned sequentially after the existing maximum field ID so the extended schema can
998/// be converted to a valid Iceberg schema via `arrow_schema_to_schema`.
999#[allow(clippy::disallowed_types)]
1000fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1001    use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1002    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1003    fields.push(Arc::new(Field::new(
1004        ICEBERG_APPEND_DIFF_COLUMN,
1005        DataType::Int32,
1006        false,
1007    )));
1008    fields.push(Arc::new(Field::new(
1009        ICEBERG_APPEND_TIMESTAMP_COLUMN,
1010        DataType::Int64,
1011        false,
1012    )));
1013
1014    add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1015}
1016
/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
/// Batches are minted with configurable windows to balance write efficiency with latency.
/// We maintain a sliding window of future batch descriptions so writers can start
/// processing data even while earlier batches are still being written.
///
/// Returns, in order: the passed-through row collection, the stream of minted batch
/// descriptions, the table-ready signal stream, a health status stream, and the
/// operator shutdown button.
fn mint_batch_descriptions<'scope, D>(
    name: String,
    sink_id: GlobalId,
    input: VecCollection<'scope, Timestamp, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<'scope, Timestamp, D, Diff>,
    StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    StreamVec<'scope, Timestamp, Infallible>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
)
where
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    // A single worker, chosen by hashing the sink id, performs all minting;
    // the remaining workers only forward rows.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input =
        builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, StreamVec<'scope, Timestamp, Rc<anyhow::Error>>) =
        builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
            // Rows are forwarded under the input's own capabilities, so the data
            // output needs none of ours.
            *data_capset = CapabilitySet::new();

            if !is_active_worker {
                // Passive workers: drop every capability and pass rows straight through.
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                *table_ready_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            // Connect to the Iceberg catalog and load (or create) the target table.
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;
            debug!(
                ?sink_id,
                %name_for_logging,
                namespace = %connection.namespace,
                table = %connection.table,
                "iceberg mint loaded/created table"
            );

            // The table now exists; release the table-ready capability.
            *table_ready_capset = CapabilitySet::new();

            // Recover the resume frontier and sink version recorded in snapshot metadata.
            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };
            debug!(
                ?sink_id,
                %name_for_logging,
                resume_upper = %resume_upper.pretty(),
                resume_version,
                as_of = %as_of.pretty(),
                "iceberg mint resume position loaded"
            );

            // The input has overcompacted if
            let overcompacted =
                // ..we have made some progress in the past
                *resume_upper != [Timestamp::minimum()] &&
                // ..but the since frontier is now beyond that
                PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                // This would normally be an assertion but because it can happen after a
                // Materialize backup/restore we log an error so that it appears on Sentry but
                // leaves the rest of the objects in the cluster unaffected.
                return Err(anyhow::anyhow!("{err}"));
            };

            if resume_version > sink_version {
                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
            }

            let mut initialized = false;
            let mut observed_frontier;
            // Largest data timestamp observed before initialization; used to
            // synthesize an upper if the input closes early (see below).
            let mut max_seen_ts: Option<Timestamp> = None;
            // Track minted batches to maintain a sliding window of open batch descriptions.
            // This is needed to know when to retire old batches and mint new ones.
            // It's "sortedness" is derived from the monotonicity of batch descriptions,
            // and the fact that we only ever push new descriptions to the back and pop from the front.
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            if !initialized {
                                for (_, ts, _) in data.iter() {
                                    match max_seen_ts.as_mut() {
                                        Some(max) => {
                                            if max.less_than(ts) {
                                                *max = ts.clone();
                                            }
                                        }
                                        None => {
                                            max_seen_ts = Some(ts.clone());
                                        }
                                    }
                                }
                            }
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    if observed_frontier.is_empty() {
                        // Bounded inputs can close (frontier becomes empty) before we finish
                        // initialization. For example, a loadgen source configured for a finite
                        // dataset may emit all rows at time t and then immediately close. If we
                        // saw any data, synthesize an upper one tick past the maximum timestamp
                        // so we can mint a snapshot batch and commit it.
                        if let Some(max_ts) = max_seen_ts.as_ref() {
                            let synthesized_upper =
                                Antichain::from_elem(max_ts.step_forward());
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                max_seen_ts = %max_ts,
                                synthesized_upper = %synthesized_upper.pretty(),
                                "iceberg mint input closed before initialization; using max seen ts"
                            );
                            observed_frontier = synthesized_upper;
                        } else {
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                "iceberg mint input closed before initialization with no data"
                            );
                            // Input stream closed before initialization completed and no data arrived.
                            return Ok(());
                        }
                    }

                    // We only start minting after we've reached as_of and resume_upper to avoid
                    // minting batches that would be immediately skipped.
                    if PartialOrder::less_than(&observed_frontier, &resume_upper)
                        || PartialOrder::less_than(&observed_frontier, &as_of)
                    {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();

                    // Create a catch-up batch from the later of resume_upper or as_of to current frontier.
                    // We use the later of the two because:
                    // - For fresh sinks: resume_upper = minimum, as_of = actual timestamp, data starts at as_of
                    // - For resuming: as_of <= resume_upper (enforced by overcompaction check), data starts at resume_upper
                    let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
                        as_of.clone()
                    } else {
                        resume_upper.clone()
                    };

                    if batch_lower == current_upper {
                        // Snapshot! as_of is exactly at the frontier. We still need to mint
                        // a batch to create the snapshot, so we step the upper forward by one.
                        current_upper = Antichain::from_elem(current_upper_ts.step_forward());
                    }

                    let batch_description = (batch_lower.clone(), current_upper.clone());
                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        batch_lower = %batch_lower.pretty(),
                        current_upper = %current_upper.pretty(),
                        "iceberg mint initializing (catch-up batch)"
                    );
                    debug!(
                        "{}: creating catch-up batch [{}, {})",
                        name_for_logging,
                        batch_lower.pretty(),
                        current_upper.pretty()
                    );
                    batch_descriptions.push(batch_description);
                    // Mint initial future batch descriptions at configurable intervals
                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
                        let duration_millis = commit_interval.as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(
                            u64::try_from(duration_millis)
                                .expect("commit interval too large for u64"),
                        );
                        let desired_batch_upper = Antichain::from_elem(
                            current_upper_ts.step_forward_by(&duration_ts),
                        );

                        let batch_description =
                            (current_upper.clone(), desired_batch_upper.clone());
                        debug!(
                            "{}: minting future batch {}/{} [{}, {})",
                            name_for_logging,
                            i,
                            INITIAL_DESCRIPTIONS_TO_MINT,
                            current_upper.pretty(),
                            desired_batch_upper.pretty()
                        );
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    // Hold our capability at the newest minted upper.
                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    if observed_frontier.is_empty() {
                        // We're done!
                        return Ok(());
                    }
                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
                    // and mint a new future batch to keep the pipeline full
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        // New batch continues from the newest minted upper, one
                        // commit interval wide.
                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(commit_interval.as_millis()
                            .try_into()
                            .expect("commit interval too large for u64"));
                        let new_upper = Antichain::from_elem(newest_upper
                            .as_option()
                            .unwrap()
                            .step_forward_by(&duration_ts));

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        table_ready_stream,
        statuses,
        button.press_on_drop(),
    )
}
1341
/// An Iceberg `DataFile` bundled with the schema needed to serialize it.
/// Serde support is provided by round-tripping through [`AvroDataFile`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}
1348
/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
/// The avro ser(de) also requires the Iceberg schema to be provided, so we include that as well.
/// It is distinctly possible that this is overkill, but it avoids re-implementing
/// Iceberg's serialization logic here.
/// If at some point this becomes a serious overhead, we can revisit this decision.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    /// DataFile encoded as Iceberg Avro bytes (see `write_data_files_to_avro`).
    pub data_file: Vec<u8>,
    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
    pub schema: Vec<u8>,
}
1362
1363impl From<SerializableDataFile> for AvroDataFile {
1364    fn from(value: SerializableDataFile) -> Self {
1365        let mut data_file = Vec::new();
1366        write_data_files_to_avro(
1367            &mut data_file,
1368            [value.data_file],
1369            &StructType::new(vec![]),
1370            FormatVersion::V2,
1371        )
1372        .expect("serialization into buffer");
1373        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1374        AvroDataFile { data_file, schema }
1375    }
1376}
1377
1378impl TryFrom<AvroDataFile> for SerializableDataFile {
1379    type Error = String;
1380
1381    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1382        let schema: Schema = serde_json::from_slice(&value.schema)
1383            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1384        let data_files = read_data_files_from_avro(
1385            &mut &*value.data_file,
1386            &schema,
1387            0,
1388            &StructType::new(vec![]),
1389            FormatVersion::V2,
1390        )
1391        .map_err_to_string_with_causes()?;
1392        let Some(data_file) = data_files.into_iter().next() else {
1393            return Err("No DataFile found in Avro data".into());
1394        };
1395        Ok(SerializableDataFile { data_file, schema })
1396    }
1397}
1398
/// A DataFile along with its associated batch description (lower and upper bounds).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    /// The serializable data file plus the Iceberg schema it was written with.
    pub data_file: SerializableDataFile,
    /// Timestamp interval `[lower, upper)` this file's rows belong to.
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
1405
1406impl BoundedDataFile {
1407    pub fn new(
1408        file: DataFile,
1409        schema: Schema,
1410        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1411    ) -> Self {
1412        Self {
1413            data_file: SerializableDataFile {
1414                data_file: file,
1415                schema,
1416            },
1417            batch_desc,
1418        }
1419    }
1420
1421    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
1422        &self.batch_desc
1423    }
1424
1425    pub fn data_file(&self) -> &DataFile {
1426        &self.data_file.data_file
1427    }
1428
1429    pub fn into_data_file(self) -> DataFile {
1430        self.data_file.data_file
1431    }
1432}
1433
/// A set of DataFiles along with their associated batch descriptions.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    /// Files collected so far; each carries its own batch description.
    pub data_files: Vec<BoundedDataFile>,
}
1439
1440/// Construct the envelope-specific closures that [`write_data_files`] needs.
1441///
1442/// Write rows into Parquet data files bounded by batch descriptions.
1443/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
1444/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
1445fn write_data_files<'scope, H: EnvelopeHandler + 'static>(
1446    name: String,
1447    input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
1448    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1449    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
1450    as_of: Antichain<Timestamp>,
1451    connection: IcebergSinkConnection,
1452    storage_configuration: StorageConfiguration,
1453    materialize_arrow_schema: Arc<ArrowSchema>,
1454    metrics: Arc<IcebergSinkMetrics>,
1455    statistics: SinkStatistics,
1456) -> (
1457    StreamVec<'scope, Timestamp, BoundedDataFile>,
1458    StreamVec<'scope, Timestamp, HealthStatusMessage>,
1459    PressOnDropButton,
1460) {
1461    let scope = input.scope();
1462    let name_for_logging = name.clone();
1463    let mut builder = OperatorBuilder::new(name, scope.clone());
1464
1465    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1466
1467    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1468    let mut batch_desc_input =
1469        builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
1470    let mut input = builder.new_disconnected_input(input.inner, Pipeline);
1471
1472    let (button, errors) = builder.build_fallible(move |caps| {
1473        Box::pin(async move {
1474            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
1475            let catalog = connection
1476                .catalog_connection
1477                .connect(&storage_configuration, InTask::Yes)
1478                .await
1479                .with_context(|| {
1480                    format!(
1481                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1482                        connection.catalog_connection.uri, connection.namespace, connection.table
1483                    )
1484                })?;
1485
1486            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1487            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1488            while let Some(_) = table_ready_input.next().await {
1489                // Wait for table to be ready
1490            }
1491            let table = catalog
1492                .load_table(&table_ident)
1493                .await
1494                .with_context(|| {
1495                    format!(
1496                        "Failed to load Iceberg table '{}.{}' in write_data_files operator",
1497                        connection.namespace, connection.table
1498                    )
1499                })?;
1500
1501            let table_metadata = table.metadata().clone();
1502            let current_schema = Arc::clone(table_metadata.current_schema());
1503
1504            // Merge Materialize extension metadata into the Iceberg schema.
1505            // We need extension metadata for ArrowBuilder to work correctly (it uses
1506            // extension names to know how to handle different types like records vs arrays).
1507            let arrow_schema = Arc::new(
1508                merge_materialize_metadata_into_iceberg_schema(
1509                    materialize_arrow_schema.as_ref(),
1510                    current_schema.as_ref(),
1511                )
1512                .context("Failed to merge Materialize metadata into Iceberg schema")?,
1513            );
1514
1515            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
1516            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
1517            // No clear way to detect this properly right now, so we use heuristics.
1518            let location = table_metadata.location();
1519            let corrected_location = match location.rsplit_once("/metadata/") {
1520                Some((a, b)) if b.ends_with(".metadata.json") => a,
1521                _ => location,
1522            };
1523
1524            let data_location = format!("{}/data", corrected_location);
1525            let location_generator = DefaultLocationGenerator::with_data_location(data_location);
1526
1527            // Add a unique suffix to avoid filename collisions across restarts and workers
1528            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
1529            let file_name_generator = DefaultFileNameGenerator::new(
1530                PARQUET_FILE_PREFIX.to_string(),
1531                Some(unique_suffix),
1532                iceberg::spec::DataFileFormat::Parquet,
1533            );
1534
1535            let file_io = table.file_io().clone();
1536
1537            let writer_properties = WriterProperties::new();
1538
1539            let ctx = WriterContext {
1540                arrow_schema,
1541                current_schema: Arc::clone(&current_schema),
1542                file_io,
1543                location_generator,
1544                file_name_generator,
1545                writer_properties,
1546            };
1547            let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;
1548
1549            // Rows can arrive before their batch description due to dataflow parallelism.
1550            // Stash them until we know which batch they belong to.
1551            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1552                BTreeMap::new();
1553
1554            // Track batches currently being written. When a row arrives, we check if it belongs
1555            // to an in-flight batch. When frontiers advance to a batch's upper, we close the
1556            // writer and emit its data files downstream.
1557            // Antichains don't implement Ord, so we use a HashMap with tuple keys instead.
1558            #[allow(clippy::disallowed_types)]
1559            let mut in_flight_batches: std::collections::HashMap<
1560                (Antichain<Timestamp>, Antichain<Timestamp>),
1561                Box<dyn IcebergWriter>,
1562            > = std::collections::HashMap::new();
1563
1564            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1565            let mut processed_batch_description_frontier =
1566                Antichain::from_elem(Timestamp::minimum());
1567            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1568            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1569
1570            // Track the minimum batch lower bound to prune data that's already committed
1571            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;
1572
1573            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1574                let mut staged_messages_since_flush: u64 = 0;
1575                tokio::select! {
1576                    _ = batch_desc_input.ready() => {},
1577                    _ = input.ready() => {}
1578                }
1579
1580                while let Some(event) = batch_desc_input.next_sync() {
1581                    match event {
1582                        Event::Data(_cap, data) => {
1583                            for batch_desc in data {
1584                                let (lower, upper) = &batch_desc;
1585
1586                                // Track the minimum batch lower bound (first batch received)
1587                                if min_batch_lower.is_none() {
1588                                    min_batch_lower = Some(lower.clone());
1589                                    debug!(
1590                                        "{}: set min_batch_lower to {}",
1591                                        name_for_logging,
1592                                        lower.pretty()
1593                                    );
1594
1595                                    // Prune any stashed rows that arrived before min_batch_lower (already committed)
1596                                    let to_remove: Vec<_> = stashed_rows
1597                                        .keys()
1598                                        .filter(|ts| {
1599                                            let ts_antichain = Antichain::from_elem((*ts).clone());
1600                                            PartialOrder::less_than(&ts_antichain, lower)
1601                                        })
1602                                        .cloned()
1603                                        .collect();
1604
1605                                    if !to_remove.is_empty() {
1606                                        let mut removed_count = 0;
1607                                        for ts in to_remove {
1608                                            if let Some(rows) = stashed_rows.remove(&ts) {
1609                                                removed_count += rows.len();
1610                                                for _ in &rows {
1611                                                    metrics.stashed_rows.dec();
1612                                                }
1613                                            }
1614                                        }
1615                                        debug!(
1616                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
1617                                            name_for_logging,
1618                                            removed_count
1619                                        );
1620                                    }
1621                                }
1622
1623                                // Disable seen_rows tracking for snapshot batch to save memory
1624                                let is_snapshot = lower == &as_of;
1625                                debug!(
1626                                    "{}: received batch description [{}, {}), snapshot={}",
1627                                    name_for_logging,
1628                                    lower.pretty(),
1629                                    upper.pretty(),
1630                                    is_snapshot
1631                                );
1632                                let mut batch_writer =
1633                                    handler.create_writer(is_snapshot).await?;
1634                                // Drain any stashed rows that belong to this batch
1635                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1636                                let mut drained_count = 0;
1637                                for row_ts in row_ts_keys {
1638                                    let ts = Antichain::from_elem(row_ts.clone());
1639                                    if PartialOrder::less_equal(lower, &ts)
1640                                        && PartialOrder::less_than(&ts, upper)
1641                                    {
1642                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
1643                                            drained_count += rows.len();
1644                                            for (_row, diff_pair) in rows {
1645                                                metrics.stashed_rows.dec();
1646                                                let record_batch = handler.row_to_batch(
1647                                                    diff_pair.clone(),
1648                                                    row_ts.clone(),
1649                                                )
1650                                                .context("failed to convert row to recordbatch")?;
1651                                                batch_writer.write(record_batch).await?;
1652                                                staged_messages_since_flush += 1;
1653                                                if staged_messages_since_flush >= 10_000 {
1654                                                    statistics.inc_messages_staged_by(
1655                                                        staged_messages_since_flush,
1656                                                    );
1657                                                    staged_messages_since_flush = 0;
1658                                                }
1659                                            }
1660                                        }
1661                                    }
1662                                }
1663                                if drained_count > 0 {
1664                                    debug!(
1665                                        "{}: drained {} stashed rows into batch [{}, {})",
1666                                        name_for_logging,
1667                                        drained_count,
1668                                        lower.pretty(),
1669                                        upper.pretty()
1670                                    );
1671                                }
1672                                let prev =
1673                                    in_flight_batches.insert(batch_desc.clone(), batch_writer);
1674                                if prev.is_some() {
1675                                    anyhow::bail!(
1676                                        "Duplicate batch description received for description {:?}",
1677                                        batch_desc
1678                                    );
1679                                }
1680                            }
1681                        }
1682                        Event::Progress(frontier) => {
1683                            batch_description_frontier = frontier;
1684                        }
1685                    }
1686                }
1687
1688                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1689                for event in ready_events {
1690                    match event {
1691                        Event::Data(_cap, data) => {
1692                            let mut dropped_per_time = BTreeMap::new();
1693                            let mut stashed_per_time = BTreeMap::new();
1694                            for ((row, diff_pair), ts, _diff) in data {
1695                                let row_ts = ts.clone();
1696                                let ts_antichain = Antichain::from_elem(row_ts.clone());
1697                                let mut written = false;
1698                                // Try writing the row to any in-flight batch it belongs to...
1699                                for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
1700                                    let (lower, upper) = batch_desc;
1701                                    if PartialOrder::less_equal(lower, &ts_antichain)
1702                                        && PartialOrder::less_than(&ts_antichain, upper)
1703                                    {
1704                                        let record_batch = handler.row_to_batch(
1705                                            diff_pair.clone(),
1706                                            row_ts.clone(),
1707                                        )
1708                                        .context("failed to convert row to recordbatch")?;
1709                                        batch_writer.write(record_batch).await?;
1710                                        staged_messages_since_flush += 1;
1711                                        if staged_messages_since_flush >= 10_000 {
1712                                            statistics.inc_messages_staged_by(
1713                                                staged_messages_since_flush,
1714                                            );
1715                                            staged_messages_since_flush = 0;
1716                                        }
1717                                        written = true;
1718                                        break;
1719                                    }
1720                                }
1721                                if !written {
1722                                    // Drop data that's before the first batch we received (already committed)
1723                                    if let Some(ref min_lower) = min_batch_lower {
1724                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
1725                                            dropped_per_time
1726                                                .entry(ts_antichain.into_option().unwrap())
1727                                                .and_modify(|c| *c += 1)
1728                                                .or_insert(1);
1729                                            continue;
1730                                        }
1731                                    }
1732
1733                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
1734                                    let entry = stashed_rows.entry(row_ts).or_default();
1735                                    metrics.stashed_rows.inc();
1736                                    entry.push((row, diff_pair));
1737                                }
1738                            }
1739
1740                            for (ts, count) in dropped_per_time {
1741                                debug!(
1742                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
1743                                    name_for_logging, count, ts
1744                                );
1745                            }
1746
1747                            for (ts, count) in stashed_per_time {
1748                                debug!(
1749                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
1750                                    name_for_logging, count, ts
1751                                );
1752                            }
1753                        }
1754                        Event::Progress(frontier) => {
1755                            input_frontier = frontier;
1756                        }
1757                    }
1758                }
1759                if staged_messages_since_flush > 0 {
1760                    statistics.inc_messages_staged_by(staged_messages_since_flush);
1761                }
1762
1763                // Check if frontiers have advanced, which may unlock batches ready to close
1764                if PartialOrder::less_than(
1765                    &processed_batch_description_frontier,
1766                    &batch_description_frontier,
1767                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1768                {
1769                    // Close batches whose upper is now in the past
1770                    // Upper bounds are exclusive, so we check if upper is less_equal to the frontier.
1771                    // Remember: a frontier at x means all timestamps less than x have been observed.
1772                    // Or, in other words we still might yet see timestamps at [x, infinity). X itself will
1773                    // be covered by the _next_ batches lower inclusive bound, so we can safely close the batch if its upper is <= x.
1774                    let ready_batches: Vec<_> = in_flight_batches
1775                        .extract_if(|(lower, upper), _| {
1776                            PartialOrder::less_than(lower, &batch_description_frontier)
1777                                && PartialOrder::less_equal(upper, &input_frontier)
1778                        })
1779                        .collect();
1780
1781                    if !ready_batches.is_empty() {
1782                        debug!(
1783                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
1784                            name_for_logging,
1785                            ready_batches.len(),
1786                            batch_description_frontier.pretty(),
1787                            input_frontier.pretty()
1788                        );
1789                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1790                        for (desc, mut batch_writer) in ready_batches {
1791                            let close_started_at = Instant::now();
1792                            let data_files = batch_writer.close().await;
1793                            metrics
1794                                .writer_close_duration_seconds
1795                                .observe(close_started_at.elapsed().as_secs_f64());
1796                            let data_files = data_files.context("Failed to close batch writer")?;
1797                            debug!(
1798                                "{}: closed batch [{}, {}), wrote {} files",
1799                                name_for_logging,
1800                                desc.0.pretty(),
1801                                desc.1.pretty(),
1802                                data_files.len()
1803                            );
1804                            for data_file in data_files {
1805                                match data_file.content_type() {
1806                                    iceberg::spec::DataContentType::Data => {
1807                                        metrics.data_files_written.inc();
1808                                    }
1809                                    iceberg::spec::DataContentType::PositionDeletes
1810                                    | iceberg::spec::DataContentType::EqualityDeletes => {
1811                                        metrics.delete_files_written.inc();
1812                                    }
1813                                }
1814                                statistics.inc_messages_staged_by(data_file.record_count());
1815                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1816                                let file = BoundedDataFile::new(
1817                                    data_file,
1818                                    current_schema.as_ref().clone(),
1819                                    desc.clone(),
1820                                );
1821                                output.give(&capset[0], file);
1822                            }
1823
1824                            max_upper = max_upper.join(&desc.1);
1825                        }
1826
1827                        capset.downgrade(max_upper);
1828                    }
1829                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
1830                    processed_input_frontier.clone_from(&input_frontier);
1831                }
1832            }
1833            Ok(())
1834        })
1835    });
1836
1837    let statuses = errors.map(|error| HealthStatusMessage {
1838        id: None,
1839        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1840        namespace: StatusNamespace::Iceberg,
1841    });
1842    (output_stream, statuses, button.press_on_drop())
1843}
1844
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::SqlScalarType;

    /// Spot-checks the type overrides applied when mapping Materialize SQL
    /// types to Iceberg-compatible Arrow types.
    #[mz_ore::test]
    fn test_iceberg_type_overrides() {
        // UInt16 should override to Int32
        let result = iceberg_type_overrides(&SqlScalarType::UInt16);
        assert_eq!(result.unwrap().0, DataType::Int32);

        // UInt32 should override to Int64
        let result = iceberg_type_overrides(&SqlScalarType::UInt32);
        assert_eq!(result.unwrap().0, DataType::Int64);

        // UInt64 should override to Decimal128(20, 0)
        let result = iceberg_type_overrides(&SqlScalarType::UInt64);
        assert_eq!(
            result.unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // MzTimestamp should override to Decimal128(20, 0)
        let result = iceberg_type_overrides(&SqlScalarType::MzTimestamp);
        assert_eq!(
            result.unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // Other types should return None (use default)
        assert!(iceberg_type_overrides(&SqlScalarType::Int32).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::String).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::Bool).is_none());
    }

    /// Verifies that overrides apply recursively to nested types, not just
    /// top-level columns.
    #[mz_ore::test]
    fn test_iceberg_schema_with_nested_uint64() {
        // Test that desc_to_schema_with_overrides handles nested UInt64
        // by using iceberg_type_overrides which applies recursively
        let desc = mz_repr::RelationDesc::builder()
            .with_column(
                "items",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt64),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let schema =
            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
                .expect("schema conversion should succeed");

        // The inner element should be Decimal128, not UInt64
        if let DataType::List(field) = schema.field(0).data_type() {
            assert_eq!(
                field.data_type(),
                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            );
        } else {
            panic!("Expected List type");
        }
    }

    /// Equality ids must come from the Iceberg schema's assigned field ids,
    /// which diverge from naive column positions once nested fields (here,
    /// the map's key/value entries) consume ids of their own.
    #[mz_ore::test]
    fn equality_ids_follow_iceberg_field_ids() {
        // A map column whose entries struct contributes extra nested field ids.
        let map_entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let materialize_arrow_schema = ArrowSchema::new(vec![
            Field::new("attrs", DataType::Map(Arc::new(map_entries), false), true),
            Field::new("key_col", DataType::Int32, false),
        ]);
        let materialize_arrow_schema = add_field_ids_to_arrow_schema(materialize_arrow_schema);
        let iceberg_schema = arrow_schema_to_schema(&materialize_arrow_schema)
            .expect("schema conversion should succeed");

        let equality_ids =
            equality_ids_for_indices(&iceberg_schema, &materialize_arrow_schema, &[1])
                .expect("field lookup should succeed");

        // The id for `key_col` must match the schema's assignment, and must NOT
        // be the positional index + 1 (which would be 2).
        let expected_id = iceberg_schema
            .as_struct()
            .field_by_name("key_col")
            .expect("top-level field should exist")
            .id;
        assert_eq!(equality_ids, vec![expected_id]);
        assert_ne!(expected_id, 2);
    }
}
1944
1945/// Commit completed batches to Iceberg as snapshots.
1946/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
1947/// Each snapshot includes the Materialize frontier in its metadata for resume support.
1948fn commit_to_iceberg<'scope>(
1949    name: String,
1950    sink_id: GlobalId,
1951    sink_version: u64,
1952    batch_input: StreamVec<'scope, Timestamp, BoundedDataFile>,
1953    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1954    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
1955    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1956    connection: IcebergSinkConnection,
1957    storage_configuration: StorageConfiguration,
1958    write_handle: impl Future<
1959        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1960    > + 'static,
1961    metrics: Arc<IcebergSinkMetrics>,
1962    statistics: SinkStatistics,
1963) -> (
1964    StreamVec<'scope, Timestamp, HealthStatusMessage>,
1965    PressOnDropButton,
1966) {
1967    let scope = batch_input.scope();
1968    let mut builder = OperatorBuilder::new(name, scope.clone());
1969
1970    let hashed_id = sink_id.hashed();
1971    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1972    let name_for_logging = format!("{sink_id}-commit-to-iceberg");
1973
1974    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1975    let mut batch_desc_input =
1976        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1977    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1978
1979    let (button, errors) = builder.build_fallible(move |_caps| {
1980        Box::pin(async move {
1981            if !is_active_worker {
1982                write_frontier.borrow_mut().clear();
1983                return Ok(());
1984            }
1985
1986            let catalog = connection
1987                .catalog_connection
1988                .connect(&storage_configuration, InTask::Yes)
1989                .await
1990                .with_context(|| {
1991                    format!(
1992                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1993                        connection.catalog_connection.uri, connection.namespace, connection.table
1994                    )
1995                })?;
1996
1997            let mut write_handle = write_handle.await?;
1998
1999            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
2000            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
2001            while let Some(_) = table_ready_input.next().await {
2002                // Wait for table to be ready
2003            }
2004            let mut table = catalog.load_table(&table_ident).await.with_context(|| {
2005                format!(
2006                    "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
2007                    connection.namespace, connection.table
2008                )
2009            })?;
2010
2011            #[allow(clippy::disallowed_types)]
2012            let mut batch_descriptions: std::collections::HashMap<
2013                (Antichain<Timestamp>, Antichain<Timestamp>),
2014                BoundedDataFileSet,
2015            > = std::collections::HashMap::new();
2016
2017            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
2018            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
2019
2020            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
2021                tokio::select! {
2022                    _ = batch_desc_input.ready() => {},
2023                    _ = input.ready() => {}
2024                }
2025
2026                while let Some(event) = batch_desc_input.next_sync() {
2027                    match event {
2028                        Event::Data(_cap, data) => {
2029                            for batch_desc in data {
2030                                let prev = batch_descriptions
2031                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
2032                                if let Some(prev) = prev {
2033                                    anyhow::bail!(
2034                                        "Duplicate batch description received \
2035                                         in commit operator: {:?}",
2036                                        prev
2037                                    );
2038                                }
2039                            }
2040                        }
2041                        Event::Progress(frontier) => {
2042                            batch_description_frontier = frontier;
2043                        }
2044                    }
2045                }
2046
2047                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
2048                for event in ready_events {
2049                    match event {
2050                        Event::Data(_cap, data) => {
2051                            for bounded_data_file in data {
2052                                let entry = batch_descriptions
2053                                    .entry(bounded_data_file.batch_desc().clone())
2054                                    .or_default();
2055                                entry.data_files.push(bounded_data_file);
2056                            }
2057                        }
2058                        Event::Progress(frontier) => {
2059                            input_frontier = frontier;
2060                        }
2061                    }
2062                }
2063
2064                // Collect batches whose data files have all arrived.
2065                // The writer emits all data files for a batch at a capability <= the batch's
2066                // lower bound, then downgrades its capability to the batch's upper bound.
2067                // So once the input frontier advances past lower, we know the writer has
2068                // finished emitting files for this batch and dropped its capability.
2069                let mut done_batches: Vec<_> = batch_descriptions
2070                    .keys()
2071                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
2072                    .cloned()
2073                    .collect();
2074
2075                // Commit batches in timestamp order to maintain consistency
2076                done_batches.sort_by(|a, b| {
2077                    if PartialOrder::less_than(a, b) {
2078                        Ordering::Less
2079                    } else if PartialOrder::less_than(b, a) {
2080                        Ordering::Greater
2081                    } else {
2082                        Ordering::Equal
2083                    }
2084                });
2085
2086                for batch in done_batches {
2087                    let file_set = batch_descriptions.remove(&batch).unwrap();
2088
2089                    let mut data_files = vec![];
2090                    let mut delete_files = vec![];
2091                    // Track totals for committed statistics
2092                    let mut total_messages: u64 = 0;
2093                    let mut total_bytes: u64 = 0;
2094                    for file in file_set.data_files {
2095                        total_messages += file.data_file().record_count();
2096                        total_bytes += file.data_file().file_size_in_bytes();
2097                        match file.data_file().content_type() {
2098                            iceberg::spec::DataContentType::Data => {
2099                                data_files.push(file.into_data_file());
2100                            }
2101                            iceberg::spec::DataContentType::PositionDeletes
2102                            | iceberg::spec::DataContentType::EqualityDeletes => {
2103                                delete_files.push(file.into_data_file());
2104                            }
2105                        }
2106                    }
2107
2108                    debug!(
2109                        ?sink_id,
2110                        %name_for_logging,
2111                        lower = %batch.0.pretty(),
2112                        upper = %batch.1.pretty(),
2113                        data_files = data_files.len(),
2114                        delete_files = delete_files.len(),
2115                        total_messages,
2116                        total_bytes,
2117                        "iceberg commit applying batch"
2118                    );
2119
2120                    let instant = Instant::now();
2121
2122                    let frontier = batch.1.clone();
2123                    let frontier_json = serde_json::to_string(&frontier.elements())
2124                        .context("Failed to serialize frontier to JSON")?;
2125                    let snapshot_properties = vec![
2126                        ("mz-sink-id".to_string(), sink_id.to_string()),
2127                        ("mz-frontier".to_string(), frontier_json),
2128                        ("mz-sink-version".to_string(), sink_version.to_string()),
2129                    ];
2130
2131                    let (table_state, commit_result) = Retry::default()
2132                        .max_tries(5)
2133                        .retry_async_with_state(table, |_, table| {
2134                            let snapshot_properties = snapshot_properties.clone();
2135                            let data_files = data_files.clone();
2136                            let delete_files = delete_files.clone();
2137                            let metrics = Arc::clone(&metrics);
2138                            let catalog = Arc::clone(&catalog);
2139                            let conn_namespace = connection.namespace.clone();
2140                            let conn_table = connection.table.clone();
2141                            let frontier = frontier.clone();
2142                            let batch_lower = batch.0.clone();
2143                            let batch_upper = batch.1.clone();
2144                            async move {
2145                                try_commit_batch(
2146                                    table,
2147                                    snapshot_properties,
2148                                    data_files,
2149                                    delete_files,
2150                                    catalog.as_ref(),
2151                                    &conn_namespace,
2152                                    &conn_table,
2153                                    sink_version,
2154                                    &frontier,
2155                                    &batch_lower,
2156                                    &batch_upper,
2157                                    &metrics,
2158                                )
2159                                .await
2160                            }
2161                        })
2162                        .await;
2163                    let commit_result = commit_result.with_context(|| {
2164                        format!(
2165                            "failed to commit batch to Iceberg table '{}.{}'",
2166                            connection.namespace, connection.table
2167                        )
2168                    });
2169                    table = table_state;
2170                    let duration = instant.elapsed();
2171                    metrics
2172                        .commit_duration_seconds
2173                        .observe(duration.as_secs_f64());
2174                    commit_result?;
2175
2176                    debug!(
2177                        ?sink_id,
2178                        %name_for_logging,
2179                        lower = %batch.0.pretty(),
2180                        upper = %batch.1.pretty(),
2181                        total_messages,
2182                        total_bytes,
2183                        ?duration,
2184                        "iceberg commit applied batch"
2185                    );
2186
2187                    metrics.snapshots_committed.inc();
2188                    statistics.inc_messages_committed_by(total_messages);
2189                    statistics.inc_bytes_committed_by(total_bytes);
2190
2191                    let mut expect_upper = write_handle.shared_upper();
2192                    loop {
2193                        if PartialOrder::less_equal(&frontier, &expect_upper) {
2194                            // The frontier has already been advanced as far as necessary.
2195                            break;
2196                        }
2197
2198                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
2199                        match write_handle
2200                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
2201                            .await
2202                            .expect("valid usage")
2203                        {
2204                            Ok(()) => break,
2205                            Err(mismatch) => {
2206                                expect_upper = mismatch.current;
2207                            }
2208                        }
2209                    }
2210                    write_frontier.borrow_mut().clone_from(&frontier);
2211                }
2212            }
2213
2214            Ok(())
2215        })
2216    });
2217
2218    let statuses = errors.map(|error| HealthStatusMessage {
2219        id: None,
2220        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
2221        namespace: StatusNamespace::Iceberg,
2222    });
2223
2224    (statuses, button.press_on_drop())
2225}
2226
2227impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
2228    fn get_key_indices(&self) -> Option<&[usize]> {
2229        self.key_desc_and_indices
2230            .as_ref()
2231            .map(|(_, indices)| indices.as_slice())
2232    }
2233
2234    fn get_relation_key_indices(&self) -> Option<&[usize]> {
2235        self.relation_key_indices.as_deref()
2236    }
2237
2238    fn render_sink(
2239        &self,
2240        storage_state: &mut StorageState,
2241        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
2242        sink_id: GlobalId,
2243        input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
2244        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
2245    ) -> (
2246        StreamVec<'scope, Timestamp, HealthStatusMessage>,
2247        Vec<PressOnDropButton>,
2248    ) {
2249        let scope = input.scope();
2250
2251        let write_handle = {
2252            let persist = Arc::clone(&storage_state.persist_clients);
2253            let shard_meta = sink.to_storage_metadata.clone();
2254            async move {
2255                let client = persist.open(shard_meta.persist_location).await?;
2256                let handle = client
2257                    .open_writer(
2258                        shard_meta.data_shard,
2259                        Arc::new(shard_meta.relation_desc),
2260                        Arc::new(UnitSchema),
2261                        Diagnostics::from_purpose("sink handle"),
2262                    )
2263                    .await?;
2264                Ok(handle)
2265            }
2266        };
2267
2268        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
2269        storage_state
2270            .sink_write_frontiers
2271            .insert(sink_id, Rc::clone(&write_frontier));
2272
2273        let (arrow_schema_with_ids, iceberg_schema) =
2274            match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
2275                let (arrow_schema_with_ids, iceberg_schema) =
2276                    relation_desc_to_iceberg_schema(&sink.from_desc)?;
2277
2278                Ok(if sink.envelope == SinkEnvelope::Append {
2279                    // For append mode, extend the Arrow and Iceberg schemas with the user-visible
2280                    // `_mz_diff` and `_mz_timestamp` columns. The minter uses `iceberg_schema` to create
2281                    // the Iceberg table, and `write_data_files` uses `arrow_schema_with_ids` when
2282                    // merging metadata. Both must include these columns before any operator starts.
2283                    let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
2284                    let extended_iceberg = Arc::new(
2285                        arrow_schema_to_schema(&extended_arrow)
2286                            .context("Failed to build Iceberg schema with append columns")?,
2287                    );
2288                    (extended_arrow, extended_iceberg)
2289                } else {
2290                    (arrow_schema_with_ids, iceberg_schema)
2291                })
2292            })() {
2293                Ok(schemas) => schemas,
2294                Err(err) => {
2295                    let error_stream = std::iter::once(HealthStatusMessage {
2296                        id: None,
2297                        update: HealthStatusUpdate::halting(
2298                            format!("{}", err.display_with_causes()),
2299                            None,
2300                        ),
2301                        namespace: StatusNamespace::Iceberg,
2302                    })
2303                    .to_stream(scope);
2304                    return (error_stream, vec![]);
2305                }
2306            };
2307
2308        let metrics = Arc::new(
2309            storage_state
2310                .metrics
2311                .get_iceberg_sink_metrics(sink_id, scope.index()),
2312        );
2313
2314        let statistics = storage_state
2315            .aggregated_statistics
2316            .get_sink(&sink_id)
2317            .expect("statistics initialized")
2318            .clone();
2319
2320        let connection_for_minter = self.clone();
2321        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
2322            mint_batch_descriptions(
2323                format!("{sink_id}-iceberg-mint"),
2324                sink_id,
2325                input,
2326                sink,
2327                connection_for_minter,
2328                storage_state.storage_configuration.clone(),
2329                Arc::clone(&iceberg_schema),
2330            );
2331
2332        let connection_for_writer = self.clone();
2333        let (datafiles, write_status, write_button) = match sink.envelope {
2334            SinkEnvelope::Upsert => write_data_files::<UpsertEnvelopeHandler>(
2335                format!("{sink_id}-write-data-files"),
2336                minted_input,
2337                batch_descriptions.clone(),
2338                table_ready.clone(),
2339                sink.as_of.clone(),
2340                connection_for_writer,
2341                storage_state.storage_configuration.clone(),
2342                Arc::new(arrow_schema_with_ids.clone()),
2343                Arc::clone(&metrics),
2344                statistics.clone(),
2345            ),
2346            SinkEnvelope::Append => write_data_files::<AppendEnvelopeHandler>(
2347                format!("{sink_id}-write-data-files"),
2348                minted_input,
2349                batch_descriptions.clone(),
2350                table_ready.clone(),
2351                sink.as_of.clone(),
2352                connection_for_writer,
2353                storage_state.storage_configuration.clone(),
2354                Arc::new(arrow_schema_with_ids.clone()),
2355                Arc::clone(&metrics),
2356                statistics.clone(),
2357            ),
2358            SinkEnvelope::Debezium => {
2359                unreachable!("Iceberg sink only supports Upsert and Append envelopes")
2360            }
2361        };
2362
2363        let connection_for_committer = self.clone();
2364        let (commit_status, commit_button) = commit_to_iceberg(
2365            format!("{sink_id}-commit-to-iceberg"),
2366            sink_id,
2367            sink.version,
2368            datafiles,
2369            batch_descriptions,
2370            table_ready,
2371            Rc::clone(&write_frontier),
2372            connection_for_committer,
2373            storage_state.storage_configuration.clone(),
2374            write_handle,
2375            Arc::clone(&metrics),
2376            statistics,
2377        );
2378
2379        let running_status = Some(HealthStatusMessage {
2380            id: None,
2381            update: HealthStatusUpdate::running(),
2382            namespace: StatusNamespace::Iceberg,
2383        })
2384        .to_stream(scope);
2385
2386        let statuses =
2387            scope.concatenate([running_status, mint_status, write_status, commit_status]);
2388
2389        (statuses, vec![mint_button, write_button, commit_button])
2390    }
2391}