mz_storage/sink/iceberg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Iceberg sink implementation.
11//!
12//! This code renders an [`IcebergSinkConnection`] into a dataflow that writes
13//! data to an Iceberg table. The dataflow consists of three operators:
14//!
15//! ```text
16//!        ┏━━━━━━━━━━━━━━┓
17//!        ┃   persist    ┃
18//!        ┃    source    ┃
19//!        ┗━━━━━━┯━━━━━━━┛
20//!               │ row data, the input to this module
21//!               │
22//!        ┏━━━━━━v━━━━━━━┓
23//!        ┃     mint     ┃ (single worker)
24//!        ┃    batch     ┃ loads/creates the Iceberg table,
25//!        ┃ descriptions ┃ determines resume upper
26//!        ┗━━━┯━━━━━┯━━━━┛
27//!            │     │ batch descriptions (broadcast)
28//!       rows │     ├─────────────────────────┐
29//!            │     │                         │
30//!        ┏━━━v━━━━━v━━━━┓    ╭─────────────╮ │
31//!        ┃    write     ┃───>│ S3 / object │ │
32//!        ┃  data files  ┃    │   storage   │ │
33//!        ┗━━━━━━┯━━━━━━━┛    ╰─────────────╯ │
34//!               │ file metadata              │
35//!               │                            │
36//!        ┏━━━━━━v━━━━━━━━━━━━━━━━━━━━━━━━━━━━v┓
37//!        ┃           commit to               ┃ (single worker)
38//!        ┃             iceberg               ┃
39//!        ┗━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━┛
40//!                      │
41//!              ╭───────v───────╮
42//!              │ Iceberg table │
43//!              │  (snapshots)  │
44//!              ╰───────────────╯
45//! ```
46//! # Minting batch descriptions
47//! The "mint batch descriptions" operator is responsible for generating
48//! time-based batch boundaries that group writes into Iceberg snapshots.
49//! It maintains a sliding window of future batch descriptions so that
50//! writers can start processing data even while earlier batches are still being written.
51//! Knowing the batch boundaries ahead of time is important because we must be
52//! able to guarantee that the data files written for a given batch contain
53//! exactly the data up to the batch's upper bound `t` and nothing beyond it.
54//! This could be trivially achieved by waiting for all data to arrive up to a certain
55//! frontier, but that would prevent us from streaming writes out to object storage
56//! until the entire batch is complete, which would increase latency and reduce throughput.
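//!
//! As a rough sketch (not the operator itself), consecutive batch bounds amount to
//! stepping the current frontier forward by the commit interval, once per
//! description to mint. A hypothetical `mint_bounds` helper, with plain `u64`
//! milliseconds standing in for `Antichain<Timestamp>` frontiers:
//!
//! ```ignore
//! /// Mint `n` consecutive (lower, upper) batch bounds starting at `frontier`,
//! /// each spanning `commit_interval_ms`.
//! fn mint_bounds(frontier: u64, commit_interval_ms: u64, n: u64) -> Vec<(u64, u64)> {
//!     let mut bounds = Vec::new();
//!     let mut lower = frontier;
//!     for _ in 0..n {
//!         let upper = lower + commit_interval_ms;
//!         bounds.push((lower, upper));
//!         lower = upper;
//!     }
//!     bounds
//! }
//!
//! // With a 1s commit interval, three descriptions minted at t = 5000ms:
//! assert_eq!(
//!     mint_bounds(5_000, 1_000, 3),
//!     vec![(5_000, 6_000), (6_000, 7_000), (7_000, 8_000)]
//! );
//! ```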
57//!
58//! # Writing data files
59//! The "write data files" operator receives rows along with batch descriptions.
60//! It matches rows to batches by timestamp; if a batch description hasn't arrived yet,
61//! rows are stashed until it does. This allows data to arrive ahead of its batch description.
62//! The operator uses an Iceberg `DeltaWriter` to write Parquet data files
63//! (and position delete files if necessary) to object storage.
64//! It outputs metadata about the written files along with their batch descriptions
65//! for the commit operator to consume.
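//!
//! Matching uses half-open windows: a row at time `ts` belongs to the batch
//! `(lower, upper)` when `lower <= ts < upper`. A minimal sketch with a
//! hypothetical `batch_for` helper and plain `u64`s standing in for frontiers:
//!
//! ```ignore
//! /// Return the index of the in-flight batch that covers `ts`, if any;
//! /// when no description covers it yet, the caller stashes the row.
//! fn batch_for(ts: u64, in_flight: &[(u64, u64)]) -> Option<usize> {
//!     in_flight
//!         .iter()
//!         .position(|(lower, upper)| *lower <= ts && ts < *upper)
//! }
//!
//! let in_flight = vec![(0, 1_000), (1_000, 2_000)];
//! assert_eq!(batch_for(999, &in_flight), Some(0));
//! assert_eq!(batch_for(1_000, &in_flight), Some(1));
//! // No description covers t = 2500 yet, so the row is stashed.
//! assert_eq!(batch_for(2_500, &in_flight), None);
//! ```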
66//!
67//! # Committing to Iceberg
68//! The "commit to iceberg" operator receives metadata about written data files
69//! along with their batch descriptions. It groups files by batch and creates
70//! Iceberg snapshots that include all files for each batch. It updates the Iceberg
71//! table's metadata to reflect the new snapshots, including updating the
72//! `mz-frontier` property to track progress.
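//!
//! A sketch of that progress bookkeeping, with plain `u64` timestamps standing in
//! for `Timestamp`: the frontier is stored as a JSON array under the `mz-frontier`
//! key of the snapshot summary's additional properties, next to the sink version.
//!
//! ```ignore
//! use std::collections::BTreeMap;
//!
//! let frontier: Vec<u64> = vec![8_000];
//! let mut props: BTreeMap<String, String> = BTreeMap::new();
//! props.insert("mz-frontier".into(), serde_json::to_string(&frontier).unwrap());
//! props.insert("mz-sink-version".into(), 1u64.to_string());
//!
//! // On restart, the newest snapshot carrying these properties yields the resume
//! // upper (see `retrieve_upper_from_snapshots`).
//! let decoded: Vec<u64> =
//!     serde_json::from_str(props.get("mz-frontier").unwrap()).unwrap();
//! assert_eq!(decoded, frontier);
//! ```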
73
74use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::{cell::RefCell, rc::Rc, sync::Arc};
77
78use anyhow::{Context, anyhow};
79use arrow::array::{ArrayRef, Int32Array, RecordBatch};
80use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
81use differential_dataflow::lattice::Lattice;
82use differential_dataflow::{AsCollection, Hashable, VecCollection};
83use futures::StreamExt;
84use iceberg::ErrorKind;
85use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
86use iceberg::spec::{
87    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
88    write_data_files_to_avro,
89};
90use iceberg::spec::{Schema, SchemaRef};
91use iceberg::table::Table;
92use iceberg::transaction::{ApplyTransactionAction, Transaction};
93use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
94use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
95use iceberg::writer::base_writer::equality_delete_writer::{
96    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
97};
98use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
99use iceberg::writer::base_writer::position_delete_writer::{
100    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
101};
102use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
103use iceberg::writer::file_writer::ParquetWriterBuilder;
104use iceberg::writer::file_writer::location_generator::{
105    DefaultFileNameGenerator, DefaultLocationGenerator,
106};
107use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
108use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
109use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
110use itertools::Itertools;
111use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
112use mz_interchange::avro::DiffPair;
113use mz_ore::cast::CastFrom;
114use mz_ore::error::ErrorExt;
115use mz_ore::future::InTask;
116use mz_ore::result::ResultExt;
117use mz_ore::retry::{Retry, RetryResult};
118use mz_persist_client::Diagnostics;
119use mz_persist_client::write::WriteHandle;
120use mz_persist_types::codec_impls::UnitSchema;
121use mz_repr::{Diff, GlobalId, Row, Timestamp};
122use mz_storage_types::StorageDiff;
123use mz_storage_types::configuration::StorageConfiguration;
124use mz_storage_types::controller::CollectionMetadata;
125use mz_storage_types::errors::DataflowError;
126use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
127use mz_storage_types::sources::SourceData;
128use mz_timely_util::antichain::AntichainExt;
129use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
130use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
131use parquet::file::properties::WriterProperties;
132use serde::{Deserialize, Serialize};
133use timely::PartialOrder;
134use timely::container::CapacityContainerBuilder;
135use timely::dataflow::channels::pact::{Exchange, Pipeline};
136use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
137use timely::dataflow::{Scope, Stream};
138use timely::progress::{Antichain, Timestamp as _};
139
140use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
141use crate::metrics::sink::iceberg::IcebergSinkMetrics;
142use crate::render::sinks::SinkRender;
143use crate::statistics::SinkStatistics;
144use crate::storage_state::StorageState;
145
146/// Set the default capacity for the array builders inside the ArrowBuilder. This is the
147/// number of items each builder can hold before it needs to allocate more memory.
148const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
149/// Set the default buffer capacity for the string and binary array builders inside the
150/// ArrowBuilder. This is the number of bytes each builder can hold before it needs to allocate
151/// more memory.
152const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;
153
154/// The prefix for Parquet files written by this sink.
155const PARQUET_FILE_PREFIX: &str = "mz_data";
156/// The number of batch descriptions to mint ahead of the observed frontier. This determines how
157/// many batches we have in-flight at any given time.
158const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
159
160type DeltaWriterType = DeltaWriter<
161    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
162    PositionDeleteFileWriter<
163        ParquetWriterBuilder,
164        DefaultLocationGenerator,
165        DefaultFileNameGenerator,
166    >,
167    EqualityDeleteFileWriter<
168        ParquetWriterBuilder,
169        DefaultLocationGenerator,
170        DefaultFileNameGenerator,
171    >,
172>;
173
174/// Add Parquet field IDs to an Arrow schema. Iceberg requires field IDs in the
175/// Parquet metadata for schema evolution tracking. Field IDs are assigned
176/// recursively to all nested fields (structs, lists, maps) using a depth-first,
177/// pre-order traversal.
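///
/// For example, given a schema with a nested struct (hypothetical column names),
/// the pre-order traversal assigns IDs as follows:
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Fields, Schema as ArrowSchema};
///
/// // a: Int64        -> field ID 1
/// // b: Struct       -> field ID 2
/// //   b.x: Utf8     -> field ID 3
/// //   b.y: Int32    -> field ID 4
/// // c: Boolean      -> field ID 5
/// let schema = ArrowSchema::new(vec![
///     Field::new("a", DataType::Int64, false),
///     Field::new(
///         "b",
///         DataType::Struct(Fields::from(vec![
///             Field::new("x", DataType::Utf8, true),
///             Field::new("y", DataType::Int32, true),
///         ])),
///         true,
///     ),
///     Field::new("c", DataType::Boolean, false),
/// ]);
/// let with_ids = add_field_ids_to_arrow_schema(schema);
/// ```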
178fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
179    let mut next_field_id = 1i32;
180    let fields: Vec<Field> = schema
181        .fields()
182        .iter()
183        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
184        .collect();
185    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
186}
187
188/// Recursively add field IDs to a field and all its nested children.
189fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
190    let current_id = *next_id;
191    *next_id += 1;
192
193    let mut metadata = field.metadata().clone();
194    metadata.insert(
195        PARQUET_FIELD_ID_META_KEY.to_string(),
196        current_id.to_string(),
197    );
198
199    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
200
201    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
202}
203
204/// Add field IDs to nested fields within a DataType.
205fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
206    match data_type {
207        DataType::Struct(fields) => {
208            let new_fields: Vec<Field> = fields
209                .iter()
210                .map(|f| add_field_ids_recursive(f, next_id))
211                .collect();
212            DataType::Struct(new_fields.into())
213        }
214        DataType::List(element_field) => {
215            let new_element = add_field_ids_recursive(element_field, next_id);
216            DataType::List(Arc::new(new_element))
217        }
218        DataType::LargeList(element_field) => {
219            let new_element = add_field_ids_recursive(element_field, next_id);
220            DataType::LargeList(Arc::new(new_element))
221        }
222        DataType::Map(entries_field, sorted) => {
223            let new_entries = add_field_ids_recursive(entries_field, next_id);
224            DataType::Map(Arc::new(new_entries), *sorted)
225        }
226        _ => data_type.clone(),
227    }
228}
229
230/// Merge Materialize extension metadata into Iceberg's Arrow schema.
231/// This uses Iceberg's data types (e.g. Utf8) and field IDs while preserving
232/// Materialize's extension names for ArrowBuilder compatibility.
233/// Handles nested types (structs, lists, maps) recursively.
234fn merge_materialize_metadata_into_iceberg_schema(
235    materialize_arrow_schema: &ArrowSchema,
236    iceberg_schema: &Schema,
237) -> anyhow::Result<ArrowSchema> {
238    // First, convert Iceberg schema to Arrow (this gives us the correct data types)
239    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
240        .context("Failed to convert Iceberg schema to Arrow schema")?;
241
242    // Now merge in the Materialize extension metadata
243    let fields: Vec<Field> = iceberg_arrow_schema
244        .fields()
245        .iter()
246        .map(|iceberg_field| {
247            // Find the corresponding Materialize field by name to get extension metadata
248            let mz_field = materialize_arrow_schema
249                .field_with_name(iceberg_field.name())
250                .with_context(|| {
251                    format!(
252                        "Field '{}' not found in Materialize schema",
253                        iceberg_field.name()
254                    )
255                })?;
256
257            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
258        })
259        .collect::<anyhow::Result<Vec<_>>>()?;
260
261    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
262}
263
264/// Recursively merge Materialize extension metadata into an Iceberg field.
265fn merge_field_metadata_recursive(
266    iceberg_field: &Field,
267    mz_field: Option<&Field>,
268) -> anyhow::Result<Field> {
269    // Start with Iceberg field's metadata (which includes field IDs)
270    let mut metadata = iceberg_field.metadata().clone();
271
272    // Add Materialize extension name if available
273    if let Some(mz_f) = mz_field {
274        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
275            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
276        }
277    }
278
279    // Recursively process nested types
280    let new_data_type = match iceberg_field.data_type() {
281        DataType::Struct(iceberg_fields) => {
282            let mz_struct_fields = match mz_field {
283                Some(f) => match f.data_type() {
284                    DataType::Struct(fields) => Some(fields),
285                    other => anyhow::bail!(
286                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
287                        iceberg_field.name(),
288                        other
289                    ),
290                },
291                None => None,
292            };
293
294            let new_fields: Vec<Field> = iceberg_fields
295                .iter()
296                .map(|iceberg_inner| {
297                    let mz_inner = mz_struct_fields.and_then(|fields| {
298                        fields.iter().find(|f| f.name() == iceberg_inner.name())
299                    });
300                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
301                })
302                .collect::<anyhow::Result<Vec<_>>>()?;
303
304            DataType::Struct(new_fields.into())
305        }
306        DataType::List(iceberg_element) => {
307            let mz_element = match mz_field {
308                Some(f) => match f.data_type() {
309                    DataType::List(element) => Some(element.as_ref()),
310                    other => anyhow::bail!(
311                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
312                        iceberg_field.name(),
313                        other
314                    ),
315                },
316                None => None,
317            };
318            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
319            DataType::List(Arc::new(new_element))
320        }
321        DataType::LargeList(iceberg_element) => {
322            let mz_element = match mz_field {
323                Some(f) => match f.data_type() {
324                    DataType::LargeList(element) => Some(element.as_ref()),
325                    other => anyhow::bail!(
326                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
327                        iceberg_field.name(),
328                        other
329                    ),
330                },
331                None => None,
332            };
333            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
334            DataType::LargeList(Arc::new(new_element))
335        }
336        DataType::Map(iceberg_entries, sorted) => {
337            let mz_entries = match mz_field {
338                Some(f) => match f.data_type() {
339                    DataType::Map(entries, _) => Some(entries.as_ref()),
340                    other => anyhow::bail!(
341                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
342                        iceberg_field.name(),
343                        other
344                    ),
345                },
346                None => None,
347            };
348            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
349            DataType::Map(Arc::new(new_entries), *sorted)
350        }
351        other => other.clone(),
352    };
353
354    Ok(Field::new(
355        iceberg_field.name(),
356        new_data_type,
357        iceberg_field.is_nullable(),
358    )
359    .with_metadata(metadata))
360}
361
362async fn reload_table(
363    catalog: &dyn Catalog,
364    namespace: String,
365    table_name: String,
366    current_table: Table,
367) -> anyhow::Result<Table> {
368    let namespace_ident = NamespaceIdent::new(namespace.clone());
369    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
370    let current_schema = current_table.metadata().current_schema_id();
371    let current_partition_spec = current_table.metadata().default_partition_spec_id();
372
373    match catalog.load_table(&table_ident).await {
374        Ok(table) => {
375            let reloaded_schema = table.metadata().current_schema_id();
376            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
377            if reloaded_schema != current_schema {
378                return Err(anyhow::anyhow!(
379                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
380                    table_name,
381                    current_schema,
382                    reloaded_schema
383                ));
384            }
385
386            if reloaded_partition_spec != current_partition_spec {
387                return Err(anyhow::anyhow!(
388                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
389                    table_name,
390                    current_partition_spec,
391                    reloaded_partition_spec
392                ));
393            }
394
395            Ok(table)
396        }
397        Err(err) => Err(err).context("Failed to reload Iceberg table"),
398    }
399}
400
401/// Load an existing Iceberg table or create it if it doesn't exist.
402async fn load_or_create_table(
403    catalog: &dyn Catalog,
404    namespace: String,
405    table_name: String,
406    schema: &Schema,
407) -> anyhow::Result<iceberg::table::Table> {
408    let namespace_ident = NamespaceIdent::new(namespace.clone());
409    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
410
411    // Try to load the table first
412    match catalog.load_table(&table_ident).await {
413        Ok(table) => {
414            // Table exists, return it
415            // TODO: Add proper schema evolution/validation to ensure compatibility
416            Ok(table)
417        }
418        Err(err) => {
419            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
420                || err
421                    .message()
422                    .contains("Tried to load a table that does not exist")
423            {
424                // Table doesn't exist, create it
425                // Note: location is not specified, letting the catalog determine the default location
426                // based on its warehouse configuration
427                let table_creation = TableCreation::builder()
428                    .name(table_name.clone())
429                    .schema(schema.clone())
430                    // Use unpartitioned spec by default
431                    // TODO: Consider making partition spec configurable
432                    // .partition_spec(UnboundPartitionSpec::builder().build())
433                    .build();
434
435                catalog
436                    .create_table(&namespace_ident, table_creation)
437                    .await
438                    .with_context(|| {
439                        format!(
440                            "Failed to create Iceberg table '{}' in namespace '{}'",
441                            table_name, namespace
442                        )
443                    })
444            } else {
445                // Some other error occurred
446                Err(err).context("Failed to load Iceberg table")
447            }
448        }
449    }
450}
451
452/// Find the most recent Materialize frontier from Iceberg snapshots.
453/// We store the frontier in snapshot metadata to track where we left off after restarts.
454/// Snapshots with operation="replace" (compactions) don't have our metadata and are skipped.
455/// The input slice is sorted in place by sequence number, descending (newest first).
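///
/// A simplified sketch of the resume logic, with a stand-in summary type and `u64`
/// timestamps (the real function also requires and returns `mz-sink-version`):
///
/// ```ignore
/// struct SummaryLite {
///     operation: &'static str,
///     frontier_json: Option<&'static str>,
/// }
///
/// /// Assumes `summaries` is already sorted newest-first.
/// fn resume_from(summaries: &[SummaryLite]) -> Result<Option<Vec<u64>>, String> {
///     for s in summaries {
///         if let Some(json) = s.frontier_json {
///             return Ok(Some(serde_json::from_str(json).map_err(|e| e.to_string())?));
///         }
///         if s.operation != "replace" {
///             return Err(format!(
///                 "snapshot with operation '{}' is missing 'mz-frontier'",
///                 s.operation
///             ));
///         }
///     }
///     Ok(None)
/// }
///
/// // A compaction ("replace") on top of a sink commit: the compaction is skipped
/// // and the commit's frontier becomes the resume upper.
/// let snaps = [
///     SummaryLite { operation: "replace", frontier_json: None },
///     SummaryLite { operation: "append", frontier_json: Some("[7000]") },
/// ];
/// assert_eq!(resume_from(&snaps).unwrap(), Some(vec![7_000]));
/// ```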
456fn retrieve_upper_from_snapshots(
457    snapshots: &mut [Arc<Snapshot>],
458) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
459    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
460
461    for snapshot in snapshots {
462        let props = &snapshot.summary().additional_properties;
463        if let (Some(frontier_json), Some(sink_version_str)) =
464            (props.get("mz-frontier"), props.get("mz-sink-version"))
465        {
466            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
467                .context("Failed to deserialize frontier from snapshot properties")?;
468            let frontier = Antichain::from_iter(frontier);
469
470            let sink_version = sink_version_str
471                .parse::<u64>()
472                .context("Failed to parse mz-sink-version from snapshot properties")?;
473
474            return Ok(Some((frontier, sink_version)));
475        }
476        if snapshot.summary().operation.as_str() != "replace" {
477            // This is a crude heuristic, but we currently have no other way to identify
478            // compactions than to assume they are the only writers producing "replace" snapshots.
479            // So if we find a snapshot with some other operation but no mz-frontier, the table is
480            // in an inconsistent state and we have to error out.
481            anyhow::bail!(
482                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
483                snapshot.snapshot_id(),
484                snapshot.summary().operation.as_str(),
485            );
486        }
487    }
488
489    Ok(None)
490}
491
492/// Convert a Materialize RelationDesc into Arrow and Iceberg schemas.
493/// Returns the Arrow schema (with field IDs) for writing Parquet files,
494/// and the Iceberg schema for table creation/validation.
495fn relation_desc_to_iceberg_schema(
496    desc: &mz_repr::RelationDesc,
497) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
498    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
499        .context("Failed to convert RelationDesc to Arrow schema")?;
500
501    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
502
503    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
504        .context("Failed to convert Arrow schema to Iceberg schema")?;
505
506    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
507}
508
509/// Build a new Arrow schema by adding an __op column to the existing schema.
510fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
511    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
512    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
513    ArrowSchema::new(fields)
514}
515
516/// Convert a Materialize DiffPair into an Arrow RecordBatch with an __op column.
517/// The __op column indicates whether each row is an insert (1) or delete (-1), which
518/// the DeltaWriter uses to generate the appropriate Iceberg data/delete files.
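///
/// A minimal sketch of the op-column convention, with a hypothetical stand-in for
/// `DiffPair<Row>` (the real type lives in `mz_interchange::avro`):
///
/// ```ignore
/// struct DiffPair<T> {
///     before: Option<T>,
///     after: Option<T>,
/// }
///
/// fn op_values<T>(pair: &DiffPair<T>) -> Vec<i32> {
///     let mut ops = Vec::new();
///     if pair.before.is_some() {
///         ops.push(-1); // delete
///     }
///     if pair.after.is_some() {
///         ops.push(1); // insert
///     }
///     ops
/// }
///
/// // An update is encoded as a delete of the old row followed by an insert of the
/// // new one, matching the order in which rows are appended below.
/// let update = DiffPair { before: Some("old"), after: Some("new") };
/// assert_eq!(op_values(&update), vec![-1, 1]);
/// ```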
519fn row_to_recordbatch(
520    row: DiffPair<Row>,
521    schema: ArrowSchemaRef,
522    schema_with_op: ArrowSchemaRef,
523) -> anyhow::Result<RecordBatch> {
524    let mut builder = ArrowBuilder::new_with_schema(
525        Arc::clone(&schema),
526        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
527        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
528    )
529    .context("Failed to create builder")?;
530
531    let mut op_values = Vec::new();
532
533    if let Some(before) = row.before {
534        builder
535            .add_row(&before)
536            .context("Failed to add delete row to builder")?;
537        op_values.push(-1i32); // Delete operation
538    }
539    if let Some(after) = row.after {
540        builder
541            .add_row(&after)
542            .context("Failed to add insert row to builder")?;
543        op_values.push(1i32); // Insert operation
544    }
545
546    let batch = builder
547        .to_record_batch()
548        .context("Failed to create record batch")?;
549
550    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
551    let op_column = Arc::new(Int32Array::from(op_values));
552    columns.push(op_column);
553
554    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
555        .context("Failed to create batch with op column")?;
556    Ok(batch_with_op)
557}
558
559/// Generate time-based batch boundaries for grouping writes into Iceberg snapshots.
560/// Batches are minted with configurable windows to balance write efficiency with latency.
561/// We maintain a sliding window of future batch descriptions so writers can start
562/// processing data even while earlier batches are still being written.
563fn mint_batch_descriptions<G, D>(
564    name: String,
565    sink_id: GlobalId,
566    input: &VecCollection<G, D, Diff>,
567    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
568    connection: IcebergSinkConnection,
569    storage_configuration: StorageConfiguration,
570    initial_schema: SchemaRef,
571) -> (
572    VecCollection<G, D, Diff>,
573    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
574    Stream<G, HealthStatusMessage>,
575    PressOnDropButton,
576)
577where
578    G: Scope<Timestamp = Timestamp>,
579    D: Clone + 'static,
580{
581    let scope = input.scope();
582    let name_for_error = name.clone();
583    let mut builder = OperatorBuilder::new(name, scope.clone());
584    let sink_version = sink.version;
585
586    let hashed_id = sink_id.hashed();
587    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
588    let (output, output_stream) = builder.new_output();
589    let (batch_desc_output, batch_desc_stream) =
590        builder.new_output::<CapacityContainerBuilder<_>>();
591    let mut input =
592        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);
593
594    let as_of = sink.as_of.clone();
595    let commit_interval = sink
596        .commit_interval
597        .expect("the planner should have enforced this")
598        .clone();
599
600    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
601        Box::pin(async move {
602            let [data_capset, capset]: &mut [_; 2] = caps.try_into().unwrap();
603            *data_capset = CapabilitySet::new();
604
605            if !is_active_worker {
606                *capset = CapabilitySet::new();
607                *data_capset = CapabilitySet::new();
608                while let Some(event) = input.next().await {
609                    match event {
610                        Event::Data([output_cap, _], mut data) => {
611                            output.give_container(&output_cap, &mut data);
612                        }
613                        Event::Progress(_) => {}
614                    }
615                }
616                return Ok(());
617            }
618
619            let catalog = connection
620                .catalog_connection
621                .connect(&storage_configuration, InTask::Yes)
622                .await
623                .context("Failed to connect to iceberg catalog")?;
624
625            let table = load_or_create_table(
626                catalog.as_ref(),
627                connection.namespace.clone(),
628                connection.table.clone(),
629                initial_schema.as_ref(),
630            )
631            .await?;
632
633            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
634            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
635            let (resume_upper, resume_version) = match resume {
636                Some((f, v)) => (f, v),
637                None => (Antichain::from_elem(Timestamp::minimum()), 0),
638            };
639
640            // The input has overcompacted if
641            let overcompacted =
642                // ..we have made some progress in the past
643                *resume_upper != [Timestamp::minimum()] &&
644                // ..but the since frontier is now beyond that
645                PartialOrder::less_than(&resume_upper, &as_of);
646
647            if overcompacted {
648                let err = format!(
649                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
650                    as_of.pretty(),
651                    resume_upper.pretty()
652                );
653                // This would normally be an assertion, but because it can happen after a
654                // Materialize backup/restore we return an error instead: it still shows up on
655                // Sentry but leaves the rest of the objects in the cluster unaffected.
656                return Err(anyhow::anyhow!("{err}"));
657            };
658
659            if resume_version > sink_version {
660                anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
661            }
662
663            let mut initialized = false;
664            let mut observed_frontier;
665            // Track minted batches to maintain a sliding window of open batch descriptions.
666            // This is needed to know when to retire old batches and mint new ones.
667            // Its "sortedness" is derived from the monotonicity of batch descriptions,
668            // and the fact that we only ever push new descriptions to the back and pop from the front.
669            let mut minted_batches = VecDeque::new();
670            loop {
671                if let Some(event) = input.next().await {
672                    match event {
673                        Event::Data([output_cap, _], mut data) => {
674                            output.give_container(&output_cap, &mut data);
675                            continue;
676                        }
677                        Event::Progress(frontier) => {
678                            observed_frontier = frontier;
679                        }
680                    }
681                } else {
682                    return Ok(());
683                }
684
685                if !initialized {
686                    // We only start minting after we've reached as_of and resume_upper to avoid
687                    // minting batches that would be immediately skipped.
688                    if observed_frontier.is_empty() {
689                        // Input stream closed before initialization completed
690                        return Ok(());
691                    }
692                    if PartialOrder::less_equal(&observed_frontier, &resume_upper) || PartialOrder::less_equal(&observed_frontier, &as_of) {
693                        continue;
694                    }
695
696                    let mut batch_descriptions = vec![];
697                    let mut current_upper = observed_frontier.clone();
698                    let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
699
700                    // If we're resuming, create a catch-up batch from resume_upper to current frontier
701                    if PartialOrder::less_than(&resume_upper, &current_upper) {
702                        let batch_description = (resume_upper.clone(), current_upper.clone());
703                        batch_descriptions.push(batch_description);
704                    }
705
706                    // Mint initial future batch descriptions at configurable intervals
707                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT + 1 {
708                        let duration_millis = commit_interval.as_millis()
709                            .checked_mul(u128::from(i))
710                            .expect("commit interval multiplication overflow");
711                        let duration_ts = Timestamp::new(u64::try_from(duration_millis)
712                            .expect("commit interval too large for u64"));
713                        let desired_batch_upper = Antichain::from_elem(current_upper_ts.step_forward_by(&duration_ts));
714
715                        let batch_description = (current_upper.clone(), desired_batch_upper);
716                        current_upper = batch_description.1.clone();
717                        batch_descriptions.push(batch_description);
718                    }
719
720                    minted_batches.extend(batch_descriptions.clone());
721
722                    for desc in batch_descriptions {
723                        batch_desc_output.give(&capset[0], desc);
724                    }
725
726                    capset.downgrade(current_upper);
727
728                    initialized = true;
729                } else {
730                    // Maintain a sliding window: when the oldest batch becomes ready, retire it
731                    // and mint a new future batch to keep the pipeline full
732                    while let Some(oldest_desc) = minted_batches.front() {
733                        let oldest_upper = &oldest_desc.1;
734                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
735                            break;
736                        }
737
738                        let newest_upper = minted_batches.back().unwrap().1.clone();
739                        let new_lower = newest_upper.clone();
740                        let duration_ts = Timestamp::new(commit_interval.as_millis()
741                            .try_into()
742                            .expect("commit interval too large for u64"));
743                        let new_upper = Antichain::from_elem(newest_upper
744                            .as_option()
745                            .unwrap()
746                            .step_forward_by(&duration_ts));
747
748                        let new_batch_description = (new_lower.clone(), new_upper.clone());
749                        minted_batches.pop_front();
750                        minted_batches.push_back(new_batch_description.clone());
751
752                        batch_desc_output.give(&capset[0], new_batch_description);
753
754                        capset.downgrade(new_upper);
755                    }
756                }
757            }
758        })
759    });
760
761    let statuses = errors.map(|error| HealthStatusMessage {
762        id: None,
763        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
764        namespace: StatusNamespace::Iceberg,
765    });
766    (
767        output_stream.as_collection(),
768        batch_desc_stream,
769        statuses,
770        button.press_on_drop(),
771    )
772}
773
774#[derive(Clone, Debug, Serialize, Deserialize)]
775#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
776struct SerializableDataFile {
777    pub data_file: DataFile,
778    pub schema: Schema,
779}
780
781/// A wrapper around Iceberg's DataFile that implements Serialize and Deserialize.
782/// This is slightly complicated by the fact that Iceberg's DataFile doesn't implement
783/// these traits directly, so we serialize to/from Avro bytes (which Iceberg supports natively).
784/// The Avro (de)serialization also requires the Iceberg schema, so we include that as well.
785/// It is distinctly possible that this is overkill, but it avoids re-implementing
786/// Iceberg's serialization logic here.
787/// If at some point this becomes a serious overhead, we can revisit this decision.
788#[derive(Clone, Debug, Serialize, Deserialize)]
789struct AvroDataFile {
790    pub data_file: Vec<u8>,
791    /// Schema serialized as JSON bytes to avoid bincode issues with HashMap
792    pub schema: Vec<u8>,
793}
794
795impl From<SerializableDataFile> for AvroDataFile {
796    fn from(value: SerializableDataFile) -> Self {
797        let mut data_file = Vec::new();
798        write_data_files_to_avro(
799            &mut data_file,
800            [value.data_file],
801            &StructType::new(vec![]),
802            FormatVersion::V2,
803        )
804        .expect("serialization into buffer");
805        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
806        AvroDataFile { data_file, schema }
807    }
808}
809
810impl TryFrom<AvroDataFile> for SerializableDataFile {
811    type Error = String;
812
813    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
814        let schema: Schema = serde_json::from_slice(&value.schema)
815            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
816        let data_files = read_data_files_from_avro(
817            &mut &*value.data_file,
818            &schema,
819            0,
820            &StructType::new(vec![]),
821            FormatVersion::V2,
822        )
823        .map_err_to_string_with_causes()?;
824        let Some(data_file) = data_files.into_iter().next() else {
825            return Err("No DataFile found in Avro data".into());
826        };
827        Ok(SerializableDataFile { data_file, schema })
828    }
829}
830
831/// A DataFile along with its associated batch description (lower and upper bounds).
832#[derive(Clone, Debug, Serialize, Deserialize)]
833struct BoundedDataFile {
834    pub data_file: SerializableDataFile,
835    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
836}
837
838impl BoundedDataFile {
839    pub fn new(
840        file: DataFile,
841        schema: Schema,
842        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
843    ) -> Self {
844        Self {
845            data_file: SerializableDataFile {
846                data_file: file,
847                schema,
848            },
849            batch_desc,
850        }
851    }
852
853    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
854        &self.batch_desc
855    }
856
857    pub fn data_file(&self) -> &DataFile {
858        &self.data_file.data_file
859    }
860
861    pub fn into_data_file(self) -> DataFile {
862        self.data_file.data_file
863    }
864}
865
866/// A set of DataFiles along with their associated batch descriptions.
867#[derive(Clone, Debug, Default)]
868struct BoundedDataFileSet {
869    pub data_files: Vec<BoundedDataFile>,
870}
871
872/// Write rows into Parquet data files bounded by batch descriptions.
873/// Rows are matched to batches by timestamp; if a batch description hasn't arrived yet,
874/// rows are stashed until it does. This allows data to arrive ahead of its batch description.
875fn write_data_files<G>(
876    name: String,
877    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
878    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
879    connection: IcebergSinkConnection,
880    storage_configuration: StorageConfiguration,
881    materialize_arrow_schema: Arc<ArrowSchema>,
882    metrics: IcebergSinkMetrics,
883    statistics: SinkStatistics,
884) -> (
885    Stream<G, BoundedDataFile>,
886    Stream<G, HealthStatusMessage>,
887    PressOnDropButton,
888)
889where
890    G: Scope<Timestamp = Timestamp>,
891{
892    let scope = input.scope();
893    let mut builder = OperatorBuilder::new(name, scope.clone());
894
895    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
896
897    let mut batch_desc_input =
898        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
899    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);
900
901    let (button, errors) = builder.build_fallible(|caps| {
902        Box::pin(async move {
903            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
904            let catalog = connection
905                .catalog_connection
906                .connect(&storage_configuration, InTask::Yes)
907                .await
908                .context("Failed to connect to iceberg catalog")?;
909
910            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
911            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
912            let table = catalog
913                .load_table(&table_ident)
914                .await
915                .context("Failed to load Iceberg table")?;
916
917            let table_metadata = table.metadata().clone();
918            let current_schema = Arc::clone(table_metadata.current_schema());
919
920            // Merge Materialize extension metadata into the Iceberg schema.
921            // We need extension metadata for ArrowBuilder to work correctly (it uses
922            // extension names to know how to handle different types like records vs arrays).
923            let arrow_schema = Arc::new(
924                merge_materialize_metadata_into_iceberg_schema(
925                    materialize_arrow_schema.as_ref(),
926                    current_schema.as_ref(),
927                )
928                .context("Failed to merge Materialize metadata into Iceberg schema")?,
929            );
930
931            // Build schema_with_op by adding the __op column used by DeltaWriter.
932            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));
933
934            // WORKAROUND: S3 Tables catalog incorrectly sets location to the metadata file path
935            // instead of the warehouse root. Strip off the /metadata/*.metadata.json suffix.
936            // No clear way to detect this properly right now, so we use heuristics.
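            // For illustration (hypothetical path): a location such as
            // "s3://bucket/warehouse/my_table/metadata/00001-abc.metadata.json"
            // is corrected to "s3://bucket/warehouse/my_table".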
937            let location = table_metadata.location();
938            let corrected_location = match location.rsplit_once("/metadata/") {
939                Some((a, b)) if b.ends_with(".metadata.json") => a,
940                _ => location,
941            };
942
943            let data_location = format!("{}/data", corrected_location);
944            let location_generator = DefaultLocationGenerator::with_data_location(data_location);
945
946            // Add a unique suffix to avoid filename collisions across restarts and workers
947            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
948            let file_name_generator = DefaultFileNameGenerator::new(
949                PARQUET_FILE_PREFIX.to_string(),
950                Some(unique_suffix),
951                iceberg::spec::DataFileFormat::Parquet,
952            );
953
954            let file_io = table.file_io().clone();
955
956            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
957                return Err(anyhow::anyhow!(
958                    "Iceberg sink requires key columns for equality deletes"
959                ));
960            };
961
962            let equality_ids: Vec<i32> = equality_indices
963                .iter()
964                .map(|u| i32::try_from(*u).map(|v| v + 1))
965                .collect::<Result<Vec<i32>, _>>()
966                .context("Failed to convert equality index to i32 (index too large)")?;
967
968            let writer_properties = WriterProperties::new();
969
970            let create_delta_writer = || async {
971                let data_parquet_writer = ParquetWriterBuilder::new(
972                    writer_properties.clone(),
973                    Arc::clone(&current_schema),
974                )
975                .with_arrow_schema(Arc::clone(&arrow_schema))
976                .context("Arrow schema validation failed")?;
977                let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
978                    data_parquet_writer,
979                    Arc::clone(&current_schema),
980                    file_io.clone(),
981                    location_generator.clone(),
982                    file_name_generator.clone(),
983                );
984                let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);
985
986                let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
987                let pos_schema =
988                    Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
989                        "Failed to convert position delete Arrow schema to Iceberg schema",
990                    )?);
991                let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
992                let pos_parquet_writer =
993                    ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
994                let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
995                    pos_parquet_writer,
996                    Arc::clone(&current_schema),
997                    file_io.clone(),
998                    location_generator.clone(),
999                    file_name_generator.clone(),
1000                );
1001                let pos_delete_writer_builder =
1002                    PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);
1003
1004                let eq_config = EqualityDeleteWriterConfig::new(
1005                    equality_ids.clone(),
1006                    Arc::clone(&current_schema),
1007                )
1008                .context("Failed to create EqualityDeleteWriterConfig")?;
1009
1010                let eq_schema = Arc::new(
1011                    arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
1012                        "Failed to convert equality delete Arrow schema to Iceberg schema",
1013                    )?,
1014                );
1015
1016                let eq_parquet_writer =
1017                    ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
1018                let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1019                    eq_parquet_writer,
1020                    Arc::clone(&current_schema),
1021                    file_io.clone(),
1022                    location_generator.clone(),
1023                    file_name_generator.clone(),
1024                );
1025                let eq_delete_writer_builder =
1026                    EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);
1027
1028                DeltaWriterBuilder::new(
1029                    data_writer_builder,
1030                    pos_delete_writer_builder,
1031                    eq_delete_writer_builder,
1032                    equality_ids.clone(),
1033                )
1034                .build(None)
1035                .await
1036                .context("Failed to create DeltaWriter")
1037            };
1038
1039            // Rows can arrive before their batch description due to dataflow parallelism.
1040            // Stash them until we know which batch they belong to.
1041            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1042                BTreeMap::new();
1043
1044            // Track batches currently being written. When a row arrives, we check if it belongs
1045            // to an in-flight batch. When frontiers advance past a batch's upper, we close the
1046            // writer and emit its data files downstream.
1047            // Antichains don't implement Ord, so we use a HashMap keyed by (lower, upper) tuples rather than a BTreeMap.
1048            #[allow(clippy::disallowed_types)]
1049            let mut in_flight_batches: std::collections::HashMap<
1050                (Antichain<Timestamp>, Antichain<Timestamp>),
1051                DeltaWriterType,
1052            > = std::collections::HashMap::new();
1053
1054            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1055            let mut processed_batch_description_frontier =
1056                Antichain::from_elem(Timestamp::minimum());
1057            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1058            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1059
1060            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1061                tokio::select! {
1062                    _ = batch_desc_input.ready() => {},
1063                    _ = input.ready() => {}
1064                }
1065
1066                while let Some(event) = batch_desc_input.next_sync() {
1067                    match event {
1068                        Event::Data(_cap, data) => {
1069                            for batch_desc in data {
1070                                let (lower, upper) = &batch_desc;
1071                                let mut delta_writer = create_delta_writer().await?;
1072                                // Drain any stashed rows that belong to this batch
1073                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1074                                for row_ts in row_ts_keys {
1075                                    let ts = Antichain::from_elem(row_ts.clone());
1076                                    if PartialOrder::less_equal(lower, &ts)
1077                                        && PartialOrder::less_than(&ts, upper)
1078                                    {
1079                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
1080                                            for (_row, diff_pair) in rows {
1081                                                metrics.stashed_rows.dec();
1082                                                let record_batch = row_to_recordbatch(
1083                                                    diff_pair.clone(),
1084                                                    Arc::clone(&arrow_schema),
1085                                                    Arc::clone(&schema_with_op),
1086                                                )
1087                                                .context("failed to convert row to recordbatch")?;
1088                                                delta_writer.write(record_batch).await.context(
1089                                                    "Failed to write row to DeltaWriter",
1090                                                )?;
1091                                            }
1092                                        }
1093                                    }
1094                                }
1095                                let prev =
1096                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
1097                                if prev.is_some() {
1098                                    anyhow::bail!(
1099                                        "Duplicate batch description received for description {:?}",
1100                                        batch_desc
1101                                    );
1102                                }
1103                            }
1104                        }
1105                        Event::Progress(frontier) => {
1106                            batch_description_frontier = frontier;
1107                        }
1108                    }
1109                }
1110
1111                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1112                for event in ready_events {
1113                    match event {
1114                        Event::Data(_cap, data) => {
1115                            for ((row, diff_pair), ts, _diff) in data {
1116                                let row_ts = ts.clone();
1117                                let ts_antichain = Antichain::from_elem(row_ts.clone());
1118                                let mut written = false;
1119                                // Try writing the row to any in-flight batch it belongs to...
1120                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
1121                                    let (lower, upper) = batch_desc;
1122                                    if PartialOrder::less_equal(lower, &ts_antichain)
1123                                        && PartialOrder::less_than(&ts_antichain, upper)
1124                                    {
1125                                        let record_batch = row_to_recordbatch(
1126                                            diff_pair.clone(),
1127                                            Arc::clone(&arrow_schema),
1128                                            Arc::clone(&schema_with_op),
1129                                        )
1130                                        .context("failed to convert row to recordbatch")?;
1131                                        delta_writer
1132                                            .write(record_batch)
1133                                            .await
1134                                            .context("Failed to write row to DeltaWriter")?;
1135                                        written = true;
1136                                        break;
1137                                    }
1138                                }
1139                                if !written {
1140                                    // ...otherwise stash it for later
1141                                    let entry = stashed_rows.entry(row_ts).or_default();
1142                                    metrics.stashed_rows.inc();
1143                                    entry.push((row, diff_pair));
1144                                }
1145                            }
1146                        }
1147                        Event::Progress(frontier) => {
1148                            input_frontier = frontier;
1149                        }
1150                    }
1151                }
1152
1153                // Check if frontiers have advanced, which may unlock batches ready to close
1154                if PartialOrder::less_equal(
1155                    &processed_batch_description_frontier,
1156                    &batch_description_frontier,
1157                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1158                {
1159                    // Close batches whose upper is now in the past
1160                    let ready_batches: Vec<_> = in_flight_batches
1161                        .extract_if(|(lower, upper), _| {
1162                            PartialOrder::less_than(lower, &batch_description_frontier)
1163                                && PartialOrder::less_than(upper, &input_frontier)
1164                        })
1165                        .collect();
1166
1167                    if !ready_batches.is_empty() {
1168                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1169                        for (desc, mut delta_writer) in ready_batches {
1170                            let data_files = delta_writer
1171                                .close()
1172                                .await
1173                                .context("Failed to close DeltaWriter")?;
1174                            for data_file in data_files {
1175                                match data_file.content_type() {
1176                                    iceberg::spec::DataContentType::Data => {
1177                                        metrics.data_files_written.inc();
1178                                    }
1179                                    iceberg::spec::DataContentType::PositionDeletes
1180                                    | iceberg::spec::DataContentType::EqualityDeletes => {
1181                                        metrics.delete_files_written.inc();
1182                                    }
1183                                }
1184                                statistics.inc_messages_staged_by(data_file.record_count());
1185                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1186                                let file = BoundedDataFile::new(
1187                                    data_file,
1188                                    current_schema.as_ref().clone(),
1189                                    desc.clone(),
1190                                );
1191                                output.give(&capset[0], file);
1192                            }
1193
1194                            max_upper = max_upper.join(&desc.1);
1195                        }
1196
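                        // Downgrade the output capability to the maximum upper of the batches we just
                        // emitted; the commit operator watches this frontier to learn when it has
                        // received every file belonging to a batch.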
1197                        capset.downgrade(max_upper);
1198                    }
1199                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
1200                    processed_input_frontier.clone_from(&input_frontier);
1201                }
1202            }
1203            Ok(())
1204        })
1205    });
1206
1207    let statuses = errors.map(|error| HealthStatusMessage {
1208        id: None,
1209        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1210        namespace: StatusNamespace::Iceberg,
1211    });
1212    (output_stream, statuses, button.press_on_drop())
1213}
1214
1215/// Commit completed batches to Iceberg, creating one snapshot per batch.
1216/// Batches are committed in timestamp order, so each snapshot reflects all of the sink's data up
1217/// to its batch's upper and nothing beyond it. Each snapshot also records the Materialize
/// frontier in its snapshot properties (see the sketch below) so the sink can resume from that
/// point after a restart.
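///
/// As a rough sketch, the properties attached to each committed snapshot look like the
/// following (values are illustrative; only the keys are fixed by the code below):
///
/// ```text
/// "mz-sink-id"      => "u123"             -- GlobalId of this sink
/// "mz-frontier"     => "[1700000000000]"  -- JSON-encoded frontier elements
/// "mz-sink-version" => "2"                -- version of the sink
/// ```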
1218fn commit_to_iceberg<G>(
1219    name: String,
1220    sink_id: GlobalId,
1221    sink_version: u64,
1222    batch_input: &Stream<G, BoundedDataFile>,
1223    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1224    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1225    connection: IcebergSinkConnection,
1226    storage_configuration: StorageConfiguration,
1227    write_handle: impl Future<
1228        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1229    > + 'static,
1230    metrics: IcebergSinkMetrics,
1231    statistics: SinkStatistics,
1232) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
1233where
1234    G: Scope<Timestamp = Timestamp>,
1235{
1236    let scope = batch_input.scope();
1237    let mut builder = OperatorBuilder::new(name, scope.clone());
1238
1239    let hashed_id = sink_id.hashed();
1240    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1241
1242    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1243    let mut batch_desc_input =
1244        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1245
1246    let (button, errors) = builder.build_fallible(move |_caps| {
1247        Box::pin(async move {
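            // Commits are funneled through a single worker: both inputs are exchanged to the worker
            // that hashes the sink id, and every other worker clears its shared write frontier and
            // returns immediately.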
1248            if !is_active_worker {
1249                write_frontier.borrow_mut().clear();
1250                return Ok(());
1251            }
1252
1253            let catalog = connection
1254                .catalog_connection
1255                .connect(&storage_configuration, InTask::Yes)
1256                .await
1257                .context("Failed to connect to Iceberg catalog")?;
1258
1259            let mut write_handle = write_handle.await?;
1260
1261            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1262            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1263            let mut table = catalog
1264                .load_table(&table_ident)
1265                .await
1266                .context("Failed to load Iceberg table")?;
1267
1268            #[allow(clippy::disallowed_types)]
1269            let mut batch_descriptions: std::collections::HashMap<
1270                (Antichain<Timestamp>, Antichain<Timestamp>),
1271                BoundedDataFileSet,
1272            > = std::collections::HashMap::new();
1273
1275            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1276            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1277
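            // Run until both inputs have closed, i.e. both frontiers have advanced to the empty
            // antichain.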
1278            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1279                tokio::select! {
1280                    _ = batch_desc_input.ready() => {},
1281                    _ = input.ready() => {}
1282                }
1283
1284                while let Some(event) = batch_desc_input.next_sync() {
1285                    match event {
1286                        Event::Data(_cap, data) => {
1287                            for batch_desc in data {
1288                                let prev = batch_descriptions.insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
1289                                if let Some(prev) = prev {
1290                                    anyhow::bail!(
1291                                        "Duplicate batch description received in commit operator: {:?}",
1292                                        prev
1293                                    );
1294                                }
1295                            }
1296                        }
1297                        Event::Progress(frontier) => {
1298                            batch_description_frontier = frontier;
1299                        }
1300                    }
1301                }
1302
1303                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1304                for event in ready_events {
1305                    match event {
1306                        Event::Data(_cap, data) => {
1307                            for bounded_data_file in data {
1308                                let entry = batch_descriptions.entry(bounded_data_file.batch_desc().clone()).or_default();
1309                                entry.data_files.push(bounded_data_file);
1310                            }
1311                        }
1312                        Event::Progress(frontier) => {
1313                            input_frontier = frontier;
1314                        }
1315                    }
1316                }
1317
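                // A batch is ready to commit once the input frontier has passed its lower bound:
                // the write operator only downgrades its capability past a batch after emitting all
                // of that batch's files, so frontier progress past the lower implies the full file
                // set has been received.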
1318                let mut done_batches: Vec<_> = batch_descriptions
1319                    .keys()
1320                    .filter(|(lower, _upper)| {
1321                        PartialOrder::less_than(lower, &input_frontier)
1322                    })
1323                    .cloned()
1324                    .collect();
1325
1326                // Commit batches in timestamp order to maintain consistency
1327                done_batches.sort_by(|a, b| {
1328                    if PartialOrder::less_than(a, b) {
1329                        Ordering::Less
1330                    } else if PartialOrder::less_than(b, a) {
1331                        Ordering::Greater
1332                    } else {
1333                        Ordering::Equal
1334                    }
1335                });
1336
1337                for batch in done_batches {
1338                    let file_set = batch_descriptions.remove(&batch).unwrap();
1339
1340                    let mut data_files = vec![];
1341                    let mut delete_files = vec![];
1342                    // Track totals for committed statistics
1343                    let mut total_messages: u64 = 0;
1344                    let mut total_bytes: u64 = 0;
1345                    for file in file_set.data_files {
1346                        total_messages += file.data_file().record_count();
1347                        total_bytes += file.data_file().file_size_in_bytes();
1348                        match file.data_file().content_type() {
1349                            iceberg::spec::DataContentType::Data => {
1350                                data_files.push(file.into_data_file());
1351                            }
1352                            iceberg::spec::DataContentType::PositionDeletes |
1353                            iceberg::spec::DataContentType::EqualityDeletes => {
1354                                delete_files.push(file.into_data_file());
1355                            }
1356                        }
1357                    }
1358
1359                    let frontier = batch.1.clone();
1360                    let tx = Transaction::new(&table);
1361
1362                    let frontier_json = serde_json::to_string(&frontier.elements())
1363                        .context("Failed to serialize frontier to JSON")?;
1364
1365                    // Store the frontier in snapshot metadata so we can resume from this point
1366                    let mut action = tx.row_delta().set_snapshot_properties(
1367                        vec![
1368                            ("mz-sink-id".to_string(), sink_id.to_string()),
1369                            ("mz-frontier".to_string(), frontier_json),
1370                            ("mz-sink-version".to_string(), sink_version.to_string()),
1371                        ].into_iter().collect()
1372                    );
1373
1374                    if !data_files.is_empty() || !delete_files.is_empty() {
1375                        action = action.add_data_files(data_files).add_delete_files(delete_files);
1376                    }
1377
1378                    let tx = action.apply(tx).context(
1379                        "Failed to apply data file addition to Iceberg table transaction",
1380                    )?;
1381
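                    // Commit with bounded retries. On a catalog commit conflict, reload the table and
                    // inspect the frontier recorded by its latest snapshot: if another writer has
                    // already advanced at or past our frontier we are fenced out and halt; otherwise
                    // the conflict is retryable.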
1382                    table = Retry::default().max_tries(5).retry_async(|_| async {
1383                        let new_table = tx.clone().commit(catalog.as_ref()).await;
1384                        match new_table {
1385                            Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
1386                                metrics.commit_conflicts.inc();
1387                                let table = reload_table(
1388                                    catalog.as_ref(),
1389                                    connection.namespace.clone(),
1390                                    connection.table.clone(),
1391                                    table.clone(),
1392                                ).await;
1393                                let table = match table {
1394                                    Ok(table) => table,
1395                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1396                                };
1397
1398                                let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1399                                let last = retrieve_upper_from_snapshots(&mut snapshots);
1400                                let last = match last {
1401                                    Ok(val) => val,
1402                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1403                                };
1404
1405                                // Check if another writer has advanced the frontier beyond ours (fencing check)
1406                                if let Some((last_frontier, _last_version)) = last {
1407                                    if PartialOrder::less_equal(&frontier, &last_frontier) {
1408                                        return RetryResult::FatalErr(anyhow!(
1409                                            "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
1410                                            connection.table,
1411                                            frontier,
1412                                            last_frontier,
1413                                        ));
1414                                    }
1415                                }
1416
1417                                RetryResult::Ok(table)
1418                            }
1419                            Err(e) => {
1420                                metrics.commit_failures.inc();
1421                                RetryResult::RetryableErr(anyhow!(e))
1422                            },
1423                            Ok(table) => RetryResult::Ok(table)
1424                        }
1425                    }).await.context("Failed to commit to Iceberg")?;
1426
1427                    metrics.snapshots_committed.inc();
1428                    statistics.inc_messages_committed_by(total_messages);
1429                    statistics.inc_bytes_committed_by(total_bytes);
1430
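                    // Record progress in the sink's persist shard by advancing its upper to the
                    // committed frontier with an empty compare-and-append, retrying from the shard's
                    // actual upper if another writer moved it concurrently.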
1431                    let mut expect_upper = write_handle.shared_upper();
1432                    loop {
1433                        if PartialOrder::less_equal(&frontier, &expect_upper) {
1434                            // The frontier has already been advanced as far as necessary.
1435                            break;
1436                        }
1437
1438                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1439                        match write_handle
1440                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
1441                            .await
1442                            .expect("valid usage")
1443                        {
1444                            Ok(()) => break,
1445                            Err(mismatch) => {
1446                                expect_upper = mismatch.current;
1447                            }
1448                        }
1449                    }
1450                    write_frontier.borrow_mut().clone_from(&frontier);
1451
1452                }
1453            }
1454
1456            Ok(())
1457        })
1458    });
1459
1460    let statuses = errors.map(|error| HealthStatusMessage {
1461        id: None,
1462        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1463        namespace: StatusNamespace::Iceberg,
1464    });
1465
1466    (statuses, button.press_on_drop())
1467}
1468
1469impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
1470    fn get_key_indices(&self) -> Option<&[usize]> {
1471        self.key_desc_and_indices
1472            .as_ref()
1473            .map(|(_, indices)| indices.as_slice())
1474    }
1475
1476    fn get_relation_key_indices(&self) -> Option<&[usize]> {
1477        self.relation_key_indices.as_deref()
1478    }
1479
1480    fn render_sink(
1481        &self,
1482        storage_state: &mut StorageState,
1483        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1484        sink_id: GlobalId,
1485        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
1486        _err_collection: VecCollection<G, DataflowError, Diff>,
1487    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
1488        let mut scope = input.scope();
1489
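        // Open a persist write handle on the sink's target shard; the commit operator uses it to
        // advance the shard's upper to each committed frontier.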
1490        let write_handle = {
1491            let persist = Arc::clone(&storage_state.persist_clients);
1492            let shard_meta = sink.to_storage_metadata.clone();
1493            async move {
1494                let client = persist.open(shard_meta.persist_location).await?;
1495                let handle = client
1496                    .open_writer(
1497                        shard_meta.data_shard,
1498                        Arc::new(shard_meta.relation_desc),
1499                        Arc::new(UnitSchema),
1500                        Diagnostics::from_purpose("sink handle"),
1501                    )
1502                    .await?;
1503                Ok(handle)
1504            }
1505        };
1506
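        // Shared frontier that the commit operator advances as it commits; registering it in
        // `sink_write_frontiers` exposes the sink's progress to the rest of the storage layer.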
1507        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
1508        storage_state
1509            .sink_write_frontiers
1510            .insert(sink_id, Rc::clone(&write_frontier));
1511
1512        let (arrow_schema_with_ids, iceberg_schema) =
1513            match relation_desc_to_iceberg_schema(&sink.from_desc) {
1514                Ok(schemas) => schemas,
1515                Err(err) => {
1516                    let error_stream = std::iter::once(HealthStatusMessage {
1517                        id: None,
1518                        update: HealthStatusUpdate::halting(
1519                            format!("{}", err.display_with_causes()),
1520                            None,
1521                        ),
1522                        namespace: StatusNamespace::Iceberg,
1523                    })
1524                    .to_stream(&mut scope);
1525                    return (error_stream, vec![]);
1526                }
1527            };
1528
1529        let metrics = storage_state
1530            .metrics
1531            .get_iceberg_sink_metrics(sink_id, scope.index());
1532
1533        let statistics = storage_state
1534            .aggregated_statistics
1535            .get_sink(&sink_id)
1536            .expect("statistics initialized")
1537            .clone();
1538
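        // Render the three sink operators: mint batch descriptions, write data files, and commit
        // the written files to Iceberg.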
1539        let connection_for_minter = self.clone();
1540        let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
1541            format!("{sink_id}-iceberg-mint"),
1542            sink_id,
1543            &input,
1544            sink,
1545            connection_for_minter,
1546            storage_state.storage_configuration.clone(),
1547            Arc::clone(&iceberg_schema),
1548        );
1549
1550        let connection_for_writer = self.clone();
1551        let (datafiles, write_status, write_button) = write_data_files(
1552            format!("{sink_id}-write-data-files"),
1553            minted_input,
1554            &batch_descriptions,
1555            connection_for_writer,
1556            storage_state.storage_configuration.clone(),
1557            Arc::new(arrow_schema_with_ids.clone()),
1558            metrics.clone(),
1559            statistics.clone(),
1560        );
1561
1562        let connection_for_committer = self.clone();
1563        let (commit_status, commit_button) = commit_to_iceberg(
1564            format!("{sink_id}-commit-to-iceberg"),
1565            sink_id,
1566            sink.version,
1567            &datafiles,
1568            &batch_descriptions,
1569            Rc::clone(&write_frontier),
1570            connection_for_committer,
1571            storage_state.storage_configuration.clone(),
1572            write_handle,
1573            metrics.clone(),
1574            statistics,
1575        );
1576
1577        let running_status = Some(HealthStatusMessage {
1578            id: None,
1579            update: HealthStatusUpdate::running(),
1580            namespace: StatusNamespace::Iceberg,
1581        })
1582        .to_stream(&mut scope);
1583
1584        let statuses =
1585            scope.concatenate([running_status, mint_status, write_status, commit_status]);
1586
1587        (statuses, vec![mint_button, write_button, commit_button])
1588    }
1589}