Skip to main content

mz_storage/sink/
iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders a [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. The dataflow consists of three operators:
14//!
15//! ```text
16//!        ┏━━━━━━━━━━━━━━┓
17//!        ┃   persist    ┃
18//!        ┃    source    ┃
19//!        ┗━━━━━━┯━━━━━━━┛
20//!               │ row data, the input to this module
21//!               │
22//!        ┏━━━━━━v━━━━━━━┓
23//!        ┃     mint     ┃ (single worker)
24//!        ┃    batch     ┃ loads/creates the Iceberg table,
25//!        ┃ descriptions ┃ determines resume upper
26//!        ┗━━━┯━━━━━┯━━━━┛
27//!            │     │ batch descriptions (broadcast)
28//!       rows │     ├─────────────────────────┐
29//!            │     │                         │
30//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
31//!        ┃    write     ┃───>│ S3 / object │ │
32//!        ┃  data files  ┃    │   storage   │ │
33//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
34//!               │ file metadata              │
35//!               │                            │
36//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
37//!        ┃           commit to                ┃ (single worker)
38//!        ┃             iceberg                ┃
39//!        ┗━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━┛
40//!                      │
41//!              ╭───────v───────╮
42//!              │ Iceberg table │
43//!              │  (snapshots)  │
44//!              ╰───────────────╯
45//! ```
46//! # Minting batch descriptions
47//! The "mint batch descriptions" operator is responsible for generating
48//! time-based batch boundaries that group writes into Iceberg snapshots.
49//! It maintains a sliding window of future batch descriptions so that
50//! writers can start processing data even while earlier batches are still being written.
51//! Knowing the batch boundaries ahead of time is important because we need to
52//! be able to make the claim that all data files written for a given batch
53//! include all data up to the upper `t` but not beyond it.
54//! This could be trivially achieved by waiting for all data to arrive up to a certain
55//! frontier, but that would prevent us from streaming writes out to object storage
56//! until the entire batch is complete, which would increase latency and reduce throughput.
57//!
58//! # Writing data files
59//! The "write data files" operator receives rows along with batch descriptions.
60//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
61//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
62//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
63//! (and position delete files if necessary) to object storage.
64//! It outputs metadata about the written files along with their batch descriptions
65//! for the commit operator to consume.
66//!
67//! # Committing to Iceberg
68//! The "commit to iceberg" operator receives metadata about written data files
69//! along with their batch descriptions. It groups files by batch and creates
70//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
71//! table's metadata to reflect the new snapshots, including updating the
72//! `mz-frontier` property to track progress.
73
74use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::convert::Infallible;
77use std::future::Future;
78use std::time::Instant;
79use std::{cell::RefCell, rc::Rc, sync::Arc};
80
81use anyhow::{Context, anyhow};
82use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
83use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
84use differential_dataflow::lattice::Lattice;
85use differential_dataflow::{AsCollection, Hashable, VecCollection};
86use futures::StreamExt;
87use iceberg::ErrorKind;
88use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
89use iceberg::spec::{
90    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
91    write_data_files_to_avro,
92};
93use iceberg::spec::{Schema, SchemaRef};
94use iceberg::table::Table;
95use iceberg::transaction::{ApplyTransactionAction, Transaction};
96use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
97use iceberg::writer::base_writer::equality_delete_writer::{
98    EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
99};
100use iceberg::writer::base_writer::position_delete_writer::{
101    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
102};
103use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
104use iceberg::writer::file_writer::ParquetWriterBuilder;
105use iceberg::writer::file_writer::location_generator::{
106    DefaultFileNameGenerator, DefaultLocationGenerator,
107};
108use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
109use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
110use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
111use itertools::Itertools;
112use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
113use mz_interchange::avro::DiffPair;
114use mz_ore::cast::CastFrom;
115use mz_ore::error::ErrorExt;
116use mz_ore::future::InTask;
117use mz_ore::result::ResultExt;
118use mz_ore::retry::{Retry, RetryResult};
119use mz_persist_client::Diagnostics;
120use mz_persist_client::write::WriteHandle;
121use mz_persist_types::codec_impls::UnitSchema;
122use mz_repr::{Diff, GlobalId, Row, Timestamp};
123use mz_storage_types::StorageDiff;
124use mz_storage_types::configuration::StorageConfiguration;
125use mz_storage_types::controller::CollectionMetadata;
126use mz_storage_types::errors::DataflowError;
127use mz_storage_types::sinks::{IcebergSinkConnection, SinkEnvelope, StorageSinkDesc};
128use mz_storage_types::sources::SourceData;
129use mz_timely_util::antichain::AntichainExt;
130use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
131use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
132use parquet::file::properties::WriterProperties;
133use serde::{Deserialize, Serialize};
134use timely::PartialOrder;
135use timely::container::CapacityContainerBuilder;
136use timely::dataflow::channels::pact::{Exchange, Pipeline};
137use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
138use timely::dataflow::operators::{CapabilitySet, Concatenate};
139use timely::dataflow::{Scope, StreamVec};
140use timely::progress::{Antichain, Timestamp as _};
141use tracing::debug;
142
143use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
144use crate::metrics::sink::iceberg::IcebergSinkMetrics;
145use crate::render::sinks::SinkRender;
146use crate::statistics::SinkStatistics;
147use crate::storage_state::StorageState;
148
/// Set the default capacity for the array builders inside the ArrowBuilder. This is the
/// number of items each builder can hold before it needs to allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Set the default buffer capacity for the string and binary array builders inside the
/// ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate
/// more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// The prefix for Parquet files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
/// many batches we have in-flight at any given time.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
162
/// Shared state produced by the async setup in [`write_data_files`] that both
/// envelope handlers need to construct Parquet writers.
struct WriterContext {
    /// Arrow schema for data columns, with Materialize extension metadata merged in.
    arrow_schema: Arc<ArrowSchema>,
    /// Iceberg table schema, used to configure Parquet writers.
    current_schema: Arc<Schema>,
    /// File I/O for writing Parquet files to object storage.
    file_io: iceberg::io::FileIO,
    /// Generates file paths under the table's data directory.
    location_generator: DefaultLocationGenerator,
    /// Generates unique file names with a per-worker UUID suffix.
    file_name_generator: DefaultFileNameGenerator,
    /// Parquet writer settings (compression, row-group sizing, etc.) applied to
    /// every file this sink writes.
    writer_properties: WriterProperties,
}
178
/// Envelope-specific logic for writing Iceberg data files.
trait EnvelopeHandler: Send {
    /// Construct from the shared writer context after async setup completes.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self>
    where
        Self: Sized;

    /// Create an [`IcebergWriter`] for a new batch.
    ///
    /// `is_snapshot` is true for the initial "snapshot" batch (lower == as_of), which
    /// contains all pre-existing data and can be very large. Implementations may use
    /// this to disable memory-intensive optimisations like seen-rows deduplication.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;

    /// Convert a single update (an optional `before`/`after` pair at timestamp `ts`)
    /// into an Arrow [`RecordBatch`] shaped for the writer returned by
    /// [`Self::create_writer`].
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
}
199
/// [`EnvelopeHandler`] for the upsert-style envelope: writes data files plus
/// position/equality delete files so the Iceberg table reflects the latest
/// value per key.
struct UpsertEnvelopeHandler {
    /// Shared writer state (schemas, file I/O, location/name generators).
    ctx: WriterContext,
    /// Iceberg field IDs of the key columns, used for equality delete files.
    equality_ids: Vec<i32>,
    /// Iceberg schema for position delete files.
    pos_schema: Arc<Schema>,
    /// Iceberg schema for equality delete files (projected to key columns only).
    eq_schema: Arc<Schema>,
    /// Configuration for the equality delete writer (projected schema + column IDs).
    eq_config: EqualityDeleteWriterConfig,
    /// Arrow schema with an appended `__op` column that the
    /// [`DeltaWriter`](iceberg::writer::combined_writer::delta_writer::DeltaWriter)
    /// uses to distinguish inserts (+1) from deletes (-1).
    schema_with_op: Arc<ArrowSchema>,
}
215
impl EnvelopeHandler for UpsertEnvelopeHandler {
    /// Derive the delete-file schemas and configs from the connection's key columns.
    ///
    /// Fails if the sink has no key columns, since the upsert envelope encodes
    /// retractions as equality deletes over the key.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Upsert needs a key: the key columns become the equality-delete columns.
        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
            return Err(anyhow::anyhow!(
                "Iceberg sink requires key columns for equality deletes"
            ));
        };

        // Map the Materialize key column indices to Iceberg field IDs.
        let equality_ids = equality_ids_for_indices(
            ctx.current_schema.as_ref(),
            materialize_arrow_schema.as_ref(),
            equality_indices,
        )?;

        // Position delete files have a fixed schema defined by the Iceberg spec.
        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let pos_schema = Arc::new(
            arrow_schema_to_schema(&pos_arrow_schema)
                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
        );

        // Equality delete files use the table schema projected to the key columns.
        let eq_config =
            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
                .context("Failed to create EqualityDeleteWriterConfig")?;
        let eq_schema = Arc::new(
            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
        );

        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));

        Ok(Self {
            ctx,
            equality_ids,
            pos_schema,
            eq_schema,
            eq_config,
            schema_with_op,
        })
    }

    /// Build a `DeltaWriter` composed of three rolling Parquet writers: one for
    /// data files, one for position delete files, and one for equality delete files.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Data-file writer: validates our merged Arrow schema against the table schema.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

        // Position-delete writer, using the spec-mandated position delete schema.
        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
        let pos_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.pos_schema),
        );
        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            pos_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let pos_delete_writer_builder =
            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

        // Equality-delete writer, using the key-column projection computed in `new`.
        let eq_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.eq_schema),
        );
        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            eq_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let eq_delete_writer_builder =
            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());

        let mut builder = DeltaWriterBuilder::new(
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            self.equality_ids.clone(),
        );

        if is_snapshot {
            // The snapshot batch can be arbitrarily large; disable the seen-rows
            // cache (max 0) so the writer's memory use stays bounded.
            builder = builder.with_max_seen_rows(0);
        }

        Ok(Box::new(
            builder
                .build(None)
                .await
                .context("Failed to create DeltaWriter")?,
        ))
    }

    /// The `__op` column indicates whether each row is an insert (+1) or delete (-1),
    /// which the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
    fn row_to_batch(
        &self,
        diff_pair: DiffPair<Row>,
        _ts: Timestamp,
    ) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.ctx.arrow_schema),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        // One `__op` entry per emitted row, in the same order rows are added below.
        let mut op_values = Vec::new();

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add delete row to builder")?;
            op_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add insert row to builder")?;
            op_values.push(1i32);
        }

        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the `__op` column so the batch matches `schema_with_op`.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(op_values)));

        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
            .context("Failed to create batch with op column")
    }
}
364
/// [`EnvelopeHandler`] for the append-only envelope: every change is written as a
/// plain data row (no delete files), annotated with `_mz_diff` and `_mz_timestamp`.
struct AppendEnvelopeHandler {
    /// Shared writer state (schemas, file I/O, location/name generators).
    ctx: WriterContext,
    /// Arrow schema with only user columns (no `_mz_diff`/`_mz_timestamp`), used by
    /// [`ArrowBuilder`] to serialize row data before the extra columns are appended.
    user_schema_for_append: Arc<ArrowSchema>,
}
371
impl EnvelopeHandler for AppendEnvelopeHandler {
    /// Derive the user-column-only schema used for serializing row data.
    fn new(
        ctx: WriterContext,
        _connection: &IcebergSinkConnection,
        _materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // arrow_schema already includes _mz_diff + _mz_timestamp (added in render_sink); strip
        // the last two fields so ArrowBuilder only processes the user columns.
        let n = ctx.arrow_schema.fields().len().saturating_sub(2);
        let user_schema_for_append =
            Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));

        Ok(Self {
            ctx,
            user_schema_for_append,
        })
    }

    /// Build a plain data-file writer; append-only never emits delete files, so
    /// `is_snapshot` needs no special handling here.
    async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        Ok(Box::new(
            DataFileWriterBuilder::new(data_rolling_writer)
                .build(None)
                .await
                .context("Failed to create DataFileWriter")?,
        ))
    }

    /// Every change is written as a plain data row: the `before` half (if present) gets
    /// `_mz_diff = -1` and the `after` half gets `_mz_diff = +1`. Both carry the same `_mz_timestamp`.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.user_schema_for_append),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        let mut diff_values: Vec<i32> = Vec::new();
        // Timestamps that don't fit in i64 are clamped to i64::MAX rather than erroring.
        let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add before row to builder")?;
            diff_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add after row to builder")?;
            diff_values.push(1i32);
        }

        // Number of rows emitted (0, 1, or 2); the timestamp column repeats per row.
        let n = diff_values.len();
        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append _mz_diff and _mz_timestamp so the batch matches the full sink schema.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(diff_values)));
        columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));

        RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
            .context("Failed to create append record batch")
    }
}
451
/// The precision needed to store all UInt64 values in a Decimal128.
/// UInt64 max value is 18,446,744,073,709,551,615 which has 20 digits.
const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
455
456/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
457/// Parquet metadata for schema evolution tracking. Field IDs are assigned
458/// recursively to all nested fields (structs, lists, maps) using a depth-first,
459/// pre-order traversal.
460fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
461    let mut next_field_id = 1i32;
462    let fields: Vec<Field> = schema
463        .fields()
464        .iter()
465        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
466        .collect();
467    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
468}
469
470/// Recursively add field IDs to a field and all its nested children.
471fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
472    let current_id = *next_id;
473    *next_id += 1;
474
475    let mut metadata = field.metadata().clone();
476    metadata.insert(
477        PARQUET_FIELD_ID_META_KEY.to_string(),
478        current_id.to_string(),
479    );
480
481    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
482
483    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
484}
485
486/// Add field IDs to nested fields within a DataType.
487fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
488    match data_type {
489        DataType::Struct(fields) => {
490            let new_fields: Vec<Field> = fields
491                .iter()
492                .map(|f| add_field_ids_recursive(f, next_id))
493                .collect();
494            DataType::Struct(new_fields.into())
495        }
496        DataType::List(element_field) => {
497            let new_element = add_field_ids_recursive(element_field, next_id);
498            DataType::List(Arc::new(new_element))
499        }
500        DataType::LargeList(element_field) => {
501            let new_element = add_field_ids_recursive(element_field, next_id);
502            DataType::LargeList(Arc::new(new_element))
503        }
504        DataType::Map(entries_field, sorted) => {
505            let new_entries = add_field_ids_recursive(entries_field, next_id);
506            DataType::Map(Arc::new(new_entries), *sorted)
507        }
508        _ => data_type.clone(),
509    }
510}
511
512/// Merge Materialize extension metadata into Iceberg's Arrow schema.
513/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
514/// Materialize's extension names for ArrowBuilder compatibility.
515/// Handles nested types (structs, lists, maps) recursively.
516fn merge_materialize_metadata_into_iceberg_schema(
517    materialize_arrow_schema: &ArrowSchema,
518    iceberg_schema: &Schema,
519) -> anyhow::Result<ArrowSchema> {
520    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
521    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
522        .context("Failed to convert Iceberg schema to Arrow schema")?;
523
524    // Now merge in the Materialize extension metadata
525    let fields: Vec<Field> = iceberg_arrow_schema
526        .fields()
527        .iter()
528        .map(|iceberg_field| {
529            // Find the corresponding Materialize field by name to get extension metadata
530            let mz_field = materialize_arrow_schema
531                .field_with_name(iceberg_field.name())
532                .with_context(|| {
533                    format!(
534                        "Field '{}' not found in Materialize schema",
535                        iceberg_field.name()
536                    )
537                })?;
538
539            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
540        })
541        .collect::<anyhow::Result<Vec<_>>>()?;
542
543    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
544}
545
/// Recursively merge Materialize extension metadata into an Iceberg field.
///
/// The returned field keeps the Iceberg field's name, data type, nullability, and
/// metadata (including field IDs), adding only the Materialize extension name when
/// a matching Materialize field is supplied. `mz_field` is `None` for nested fields
/// that have no Materialize counterpart (e.g. synthetic list/map entry fields).
/// Errors if the Iceberg and Materialize schemas disagree on a nested type's shape.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    // Start with Iceberg field's metadata (which includes field IDs)
    let mut metadata = iceberg_field.metadata().clone();

    // Add Materialize extension name if available
    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    // Recursively process nested types. Each arm checks that the Materialize side
    // (when present) has the same container kind before descending into children.
    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Struct children are matched by name; a missing Materialize child
            // simply contributes no extension metadata.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        // Leaf types need no recursion.
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
643
644async fn reload_table(
645    catalog: &dyn Catalog,
646    namespace: String,
647    table_name: String,
648    current_table: Table,
649) -> anyhow::Result<Table> {
650    let namespace_ident = NamespaceIdent::new(namespace.clone());
651    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
652    let current_schema = current_table.metadata().current_schema_id();
653    let current_partition_spec = current_table.metadata().default_partition_spec_id();
654
655    match catalog.load_table(&table_ident).await {
656        Ok(table) => {
657            let reloaded_schema = table.metadata().current_schema_id();
658            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
659            if reloaded_schema != current_schema {
660                return Err(anyhow::anyhow!(
661                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
662                    table_name,
663                    current_schema,
664                    reloaded_schema
665                ));
666            }
667
668            if reloaded_partition_spec != current_partition_spec {
669                return Err(anyhow::anyhow!(
670                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
671                    table_name,
672                    current_partition_spec,
673                    reloaded_partition_spec
674                ));
675            }
676
677            Ok(table)
678        }
679        Err(err) => Err(err).context("Failed to reload Iceberg table"),
680    }
681}
682
/// Attempt a single commit of a batch of data files to an Iceberg table.
/// On conflict or failure, reloads the table and returns a retryable error.
/// On success, returns the updated table state.
///
/// Returns the (possibly reloaded) table together with a [`RetryResult`]:
/// - `Ok(())`: the commit succeeded and the returned table reflects the new snapshot.
/// - `RetryableErr`: a transient failure (apply error, catalog commit conflict, or
///   reload failure); the caller may retry with the returned table state.
/// - `FatalErr`: another writer has fenced us off (newer sink version, or a frontier
///   at or beyond ours), so retrying would risk data loss.
async fn try_commit_batch(
    mut table: Table,
    snapshot_properties: Vec<(String, String)>,
    data_files: Vec<DataFile>,
    delete_files: Vec<DataFile>,
    catalog: &dyn Catalog,
    conn_namespace: &str,
    conn_table: &str,
    sink_version: u64,
    frontier: &Antichain<Timestamp>,
    batch_lower: &Antichain<Timestamp>,
    batch_upper: &Antichain<Timestamp>,
    metrics: &IcebergSinkMetrics,
) -> (Table, RetryResult<(), anyhow::Error>) {
    let tx = Transaction::new(&table);
    let mut action = tx
        .row_delta()
        .set_snapshot_properties(snapshot_properties.into_iter().collect())
        .with_check_duplicate(false);

    // Even an empty batch commits a snapshot (carrying the frontier properties);
    // only attach file actions when there is actually something to add.
    if !data_files.is_empty() || !delete_files.is_empty() {
        action = action
            .add_data_files(data_files)
            .add_delete_files(delete_files);
    }

    let tx = match action
        .apply(tx)
        .context("Failed to apply data file addition to iceberg table transaction")
    {
        Ok(tx) => tx,
        Err(e) => {
            // Reload before returning so the retry starts from fresh table metadata.
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(reload_err) => {
                    return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
                }
            }
            return (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Failed to apply data file addition to iceberg table transaction: {}",
                    e
                )),
            );
        }
    };

    let new_table = tx.commit(catalog).await;
    match new_table {
        // Another writer committed concurrently; reload and decide whether the
        // conflict is benign (retry) or a fencing event (fatal).
        Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
            metrics.commit_conflicts.inc();
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            // Inspect the reloaded table's snapshots for the last frontier/version
            // recorded by a Materialize sink.
            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let last = retrieve_upper_from_snapshots(&mut snapshots);
            let last = match last {
                Ok(val) => val,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            // Check if another writer has advanced the frontier beyond ours (fencing check)
            if let Some((last_frontier, last_version)) = last {
                if last_version > sink_version {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer \
                             with version {}. Current sink version: {}. \
                             Frontiers may be out of sync, aborting to avoid data loss.",
                            conn_table,
                            last_version,
                            sink_version,
                        )),
                    );
                }
                if PartialOrder::less_equal(frontier, &last_frontier) {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer. \
                             Current frontier: {:?}, last frontier: {:?}.",
                            conn_table,
                            frontier,
                            last_frontier,
                        )),
                    );
                }
            }

            // Benign conflict (e.g. a compaction): retry against the reloaded table.
            (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Commit conflict detected when committing batch [{}, {}) \
                     to Iceberg table '{}.{}'. Retrying...",
                    batch_lower.pretty(),
                    batch_upper.pretty(),
                    conn_namespace,
                    conn_table
                )),
            )
        }
        Err(e) => {
            metrics.commit_failures.inc();
            (table, RetryResult::RetryableErr(anyhow!(e)))
        }
        Ok(new_table) => (new_table, RetryResult::Ok(())),
    }
}
816
817/// Load an existing Iceberg table or create it if it doesn't exist.
818async fn load_or_create_table(
819    catalog: &dyn Catalog,
820    namespace: String,
821    table_name: String,
822    schema: &Schema,
823) -> anyhow::Result<iceberg::table::Table> {
824    let namespace_ident = NamespaceIdent::new(namespace.clone());
825    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
826
827    // Try to load the table first
828    match catalog.load_table(&table_ident).await {
829        Ok(table) => {
830            // Table exists, return it
831            // TODO: Add proper schema evolution/validation to ensure compatibility
832            Ok(table)
833        }
834        Err(err) => {
835            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
836                || err
837                    .message()
838                    .contains("Tried to load a table that does not exist")
839            {
840                // Table doesn't exist, create it
841                // Note: location is not specified, letting the catalog determine the default location
842                // based on its warehouse configuration
843                let table_creation = TableCreation::builder()
844                    .name(table_name.clone())
845                    .schema(schema.clone())
846                    // Use unpartitioned spec by default
847                    // TODO: Consider making partition spec configurable
848                    // .partition_spec(UnboundPartitionSpec::builder().build())
849                    .build();
850
851                catalog
852                    .create_table(&namespace_ident, table_creation)
853                    .await
854                    .with_context(|| {
855                        format!(
856                            "Failed to create Iceberg table '{}' in namespace '{}'",
857                            table_name, namespace
858                        )
859                    })
860            } else {
861                // Some other error occurred
862                Err(err).context("Failed to load Iceberg table")
863            }
864        }
865    }
866}
867
868/// Find the most recent Materialize frontier from Iceberg snapshots.
869/// We store the frontier in snapshot metadata to track where we left off after restarts.
870/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
871/// The input slice will be sorted by sequence number in descending order.
872fn retrieve_upper_from_snapshots(
873    snapshots: &mut [Arc<Snapshot>],
874) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
875    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
876
877    for snapshot in snapshots {
878        let props = &snapshot.summary().additional_properties;
879        if let (Some(frontier_json), Some(sink_version_str)) =
880            (props.get("mz-frontier"), props.get("mz-sink-version"))
881        {
882            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
883                .context("Failed to deserialize frontier from snapshot properties")?;
884            let frontier = Antichain::from_iter(frontier);
885
886            let sink_version = sink_version_str
887                .parse::<u64>()
888                .context("Failed to parse mz-sink-version from snapshot properties")?;
889
890            return Ok(Some((frontier, sink_version)));
891        }
892        if snapshot.summary().operation.as_str() != "replace" {
893            // This is a bad heuristic, but we have no real other way to identify compactions
894            // right now other than assume they will be the only operation writing "replace" operations.
895            // That means if we find a snapshot with some other operation, but no mz-frontier, we are in an
896            // inconsistent state and have to error out.
897            anyhow::bail!(
898                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
899                snapshot.snapshot_id(),
900                snapshot.summary().operation.as_str(),
901            );
902        }
903    }
904
905    Ok(None)
906}
907
908/// Type overrides for Iceberg-compatible Arrow schemas.
909///
910/// Iceberg doesn't support unsigned integer types, so we map them to compatible types:
911/// - UInt8, UInt16 -> Int32
912/// - UInt32 -> Int64
913/// - UInt64 -> Decimal128(20, 0)
914/// - MzTimestamp (which uses UInt64) -> Decimal128(20, 0)
915fn iceberg_type_overrides(scalar_type: &mz_repr::SqlScalarType) -> Option<(DataType, String)> {
916    use mz_repr::SqlScalarType;
917    match scalar_type {
918        SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
919        SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
920        SqlScalarType::UInt64 => Some((
921            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
922            "uint8".to_string(),
923        )),
924        SqlScalarType::MzTimestamp => Some((
925            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
926            "mz_timestamp".to_string(),
927        )),
928        _ => None,
929    }
930}
931
932/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
933///
934/// Returns a tuple of:
935/// - The Arrow schema (with field IDs and Iceberg-compatible types) for writing Parquet files
936/// - The Iceberg schema for table creation/validation
937///
938/// Iceberg doesn't support unsigned integer types, so we use `iceberg_type_overrides`
939/// to map them to compatible types (e.g., UInt64 -> Decimal128(20,0)). The ArrowBuilder
940/// handles the cross-type conversion (Datum::UInt64 -> Decimal128Builder) automatically.
941fn relation_desc_to_iceberg_schema(
942    desc: &mz_repr::RelationDesc,
943) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
944    let arrow_schema =
945        mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
946            .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
947
948    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
949
950    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
951        .context("Failed to convert Arrow schema to Iceberg schema")?;
952
953    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
954}
955
956/// Resolve Materialize key column indexes to Iceberg top-level field IDs.
957///
958/// Iceberg field IDs are assigned recursively, so a top-level column's field ID
959/// is not necessarily `column_index + 1` once nested fields are present.
960fn equality_ids_for_indices(
961    current_schema: &Schema,
962    materialize_arrow_schema: &ArrowSchema,
963    equality_indices: &[usize],
964) -> anyhow::Result<Vec<i32>> {
965    let top_level_fields = current_schema.as_struct();
966
967    equality_indices
968        .iter()
969        .map(|index| {
970            let mz_field = materialize_arrow_schema
971                .fields()
972                .get(*index)
973                .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
974            let field_name = mz_field.name();
975            let iceberg_field = top_level_fields
976                .field_by_name(field_name)
977                .with_context(|| {
978                    format!(
979                        "Equality delete key column '{}' not found in Iceberg table schema",
980                        field_name
981                    )
982                })?;
983            Ok(iceberg_field.id)
984        })
985        .collect()
986}
987
988/// Build a new Arrow schema by adding an __op column to the existing schema.
989fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
990    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
991    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
992    ArrowSchema::new(fields)
993}
994
995/// Build a new Arrow schema by appending `_mz_diff` (Int32) and `_mz_timestamp` (Int64) columns.
996/// These are user-visible Iceberg columns written in append mode. Parquet field IDs are
997/// assigned sequentially after the existing maximum field ID so the extended schema can
998/// be converted to a valid Iceberg schema via `arrow_schema_to_schema`.
999#[allow(clippy::disallowed_types)]
1000fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1001    use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1002    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1003    fields.push(Arc::new(Field::new(
1004        ICEBERG_APPEND_DIFF_COLUMN,
1005        DataType::Int32,
1006        false,
1007    )));
1008    fields.push(Arc::new(Field::new(
1009        ICEBERG_APPEND_TIMESTAMP_COLUMN,
1010        DataType::Int64,
1011        false,
1012    )));
1013
1014    add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1015}
1016
/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
/// Batches are minted with configurable windows to balance write efficiency with latency.
/// We maintain a sliding window of future batch descriptions so writers can start
/// processing data even while earlier batches are still being written.
///
/// Outputs, in order:
/// - the input data, forwarded unchanged (all workers forward their own shard),
/// - the minted batch descriptions `(lower, upper)` (active worker only),
/// - a data-less "table ready" stream whose frontier advances once the table exists,
/// - health status messages derived from operator errors,
/// - a drop-button that shuts the operator down.
fn mint_batch_descriptions<G, D>(
    name: String,
    sink_id: GlobalId,
    input: VecCollection<G, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<G, D, Diff>,
    StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    StreamVec<G, Infallible>,
    StreamVec<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    // Exactly one worker (chosen by hashing the sink id) mints batch descriptions;
    // the others only forward data.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input =
        builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, StreamVec<G, Rc<anyhain::Error>>) =
        builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
            // Data is always emitted using capabilities from input events, so this
            // operator never needs to hold its own capability for the data output.
            *data_capset = CapabilitySet::new();

            if !is_active_worker {
                // Passive workers drop all capabilities and just forward their share
                // of the data; minting happens solely on the active worker.
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                *table_ready_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;
            debug!(
                ?sink_id,
                %name_for_logging,
                namespace = %connection.namespace,
                table = %connection.table,
                "iceberg mint loaded/created table"
            );

            // Drop the table-ready capability: the table now exists, which downstream
            // observes via this output's frontier advancing to the empty antichain.
            *table_ready_capset = CapabilitySet::new();

            // Recover where a previous incarnation of this sink left off, if anywhere.
            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };
            debug!(
                ?sink_id,
                %name_for_logging,
                resume_upper = %resume_upper.pretty(),
                resume_version,
                as_of = %as_of.pretty(),
                "iceberg mint resume position loaded"
            );

            // The input has overcompacted if
            let overcompacted =
                // ..we have made some progress in the past
                *resume_upper != [Timestamp::minimum()] &&
                // ..but the since frontier is now beyond that
                PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                // This would normally be an assertion but because it can happen after a
                // Materialize backup/restore we log an error so that it appears on Sentry but
                // leaves the rest of the objects in the cluster unaffected.
                return Err(anyhow::anyhow!("{err}"));
            };

            if resume_version > sink_version {
                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
            }

            let mut initialized = false;
            let mut observed_frontier;
            // Highest data timestamp seen before initialization; used to synthesize an
            // upper if a bounded input closes before we ever see a usable frontier.
            let mut max_seen_ts: Option<Timestamp> = None;
            // Track minted batches to maintain a sliding window of open batch descriptions.
            // This is needed to know when to retire old batches and mint new ones.
            // It's "sortedness" is derived from the monotonicity of batch descriptions,
            // and the fact that we only ever push new descriptions to the back and pop from the front.
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            if !initialized {
                                for (_, ts, _) in data.iter() {
                                    match max_seen_ts.as_mut() {
                                        Some(max) => {
                                            if max.less_than(ts) {
                                                *max = ts.clone();
                                            }
                                        }
                                        None => {
                                            max_seen_ts = Some(ts.clone());
                                        }
                                    }
                                }
                            }
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    if observed_frontier.is_empty() {
                        // Bounded inputs can close (frontier becomes empty) before we finish
                        // initialization. For example, a loadgen source configured for a finite
                        // dataset may emit all rows at time t and then immediately close. If we
                        // saw any data, synthesize an upper one tick past the maximum timestamp
                        // so we can mint a snapshot batch and commit it.
                        if let Some(max_ts) = max_seen_ts.as_ref() {
                            let synthesized_upper =
                                Antichain::from_elem(max_ts.step_forward());
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                max_seen_ts = %max_ts,
                                synthesized_upper = %synthesized_upper.pretty(),
                                "iceberg mint input closed before initialization; using max seen ts"
                            );
                            observed_frontier = synthesized_upper;
                        } else {
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                "iceberg mint input closed before initialization with no data"
                            );
                            // Input stream closed before initialization completed and no data arrived.
                            return Ok(());
                        }
                    }

                    // We only start minting after we've reached as_of and resume_upper to avoid
                    // minting batches that would be immediately skipped.
                    if PartialOrder::less_than(&observed_frontier, &resume_upper)
                        || PartialOrder::less_than(&observed_frontier, &as_of)
                    {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();

                    // Create a catch-up batch from the later of resume_upper or as_of to current frontier.
                    // We use the later of the two because:
                    // - For fresh sinks: resume_upper = minimum, as_of = actual timestamp, data starts at as_of
                    // - For resuming: as_of <= resume_upper (enforced by overcompaction check), data starts at resume_upper
                    let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
                        as_of.clone()
                    } else {
                        resume_upper.clone()
                    };

                    if batch_lower == current_upper {
                        // Snapshot! as_of is exactly at the frontier. We still need to mint
                        // a batch to create the snapshot, so we step the upper forward by one.
                        current_upper = Antichain::from_elem(current_upper_ts.step_forward());
                    }

                    let batch_description = (batch_lower.clone(), current_upper.clone());
                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        batch_lower = %batch_lower.pretty(),
                        current_upper = %current_upper.pretty(),
                        "iceberg mint initializing (catch-up batch)"
                    );
                    debug!(
                        "{}: creating catch-up batch [{}, {})",
                        name_for_logging,
                        batch_lower.pretty(),
                        current_upper.pretty()
                    );
                    batch_descriptions.push(batch_description);
                    // Mint initial future batch descriptions at configurable intervals
                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
                        let duration_millis = commit_interval.as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(
                            u64::try_from(duration_millis)
                                .expect("commit interval too large for u64"),
                        );
                        let desired_batch_upper = Antichain::from_elem(
                            current_upper_ts.step_forward_by(&duration_ts),
                        );

                        let batch_description =
                            (current_upper.clone(), desired_batch_upper.clone());
                        debug!(
                            "{}: minting future batch {}/{} [{}, {})",
                            name_for_logging,
                            i,
                            INITIAL_DESCRIPTIONS_TO_MINT,
                            current_upper.pretty(),
                            desired_batch_upper.pretty()
                        );
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    // Downgrade to the newest minted upper: all future descriptions will
                    // have lowers at or beyond this point.
                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    if observed_frontier.is_empty() {
                        // We're done!
                        return Ok(());
                    }
                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
                    // and mint a new future batch to keep the pipeline full
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        // New batch starts where the newest minted batch ends, and spans
                        // one commit interval.
                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(commit_interval.as_millis()
                            .try_into()
                            .expect("commit interval too large for u64"));
                        let new_upper = Antichain::from_elem(newest_upper
                            .as_option()
                            .unwrap()
                            .step_forward_by(&duration_ts));

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        table_ready_stream,
        statuses,
        button.press_on_drop(),
    )
}
1342
/// An Iceberg [`DataFile`] paired with the schema needed to serialize it.
/// Serde support is delegated to `AvroDataFile` via the `try_from`/`into` attributes.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    // The Iceberg data file metadata being transported.
    pub data_file: DataFile,
    // The Iceberg schema required by the Avro ser/de of `data_file`.
    pub schema: Schema,
}
1349
/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
/// The avro ser(de) also requires the Iceberg schema to be provided, so we include that as well.
/// It is distinctly possible that this is overkill, but it avoids re-implementing
/// Iceberg's serialization logic here.
/// If at some point this becomes a serious overhead, we can revisit this decision.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    /// The `DataFile` encoded via Iceberg's native Avro writer.
    pub data_file: Vec<u8>,
    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
    pub schema: Vec<u8>,
}
1363
1364impl From<SerializableDataFile> for AvroDataFile {
1365    fn from(value: SerializableDataFile) -> Self {
1366        let mut data_file = Vec::new();
1367        write_data_files_to_avro(
1368            &mut data_file,
1369            [value.data_file],
1370            &StructType::new(vec![]),
1371            FormatVersion::V2,
1372        )
1373        .expect("serialization into buffer");
1374        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1375        AvroDataFile { data_file, schema }
1376    }
1377}
1378
1379impl TryFrom<AvroDataFile> for SerializableDataFile {
1380    type Error = String;
1381
1382    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1383        let schema: Schema = serde_json::from_slice(&value.schema)
1384            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1385        let data_files = read_data_files_from_avro(
1386            &mut &*value.data_file,
1387            &schema,
1388            0,
1389            &StructType::new(vec![]),
1390            FormatVersion::V2,
1391        )
1392        .map_err_to_string_with_causes()?;
1393        let Some(data_file) = data_files.into_iter().next() else {
1394            return Err("No DataFile found in Avro data".into());
1395        };
1396        Ok(SerializableDataFile { data_file, schema })
1397    }
1398}
1399
/// A DataFile along with its associated batch description (lower and upper bounds).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    // The written file plus the schema needed to (de)serialize it.
    pub data_file: SerializableDataFile,
    // The `[lower, upper)` timestamp bounds of the batch this file belongs to.
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
1406
impl BoundedDataFile {
    /// Wrap a data file and its schema together with the batch description
    /// it was written for.
    pub fn new(
        file: DataFile,
        schema: Schema,
        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
    ) -> Self {
        Self {
            data_file: SerializableDataFile {
                data_file: file,
                schema,
            },
            batch_desc,
        }
    }

    /// The `[lower, upper)` batch bounds this file belongs to.
    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
        &self.batch_desc
    }

    /// Borrow the underlying Iceberg data file.
    pub fn data_file(&self) -> &DataFile {
        &self.data_file.data_file
    }

    /// Consume `self`, returning the underlying Iceberg data file.
    pub fn into_data_file(self) -> DataFile {
        self.data_file.data_file
    }
}
1434
/// A set of DataFiles along with their associated batch descriptions.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    /// The files collected so far.
    pub data_files: Vec<BoundedDataFile>,
}
1440
/// Write rows into Parquet data files bounded by batch descriptions.
///
/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
/// Returns the stream of written files (as [`BoundedDataFile`]s), a health status stream,
/// and a button that shuts the operator down when dropped.
fn write_data_files<G, H: EnvelopeHandler + 'static>(
    name: String,
    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<G, Infallible>,
    as_of: Antichain<Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    StreamVec<G, BoundedDataFile>,
    StreamVec<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = input.scope();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
    // Batch descriptions are broadcast so that every worker observes every description.
    let mut batch_desc_input =
        builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            while let Some(_) = table_ready_input.next().await {
                // Wait for table to be ready
            }
            let table = catalog
                .load_table(&table_ident)
                .await
                .with_context(|| {
                    format!(
                        "Failed to load Iceberg table '{}.{}' in write_data_files operator",
                        connection.namespace, connection.table
                    )
                })?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            // Merge Materialize extension metadata into the Iceberg schema.
            // We need extension metadata for ArrowBuilder to work correctly (it uses
            // extension names to know how to handle different types like records vs arrays).
            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
            // No clear way to detect this properly right now, so we use heuristics.
            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            // Add a unique suffix to avoid filename collisions across restarts and workers
            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let writer_properties = WriterProperties::new();

            let ctx = WriterContext {
                arrow_schema,
                current_schema: Arc::clone(&current_schema),
                file_io,
                location_generator,
                file_name_generator,
                writer_properties,
            };
            let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;

            // Rows can arrive before their batch description due to dataflow parallelism.
            // Stash them until we know which batch they belong to.
            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            // Track batches currently being written. When a row arrives, we check if it belongs
            // to an in-flight batch. When frontiers advance to a batch's upper, we close the
            // writer and emit its data files downstream.
            // Antichains don't implement Ord, so we use a HashMap with tuple keys instead.
            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                Box<dyn IcebergWriter>,
            > = std::collections::HashMap::new();

            // Last-seen frontiers of the two inputs, plus the values we last acted on, so we
            // only run the batch-closing scan below when a frontier actually advanced.
            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Track the minimum batch lower bound to prune data that's already committed
            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;

            // Run until both inputs are exhausted (empty frontiers).
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                // Messages written since the last statistics flush; flushed at the end of each
                // loop iteration and opportunistically every 10_000 rows.
                let mut staged_messages_since_flush: u64 = 0;
                // Wake when either input has data or progress; both are drained below.
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;

                                // Track the minimum batch lower bound (first batch received)
                                if min_batch_lower.is_none() {
                                    min_batch_lower = Some(lower.clone());
                                    debug!(
                                        "{}: set min_batch_lower to {}",
                                        name_for_logging,
                                        lower.pretty()
                                    );

                                    // Prune any stashed rows that arrived before min_batch_lower (already committed)
                                    let to_remove: Vec<_> = stashed_rows
                                        .keys()
                                        .filter(|ts| {
                                            let ts_antichain = Antichain::from_elem((*ts).clone());
                                            PartialOrder::less_than(&ts_antichain, lower)
                                        })
                                        .cloned()
                                        .collect();

                                    if !to_remove.is_empty() {
                                        let mut removed_count = 0;
                                        for ts in to_remove {
                                            if let Some(rows) = stashed_rows.remove(&ts) {
                                                removed_count += rows.len();
                                                for _ in &rows {
                                                    metrics.stashed_rows.dec();
                                                }
                                            }
                                        }
                                        debug!(
                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
                                            name_for_logging,
                                            removed_count
                                        );
                                    }
                                }

                                // Disable seen_rows tracking for snapshot batch to save memory
                                let is_snapshot = lower == &as_of;
                                debug!(
                                    "{}: received batch description [{}, {}), snapshot={}",
                                    name_for_logging,
                                    lower.pretty(),
                                    upper.pretty(),
                                    is_snapshot
                                );
                                let mut batch_writer =
                                    handler.create_writer(is_snapshot).await?;
                                // Drain any stashed rows that belong to this batch
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                let mut drained_count = 0;
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    // A row belongs to the batch when lower <= ts < upper.
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            drained_count += rows.len();
                                            for (_row, diff_pair) in rows {
                                                metrics.stashed_rows.dec();
                                                let record_batch = handler.row_to_batch(
                                                    diff_pair.clone(),
                                                    row_ts.clone(),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                batch_writer.write(record_batch).await?;
                                                staged_messages_since_flush += 1;
                                                if staged_messages_since_flush >= 10_000 {
                                                    statistics.inc_messages_staged_by(
                                                        staged_messages_since_flush,
                                                    );
                                                    staged_messages_since_flush = 0;
                                                }
                                            }
                                        }
                                    }
                                }
                                if drained_count > 0 {
                                    debug!(
                                        "{}: drained {} stashed rows into batch [{}, {})",
                                        name_for_logging,
                                        drained_count,
                                        lower.pretty(),
                                        upper.pretty()
                                    );
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), batch_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                // Drain all row data that is currently available.
                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            // Per-timestamp counters, so we emit one debug line per timestamp
                            // instead of one per row.
                            let mut dropped_per_time = BTreeMap::new();
                            let mut stashed_per_time = BTreeMap::new();
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                // Try writing the row to any in-flight batch it belongs to...
                                for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = handler.row_to_batch(
                                            diff_pair.clone(),
                                            row_ts.clone(),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        batch_writer.write(record_batch).await?;
                                        staged_messages_since_flush += 1;
                                        if staged_messages_since_flush >= 10_000 {
                                            statistics.inc_messages_staged_by(
                                                staged_messages_since_flush,
                                            );
                                            staged_messages_since_flush = 0;
                                        }
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    // Drop data that's before the first batch we received (already committed)
                                    if let Some(ref min_lower) = min_batch_lower {
                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
                                            dropped_per_time
                                                .entry(ts_antichain.into_option().unwrap())
                                                .and_modify(|c| *c += 1)
                                                .or_insert(1);
                                            continue;
                                        }
                                    }

                                    // ...otherwise stash it until its batch description arrives.
                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    metrics.stashed_rows.inc();
                                    entry.push((row, diff_pair));
                                }
                            }

                            for (ts, count) in dropped_per_time {
                                debug!(
                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
                                    name_for_logging, count, ts
                                );
                            }

                            for (ts, count) in stashed_per_time {
                                debug!(
                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
                                    name_for_logging, count, ts
                                );
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }
                // Flush any remaining staged-message count for this iteration.
                if staged_messages_since_flush > 0 {
                    statistics.inc_messages_staged_by(staged_messages_since_flush);
                }

                // Check if frontiers have advanced, which may unlock batches ready to close
                if PartialOrder::less_than(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    // Close batches whose upper is now in the past
                    // Upper bounds are exclusive, so we check if upper is less_equal to the frontier.
                    // Remember: a frontier at x means all timestamps less than x have been observed.
                    // Or, in other words we still might yet see timestamps at [x, infinity). X itself will
                    // be covered by the _next_ batches lower inclusive bound, so we can safely close the batch if its upper is <= x.
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_equal(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        debug!(
                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
                            name_for_logging,
                            ready_batches.len(),
                            batch_description_frontier.pretty(),
                            input_frontier.pretty()
                        );
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut batch_writer) in ready_batches {
                            let close_started_at = Instant::now();
                            let data_files = batch_writer.close().await;
                            metrics
                                .writer_close_duration_seconds
                                .observe(close_started_at.elapsed().as_secs_f64());
                            let data_files = data_files.context("Failed to close batch writer")?;
                            debug!(
                                "{}: closed batch [{}, {}), wrote {} files",
                                name_for_logging,
                                desc.0.pretty(),
                                desc.1.pretty(),
                                data_files.len()
                            );
                            for data_file in data_files {
                                match data_file.content_type() {
                                    iceberg::spec::DataContentType::Data => {
                                        metrics.data_files_written.inc();
                                    }
                                    iceberg::spec::DataContentType::PositionDeletes
                                    | iceberg::spec::DataContentType::EqualityDeletes => {
                                        metrics.delete_files_written.inc();
                                    }
                                }
                                statistics.inc_messages_staged_by(data_file.record_count());
                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            max_upper = max_upper.join(&desc.1);
                        }

                        // Downgrade our output capability to the largest closed upper.
                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    // Surface any operator error as a halting health status.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}
1848
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::SqlScalarType;

    #[mz_ore::test]
    fn test_iceberg_type_overrides() {
        // UInt16 and UInt32 widen to the next larger signed Arrow integer.
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt16).unwrap().0,
            DataType::Int32
        );
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt32).unwrap().0,
            DataType::Int64
        );

        // UInt64 and MzTimestamp both map to Decimal128 with the uint64 precision.
        for ty in [SqlScalarType::UInt64, SqlScalarType::MzTimestamp] {
            assert_eq!(
                iceberg_type_overrides(&ty).unwrap().0,
                DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            );
        }

        // Types without an override return None and fall back to the default mapping.
        for ty in [
            SqlScalarType::Int32,
            SqlScalarType::String,
            SqlScalarType::Bool,
        ] {
            assert!(iceberg_type_overrides(&ty).is_none());
        }
    }

    #[mz_ore::test]
    fn test_iceberg_schema_with_nested_uint64() {
        // desc_to_schema_with_overrides applies iceberg_type_overrides recursively,
        // so a UInt64 nested inside a list must become Decimal128 as well.
        let desc = mz_repr::RelationDesc::builder()
            .with_column(
                "items",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt64),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let schema =
            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
                .expect("schema conversion should succeed");

        // The inner element should be Decimal128, not UInt64.
        match schema.field(0).data_type() {
            DataType::List(element) => assert_eq!(
                element.data_type(),
                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            ),
            _ => panic!("Expected List type"),
        }
    }

    #[mz_ore::test]
    fn equality_ids_follow_iceberg_field_ids() {
        // A map column consumes several Iceberg field ids, so the id of the column
        // after it diverges from its positional Arrow index.
        let entries_field = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let arrow_schema = ArrowSchema::new(vec![
            Field::new("attrs", DataType::Map(Arc::new(entries_field), false), true),
            Field::new("key_col", DataType::Int32, false),
        ]);
        let arrow_schema = add_field_ids_to_arrow_schema(arrow_schema);
        let iceberg_schema =
            arrow_schema_to_schema(&arrow_schema).expect("schema conversion should succeed");

        let equality_ids = equality_ids_for_indices(&iceberg_schema, &arrow_schema, &[1])
            .expect("field lookup should succeed");

        // Equality ids must follow the Iceberg field id of "key_col", not its index.
        let key_col_id = iceberg_schema
            .as_struct()
            .field_by_name("key_col")
            .expect("top-level field should exist")
            .id;
        assert_eq!(equality_ids, vec![key_col_id]);
        assert_ne!(key_col_id, 2);
    }
}
1948
1949/// Commit completed batches to Iceberg as snapshots.
1950/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
1951/// Each snapshot includes the Materialize frontier in its metadata for resume support.
1952fn commit_to_iceberg<G>(
1953    name: String,
1954    sink_id: GlobalId,
1955    sink_version: u64,
1956    batch_input: StreamVec<G, BoundedDataFile>,
1957    batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1958    table_ready_stream: StreamVec<G, Infallible>,
1959    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1960    connection: IcebergSinkConnection,
1961    storage_configuration: StorageConfiguration,
1962    write_handle: impl Future<
1963        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1964    > + 'static,
1965    metrics: Arc<IcebergSinkMetrics>,
1966    statistics: SinkStatistics,
1967) -> (StreamVec<G, HealthStatusMessage>, PressOnDropButton)
1968where
1969    G: Scope<Timestamp = Timestamp>,
1970{
1971    let scope = batch_input.scope();
1972    let mut builder = OperatorBuilder::new(name, scope.clone());
1973
1974    let hashed_id = sink_id.hashed();
1975    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1976    let name_for_logging = format!("{sink_id}-commit-to-iceberg");
1977
1978    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1979    let mut batch_desc_input =
1980        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1981    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1982
1983    let (button, errors) = builder.build_fallible(move |_caps| {
1984        Box::pin(async move {
1985            if !is_active_worker {
1986                write_frontier.borrow_mut().clear();
1987                return Ok(());
1988            }
1989
1990            let catalog = connection
1991                .catalog_connection
1992                .connect(&storage_configuration, InTask::Yes)
1993                .await
1994                .with_context(|| {
1995                    format!(
1996                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1997                        connection.catalog_connection.uri, connection.namespace, connection.table
1998                    )
1999                })?;
2000
2001            let mut write_handle = write_handle.await?;
2002
2003            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
2004            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
2005            while let Some(_) = table_ready_input.next().await {
2006                // Wait for table to be ready
2007            }
2008            let mut table = catalog.load_table(&table_ident).await.with_context(|| {
2009                format!(
2010                    "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
2011                    connection.namespace, connection.table
2012                )
2013            })?;
2014
2015            #[allow(clippy::disallowed_types)]
2016            let mut batch_descriptions: std::collections::HashMap<
2017                (Antichain<Timestamp>, Antichain<Timestamp>),
2018                BoundedDataFileSet,
2019            > = std::collections::HashMap::new();
2020
2021            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
2022            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
2023
2024            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
2025                tokio::select! {
2026                    _ = batch_desc_input.ready() => {},
2027                    _ = input.ready() => {}
2028                }
2029
2030                while let Some(event) = batch_desc_input.next_sync() {
2031                    match event {
2032                        Event::Data(_cap, data) => {
2033                            for batch_desc in data {
2034                                let prev = batch_descriptions
2035                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
2036                                if let Some(prev) = prev {
2037                                    anyhow::bail!(
2038                                        "Duplicate batch description received \
2039                                         in commit operator: {:?}",
2040                                        prev
2041                                    );
2042                                }
2043                            }
2044                        }
2045                        Event::Progress(frontier) => {
2046                            batch_description_frontier = frontier;
2047                        }
2048                    }
2049                }
2050
2051                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
2052                for event in ready_events {
2053                    match event {
2054                        Event::Data(_cap, data) => {
2055                            for bounded_data_file in data {
2056                                let entry = batch_descriptions
2057                                    .entry(bounded_data_file.batch_desc().clone())
2058                                    .or_default();
2059                                entry.data_files.push(bounded_data_file);
2060                            }
2061                        }
2062                        Event::Progress(frontier) => {
2063                            input_frontier = frontier;
2064                        }
2065                    }
2066                }
2067
2068                // Collect batches whose data files have all arrived.
2069                // The writer emits all data files for a batch at a capability <= the batch's
2070                // lower bound, then downgrades its capability to the batch's upper bound.
2071                // So once the input frontier advances past lower, we know the writer has
2072                // finished emitting files for this batch and dropped its capability.
2073                let mut done_batches: Vec<_> = batch_descriptions
2074                    .keys()
2075                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
2076                    .cloned()
2077                    .collect();
2078
2079                // Commit batches in timestamp order to maintain consistency
2080                done_batches.sort_by(|a, b| {
2081                    if PartialOrder::less_than(a, b) {
2082                        Ordering::Less
2083                    } else if PartialOrder::less_than(b, a) {
2084                        Ordering::Greater
2085                    } else {
2086                        Ordering::Equal
2087                    }
2088                });
2089
2090                for batch in done_batches {
2091                    let file_set = batch_descriptions.remove(&batch).unwrap();
2092
2093                    let mut data_files = vec![];
2094                    let mut delete_files = vec![];
2095                    // Track totals for committed statistics
2096                    let mut total_messages: u64 = 0;
2097                    let mut total_bytes: u64 = 0;
2098                    for file in file_set.data_files {
2099                        total_messages += file.data_file().record_count();
2100                        total_bytes += file.data_file().file_size_in_bytes();
2101                        match file.data_file().content_type() {
2102                            iceberg::spec::DataContentType::Data => {
2103                                data_files.push(file.into_data_file());
2104                            }
2105                            iceberg::spec::DataContentType::PositionDeletes
2106                            | iceberg::spec::DataContentType::EqualityDeletes => {
2107                                delete_files.push(file.into_data_file());
2108                            }
2109                        }
2110                    }
2111
2112                    debug!(
2113                        ?sink_id,
2114                        %name_for_logging,
2115                        lower = %batch.0.pretty(),
2116                        upper = %batch.1.pretty(),
2117                        data_files = data_files.len(),
2118                        delete_files = delete_files.len(),
2119                        total_messages,
2120                        total_bytes,
2121                        "iceberg commit applying batch"
2122                    );
2123
2124                    let instant = Instant::now();
2125
2126                    let frontier = batch.1.clone();
2127                    let frontier_json = serde_json::to_string(&frontier.elements())
2128                        .context("Failed to serialize frontier to JSON")?;
2129                    let snapshot_properties = vec![
2130                        ("mz-sink-id".to_string(), sink_id.to_string()),
2131                        ("mz-frontier".to_string(), frontier_json),
2132                        ("mz-sink-version".to_string(), sink_version.to_string()),
2133                    ];
2134
2135                    let (table_state, commit_result) = Retry::default()
2136                        .max_tries(5)
2137                        .retry_async_with_state(table, |_, table| {
2138                            let snapshot_properties = snapshot_properties.clone();
2139                            let data_files = data_files.clone();
2140                            let delete_files = delete_files.clone();
2141                            let metrics = Arc::clone(&metrics);
2142                            let catalog = Arc::clone(&catalog);
2143                            let conn_namespace = connection.namespace.clone();
2144                            let conn_table = connection.table.clone();
2145                            let frontier = frontier.clone();
2146                            let batch_lower = batch.0.clone();
2147                            let batch_upper = batch.1.clone();
2148                            async move {
2149                                try_commit_batch(
2150                                    table,
2151                                    snapshot_properties,
2152                                    data_files,
2153                                    delete_files,
2154                                    catalog.as_ref(),
2155                                    &conn_namespace,
2156                                    &conn_table,
2157                                    sink_version,
2158                                    &frontier,
2159                                    &batch_lower,
2160                                    &batch_upper,
2161                                    &metrics,
2162                                )
2163                                .await
2164                            }
2165                        })
2166                        .await;
2167                    let commit_result = commit_result.with_context(|| {
2168                        format!(
2169                            "failed to commit batch to Iceberg table '{}.{}'",
2170                            connection.namespace, connection.table
2171                        )
2172                    });
2173                    table = table_state;
2174                    let duration = instant.elapsed();
2175                    metrics
2176                        .commit_duration_seconds
2177                        .observe(duration.as_secs_f64());
2178                    commit_result?;
2179
2180                    debug!(
2181                        ?sink_id,
2182                        %name_for_logging,
2183                        lower = %batch.0.pretty(),
2184                        upper = %batch.1.pretty(),
2185                        total_messages,
2186                        total_bytes,
2187                        ?duration,
2188                        "iceberg commit applied batch"
2189                    );
2190
2191                    metrics.snapshots_committed.inc();
2192                    statistics.inc_messages_committed_by(total_messages);
2193                    statistics.inc_bytes_committed_by(total_bytes);
2194
2195                    let mut expect_upper = write_handle.shared_upper();
2196                    loop {
2197                        if PartialOrder::less_equal(&frontier, &expect_upper) {
2198                            // The frontier has already been advanced as far as necessary.
2199                            break;
2200                        }
2201
2202                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
2203                        match write_handle
2204                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
2205                            .await
2206                            .expect("valid usage")
2207                        {
2208                            Ok(()) => break,
2209                            Err(mismatch) => {
2210                                expect_upper = mismatch.current;
2211                            }
2212                        }
2213                    }
2214                    write_frontier.borrow_mut().clone_from(&frontier);
2215                }
2216            }
2217
2218            Ok(())
2219        })
2220    });
2221
2222    let statuses = errors.map(|error| HealthStatusMessage {
2223        id: None,
2224        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
2225        namespace: StatusNamespace::Iceberg,
2226    });
2227
2228    (statuses, button.press_on_drop())
2229}
2230
2231impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
2232    fn get_key_indices(&self) -> Option<&[usize]> {
2233        self.key_desc_and_indices
2234            .as_ref()
2235            .map(|(_, indices)| indices.as_slice())
2236    }
2237
2238    fn get_relation_key_indices(&self) -> Option<&[usize]> {
2239        self.relation_key_indices.as_deref()
2240    }
2241
2242    fn render_sink(
2243        &self,
2244        storage_state: &mut StorageState,
2245        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
2246        sink_id: GlobalId,
2247        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
2248        _err_collection: VecCollection<G, DataflowError, Diff>,
2249    ) -> (StreamVec<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
2250        let mut scope = input.scope();
2251
2252        let write_handle = {
2253            let persist = Arc::clone(&storage_state.persist_clients);
2254            let shard_meta = sink.to_storage_metadata.clone();
2255            async move {
2256                let client = persist.open(shard_meta.persist_location).await?;
2257                let handle = client
2258                    .open_writer(
2259                        shard_meta.data_shard,
2260                        Arc::new(shard_meta.relation_desc),
2261                        Arc::new(UnitSchema),
2262                        Diagnostics::from_purpose("sink handle"),
2263                    )
2264                    .await?;
2265                Ok(handle)
2266            }
2267        };
2268
2269        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
2270        storage_state
2271            .sink_write_frontiers
2272            .insert(sink_id, Rc::clone(&write_frontier));
2273
2274        let (arrow_schema_with_ids, iceberg_schema) =
2275            match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
2276                let (arrow_schema_with_ids, iceberg_schema) =
2277                    relation_desc_to_iceberg_schema(&sink.from_desc)?;
2278
2279                Ok(if sink.envelope == SinkEnvelope::Append {
2280                    // For append mode, extend the Arrow and Iceberg schemas with the user-visible
2281                    // `_mz_diff` and `_mz_timestamp` columns. The minter uses `iceberg_schema` to create
2282                    // the Iceberg table, and `write_data_files` uses `arrow_schema_with_ids` when
2283                    // merging metadata. Both must include these columns before any operator starts.
2284                    let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
2285                    let extended_iceberg = Arc::new(
2286                        arrow_schema_to_schema(&extended_arrow)
2287                            .context("Failed to build Iceberg schema with append columns")?,
2288                    );
2289                    (extended_arrow, extended_iceberg)
2290                } else {
2291                    (arrow_schema_with_ids, iceberg_schema)
2292                })
2293            })() {
2294                Ok(schemas) => schemas,
2295                Err(err) => {
2296                    let error_stream = std::iter::once(HealthStatusMessage {
2297                        id: None,
2298                        update: HealthStatusUpdate::halting(
2299                            format!("{}", err.display_with_causes()),
2300                            None,
2301                        ),
2302                        namespace: StatusNamespace::Iceberg,
2303                    })
2304                    .to_stream(&mut scope);
2305                    return (error_stream, vec![]);
2306                }
2307            };
2308
2309        let metrics = Arc::new(
2310            storage_state
2311                .metrics
2312                .get_iceberg_sink_metrics(sink_id, scope.index()),
2313        );
2314
2315        let statistics = storage_state
2316            .aggregated_statistics
2317            .get_sink(&sink_id)
2318            .expect("statistics initialized")
2319            .clone();
2320
2321        let connection_for_minter = self.clone();
2322        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
2323            mint_batch_descriptions(
2324                format!("{sink_id}-iceberg-mint"),
2325                sink_id,
2326                input,
2327                sink,
2328                connection_for_minter,
2329                storage_state.storage_configuration.clone(),
2330                Arc::clone(&iceberg_schema),
2331            );
2332
2333        let connection_for_writer = self.clone();
2334        let (datafiles, write_status, write_button) = match sink.envelope {
2335            SinkEnvelope::Upsert => write_data_files::<_, UpsertEnvelopeHandler>(
2336                format!("{sink_id}-write-data-files"),
2337                minted_input,
2338                batch_descriptions.clone(),
2339                table_ready.clone(),
2340                sink.as_of.clone(),
2341                connection_for_writer,
2342                storage_state.storage_configuration.clone(),
2343                Arc::new(arrow_schema_with_ids.clone()),
2344                Arc::clone(&metrics),
2345                statistics.clone(),
2346            ),
2347            SinkEnvelope::Append => write_data_files::<_, AppendEnvelopeHandler>(
2348                format!("{sink_id}-write-data-files"),
2349                minted_input,
2350                batch_descriptions.clone(),
2351                table_ready.clone(),
2352                sink.as_of.clone(),
2353                connection_for_writer,
2354                storage_state.storage_configuration.clone(),
2355                Arc::new(arrow_schema_with_ids.clone()),
2356                Arc::clone(&metrics),
2357                statistics.clone(),
2358            ),
2359            SinkEnvelope::Debezium => {
2360                unreachable!("Iceberg sink only supports Upsert and Append envelopes")
2361            }
2362        };
2363
2364        let connection_for_committer = self.clone();
2365        let (commit_status, commit_button) = commit_to_iceberg(
2366            format!("{sink_id}-commit-to-iceberg"),
2367            sink_id,
2368            sink.version,
2369            datafiles,
2370            batch_descriptions,
2371            table_ready,
2372            Rc::clone(&write_frontier),
2373            connection_for_committer,
2374            storage_state.storage_configuration.clone(),
2375            write_handle,
2376            Arc::clone(&metrics),
2377            statistics,
2378        );
2379
2380        let running_status = Some(HealthStatusMessage {
2381            id: None,
2382            update: HealthStatusUpdate::running(),
2383            namespace: StatusNamespace::Iceberg,
2384        })
2385        .to_stream(&mut scope);
2386
2387        let statuses =
2388            scope.concatenate([running_status, mint_status, write_status, commit_status]);
2389
2390        (statuses, vec![mint_button, write_button, commit_button])
2391    }
2392}