mz_storage/sink/iceberg.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Iceberg sink implementation.
//!
//! This code renders a [`IcebergSinkConnection`] into a dataflow that writes
//! data to an Iceberg table. The dataflow consists of three operators:
//!
//! ```text
//!        ┏━━━━━━━━━━━━━━┓
//!        ┃   persist    ┃
//!        ┃    source    ┃
//!        ┗━━━━━━┯━━━━━━━┛
//!               │ row data, the input to this module
//!               │
//!        ┏━━━━━━v━━━━━━━┓
//!        ┃     mint     ┃ (single worker)
//!        ┃    batch     ┃ loads/creates the Iceberg table,
//!        ┃ descriptions ┃ determines resume upper
//!        ┗━━━┯━━━━━┯━━━━┛
//!            │     │ batch descriptions (broadcast)
//!       rows │     ├─────────────────────────┐
//!            │     │                         │
//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
//!        ┃    write     ┃───>│ S3 / object │ │
//!        ┃  data files  ┃    │   storage   │ │
//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
//!               │ file metadata              │
//!               │                            │
//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
//!        ┃           commit to               ┃ (single worker)
//!        ┃             iceberg               ┃
//!        ┗━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━┛
//!                      │
//!              ╭───────v───────╮
//!              │ Iceberg table │
//!              │  (snapshots)  │
//!              ╰───────────────╯
//! ```
//!
//! # Minting batch descriptions
//! The "mint batch descriptions" operator is responsible for generating
//! time-based batch boundaries that group writes into Iceberg snapshots.
//! It maintains a sliding window of future batch descriptions so that
//! writers can start processing data even while earlier batches are still being written.
//! Knowing the batch boundaries ahead of time is important because we need to
//! be able to make the claim that all data files written for a given batch
//! include all data up to the upper `t` but not beyond it.
//! This could be trivially achieved by waiting for all data to arrive up to a certain
//! frontier, but that would prevent us from streaming writes out to object storage
//! until the entire batch is complete, which would increase latency and reduce throughput.
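//!
//! As an illustration only (the interval and timestamps below are made up): with a one
//! minute commit interval, an observed frontier `t`, and three descriptions minted ahead
//! of the frontier, the in-flight window might look like
//!
//! ```text
//!   (resume_upper, t)        <- catch-up batch when resuming
//!   (t,         t + 1min)
//!   (t + 1min,  t + 2min)
//!   (t + 2min,  t + 3min)
//! ```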
//!
//! # Writing data files
//! The "write data files" operator receives rows along with batch descriptions.
//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
//! rows are stashed until it does. This allows batches to be minted ahead of data arrival.
//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
//! (and position delete files if necessary) to object storage.
//! It outputs metadata about the written files along with their batch descriptions
//! for the commit operator to consume.
//!
//! # Committing to Iceberg
//! The "commit to iceberg" operator receives metadata about written data files
//! along with their batch descriptions. It groups files by batch and creates
//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
//! table's metadata to reflect the new snapshots, including updating the
//! `mz-frontier` property to track progress.
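//!
//! For reference, the summary properties attached to each committed snapshot look
//! roughly like the following (the id and timestamp values here are hypothetical):
//!
//! ```text
//! mz-sink-id      = "u42"
//! mz-frontier     = "[1700000000000]"
//! mz-sink-version = "1"
//! ```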

use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::{cell::RefCell, rc::Rc, sync::Arc};

use anyhow::{Context, anyhow};
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field};
use arrow::{
    array::RecordBatch, datatypes::Schema as ArrowSchema, datatypes::SchemaRef as ArrowSchemaRef,
};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use iceberg::ErrorKind;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
    write_data_files_to_avro,
};
use iceberg::spec::{Schema, SchemaRef};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
use iceberg::writer::base_writer::position_delete_writer::{
    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
};
use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use mz_arrow_util::builder::ArrowBuilder;
use mz_interchange::avro::DiffPair;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::result::ResultExt;
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::iceberg::IcebergSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;

/// Set the default capacity for the array builders inside the ArrowBuilder. This is the
/// number of items each builder can hold before it needs to allocate more memory.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Set the default buffer capacity for the string and binary array builders inside the
/// ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate
/// more memory.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// The prefix for Parquet files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
/// many batches we have in-flight at any given time.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;

type DeltaWriterType = DeltaWriter<
    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
    PositionDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
    EqualityDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
>;

/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
/// Parquet metadata for schema evolution tracking.
/// TODO: Support nested data types with proper field IDs.
fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| {
            let mut metadata = field.metadata().clone();
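            // Assign sequential, 1-based field IDs; `write_data_files` relies on the same
            // `index + 1` convention when deriving equality-delete field IDs.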
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), (i + 1).to_string());
            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
}

/// Merge Materialize extension metadata into Iceberg's Arrow schema.
/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
/// Materialize's extension names for ArrowBuilder compatibility.
fn merge_materialize_metadata_into_iceberg_schema(
    materialize_arrow_schema: &ArrowSchema,
    iceberg_schema: &Schema,
) -> anyhow::Result<ArrowSchema> {
    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;

    // Now merge in the Materialize extension metadata
    let fields: Vec<Field> = iceberg_arrow_schema
        .fields()
        .iter()
        .map(|iceberg_field| {
            // Find the corresponding Materialize field by name to get extension metadata
            let mz_field = materialize_arrow_schema
                .field_with_name(iceberg_field.name())
                .with_context(|| {
                    format!(
                        "Field '{}' not found in Materialize schema",
                        iceberg_field.name()
                    )
                })?;

            // Start with Iceberg field's metadata (which includes field IDs)
            let mut metadata = iceberg_field.metadata().clone();

            // Add Materialize extension name
            if let Some(extension_name) = mz_field.metadata().get("ARROW:extension:name") {
                metadata.insert("ARROW:extension:name".to_string(), extension_name.clone());
            }

            // Use Iceberg's data type and field ID, but add Materialize extension metadata
            Ok(Field::new(
                iceberg_field.name(),
                iceberg_field.data_type().clone(),
                iceberg_field.is_nullable(),
            )
            .with_metadata(metadata))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
}

async fn reload_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    current_table: Table,
) -> anyhow::Result<Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
    let current_schema = current_table.metadata().current_schema_id();
    let current_partition_spec = current_table.metadata().default_partition_spec_id();

    match catalog.load_table(&table_ident).await {
        Ok(table) => {
            let reloaded_schema = table.metadata().current_schema_id();
            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
            if reloaded_schema != current_schema {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
                    table_name,
                    current_schema,
                    reloaded_schema
                ));
            }

            if reloaded_partition_spec != current_partition_spec {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
                    table_name,
                    current_partition_spec,
                    reloaded_partition_spec
                ));
            }

            Ok(table)
        }
        Err(err) => Err(err).context("Failed to reload Iceberg table"),
    }
}

/// Load an existing Iceberg table or create it if it doesn't exist.
async fn load_or_create_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    schema: &Schema,
) -> anyhow::Result<iceberg::table::Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());

    // Try to load the table first
    match catalog.load_table(&table_ident).await {
        Ok(table) => {
            // Table exists, return it
            // TODO: Add proper schema evolution/validation to ensure compatibility
            Ok(table)
        }
        Err(err) => {
            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
                || err
                    .message()
                    .contains("Tried to load a table that does not exist")
            {
                // Table doesn't exist, create it
                // Note: location is not specified, letting the catalog determine the default location
                // based on its warehouse configuration
                let table_creation = TableCreation::builder()
                    .name(table_name.clone())
                    .schema(schema.clone())
                    // Use unpartitioned spec by default
                    // TODO: Consider making partition spec configurable
                    // .partition_spec(UnboundPartitionSpec::builder().build())
                    .build();

                catalog
                    .create_table(&namespace_ident, table_creation)
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to create Iceberg table '{}' in namespace '{}'",
                            table_name, namespace
                        )
                    })
            } else {
                // Some other error occurred
                Err(err).context("Failed to load Iceberg table")
            }
        }
    }
}

/// Find the most recent Materialize frontier from Iceberg snapshots.
/// We store the frontier in snapshot metadata to track where we left off after restarts.
/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
/// The input slice will be sorted by sequence number in descending order.
fn retrieve_upper_from_snapshots(
    snapshots: &mut [Arc<Snapshot>],
) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));

    for snapshot in snapshots {
        let props = &snapshot.summary().additional_properties;
        if let (Some(frontier_json), Some(sink_version_str)) =
            (props.get("mz-frontier"), props.get("mz-sink-version"))
        {
            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
                .context("Failed to deserialize frontier from snapshot properties")?;
            let frontier = Antichain::from_iter(frontier);

            let sink_version = sink_version_str
                .parse::<u64>()
                .context("Failed to parse mz-sink-version from snapshot properties")?;

            return Ok(Some((frontier, sink_version)));
        }
        if snapshot.summary().operation.as_str() != "replace" {
            // This is a bad heuristic, but we have no real other way to identify compactions
            // right now other than assume they will be the only operation writing "replace" operations.
            // That means if we find a snapshot with some other operation, but no mz-frontier, we are in an
            // inconsistent state and have to error out.
            anyhow::bail!(
                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
                snapshot.snapshot_id(),
                snapshot.summary().operation.as_str(),
            );
        }
    }

    Ok(None)
}

/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
/// Returns the Arrow schema (with field IDs) for writing Parquet files,
/// and the Iceberg schema for table creation/validation.
fn relation_desc_to_iceberg_schema(
    desc: &mz_repr::RelationDesc,
) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
        .context("Failed to convert RelationDesc to Arrow schema")?;

    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);

    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
        .context("Failed to convert Arrow schema to Iceberg schema")?;

    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
}

/// Build a new Arrow schema by adding an __op column to the existing schema.
fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
    fields.push(Field::new("__op", DataType::Int32, false));
    ArrowSchema::new(fields)
}

/// Convert a Materialize DiffPair into an Arrow RecordBatch with an __op column.
/// The __op column indicates whether each row is an insert (1) or delete (-1), which
/// the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
fn row_to_recordbatch(
    row: DiffPair<Row>,
    schema: ArrowSchemaRef,
    schema_with_op: ArrowSchemaRef,
) -> anyhow::Result<RecordBatch> {
    let mut builder = ArrowBuilder::new_with_schema(
        Arc::clone(&schema),
        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
    )
    .context("Failed to create builder")?;

    let mut op_values = Vec::new();

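    // An update arrives as a `DiffPair` with both `before` and `after` set, so the resulting
    // batch contains the deleted row (__op = -1) followed by the inserted row (__op = 1).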
    if let Some(before) = row.before {
        builder
            .add_row(&before)
            .context("Failed to add delete row to builder")?;
        op_values.push(-1i32); // Delete operation
    }
    if let Some(after) = row.after {
        builder
            .add_row(&after)
            .context("Failed to add insert row to builder")?;
        op_values.push(1i32); // Insert operation
    }

    let batch = builder
        .to_record_batch()
        .context("Failed to create record batch")?;

    let mut columns = Vec::with_capacity(batch.columns().len() + 1);
    columns.extend_from_slice(batch.columns());
    let op_column = Arc::new(Int32Array::from(op_values));
    columns.push(op_column);

    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
        .context("Failed to create batch with op column")?;
    Ok(batch_with_op)
}

/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
/// Batches are minted with configurable windows to balance write efficiency with latency.
/// We maintain a sliding window of future batch descriptions so writers can start
/// processing data even while earlier batches are still being written.
fn mint_batch_descriptions<G, D>(
    name: String,
    sink_id: GlobalId,
    input: &VecCollection<G, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<G, D, Diff>,
    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input =
        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [data_capset, capset]: &mut [_; 2] = caps.try_into().unwrap();
            *data_capset = CapabilitySet::new();

            if !is_active_worker {
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;

            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };

            // The input has overcompacted if
            let overcompacted =
                // ..we have made some progress in the past
                *resume_upper != [Timestamp::minimum()] &&
                // ..but the since frontier is now beyond that
                PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                // This would normally be an assertion but because it can happen after a
                // Materialize backup/restore we log an error so that it appears on Sentry but
                // leaves the rest of the objects in the cluster unaffected.
                return Err(anyhow::anyhow!("{err}"));
            };

            if resume_version > sink_version {
                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
            }

            let mut initialized = false;
            let mut observed_frontier;
            // Track minted batches to maintain a sliding window of open batch descriptions.
            // This is needed to know when to retire old batches and mint new ones.
            // Its "sortedness" is derived from the monotonicity of batch descriptions,
            // and the fact that we only ever push new descriptions to the back and pop from the front.
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    // We only start minting after we've reached as_of and resume_upper to avoid
                    // minting batches that would be immediately skipped.
                    if PartialOrder::less_equal(&observed_frontier, &resume_upper) || PartialOrder::less_equal(&observed_frontier, &as_of) {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts = current_upper.as_option().unwrap().clone();

                    // If we're resuming, create a catch-up batch from resume_upper to current frontier
                    if PartialOrder::less_than(&resume_upper, &current_upper) {
                        let batch_description = (resume_upper.clone(), current_upper.clone());
                        batch_descriptions.push(batch_description);
                    }

                    // Mint initial future batch descriptions at configurable intervals
                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT + 1 {
                        let duration_millis = commit_interval.as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(u64::try_from(duration_millis)
                            .expect("commit interval too large for u64"));
                        let desired_batch_upper = Antichain::from_elem(current_upper_ts.step_forward_by(&duration_ts));

                        let batch_description = (current_upper.clone(), desired_batch_upper);
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
                    // and mint a new future batch to keep the pipeline full
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(commit_interval.as_millis()
                            .try_into()
                            .expect("commit interval too large for u64"));
                        let new_upper = Antichain::from_elem(newest_upper
                            .as_option()
                            .unwrap()
                            .step_forward_by(&duration_ts));

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        statuses,
        button.press_on_drop(),
    )
}

#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}

/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
/// The avro ser(de) also requires the Iceberg schema to be provided, so we include that as well.
/// It is distinctly possible that this is overkill, but it avoids re-implementing
/// Iceberg's serialization logic here.
/// If at some point this becomes a serious overhead, we can revisit this decision.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    pub data_file: Vec<u8>,
    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
    pub schema: Vec<u8>,
}

impl From<SerializableDataFile> for AvroDataFile {
    fn from(value: SerializableDataFile) -> Self {
        let mut data_file = Vec::new();
        write_data_files_to_avro(
            &mut data_file,
            [value.data_file],
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .expect("serialization into buffer");
        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
        AvroDataFile { data_file, schema }
    }
}

impl TryFrom<AvroDataFile> for SerializableDataFile {
    type Error = String;

    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
        let schema: Schema = serde_json::from_slice(&value.schema)
            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
        let data_files = read_data_files_from_avro(
            &mut &*value.data_file,
            &schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .map_err_to_string_with_causes()?;
        let Some(data_file) = data_files.into_iter().next() else {
            return Err("No DataFile found in Avro data".into());
        };
        Ok(SerializableDataFile { data_file, schema })
    }
}

/// A DataFile along with its associated batch description (lower and upper bounds).
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}

impl BoundedDataFile {
    pub fn new(
        file: DataFile,
        schema: Schema,
        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
    ) -> Self {
        Self {
            data_file: SerializableDataFile {
                data_file: file,
                schema,
            },
            batch_desc,
        }
    }

    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
        &self.batch_desc
    }

    pub fn data_file(&self) -> &DataFile {
        &self.data_file.data_file
    }

    pub fn into_data_file(self) -> DataFile {
        self.data_file.data_file
    }
}

/// A set of DataFiles along with their associated batch descriptions.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}

/// Write rows into Parquet data files bounded by batch descriptions.
/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
/// rows are stashed until it does. This allows batches to be minted ahead of data arrival.
fn write_data_files<G>(
    name: String,
    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
    metrics: IcebergSinkMetrics,
    statistics: SinkStatistics,
) -> (
    Stream<G, BoundedDataFile>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut batch_desc_input =
        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(|caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            let table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));

            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
            // No clear way to detect this properly right now, so we use heuristics.
            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            // Add a unique suffix to avoid filename collisions across restarts and workers
            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
                return Err(anyhow::anyhow!(
                    "Iceberg sink requires key columns for equality deletes"
                ));
            };

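            // Shift the 0-based key column indices by one so they line up with the 1-based
            // field IDs assigned in `add_field_ids_to_arrow_schema`.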
            let equality_ids: Vec<i32> = equality_indices
                .iter()
                .map(|u| i32::try_from(*u).map(|v| v + 1))
                .collect::<Result<Vec<i32>, _>>()
                .context("Failed to convert equality index to i32 (index too large)")?;

            let writer_properties = WriterProperties::new();

            let create_delta_writer = || async {
                let data_parquet_writer = ParquetWriterBuilder::new(
                    writer_properties.clone(),
                    Arc::clone(&current_schema),
                );
                let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    data_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

                let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
                let pos_schema =
                    Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
                        "Failed to convert position delete Arrow schema to Iceberg schema",
                    )?);
                let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
                let pos_parquet_writer =
                    ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
                let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    pos_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let pos_delete_writer_builder =
                    PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

                let eq_config = EqualityDeleteWriterConfig::new(
                    equality_ids.clone(),
                    Arc::clone(&current_schema),
                )
                .context("Failed to create EqualityDeleteWriterConfig")?;

                let eq_schema = Arc::new(
                    arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
                        "Failed to convert equality delete Arrow schema to Iceberg schema",
                    )?,
                );

                let eq_parquet_writer =
                    ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
                let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    eq_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let eq_delete_writer_builder =
                    EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);

                DeltaWriterBuilder::new(
                    data_writer_builder,
                    pos_delete_writer_builder,
                    eq_delete_writer_builder,
                    equality_ids.clone(),
                )
                .build(None)
                .await
                .context("Failed to create DeltaWriter")
            };

            // Rows can arrive before their batch description due to dataflow parallelism.
            // Stash them until we know which batch they belong to.
            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            // Track batches currently being written. When a row arrives, we check if it belongs
            // to an in-flight batch. When frontiers advance past a batch's upper, we close the
            // writer and emit its data files downstream.
            // Antichains don't implement Ord, so we use a HashMap with tuple keys instead.
            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                DeltaWriterType,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

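            // Run until both inputs have closed, i.e. both frontiers have advanced to the
            // empty antichain.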
932            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
933                tokio::select! {
934                    _ = batch_desc_input.ready() => {},
935                    _ = input.ready() => {}
936                }
937
938                while let Some(event) = batch_desc_input.next_sync() {
939                    match event {
940                        Event::Data(_cap, data) => {
941                            for batch_desc in data {
942                                let (lower, upper) = &batch_desc;
943                                let mut delta_writer = create_delta_writer().await?;
944                                // Drain any stashed rows that belong to this batch
945                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
946                                for row_ts in row_ts_keys {
947                                    let ts = Antichain::from_elem(row_ts.clone());
948                                    if PartialOrder::less_equal(lower, &ts)
949                                        && PartialOrder::less_than(&ts, upper)
950                                    {
951                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
952                                            for (_row, diff_pair) in rows {
953                                                metrics.stashed_rows.dec();
954                                                let record_batch = row_to_recordbatch(
955                                                    diff_pair.clone(),
956                                                    Arc::clone(&arrow_schema),
957                                                    Arc::clone(&schema_with_op),
958                                                )
959                                                .context("failed to convert row to recordbatch")?;
960                                                delta_writer.write(record_batch).await.context(
961                                                    "Failed to write row to DeltaWriter",
962                                                )?;
963                                            }
964                                        }
965                                    }
966                                }
967                                let prev =
968                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
969                                if prev.is_some() {
970                                    anyhow::bail!(
971                                        "Duplicate batch description received for description {:?}",
972                                        batch_desc
973                                    );
974                                }
975                            }
976                        }
977                        Event::Progress(frontier) => {
978                            batch_description_frontier = frontier;
979                        }
980                    }
981                }
982
983                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
984                for event in ready_events {
985                    match event {
986                        Event::Data(_cap, data) => {
987                            for ((row, diff_pair), ts, _diff) in data {
988                                let row_ts = ts.clone();
989                                let ts_antichain = Antichain::from_elem(row_ts.clone());
990                                let mut written = false;
991                                // Try writing the row to any in-flight batch it belongs to...
992                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
993                                    let (lower, upper) = batch_desc;
994                                    if PartialOrder::less_equal(lower, &ts_antichain)
995                                        && PartialOrder::less_than(&ts_antichain, upper)
996                                    {
997                                        let record_batch = row_to_recordbatch(
998                                            diff_pair.clone(),
999                                            Arc::clone(&arrow_schema),
1000                                            Arc::clone(&schema_with_op),
1001                                        )
1002                                        .context("failed to convert row to recordbatch")?;
1003                                        delta_writer
1004                                            .write(record_batch)
1005                                            .await
1006                                            .context("Failed to write row to DeltaWriter")?;
1007                                        written = true;
1008                                        break;
1009                                    }
1010                                }
1011                                if !written {
1012                                    // ...otherwise stash it for later
1013                                    let entry = stashed_rows.entry(row_ts).or_default();
1014                                    metrics.stashed_rows.inc();
1015                                    entry.push((row, diff_pair));
1016                                }
1017                            }
1018                        }
1019                        Event::Progress(frontier) => {
1020                            input_frontier = frontier;
1021                        }
1022                    }
1023                }
1024
1025                // Check if frontiers have advanced, which may unlock batches ready to close
1026                if PartialOrder::less_equal(
1027                    &processed_batch_description_frontier,
1028                    &batch_description_frontier,
1029                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1030                {
1031                    // Close batches whose upper is now in the past
1032                    let ready_batches: Vec<_> = in_flight_batches
1033                        .extract_if(|(lower, upper), _| {
1034                            PartialOrder::less_than(lower, &batch_description_frontier)
1035                                && PartialOrder::less_than(upper, &input_frontier)
1036                        })
1037                        .collect();
1038
1039                    if !ready_batches.is_empty() {
1040                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1041                        for (desc, mut delta_writer) in ready_batches {
1042                            let data_files = delta_writer
1043                                .close()
1044                                .await
1045                                .context("Failed to close DeltaWriter")?;
1046                            for data_file in data_files {
1047                                match data_file.content_type() {
1048                                    iceberg::spec::DataContentType::Data => {
1049                                        metrics.data_files_written.inc();
1050                                    }
1051                                    iceberg::spec::DataContentType::PositionDeletes
1052                                    | iceberg::spec::DataContentType::EqualityDeletes => {
1053                                        metrics.delete_files_written.inc();
1054                                    }
1055                                }
1056                                statistics.inc_messages_staged_by(data_file.record_count());
1057                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1058                                let file = BoundedDataFile::new(
1059                                    data_file,
1060                                    current_schema.as_ref().clone(),
1061                                    desc.clone(),
1062                                );
1063                                output.give(&capset[0], file);
1064                            }
1065
1066                            max_upper = max_upper.join(&desc.1);
1067                        }
1068
1069                        capset.downgrade(max_upper);
1070                    }
1071                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
1072                    processed_input_frontier.clone_from(&input_frontier);
1073                }
1074            }
1075            Ok(())
1076        })
1077    });
1078
1079    let statuses = errors.map(|error| HealthStatusMessage {
1080        id: None,
1081        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1082        namespace: StatusNamespace::Iceberg,
1083    });
1084    (output_stream, statuses, button.press_on_drop())
1085}
1086
1087/// Commit completed batches to Iceberg as snapshots.
1088/// Batches are committed in timestamp order to ensure strong consistency guarantees downstream.
1089/// Each snapshot includes the Materialize frontier in its metadata for resume support.
1090fn commit_to_iceberg<G>(
1091    name: String,
1092    sink_id: GlobalId,
1093    sink_version: u64,
1094    batch_input: &Stream<G, BoundedDataFile>,
1095    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1096    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1097    connection: IcebergSinkConnection,
1098    storage_configuration: StorageConfiguration,
1099    write_handle: impl Future<
1100        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1101    > + 'static,
1102    metrics: IcebergSinkMetrics,
1103    statistics: SinkStatistics,
1104) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
1105where
1106    G: Scope<Timestamp = Timestamp>,
1107{
1108    let scope = batch_input.scope();
1109    let mut builder = OperatorBuilder::new(name, scope.clone());
1110
1111    let hashed_id = sink_id.hashed();
1112    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1113
1114    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1115    let mut batch_desc_input =
1116        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1117
1118    let (button, errors) = builder.build_fallible(move |_caps| {
1119        Box::pin(async move {
1120            if !is_active_worker {
1121                write_frontier.borrow_mut().clear();
1122                return Ok(());
1123            }
1124
1125            let catalog = connection
1126                .catalog_connection
1127                .connect(&storage_configuration, InTask::Yes)
1128                .await
1129                .context("Failed to connect to iceberg catalog")?;
1130
1131            let mut write_handle = write_handle.await?;
1132
1133            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1134            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1135            let mut table = catalog
1136                .load_table(&table_ident)
1137                .await
1138                .context("Failed to load Iceberg table")?;
1139
1140            #[allow(clippy::disallowed_types)]
1141            let mut batch_descriptions: std::collections::HashMap<
1142                (Antichain<Timestamp>, Antichain<Timestamp>),
1143                BoundedDataFileSet,
1144            > = std::collections::HashMap::new();
1145
1146
1147            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1148            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1149
1150            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1151                tokio::select! {
1152                    _ = batch_desc_input.ready() => {},
1153                    _ = input.ready() => {}
1154                }
1155
1156                while let Some(event) = batch_desc_input.next_sync() {
1157                    match event {
1158                        Event::Data(_cap, data) => {
1159                            for batch_desc in data {
1160                                let prev = batch_descriptions.insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
1161                                if let Some(prev) = prev {
1162                                    anyhow::bail!(
1163                                        "Duplicate batch description received in commit operator: {:?}",
1164                                        prev
1165                                    );
1166                                }
1167                            }
1168                        }
1169                        Event::Progress(frontier) => {
1170                            batch_description_frontier = frontier;
1171                        }
1172                    }
1173                }
1174
1175                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1176                for event in ready_events {
1177                    match event {
1178                        Event::Data(_cap, data) => {
1179                            for bounded_data_file in data {
1180                                let entry = batch_descriptions.entry(bounded_data_file.batch_desc().clone()).or_default();
1181                                entry.data_files.push(bounded_data_file);
1182                            }
1183                        }
1184                        Event::Progress(frontier) => {
1185                            input_frontier = frontier;
1186                        }
1187                    }
1188                }
1189
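                    // A batch is ready to commit once the file-metadata frontier
                    // has advanced past its lower bound.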
1190                let mut done_batches: Vec<_> = batch_descriptions
1191                    .keys()
1192                    .filter(|(lower, _upper)| {
1193                        PartialOrder::less_than(lower, &input_frontier)
1194                    })
1195                    .cloned()
1196                    .collect();
1197
1198                // Commit batches in timestamp order to maintain consistency
1199                done_batches.sort_by(|a, b| {
1200                    if PartialOrder::less_than(a, b) {
1201                        Ordering::Less
1202                    } else if PartialOrder::less_than(b, a) {
1203                        Ordering::Greater
1204                    } else {
1205                        Ordering::Equal
1206                    }
1207                });
1208
1209                for batch in done_batches {
1210                    let file_set = batch_descriptions.remove(&batch).unwrap();
1211
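                        // Partition the batch's files into data and delete files,
                        // summing record and byte counts for statistics.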
1212                    let mut data_files = vec![];
1213                    let mut delete_files = vec![];
1214                    // Track totals for committed statistics
1215                    let mut total_messages: u64 = 0;
1216                    let mut total_bytes: u64 = 0;
1217                    for file in file_set.data_files {
1218                        total_messages += file.data_file().record_count();
1219                        total_bytes += file.data_file().file_size_in_bytes();
1220                        match file.data_file().content_type() {
1221                            iceberg::spec::DataContentType::Data => {
1222                                data_files.push(file.into_data_file());
1223                            }
1224                            iceberg::spec::DataContentType::PositionDeletes |
1225                            iceberg::spec::DataContentType::EqualityDeletes => {
1226                                delete_files.push(file.into_data_file());
1227                            }
1228                        }
1229                    }
1230
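                        // The batch upper becomes the frontier we record with
                        // this snapshot and report as the sink's progress.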
1231                    let frontier = batch.1.clone();
1232                    let tx = Transaction::new(&table);
1233
1234                    let frontier_json = serde_json::to_string(&frontier.elements())
1235                        .context("Failed to serialize frontier to JSON")?;
1236
1237                    // Store the frontier in snapshot metadata so we can resume from this point
1238                    let mut action = tx.row_delta().set_snapshot_properties(
1239                        vec![
1240                            ("mz-sink-id".to_string(), sink_id.to_string()),
1241                            ("mz-frontier".to_string(), frontier_json),
1242                            ("mz-sink-version".to_string(), sink_version.to_string()),
1243                        ].into_iter().collect()
1244                    );
1245
1246                    if !data_files.is_empty() || !delete_files.is_empty() {
1247                        action = action.add_data_files(data_files).add_delete_files(delete_files);
1248                    }
1249
1250                    let tx = action.apply(tx).context(
1251                        "Failed to apply data file addition to iceberg table transaction",
1252                    )?;
1253
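                        // Commit the snapshot, retrying transient failures. On a
                        // catalog commit conflict, reload the table and check the
                        // frontier recorded by the other writer: if it is at or
                        // beyond ours, a newer sink instance has taken over and
                        // we must fail permanently.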
1254                    table = Retry::default().max_tries(5).retry_async(|_| async {
1255                        let new_table = tx.clone().commit(catalog.as_ref()).await;
1256                        match new_table {
1257                            Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
1258                                metrics.commit_conflicts.inc();
1259                                let table = reload_table(
1260                                    catalog.as_ref(),
1261                                    connection.namespace.clone(),
1262                                    connection.table.clone(),
1263                                    table.clone(),
1264                                ).await;
1265                                let table = match table {
1266                                    Ok(table) => table,
1267                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1268                                };
1269
1270                                let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1271                                let last = retrieve_upper_from_snapshots(&mut snapshots);
1272                                let last = match last {
1273                                    Ok(val) => val,
1274                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1275                                };
1276
1277                                // Check whether another writer has advanced the frontier to or beyond ours (fencing check).
1278                                if let Some((last_frontier, _last_version)) = last {
1279                                    if PartialOrder::less_equal(&frontier, &last_frontier) {
1280                                        return RetryResult::FatalErr(anyhow!(
1281                                            "Iceberg table '{}' has been modified by another writer: our frontier {:?} is not beyond the table's committed frontier {:?}; this sink instance has been fenced out.",
1282                                            connection.table,
1283                                            frontier,
1284                                            last_frontier,
1285                                        ));
1286                                    }
1287                                }
1288
1289                                RetryResult::Ok(table)
1290                            }
1291                            Err(e) => {
1292                                metrics.commit_failures.inc();
1293                                RetryResult::RetryableErr(anyhow!(e))
1294                            },
1295                            Ok(table) => RetryResult::Ok(table)
1296                        }
1297                    }).await.context("failed to commit to iceberg")?;
1298
1299                    metrics.snapshots_committed.inc();
1300                    statistics.inc_messages_committed_by(total_messages);
1301                    statistics.inc_bytes_committed_by(total_bytes);
1302
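                        // Advance the upper of the sink's storage shard to the
                        // committed frontier by appending an empty batch,
                        // retrying on upper mismatches.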
1303                    let mut expect_upper = write_handle.shared_upper();
1304                    loop {
1305                        if PartialOrder::less_equal(&frontier, &expect_upper) {
1306                            // The frontier has already been advanced as far as necessary.
1307                            break;
1308                        }
1309
1310                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1311                        match write_handle
1312                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
1313                            .await
1314                            .expect("valid usage")
1315                        {
1316                            Ok(()) => break,
1317                            Err(mismatch) => {
1318                                expect_upper = mismatch.current;
1319                            }
1320                        }
1321                    }
1322                    write_frontier.borrow_mut().clone_from(&frontier);
1324                }
1325            }
1326
1328            Ok(())
1329        })
1330    });
1331
1332    let statuses = errors.map(|error| HealthStatusMessage {
1333        id: None,
1334        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1335        namespace: StatusNamespace::Iceberg,
1336    });
1337
1338    (statuses, button.press_on_drop())
1339}
1340
1341impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
1342    fn get_key_indices(&self) -> Option<&[usize]> {
1343        self.key_desc_and_indices
1344            .as_ref()
1345            .map(|(_, indices)| indices.as_slice())
1346    }
1347
1348    fn get_relation_key_indices(&self) -> Option<&[usize]> {
1349        self.relation_key_indices.as_deref()
1350    }
1351
1352    fn render_sink(
1353        &self,
1354        storage_state: &mut StorageState,
1355        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1356        sink_id: GlobalId,
1357        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
1358        _err_collection: VecCollection<G, DataflowError, Diff>,
1359    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
1360        let mut scope = input.scope();
1361
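            // Future that opens a persist write handle on the shard recorded in
            // the sink's storage metadata; the commit operator awaits it and
            // uses the handle to advance that shard's upper.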
1362        let write_handle = {
1363            let persist = Arc::clone(&storage_state.persist_clients);
1364            let shard_meta = sink.to_storage_metadata.clone();
1365            async move {
1366                let client = persist.open(shard_meta.persist_location).await?;
1367                let handle = client
1368                    .open_writer(
1369                        shard_meta.data_shard,
1370                        Arc::new(shard_meta.relation_desc),
1371                        Arc::new(UnitSchema),
1372                        Diagnostics::from_purpose("sink handle"),
1373                    )
1374                    .await?;
1375                Ok(handle)
1376            }
1377        };
1378
1379        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
1380        storage_state
1381            .sink_write_frontiers
1382            .insert(sink_id, Rc::clone(&write_frontier));
1383
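            // Derive the Arrow and Iceberg schemas from the sink's relation
            // description; a conversion failure halts the sink with an error
            // status.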
1384        let (arrow_schema_with_ids, iceberg_schema) =
1385            match relation_desc_to_iceberg_schema(&sink.from_desc) {
1386                Ok(schemas) => schemas,
1387                Err(err) => {
1388                    let error_stream = std::iter::once(HealthStatusMessage {
1389                        id: None,
1390                        update: HealthStatusUpdate::halting(
1391                            format!("{}", err.display_with_causes()),
1392                            None,
1393                        ),
1394                        namespace: StatusNamespace::Iceberg,
1395                    })
1396                    .to_stream(&mut scope);
1397                    return (error_stream, vec![]);
1398                }
1399            };
1400
1401        let metrics = storage_state
1402            .metrics
1403            .get_iceberg_sink_metrics(sink_id, scope.index());
1404
1405        let statistics = storage_state
1406            .aggregated_statistics
1407            .get_sink(&sink_id)
1408            .expect("statistics initialized")
1409            .clone();
1410
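            // Wire up the three operators described in the module docs: mint
            // batch descriptions, write data files, and commit to Iceberg.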
1411        let connection_for_minter = self.clone();
1412        let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
1413            format!("{sink_id}-iceberg-mint"),
1414            sink_id,
1415            &input,
1416            sink,
1417            connection_for_minter,
1418            storage_state.storage_configuration.clone(),
1419            Arc::clone(&iceberg_schema),
1420        );
1421
1422        let connection_for_writer = self.clone();
1423        let (datafiles, write_status, write_button) = write_data_files(
1424            format!("{sink_id}-write-data-files"),
1425            minted_input,
1426            &batch_descriptions,
1427            connection_for_writer,
1428            storage_state.storage_configuration.clone(),
1429            Arc::new(arrow_schema_with_ids.clone()),
1430            metrics.clone(),
1431            statistics.clone(),
1432        );
1433
1434        let connection_for_committer = self.clone();
1435        let (commit_status, commit_button) = commit_to_iceberg(
1436            format!("{sink_id}-commit-to-iceberg"),
1437            sink_id,
1438            sink.version,
1439            &datafiles,
1440            &batch_descriptions,
1441            Rc::clone(&write_frontier),
1442            connection_for_committer,
1443            storage_state.storage_configuration.clone(),
1444            write_handle,
1445            metrics.clone(),
1446            statistics,
1447        );
1448
1449        let running_status = Some(HealthStatusMessage {
1450            id: None,
1451            update: HealthStatusUpdate::running(),
1452            namespace: StatusNamespace::Iceberg,
1453        })
1454        .to_stream(&mut scope);
1455
1456        let statuses =
1457            scope.concatenate([running_status, mint_status, write_status, commit_status]);
1458
1459        (statuses, vec![mint_button, write_button, commit_button])
1460    }
1461}