mz_storage/sink/iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders an [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. The dataflow consists of three operators:
14//!
15//! ```text
16//!        ┏━━━━━━━━━━━━━━┓
17//!        ┃   persist    ┃
18//!        ┃    source    ┃
19//!        ┗━━━━━━┯━━━━━━━┛
20//!               │ row data, the input to this module
21//!               │
22//!        ┏━━━━━━v━━━━━━━┓
23//!        ┃     mint     ┃ (single worker)
24//!        ┃    batch     ┃ loads/creates the Iceberg table,
25//!        ┃ descriptions ┃ determines resume upper
26//!        ┗━━━┯━━━━━┯━━━━┛
27//!            │     │ batch descriptions (broadcast)
28//!       rows │     ├─────────────────────────┐
29//!            │     │                         │
30//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
31//!        ┃    write     ┃───>│ S3 / object │ │
32//!        ┃  data files  ┃    │   storage   │ │
33//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
34//!               │ file metadata              │
35//!               │                            │
36//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
37//!        ┃           commit to               ┃ (single worker)
38//!        ┃             iceberg               ┃
39//!        ┗━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━┛
40//!                      │
41//!              ╭───────v───────╮
42//!              │ Iceberg table │
43//!              │  (snapshots)  │
44//!              ╰───────────────╯
45//! ```
46//! # Minting batch descriptions
47//! The "mint batch descriptions" operator is responsible for generating
48//! time-based batch boundaries that group writes into Iceberg snapshots.
49//! It maintains a sliding window of future batch descriptions so that
50//! writers can start processing data even while earlier batches are still being written.
51//! Knowing the batch boundaries ahead of time is important because we must be
52//! able to guarantee that the data files written for a given batch include all
53//! data up to the batch's upper `t` and nothing beyond it.
54//! This could be trivially achieved by waiting for all data to arrive up to a certain
55//! frontier, but that would prevent us from streaming writes out to object storage
56//! until the entire batch is complete, which would increase latency and reduce throughput.
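//!
//! For example (hypothetical values): with the input frontier at `5_000`, an
//! `as_of` of `[1_000]`, and a commit interval of 60s, the initially minted
//! batch descriptions form a contiguous chain of `(lower, upper)` frontiers:
//!
//! ```text
//! ([1_000],  [5_000])     catch-up batch from the as_of/resume upper to the frontier
//! ([5_000],  [65_000])    future batch, one commit interval wide
//! ([65_000], [125_000])   future batch, one commit interval wide
//! ```
//!
//! Every row with timestamp `t` is then written as part of the unique batch
//! whose bounds satisfy `lower <= t < upper`.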
57//!
58//! # Writing data files
59//! The "write data files" operator receives rows along with batch descriptions.
60//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
61//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
62//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
63//! (and position delete files if necessary) to object storage.
64//! It outputs metadata about the written files along with their batch descriptions
65//! for the commit operator to consume.
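//!
//! Continuing the hypothetical timestamps above: a row at `t = 70_000` that
//! arrives before any covering batch description is stashed; once the
//! description `([65_000], [125_000])` arrives, the row is replayed into that
//! batch's `DeltaWriter`.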
66//!
67//! # Committing to Iceberg
68//! The "commit to iceberg" operator receives metadata about written data files
69//! along with their batch descriptions. It groups files by batch and creates
70//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
71//! table's metadata to reflect the new snapshots, including updating the
72//! `mz-frontier` property to track progress.
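//!
//! Each committed snapshot carries summary properties along these lines
//! (illustrative values):
//!
//! ```text
//! "mz-frontier"     = "[65000]"   JSON-serialized upper of the committed batch
//! "mz-sink-version" = "3"         fences out writers from older sink versions
//! ```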
73
74use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::convert::Infallible;
77use std::time::Instant;
78use std::{cell::RefCell, rc::Rc, sync::Arc};
79
80use anyhow::{Context, anyhow};
81use arrow::array::{ArrayRef, Int32Array, RecordBatch};
82use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
83use differential_dataflow::lattice::Lattice;
84use differential_dataflow::{AsCollection, Hashable, VecCollection};
85use futures::StreamExt;
86use iceberg::ErrorKind;
87use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
88use iceberg::spec::{
89    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
90    write_data_files_to_avro,
91};
92use iceberg::spec::{Schema, SchemaRef};
93use iceberg::table::Table;
94use iceberg::transaction::{ApplyTransactionAction, Transaction};
95use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
96use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
97use iceberg::writer::base_writer::equality_delete_writer::{
98    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
99};
100use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
101use iceberg::writer::base_writer::position_delete_writer::{
102    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
103};
104use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
105use iceberg::writer::file_writer::ParquetWriterBuilder;
106use iceberg::writer::file_writer::location_generator::{
107    DefaultFileNameGenerator, DefaultLocationGenerator,
108};
109use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
110use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
111use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
112use itertools::Itertools;
113use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
114use mz_interchange::avro::DiffPair;
115use mz_ore::cast::CastFrom;
116use mz_ore::error::ErrorExt;
117use mz_ore::future::InTask;
118use mz_ore::result::ResultExt;
119use mz_ore::retry::{Retry, RetryResult};
120use mz_persist_client::Diagnostics;
121use mz_persist_client::write::WriteHandle;
122use mz_persist_types::codec_impls::UnitSchema;
123use mz_repr::{Diff, GlobalId, Row, Timestamp};
124use mz_storage_types::StorageDiff;
125use mz_storage_types::configuration::StorageConfiguration;
126use mz_storage_types::controller::CollectionMetadata;
127use mz_storage_types::errors::DataflowError;
128use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
129use mz_storage_types::sources::SourceData;
130use mz_timely_util::antichain::AntichainExt;
131use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
132use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
133use parquet::file::properties::WriterProperties;
134use serde::{Deserialize, Serialize};
135use timely::PartialOrder;
136use timely::container::CapacityContainerBuilder;
137use timely::dataflow::channels::pact::{Exchange, Pipeline};
138use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
139use timely::dataflow::{Scope, Stream};
140use timely::progress::{Antichain, Timestamp as _};
141use tracing::debug;
142
143use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
144use crate::metrics::sink::iceberg::IcebergSinkMetrics;
145use crate::render::sinks::SinkRender;
146use crate::statistics::SinkStatistics;
147use crate::storage_state::StorageState;
148
149/// The default capacity for the array builders inside the ArrowBuilder. This is the
150/// number of items each builder can hold before it needs to allocate more memory.
151const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
152/// The default buffer capacity for the string and binary array builders inside the
153/// ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate
154/// more memory.
155const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;
156
157/// The prefix for Parquet files written by this sink.
158const PARQUET_FILE_PREFIX: &str = "mz_data";
159/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
160/// many batches we have in-flight at any given time.
161const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
162
163type DeltaWriterType = DeltaWriter<
164    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
165    PositionDeleteFileWriter<
166        ParquetWriterBuilder,
167        DefaultLocationGenerator,
168        DefaultFileNameGenerator,
169    >,
170    EqualityDeleteFileWriter<
171        ParquetWriterBuilder,
172        DefaultLocationGenerator,
173        DefaultFileNameGenerator,
174    >,
175>;
176
177/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
178/// Parquet metadata for schema evolution tracking. Field IDs are assigned
179/// recursively to all nested fields (structs, lists, maps) using a depth-first,
180/// pre-order traversal.
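///
/// For example (a sketch): a schema `{a: Int32, b: Struct { c: Utf8 }}` gets
/// `a = 1`, `b = 2`, and `b.c = 3`.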
181fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
182    let mut next_field_id = 1i32;
183    let fields: Vec<Field> = schema
184        .fields()
185        .iter()
186        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
187        .collect();
188    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
189}
190
191/// Recursively add field IDs to a field and all its nested children.
192fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
193    let current_id = *next_id;
194    *next_id += 1;
195
196    let mut metadata = field.metadata().clone();
197    metadata.insert(
198        PARQUET_FIELD_ID_META_KEY.to_string(),
199        current_id.to_string(),
200    );
201
202    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
203
204    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
205}
206
207/// Add field IDs to nested fields within a DataType.
208fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
209    match data_type {
210        DataType::Struct(fields) => {
211            let new_fields: Vec<Field> = fields
212                .iter()
213                .map(|f| add_field_ids_recursive(f, next_id))
214                .collect();
215            DataType::Struct(new_fields.into())
216        }
217        DataType::List(element_field) => {
218            let new_element = add_field_ids_recursive(element_field, next_id);
219            DataType::List(Arc::new(new_element))
220        }
221        DataType::LargeList(element_field) => {
222            let new_element = add_field_ids_recursive(element_field, next_id);
223            DataType::LargeList(Arc::new(new_element))
224        }
225        DataType::Map(entries_field, sorted) => {
226            let new_entries = add_field_ids_recursive(entries_field, next_id);
227            DataType::Map(Arc::new(new_entries), *sorted)
228        }
229        _ => data_type.clone(),
230    }
231}
232
233/// Merge Materialize extension metadata into Iceberg's Arrow schema.
234/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
235/// Materialize's extension names for ArrowBuilder compatibility.
236/// Handles nested types (structs, lists, maps) recursively.
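///
/// Fields are matched by name. A top-level Iceberg field with no counterpart in
/// the Materialize schema is an error; a nested field without a counterpart
/// simply gets no extension metadata merged in.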
237fn merge_materialize_metadata_into_iceberg_schema(
238    materialize_arrow_schema: &ArrowSchema,
239    iceberg_schema: &Schema,
240) -> anyhow::Result<ArrowSchema> {
241    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
242    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
243        .context("Failed to convert Iceberg schema to Arrow schema")?;
244
245    // Now merge in the Materialize extension metadata
246    let fields: Vec<Field> = iceberg_arrow_schema
247        .fields()
248        .iter()
249        .map(|iceberg_field| {
250            // Find the corresponding Materialize field by name to get extension metadata
251            let mz_field = materialize_arrow_schema
252                .field_with_name(iceberg_field.name())
253                .with_context(|| {
254                    format!(
255                        "Field '{}' not found in Materialize schema",
256                        iceberg_field.name()
257                    )
258                })?;
259
260            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
261        })
262        .collect::<anyhow::Result<Vec<_>>>()?;
263
264    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
265}
266
267/// Recursively merge Materialize extension metadata into an Iceberg field.
268fn merge_field_metadata_recursive(
269    iceberg_field: &Field,
270    mz_field: Option<&Field>,
271) -> anyhow::Result<Field> {
272    // Start with Iceberg field's metadata (which includes field IDs)
273    let mut metadata = iceberg_field.metadata().clone();
274
275    // Add Materialize extension name if available
276    if let Some(mz_f) = mz_field {
277        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
278            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
279        }
280    }
281
282    // Recursively process nested types
283    let new_data_type = match iceberg_field.data_type() {
284        DataType::Struct(iceberg_fields) => {
285            let mz_struct_fields = match mz_field {
286                Some(f) => match f.data_type() {
287                    DataType::Struct(fields) => Some(fields),
288                    other => anyhow::bail!(
289                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
290                        iceberg_field.name(),
291                        other
292                    ),
293                },
294                None => None,
295            };
296
297            let new_fields: Vec<Field> = iceberg_fields
298                .iter()
299                .map(|iceberg_inner| {
300                    let mz_inner = mz_struct_fields.and_then(|fields| {
301                        fields.iter().find(|f| f.name() == iceberg_inner.name())
302                    });
303                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
304                })
305                .collect::<anyhow::Result<Vec<_>>>()?;
306
307            DataType::Struct(new_fields.into())
308        }
309        DataType::List(iceberg_element) => {
310            let mz_element = match mz_field {
311                Some(f) => match f.data_type() {
312                    DataType::List(element) => Some(element.as_ref()),
313                    other => anyhow::bail!(
314                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
315                        iceberg_field.name(),
316                        other
317                    ),
318                },
319                None => None,
320            };
321            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
322            DataType::List(Arc::new(new_element))
323        }
324        DataType::LargeList(iceberg_element) => {
325            let mz_element = match mz_field {
326                Some(f) => match f.data_type() {
327                    DataType::LargeList(element) => Some(element.as_ref()),
328                    other => anyhow::bail!(
329                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
330                        iceberg_field.name(),
331                        other
332                    ),
333                },
334                None => None,
335            };
336            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
337            DataType::LargeList(Arc::new(new_element))
338        }
339        DataType::Map(iceberg_entries, sorted) => {
340            let mz_entries = match mz_field {
341                Some(f) => match f.data_type() {
342                    DataType::Map(entries, _) => Some(entries.as_ref()),
343                    other => anyhow::bail!(
344                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
345                        iceberg_field.name(),
346                        other
347                    ),
348                },
349                None => None,
350            };
351            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
352            DataType::Map(Arc::new(new_entries), *sorted)
353        }
354        other => other.clone(),
355    };
356
357    Ok(Field::new(
358        iceberg_field.name(),
359        new_data_type,
360        iceberg_field.is_nullable(),
361    )
362    .with_metadata(metadata))
363}
364
365async fn reload_table(
366    catalog: &dyn Catalog,
367    namespace: String,
368    table_name: String,
369    current_table: Table,
370) -> anyhow::Result<Table> {
371    let namespace_ident = NamespaceIdent::new(namespace.clone());
372    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
373    let current_schema = current_table.metadata().current_schema_id();
374    let current_partition_spec = current_table.metadata().default_partition_spec_id();
375
376    match catalog.load_table(&table_ident).await {
377        Ok(table) => {
378            let reloaded_schema = table.metadata().current_schema_id();
379            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
380            if reloaded_schema != current_schema {
381                return Err(anyhow::anyhow!(
382                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
383                    table_name,
384                    current_schema,
385                    reloaded_schema
386                ));
387            }
388
389            if reloaded_partition_spec != current_partition_spec {
390                return Err(anyhow::anyhow!(
391                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
392                    table_name,
393                    current_partition_spec,
394                    reloaded_partition_spec
395                ));
396            }
397
398            Ok(table)
399        }
400        Err(err) => Err(err).context("Failed to reload Iceberg table"),
401    }
402}
403
404/// Load an existing Iceberg table or create it if it doesn't exist.
405async fn load_or_create_table(
406    catalog: &dyn Catalog,
407    namespace: String,
408    table_name: String,
409    schema: &Schema,
410) -> anyhow::Result<iceberg::table::Table> {
411    let namespace_ident = NamespaceIdent::new(namespace.clone());
412    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
413
414    // Try to load the table first
415    match catalog.load_table(&table_ident).await {
416        Ok(table) => {
417            // Table exists, return it
418            // TODO: Add proper schema evolution/validation to ensure compatibility
419            Ok(table)
420        }
421        Err(err) => {
422            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
423                || err
424                    .message()
425                    .contains("Tried to load a table that does not exist")
426            {
427                // Table doesn't exist, create it
428                // Note: location is not specified, letting the catalog determine the default location
429                // based on its warehouse configuration
430                let table_creation = TableCreation::builder()
431                    .name(table_name.clone())
432                    .schema(schema.clone())
433                    // Use unpartitioned spec by default
434                    // TODO: Consider making partition spec configurable
435                    // .partition_spec(UnboundPartitionSpec::builder().build())
436                    .build();
437
438                catalog
439                    .create_table(&namespace_ident, table_creation)
440                    .await
441                    .with_context(|| {
442                        format!(
443                            "Failed to create Iceberg table '{}' in namespace '{}'",
444                            table_name, namespace
445                        )
446                    })
447            } else {
448                // Some other error occurred
449                Err(err).context("Failed to load Iceberg table")
450            }
451        }
452    }
453}
454
455/// Find the most recent Materialize frontier from Iceberg snapshots.
456/// We store the frontier in snapshot metadata to track where we left off after restarts.
457/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
458/// The input slice is sorted in place by sequence number, in descending order.
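///
/// For illustrative values, a snapshot carrying `"mz-frontier" = "[65000]"` and
/// `"mz-sink-version" = "3"` yields `Some((Antichain::from_elem(Timestamp::new(65000)), 3))`;
/// a table with no snapshots yields `Ok(None)`.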
459fn retrieve_upper_from_snapshots(
460    snapshots: &mut [Arc<Snapshot>],
461) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
462    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
463
464    for snapshot in snapshots {
465        let props = &snapshot.summary().additional_properties;
466        if let (Some(frontier_json), Some(sink_version_str)) =
467            (props.get("mz-frontier"), props.get("mz-sink-version"))
468        {
469            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
470                .context("Failed to deserialize frontier from snapshot properties")?;
471            let frontier = Antichain::from_iter(frontier);
472
473            let sink_version = sink_version_str
474                .parse::<u64>()
475                .context("Failed to parse mz-sink-version from snapshot properties")?;
476
477            return Ok(Some((frontier, sink_version)));
478        }
479        if snapshot.summary().operation.as_str() != "replace" {
480            // This is a weak heuristic, but right now we have no other way to identify
481            // compactions than to assume they are the only writers of "replace" snapshots.
482            // That means if we find a snapshot with some other operation, but no mz-frontier, we are in an
483            // inconsistent state and have to error out.
484            anyhow::bail!(
485                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
486                snapshot.snapshot_id(),
487                snapshot.summary().operation.as_str(),
488            );
489        }
490    }
491
492    Ok(None)
493}
494
495/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
496/// Returns the Arrow schema (with field IDs) for writing Parquet files,
497/// and the Iceberg schema for table creation/validation.
498fn relation_desc_to_iceberg_schema(
499    desc: &mz_repr::RelationDesc,
500) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
501    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
502        .context("Failed to convert RelationDesc to Arrow schema")?;
503
504    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
505
506    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
507        .context("Failed to convert Arrow schema to Iceberg schema")?;
508
509    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
510}
511
512/// Build a new Arrow schema by adding an __op column to the existing schema.
513fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
514    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
515    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
516    ArrowSchema::new(fields)
517}
518
519/// Convert a Materialize DiffPair into an Arrow RecordBatch with an __op column.
520/// The __op column indicates whether each row is an insert (1) or delete (-1), which
521/// the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
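///
/// A sketch with hypothetical rows (`old_row`, `new_row`):
///
/// ```ignore
/// // An update becomes a two-row batch: the old row with __op = -1, the new row with __op = 1.
/// let batch = row_to_recordbatch(
///     DiffPair { before: Some(old_row), after: Some(new_row) },
///     Arc::clone(&schema),
///     Arc::clone(&schema_with_op),
/// )?;
/// assert_eq!(batch.num_rows(), 2);
/// ```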
522fn row_to_recordbatch(
523    row: DiffPair<Row>,
524    schema: ArrowSchemaRef,
525    schema_with_op: ArrowSchemaRef,
526) -> anyhow::Result<RecordBatch> {
527    let mut builder = ArrowBuilder::new_with_schema(
528        Arc::clone(&schema),
529        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
530        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
531    )
532    .context("Failed to create builder")?;
533
534    let mut op_values = Vec::new();
535
536    if let Some(before) = row.before {
537        builder
538            .add_row(&before)
539            .context("Failed to add delete row to builder")?;
540        op_values.push(-1i32); // Delete operation
541    }
542    if let Some(after) = row.after {
543        builder
544            .add_row(&after)
545            .context("Failed to add insert row to builder")?;
546        op_values.push(1i32); // Insert operation
547    }
548
549    let batch = builder
550        .to_record_batch()
551        .context("Failed to create record batch")?;
552
553    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
554    let op_column = Arc::new(Int32Array::from(op_values));
555    columns.push(op_column);
556
557    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
558        .context("Failed to create batch with op column")?;
559    Ok(batch_with_op)
560}
561
562/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
563/// Batches are minted with configurable windows to balance write efficiency with latency.
564/// We maintain a sliding window of future batch descriptions so writers can start
565/// processing data even while earlier batches are still being written.
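///
/// A window of `INITIAL_DESCRIPTIONS_TO_MINT` descriptions is kept open: once the
/// input frontier reaches the oldest batch's upper, that batch is retired and a new
/// future batch, one commit interval past the newest upper, is minted.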
566fn mint_batch_descriptions<G, D>(
567    name: String,
568    sink_id: GlobalId,
569    input: &VecCollection<G, D, Diff>,
570    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
571    connection: IcebergSinkConnection,
572    storage_configuration: StorageConfiguration,
573    initial_schema: SchemaRef,
574) -> (
575    VecCollection<G, D, Diff>,
576    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
577    Stream<G, Infallible>,
578    Stream<G, HealthStatusMessage>,
579    PressOnDropButton,
580)
581where
582    G: Scope<Timestamp = Timestamp>,
583    D: Clone + 'static,
584{
585    let scope = input.scope();
586    let name_for_error = name.clone();
587    let name_for_logging = name.clone();
588    let mut builder = OperatorBuilder::new(name, scope.clone());
589    let sink_version = sink.version;
590
591    let hashed_id = sink_id.hashed();
592    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
593    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
594    let (output, output_stream) = builder.new_output();
595    let (batch_desc_output, batch_desc_stream) =
596        builder.new_output::<CapacityContainerBuilder<_>>();
597    let mut input =
598        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);
599
600    let as_of = sink.as_of.clone();
601    let commit_interval = sink
602        .commit_interval
603        .expect("the planner should have enforced this")
604        .clone();
605
606    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
607        Box::pin(async move {
608            let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
609            *data_capset = CapabilitySet::new();
610
611            if !is_active_worker {
612                *capset = CapabilitySet::new();
613                *data_capset = CapabilitySet::new();
614                *table_ready_capset = CapabilitySet::new();
615                while let Some(event) = input.next().await {
616                    match event {
617                        Event::Data([output_cap, _], mut data) => {
618                            output.give_container(&output_cap, &mut data);
619                        }
620                        Event::Progress(_) => {}
621                    }
622                }
623                return Ok(());
624            }
625
626            let catalog = connection
627                .catalog_connection
628                .connect(&storage_configuration, InTask::Yes)
629                .await
630                .context("Failed to connect to iceberg catalog")?;
631
632            let table = load_or_create_table(
633                catalog.as_ref(),
634                connection.namespace.clone(),
635                connection.table.clone(),
636                initial_schema.as_ref(),
637            )
638            .await?;
639            debug!(
640                ?sink_id,
641                %name_for_logging,
642                namespace = %connection.namespace,
643                table = %connection.table,
644                "iceberg mint loaded/created table"
645            );
646
647            *table_ready_capset = CapabilitySet::new();
648
649            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
650            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
651            let (resume_upper, resume_version) = match resume {
652                Some((f, v)) => (f, v),
653                None => (Antichain::from_elem(Timestamp::minimum()), 0),
654            };
655            debug!(
656                ?sink_id,
657                %name_for_logging,
658                resume_upper = %resume_upper.pretty(),
659                resume_version,
660                as_of = %as_of.pretty(),
661                "iceberg mint resume position loaded"
662            );
663
664            // The input has overcompacted if
665            let overcompacted =
666                // ..we have made some progress in the past
667                *resume_upper != [Timestamp::minimum()] &&
668                // ..but the since frontier is now beyond that
669                PartialOrder::less_than(&resume_upper, &as_of);
670
671            if overcompacted {
672                let err = format!(
673                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
674                    as_of.pretty(),
675                    resume_upper.pretty()
676                );
677                // This would normally be an assertion, but because it can happen after a
678                // Materialize backup/restore we return an error so that it shows up in Sentry
679                // while leaving the rest of the objects in the cluster unaffected.
680                return Err(anyhow::anyhow!("{err}"));
681            };
682
683            if resume_version > sink_version {
684                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
685            }
686
687            let mut initialized = false;
688            let mut observed_frontier;
689            let mut max_seen_ts: Option<Timestamp> = None;
690            // Track minted batches to maintain a sliding window of open batch descriptions.
691            // This is needed to know when to retire old batches and mint new ones.
692            // Its sortedness is derived from the monotonicity of batch descriptions,
693            // and the fact that we only ever push new descriptions to the back and pop from the front.
694            let mut minted_batches = VecDeque::new();
695            loop {
696                if let Some(event) = input.next().await {
697                    match event {
698                        Event::Data([output_cap, _], mut data) => {
699                            if !initialized {
700                                for (_, ts, _) in data.iter() {
701                                    match max_seen_ts.as_mut() {
702                                        Some(max) => {
703                                            if max.less_than(ts) {
704                                                *max = ts.clone();
705                                            }
706                                        }
707                                        None => {
708                                            max_seen_ts = Some(ts.clone());
709                                        }
710                                    }
711                                }
712                            }
713                            output.give_container(&output_cap, &mut data);
714                            continue;
715                        }
716                        Event::Progress(frontier) => {
717                            observed_frontier = frontier;
718                        }
719                    }
720                } else {
721                    return Ok(());
722                }
723
724                if !initialized {
725                    if observed_frontier.is_empty() {
726                        // Bounded inputs can close (frontier becomes empty) before we finish
727                        // initialization. For example, a loadgen source configured for a finite
728                        // dataset may emit all rows at time t and then immediately close. If we
729                        // saw any data, synthesize an upper one tick past the maximum timestamp
730                        // so we can mint a snapshot batch and commit it.
731                        if let Some(max_ts) = max_seen_ts.as_ref() {
732                            let synthesized_upper =
733                                Antichain::from_elem(max_ts.step_forward());
734                            debug!(
735                                ?sink_id,
736                                %name_for_logging,
737                                max_seen_ts = %max_ts,
738                                synthesized_upper = %synthesized_upper.pretty(),
739                                "iceberg mint input closed before initialization; using max seen ts"
740                            );
741                            observed_frontier = synthesized_upper;
742                        } else {
743                            debug!(
744                                ?sink_id,
745                                %name_for_logging,
746                                "iceberg mint input closed before initialization with no data"
747                            );
748                            // Input stream closed before initialization completed and no data arrived.
749                            return Ok(());
750                        }
751                    }
752
753                    // We only start minting after we've reached as_of and resume_upper to avoid
754                    // minting batches that would be immediately skipped.
755                    if PartialOrder::less_than(&observed_frontier, &resume_upper)
756                        || PartialOrder::less_than(&observed_frontier, &as_of)
757                    {
758                        continue;
759                    }
760
761                    let mut batch_descriptions = vec![];
762                    let mut current_upper = observed_frontier.clone();
763                    let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
764
765                    // Create a catch-up batch from the later of resume_upper or as_of to current frontier.
766                    // We use the later of the two because:
767                    // - For fresh sinks: resume_upper = minimum, as_of = actual timestamp, data starts at as_of
768                    // - For resuming: as_of <= resume_upper (enforced by overcompaction check), data starts at resume_upper
769                    let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
770                        as_of.clone()
771                    } else {
772                        resume_upper.clone()
773                    };
774
775                    if batch_lower == current_upper {
776                        // Snapshot! as_of is exactly at the frontier. We still need to mint
777                        // a batch to create the snapshot, so we step the upper forward by one.
778                        current_upper = Antichain::from_elem(current_upper_ts.step_forward());
779                    }
780
781                    let batch_description = (batch_lower.clone(), current_upper.clone());
782                    debug!(
783                        ?sink_id,
784                        %name_for_logging,
785                        batch_lower = %batch_lower.pretty(),
786                        current_upper = %current_upper.pretty(),
787                        "iceberg mint initializing (catch-up batch)"
788                    );
789                    debug!(
790                        "{}: creating catch-up batch [{}, {})",
791                        name_for_logging,
792                        batch_lower.pretty(),
793                        current_upper.pretty()
794                    );
795                    batch_descriptions.push(batch_description);
796                    // Mint initial future batch descriptions at configurable intervals
797                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
798                        let duration_millis = commit_interval.as_millis()
799                            .checked_mul(u128::from(i))
800                            .expect("commit interval multiplication overflow");
801                        let duration_ts = Timestamp::new(
802                            u64::try_from(duration_millis)
803                                .expect("commit interval too large for u64"),
804                        );
805                        let desired_batch_upper = Antichain::from_elem(
806                            current_upper_ts.step_forward_by(&duration_ts),
807                        );
808
809                        let batch_description =
810                            (current_upper.clone(), desired_batch_upper.clone());
811                        debug!(
812                            "{}: minting future batch {}/{} [{}, {})",
813                            name_for_logging,
814                            i,
815                            INITIAL_DESCRIPTIONS_TO_MINT,
816                            current_upper.pretty(),
817                            desired_batch_upper.pretty()
818                        );
819                        current_upper = batch_description.1.clone();
820                        batch_descriptions.push(batch_description);
821                    }
822
823                    minted_batches.extend(batch_descriptions.clone());
824
825                    for desc in batch_descriptions {
826                        batch_desc_output.give(&capset[0], desc);
827                    }
828
829                    capset.downgrade(current_upper);
830
831                    initialized = true;
832                } else {
833                    if observed_frontier.is_empty() {
834                        // We're done!
835                        return Ok(());
836                    }
837                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
838                    // and mint a new future batch to keep the pipeline full
839                    while let Some(oldest_desc) = minted_batches.front() {
840                        let oldest_upper = &oldest_desc.1;
841                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
842                            break;
843                        }
844
845                        let newest_upper = minted_batches.back().unwrap().1.clone();
846                        let new_lower = newest_upper.clone();
847                        let duration_ts = Timestamp::new(commit_interval.as_millis()
848                            .try_into()
849                            .expect("commit interval too large for u64"));
850                        let new_upper = Antichain::from_elem(newest_upper
851                            .as_option()
852                            .unwrap()
853                            .step_forward_by(&duration_ts));
854
855                        let new_batch_description = (new_lower.clone(), new_upper.clone());
856                        minted_batches.pop_front();
857                        minted_batches.push_back(new_batch_description.clone());
858
859                        batch_desc_output.give(&capset[0], new_batch_description);
860
861                        capset.downgrade(new_upper);
862                    }
863                }
864            }
865        })
866    });
867
868    let statuses = errors.map(|error| HealthStatusMessage {
869        id: None,
870        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
871        namespace: StatusNamespace::Iceberg,
872    });
873    (
874        output_stream.as_collection(),
875        batch_desc_stream,
876        table_ready_stream,
877        statuses,
878        button.press_on_drop(),
879    )
880}
881
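/// A wrapper around an Iceberg [`DataFile`] (plus the schema needed to decode it)
/// that implements `Serialize`/`Deserialize` by round-tripping through
/// [`AvroDataFile`].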
882#[derive(Clone, Debug, Serialize, Deserialize)]
883#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
884struct SerializableDataFile {
885    pub data_file: DataFile,
886    pub schema: Schema,
887}
888
889/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
890/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
891/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
892/// The Avro ser/de also requires the Iceberg schema to be provided, so we include that as well.
893/// It is distinctly possible that this is overkill, but it avoids re-implementing
894/// Iceberg's serialization logic here.
895/// If at some point this becomes a serious overhead, we can revisit this decision.
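///
/// A sketch of the round trip, given some `serializable_data_file: SerializableDataFile`:
///
/// ```ignore
/// let avro: AvroDataFile = serializable_data_file.clone().into();
/// let back: SerializableDataFile = avro.try_into().expect("round trip");
/// ```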
896#[derive(Clone, Debug, Serialize, Deserialize)]
897struct AvroDataFile {
898    pub data_file: Vec<u8>,
899    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
900    pub schema: Vec<u8>,
901}
902
903impl From<SerializableDataFile> for AvroDataFile {
904    fn from(value: SerializableDataFile) -> Self {
905        let mut data_file = Vec::new();
906        write_data_files_to_avro(
907            &mut data_file,
908            [value.data_file],
909            &StructType::new(vec![]),
910            FormatVersion::V2,
911        )
912        .expect("serialization into buffer");
913        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
914        AvroDataFile { data_file, schema }
915    }
916}
917
918impl TryFrom<AvroDataFile> for SerializableDataFile {
919    type Error = String;
920
921    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
922        let schema: Schema = serde_json::from_slice(&value.schema)
923            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
924        let data_files = read_data_files_from_avro(
925            &mut &*value.data_file,
926            &schema,
927            0,
928            &StructType::new(vec![]),
929            FormatVersion::V2,
930        )
931        .map_err_to_string_with_causes()?;
932        let Some(data_file) = data_files.into_iter().next() else {
933            return Err("No DataFile found in Avro data".into());
934        };
935        Ok(SerializableDataFile { data_file, schema })
936    }
937}
938
939/// A DataFile along with its associated batch description (lower and upper bounds).
940#[derive(Clone, Debug, Serialize, Deserialize)]
941struct BoundedDataFile {
942    pub data_file: SerializableDataFile,
943    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
944}
945
946impl BoundedDataFile {
947    pub fn new(
948        file: DataFile,
949        schema: Schema,
950        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
951    ) -> Self {
952        Self {
953            data_file: SerializableDataFile {
954                data_file: file,
955                schema,
956            },
957            batch_desc,
958        }
959    }
960
961    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
962        &self.batch_desc
963    }
964
965    pub fn data_file(&self) -> &DataFile {
966        &self.data_file.data_file
967    }
968
969    pub fn into_data_file(self) -> DataFile {
970        self.data_file.data_file
971    }
972}
973
974/// A set of DataFiles along with their associated batch descriptions.
975#[derive(Clone, Debug, Default)]
976struct BoundedDataFileSet {
977    pub data_files: Vec<BoundedDataFile>,
978}
979
980/// Write rows into Parquet data files bounded by batch descriptions.
981/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
982/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
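///
/// Batch descriptions are broadcast, so every worker sees every description and
/// writes data files only for the rows routed to it.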
983fn write_data_files<G>(
984    name: String,
985    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
986    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
987    table_ready_stream: &Stream<G, Infallible>,
988    as_of: Antichain<Timestamp>,
989    connection: IcebergSinkConnection,
990    storage_configuration: StorageConfiguration,
991    materialize_arrow_schema: Arc<ArrowSchema>,
992    metrics: Arc<IcebergSinkMetrics>,
993    statistics: SinkStatistics,
994) -> (
995    Stream<G, BoundedDataFile>,
996    Stream<G, HealthStatusMessage>,
997    PressOnDropButton,
998)
999where
1000    G: Scope<Timestamp = Timestamp>,
1001{
1002    let scope = input.scope();
1003    let name_for_logging = name.clone();
1004    let mut builder = OperatorBuilder::new(name, scope.clone());
1005
1006    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1007
1008    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1009    let mut batch_desc_input =
1010        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
1011    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);
1012
1013    let (button, errors) = builder.build_fallible(move |caps| {
1014        Box::pin(async move {
1015            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
1016            let catalog = connection
1017                .catalog_connection
1018                .connect(&storage_configuration, InTask::Yes)
1019                .await
1020                .context("Failed to connect to iceberg catalog")?;
1021
1022            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1023            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1024            while let Some(_) = table_ready_input.next().await {
1025                // Wait for table to be ready
1026            }
1027            let table = catalog
1028                .load_table(&table_ident)
1029                .await
1030                .context("Failed to load Iceberg table")?;
1031
1032            let table_metadata = table.metadata().clone();
1033            let current_schema = Arc::clone(table_metadata.current_schema());
1034
1035            // Merge Materialize extension metadata into the Iceberg schema.
1036            // We need extension metadata for ArrowBuilder to work correctly (it uses
1037            // extension names to know how to handle different types like records vs arrays).
1038            let arrow_schema = Arc::new(
1039                merge_materialize_metadata_into_iceberg_schema(
1040                    materialize_arrow_schema.as_ref(),
1041                    current_schema.as_ref(),
1042                )
1043                .context("Failed to merge Materialize metadata into Iceberg schema")?,
1044            );
1045
1046            // Build schema_with_op by adding the __op column used by DeltaWriter.
1047            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));
1048
1049            // WORKAROUND: the S3 Tables catalog incorrectly reports the table location as the
1050            // metadata file path instead of the table root. Strip off the /metadata/*.metadata.json suffix.
1051            // No clear way to detect this properly right now, so we use heuristics.
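            // For example (hypothetical path), a reported location of
            //   "s3://bucket/warehouse/db/tbl/metadata/00001-xyz.metadata.json"
            // is corrected to "s3://bucket/warehouse/db/tbl".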
1052            let location = table_metadata.location();
1053            let corrected_location = match location.rsplit_once("/metadata/") {
1054                Some((a, b)) if b.ends_with(".metadata.json") => a,
1055                _ => location,
1056            };
1057
1058            let data_location = format!("{}/data", corrected_location);
1059            let location_generator = DefaultLocationGenerator::with_data_location(data_location);
1060
1061            // Add a unique suffix to avoid filename collisions across restarts and workers
1062            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
1063            let file_name_generator = DefaultFileNameGenerator::new(
1064                PARQUET_FILE_PREFIX.to_string(),
1065                Some(unique_suffix),
1066                iceberg::spec::DataFileFormat::Parquet,
1067            );
1068
1069            let file_io = table.file_io().clone();
1070
1071            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
1072                return Err(anyhow::anyhow!(
1073                    "Iceberg sink requires key columns for equality deletes"
1074                ));
1075            };
1076
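            // Iceberg identifies equality-delete columns by field ID rather than by
            // column index; field IDs are assigned starting at 1 (see
            // `add_field_ids_to_arrow_schema`), hence the offset by one below.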
1077            let equality_ids: Vec<i32> = equality_indices
1078                .iter()
1079                .map(|u| i32::try_from(*u).map(|v| v + 1))
1080                .collect::<Result<Vec<i32>, _>>()
1081                .context("Failed to convert equality index to i32 (index too large)")?;
1082
1083            let writer_properties = WriterProperties::new();
1084
1085            let arrow_schema_for_closure = Arc::clone(&arrow_schema);
1086            let current_schema_for_closure = Arc::clone(&current_schema);
1087            let file_io_for_closure = file_io.clone();
1088            let location_generator_for_closure = location_generator.clone();
1089            let file_name_generator_for_closure = file_name_generator.clone();
1090            let equality_ids_for_closure = equality_ids.clone();
1091            let writer_properties_for_closure = writer_properties.clone();
1092
1093            let create_delta_writer = move |disable_seen_rows: bool| {
1094                let arrow_schema = Arc::clone(&arrow_schema_for_closure);
1095                let current_schema = Arc::clone(&current_schema_for_closure);
1096                let file_io = file_io_for_closure.clone();
1097                let location_generator = location_generator_for_closure.clone();
1098                let file_name_generator = file_name_generator_for_closure.clone();
1099                let equality_ids = equality_ids_for_closure.clone();
1100                let writer_properties = writer_properties_for_closure.clone();
1101
1102                async move {
1103                    let data_parquet_writer = ParquetWriterBuilder::new(
1104                        writer_properties.clone(),
1105                        Arc::clone(&current_schema),
1106                    )
1107                    .with_arrow_schema(Arc::clone(&arrow_schema))
1108                    .context("Arrow schema validation failed")?;
1109                    let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1110                        data_parquet_writer,
1111                        Arc::clone(&current_schema),
1112                        file_io.clone(),
1113                        location_generator.clone(),
1114                        file_name_generator.clone(),
1115                    );
1116                    let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);
1117
1118                    let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
1119                    let pos_schema = Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
1120                        "Failed to convert position delete Arrow schema to Iceberg schema",
1121                    )?);
1122                    let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
1123                    let pos_parquet_writer =
1124                        ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
1125                    let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1126                        pos_parquet_writer,
1127                        Arc::clone(&current_schema),
1128                        file_io.clone(),
1129                        location_generator.clone(),
1130                        file_name_generator.clone(),
1131                    );
1132                    let pos_delete_writer_builder =
1133                        PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);
1134
1135                    let eq_config = EqualityDeleteWriterConfig::new(
1136                        equality_ids.clone(),
1137                        Arc::clone(&current_schema),
1138                    )
1139                    .context("Failed to create EqualityDeleteWriterConfig")?;
1140
1141                    let eq_schema = Arc::new(
1142                        arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
1143                            "Failed to convert equality delete Arrow schema to Iceberg schema",
1144                        )?,
1145                    );
1146
1147                    let eq_parquet_writer =
1148                        ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
1149                    let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1150                        eq_parquet_writer,
1151                        Arc::clone(&current_schema),
1152                        file_io.clone(),
1153                        location_generator.clone(),
1154                        file_name_generator.clone(),
1155                    );
1156                    let eq_delete_writer_builder =
1157                        EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);
1158
1159                    let mut builder = DeltaWriterBuilder::new(
1160                        data_writer_builder,
1161                        pos_delete_writer_builder,
1162                        eq_delete_writer_builder,
1163                        equality_ids.clone(),
1164                    );
1165
1166                    if disable_seen_rows {
1167                        builder = builder.with_max_seen_rows(0);
1168                    }
1169
1170                    builder
1171                        .build(None)
1172                        .await
1173                        .context("Failed to create DeltaWriter")
1174                }
1175            };
1176
1177            // Rows can arrive before their batch description due to dataflow parallelism.
1178            // Stash them until we know which batch they belong to.
1179            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1180                BTreeMap::new();
1181
1182            // Track batches currently being written. When a row arrives, we check if it belongs
1183            // to an in-flight batch. When frontiers advance to a batch's upper, we close the
1184            // writer and emit its data files downstream.
1185            // Antichains don't implement Ord, so we use a HashMap with tuple keys instead.
1186            #[allow(clippy::disallowed_types)]
1187            let mut in_flight_batches: std::collections::HashMap<
1188                (Antichain<Timestamp>, Antichain<Timestamp>),
1189                DeltaWriterType,
1190            > = std::collections::HashMap::new();
1191
1192            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1193            let mut processed_batch_description_frontier =
1194                Antichain::from_elem(Timestamp::minimum());
1195            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1196            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1197
1198            // Track the minimum batch lower bound to prune data that's already committed
1199            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;
1200
1201            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
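                // Staged-row statistics are reported in chunks: every 10,000 writes and once
                // more at the end of each loop iteration, rather than on every row.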
1202                let mut staged_messages_since_flush: u64 = 0;
1203                tokio::select! {
1204                    _ = batch_desc_input.ready() => {},
1205                    _ = input.ready() => {}
1206                }
1207
1208                while let Some(event) = batch_desc_input.next_sync() {
1209                    match event {
1210                        Event::Data(_cap, data) => {
1211                            for batch_desc in data {
1212                                let (lower, upper) = &batch_desc;
1213
1214                                // Track the minimum batch lower bound (first batch received)
1215                                if min_batch_lower.is_none() {
1216                                    min_batch_lower = Some(lower.clone());
1217                                    debug!(
1218                                        "{}: set min_batch_lower to {}",
1219                                        name_for_logging,
1220                                        lower.pretty()
1221                                    );
1222
1223                                    // Prune any stashed rows that arrived before min_batch_lower (already committed)
1224                                    let to_remove: Vec<_> = stashed_rows
1225                                        .keys()
1226                                        .filter(|ts| {
1227                                            let ts_antichain = Antichain::from_elem((*ts).clone());
1228                                            PartialOrder::less_than(&ts_antichain, lower)
1229                                        })
1230                                        .cloned()
1231                                        .collect();
1232
1233                                    if !to_remove.is_empty() {
1234                                        let mut removed_count = 0;
1235                                        for ts in to_remove {
1236                                            if let Some(rows) = stashed_rows.remove(&ts) {
1237                                                removed_count += rows.len();
1238                                                for _ in &rows {
1239                                                    metrics.stashed_rows.dec();
1240                                                }
1241                                            }
1242                                        }
1243                                        debug!(
1244                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
1245                                            name_for_logging,
1246                                            removed_count
1247                                        );
1248                                    }
1249                                }
1250
1251                                // Disable seen_rows tracking for the snapshot batch to save memory.
1252                                let is_snapshot = lower == &as_of;
1253                                debug!(
1254                                    "{}: received batch description [{}, {}), snapshot={}",
1255                                    name_for_logging,
1256                                    lower.pretty(),
1257                                    upper.pretty(),
1258                                    is_snapshot
1259                                );
1260                                let mut delta_writer = create_delta_writer(is_snapshot).await?;
1261                                // Drain any stashed rows that belong to this batch
1262                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1263                                let mut drained_count = 0;
1264                                for row_ts in row_ts_keys {
1265                                    let ts = Antichain::from_elem(row_ts.clone());
1266                                    if PartialOrder::less_equal(lower, &ts)
1267                                        && PartialOrder::less_than(&ts, upper)
1268                                    {
1269                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
1270                                            drained_count += rows.len();
1271                                            for (_row, diff_pair) in rows {
1272                                                metrics.stashed_rows.dec();
1273                                                let record_batch = row_to_recordbatch(
1274                                                    diff_pair.clone(),
1275                                                    Arc::clone(&arrow_schema),
1276                                                    Arc::clone(&schema_with_op),
1277                                                )
1278                                                .context("failed to convert row to recordbatch")?;
1279                                                delta_writer.write(record_batch).await.context(
1280                                                    "Failed to write row to DeltaWriter",
1281                                                )?;
1282                                                staged_messages_since_flush += 1;
1283                                                if staged_messages_since_flush >= 10_000 {
1284                                                    statistics.inc_messages_staged_by(
1285                                                        staged_messages_since_flush,
1286                                                    );
1287                                                    staged_messages_since_flush = 0;
1288                                                }
1289                                            }
1290                                        }
1291                                    }
1292                                }
1293                                if drained_count > 0 {
1294                                    debug!(
1295                                        "{}: drained {} stashed rows into batch [{}, {})",
1296                                        name_for_logging,
1297                                        drained_count,
1298                                        lower.pretty(),
1299                                        upper.pretty()
1300                                    );
1301                                }
1302                                let prev =
1303                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
1304                                if prev.is_some() {
1305                                    anyhow::bail!(
1306                                        "Duplicate batch description received: {:?}",
1307                                        batch_desc
1308                                    );
1309                                }
1310                            }
1311                        }
1312                        Event::Progress(frontier) => {
1313                            batch_description_frontier = frontier;
1314                        }
1315                    }
1316                }
1317
1318                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1319                for event in ready_events {
1320                    match event {
1321                        Event::Data(_cap, data) => {
1322                            let mut dropped_per_time = BTreeMap::new();
1323                            let mut stashed_per_time = BTreeMap::new();
1324                            for ((row, diff_pair), ts, _diff) in data {
1325                                let row_ts = ts.clone();
1326                                let ts_antichain = Antichain::from_elem(row_ts.clone());
1327                                let mut written = false;
1328                                // Try writing the row to any in-flight batch it belongs to...
1329                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
1330                                    let (lower, upper) = batch_desc;
1331                                    if PartialOrder::less_equal(lower, &ts_antichain)
1332                                        && PartialOrder::less_than(&ts_antichain, upper)
1333                                    {
1334                                        let record_batch = row_to_recordbatch(
1335                                            diff_pair.clone(),
1336                                            Arc::clone(&arrow_schema),
1337                                            Arc::clone(&schema_with_op),
1338                                        )
1339                                        .context("failed to convert row to recordbatch")?;
1340                                        delta_writer
1341                                            .write(record_batch)
1342                                            .await
1343                                            .context("Failed to write row to DeltaWriter")?;
1344                                        staged_messages_since_flush += 1;
1345                                        if staged_messages_since_flush >= 10_000 {
1346                                            statistics.inc_messages_staged_by(
1347                                                staged_messages_since_flush,
1348                                            );
1349                                            staged_messages_since_flush = 0;
1350                                        }
1351                                        written = true;
1352                                        break;
1353                                    }
1354                                }
1355                                if !written {
1356                                    // Drop data that's before the first batch we received (already committed)
1357                                    if let Some(ref min_lower) = min_batch_lower {
1358                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
1359                                            dropped_per_time
1360                                                .entry(ts_antichain.into_option().unwrap())
1361                                                .and_modify(|c| *c += 1)
1362                                                .or_insert(1);
1363                                            continue;
1364                                        }
1365                                    }
1366
1367                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
1368                                    let entry = stashed_rows.entry(row_ts).or_default();
1369                                    metrics.stashed_rows.inc();
1370                                    entry.push((row, diff_pair));
1371                                }
1372                            }
1373
1374                            for (ts, count) in dropped_per_time {
1375                                debug!(
1376                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
1377                                    name_for_logging, count, ts
1378                                );
1379                            }
1380
1381                            for (ts, count) in stashed_per_time {
1382                                debug!(
1383                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
1384                                    name_for_logging, count, ts
1385                                );
1386                            }
1387                        }
1388                        Event::Progress(frontier) => {
1389                            input_frontier = frontier;
1390                        }
1391                    }
1392                }
1393                if staged_messages_since_flush > 0 {
1394                    statistics.inc_messages_staged_by(staged_messages_since_flush);
1395                }
1396
1397                // Check if frontiers have advanced, which may unlock batches ready to close
1398                if PartialOrder::less_than(
1399                    &processed_batch_description_frontier,
1400                    &batch_description_frontier,
1401                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1402                {
1403                    // Close batches whose upper is now in the past.
1404                    // Upper bounds are exclusive, so we check whether a batch's upper is less_equal to the frontier.
1405                    // Remember: a frontier at x means all timestamps less than x have been observed,
1406                    // but we may still see timestamps in [x, infinity). x itself will be covered by the
1407                    // _next_ batch's inclusive lower bound, so we can safely close a batch once its upper is <= x.
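                    // Worked example: for a batch [5, 10), an input frontier of {10} means every
                    // row with time < 10 has arrived, so that batch's writer can be closed.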
1408                    let ready_batches: Vec<_> = in_flight_batches
1409                        .extract_if(|(lower, upper), _| {
1410                            PartialOrder::less_than(lower, &batch_description_frontier)
1411                                && PartialOrder::less_equal(upper, &input_frontier)
1412                        })
1413                        .collect();
1414
1415                    if !ready_batches.is_empty() {
1416                        debug!(
1417                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
1418                            name_for_logging,
1419                            ready_batches.len(),
1420                            batch_description_frontier.pretty(),
1421                            input_frontier.pretty()
1422                        );
1423                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1424                        for (desc, mut delta_writer) in ready_batches {
1425                            let close_started_at = Instant::now();
1426                            let data_files = delta_writer.close().await;
1427                            metrics
1428                                .writer_close_duration_seconds
1429                                .observe(close_started_at.elapsed().as_secs_f64());
1430                            let data_files = data_files.context("Failed to close DeltaWriter")?;
1431                            debug!(
1432                                "{}: closed batch [{}, {}), wrote {} files",
1433                                name_for_logging,
1434                                desc.0.pretty(),
1435                                desc.1.pretty(),
1436                                data_files.len()
1437                            );
1438                            for data_file in data_files {
1439                                match data_file.content_type() {
1440                                    iceberg::spec::DataContentType::Data => {
1441                                        metrics.data_files_written.inc();
1442                                    }
1443                                    iceberg::spec::DataContentType::PositionDeletes
1444                                    | iceberg::spec::DataContentType::EqualityDeletes => {
1445                                        metrics.delete_files_written.inc();
1446                                    }
1447                                }
1448                                statistics.inc_messages_staged_by(data_file.record_count());
1449                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1450                                let file = BoundedDataFile::new(
1451                                    data_file,
1452                                    current_schema.as_ref().clone(),
1453                                    desc.clone(),
1454                                );
1455                                output.give(&capset[0], file);
1456                            }
1457
1458                            max_upper = max_upper.join(&desc.1);
1459                        }
1460
1461                        capset.downgrade(max_upper);
1462                    }
1463                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
1464                    processed_input_frontier.clone_from(&input_frontier);
1465                }
1466            }
1467            Ok(())
1468        })
1469    });
1470
1471    let statuses = errors.map(|error| HealthStatusMessage {
1472        id: None,
1473        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1474        namespace: StatusNamespace::Iceberg,
1475    });
1476    (output_stream, statuses, button.press_on_drop())
1477}
1478
1479/// Commit completed batches to Iceberg as snapshots.
1480/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
1481/// Each snapshot includes the Materialize frontier in its metadata for resume support.
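/// If a concurrent writer has advanced the table's frontier past ours, the commit fails
/// permanently instead of retrying (fencing).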
1482fn commit_to_iceberg<G>(
1483    name: String,
1484    sink_id: GlobalId,
1485    sink_version: u64,
1486    batch_input: &Stream<G, BoundedDataFile>,
1487    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1488    table_ready_stream: &Stream<G, Infallible>,
1489    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1490    connection: IcebergSinkConnection,
1491    storage_configuration: StorageConfiguration,
1492    write_handle: impl Future<
1493        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1494    > + 'static,
1495    metrics: Arc<IcebergSinkMetrics>,
1496    statistics: SinkStatistics,
1497) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
1498where
1499    G: Scope<Timestamp = Timestamp>,
1500{
1501    let scope = batch_input.scope();
1502    let mut builder = OperatorBuilder::new(name, scope.clone());
1503
1504    let hashed_id = sink_id.hashed();
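    // Only one worker (chosen by hashing the sink id) performs commits; the others return
    // immediately so that snapshots are produced by a single writer.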
1505    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1506    let name_for_logging = format!("{sink_id}-commit-to-iceberg");
1507
1508    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1509    let mut batch_desc_input =
1510        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1511    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1512
1513    let (button, errors) = builder.build_fallible(move |_caps| {
1514        Box::pin(async move {
1515            if !is_active_worker {
1516                write_frontier.borrow_mut().clear();
1517                return Ok(());
1518            }
1519
1520            let catalog = connection
1521                .catalog_connection
1522                .connect(&storage_configuration, InTask::Yes)
1523                .await
1524                .context("Failed to connect to iceberg catalog")?;
1525
1526            let mut write_handle = write_handle.await?;
1527
1528            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1529            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1530            // Wait for the table-ready signal before loading the table.
1531            while table_ready_input.next().await.is_some() {}
1533            let mut table = catalog
1534                .load_table(&table_ident)
1535                .await
1536                .context("Failed to load Iceberg table")?;
1537
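            // Data files received so far, grouped by the batch description they belong to.
            // As on the write side, Antichains lack Ord, hence the HashMap.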
1538            #[allow(clippy::disallowed_types)]
1539            let mut batch_descriptions: std::collections::HashMap<
1540                (Antichain<Timestamp>, Antichain<Timestamp>),
1541                BoundedDataFileSet,
1542            > = std::collections::HashMap::new();
1543
1545            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1546            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1547
1548            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1549                tokio::select! {
1550                    _ = batch_desc_input.ready() => {},
1551                    _ = input.ready() => {}
1552                }
1553
1554                while let Some(event) = batch_desc_input.next_sync() {
1555                    match event {
1556                        Event::Data(_cap, data) => {
1557                            for batch_desc in data {
1558                                let prev = batch_descriptions.insert(
1559                                    batch_desc,
1560                                    BoundedDataFileSet { data_files: vec![] },
1561                                );
1562                                if let Some(prev) = prev {
1563                                    anyhow::bail!(
1564                                        "Duplicate batch description received \
1565                                         in commit operator: {:?}",
1566                                        prev
1567                                    );
1568                                }
1569                            }
1570                        }
1571                        Event::Progress(frontier) => {
1572                            batch_description_frontier = frontier;
1573                        }
1574                    }
1575                }
1576
1577                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1578                for event in ready_events {
1579                    match event {
1580                        Event::Data(_cap, data) => {
1581                            for bounded_data_file in data {
1582                                let entry = batch_descriptions
1583                                    .entry(bounded_data_file.batch_desc().clone())
1584                                    .or_default();
1585                                entry.data_files.push(bounded_data_file);
1586                            }
1587                        }
1588                        Event::Progress(frontier) => {
1589                            input_frontier = frontier;
1590                        }
1591                    }
1592                }
1593
1594                // Collect batches whose data files have all arrived.
1595                // The writer emits all data files for a batch at a capability <= the batch's
1596                // lower bound, then downgrades its capability to the batch's upper bound.
1597                // So once the input frontier advances past lower, we know the writer has
1598                // finished emitting files for this batch and dropped its capability.
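                // Worked example: files for a batch [5, 10) arrive at a capability <= 5, so an
                // input frontier of {6} (or beyond) guarantees all of that batch's files are here.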
1599                let mut done_batches: Vec<_> = batch_descriptions
1600                    .keys()
1601                    .filter(|(lower, _upper)| {
1602                        PartialOrder::less_than(lower, &input_frontier)
1603                    })
1604                    .cloned()
1605                    .collect();
1606
1607                // Commit batches in timestamp order to maintain consistency
1608                done_batches.sort_by(|a, b| {
1609                    if PartialOrder::less_than(a, b) {
1610                        Ordering::Less
1611                    } else if PartialOrder::less_than(b, a) {
1612                        Ordering::Greater
1613                    } else {
1614                        Ordering::Equal
1615                    }
1616                });
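                // Batch descriptions are minted as contiguous, non-overlapping intervals, so this
                // partial-order comparison is effectively total for the batches seen here.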
1617
1618                for batch in done_batches {
1619                    let file_set = batch_descriptions.remove(&batch).unwrap();
1620
1621                    let mut data_files = vec![];
1622                    let mut delete_files = vec![];
1623                    // Track totals for committed statistics
1624                    let mut total_messages: u64 = 0;
1625                    let mut total_bytes: u64 = 0;
1626                    for file in file_set.data_files {
1627                        total_messages += file.data_file().record_count();
1628                        total_bytes += file.data_file().file_size_in_bytes();
1629                        match file.data_file().content_type() {
1630                            iceberg::spec::DataContentType::Data => {
1631                                data_files.push(file.into_data_file());
1632                            }
1633                            iceberg::spec::DataContentType::PositionDeletes
1634                            | iceberg::spec::DataContentType::EqualityDeletes => {
1635                                delete_files.push(file.into_data_file());
1636                            }
1637                        }
1638                    }
1639
1640                    debug!(
1641                        ?sink_id,
1642                        %name_for_logging,
1643                        lower = %batch.0.pretty(),
1644                        upper = %batch.1.pretty(),
1645                        data_files = data_files.len(),
1646                        delete_files = delete_files.len(),
1647                        total_messages,
1648                        total_bytes,
1649                        "iceberg commit applying batch"
1650                    );
1651
1652                    let instant = Instant::now();
1653
1654                    let frontier = batch.1.clone();
1655                    let tx = Transaction::new(&table);
1656
1657                    let frontier_json = serde_json::to_string(&frontier.elements())
1658                        .context("Failed to serialize frontier to JSON")?;
1659
1660                    // Store the frontier in snapshot metadata so we can resume from this point
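                    // `mz-frontier` is what `retrieve_upper_from_snapshots` recovers later, both to
                    // determine where to resume and to detect commits from another writer (fencing).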
1661                    let mut action = tx.row_delta().set_snapshot_properties(
1662                        vec![
1663                            ("mz-sink-id".to_string(), sink_id.to_string()),
1664                            ("mz-frontier".to_string(), frontier_json),
1665                            ("mz-sink-version".to_string(), sink_version.to_string()),
1666                        ].into_iter().collect()
1667                    ).with_check_duplicate(false);
1668
1669                    if !data_files.is_empty() || !delete_files.is_empty() {
1670                        action = action.add_data_files(data_files).add_delete_files(delete_files);
1671                    }
1672
1673                    let tx = action.apply(tx).context(
1674                        "Failed to apply data file addition to iceberg table transaction",
1675                    )?;
1676
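                    // Commit with bounded retries. On a catalog commit conflict, reload the table and
                    // inspect its snapshots to decide whether another writer has fenced us out.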
1677                    let commit_result = Retry::default().max_tries(5).retry_async(|_| async {
1678                        let new_table = tx.clone().commit(catalog.as_ref()).await;
1679                        match new_table {
1680                            Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
1681                                metrics.commit_conflicts.inc();
1682                                let table = reload_table(
1683                                    catalog.as_ref(),
1684                                    connection.namespace.clone(),
1685                                    connection.table.clone(),
1686                                    table.clone(),
1687                                ).await;
1688                                let table = match table {
1689                                    Ok(table) => table,
1690                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1691                                };
1692
1693                                let mut snapshots: Vec<_> =
1694                                    table.metadata().snapshots().cloned().collect();
1695                                let last = retrieve_upper_from_snapshots(&mut snapshots);
1696                                let last = match last {
1697                                    Ok(val) => val,
1698                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1699                                };
1700
1701                                // Check if another writer has advanced the frontier beyond ours (fencing check)
1702                                if let Some((last_frontier, _last_version)) = last {
1703                                    if PartialOrder::less_equal(&frontier, &last_frontier) {
1704                                        return RetryResult::FatalErr(anyhow!(
1705                                            "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
1706                                            connection.table,
1707                                            frontier,
1708                                            last_frontier,
1709                                        ));
1710                                    }
1711                                }
1712
1713                                RetryResult::Ok(table)
1714                            }
1715                            Err(e) => {
1716                                metrics.commit_failures.inc();
1717                                RetryResult::RetryableErr(anyhow!(e))
1718                            },
1719                            Ok(table) => RetryResult::Ok(table)
1720                        }
1721                    }).await.context("failed to commit to iceberg");
1722                    let duration = instant.elapsed();
1723                    metrics
1724                        .commit_duration_seconds
1725                        .observe(duration.as_secs_f64());
1726                    table = commit_result?;
1727
1728                    debug!(
1729                        ?sink_id,
1730                        %name_for_logging,
1731                        lower = %batch.0.pretty(),
1732                        upper = %batch.1.pretty(),
1733                        total_messages,
1734                        total_bytes,
1735                        ?duration,
1736                        "iceberg commit applied batch"
1737                    );
1738
1739                    metrics.snapshots_committed.inc();
1740                    statistics.inc_messages_committed_by(total_messages);
1741                    statistics.inc_bytes_committed_by(total_bytes);
1742
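                    // Advance the upper of the sink's persist shard to the committed frontier by
                    // appending an empty batch, retrying from the shard's actual upper on mismatch.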
1743                    let mut expect_upper = write_handle.shared_upper();
1744                    loop {
1745                        if PartialOrder::less_equal(&frontier, &expect_upper) {
1746                            // The frontier has already been advanced as far as necessary.
1747                            break;
1748                        }
1749
1750                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1751                        match write_handle
1752                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
1753                            .await
1754                            .expect("valid usage")
1755                        {
1756                            Ok(()) => break,
1757                            Err(mismatch) => {
1758                                expect_upper = mismatch.current;
1759                            }
1760                        }
1761                    }
1762                    write_frontier.borrow_mut().clone_from(&frontier);
1763
1764                }
1765            }
1766
1768            Ok(())
1769        })
1770    });
1771
1772    let statuses = errors.map(|error| HealthStatusMessage {
1773        id: None,
1774        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1775        namespace: StatusNamespace::Iceberg,
1776    });
1777
1778    (statuses, button.press_on_drop())
1779}
1780
1781impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
1782    fn get_key_indices(&self) -> Option<&[usize]> {
1783        self.key_desc_and_indices
1784            .as_ref()
1785            .map(|(_, indices)| indices.as_slice())
1786    }
1787
1788    fn get_relation_key_indices(&self) -> Option<&[usize]> {
1789        self.relation_key_indices.as_deref()
1790    }
1791
1792    fn render_sink(
1793        &self,
1794        storage_state: &mut StorageState,
1795        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1796        sink_id: GlobalId,
1797        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
1798        _err_collection: VecCollection<G, DataflowError, Diff>,
1799    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
1800        let mut scope = input.scope();
1801
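        // Future that opens a persist write handle for the sink shard; the commit operator uses
        // it to advance the shard's upper as batches land in Iceberg.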
1802        let write_handle = {
1803            let persist = Arc::clone(&storage_state.persist_clients);
1804            let shard_meta = sink.to_storage_metadata.clone();
1805            async move {
1806                let client = persist.open(shard_meta.persist_location).await?;
1807                let handle = client
1808                    .open_writer(
1809                        shard_meta.data_shard,
1810                        Arc::new(shard_meta.relation_desc),
1811                        Arc::new(UnitSchema),
1812                        Diagnostics::from_purpose("sink handle"),
1813                    )
1814                    .await?;
1815                Ok(handle)
1816            }
1817        };
1818
1819        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
1820        storage_state
1821            .sink_write_frontiers
1822            .insert(sink_id, Rc::clone(&write_frontier));
1823
1824        let (arrow_schema_with_ids, iceberg_schema) =
1825            match relation_desc_to_iceberg_schema(&sink.from_desc) {
1826                Ok(schemas) => schemas,
1827                Err(err) => {
1828                    let error_stream = std::iter::once(HealthStatusMessage {
1829                        id: None,
1830                        update: HealthStatusUpdate::halting(
1831                            format!("{}", err.display_with_causes()),
1832                            None,
1833                        ),
1834                        namespace: StatusNamespace::Iceberg,
1835                    })
1836                    .to_stream(&mut scope);
1837                    return (error_stream, vec![]);
1838                }
1839            };
1840
1841        let metrics = Arc::new(
1842            storage_state
1843                .metrics
1844                .get_iceberg_sink_metrics(sink_id, scope.index()),
1845        );
1846
1847        let statistics = storage_state
1848            .aggregated_statistics
1849            .get_sink(&sink_id)
1850            .expect("statistics initialized")
1851            .clone();
1852
1853        let connection_for_minter = self.clone();
1854        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
1855            mint_batch_descriptions(
1856                format!("{sink_id}-iceberg-mint"),
1857                sink_id,
1858                &input,
1859                sink,
1860                connection_for_minter,
1861                storage_state.storage_configuration.clone(),
1862                Arc::clone(&iceberg_schema),
1863            );
1864
1865        let connection_for_writer = self.clone();
1866        let (datafiles, write_status, write_button) = write_data_files(
1867            format!("{sink_id}-write-data-files"),
1868            minted_input,
1869            &batch_descriptions,
1870            &table_ready,
1871            sink.as_of.clone(),
1872            connection_for_writer,
1873            storage_state.storage_configuration.clone(),
1874            Arc::new(arrow_schema_with_ids.clone()),
1875            Arc::clone(&metrics),
1876            statistics.clone(),
1877        );
1878
1879        let connection_for_committer = self.clone();
1880        let (commit_status, commit_button) = commit_to_iceberg(
1881            format!("{sink_id}-commit-to-iceberg"),
1882            sink_id,
1883            sink.version,
1884            &datafiles,
1885            &batch_descriptions,
1886            &table_ready,
1887            Rc::clone(&write_frontier),
1888            connection_for_committer,
1889            storage_state.storage_configuration.clone(),
1890            write_handle,
1891            Arc::clone(&metrics),
1892            statistics,
1893        );
1894
1895        let running_status = Some(HealthStatusMessage {
1896            id: None,
1897            update: HealthStatusUpdate::running(),
1898            namespace: StatusNamespace::Iceberg,
1899        })
1900        .to_stream(&mut scope);
1901
1902        let statuses =
1903            scope.concatenate([running_status, mint_status, write_status, commit_status]);
1904
1905        (statuses, vec![mint_button, write_button, commit_button])
1906    }
1907}