mz_storage/sink/iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders an [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. The dataflow consists of three operators:
14//!
15//! ```text
16//!        ┏━━━━━━━━━━━━━━┓
17//!        ┃   persist    ┃
18//!        ┃    source    ┃
19//!        ┗━━━━━━┯━━━━━━━┛
20//!               │ row data, the input to this module
21//!               │
22//!        ┏━━━━━━v━━━━━━━┓
23//!        ┃     mint     ┃ (single worker)
24//!        ┃    batch     ┃ loads/creates the Iceberg table,
25//!        ┃ descriptions ┃ determines resume upper
26//!        ┗━━━┯━━━━━┯━━━━┛
27//!            │     │ batch descriptions (broadcast)
28//!       rows │     ├─────────────────────────┐
29//!            │     │                         │
30//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
31//!        ┃    write     ┃───>│ S3 / object │ │
32//!        ┃  data files  ┃    │   storage   │ │
33//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
34//!               │ file metadata              │
35//!               │                            │
36//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
37//!        ┃           commit to               ┃ (single worker)
38//!        ┃             iceberg               ┃
39//!        ┗━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━┛
40//!                      │
41//!              ╭───────v───────╮
42//!              │ Iceberg table │
43//!              │  (snapshots)  │
44//!              ╰───────────────╯
45//! ```
46//! # Minting batch descriptions
47//! The "mint batch descriptions" operator is responsible for generating
48//! time-based batch boundaries that group writes into Iceberg snapshots.
49//! It maintains a sliding window of future batch descriptions so that
50//! writers can start processing data even while earlier batches are still being written.
51//! Knowing the batch boundaries ahead of time is important because we must be able
52//! to guarantee that the data files written for a given batch contain all data up to
53//! the batch's upper bound and nothing beyond it.
54//! This could be trivially achieved by waiting for all data to arrive up to a certain
55//! frontier, but that would prevent us from streaming writes out to object storage
56//! until the entire batch is complete, which would increase latency and reduce throughput.
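//!
//! For illustration (hypothetical numbers), with a 1 second commit interval, a resume
//! upper of `[5000]`, and an observed frontier of `[10000]`, the operator emits a
//! catch-up batch followed by `INITIAL_DESCRIPTIONS_TO_MINT` future batches:
//!
//! ```text
//! ([5000],  [10000])   <- catch-up batch from the resume upper to the current frontier
//! ([10000], [11000])
//! ([11000], [12000])
//! ([12000], [13000])
//! ```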
57//!
58//! # Writing data files
59//! The "write data files" operator receives rows along with batch descriptions.
60//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
61//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
62//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
63//! (and position delete files if necessary) to object storage.
64//! It outputs metadata about the written files along with their batch descriptions
65//! for the commit operator to consume.
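//!
//! A row with timestamp `t` belongs to a batch `(lower, upper)` exactly when
//! `lower <= t < upper`; over antichains the check looks roughly like this sketch
//! (`lower` and `upper` come from a batch description):
//!
//! ```ignore
//! let ts = Antichain::from_elem(t);
//! let belongs = PartialOrder::less_equal(lower, &ts) && PartialOrder::less_than(&ts, upper);
//! ```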
66//!
67//! # Committing to Iceberg
68//! The "commit to iceberg" operator receives metadata about written data files
69//! along with their batch descriptions. It groups files by batch and creates
70//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
71//! table's metadata to reflect the new snapshots, including updating the
72//! `mz-frontier` property to track progress.
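//!
//! For reference, each committed snapshot carries summary properties along these lines
//! (values illustrative):
//!
//! ```text
//! mz-sink-id      = "u123"
//! mz-frontier     = "[12000]"
//! mz-sink-version = "0"
//! ```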
73
74use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::{cell::RefCell, rc::Rc, sync::Arc};
77
78use anyhow::{Context, anyhow};
79use arrow::array::Int32Array;
80use arrow::datatypes::{DataType, Field};
81use arrow::{
82    array::RecordBatch, datatypes::Schema as ArrowSchema, datatypes::SchemaRef as ArrowSchemaRef,
83};
84use differential_dataflow::lattice::Lattice;
85use differential_dataflow::{AsCollection, Hashable, VecCollection};
86use futures::StreamExt;
87use iceberg::ErrorKind;
88use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
89use iceberg::spec::{
90    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
91    write_data_files_to_avro,
92};
93use iceberg::spec::{Schema, SchemaRef};
94use iceberg::table::Table;
95use iceberg::transaction::{ApplyTransactionAction, Transaction};
96use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
97use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
98use iceberg::writer::base_writer::equality_delete_writer::{
99    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
100};
101use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
102use iceberg::writer::base_writer::position_delete_writer::{
103    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
104};
105use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
106use iceberg::writer::file_writer::ParquetWriterBuilder;
107use iceberg::writer::file_writer::location_generator::{
108    DefaultFileNameGenerator, DefaultLocationGenerator,
109};
110use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
111use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
112use itertools::Itertools;
113use mz_arrow_util::builder::ArrowBuilder;
114use mz_interchange::avro::DiffPair;
115use mz_ore::cast::CastFrom;
116use mz_ore::error::ErrorExt;
117use mz_ore::future::InTask;
118use mz_ore::result::ResultExt;
119use mz_ore::retry::{Retry, RetryResult};
120use mz_persist_client::Diagnostics;
121use mz_persist_client::write::WriteHandle;
122use mz_persist_types::codec_impls::UnitSchema;
123use mz_repr::{Diff, GlobalId, Row, Timestamp};
124use mz_storage_types::StorageDiff;
125use mz_storage_types::configuration::StorageConfiguration;
126use mz_storage_types::controller::CollectionMetadata;
127use mz_storage_types::errors::DataflowError;
128use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
129use mz_storage_types::sources::SourceData;
130use mz_timely_util::antichain::AntichainExt;
131use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
132use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
133use parquet::file::properties::WriterProperties;
134use serde::{Deserialize, Serialize};
135use timely::PartialOrder;
136use timely::container::CapacityContainerBuilder;
137use timely::dataflow::channels::pact::{Exchange, Pipeline};
138use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
139use timely::dataflow::{Scope, Stream};
140use timely::progress::{Antichain, Timestamp as _};
141
142use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
143use crate::render::sinks::SinkRender;
144use crate::storage_state::StorageState;
145
146/// The default capacity for the array builders inside the ArrowBuilder: the number of
147/// items each builder can hold before it needs to allocate more memory.
148const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
149/// The default buffer capacity for the string and binary array builders inside the
150/// ArrowBuilder: the number of bytes each builder can hold before it needs to allocate
151/// more memory.
152const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;
153
154/// The prefix for Parquet files written by this sink.
155const PARQUET_FILE_PREFIX: &str = "mz_data";
156/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
157/// many batches we have in-flight at any given time.
158const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
159
160type ParquetWriterType = ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
161type DeltaWriterType = DeltaWriter<
162    DataFileWriter<ParquetWriterType>,
163    PositionDeleteFileWriter<ParquetWriterType>,
164    EqualityDeleteFileWriter<ParquetWriterType>,
165>;
166
167/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
168/// Parquet metadata for schema evolution tracking.
169/// TODO: Support nested data types with proper field IDs.
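/// As a rough sketch (column name and type hypothetical), the field at index 0 becomes:
///
/// ```ignore
/// Field::new("a", DataType::Int64, false)
///     .with_metadata([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())].into())
/// ```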
170fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
171    let fields: Vec<Field> = schema
172        .fields()
173        .iter()
174        .enumerate()
175        .map(|(i, field)| {
176            let mut metadata = field.metadata().clone();
177            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), (i + 1).to_string());
178            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
179                .with_metadata(metadata)
180        })
181        .collect();
182
183    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
184}
185
186/// Merge Materialize extension metadata into Iceberg's Arrow schema.
187/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
188/// Materialize's extension names for ArrowBuilder compatibility.
189fn merge_materialize_metadata_into_iceberg_schema(
190    materialize_arrow_schema: &ArrowSchema,
191    iceberg_schema: &Schema,
192) -> anyhow::Result<ArrowSchema> {
193    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
194    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
195        .context("Failed to convert Iceberg schema to Arrow schema")?;
196
197    // Now merge in the Materialize extension metadata
198    let fields: Vec<Field> = iceberg_arrow_schema
199        .fields()
200        .iter()
201        .map(|iceberg_field| {
202            // Find the corresponding Materialize field by name to get extension metadata
203            let mz_field = materialize_arrow_schema
204                .field_with_name(iceberg_field.name())
205                .with_context(|| {
206                    format!(
207                        "Field '{}' not found in Materialize schema",
208                        iceberg_field.name()
209                    )
210                })?;
211
212            // Start with Iceberg field's metadata (which includes field IDs)
213            let mut metadata = iceberg_field.metadata().clone();
214
215            // Add Materialize extension name
216            if let Some(extension_name) = mz_field.metadata().get("ARROW:extension:name") {
217                metadata.insert("ARROW:extension:name".to_string(), extension_name.clone());
218            }
219
220            // Use Iceberg's data type and field ID, but add Materialize extension metadata
221            Ok(Field::new(
222                iceberg_field.name(),
223                iceberg_field.data_type().clone(),
224                iceberg_field.is_nullable(),
225            )
226            .with_metadata(metadata))
227        })
228        .collect::<anyhow::Result<Vec<_>>>()?;
229
230    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
231}
232
233async fn reload_table(
234    catalog: &dyn Catalog,
235    namespace: String,
236    table_name: String,
237    current_table: Table,
238) -> anyhow::Result<Table> {
239    let namespace_ident = NamespaceIdent::new(namespace.clone());
240    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
241    let current_schema = current_table.metadata().current_schema_id();
242    let current_partition_spec = current_table.metadata().default_partition_spec_id();
243
244    match catalog.load_table(&table_ident).await {
245        Ok(table) => {
246            let reloaded_schema = table.metadata().current_schema_id();
247            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
248            if reloaded_schema != current_schema {
249                return Err(anyhow::anyhow!(
250                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
251                    table_name,
252                    current_schema,
253                    reloaded_schema
254                ));
255            }
256
257            if reloaded_partition_spec != current_partition_spec {
258                return Err(anyhow::anyhow!(
259                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
260                    table_name,
261                    current_partition_spec,
262                    reloaded_partition_spec
263                ));
264            }
265
266            Ok(table)
267        }
268        Err(err) => Err(err).context("Failed to reload Iceberg table"),
269    }
270}
271
272/// Load an existing Iceberg table or create it if it doesn't exist.
273async fn load_or_create_table(
274    catalog: &dyn Catalog,
275    namespace: String,
276    table_name: String,
277    schema: &Schema,
278) -> anyhow::Result<iceberg::table::Table> {
279    let namespace_ident = NamespaceIdent::new(namespace.clone());
280    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
281
282    // Try to load the table first
283    match catalog.load_table(&table_ident).await {
284        Ok(table) => {
285            // Table exists, return it
286            // TODO: Add proper schema evolution/validation to ensure compatibility
287            Ok(table)
288        }
289        Err(err) => {
290            if matches!(err.kind(), ErrorKind::TableNotFound { .. }) {
291                // Table doesn't exist, create it
292                // Note: location is not specified, letting the catalog determine the default location
293                // based on its warehouse configuration
294                let table_creation = TableCreation::builder()
295                    .name(table_name.clone())
296                    .schema(schema.clone())
297                    // Use unpartitioned spec by default
298                    // TODO: Consider making partition spec configurable
299                    // .partition_spec(UnboundPartitionSpec::builder().build())
300                    .build();
301
302                catalog
303                    .create_table(&namespace_ident, table_creation)
304                    .await
305                    .with_context(|| {
306                        format!(
307                            "Failed to create Iceberg table '{}' in namespace '{}'",
308                            table_name, namespace
309                        )
310                    })
311            } else {
312                // Some other error occurred
313                Err(err).context("Failed to load Iceberg table")
314            }
315        }
316    }
317}
318
319/// Find the most recent Materialize frontier from Iceberg snapshots.
320/// We store the frontier in snapshot metadata to track where we left off after restarts.
321/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
322/// As a side effect, the input slice is sorted by sequence number in descending order.
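///
/// A sketch of the expected round trip (values hypothetical):
///
/// ```text
/// snapshot properties: mz-frontier = "[9000]", mz-sink-version = "2"
/// result:              Some(([9000], 2))
/// ```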
323fn retrieve_upper_from_snapshots(
324    snapshots: &mut [Arc<Snapshot>],
325) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
326    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
327
328    for snapshot in snapshots {
329        let props = &snapshot.summary().additional_properties;
330        if let (Some(frontier_json), Some(sink_version_str)) =
331            (props.get("mz-frontier"), props.get("mz-sink-version"))
332        {
333            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
334                .context("Failed to deserialize frontier from snapshot properties")?;
335            let frontier = Antichain::from_iter(frontier);
336
337            let sink_version = sink_version_str
338                .parse::<u64>()
339                .context("Failed to parse mz-sink-version from snapshot properties")?;
340
341            return Ok(Some((frontier, sink_version)));
342        }
343        if snapshot.summary().operation.as_str() != "replace" {
344            // This is a crude heuristic, but right now we have no reliable way to identify
345            // compactions other than assuming they are the only writers that produce "replace"
346            // snapshots. So if we find a snapshot with any other operation but no mz-frontier,
347            // the table is in an inconsistent state and we have to error out.
348            anyhow::bail!(
349                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
350                snapshot.snapshot_id(),
351                snapshot.summary().operation.as_str(),
352            );
353        }
354    }
355
356    Ok(None)
357}
358
359/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
360/// Returns the Arrow schema (with field IDs) for writing Parquet files,
361/// and the Iceberg schema for table creation/validation.
362fn relation_desc_to_iceberg_schema(
363    desc: &mz_repr::RelationDesc,
364) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
365    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
366        .context("Failed to convert RelationDesc to Arrow schema")?;
367
368    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
369
370    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
371        .context("Failed to convert Arrow schema to Iceberg schema")?;
372
373    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
374}
375
376/// Build a new Arrow schema by adding an __op column to the existing schema.
377fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
378    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
379    fields.push(Field::new("__op", DataType::Int32, false));
380    ArrowSchema::new(fields)
381}
382
383/// Convert a Materialize DiffPair into an Arrow RecordBatch with an __op column.
384/// The __op column indicates whether each row is an insert (1) or delete (-1), which
385/// the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
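/// Schematically:
///
/// ```text
/// DiffPair { before: Some(old), after: Some(new) }  ->  rows [old, new], __op [-1, 1]
/// DiffPair { before: None,      after: Some(new) }  ->  rows [new],      __op [1]
/// DiffPair { before: Some(old), after: None }       ->  rows [old],      __op [-1]
/// ```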
386fn row_to_recordbatch(
387    row: DiffPair<Row>,
388    schema: ArrowSchemaRef,
389    schema_with_op: ArrowSchemaRef,
390) -> anyhow::Result<RecordBatch> {
391    let mut builder = ArrowBuilder::new_with_schema(
392        Arc::clone(&schema),
393        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
394        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
395    )
396    .context("Failed to create builder")?;
397
398    let mut op_values = Vec::new();
399
400    if let Some(before) = row.before {
401        builder
402            .add_row(&before)
403            .context("Failed to add delete row to builder")?;
404        op_values.push(-1i32); // Delete operation
405    }
406    if let Some(after) = row.after {
407        builder
408            .add_row(&after)
409            .context("Failed to add insert row to builder")?;
410        op_values.push(1i32); // Insert operation
411    }
412
413    let batch = builder
414        .to_record_batch()
415        .context("Failed to create record batch")?;
416
417    let mut columns = Vec::with_capacity(batch.columns().len() + 1);
418    columns.extend_from_slice(batch.columns());
419    let op_column = Arc::new(Int32Array::from(op_values));
420    columns.push(op_column);
421
422    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
423        .context("Failed to create batch with op column")?;
424    Ok(batch_with_op)
425}
426
427/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
428/// Batches are minted with configurable windows to balance write efficiency with latency.
429/// We maintain a sliding window of future batch descriptions so writers can start
430/// processing data even while earlier batches are still being written.
431fn mint_batch_descriptions<G, D>(
432    name: String,
433    sink_id: GlobalId,
434    input: &VecCollection<G, D, Diff>,
435    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
436    connection: IcebergSinkConnection,
437    storage_configuration: StorageConfiguration,
438    initial_schema: SchemaRef,
439) -> (
440    VecCollection<G, D, Diff>,
441    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
442    Stream<G, HealthStatusMessage>,
443    PressOnDropButton,
444)
445where
446    G: Scope<Timestamp = Timestamp>,
447    D: Clone + 'static,
448{
449    let scope = input.scope();
450    let name_for_error = name.clone();
451    let mut builder = OperatorBuilder::new(name, scope.clone());
452    let sink_version = sink.version;
453
454    let hashed_id = sink_id.hashed();
455    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
456    let (output, output_stream) = builder.new_output();
457    let (batch_desc_output, batch_desc_stream) =
458        builder.new_output::<CapacityContainerBuilder<_>>();
459    let mut input =
460        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);
461
462    let as_of = sink.as_of.clone();
463    let commit_interval = sink
464        .commit_interval
465        .expect("the planner should have enforced this")
466        .clone();
467
468    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
469        Box::pin(async move {
470            let [data_capset, capset]: &mut [_; 2] = caps.try_into().unwrap();
471            *data_capset = CapabilitySet::new();
472
473            if !is_active_worker {
474                *capset = CapabilitySet::new();
475                *data_capset = CapabilitySet::new();
476                while let Some(event) = input.next().await {
477                    match event {
478                        Event::Data([output_cap, _], mut data) => {
479                            output.give_container(&output_cap, &mut data);
480                        }
481                        Event::Progress(_) => {}
482                    }
483                }
484                return Ok(());
485            }
486
487            let catalog = connection
488                .catalog_connection
489                .connect(&storage_configuration, InTask::Yes)
490                .await
491                .context("Failed to connect to iceberg catalog")?;
492
493            let table = load_or_create_table(
494                catalog.as_ref(),
495                connection.namespace.clone(),
496                connection.table.clone(),
497                initial_schema.as_ref(),
498            )
499            .await?;
500
501            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
502            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
503            let (resume_upper, resume_version) = match resume {
504                Some((f, v)) => (f, v),
505                None => (Antichain::from_elem(Timestamp::minimum()), 0),
506            };
507
508            // The input has overcompacted if
509            let overcompacted =
510                // ..we have made some progress in the past
511                *resume_upper != [Timestamp::minimum()] &&
512                // ..but the since frontier is now beyond that
513                PartialOrder::less_than(&resume_upper, &as_of);
514
515            if overcompacted {
516                let err = format!(
517                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
518                    as_of.pretty(),
519                    resume_upper.pretty()
520                );
521            // This would normally be an assertion, but because it can happen after a
522            // Materialize backup/restore we return an error instead, so that it shows up in
523            // Sentry while leaving the rest of the objects in the cluster unaffected.
524                return Err(anyhow::anyhow!("{err}"));
525            };
526
527            if resume_version > sink_version {
528                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
529            }
530
531            let mut initialized = false;
532            let mut observed_frontier;
533            // Track minted batches to maintain a sliding window of open batch descriptions.
534            // This is needed to know when to retire old batches and mint new ones.
535            // Its sortedness is derived from the monotonicity of batch descriptions,
536            // and the fact that we only ever push new descriptions to the back and pop from the front.
537            let mut minted_batches = VecDeque::new();
538            loop {
539                if let Some(event) = input.next().await {
540                    match event {
541                        Event::Data([output_cap, _], mut data) => {
542                            output.give_container(&output_cap, &mut data);
543                            continue;
544                        }
545                        Event::Progress(frontier) => {
546                            observed_frontier = frontier;
547                        }
548                    }
549                } else {
550                    return Ok(());
551                }
552
553                if !initialized {
554                    // We only start minting after we've reached as_of and resume_upper to avoid
555                    // minting batches that would be immediately skipped.
556                    if PartialOrder::less_equal(&observed_frontier, &resume_upper) || PartialOrder::less_equal(&observed_frontier, &as_of) {
557                        continue;
558                    }
559
560                    let mut batch_descriptions = vec![];
561                    let mut current_upper = observed_frontier.clone();
562                    let current_upper_ts = current_upper.as_option().unwrap().clone();
563
564                    // If we're resuming, create a catch-up batch from resume_upper to current frontier
565                    if PartialOrder::less_than(&resume_upper, &current_upper) {
566                        let batch_description = (resume_upper.clone(), current_upper.clone());
567                        batch_descriptions.push(batch_description);
568                    }
569
570                    // Mint initial future batch descriptions at configurable intervals
571                    for i in 1..=INITIAL_DESCRIPTIONS_TO_MINT {
572                        let duration_millis = commit_interval.as_millis()
573                            .checked_mul(u128::from(i))
574                            .expect("commit interval multiplication overflow");
575                        let duration_ts = Timestamp::new(u64::try_from(duration_millis)
576                            .expect("commit interval too large for u64"));
577                        let desired_batch_upper = Antichain::from_elem(current_upper_ts.step_forward_by(&duration_ts));
578
579                        let batch_description = (current_upper.clone(), desired_batch_upper);
580                        current_upper = batch_description.1.clone();
581                        batch_descriptions.push(batch_description);
582                    }
583
584                    minted_batches.extend(batch_descriptions.clone());
585
586                    for desc in batch_descriptions {
587                        batch_desc_output.give(&capset[0], desc);
588                    }
589
590                    capset.downgrade(current_upper);
591
592                    initialized = true;
593                } else {
594                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
595                    // and mint a new future batch to keep the pipeline full
596                    while let Some(oldest_desc) = minted_batches.front() {
597                        let oldest_upper = &oldest_desc.1;
598                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
599                            break;
600                        }
601
602                        let newest_upper = minted_batches.back().unwrap().1.clone();
603                        let new_lower = newest_upper.clone();
604                        let duration_ts = Timestamp::new(commit_interval.as_millis()
605                            .try_into()
606                            .expect("commit interval too large for u64"));
607                        let new_upper = Antichain::from_elem(newest_upper
608                            .as_option()
609                            .unwrap()
610                            .step_forward_by(&duration_ts));
611
612                        let new_batch_description = (new_lower.clone(), new_upper.clone());
613                        minted_batches.pop_front();
614                        minted_batches.push_back(new_batch_description.clone());
615
616                        batch_desc_output.give(&capset[0], new_batch_description);
617
618                        capset.downgrade(new_upper);
619                    }
620                }
621            }
622        })
623    });
624
625    let statuses = errors.map(|error| HealthStatusMessage {
626        id: None,
627        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
628        namespace: StatusNamespace::Iceberg,
629    });
630    (
631        output_stream.as_collection(),
632        batch_desc_stream,
633        statuses,
634        button.press_on_drop(),
635    )
636}
637
638#[derive(Clone, Debug, Serialize, Deserialize)]
639#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
640struct SerializableDataFile {
641    pub data_file: DataFile,
642    pub schema: Schema,
643}
644
645/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
646/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
647/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
648/// The Avro (de)serialization also requires the Iceberg schema to be provided, so we include that as well.
649/// It is distinctly possible that this is overkill, but it avoids re-implementing
650/// Iceberg's serialization logic here.
651/// If at some point this becomes a serious overhead, we can revisit this decision.
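///
/// A sketch of the intended round trip (variable names hypothetical); serde routes the
/// value through `AvroDataFile` via the `try_from`/`into` attributes on
/// `SerializableDataFile`:
///
/// ```ignore
/// let wrapped = SerializableDataFile { data_file, schema };
/// let bytes = serde_json::to_vec(&wrapped)?;
/// let back: SerializableDataFile = serde_json::from_slice(&bytes)?;
/// ```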
652#[derive(Clone, Debug, Serialize, Deserialize)]
653struct AvroDataFile {
654    pub data_file: Vec<u8>,
655    pub schema: Schema,
656}
657
658impl From<SerializableDataFile> for AvroDataFile {
659    fn from(value: SerializableDataFile) -> Self {
660        let mut data_file = Vec::new();
661        write_data_files_to_avro(
662            &mut data_file,
663            [value.data_file],
664            &StructType::new(vec![]),
665            FormatVersion::V2,
666        )
667        .expect("serialization into buffer");
668        AvroDataFile {
669            data_file,
670            schema: value.schema,
671        }
672    }
673}
674
675impl TryFrom<AvroDataFile> for SerializableDataFile {
676    type Error = String;
677
678    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
679        let data_files = read_data_files_from_avro(
680            &mut &*value.data_file,
681            &value.schema,
682            0,
683            &StructType::new(vec![]),
684            FormatVersion::V2,
685        )
686        .map_err_to_string_with_causes()?;
687        let Some(data_file) = data_files.into_iter().next() else {
688            return Err("No DataFile found in Avro data".into());
689        };
690        Ok(SerializableDataFile {
691            data_file,
692            schema: value.schema,
693        })
694    }
695}
696
697/// A DataFile along with its associated batch description (lower and upper bounds).
698#[derive(Clone, Debug, Serialize, Deserialize)]
699struct BoundedDataFile {
700    pub data_file: SerializableDataFile,
701    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
702}
703
704impl BoundedDataFile {
705    pub fn new(
706        file: DataFile,
707        schema: Schema,
708        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
709    ) -> Self {
710        Self {
711            data_file: SerializableDataFile {
712                data_file: file,
713                schema,
714            },
715            batch_desc,
716        }
717    }
718
719    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
720        &self.batch_desc
721    }
722
723    pub fn data_file(&self) -> &DataFile {
724        &self.data_file.data_file
725    }
726
727    pub fn into_data_file(self) -> DataFile {
728        self.data_file.data_file
729    }
730}
731
732/// A set of DataFiles along with their associated batch descriptions.
733#[derive(Clone, Debug, Default)]
734struct BoundedDataFileSet {
735    pub data_files: Vec<BoundedDataFile>,
736}
737
738/// Write rows into Parquet data files bounded by batch descriptions.
739/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
740/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
741fn write_data_files<G>(
742    name: String,
743    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
744    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
745    connection: IcebergSinkConnection,
746    storage_configuration: StorageConfiguration,
747    materialize_arrow_schema: Arc<ArrowSchema>,
748) -> (
749    Stream<G, BoundedDataFile>,
750    Stream<G, HealthStatusMessage>,
751    PressOnDropButton,
752)
753where
754    G: Scope<Timestamp = Timestamp>,
755{
756    let scope = input.scope();
757    let mut builder = OperatorBuilder::new(name, scope.clone());
758
759    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
760
761    let mut batch_desc_input =
762        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
763    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);
764
765    let (button, errors) = builder.build_fallible(|caps| {
766        Box::pin(async move {
767            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
768            let catalog = connection
769                .catalog_connection
770                .connect(&storage_configuration, InTask::Yes)
771                .await
772                .context("Failed to connect to iceberg catalog")?;
773
774            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
775            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
776            let table = catalog
777                .load_table(&table_ident)
778                .await
779                .context("Failed to load Iceberg table")?;
780
781            let table_metadata = table.metadata().clone();
782            let current_schema = Arc::clone(table_metadata.current_schema());
783
784            let arrow_schema = Arc::new(
785                merge_materialize_metadata_into_iceberg_schema(
786                    materialize_arrow_schema.as_ref(),
787                    current_schema.as_ref(),
788                )
789                .context("Failed to merge Materialize metadata into Iceberg schema")?,
790            );
791
792            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));
793
794            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
795            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
796            // No clear way to detect this properly right now, so we use heuristics.
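            // For example (hypothetical path):
            //   ".../tbl/metadata/00000-abc.metadata.json" is corrected to ".../tbl".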
797            let location = table_metadata.location();
798            let corrected_location = match location.rsplit_once("/metadata/") {
799                Some((a, b)) if b.ends_with(".metadata.json") => a,
800                _ => location,
801            };
802
803            let data_location = format!("{}/data", corrected_location);
804            let location_generator = DefaultLocationGenerator::with_data_location(data_location);
805
806            // Add a unique suffix to avoid filename collisions across restarts and workers
807            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
808            let file_name_generator = DefaultFileNameGenerator::new(
809                PARQUET_FILE_PREFIX.to_string(),
810                Some(unique_suffix),
811                iceberg::spec::DataFileFormat::Parquet,
812            );
813
814            let file_io = table.file_io().clone();
815
816            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
817                return Err(anyhow::anyhow!(
818                    "Iceberg sink requires key columns for equality deletes"
819                ));
820            };
821
822            let equality_ids: Vec<i32> = equality_indices
823                .iter()
824                .map(|u| i32::try_from(*u).map(|v| v + 1))
825                .collect::<Result<Vec<i32>, _>>()
826                .context("Failed to convert equality index to i32 (index too large)")?;
827
828            let writer_properties = WriterProperties::new();
829
830            let create_delta_writer = || async {
831                let data_parquet_writer = ParquetWriterBuilder::new(
832                    writer_properties.clone(),
833                    Arc::clone(&current_schema),
834                    None,
835                    file_io.clone(),
836                    location_generator.clone(),
837                    file_name_generator.clone(),
838                );
839                let data_writer_builder = DataFileWriterBuilder::new(data_parquet_writer, None, 0);
840
841                let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
842                let pos_schema =
843                    Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
844                        "Failed to convert position delete Arrow schema to Iceberg schema",
845                    )?);
846                let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
847                let pos_parquet_writer = ParquetWriterBuilder::new(
848                    writer_properties.clone(),
849                    pos_schema,
850                    None,
851                    file_io.clone(),
852                    location_generator.clone(),
853                    file_name_generator.clone(),
854                );
855                let pos_delete_writer_builder =
856                    PositionDeleteFileWriterBuilder::new(pos_parquet_writer, pos_config);
857
858                let eq_config = EqualityDeleteWriterConfig::new(
859                    equality_ids.clone(),
860                    Arc::clone(&current_schema),
861                    None,
862                    0,
863                )
864                .context("Failed to create EqualityDeleteWriterConfig")?;
865
866                let eq_schema = Arc::new(
867                    arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
868                        "Failed to convert equality delete Arrow schema to Iceberg schema",
869                    )?,
870                );
871
872                let eq_parquet_writer = ParquetWriterBuilder::new(
873                    writer_properties.clone(),
874                    eq_schema,
875                    None,
876                    file_io.clone(),
877                    location_generator.clone(),
878                    file_name_generator.clone(),
879                );
880                let eq_delete_writer_builder =
881                    EqualityDeleteFileWriterBuilder::new(eq_parquet_writer, eq_config);
882
883                DeltaWriterBuilder::new(
884                    data_writer_builder,
885                    pos_delete_writer_builder,
886                    eq_delete_writer_builder,
887                    equality_ids.clone(),
888                )
889                .build()
890                .await
891                .context("Failed to create DeltaWriter")
892            };
893
894            // Rows can arrive before their batch description due to dataflow parallelism.
895            // Stash them until we know which batch they belong to.
896            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
897                BTreeMap::new();
898
899            // Track batches currently being written. When a row arrives, we check if it belongs
900            // to an in-flight batch. When frontiers advance past a batch's upper, we close the
901            // writer and emit its data files downstream.
902            // Antichains don't implement Ord, so we use a HashMap keyed by (lower, upper) tuples instead of a BTreeMap.
903            #[allow(clippy::disallowed_types)]
904            let mut in_flight_batches: std::collections::HashMap<
905                (Antichain<Timestamp>, Antichain<Timestamp>),
906                DeltaWriterType,
907            > = std::collections::HashMap::new();
908
909            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
910            let mut processed_batch_description_frontier =
911                Antichain::from_elem(Timestamp::minimum());
912            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
913            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
914
915            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
916                tokio::select! {
917                    _ = batch_desc_input.ready() => {},
918                    _ = input.ready() => {}
919                }
920
921                while let Some(event) = batch_desc_input.next_sync() {
922                    match event {
923                        Event::Data(_cap, data) => {
924                            for batch_desc in data {
925                                let (lower, upper) = &batch_desc;
926                                let mut delta_writer = create_delta_writer().await?;
927                                // Drain any stashed rows that belong to this batch
928                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
929                                for row_ts in row_ts_keys {
930                                    let ts = Antichain::from_elem(row_ts.clone());
931                                    if PartialOrder::less_equal(lower, &ts)
932                                        && PartialOrder::less_than(&ts, upper)
933                                    {
934                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
935                                            for (_row, diff_pair) in rows {
936                                                let record_batch = row_to_recordbatch(
937                                                    diff_pair.clone(),
938                                                    Arc::clone(&arrow_schema),
939                                                    Arc::clone(&schema_with_op),
940                                                )
941                                                .context("failed to convert row to recordbatch")?;
942                                                delta_writer.write(record_batch).await.context(
943                                                    "Failed to write row to DeltaWriter",
944                                                )?;
945                                            }
946                                        }
947                                    }
948                                }
949                                let prev =
950                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
951                                if prev.is_some() {
952                                    anyhow::bail!(
953                                        "Duplicate batch description received for description {:?}",
954                                        batch_desc
955                                    );
956                                }
957                            }
958                        }
959                        Event::Progress(frontier) => {
960                            batch_description_frontier = frontier;
961                        }
962                    }
963                }
964
965                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
966                for event in ready_events {
967                    match event {
968                        Event::Data(_cap, data) => {
969                            for ((row, diff_pair), ts, _diff) in data {
970                                let row_ts = ts.clone();
971                                let ts_antichain = Antichain::from_elem(row_ts.clone());
972                                let mut written = false;
973                                // Try writing the row to any in-flight batch it belongs to...
974                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
975                                    let (lower, upper) = batch_desc;
976                                    if PartialOrder::less_equal(lower, &ts_antichain)
977                                        && PartialOrder::less_than(&ts_antichain, upper)
978                                    {
979                                        let record_batch = row_to_recordbatch(
980                                            diff_pair.clone(),
981                                            Arc::clone(&arrow_schema),
982                                            Arc::clone(&schema_with_op),
983                                        )
984                                        .context("failed to convert row to recordbatch")?;
985                                        delta_writer
986                                            .write(record_batch)
987                                            .await
988                                            .context("Failed to write row to DeltaWriter")?;
989                                        written = true;
990                                        break;
991                                    }
992                                }
993                                if !written {
994                                    // ...otherwise stash it for later
995                                    let entry = stashed_rows.entry(row_ts).or_default();
996                                    entry.push((row, diff_pair));
997                                }
998                            }
999                        }
1000                        Event::Progress(frontier) => {
1001                            input_frontier = frontier;
1002                        }
1003                    }
1004                }
1005
1006                // Check if frontiers have advanced, which may unlock batches ready to close
1007                if PartialOrder::less_equal(
1008                    &processed_batch_description_frontier,
1009                    &batch_description_frontier,
1010                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1011                {
1012                    // Close batches whose upper is now in the past
1013                    let ready_batches: Vec<_> = in_flight_batches
1014                        .extract_if(|(lower, upper), _| {
1015                            PartialOrder::less_than(lower, &batch_description_frontier)
1016                                && PartialOrder::less_than(upper, &input_frontier)
1017                        })
1018                        .collect();
1019
1020                    if !ready_batches.is_empty() {
1021                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1022                        for (desc, mut delta_writer) in ready_batches {
1023                            let data_files = delta_writer
1024                                .close()
1025                                .await
1026                                .context("Failed to close DeltaWriter")?;
1027                            for data_file in data_files {
1028                                let file = BoundedDataFile::new(
1029                                    data_file,
1030                                    current_schema.as_ref().clone(),
1031                                    desc.clone(),
1032                                );
1033                                output.give(&capset[0], file);
1034                            }
1035
1036                            max_upper = max_upper.join(&desc.1);
1037                        }
1038
1039                        capset.downgrade(max_upper);
1040                    }
1041                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
1042                    processed_input_frontier.clone_from(&input_frontier);
1043                }
1044            }
1045            Ok(())
1046        })
1047    });
1048
1049    let statuses = errors.map(|error| HealthStatusMessage {
1050        id: None,
1051        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1052        namespace: StatusNamespace::Iceberg,
1053    });
1054    (output_stream, statuses, button.press_on_drop())
1055}
1056
1057/// Commit completed batches to Iceberg as snapshots.
1058/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
1059/// Each snapshot includes the Materialize frontier in its metadata for resume support.
1060fn commit_to_iceberg<G>(
1061    name: String,
1062    sink_id: GlobalId,
1063    sink_version: u64,
1064    batch_input: &Stream<G, BoundedDataFile>,
1065    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1066    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1067    connection: IcebergSinkConnection,
1068    storage_configuration: StorageConfiguration,
1069    write_handle: impl Future<
1070        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1071    > + 'static,
1072) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
1073where
1074    G: Scope<Timestamp = Timestamp>,
1075{
1076    let scope = batch_input.scope();
1077    let mut builder = OperatorBuilder::new(name, scope.clone());
1078
1079    let hashed_id = sink_id.hashed();
1080    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1081
1082    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1083    let mut batch_desc_input =
1084        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1085
1086    let (button, errors) = builder.build_fallible(move |_caps| {
1087        Box::pin(async move {
1088            if !is_active_worker {
1089                write_frontier.borrow_mut().clear();
1090                return Ok(());
1091            }
1092
1093            let catalog = connection
1094                .catalog_connection
1095                .connect(&storage_configuration, InTask::Yes)
1096                .await
1097                .context("Failed to connect to iceberg catalog")?;
1098
1099            let mut write_handle = write_handle.await?;
1100
1101            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1102            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1103            let mut table = catalog
1104                .load_table(&table_ident)
1105                .await
1106                .context("Failed to load Iceberg table")?;
1107
1108            #[allow(clippy::disallowed_types)]
1109            let mut batch_descriptions: std::collections::HashMap<
1110                (Antichain<Timestamp>, Antichain<Timestamp>),
1111                BoundedDataFileSet,
1112            > = std::collections::HashMap::new();
1113
1114
1115            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1116            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1117
1118            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1119                tokio::select! {
1120                    _ = batch_desc_input.ready() => {},
1121                    _ = input.ready() => {}
1122                }
1123
1124                while let Some(event) = batch_desc_input.next_sync() {
1125                    match event {
1126                        Event::Data(_cap, data) => {
1127                            for batch_desc in data {
1128                                let prev = batch_descriptions.insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
1129                                if let Some(prev) = prev {
1130                                    anyhow::bail!(
1131                                        "Duplicate batch description received in commit operator: {:?}",
1132                                        prev
1133                                    );
1134                                }
1135                            }
1136                        }
1137                        Event::Progress(frontier) => {
1138                            batch_description_frontier = frontier;
1139                        }
1140                    }
1141                }
1142
1143                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1144                for event in ready_events {
1145                    match event {
1146                        Event::Data(_cap, data) => {
1147                            for bounded_data_file in data {
1148                                let entry = batch_descriptions.entry(bounded_data_file.batch_desc().clone()).or_default();
1149                                entry.data_files.push(bounded_data_file);
1150                            }
1151                        }
1152                        Event::Progress(frontier) => {
1153                            input_frontier = frontier;
1154                        }
1155                    }
1156                }
1157
1158                let mut done_batches: Vec<_> = batch_descriptions
1159                    .keys()
1160                    .filter(|(lower, _upper)| {
1161                        PartialOrder::less_than(lower, &input_frontier)
1162                    })
1163                    .cloned()
1164                    .collect();
1165
1166                // Commit batches in timestamp order to maintain consistency
1167                done_batches.sort_by(|a, b| {
1168                    if PartialOrder::less_than(a, b) {
1169                        Ordering::Less
1170                    } else if PartialOrder::less_than(b, a) {
1171                        Ordering::Greater
1172                    } else {
1173                        Ordering::Equal
1174                    }
1175                });
1176
1177                for batch in done_batches {
1178                    let file_set = batch_descriptions.remove(&batch).unwrap();
1179
1180                    let mut data_files = vec![];
1181                    let mut delete_files = vec![];
1182                    for file in file_set.data_files {
1183                        match file.data_file().content_type() {
1184                            iceberg::spec::DataContentType::Data => {
1185                                data_files.push(file.into_data_file());
1186                            }
1187                            iceberg::spec::DataContentType::PositionDeletes |
1188                            iceberg::spec::DataContentType::EqualityDeletes => {
1189                                delete_files.push(file.into_data_file());
1190                            }
1191                        }
1192                    }
1193
                    let frontier = batch.1.clone();
                    let tx = Transaction::new(&table);

                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;

                    // Store the frontier in snapshot metadata so we can resume from this point
                    let mut action = tx.row_delta().set_snapshot_properties(
                        vec![
                            ("mz-sink-id".to_string(), sink_id.to_string()),
                            ("mz-frontier".to_string(), frontier_json),
                            ("mz-sink-version".to_string(), sink_version.to_string()),
                        ].into_iter().collect()
                    );

                    if !data_files.is_empty() || !delete_files.is_empty() {
                        action = action.add_data_files(data_files).add_delete_files(delete_files);
                    }

                    let tx = action.apply(tx).context(
                        "Failed to apply data file addition to iceberg table transaction",
                    )?;

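                    // Commit the transaction, retrying transient failures. On a catalog commit conflict,
                    // reload the table and check whether another writer has already advanced past our
                    // frontier; if so, we have been fenced out and fail permanently.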
                    table = Retry::default().max_tries(5).retry_async(|_| async {
                        let new_table = tx.clone().commit(catalog.as_ref()).await;
                        match new_table {
                            Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
                                let table = reload_table(
                                    catalog.as_ref(),
                                    connection.namespace.clone(),
                                    connection.table.clone(),
                                    table.clone(),
                                ).await;
                                let table = match table {
                                    Ok(table) => table,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
                                let last = retrieve_upper_from_snapshots(&mut snapshots);
                                let last = match last {
                                    Ok(val) => val,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                // Check if another writer has advanced the frontier beyond ours (fencing check)
                                if let Some((last_frontier, _last_version)) = last {
                                    if PartialOrder::less_equal(&frontier, &last_frontier) {
                                        return RetryResult::FatalErr(anyhow!(
                                            "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
                                            connection.table,
                                            frontier,
                                            last_frontier,
                                        ));
                                    }
                                }

                                RetryResult::Ok(table)
                            }
                            Err(e) => RetryResult::RetryableErr(anyhow!(e)),
                            Ok(table) => RetryResult::Ok(table),
                        }
                    }).await.context("failed to commit to iceberg")?;

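                    // Record progress by advancing the upper of the sink's persist shard to the committed
                    // frontier with an empty append, retrying from the shard's actual upper on mismatch.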
                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            // The frontier has already been advanced as far as necessary.
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

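    // Surface any error from the operator as a halting health status for this sink.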
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}

impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

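        // Future that opens a persist write handle on the sink's storage shard; the commit operator
        // uses it to advance the shard's upper as batches are committed.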
        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

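        // Derive the Arrow and Iceberg schemas from the sink's relation description, halting the
        // sink if the conversion fails.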
        let (arrow_schema_with_ids, iceberg_schema) =
            match relation_desc_to_iceberg_schema(&sink.from_desc) {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(&mut scope);
                    return (error_stream, vec![]);
                }
            };

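        // Stage 1: mint batch descriptions for the input rows.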
        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
            format!("{sink_id}-iceberg-mint"),
            sink_id,
            &input,
            sink,
            connection_for_minter,
            storage_state.storage_configuration.clone(),
            Arc::clone(&iceberg_schema),
        );

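        // Stage 2: write the rows out as Iceberg data files, grouped by batch description.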
        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = write_data_files(
            format!("{sink_id}-write-data-files"),
            minted_input,
            &batch_descriptions,
            connection_for_writer,
            storage_state.storage_configuration.clone(),
            Arc::new(arrow_schema_with_ids.clone()),
        );

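        // Stage 3: commit the written files to the Iceberg table and advance the write frontier.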
        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            &datafiles,
            &batch_descriptions,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
        );

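        // Report a baseline "running" status and merge it with the per-operator health streams.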
        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(&mut scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (statuses, vec![mint_button, write_button, commit_button])
    }
}