Skip to main content

mz_storage/sink/
iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders a [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. The dataflow consists of three operators:
14//!
15//! ```text
16//!        ┏━━━━━━━━━━━━━━┓
17//!        ┃   persist    ┃
18//!        ┃    source    ┃
19//!        ┗━━━━━━┯━━━━━━━┛
20//!               │ row data, the input to this module
21//!               │
22//!        ┏━━━━━━v━━━━━━━┓
23//!        ┃     mint     ┃ (single worker)
24//!        ┃    batch     ┃ loads/creates the Iceberg table,
25//!        ┃ descriptions ┃ determines resume upper
26//!        ┗━━━┯━━━━━┯━━━━┛
27//!            │     │ batch descriptions (broadcast)
28//!       rows │     ├─────────────────────────┐
29//!            │     │                         │
30//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
31//!        ┃    write     ┃───>│ S3 / object │ │
32//!        ┃  data files  ┃    │   storage   │ │
33//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
34//!               │ file metadata              │
35//!               │                            │
36//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
37//!        ┃           commit to               ┃ (single worker)
38//!        ┃             iceberg               ┃
39//!        ┗━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━┛
40//!                      │
41//!              ╭───────v───────╮
42//!              │ Iceberg table │
43//!              │  (snapshots)  │
44//!              ╰───────────────╯
45//! ```
46//! # Minting batch descriptions
47//! The "mint batch descriptions" operator is responsible for generating
48//! time-based batch boundaries that group writes into Iceberg snapshots.
49//! It maintains a sliding window of future batch descriptions so that
50//! writers can start processing data even while earlier batches are still being written.
51//! Knowing the batch boundaries ahead of time is important because we need to
52//! be able to make the claim that all data files written for a given batch
53//! include all data up to the upper `t` but not beyond it.
54//! This could be trivially achieved by waiting for all data to arrive up to a certain
55//! frontier, but that would prevent us from streaming writes out to object storage
56//! until the entire batch is complete, which would increase latency and reduce throughput.
57//!
58//! # Writing data files
59//! The "write data files" operator receives rows along with batch descriptions.
60//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
61//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
62//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
63//! (and position delete files if necessary) to object storage.
64//! It outputs metadata about the written files along with their batch descriptions
65//! for the commit operator to consume.
66//!
67//! # Committing to Iceberg
68//! The "commit to iceberg" operator receives metadata about written data files
69//! along with their batch descriptions. It groups files by batch and creates
70//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
71//! table's metadata to reflect the new snapshots, including updating the
72//! `mz-frontier` property to track progress.
73
74use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::convert::Infallible;
77use std::time::Instant;
78use std::{cell::RefCell, rc::Rc, sync::Arc};
79
80use anyhow::{Context, anyhow};
81use arrow::array::{ArrayRef, Int32Array, RecordBatch};
82use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
83use differential_dataflow::lattice::Lattice;
84use differential_dataflow::{AsCollection, Hashable, VecCollection};
85use futures::StreamExt;
86use iceberg::ErrorKind;
87use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
88use iceberg::spec::{
89    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
90    write_data_files_to_avro,
91};
92use iceberg::spec::{Schema, SchemaRef};
93use iceberg::table::Table;
94use iceberg::transaction::{ApplyTransactionAction, Transaction};
95use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
96use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
97use iceberg::writer::base_writer::equality_delete_writer::{
98    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
99};
100use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
101use iceberg::writer::base_writer::position_delete_writer::{
102    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
103};
104use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
105use iceberg::writer::file_writer::ParquetWriterBuilder;
106use iceberg::writer::file_writer::location_generator::{
107    DefaultFileNameGenerator, DefaultLocationGenerator,
108};
109use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
110use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
111use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
112use itertools::Itertools;
113use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
114use mz_interchange::avro::DiffPair;
115use mz_ore::cast::CastFrom;
116use mz_ore::error::ErrorExt;
117use mz_ore::future::InTask;
118use mz_ore::result::ResultExt;
119use mz_ore::retry::{Retry, RetryResult};
120use mz_persist_client::Diagnostics;
121use mz_persist_client::write::WriteHandle;
122use mz_persist_types::codec_impls::UnitSchema;
123use mz_repr::{Diff, GlobalId, Row, Timestamp};
124use mz_storage_types::StorageDiff;
125use mz_storage_types::configuration::StorageConfiguration;
126use mz_storage_types::controller::CollectionMetadata;
127use mz_storage_types::errors::DataflowError;
128use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
129use mz_storage_types::sources::SourceData;
130use mz_timely_util::antichain::AntichainExt;
131use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
132use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
133use parquet::file::properties::WriterProperties;
134use serde::{Deserialize, Serialize};
135use timely::PartialOrder;
136use timely::container::CapacityContainerBuilder;
137use timely::dataflow::channels::pact::{Exchange, Pipeline};
138use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
139use timely::dataflow::operators::{CapabilitySet, Concatenate};
140use timely::dataflow::{Scope, StreamVec};
141use timely::progress::{Antichain, Timestamp as _};
142use tracing::debug;
143
144use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
145use crate::metrics::sink::iceberg::IcebergSinkMetrics;
146use crate::render::sinks::SinkRender;
147use crate::statistics::SinkStatistics;
148use crate::storage_state::StorageState;
149
/// Set the default capacity for the array builders inside the ArrowBuilder. This is the
/// number of items each builder can hold before it needs to allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Set the default buffer capacity for the string and binary array builders inside the
/// ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate
/// more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// The prefix for Parquet files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
/// many batches we have in-flight at any given time.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;

/// The concrete `DeltaWriter` used by the "write data files" operator: it fans
/// out to a Parquet data-file writer plus position- and equality-delete file
/// writers, all configured with the default location and file-name generators.
type DeltaWriterType = DeltaWriter<
    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
    PositionDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
    EqualityDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
>;
177
178/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
179/// Parquet metadata for schema evolution tracking. Field IDs are assigned
180/// recursively to all nested fields (structs, lists, maps) using a depth-first,
181/// pre-order traversal.
182fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
183    let mut next_field_id = 1i32;
184    let fields: Vec<Field> = schema
185        .fields()
186        .iter()
187        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
188        .collect();
189    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
190}
191
192/// Recursively add field IDs to a field and all its nested children.
193fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
194    let current_id = *next_id;
195    *next_id += 1;
196
197    let mut metadata = field.metadata().clone();
198    metadata.insert(
199        PARQUET_FIELD_ID_META_KEY.to_string(),
200        current_id.to_string(),
201    );
202
203    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
204
205    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
206}
207
208/// Add field IDs to nested fields within a DataType.
209fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
210    match data_type {
211        DataType::Struct(fields) => {
212            let new_fields: Vec<Field> = fields
213                .iter()
214                .map(|f| add_field_ids_recursive(f, next_id))
215                .collect();
216            DataType::Struct(new_fields.into())
217        }
218        DataType::List(element_field) => {
219            let new_element = add_field_ids_recursive(element_field, next_id);
220            DataType::List(Arc::new(new_element))
221        }
222        DataType::LargeList(element_field) => {
223            let new_element = add_field_ids_recursive(element_field, next_id);
224            DataType::LargeList(Arc::new(new_element))
225        }
226        DataType::Map(entries_field, sorted) => {
227            let new_entries = add_field_ids_recursive(entries_field, next_id);
228            DataType::Map(Arc::new(new_entries), *sorted)
229        }
230        _ => data_type.clone(),
231    }
232}
233
234/// Merge Materialize extension metadata into Iceberg's Arrow schema.
235/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
236/// Materialize's extension names for ArrowBuilder compatibility.
237/// Handles nested types (structs, lists, maps) recursively.
238fn merge_materialize_metadata_into_iceberg_schema(
239    materialize_arrow_schema: &ArrowSchema,
240    iceberg_schema: &Schema,
241) -> anyhow::Result<ArrowSchema> {
242    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
243    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
244        .context("Failed to convert Iceberg schema to Arrow schema")?;
245
246    // Now merge in the Materialize extension metadata
247    let fields: Vec<Field> = iceberg_arrow_schema
248        .fields()
249        .iter()
250        .map(|iceberg_field| {
251            // Find the corresponding Materialize field by name to get extension metadata
252            let mz_field = materialize_arrow_schema
253                .field_with_name(iceberg_field.name())
254                .with_context(|| {
255                    format!(
256                        "Field '{}' not found in Materialize schema",
257                        iceberg_field.name()
258                    )
259                })?;
260
261            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
262        })
263        .collect::<anyhow::Result<Vec<_>>>()?;
264
265    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
266}
267
/// Recursively merge Materialize extension metadata into an Iceberg field.
///
/// The returned field keeps the Iceberg field's name, data type, nullability,
/// and metadata (including Parquet field IDs), and layers the Materialize
/// `ARROW_EXTENSION_NAME_KEY` entry on top when the matching Materialize field
/// carries one. `mz_field` is `None` for nested children that have no
/// by-name counterpart in the Materialize schema.
///
/// Errors if the two schemas disagree on the container kind
/// (Struct/List/LargeList/Map) of a field.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    // Start with Iceberg field's metadata (which includes field IDs)
    let mut metadata = iceberg_field.metadata().clone();

    // Add Materialize extension name if available
    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    // Recursively process nested types
    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            // A container-kind mismatch is unrecoverable; report both sides.
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Struct children are matched by name (not position), so field
            // order may differ between the two schemas.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            // Note: the Materialize side's `sorted` flag is ignored; the
            // Iceberg schema's flag wins.
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        // Leaf types carry no nested fields; clone as-is.
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
365
366async fn reload_table(
367    catalog: &dyn Catalog,
368    namespace: String,
369    table_name: String,
370    current_table: Table,
371) -> anyhow::Result<Table> {
372    let namespace_ident = NamespaceIdent::new(namespace.clone());
373    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
374    let current_schema = current_table.metadata().current_schema_id();
375    let current_partition_spec = current_table.metadata().default_partition_spec_id();
376
377    match catalog.load_table(&table_ident).await {
378        Ok(table) => {
379            let reloaded_schema = table.metadata().current_schema_id();
380            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
381            if reloaded_schema != current_schema {
382                return Err(anyhow::anyhow!(
383                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
384                    table_name,
385                    current_schema,
386                    reloaded_schema
387                ));
388            }
389
390            if reloaded_partition_spec != current_partition_spec {
391                return Err(anyhow::anyhow!(
392                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
393                    table_name,
394                    current_partition_spec,
395                    reloaded_partition_spec
396                ));
397            }
398
399            Ok(table)
400        }
401        Err(err) => Err(err).context("Failed to reload Iceberg table"),
402    }
403}
404
405/// Load an existing Iceberg table or create it if it doesn't exist.
406async fn load_or_create_table(
407    catalog: &dyn Catalog,
408    namespace: String,
409    table_name: String,
410    schema: &Schema,
411) -> anyhow::Result<iceberg::table::Table> {
412    let namespace_ident = NamespaceIdent::new(namespace.clone());
413    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
414
415    // Try to load the table first
416    match catalog.load_table(&table_ident).await {
417        Ok(table) => {
418            // Table exists, return it
419            // TODO: Add proper schema evolution/validation to ensure compatibility
420            Ok(table)
421        }
422        Err(err) => {
423            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
424                || err
425                    .message()
426                    .contains("Tried to load a table that does not exist")
427            {
428                // Table doesn't exist, create it
429                // Note: location is not specified, letting the catalog determine the default location
430                // based on its warehouse configuration
431                let table_creation = TableCreation::builder()
432                    .name(table_name.clone())
433                    .schema(schema.clone())
434                    // Use unpartitioned spec by default
435                    // TODO: Consider making partition spec configurable
436                    // .partition_spec(UnboundPartitionSpec::builder().build())
437                    .build();
438
439                catalog
440                    .create_table(&namespace_ident, table_creation)
441                    .await
442                    .with_context(|| {
443                        format!(
444                            "Failed to create Iceberg table '{}' in namespace '{}'",
445                            table_name, namespace
446                        )
447                    })
448            } else {
449                // Some other error occurred
450                Err(err).context("Failed to load Iceberg table")
451            }
452        }
453    }
454}
455
456/// Find the most recent Materialize frontier from Iceberg snapshots.
457/// We store the frontier in snapshot metadata to track where we left off after restarts.
458/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
459/// The input slice will be sorted by sequence number in descending order.
460fn retrieve_upper_from_snapshots(
461    snapshots: &mut [Arc<Snapshot>],
462) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
463    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
464
465    for snapshot in snapshots {
466        let props = &snapshot.summary().additional_properties;
467        if let (Some(frontier_json), Some(sink_version_str)) =
468            (props.get("mz-frontier"), props.get("mz-sink-version"))
469        {
470            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
471                .context("Failed to deserialize frontier from snapshot properties")?;
472            let frontier = Antichain::from_iter(frontier);
473
474            let sink_version = sink_version_str
475                .parse::<u64>()
476                .context("Failed to parse mz-sink-version from snapshot properties")?;
477
478            return Ok(Some((frontier, sink_version)));
479        }
480        if snapshot.summary().operation.as_str() != "replace" {
481            // This is a bad heuristic, but we have no real other way to identify compactions
482            // right now other than assume they will be the only operation writing "replace" operations.
483            // That means if we find a snapshot with some other operation, but no mz-frontier, we are in an
484            // inconsistent state and have to error out.
485            anyhow::bail!(
486                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
487                snapshot.snapshot_id(),
488                snapshot.summary().operation.as_str(),
489            );
490        }
491    }
492
493    Ok(None)
494}
495
496/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
497/// Returns the Arrow schema (with field IDs) for writing Parquet files,
498/// and the Iceberg schema for table creation/validation.
499fn relation_desc_to_iceberg_schema(
500    desc: &mz_repr::RelationDesc,
501) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
502    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
503        .context("Failed to convert RelationDesc to Arrow schema")?;
504
505    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
506
507    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
508        .context("Failed to convert Arrow schema to Iceberg schema")?;
509
510    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
511}
512
513/// Build a new Arrow schema by adding an __op column to the existing schema.
514fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
515    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
516    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
517    ArrowSchema::new(fields)
518}
519
520/// Convert a Materialize DiffPair into an Arrow RecordBatch with an __op column.
521/// The __op column indicates whether each row is an insert (1) or delete (-1), which
522/// the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
523fn row_to_recordbatch(
524    row: DiffPair<Row>,
525    schema: ArrowSchemaRef,
526    schema_with_op: ArrowSchemaRef,
527) -> anyhow::Result<RecordBatch> {
528    let mut builder = ArrowBuilder::new_with_schema(
529        Arc::clone(&schema),
530        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
531        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
532    )
533    .context("Failed to create builder")?;
534
535    let mut op_values = Vec::new();
536
537    if let Some(before) = row.before {
538        builder
539            .add_row(&before)
540            .context("Failed to add delete row to builder")?;
541        op_values.push(-1i32); // Delete operation
542    }
543    if let Some(after) = row.after {
544        builder
545            .add_row(&after)
546            .context("Failed to add insert row to builder")?;
547        op_values.push(1i32); // Insert operation
548    }
549
550    let batch = builder
551        .to_record_batch()
552        .context("Failed to create record batch")?;
553
554    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
555    let op_column = Arc::new(Int32Array::from(op_values));
556    columns.push(op_column);
557
558    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
559        .context("Failed to create batch with op column")?;
560    Ok(batch_with_op)
561}
562
563/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
564/// Batches are minted with configurable windows to balance write efficiency with latency.
565/// We maintain a sliding window of future batch descriptions so writers can start
566/// processing data even while earlier batches are still being written.
567fn mint_batch_descriptions<G, D>(
568    name: String,
569    sink_id: GlobalId,
570    input: VecCollection<G, D, Diff>,
571    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
572    connection: IcebergSinkConnection,
573    storage_configuration: StorageConfiguration,
574    initial_schema: SchemaRef,
575) -> (
576    VecCollection<G, D, Diff>,
577    StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
578    StreamVec<G, Infallible>,
579    StreamVec<G, HealthStatusMessage>,
580    PressOnDropButton,
581)
582where
583    G: Scope<Timestamp = Timestamp>,
584    D: Clone + 'static,
585{
586    let scope = input.scope();
587    let name_for_error = name.clone();
588    let name_for_logging = name.clone();
589    let mut builder = OperatorBuilder::new(name, scope.clone());
590    let sink_version = sink.version;
591
592    let hashed_id = sink_id.hashed();
593    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
594    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
595    let (output, output_stream) = builder.new_output();
596    let (batch_desc_output, batch_desc_stream) =
597        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
598    let mut input =
599        builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);
600
601    let as_of = sink.as_of.clone();
602    let commit_interval = sink
603        .commit_interval
604        .expect("the planner should have enforced this")
605        .clone();
606
607    let (button, errors): (_, StreamVec<G, Rc<anyhow::Error>>) =
608        builder.build_fallible(move |caps| {
609        Box::pin(async move {
610            let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
611            *data_capset = CapabilitySet::new();
612
613            if !is_active_worker {
614                *capset = CapabilitySet::new();
615                *data_capset = CapabilitySet::new();
616                *table_ready_capset = CapabilitySet::new();
617                while let Some(event) = input.next().await {
618                    match event {
619                        Event::Data([output_cap, _], mut data) => {
620                            output.give_container(&output_cap, &mut data);
621                        }
622                        Event::Progress(_) => {}
623                    }
624                }
625                return Ok(());
626            }
627
628            let catalog = connection
629                .catalog_connection
630                .connect(&storage_configuration, InTask::Yes)
631                .await
632                .with_context(|| {
633                    format!(
634                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
635                        connection.catalog_connection.uri, connection.namespace, connection.table
636                    )
637                })?;
638
639            let table = load_or_create_table(
640                catalog.as_ref(),
641                connection.namespace.clone(),
642                connection.table.clone(),
643                initial_schema.as_ref(),
644            )
645            .await?;
646            debug!(
647                ?sink_id,
648                %name_for_logging,
649                namespace = %connection.namespace,
650                table = %connection.table,
651                "iceberg mint loaded/created table"
652            );
653
654            *table_ready_capset = CapabilitySet::new();
655
656            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
657            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
658            let (resume_upper, resume_version) = match resume {
659                Some((f, v)) => (f, v),
660                None => (Antichain::from_elem(Timestamp::minimum()), 0),
661            };
662            debug!(
663                ?sink_id,
664                %name_for_logging,
665                resume_upper = %resume_upper.pretty(),
666                resume_version,
667                as_of = %as_of.pretty(),
668                "iceberg mint resume position loaded"
669            );
670
671            // The input has overcompacted if
672            let overcompacted =
673                // ..we have made some progress in the past
674                *resume_upper != [Timestamp::minimum()] &&
675                // ..but the since frontier is now beyond that
676                PartialOrder::less_than(&resume_upper, &as_of);
677
678            if overcompacted {
679                let err = format!(
680                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
681                    as_of.pretty(),
682                    resume_upper.pretty()
683                );
684                // This would normally be an assertion but because it can happen after a
685                // Materialize backup/restore we log an error so that it appears on Sentry but
686                // leaves the rest of the objects in the cluster unaffected.
687                return Err(anyhow::anyhow!("{err}"));
688            };
689
690            if resume_version > sink_version {
691                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
692            }
693
694            let mut initialized = false;
695            let mut observed_frontier;
696            let mut max_seen_ts: Option<Timestamp> = None;
697            // Track minted batches to maintain a sliding window of open batch descriptions.
698            // This is needed to know when to retire old batches and mint new ones.
            // Its "sortedness" is derived from the monotonicity of batch descriptions,
700            // and the fact that we only ever push new descriptions to the back and pop from the front.
701            let mut minted_batches = VecDeque::new();
702            loop {
703                if let Some(event) = input.next().await {
704                    match event {
705                        Event::Data([output_cap, _], mut data) => {
706                            if !initialized {
707                                for (_, ts, _) in data.iter() {
708                                    match max_seen_ts.as_mut() {
709                                        Some(max) => {
710                                            if max.less_than(ts) {
711                                                *max = ts.clone();
712                                            }
713                                        }
714                                        None => {
715                                            max_seen_ts = Some(ts.clone());
716                                        }
717                                    }
718                                }
719                            }
720                            output.give_container(&output_cap, &mut data);
721                            continue;
722                        }
723                        Event::Progress(frontier) => {
724                            observed_frontier = frontier;
725                        }
726                    }
727                } else {
728                    return Ok(());
729                }
730
731                if !initialized {
732                    if observed_frontier.is_empty() {
733                        // Bounded inputs can close (frontier becomes empty) before we finish
734                        // initialization. For example, a loadgen source configured for a finite
735                        // dataset may emit all rows at time t and then immediately close. If we
736                        // saw any data, synthesize an upper one tick past the maximum timestamp
737                        // so we can mint a snapshot batch and commit it.
738                        if let Some(max_ts) = max_seen_ts.as_ref() {
739                            let synthesized_upper =
740                                Antichain::from_elem(max_ts.step_forward());
741                            debug!(
742                                ?sink_id,
743                                %name_for_logging,
744                                max_seen_ts = %max_ts,
745                                synthesized_upper = %synthesized_upper.pretty(),
746                                "iceberg mint input closed before initialization; using max seen ts"
747                            );
748                            observed_frontier = synthesized_upper;
749                        } else {
750                            debug!(
751                                ?sink_id,
752                                %name_for_logging,
753                                "iceberg mint input closed before initialization with no data"
754                            );
755                            // Input stream closed before initialization completed and no data arrived.
756                            return Ok(());
757                        }
758                    }
759
760                    // We only start minting after we've reached as_of and resume_upper to avoid
761                    // minting batches that would be immediately skipped.
762                    if PartialOrder::less_than(&observed_frontier, &resume_upper)
763                        || PartialOrder::less_than(&observed_frontier, &as_of)
764                    {
765                        continue;
766                    }
767
768                    let mut batch_descriptions = vec![];
769                    let mut current_upper = observed_frontier.clone();
770                    let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
771
772                    // Create a catch-up batch from the later of resume_upper or as_of to current frontier.
773                    // We use the later of the two because:
774                    // - For fresh sinks: resume_upper = minimum, as_of = actual timestamp, data starts at as_of
775                    // - For resuming: as_of <= resume_upper (enforced by overcompaction check), data starts at resume_upper
776                    let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
777                        as_of.clone()
778                    } else {
779                        resume_upper.clone()
780                    };
781
782                    if batch_lower == current_upper {
783                        // Snapshot! as_of is exactly at the frontier. We still need to mint
784                        // a batch to create the snapshot, so we step the upper forward by one.
785                        current_upper = Antichain::from_elem(current_upper_ts.step_forward());
786                    }
787
788                    let batch_description = (batch_lower.clone(), current_upper.clone());
789                    debug!(
790                        ?sink_id,
791                        %name_for_logging,
792                        batch_lower = %batch_lower.pretty(),
793                        current_upper = %current_upper.pretty(),
794                        "iceberg mint initializing (catch-up batch)"
795                    );
796                    debug!(
797                        "{}: creating catch-up batch [{}, {})",
798                        name_for_logging,
799                        batch_lower.pretty(),
800                        current_upper.pretty()
801                    );
802                    batch_descriptions.push(batch_description);
803                    // Mint initial future batch descriptions at configurable intervals
804                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
805                        let duration_millis = commit_interval.as_millis()
806                            .checked_mul(u128::from(i))
807                            .expect("commit interval multiplication overflow");
808                        let duration_ts = Timestamp::new(
809                            u64::try_from(duration_millis)
810                                .expect("commit interval too large for u64"),
811                        );
812                        let desired_batch_upper = Antichain::from_elem(
813                            current_upper_ts.step_forward_by(&duration_ts),
814                        );
815
816                        let batch_description =
817                            (current_upper.clone(), desired_batch_upper.clone());
818                        debug!(
819                            "{}: minting future batch {}/{} [{}, {})",
820                            name_for_logging,
821                            i,
822                            INITIAL_DESCRIPTIONS_TO_MINT,
823                            current_upper.pretty(),
824                            desired_batch_upper.pretty()
825                        );
826                        current_upper = batch_description.1.clone();
827                        batch_descriptions.push(batch_description);
828                    }
829
830                    minted_batches.extend(batch_descriptions.clone());
831
832                    for desc in batch_descriptions {
833                        batch_desc_output.give(&capset[0], desc);
834                    }
835
836                    capset.downgrade(current_upper);
837
838                    initialized = true;
839                } else {
840                    if observed_frontier.is_empty() {
841                        // We're done!
842                        return Ok(());
843                    }
844                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
845                    // and mint a new future batch to keep the pipeline full
846                    while let Some(oldest_desc) = minted_batches.front() {
847                        let oldest_upper = &oldest_desc.1;
848                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
849                            break;
850                        }
851
852                        let newest_upper = minted_batches.back().unwrap().1.clone();
853                        let new_lower = newest_upper.clone();
854                        let duration_ts = Timestamp::new(commit_interval.as_millis()
855                            .try_into()
856                            .expect("commit interval too large for u64"));
857                        let new_upper = Antichain::from_elem(newest_upper
858                            .as_option()
859                            .unwrap()
860                            .step_forward_by(&duration_ts));
861
862                        let new_batch_description = (new_lower.clone(), new_upper.clone());
863                        minted_batches.pop_front();
864                        minted_batches.push_back(new_batch_description.clone());
865
866                        batch_desc_output.give(&capset[0], new_batch_description);
867
868                        capset.downgrade(new_upper);
869                    }
870                }
871            }
872        })
873    });
874
875    let statuses = errors.map(|error| HealthStatusMessage {
876        id: None,
877        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
878        namespace: StatusNamespace::Iceberg,
879    });
880    (
881        output_stream.as_collection(),
882        batch_desc_stream,
883        table_ready_stream,
884        statuses,
885        button.press_on_drop(),
886    )
887}
888
/// An Iceberg [`DataFile`] together with the [`Schema`] it was written with.
///
/// `DataFile` does not implement `Serialize`/`Deserialize` directly, so this
/// type round-trips through [`AvroDataFile`] via the serde `try_from`/`into`
/// container attributes below; the schema is carried along because the Avro
/// ser(de) requires it.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}
895
/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
/// The avro ser(de) also requires the Iceberg schema to be provided, so we include that as well.
/// It is distinctly possible that this is overkill, but it avoids re-implementing
/// Iceberg's serialization logic here.
/// If at some point this becomes a serious overhead, we can revisit this decision.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    /// DataFile serialized as Avro bytes (produced by `write_data_files_to_avro`)
    pub data_file: Vec<u8>,
    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
    pub schema: Vec<u8>,
}
909
910impl From<SerializableDataFile> for AvroDataFile {
911    fn from(value: SerializableDataFile) -> Self {
912        let mut data_file = Vec::new();
913        write_data_files_to_avro(
914            &mut data_file,
915            [value.data_file],
916            &StructType::new(vec![]),
917            FormatVersion::V2,
918        )
919        .expect("serialization into buffer");
920        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
921        AvroDataFile { data_file, schema }
922    }
923}
924
925impl TryFrom<AvroDataFile> for SerializableDataFile {
926    type Error = String;
927
928    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
929        let schema: Schema = serde_json::from_slice(&value.schema)
930            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
931        let data_files = read_data_files_from_avro(
932            &mut &*value.data_file,
933            &schema,
934            0,
935            &StructType::new(vec![]),
936            FormatVersion::V2,
937        )
938        .map_err_to_string_with_causes()?;
939        let Some(data_file) = data_files.into_iter().next() else {
940            return Err("No DataFile found in Avro data".into());
941        };
942        Ok(SerializableDataFile { data_file, schema })
943    }
944}
945
/// A DataFile along with its associated batch description (lower and upper bounds).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    /// The data file, bundled with the schema its Avro (de)serialization needs.
    pub data_file: SerializableDataFile,
    /// The `(lower, upper)` frontier pair of the batch this file belongs to.
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
952
953impl BoundedDataFile {
954    pub fn new(
955        file: DataFile,
956        schema: Schema,
957        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
958    ) -> Self {
959        Self {
960            data_file: SerializableDataFile {
961                data_file: file,
962                schema,
963            },
964            batch_desc,
965        }
966    }
967
968    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
969        &self.batch_desc
970    }
971
972    pub fn data_file(&self) -> &DataFile {
973        &self.data_file.data_file
974    }
975
976    pub fn into_data_file(self) -> DataFile {
977        self.data_file.data_file
978    }
979}
980
/// A set of DataFiles along with their associated batch descriptions.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    /// The accumulated files, each tagged with its batch description.
    pub data_files: Vec<BoundedDataFile>,
}
986
987/// Write rows into Parquet data files bounded by batch descriptions.
988/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
989/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
990fn write_data_files<G>(
991    name: String,
992    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
993    batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
994    table_ready_stream: StreamVec<G, Infallible>,
995    as_of: Antichain<Timestamp>,
996    connection: IcebergSinkConnection,
997    storage_configuration: StorageConfiguration,
998    materialize_arrow_schema: Arc<ArrowSchema>,
999    metrics: Arc<IcebergSinkMetrics>,
1000    statistics: SinkStatistics,
1001) -> (
1002    StreamVec<G, BoundedDataFile>,
1003    StreamVec<G, HealthStatusMessage>,
1004    PressOnDropButton,
1005)
1006where
1007    G: Scope<Timestamp = Timestamp>,
1008{
1009    let scope = input.scope();
1010    let name_for_logging = name.clone();
1011    let mut builder = OperatorBuilder::new(name, scope.clone());
1012
1013    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1014
1015    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1016    let mut batch_desc_input =
1017        builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
1018    let mut input = builder.new_disconnected_input(input.inner, Pipeline);
1019
1020    let (button, errors) = builder.build_fallible(move |caps| {
1021        Box::pin(async move {
1022            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
1023            let catalog = connection
1024                .catalog_connection
1025                .connect(&storage_configuration, InTask::Yes)
1026                .await
1027                .with_context(|| {
1028                    format!(
1029                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1030                        connection.catalog_connection.uri, connection.namespace, connection.table
1031                    )
1032                })?;
1033
1034            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1035            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1036            while let Some(_) = table_ready_input.next().await {
1037                // Wait for table to be ready
1038            }
1039            let table = catalog
1040                .load_table(&table_ident)
1041                .await
1042                .with_context(|| {
1043                    format!(
1044                        "Failed to load Iceberg table '{}.{}' in write_data_files operator",
1045                        connection.namespace, connection.table
1046                    )
1047                })?;
1048
1049            let table_metadata = table.metadata().clone();
1050            let current_schema = Arc::clone(table_metadata.current_schema());
1051
1052            // Merge Materialize extension metadata into the Iceberg schema.
1053            // We need extension metadata for ArrowBuilder to work correctly (it uses
1054            // extension names to know how to handle different types like records vs arrays).
1055            let arrow_schema = Arc::new(
1056                merge_materialize_metadata_into_iceberg_schema(
1057                    materialize_arrow_schema.as_ref(),
1058                    current_schema.as_ref(),
1059                )
1060                .context("Failed to merge Materialize metadata into Iceberg schema")?,
1061            );
1062
1063            // Build schema_with_op by adding the __op column used by DeltaWriter.
1064            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));
1065
1066            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
1067            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
1068            // No clear way to detect this properly right now, so we use heuristics.
1069            let location = table_metadata.location();
1070            let corrected_location = match location.rsplit_once("/metadata/") {
1071                Some((a, b)) if b.ends_with(".metadata.json") => a,
1072                _ => location,
1073            };
1074
1075            let data_location = format!("{}/data", corrected_location);
1076            let location_generator = DefaultLocationGenerator::with_data_location(data_location);
1077
1078            // Add a unique suffix to avoid filename collisions across restarts and workers
1079            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
1080            let file_name_generator = DefaultFileNameGenerator::new(
1081                PARQUET_FILE_PREFIX.to_string(),
1082                Some(unique_suffix),
1083                iceberg::spec::DataFileFormat::Parquet,
1084            );
1085
1086            let file_io = table.file_io().clone();
1087
1088            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
1089                return Err(anyhow::anyhow!(
1090                    "Iceberg sink requires key columns for equality deletes"
1091                ));
1092            };
1093
1094            let equality_ids: Vec<i32> = equality_indices
1095                .iter()
1096                .map(|u| i32::try_from(*u).map(|v| v + 1))
1097                .collect::<Result<Vec<i32>, _>>()
1098                .context("Failed to convert equality index to i32 (index too large)")?;
1099
1100            let writer_properties = WriterProperties::new();
1101
1102            // Precompute schemas that don't change between writer instances.
1103            let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
1104            let pos_schema = Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
1105                "Failed to convert position delete Arrow schema to Iceberg schema",
1106            )?);
1107
1108            let eq_config = EqualityDeleteWriterConfig::new(
1109                equality_ids.clone(),
1110                Arc::clone(&current_schema),
1111            )
1112            .context("Failed to create EqualityDeleteWriterConfig")?;
1113            let eq_schema = Arc::new(
1114                arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
1115                    "Failed to convert equality delete Arrow schema to Iceberg schema",
1116                )?,
1117            );
1118
1119            let arrow_schema_for_closure = Arc::clone(&arrow_schema);
1120            let current_schema_for_closure = Arc::clone(&current_schema);
1121
1122            let create_delta_writer = move |disable_seen_rows: bool| {
1123                let arrow_schema = Arc::clone(&arrow_schema_for_closure);
1124                let current_schema = Arc::clone(&current_schema_for_closure);
1125                let file_io = file_io.clone();
1126                let location_generator = location_generator.clone();
1127                let file_name_generator = file_name_generator.clone();
1128                let equality_ids = equality_ids.clone();
1129                let writer_properties = writer_properties.clone();
1130                let pos_schema = Arc::clone(&pos_schema);
1131                let eq_schema = Arc::clone(&eq_schema);
1132                let eq_config = eq_config.clone();
1133
1134                async move {
1135                    let data_parquet_writer = ParquetWriterBuilder::new(
1136                        writer_properties.clone(),
1137                        Arc::clone(&current_schema),
1138                    )
1139                    .with_arrow_schema(Arc::clone(&arrow_schema))
1140                    .context("Arrow schema validation failed")?;
1141                    let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1142                        data_parquet_writer,
1143                        Arc::clone(&current_schema),
1144                        file_io.clone(),
1145                        location_generator.clone(),
1146                        file_name_generator.clone(),
1147                    );
1148                    let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);
1149
1150                    let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
1151                    let pos_parquet_writer =
1152                        ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
1153                    let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1154                        pos_parquet_writer,
1155                        Arc::clone(&current_schema),
1156                        file_io.clone(),
1157                        location_generator.clone(),
1158                        file_name_generator.clone(),
1159                    );
1160                    let pos_delete_writer_builder =
1161                        PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);
1162
1163                    let eq_parquet_writer =
1164                        ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
1165                    let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1166                        eq_parquet_writer,
1167                        Arc::clone(&current_schema),
1168                        file_io.clone(),
1169                        location_generator.clone(),
1170                        file_name_generator.clone(),
1171                    );
1172                    let eq_delete_writer_builder =
1173                        EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);
1174
1175                    let mut builder = DeltaWriterBuilder::new(
1176                        data_writer_builder,
1177                        pos_delete_writer_builder,
1178                        eq_delete_writer_builder,
1179                        equality_ids.clone(),
1180                    );
1181
1182                    if disable_seen_rows {
1183                        builder = builder.with_max_seen_rows(0);
1184                    }
1185
1186                    builder
1187                        .build(None)
1188                        .await
1189                        .context("Failed to create DeltaWriter")
1190                }
1191            };
1192
1193            // Rows can arrive before their batch description due to dataflow parallelism.
1194            // Stash them until we know which batch they belong to.
1195            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1196                BTreeMap::new();
1197
1198            // Track batches currently being written. When a row arrives, we check if it belongs
1199            // to an in-flight batch. When frontiers advance to a batch's upper, we close the
1200            // writer and emit its data files downstream.
1201            // Antichains don't implement Ord, so we use a HashMap with tuple keys instead.
1202            #[allow(clippy::disallowed_types)]
1203            let mut in_flight_batches: std::collections::HashMap<
1204                (Antichain<Timestamp>, Antichain<Timestamp>),
1205                Box<DeltaWriterType>,
1206            > = std::collections::HashMap::new();
1207
1208            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1209            let mut processed_batch_description_frontier =
1210                Antichain::from_elem(Timestamp::minimum());
1211            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1212            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1213
1214            // Track the minimum batch lower bound to prune data that's already committed
1215            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;
1216
1217            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1218                let mut staged_messages_since_flush: u64 = 0;
1219                tokio::select! {
1220                    _ = batch_desc_input.ready() => {},
1221                    _ = input.ready() => {}
1222                }
1223
1224                while let Some(event) = batch_desc_input.next_sync() {
1225                    match event {
1226                        Event::Data(_cap, data) => {
1227                            for batch_desc in data {
1228                                let (lower, upper) = &batch_desc;
1229
1230                                // Track the minimum batch lower bound (first batch received)
1231                                if min_batch_lower.is_none() {
1232                                    min_batch_lower = Some(lower.clone());
1233                                    debug!(
1234                                        "{}: set min_batch_lower to {}",
1235                                        name_for_logging,
1236                                        lower.pretty()
1237                                    );
1238
1239                                    // Prune any stashed rows that arrived before min_batch_lower (already committed)
1240                                    let to_remove: Vec<_> = stashed_rows
1241                                        .keys()
1242                                        .filter(|ts| {
1243                                            let ts_antichain = Antichain::from_elem((*ts).clone());
1244                                            PartialOrder::less_than(&ts_antichain, lower)
1245                                        })
1246                                        .cloned()
1247                                        .collect();
1248
1249                                    if !to_remove.is_empty() {
1250                                        let mut removed_count = 0;
1251                                        for ts in to_remove {
1252                                            if let Some(rows) = stashed_rows.remove(&ts) {
1253                                                removed_count += rows.len();
1254                                                for _ in &rows {
1255                                                    metrics.stashed_rows.dec();
1256                                                }
1257                                            }
1258                                        }
1259                                        debug!(
1260                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
1261                                            name_for_logging,
1262                                            removed_count
1263                                        );
1264                                    }
1265                                }
1266
1267                                // Disable seen_rows tracking for snapshot batch to save memory
1268                                let is_snapshot = lower == &as_of;
1269                                debug!(
1270                                    "{}: received batch description [{}, {}), snapshot={}",
1271                                    name_for_logging,
1272                                    lower.pretty(),
1273                                    upper.pretty(),
1274                                    is_snapshot
1275                                );
1276                                let mut delta_writer = create_delta_writer(is_snapshot).await?;
1277                                // Drain any stashed rows that belong to this batch
1278                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1279                                let mut drained_count = 0;
1280                                for row_ts in row_ts_keys {
1281                                    let ts = Antichain::from_elem(row_ts.clone());
1282                                    if PartialOrder::less_equal(lower, &ts)
1283                                        && PartialOrder::less_than(&ts, upper)
1284                                    {
1285                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
1286                                            drained_count += rows.len();
1287                                            for (_row, diff_pair) in rows {
1288                                                metrics.stashed_rows.dec();
1289                                                let record_batch = row_to_recordbatch(
1290                                                    diff_pair.clone(),
1291                                                    Arc::clone(&arrow_schema),
1292                                                    Arc::clone(&schema_with_op),
1293                                                )
1294                                                .context("failed to convert row to recordbatch")?;
1295                                                delta_writer.write(record_batch).await.context(
1296                                                    "Failed to write row to DeltaWriter",
1297                                                )?;
1298                                                staged_messages_since_flush += 1;
1299                                                if staged_messages_since_flush >= 10_000 {
1300                                                    statistics.inc_messages_staged_by(
1301                                                        staged_messages_since_flush,
1302                                                    );
1303                                                    staged_messages_since_flush = 0;
1304                                                }
1305                                            }
1306                                        }
1307                                    }
1308                                }
1309                                if drained_count > 0 {
1310                                    debug!(
1311                                        "{}: drained {} stashed rows into batch [{}, {})",
1312                                        name_for_logging,
1313                                        drained_count,
1314                                        lower.pretty(),
1315                                        upper.pretty()
1316                                    );
1317                                }
1318                                let prev =
1319                                    in_flight_batches.insert(
1320                                        batch_desc.clone(), Box::new(delta_writer)
1321                                    );
1322                                if prev.is_some() {
1323                                    anyhow::bail!(
1324                                        "Duplicate batch description received for description {:?}",
1325                                        batch_desc
1326                                    );
1327                                }
1328                            }
1329                        }
1330                        Event::Progress(frontier) => {
1331                            batch_description_frontier = frontier;
1332                        }
1333                    }
1334                }
1335
1336                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1337                for event in ready_events {
1338                    match event {
1339                        Event::Data(_cap, data) => {
1340                            let mut dropped_per_time = BTreeMap::new();
1341                            let mut stashed_per_time = BTreeMap::new();
1342                            for ((row, diff_pair), ts, _diff) in data {
1343                                let row_ts = ts.clone();
1344                                let ts_antichain = Antichain::from_elem(row_ts.clone());
1345                                let mut written = false;
1346                                // Try writing the row to any in-flight batch it belongs to...
1347                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
1348                                    let (lower, upper) = batch_desc;
1349                                    if PartialOrder::less_equal(lower, &ts_antichain)
1350                                        && PartialOrder::less_than(&ts_antichain, upper)
1351                                    {
1352                                        let record_batch = row_to_recordbatch(
1353                                            diff_pair.clone(),
1354                                            Arc::clone(&arrow_schema),
1355                                            Arc::clone(&schema_with_op),
1356                                        )
1357                                        .context("failed to convert row to recordbatch")?;
1358                                        delta_writer
1359                                            .write(record_batch)
1360                                            .await
1361                                            .context("Failed to write row to DeltaWriter")?;
1362                                        staged_messages_since_flush += 1;
1363                                        if staged_messages_since_flush >= 10_000 {
1364                                            statistics.inc_messages_staged_by(
1365                                                staged_messages_since_flush,
1366                                            );
1367                                            staged_messages_since_flush = 0;
1368                                        }
1369                                        written = true;
1370                                        break;
1371                                    }
1372                                }
1373                                if !written {
1374                                    // Drop data that's before the first batch we received (already committed)
1375                                    if let Some(ref min_lower) = min_batch_lower {
1376                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
1377                                            dropped_per_time
1378                                                .entry(ts_antichain.into_option().unwrap())
1379                                                .and_modify(|c| *c += 1)
1380                                                .or_insert(1);
1381                                            continue;
1382                                        }
1383                                    }
1384
1385                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
1386                                    let entry = stashed_rows.entry(row_ts).or_default();
1387                                    metrics.stashed_rows.inc();
1388                                    entry.push((row, diff_pair));
1389                                }
1390                            }
1391
1392                            for (ts, count) in dropped_per_time {
1393                                debug!(
1394                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
1395                                    name_for_logging, count, ts
1396                                );
1397                            }
1398
1399                            for (ts, count) in stashed_per_time {
1400                                debug!(
1401                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
1402                                    name_for_logging, count, ts
1403                                );
1404                            }
1405                        }
1406                        Event::Progress(frontier) => {
1407                            input_frontier = frontier;
1408                        }
1409                    }
1410                }
1411                if staged_messages_since_flush > 0 {
1412                    statistics.inc_messages_staged_by(staged_messages_since_flush);
1413                }
1414
1415                // Check if frontiers have advanced, which may unlock batches ready to close
1416                if PartialOrder::less_than(
1417                    &processed_batch_description_frontier,
1418                    &batch_description_frontier,
1419                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1420                {
1421                    // Close batches whose upper is now in the past
1422                    // Upper bounds are exclusive, so we check if upper is less_equal to the frontier.
1423                    // Remember: a frontier at x means all timestamps less than x have been observed.
1424                    // Or, in other words we still might yet see timestamps at [x, infinity). X itself will
1425                    // be covered by the _next_ batches lower inclusive bound, so we can safely close the batch if its upper is <= x.
1426                    let ready_batches: Vec<_> = in_flight_batches
1427                        .extract_if(|(lower, upper), _| {
1428                            PartialOrder::less_than(lower, &batch_description_frontier)
1429                                && PartialOrder::less_equal(upper, &input_frontier)
1430                        })
1431                        .collect();
1432
1433                    if !ready_batches.is_empty() {
1434                        debug!(
1435                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
1436                            name_for_logging,
1437                            ready_batches.len(),
1438                            batch_description_frontier.pretty(),
1439                            input_frontier.pretty()
1440                        );
1441                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1442                        for (desc, mut delta_writer) in ready_batches {
1443                            let close_started_at = Instant::now();
1444                            let data_files = delta_writer.close().await;
1445                            metrics
1446                                .writer_close_duration_seconds
1447                                .observe(close_started_at.elapsed().as_secs_f64());
1448                            let data_files = data_files.context("Failed to close DeltaWriter")?;
1449                            debug!(
1450                                "{}: closed batch [{}, {}), wrote {} files",
1451                                name_for_logging,
1452                                desc.0.pretty(),
1453                                desc.1.pretty(),
1454                                data_files.len()
1455                            );
1456                            for data_file in data_files {
1457                                match data_file.content_type() {
1458                                    iceberg::spec::DataContentType::Data => {
1459                                        metrics.data_files_written.inc();
1460                                    }
1461                                    iceberg::spec::DataContentType::PositionDeletes
1462                                    | iceberg::spec::DataContentType::EqualityDeletes => {
1463                                        metrics.delete_files_written.inc();
1464                                    }
1465                                }
1466                                statistics.inc_messages_staged_by(data_file.record_count());
1467                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1468                                let file = BoundedDataFile::new(
1469                                    data_file,
1470                                    current_schema.as_ref().clone(),
1471                                    desc.clone(),
1472                                );
1473                                output.give(&capset[0], file);
1474                            }
1475
1476                            max_upper = max_upper.join(&desc.1);
1477                        }
1478
1479                        capset.downgrade(max_upper);
1480                    }
1481                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
1482                    processed_input_frontier.clone_from(&input_frontier);
1483                }
1484            }
1485            Ok(())
1486        })
1487    });
1488
1489    let statuses = errors.map(|error| HealthStatusMessage {
1490        id: None,
1491        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1492        namespace: StatusNamespace::Iceberg,
1493    });
1494    (output_stream, statuses, button.press_on_drop())
1495}
1496
/// Commit completed batches to Iceberg as snapshots.
/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
/// Each snapshot includes the Materialize frontier in its metadata for resume support.
///
/// Runs on a single worker, chosen by hashing `sink_id`; all other workers
/// clear their shared `write_frontier` and return immediately. The active
/// worker:
///   1. waits for the table-ready signal, then loads the Iceberg table,
///   2. pairs incoming batch descriptions with the data files written for them,
///   3. once all files for a batch have arrived, commits them as one Iceberg
///      snapshot (retrying on catalog commit conflicts, with a fencing check
///      against concurrent writers), and
///   4. advances the sink's persist shard upper to the batch's upper so the
///      committed frontier is durably recorded for restart.
fn commit_to_iceberg<G>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: StreamVec<G, BoundedDataFile>,
    batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<G, Infallible>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (StreamVec<G, HealthStatusMessage>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    // Route both data inputs to a single worker, selected by the sink id hash.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let name_for_logging = format!("{sink_id}-commit-to-iceberg");

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                // Non-active workers report an empty write frontier so they
                // don't hold back the sink-wide frontier.
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // The table-ready stream carries `Infallible`, so no data can ever
            // arrive; this loop only drains progress events and completes once
            // the upstream operator closes the stream (i.e. the table exists).
            while let Some(_) = table_ready_input.next().await {
                // Wait for table to be ready
            }
            let mut table = catalog
                .load_table(&table_ident)
                .await
                .with_context(|| {
                    format!(
                        "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
                        connection.namespace, connection.table
                    )
                })?;

            // Maps each batch description (lower, upper) to the data files
            // received for it so far. Files may arrive before or after their
            // description; `entry(..).or_default()` below covers both orders.
            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();


            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Run until both inputs are closed (their frontiers are empty).
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                // Register the batch with an (initially empty)
                                // file set. A duplicate description indicates a
                                // logic error upstream.
                                let prev = batch_descriptions.insert(
                                    batch_desc,
                                    BoundedDataFileSet { data_files: vec![] },
                                );
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received \
                                         in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                // File may precede its batch description;
                                // or_default creates the slot either way.
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                // Collect batches whose data files have all arrived.
                // The writer emits all data files for a batch at a capability <= the batch's
                // lower bound, then downgrades its capability to the batch's upper bound.
                // So once the input frontier advances past lower, we know the writer has
                // finished emitting files for this batch and dropped its capability.
                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| {
                        PartialOrder::less_than(lower, &input_frontier)
                    })
                    .cloned()
                    .collect();

                // Commit batches in timestamp order to maintain consistency
                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    // Split the batch's files into appends vs. deletes, as the
                    // Iceberg row-delta action distinguishes the two.
                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    // Track totals for committed statistics
                    let mut total_messages: u64 = 0;
                    let mut total_bytes: u64 = 0;
                    for file in file_set.data_files {
                        total_messages += file.data_file().record_count();
                        total_bytes += file.data_file().file_size_in_bytes();
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes |
                            iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        data_files = data_files.len(),
                        delete_files = delete_files.len(),
                        total_messages,
                        total_bytes,
                        "iceberg commit applying batch"
                    );

                    let instant = Instant::now();

                    // The batch's upper becomes the frontier recorded with the
                    // snapshot and, after a successful commit, the new persist
                    // shard upper.
                    let frontier = batch.1.clone();
                    let tx = Transaction::new(&table);

                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;

                    // Store the frontier in snapshot metadata so we can resume from this point
                    let mut action = tx.row_delta().set_snapshot_properties(
                        vec![
                            ("mz-sink-id".to_string(), sink_id.to_string()),
                            ("mz-frontier".to_string(), frontier_json),
                            ("mz-sink-version".to_string(), sink_version.to_string()),
                        ].into_iter().collect()
                    ).with_check_duplicate(false);

                    if !data_files.is_empty() || !delete_files.is_empty() {
                        action = action.add_data_files(data_files).add_delete_files(delete_files);
                    }

                    let tx = action.apply(tx).context(
                        "Failed to apply data file addition to iceberg table transaction",
                    )?;

                    // Commit with bounded retries. On a catalog commit conflict
                    // we reload the table and run a fencing check against the
                    // frontier stored by other writers.
                    let commit_result = Retry::default().max_tries(5).retry_async(|_| async {
                        let new_table = tx.clone().commit(catalog.as_ref()).await;
                        match new_table {
                            Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
                                metrics.commit_conflicts.inc();
                                let table = reload_table(
                                    catalog.as_ref(),
                                    connection.namespace.clone(),
                                    connection.table.clone(),
                                    table.clone(),
                                ).await;
                                let table = match table {
                                    Ok(table) => table,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                let mut snapshots: Vec<_> =
                                    table.metadata().snapshots().cloned().collect();
                                let last = retrieve_upper_from_snapshots(&mut snapshots);
                                let last = match last {
                                    Ok(val) => val,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                // Check if another writer has advanced the frontier beyond ours (fencing check)
                                if let Some((last_frontier, _last_version)) = last {
                                    if PartialOrder::less_equal(&frontier, &last_frontier) {
                                        return RetryResult::FatalErr(anyhow!(
                                            "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
                                            connection.table,
                                            frontier,
                                            last_frontier,
                                        ));
                                    }
                                }

                                // NOTE(review): when the fencing check passes, this
                                // returns `Ok` with the *reloaded* table without
                                // re-attempting the commit of `tx`, so this batch's
                                // files appear to be dropped on a benign conflict.
                                // Confirm whether a `RetryableErr` (re-commit) was
                                // intended here.
                                RetryResult::Ok(table)
                            }
                            Err(e) => {
                                metrics.commit_failures.inc();
                                RetryResult::RetryableErr(anyhow!(e))
                            },
                            Ok(table) => RetryResult::Ok(table)
                        }
                    })
                    .await
                    .with_context(|| {
                        format!(
                            "failed to commit batch to Iceberg table '{}.{}'",
                            connection.namespace, connection.table
                        )
                    });
                    // Record the commit duration even when the commit failed;
                    // the `?` below then propagates the error.
                    let duration = instant.elapsed();
                    metrics
                        .commit_duration_seconds
                        .observe(duration.as_secs_f64());
                    table = commit_result?;

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        total_messages,
                        total_bytes,
                        ?duration,
                        "iceberg commit applied batch"
                    );

                    metrics.snapshots_committed.inc();
                    statistics.inc_messages_committed_by(total_messages);
                    statistics.inc_bytes_committed_by(total_bytes);

                    // Durably advance the sink's persist shard upper to the
                    // committed frontier via compare-and-append of an empty
                    // batch, retrying from the actual upper on mismatch.
                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            // The frontier has already been advanced as far as necessary.
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    // Publish the new frontier to the shared handle observed by
                    // the storage controller.
                    write_frontier.borrow_mut().clone_from(&frontier);

                }
            }


            Ok(())
        })
    });

    // Surface any operator error as a halting health status.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}
1815
1816impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
1817    fn get_key_indices(&self) -> Option<&[usize]> {
1818        self.key_desc_and_indices
1819            .as_ref()
1820            .map(|(_, indices)| indices.as_slice())
1821    }
1822
1823    fn get_relation_key_indices(&self) -> Option<&[usize]> {
1824        self.relation_key_indices.as_deref()
1825    }
1826
1827    fn render_sink(
1828        &self,
1829        storage_state: &mut StorageState,
1830        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1831        sink_id: GlobalId,
1832        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
1833        _err_collection: VecCollection<G, DataflowError, Diff>,
1834    ) -> (StreamVec<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
1835        let mut scope = input.scope();
1836
1837        let write_handle = {
1838            let persist = Arc::clone(&storage_state.persist_clients);
1839            let shard_meta = sink.to_storage_metadata.clone();
1840            async move {
1841                let client = persist.open(shard_meta.persist_location).await?;
1842                let handle = client
1843                    .open_writer(
1844                        shard_meta.data_shard,
1845                        Arc::new(shard_meta.relation_desc),
1846                        Arc::new(UnitSchema),
1847                        Diagnostics::from_purpose("sink handle"),
1848                    )
1849                    .await?;
1850                Ok(handle)
1851            }
1852        };
1853
1854        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
1855        storage_state
1856            .sink_write_frontiers
1857            .insert(sink_id, Rc::clone(&write_frontier));
1858
1859        let (arrow_schema_with_ids, iceberg_schema) =
1860            match relation_desc_to_iceberg_schema(&sink.from_desc) {
1861                Ok(schemas) => schemas,
1862                Err(err) => {
1863                    let error_stream = std::iter::once(HealthStatusMessage {
1864                        id: None,
1865                        update: HealthStatusUpdate::halting(
1866                            format!("{}", err.display_with_causes()),
1867                            None,
1868                        ),
1869                        namespace: StatusNamespace::Iceberg,
1870                    })
1871                    .to_stream(&mut scope);
1872                    return (error_stream, vec![]);
1873                }
1874            };
1875
1876        let metrics = Arc::new(
1877            storage_state
1878                .metrics
1879                .get_iceberg_sink_metrics(sink_id, scope.index()),
1880        );
1881
1882        let statistics = storage_state
1883            .aggregated_statistics
1884            .get_sink(&sink_id)
1885            .expect("statistics initialized")
1886            .clone();
1887
1888        let connection_for_minter = self.clone();
1889        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
1890            mint_batch_descriptions(
1891                format!("{sink_id}-iceberg-mint"),
1892                sink_id,
1893                input,
1894                sink,
1895                connection_for_minter,
1896                storage_state.storage_configuration.clone(),
1897                Arc::clone(&iceberg_schema),
1898            );
1899
1900        let connection_for_writer = self.clone();
1901        let (datafiles, write_status, write_button) = write_data_files(
1902            format!("{sink_id}-write-data-files"),
1903            minted_input,
1904            batch_descriptions.clone(),
1905            table_ready.clone(),
1906            sink.as_of.clone(),
1907            connection_for_writer,
1908            storage_state.storage_configuration.clone(),
1909            Arc::new(arrow_schema_with_ids.clone()),
1910            Arc::clone(&metrics),
1911            statistics.clone(),
1912        );
1913
1914        let connection_for_committer = self.clone();
1915        let (commit_status, commit_button) = commit_to_iceberg(
1916            format!("{sink_id}-commit-to-iceberg"),
1917            sink_id,
1918            sink.version,
1919            datafiles,
1920            batch_descriptions,
1921            table_ready,
1922            Rc::clone(&write_frontier),
1923            connection_for_committer,
1924            storage_state.storage_configuration.clone(),
1925            write_handle,
1926            Arc::clone(&metrics),
1927            statistics,
1928        );
1929
1930        let running_status = Some(HealthStatusMessage {
1931            id: None,
1932            update: HealthStatusUpdate::running(),
1933            namespace: StatusNamespace::Iceberg,
1934        })
1935        .to_stream(&mut scope);
1936
1937        let statuses =
1938            scope.concatenate([running_status, mint_status, write_status, commit_status]);
1939
1940        (statuses, vec![mint_button, write_button, commit_button])
1941    }
1942}