1use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::convert::Infallible;
77use std::future::Future;
78use std::time::Instant;
79use std::{cell::RefCell, rc::Rc, sync::Arc};
80
81use anyhow::{Context, anyhow};
82use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
83use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
84use differential_dataflow::lattice::Lattice;
85use differential_dataflow::{AsCollection, Hashable, VecCollection};
86use futures::StreamExt;
87use iceberg::ErrorKind;
88use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
89use iceberg::spec::{
90 DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
91 write_data_files_to_avro,
92};
93use iceberg::spec::{Schema, SchemaRef};
94use iceberg::table::Table;
95use iceberg::transaction::{ApplyTransactionAction, Transaction};
96use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
97use iceberg::writer::base_writer::equality_delete_writer::{
98 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
99};
100use iceberg::writer::base_writer::position_delete_writer::{
101 PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
102};
103use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
104use iceberg::writer::file_writer::ParquetWriterBuilder;
105use iceberg::writer::file_writer::location_generator::{
106 DefaultFileNameGenerator, DefaultLocationGenerator,
107};
108use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
109use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
110use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
111use itertools::Itertools;
112use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
113use mz_interchange::avro::DiffPair;
114use mz_ore::cast::CastFrom;
115use mz_ore::error::ErrorExt;
116use mz_ore::future::InTask;
117use mz_ore::result::ResultExt;
118use mz_ore::retry::{Retry, RetryResult};
119use mz_persist_client::Diagnostics;
120use mz_persist_client::write::WriteHandle;
121use mz_persist_types::codec_impls::UnitSchema;
122use mz_repr::{Diff, GlobalId, Row, Timestamp};
123use mz_storage_types::StorageDiff;
124use mz_storage_types::configuration::StorageConfiguration;
125use mz_storage_types::controller::CollectionMetadata;
126use mz_storage_types::errors::DataflowError;
127use mz_storage_types::sinks::{IcebergSinkConnection, SinkEnvelope, StorageSinkDesc};
128use mz_storage_types::sources::SourceData;
129use mz_timely_util::antichain::AntichainExt;
130use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
131use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
132use parquet::file::properties::WriterProperties;
133use serde::{Deserialize, Serialize};
134use timely::PartialOrder;
135use timely::container::CapacityContainerBuilder;
136use timely::dataflow::StreamVec;
137use timely::dataflow::channels::pact::{Exchange, Pipeline};
138use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
139use timely::dataflow::operators::{CapabilitySet, Concatenate};
140use timely::progress::{Antichain, Timestamp as _};
141use tracing::debug;
142
143use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
144use crate::metrics::sink::iceberg::IcebergSinkMetrics;
145use crate::render::sinks::SinkRender;
146use crate::statistics::SinkStatistics;
147use crate::storage_state::StorageState;
148
/// Initial per-column item (row-count) capacity hint for `ArrowBuilder`.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Initial per-column data-buffer capacity hint for `ArrowBuilder`
/// (units defined by `ArrowBuilder` — presumably bytes; confirm).
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// File-name prefix for parquet files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// Number of batch descriptions the mint operator emits up front.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
162
/// Shared state needed to construct Iceberg file writers for a sink.
struct WriterContext {
    /// Arrow schema used to build record batches for the table.
    arrow_schema: Arc<ArrowSchema>,
    /// The Iceberg table's current schema.
    current_schema: Arc<Schema>,
    /// Handle for reading/writing files in the table's storage.
    file_io: iceberg::io::FileIO,
    /// Generates file locations within the table's directory layout.
    location_generator: DefaultLocationGenerator,
    /// Generates unique names for written files.
    file_name_generator: DefaultFileNameGenerator,
    /// Parquet writer settings applied to all files written by this sink.
    writer_properties: WriterProperties,
}
178
/// Envelope-specific construction of Iceberg writers and record batches.
///
/// Implementations encapsulate how a particular sink envelope (upsert vs.
/// append) turns Materialize diff pairs into files in the Iceberg table.
trait EnvelopeHandler: Send {
    /// Builds a handler from the shared writer context and sink connection.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self>
    where
        Self: Sized;

    /// Creates a fresh Iceberg writer. `is_snapshot` signals the initial
    /// snapshot phase; implementations may configure the writer differently
    /// for it.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;

    /// Converts one before/after diff pair at timestamp `ts` into an Arrow
    /// `RecordBatch` in the shape the envelope's writer expects.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
}
199
/// `EnvelopeHandler` for the upsert envelope: writes data files plus
/// position- and equality-delete files through a `DeltaWriter`.
struct UpsertEnvelopeHandler {
    /// Shared writer-construction state.
    ctx: WriterContext,
    /// Iceberg field ids of the key columns used for equality deletes.
    equality_ids: Vec<i32>,
    /// Iceberg schema of position-delete files.
    pos_schema: Arc<Schema>,
    /// Iceberg schema of equality-delete files (projected key columns).
    eq_schema: Arc<Schema>,
    /// Reusable configuration for building equality-delete writers.
    eq_config: EqualityDeleteWriterConfig,
    /// `ctx.arrow_schema` extended with the `__op` Int32 column that
    /// `row_to_batch` appends to every batch.
    schema_with_op: Arc<ArrowSchema>,
}
215
impl EnvelopeHandler for UpsertEnvelopeHandler {
    /// Validates that the sink has key columns and precomputes the schemas
    /// and configs that `create_writer` needs for the delete writers.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Upsert requires key columns: their indices select which Iceberg
        // fields participate in equality deletes.
        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
            return Err(anyhow::anyhow!(
                "Iceberg sink requires key columns for equality deletes"
            ));
        };

        let equality_ids = equality_ids_for_indices(
            ctx.current_schema.as_ref(),
            materialize_arrow_schema.as_ref(),
            equality_indices,
        )?;

        // Position-delete files use a fixed schema supplied by the writer
        // config.
        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let pos_schema = Arc::new(
            arrow_schema_to_schema(&pos_arrow_schema)
                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
        );

        // Equality-delete files contain only the projected key columns.
        let eq_config =
            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
                .context("Failed to create EqualityDeleteWriterConfig")?;
        let eq_schema = Arc::new(
            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
        );

        // Batches handed to the delta writer carry an extra `__op` column;
        // see `row_to_batch`.
        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));

        Ok(Self {
            ctx,
            equality_ids,
            pos_schema,
            eq_schema,
            eq_config,
            schema_with_op,
        })
    }

    /// Builds a `DeltaWriter` that combines a data-file writer with
    /// position- and equality-delete writers.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
        let pos_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.pos_schema),
        );
        // NOTE(review): the rolling writers for the delete files receive the
        // table's `current_schema` rather than `pos_schema`/`eq_schema` —
        // confirm that is what `RollingFileWriterBuilder` expects here.
        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            pos_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let pos_delete_writer_builder =
            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

        let eq_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.eq_schema),
        );
        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            eq_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let eq_delete_writer_builder =
            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());

        let mut builder = DeltaWriterBuilder::new(
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            self.equality_ids.clone(),
        );

        if is_snapshot {
            // During the initial snapshot there are no pre-existing rows —
            // presumably this disables delete tracking; confirm the intended
            // semantics of `with_max_seen_rows(0)` against DeltaWriter docs.
            builder = builder.with_max_seen_rows(0);
        }

        Ok(Box::new(
            builder
                .build(None)
                .await
                .context("Failed to create DeltaWriter")?,
        ))
    }

    /// Encodes a diff pair as a record batch with a trailing `__op` column:
    /// -1 marks the `before` (delete) row, +1 the `after` (insert) row.
    /// The timestamp is unused by the upsert envelope.
    fn row_to_batch(
        &self,
        diff_pair: DiffPair<Row>,
        _ts: Timestamp,
    ) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.ctx.arrow_schema),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        let mut op_values = Vec::new();

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add delete row to builder")?;
            op_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add insert row to builder")?;
            op_values.push(1i32);
        }

        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the op column so each row carries its delete/insert marker.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(op_values)));

        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
            .context("Failed to create batch with op column")
    }
}
364
/// `EnvelopeHandler` for the append-only envelope: writes plain data files
/// (no delete files), adding diff and timestamp bookkeeping columns.
struct AppendEnvelopeHandler {
    /// Shared writer-construction state; `ctx.arrow_schema` is assumed to
    /// end with the two bookkeeping columns (see `new`) — TODO confirm.
    ctx: WriterContext,
    /// `ctx.arrow_schema` minus its trailing two columns; used to build
    /// batches from user rows before the bookkeeping columns are appended.
    user_schema_for_append: Arc<ArrowSchema>,
}
371
impl EnvelopeHandler for AppendEnvelopeHandler {
    /// Derives the user-facing schema by stripping the trailing bookkeeping
    /// columns from the full writer schema.
    fn new(
        ctx: WriterContext,
        _connection: &IcebergSinkConnection,
        _materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Drop the last two fields (diff + timestamp); assumes
        // `ctx.arrow_schema` was produced by
        // `build_schema_with_append_columns` — TODO confirm against caller.
        let n = ctx.arrow_schema.fields().len().saturating_sub(2);
        let user_schema_for_append =
            Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));

        Ok(Self {
            ctx,
            user_schema_for_append,
        })
    }

    /// Builds a plain data-file writer. The append envelope never writes
    /// delete files, so `is_snapshot` is irrelevant here.
    async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        Ok(Box::new(
            DataFileWriterBuilder::new(data_rolling_writer)
                .build(None)
                .await
                .context("Failed to create DataFileWriter")?,
        ))
    }

    /// Encodes a diff pair as a batch with trailing diff (-1 for `before`,
    /// +1 for `after`) and timestamp columns; the timestamp is the raw
    /// `Timestamp` value as i64, saturating at `i64::MAX`.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.user_schema_for_append),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        let mut diff_values: Vec<i32> = Vec::new();
        // Saturate rather than fail if the u64 timestamp exceeds i64 range.
        let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add before row to builder")?;
            diff_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add after row to builder")?;
            diff_values.push(1i32);
        }

        let n = diff_values.len();
        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append diff and timestamp columns (one timestamp per emitted row).
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(diff_values)));
        columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));

        RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
            .context("Failed to create append record batch")
    }
}
451
/// Decimal precision used to store unsigned 64-bit values in Iceberg:
/// 20 decimal digits suffice for `u64::MAX` (~1.8 × 10^19).
const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
455
456fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
461 let mut next_field_id = 1i32;
462 let fields: Vec<Field> = schema
463 .fields()
464 .iter()
465 .map(|field| add_field_ids_recursive(field, &mut next_field_id))
466 .collect();
467 ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
468}
469
470fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
472 let current_id = *next_id;
473 *next_id += 1;
474
475 let mut metadata = field.metadata().clone();
476 metadata.insert(
477 PARQUET_FIELD_ID_META_KEY.to_string(),
478 current_id.to_string(),
479 );
480
481 let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
482
483 Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
484}
485
486fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
488 match data_type {
489 DataType::Struct(fields) => {
490 let new_fields: Vec<Field> = fields
491 .iter()
492 .map(|f| add_field_ids_recursive(f, next_id))
493 .collect();
494 DataType::Struct(new_fields.into())
495 }
496 DataType::List(element_field) => {
497 let new_element = add_field_ids_recursive(element_field, next_id);
498 DataType::List(Arc::new(new_element))
499 }
500 DataType::LargeList(element_field) => {
501 let new_element = add_field_ids_recursive(element_field, next_id);
502 DataType::LargeList(Arc::new(new_element))
503 }
504 DataType::Map(entries_field, sorted) => {
505 let new_entries = add_field_ids_recursive(entries_field, next_id);
506 DataType::Map(Arc::new(new_entries), *sorted)
507 }
508 _ => data_type.clone(),
509 }
510}
511
/// Converts `iceberg_schema` to Arrow and copies Materialize extension-type
/// metadata from `materialize_arrow_schema` onto the matching fields,
/// recursing into nested types.
///
/// # Errors
/// Fails if a top-level Iceberg field has no same-named field in the
/// Materialize schema, or if nested container kinds mismatch between the
/// two schemas.
fn merge_materialize_metadata_into_iceberg_schema(
    materialize_arrow_schema: &ArrowSchema,
    iceberg_schema: &Schema,
) -> anyhow::Result<ArrowSchema> {
    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;

    let fields: Vec<Field> = iceberg_arrow_schema
        .fields()
        .iter()
        .map(|iceberg_field| {
            // Fields are matched by name; every top-level Iceberg field must
            // exist in the Materialize schema.
            let mz_field = materialize_arrow_schema
                .field_with_name(iceberg_field.name())
                .with_context(|| {
                    format!(
                        "Field '{}' not found in Materialize schema",
                        iceberg_field.name()
                    )
                })?;

            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    // Keep the Iceberg schema's own top-level metadata intact.
    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
}
545
/// Builds a copy of `iceberg_field` whose metadata (recursively, for nested
/// types) is augmented with the Materialize extension-type name from the
/// corresponding `mz_field`, when one is present.
///
/// `mz_field` is `None` when a nesting level has no counterpart in the
/// Materialize schema (e.g. an unmatched struct member); such fields keep
/// only their Iceberg metadata.
///
/// # Errors
/// Fails when the Iceberg and Materialize fields disagree on container kind
/// (Struct / List / LargeList / Map).
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    let mut metadata = iceberg_field.metadata().clone();

    // Carry over the Materialize extension-type name, if any.
    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Match nested struct members by name; unmatched members simply
            // receive no extension metadata.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
643
644async fn reload_table(
645 catalog: &dyn Catalog,
646 namespace: String,
647 table_name: String,
648 current_table: Table,
649) -> anyhow::Result<Table> {
650 let namespace_ident = NamespaceIdent::new(namespace.clone());
651 let table_ident = TableIdent::new(namespace_ident, table_name.clone());
652 let current_schema = current_table.metadata().current_schema_id();
653 let current_partition_spec = current_table.metadata().default_partition_spec_id();
654
655 match catalog.load_table(&table_ident).await {
656 Ok(table) => {
657 let reloaded_schema = table.metadata().current_schema_id();
658 let reloaded_partition_spec = table.metadata().default_partition_spec_id();
659 if reloaded_schema != current_schema {
660 return Err(anyhow::anyhow!(
661 "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
662 table_name,
663 current_schema,
664 reloaded_schema
665 ));
666 }
667
668 if reloaded_partition_spec != current_partition_spec {
669 return Err(anyhow::anyhow!(
670 "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
671 table_name,
672 current_partition_spec,
673 reloaded_partition_spec
674 ));
675 }
676
677 Ok(table)
678 }
679 Err(err) => Err(err).context("Failed to reload Iceberg table"),
680 }
681}
682
/// Attempts to commit one batch of data/delete files to the Iceberg table as
/// a single row-delta transaction.
///
/// Returns the (possibly reloaded or updated) table together with a
/// `RetryResult`: `Ok(())` on success; `RetryableErr` for transient failures
/// (apply errors, ordinary commit failures, reload failures, plain commit
/// conflicts); `FatalErr` when another writer with a newer sink version or a
/// frontier at/past ours has committed, in which case retrying could lose or
/// duplicate data.
async fn try_commit_batch(
    mut table: Table,
    snapshot_properties: Vec<(String, String)>,
    data_files: Vec<DataFile>,
    delete_files: Vec<DataFile>,
    catalog: &dyn Catalog,
    conn_namespace: &str,
    conn_table: &str,
    sink_version: u64,
    frontier: &Antichain<Timestamp>,
    batch_lower: &Antichain<Timestamp>,
    batch_upper: &Antichain<Timestamp>,
    metrics: &IcebergSinkMetrics,
) -> (Table, RetryResult<(), anyhow::Error>) {
    let tx = Transaction::new(&table);
    // Duplicate checking is disabled — presumably because retries may
    // legitimately re-add the same files after a failed attempt; confirm.
    let mut action = tx
        .row_delta()
        .set_snapshot_properties(snapshot_properties.into_iter().collect())
        .with_check_duplicate(false);

    if !data_files.is_empty() || !delete_files.is_empty() {
        action = action
            .add_data_files(data_files)
            .add_delete_files(delete_files);
    }

    let tx = match action
        .apply(tx)
        .context("Failed to apply data file addition to iceberg table transaction")
    {
        Ok(tx) => tx,
        Err(e) => {
            // Apply failed: refresh table metadata before asking the caller
            // to retry, so the next attempt starts from current state.
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(reload_err) => {
                    return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
                }
            }
            return (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Failed to apply data file addition to iceberg table transaction: {}",
                    e
                )),
            );
        }
    };

    let new_table = tx.commit(catalog).await;
    match new_table {
        Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
            metrics.commit_conflicts.inc();
            // Another commit raced ours. Reload and inspect the committed
            // snapshots to decide whether the conflict is benign (retry) or
            // evidence of a competing writer (fatal).
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let last = retrieve_upper_from_snapshots(&mut snapshots);
            let last = match last {
                Ok(val) => val,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            if let Some((last_frontier, last_version)) = last {
                // A newer sink version has fenced us off.
                if last_version > sink_version {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer \
                            with version {}. Current sink version: {}. \
                            Frontiers may be out of sync, aborting to avoid data loss.",
                            conn_table,
                            last_version,
                            sink_version,
                        )),
                    );
                }
                // Someone already committed at or beyond our frontier.
                if PartialOrder::less_equal(frontier, &last_frontier) {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer. \
                            Current frontier: {:?}, last frontier: {:?}.",
                            conn_table,
                            frontier,
                            last_frontier,
                        )),
                    );
                }
            }

            // Benign conflict: retry with the reloaded table.
            (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Commit conflict detected when committing batch [{}, {}) \
                    to Iceberg table '{}.{}'. Retrying...",
                    batch_lower.pretty(),
                    batch_upper.pretty(),
                    conn_namespace,
                    conn_table
                )),
            )
        }
        Err(e) => {
            metrics.commit_failures.inc();
            (table, RetryResult::RetryableErr(anyhow!(e)))
        }
        Ok(new_table) => (new_table, RetryResult::Ok(())),
    }
}
816
/// Loads the Iceberg table, creating it with `schema` if it does not exist.
///
/// # Errors
/// Fails if the load fails for any reason other than "table not found", or
/// if table creation itself fails.
async fn load_or_create_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    schema: &Schema,
) -> anyhow::Result<iceberg::table::Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());

    match catalog.load_table(&table_ident).await {
        Ok(table) => {
            Ok(table)
        }
        Err(err) => {
            // Some catalogs report a missing table via the error kind, others
            // only via message text — check both.
            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
                || err
                    .message()
                    .contains("Tried to load a table that does not exist")
            {
                let table_creation = TableCreation::builder()
                    .name(table_name.clone())
                    .schema(schema.clone())
                    .build();

                catalog
                    .create_table(&namespace_ident, table_creation)
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to create Iceberg table '{}' in namespace '{}'",
                            table_name, namespace
                        )
                    })
            } else {
                Err(err).context("Failed to load Iceberg table")
            }
        }
    }
}
867
/// Scans the table's snapshots, newest first, for the most recent one
/// carrying the `mz-frontier` / `mz-sink-version` properties this sink
/// writes, returning the recorded frontier and sink version.
///
/// Sorts `snapshots` in place by descending sequence number. Snapshots with
/// operation "replace" (presumably table maintenance/compaction — confirm)
/// are tolerated and skipped; any other snapshot missing the properties
/// indicates a foreign writer and is an error. Returns `Ok(None)` when no
/// snapshot carries the properties.
fn retrieve_upper_from_snapshots(
    snapshots: &mut [Arc<Snapshot>],
) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
    // Descending by sequence number: newest snapshot first.
    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));

    for snapshot in snapshots {
        let props = &snapshot.summary().additional_properties;
        if let (Some(frontier_json), Some(sink_version_str)) =
            (props.get("mz-frontier"), props.get("mz-sink-version"))
        {
            // The frontier is stored as a JSON array of timestamps.
            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
                .context("Failed to deserialize frontier from snapshot properties")?;
            let frontier = Antichain::from_iter(frontier);

            let sink_version = sink_version_str
                .parse::<u64>()
                .context("Failed to parse mz-sink-version from snapshot properties")?;

            return Ok(Some((frontier, sink_version)));
        }
        if snapshot.summary().operation.as_str() != "replace" {
            anyhow::bail!(
                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
                snapshot.snapshot_id(),
                snapshot.summary().operation.as_str(),
            );
        }
    }

    Ok(None)
}
907
908fn iceberg_type_overrides(scalar_type: &mz_repr::SqlScalarType) -> Option<(DataType, String)> {
916 use mz_repr::SqlScalarType;
917 match scalar_type {
918 SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
919 SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
920 SqlScalarType::UInt64 => Some((
921 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
922 "uint8".to_string(),
923 )),
924 SqlScalarType::MzTimestamp => Some((
925 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
926 "mz_timestamp".to_string(),
927 )),
928 _ => None,
929 }
930}
931
932fn relation_desc_to_iceberg_schema(
942 desc: &mz_repr::RelationDesc,
943) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
944 let arrow_schema =
945 mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
946 .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
947
948 let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
949
950 let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
951 .context("Failed to convert Arrow schema to Iceberg schema")?;
952
953 Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
954}
955
956fn equality_ids_for_indices(
961 current_schema: &Schema,
962 materialize_arrow_schema: &ArrowSchema,
963 equality_indices: &[usize],
964) -> anyhow::Result<Vec<i32>> {
965 let top_level_fields = current_schema.as_struct();
966
967 equality_indices
968 .iter()
969 .map(|index| {
970 let mz_field = materialize_arrow_schema
971 .fields()
972 .get(*index)
973 .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
974 let field_name = mz_field.name();
975 let iceberg_field = top_level_fields
976 .field_by_name(field_name)
977 .with_context(|| {
978 format!(
979 "Equality delete key column '{}' not found in Iceberg table schema",
980 field_name
981 )
982 })?;
983 Ok(iceberg_field.id)
984 })
985 .collect()
986}
987
988fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
990 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
991 fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
992 ArrowSchema::new(fields)
993}
994
995#[allow(clippy::disallowed_types)]
1000fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1001 use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1002 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1003 fields.push(Arc::new(Field::new(
1004 ICEBERG_APPEND_DIFF_COLUMN,
1005 DataType::Int32,
1006 false,
1007 )));
1008 fields.push(Arc::new(Field::new(
1009 ICEBERG_APPEND_TIMESTAMP_COLUMN,
1010 DataType::Int64,
1011 false,
1012 )));
1013
1014 add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1015}
1016
1017fn mint_batch_descriptions<'scope, D>(
1022 name: String,
1023 sink_id: GlobalId,
1024 input: VecCollection<'scope, Timestamp, D, Diff>,
1025 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1026 connection: IcebergSinkConnection,
1027 storage_configuration: StorageConfiguration,
1028 initial_schema: SchemaRef,
1029) -> (
1030 VecCollection<'scope, Timestamp, D, Diff>,
1031 StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1032 StreamVec<'scope, Timestamp, Infallible>,
1033 StreamVec<'scope, Timestamp, HealthStatusMessage>,
1034 PressOnDropButton,
1035)
1036where
1037 D: Clone + 'static,
1038{
1039 let scope = input.scope();
1040 let name_for_error = name.clone();
1041 let name_for_logging = name.clone();
1042 let mut builder = OperatorBuilder::new(name, scope.clone());
1043 let sink_version = sink.version;
1044
1045 let hashed_id = sink_id.hashed();
1046 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1047 let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1048 let (output, output_stream) = builder.new_output();
1049 let (batch_desc_output, batch_desc_stream) =
1050 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1051 let mut input =
1052 builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);
1053
1054 let as_of = sink.as_of.clone();
1055 let commit_interval = sink
1056 .commit_interval
1057 .expect("the planner should have enforced this")
1058 .clone();
1059
1060 let (button, errors): (_, StreamVec<'scope, Timestamp, Rc<anyhow::Error>>) =
1061 builder.build_fallible(move |caps| {
1062 Box::pin(async move {
1063 let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
1064 *data_capset = CapabilitySet::new();
1065
1066 if !is_active_worker {
1067 *capset = CapabilitySet::new();
1068 *data_capset = CapabilitySet::new();
1069 *table_ready_capset = CapabilitySet::new();
1070 while let Some(event) = input.next().await {
1071 match event {
1072 Event::Data([output_cap, _], mut data) => {
1073 output.give_container(&output_cap, &mut data);
1074 }
1075 Event::Progress(_) => {}
1076 }
1077 }
1078 return Ok(());
1079 }
1080
1081 let catalog = connection
1082 .catalog_connection
1083 .connect(&storage_configuration, InTask::Yes)
1084 .await
1085 .with_context(|| {
1086 format!(
1087 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1088 connection.catalog_connection.uri, connection.namespace, connection.table
1089 )
1090 })?;
1091
1092 let table = load_or_create_table(
1093 catalog.as_ref(),
1094 connection.namespace.clone(),
1095 connection.table.clone(),
1096 initial_schema.as_ref(),
1097 )
1098 .await?;
1099 debug!(
1100 ?sink_id,
1101 %name_for_logging,
1102 namespace = %connection.namespace,
1103 table = %connection.table,
1104 "iceberg mint loaded/created table"
1105 );
1106
1107 *table_ready_capset = CapabilitySet::new();
1108
1109 let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1110 let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
1111 let (resume_upper, resume_version) = match resume {
1112 Some((f, v)) => (f, v),
1113 None => (Antichain::from_elem(Timestamp::minimum()), 0),
1114 };
1115 debug!(
1116 ?sink_id,
1117 %name_for_logging,
1118 resume_upper = %resume_upper.pretty(),
1119 resume_version,
1120 as_of = %as_of.pretty(),
1121 "iceberg mint resume position loaded"
1122 );
1123
1124 let overcompacted =
1126 *resume_upper != [Timestamp::minimum()] &&
1128 PartialOrder::less_than(&resume_upper, &as_of);
1130
1131 if overcompacted {
1132 let err = format!(
1133 "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
1134 as_of.pretty(),
1135 resume_upper.pretty()
1136 );
1137 return Err(anyhow::anyhow!("{err}"));
1141 };
1142
1143 if resume_version > sink_version {
1144 anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
1145 }
1146
1147 let mut initialized = false;
1148 let mut observed_frontier;
1149 let mut max_seen_ts: Option<Timestamp> = None;
1150 let mut minted_batches = VecDeque::new();
1155 loop {
1156 if let Some(event) = input.next().await {
1157 match event {
1158 Event::Data([output_cap, _], mut data) => {
1159 if !initialized {
1160 for (_, ts, _) in data.iter() {
1161 match max_seen_ts.as_mut() {
1162 Some(max) => {
1163 if max.less_than(ts) {
1164 *max = ts.clone();
1165 }
1166 }
1167 None => {
1168 max_seen_ts = Some(ts.clone());
1169 }
1170 }
1171 }
1172 }
1173 output.give_container(&output_cap, &mut data);
1174 continue;
1175 }
1176 Event::Progress(frontier) => {
1177 observed_frontier = frontier;
1178 }
1179 }
1180 } else {
1181 return Ok(());
1182 }
1183
1184 if !initialized {
1185 if observed_frontier.is_empty() {
1186 if let Some(max_ts) = max_seen_ts.as_ref() {
1192 let synthesized_upper =
1193 Antichain::from_elem(max_ts.step_forward());
1194 debug!(
1195 ?sink_id,
1196 %name_for_logging,
1197 max_seen_ts = %max_ts,
1198 synthesized_upper = %synthesized_upper.pretty(),
1199 "iceberg mint input closed before initialization; using max seen ts"
1200 );
1201 observed_frontier = synthesized_upper;
1202 } else {
1203 debug!(
1204 ?sink_id,
1205 %name_for_logging,
1206 "iceberg mint input closed before initialization with no data"
1207 );
1208 return Ok(());
1210 }
1211 }
1212
1213 if PartialOrder::less_than(&observed_frontier, &resume_upper)
1216 || PartialOrder::less_than(&observed_frontier, &as_of)
1217 {
1218 continue;
1219 }
1220
1221 let mut batch_descriptions = vec![];
1222 let mut current_upper = observed_frontier.clone();
1223 let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
1224
1225 let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
1230 as_of.clone()
1231 } else {
1232 resume_upper.clone()
1233 };
1234
1235 if batch_lower == current_upper {
1236 current_upper = Antichain::from_elem(current_upper_ts.step_forward());
1239 }
1240
1241 let batch_description = (batch_lower.clone(), current_upper.clone());
1242 debug!(
1243 ?sink_id,
1244 %name_for_logging,
1245 batch_lower = %batch_lower.pretty(),
1246 current_upper = %current_upper.pretty(),
1247 "iceberg mint initializing (catch-up batch)"
1248 );
1249 debug!(
1250 "{}: creating catch-up batch [{}, {})",
1251 name_for_logging,
1252 batch_lower.pretty(),
1253 current_upper.pretty()
1254 );
1255 batch_descriptions.push(batch_description);
1256 for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
1258 let duration_millis = commit_interval.as_millis()
1259 .checked_mul(u128::from(i))
1260 .expect("commit interval multiplication overflow");
1261 let duration_ts = Timestamp::new(
1262 u64::try_from(duration_millis)
1263 .expect("commit interval too large for u64"),
1264 );
1265 let desired_batch_upper = Antichain::from_elem(
1266 current_upper_ts.step_forward_by(&duration_ts),
1267 );
1268
1269 let batch_description =
1270 (current_upper.clone(), desired_batch_upper.clone());
1271 debug!(
1272 "{}: minting future batch {}/{} [{}, {})",
1273 name_for_logging,
1274 i,
1275 INITIAL_DESCRIPTIONS_TO_MINT,
1276 current_upper.pretty(),
1277 desired_batch_upper.pretty()
1278 );
1279 current_upper = batch_description.1.clone();
1280 batch_descriptions.push(batch_description);
1281 }
1282
1283 minted_batches.extend(batch_descriptions.clone());
1284
1285 for desc in batch_descriptions {
1286 batch_desc_output.give(&capset[0], desc);
1287 }
1288
1289 capset.downgrade(current_upper);
1290
1291 initialized = true;
1292 } else {
1293 if observed_frontier.is_empty() {
1294 return Ok(());
1296 }
1297 while let Some(oldest_desc) = minted_batches.front() {
1300 let oldest_upper = &oldest_desc.1;
1301 if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
1302 break;
1303 }
1304
1305 let newest_upper = minted_batches.back().unwrap().1.clone();
1306 let new_lower = newest_upper.clone();
1307 let duration_ts = Timestamp::new(commit_interval.as_millis()
1308 .try_into()
1309 .expect("commit interval too large for u64"));
1310 let new_upper = Antichain::from_elem(newest_upper
1311 .as_option()
1312 .unwrap()
1313 .step_forward_by(&duration_ts));
1314
1315 let new_batch_description = (new_lower.clone(), new_upper.clone());
1316 minted_batches.pop_front();
1317 minted_batches.push_back(new_batch_description.clone());
1318
1319 batch_desc_output.give(&capset[0], new_batch_description);
1320
1321 capset.downgrade(new_upper);
1322 }
1323 }
1324 }
1325 })
1326 });
1327
1328 let statuses = errors.map(|error| HealthStatusMessage {
1329 id: None,
1330 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1331 namespace: StatusNamespace::Iceberg,
1332 });
1333 (
1334 output_stream.as_collection(),
1335 batch_desc_stream,
1336 table_ready_stream,
1337 statuses,
1338 button.press_on_drop(),
1339 )
1340}
1341
/// A [`DataFile`] paired with the Iceberg [`Schema`] it was written against.
///
/// `DataFile` has no direct serde support, so (de)serialization is routed
/// through [`AvroDataFile`] via the `try_from`/`into` container attributes:
/// the file is encoded with Iceberg's Avro manifest format and the schema as
/// JSON.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}
1348
/// Wire form of [`SerializableDataFile`]: the data file as Iceberg Avro bytes
/// and the schema as JSON bytes.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    pub data_file: Vec<u8>,
    pub schema: Vec<u8>,
}
1362
1363impl From<SerializableDataFile> for AvroDataFile {
1364 fn from(value: SerializableDataFile) -> Self {
1365 let mut data_file = Vec::new();
1366 write_data_files_to_avro(
1367 &mut data_file,
1368 [value.data_file],
1369 &StructType::new(vec![]),
1370 FormatVersion::V2,
1371 )
1372 .expect("serialization into buffer");
1373 let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1374 AvroDataFile { data_file, schema }
1375 }
1376}
1377
1378impl TryFrom<AvroDataFile> for SerializableDataFile {
1379 type Error = String;
1380
1381 fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1382 let schema: Schema = serde_json::from_slice(&value.schema)
1383 .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1384 let data_files = read_data_files_from_avro(
1385 &mut &*value.data_file,
1386 &schema,
1387 0,
1388 &StructType::new(vec![]),
1389 FormatVersion::V2,
1390 )
1391 .map_err_to_string_with_causes()?;
1392 let Some(data_file) = data_files.into_iter().next() else {
1393 return Err("No DataFile found in Avro data".into());
1394 };
1395 Ok(SerializableDataFile { data_file, schema })
1396 }
1397}
1398
/// A data (or delete) file tagged with the `[lower, upper)` batch description
/// it was produced for, so the commit operator can group files by batch.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
1405
1406impl BoundedDataFile {
1407 pub fn new(
1408 file: DataFile,
1409 schema: Schema,
1410 batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1411 ) -> Self {
1412 Self {
1413 data_file: SerializableDataFile {
1414 data_file: file,
1415 schema,
1416 },
1417 batch_desc,
1418 }
1419 }
1420
1421 pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
1422 &self.batch_desc
1423 }
1424
1425 pub fn data_file(&self) -> &DataFile {
1426 &self.data_file.data_file
1427 }
1428
1429 pub fn into_data_file(self) -> DataFile {
1430 self.data_file.data_file
1431 }
1432}
1433
/// The set of files collected so far for a single batch description in the
/// commit operator.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}
1439
/// Renders the operator that turns the sink's input rows plus minted batch
/// descriptions into Parquet data/delete files on Iceberg storage.
///
/// For every batch description `[lower, upper)` a writer is created via the
/// envelope handler `H`; rows whose timestamp falls within the description are
/// fed to that writer (rows arriving before their description are stashed).
/// Once both the description frontier and the input frontier have passed a
/// batch, its writer is closed and the resulting files are emitted downstream
/// as [`BoundedDataFile`]s for the commit operator.
///
/// Returns the stream of produced files, a health-status stream derived from
/// operator errors, and a shutdown button.
fn write_data_files<'scope, H: EnvelopeHandler + 'static>(
    name: String,
    input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
    as_of: Antichain<Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    StreamVec<'scope, Timestamp, BoundedDataFile>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = input.scope();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // `table_ready_stream` carries no data (`Infallible`); it only sequences
    // this operator after the minter has loaded/created the table.
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
    // Batch descriptions are broadcast so every worker sees every description.
    let mut batch_desc_input =
        builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // Block until the table-ready stream closes, i.e. the minter has
            // finished loading or creating the table.
            while let Some(_) = table_ready_input.next().await {
            }
            let table = catalog
                .load_table(&table_ident)
                .await
                .with_context(|| {
                    format!(
                        "Failed to load Iceberg table '{}.{}' in write_data_files operator",
                        connection.namespace, connection.table
                    )
                })?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            // Some catalogs report the metadata file path as the table
            // location; strip a trailing `/metadata/<x>.metadata.json` so data
            // files land under the table root.
            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            // A per-instance UUID suffix keeps file names unique across
            // workers and restarts.
            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let writer_properties = WriterProperties::new();

            let ctx = WriterContext {
                arrow_schema,
                current_schema: Arc::clone(&current_schema),
                file_io,
                location_generator,
                file_name_generator,
                writer_properties,
            };
            let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;

            // Rows that arrived before their covering batch description,
            // keyed by timestamp so they can be drained once it shows up.
            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            // One open writer per in-flight batch description.
            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                Box<dyn IcebergWriter>,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Lower bound of the first description seen; rows strictly below it
            // were committed by a previous incarnation and may be discarded.
            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;

            // Run until both inputs have closed (their frontiers are empty).
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                let mut staged_messages_since_flush: u64 = 0;
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;

                                if min_batch_lower.is_none() {
                                    min_batch_lower = Some(lower.clone());
                                    debug!(
                                        "{}: set min_batch_lower to {}",
                                        name_for_logging,
                                        lower.pretty()
                                    );

                                    // Drop stashed rows below the first lower:
                                    // they were already committed.
                                    let to_remove: Vec<_> = stashed_rows
                                        .keys()
                                        .filter(|ts| {
                                            let ts_antichain = Antichain::from_elem((*ts).clone());
                                            PartialOrder::less_than(&ts_antichain, lower)
                                        })
                                        .cloned()
                                        .collect();

                                    if !to_remove.is_empty() {
                                        let mut removed_count = 0;
                                        for ts in to_remove {
                                            if let Some(rows) = stashed_rows.remove(&ts) {
                                                removed_count += rows.len();
                                                for _ in &rows {
                                                    metrics.stashed_rows.dec();
                                                }
                                            }
                                        }
                                        debug!(
                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
                                            name_for_logging,
                                            removed_count
                                        );
                                    }
                                }

                                // The batch starting exactly at the as-of is
                                // the initial snapshot batch.
                                let is_snapshot = lower == &as_of;
                                debug!(
                                    "{}: received batch description [{}, {}), snapshot={}",
                                    name_for_logging,
                                    lower.pretty(),
                                    upper.pretty(),
                                    is_snapshot
                                );
                                let mut batch_writer =
                                    handler.create_writer(is_snapshot).await?;
                                // Drain stashed rows that fall inside this
                                // batch into the new writer.
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                let mut drained_count = 0;
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            drained_count += rows.len();
                                            for (_row, diff_pair) in rows {
                                                metrics.stashed_rows.dec();
                                                let record_batch = handler.row_to_batch(
                                                    diff_pair.clone(),
                                                    row_ts.clone(),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                batch_writer.write(record_batch).await?;
                                                staged_messages_since_flush += 1;
                                                // Flush statistics in chunks to
                                                // bound counter-update overhead.
                                                if staged_messages_since_flush >= 10_000 {
                                                    statistics.inc_messages_staged_by(
                                                        staged_messages_since_flush,
                                                    );
                                                    staged_messages_since_flush = 0;
                                                }
                                            }
                                        }
                                    }
                                }
                                if drained_count > 0 {
                                    debug!(
                                        "{}: drained {} stashed rows into batch [{}, {})",
                                        name_for_logging,
                                        drained_count,
                                        lower.pretty(),
                                        upper.pretty()
                                    );
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), batch_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            let mut dropped_per_time = BTreeMap::new();
                            let mut stashed_per_time = BTreeMap::new();
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                // Route the row to the (at most one) in-flight
                                // batch whose bounds contain its timestamp.
                                for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = handler.row_to_batch(
                                            diff_pair.clone(),
                                            row_ts.clone(),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        batch_writer.write(record_batch).await?;
                                        staged_messages_since_flush += 1;
                                        if staged_messages_since_flush >= 10_000 {
                                            statistics.inc_messages_staged_by(
                                                staged_messages_since_flush,
                                            );
                                            staged_messages_since_flush = 0;
                                        }
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    // Already-committed rows are dropped;
                                    // everything else waits for a description.
                                    if let Some(ref min_lower) = min_batch_lower {
                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
                                            dropped_per_time
                                                .entry(ts_antichain.into_option().unwrap())
                                                .and_modify(|c| *c += 1)
                                                .or_insert(1);
                                            continue;
                                        }
                                    }

                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    metrics.stashed_rows.inc();
                                    entry.push((row, diff_pair));
                                }
                            }

                            for (ts, count) in dropped_per_time {
                                debug!(
                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
                                    name_for_logging, count, ts
                                );
                            }

                            for (ts, count) in stashed_per_time {
                                debug!(
                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
                                    name_for_logging, count, ts
                                );
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }
                // Flush any statistics not yet reported in a 10k chunk.
                if staged_messages_since_flush > 0 {
                    statistics.inc_messages_staged_by(staged_messages_since_flush);
                }

                if PartialOrder::less_than(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    // A batch is ready when no further descriptions can start
                    // before it and no further input can fall inside it.
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_equal(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        debug!(
                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
                            name_for_logging,
                            ready_batches.len(),
                            batch_description_frontier.pretty(),
                            input_frontier.pretty()
                        );
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut batch_writer) in ready_batches {
                            let close_started_at = Instant::now();
                            let data_files = batch_writer.close().await;
                            metrics
                                .writer_close_duration_seconds
                                .observe(close_started_at.elapsed().as_secs_f64());
                            let data_files = data_files.context("Failed to close batch writer")?;
                            debug!(
                                "{}: closed batch [{}, {}), wrote {} files",
                                name_for_logging,
                                desc.0.pretty(),
                                desc.1.pretty(),
                                data_files.len()
                            );
                            for data_file in data_files {
                                match data_file.content_type() {
                                    iceberg::spec::DataContentType::Data => {
                                        metrics.data_files_written.inc();
                                    }
                                    iceberg::spec::DataContentType::PositionDeletes
                                    | iceberg::spec::DataContentType::EqualityDeletes => {
                                        metrics.delete_files_written.inc();
                                    }
                                }
                                // NOTE(review): messages were already counted as
                                // staged per-row above; counting record_count()
                                // again here may double-count — confirm intent.
                                statistics.inc_messages_staged_by(data_file.record_count());
                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            max_upper = max_upper.join(&desc.1);
                        }

                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}
1844
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::SqlScalarType;

    /// Unsigned and `MzTimestamp` types need widening overrides to fit
    /// Iceberg's type system; signed/string/bool types pass through (`None`).
    #[mz_ore::test]
    fn test_iceberg_type_overrides() {
        // UInt16 widens to Int32.
        let result = iceberg_type_overrides(&SqlScalarType::UInt16);
        assert_eq!(result.unwrap().0, DataType::Int32);

        // UInt32 widens to Int64.
        let result = iceberg_type_overrides(&SqlScalarType::UInt32);
        assert_eq!(result.unwrap().0, DataType::Int64);

        // UInt64 does not fit Int64 and becomes a decimal.
        let result = iceberg_type_overrides(&SqlScalarType::UInt64);
        assert_eq!(
            result.unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // MzTimestamp is u64-backed and gets the same decimal treatment.
        let result = iceberg_type_overrides(&SqlScalarType::MzTimestamp);
        assert_eq!(
            result.unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // Natively representable types need no override.
        assert!(iceberg_type_overrides(&SqlScalarType::Int32).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::String).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::Bool).is_none());
    }

    /// Overrides must be applied recursively inside container types, not just
    /// at the top level.
    #[mz_ore::test]
    fn test_iceberg_schema_with_nested_uint64() {
        let desc = mz_repr::RelationDesc::builder()
            .with_column(
                "items",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt64),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let schema =
            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
                .expect("schema conversion should succeed");

        // The list element (not the list itself) carries the decimal override.
        if let DataType::List(field) = schema.field(0).data_type() {
            assert_eq!(
                field.data_type(),
                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            );
        } else {
            panic!("Expected List type");
        }
    }

    /// Equality-delete ids must come from the Iceberg field ids, which differ
    /// from plain column positions when nested types (here a Map) consume
    /// additional ids for their inner fields.
    #[mz_ore::test]
    fn equality_ids_follow_iceberg_field_ids() {
        let map_entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let materialize_arrow_schema = ArrowSchema::new(vec![
            Field::new("attrs", DataType::Map(Arc::new(map_entries), false), true),
            Field::new("key_col", DataType::Int32, false),
        ]);
        let materialize_arrow_schema = add_field_ids_to_arrow_schema(materialize_arrow_schema);
        let iceberg_schema = arrow_schema_to_schema(&materialize_arrow_schema)
            .expect("schema conversion should succeed");

        let equality_ids =
            equality_ids_for_indices(&iceberg_schema, &materialize_arrow_schema, &[1])
                .expect("field lookup should succeed");

        let expected_id = iceberg_schema
            .as_struct()
            .field_by_name("key_col")
            .expect("top-level field should exist")
            .id;
        assert_eq!(equality_ids, vec![expected_id]);
        // Positional numbering would have produced 2 here; the map's inner
        // fields shift the real id past that.
        assert_ne!(expected_id, 2);
    }
}
1944
/// Renders the operator that commits finished batches of files to the Iceberg
/// table and records progress in the sink's persist shard.
///
/// All files and batch descriptions are exchanged to a single active worker
/// (chosen by hashing the sink id). Once a batch's inputs are complete, its
/// files are appended to the table in one transaction (retried up to 5 times)
/// whose snapshot carries the sink id, frontier, and sink version as
/// properties; afterwards the persist shard's upper is advanced to the batch's
/// upper via compare-and-append so restarts resume from the right place.
///
/// Returns a health-status stream derived from operator errors and a shutdown
/// button.
fn commit_to_iceberg<'scope>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: StreamVec<'scope, Timestamp, BoundedDataFile>,
    batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    // Commits are serialized on one worker, selected by the sink id's hash.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let name_for_logging = format!("{sink_id}-commit-to-iceberg");

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            // Non-active workers release their frontier and exit immediately.
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // Wait for the table-ready stream to close, i.e. the table exists.
            while let Some(_) = table_ready_input.next().await {
            }
            let mut table = catalog.load_table(&table_ident).await.with_context(|| {
                format!(
                    "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
                    connection.namespace, connection.table
                )
            })?;

            // Files accumulated per batch description, awaiting commit.
            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Run until both inputs have closed (their frontiers are empty).
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let prev = batch_descriptions
                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received \
                                        in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                // Files may arrive before their description;
                                // `or_default` creates the set either way.
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                // A batch is committable once the file input can no longer
                // produce files at or beyond its lower bound.
                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
                    .cloned()
                    .collect();

                // Commit in frontier order; incomparable batches tie as Equal.
                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    // Split into appends and deletes; Iceberg transactions
                    // treat them differently.
                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    let mut total_messages: u64 = 0;
                    let mut total_bytes: u64 = 0;
                    for file in file_set.data_files {
                        total_messages += file.data_file().record_count();
                        total_bytes += file.data_file().file_size_in_bytes();
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        data_files = data_files.len(),
                        delete_files = delete_files.len(),
                        total_messages,
                        total_bytes,
                        "iceberg commit applying batch"
                    );

                    let instant = Instant::now();

                    // The snapshot records the frontier (and sink id/version)
                    // so a restart can recover its resume upper from table
                    // metadata.
                    let frontier = batch.1.clone();
                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;
                    let snapshot_properties = vec![
                        ("mz-sink-id".to_string(), sink_id.to_string()),
                        ("mz-frontier".to_string(), frontier_json),
                        ("mz-sink-version".to_string(), sink_version.to_string()),
                    ];

                    // Retry the commit, threading the (possibly refreshed)
                    // table handle through as retry state.
                    let (table_state, commit_result) = Retry::default()
                        .max_tries(5)
                        .retry_async_with_state(table, |_, table| {
                            let snapshot_properties = snapshot_properties.clone();
                            let data_files = data_files.clone();
                            let delete_files = delete_files.clone();
                            let metrics = Arc::clone(&metrics);
                            let catalog = Arc::clone(&catalog);
                            let conn_namespace = connection.namespace.clone();
                            let conn_table = connection.table.clone();
                            let frontier = frontier.clone();
                            let batch_lower = batch.0.clone();
                            let batch_upper = batch.1.clone();
                            async move {
                                try_commit_batch(
                                    table,
                                    snapshot_properties,
                                    data_files,
                                    delete_files,
                                    catalog.as_ref(),
                                    &conn_namespace,
                                    &conn_table,
                                    sink_version,
                                    &frontier,
                                    &batch_lower,
                                    &batch_upper,
                                    &metrics,
                                )
                                .await
                            }
                        })
                        .await;
                    let commit_result = commit_result.with_context(|| {
                        format!(
                            "failed to commit batch to Iceberg table '{}.{}'",
                            connection.namespace, connection.table
                        )
                    });
                    table = table_state;
                    let duration = instant.elapsed();
                    metrics
                        .commit_duration_seconds
                        .observe(duration.as_secs_f64());
                    commit_result?;

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        total_messages,
                        total_bytes,
                        ?duration,
                        "iceberg commit applied batch"
                    );

                    metrics.snapshots_committed.inc();
                    statistics.inc_messages_committed_by(total_messages);
                    statistics.inc_bytes_committed_by(total_bytes);

                    // Advance the persist shard's upper to the batch upper
                    // with an empty compare-and-append, retrying on upper
                    // mismatches until our frontier is covered.
                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            // Someone already advanced past us; nothing to do.
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}
2226
2227impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
2228 fn get_key_indices(&self) -> Option<&[usize]> {
2229 self.key_desc_and_indices
2230 .as_ref()
2231 .map(|(_, indices)| indices.as_slice())
2232 }
2233
    /// The indices of the relation's natural key columns, if any.
    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }
2237
2238 fn render_sink(
2239 &self,
2240 storage_state: &mut StorageState,
2241 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
2242 sink_id: GlobalId,
2243 input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
2244 _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
2245 ) -> (
2246 StreamVec<'scope, Timestamp, HealthStatusMessage>,
2247 Vec<PressOnDropButton>,
2248 ) {
// NOTE(review): the `fn` line of this method is above the visible chunk; the
// comments below describe only what the visible body demonstrates.
//
// Overall shape: build (1) a deferred persist write-handle future and
// (2) the Arrow/Iceberg schemas for the sink relation, then wire three
// dataflow operators — mint batch descriptions -> write data files ->
// commit to Iceberg — and return their merged health-status stream plus
// the per-operator shutdown buttons.
2249 let scope = input.scope();
2250
// Future that, when driven, opens a persist write handle for this sink's
// output shard. Nothing is awaited here; the future is handed to
// `commit_to_iceberg` below, which presumably drives it — TODO confirm at
// the callee.
2251 let write_handle = {
2252 let persist = Arc::clone(&storage_state.persist_clients);
2253 let shard_meta = sink.to_storage_metadata.clone();
2254 async move {
2255 let client = persist.open(shard_meta.persist_location).await?;
2256 let handle = client
2257 .open_writer(
2258 shard_meta.data_shard,
2259 Arc::new(shard_meta.relation_desc),
2260 Arc::new(UnitSchema),
2261 Diagnostics::from_purpose("sink handle"),
2262 )
2263 .await?;
2264 Ok(handle)
2265 }
2266 };
2267
// Shared, mutable frontier cell starting at the minimum timestamp. One
// clone is registered under this sink's id in `sink_write_frontiers`; the
// other is passed to `commit_to_iceberg` below so commits can advance it.
// NOTE(review): the reader of `sink_write_frontiers` is outside this
// chunk — verify it is the frontier-reporting path.
2268 let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
2269 storage_state
2270 .sink_write_frontiers
2271 .insert(sink_id, Rc::clone(&write_frontier));
2272
// Derive the Arrow schema (carrying Iceberg field ids) and the Iceberg
// schema from the sink's relation description. The immediately-invoked
// closure funnels both fallible steps into a single `Result` so there is
// exactly one error-handling site. For the Append envelope, extra columns
// are added (`build_schema_with_append_columns`) and the extended Arrow
// schema is converted back into an Iceberg schema; otherwise the plain
// schemas are used as-is.
2273 let (arrow_schema_with_ids, iceberg_schema) =
2274 match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
2275 let (arrow_schema_with_ids, iceberg_schema) =
2276 relation_desc_to_iceberg_schema(&sink.from_desc)?;
2277
2278 Ok(if sink.envelope == SinkEnvelope::Append {
2279 let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
2284 let extended_iceberg = Arc::new(
2285 arrow_schema_to_schema(&extended_arrow)
2286 .context("Failed to build Iceberg schema with append columns")?,
2287 );
2288 (extended_arrow, extended_iceberg)
2289 } else {
2290 (arrow_schema_with_ids, iceberg_schema)
2291 })
2292 })() {
2293 Ok(schemas) => schemas,
// Schema derivation failure is fatal for this sink: emit a single
// halting health status and return early with no buttons, so none of
// the operators below are ever built.
2294 Err(err) => {
2295 let error_stream = std::iter::once(HealthStatusMessage {
2296 id: None,
2297 update: HealthStatusUpdate::halting(
2298 format!("{}", err.display_with_causes()),
2299 None,
2300 ),
2301 namespace: StatusNamespace::Iceberg,
2302 })
2303 .to_stream(scope);
2304 return (error_stream, vec![]);
2305 }
2306 };
2307
// Per-sink, per-worker metrics handle, shared by the write and commit
// operators below.
2308 let metrics = Arc::new(
2309 storage_state
2310 .metrics
2311 .get_iceberg_sink_metrics(sink_id, scope.index()),
2312 );
2313
// Aggregated statistics entry for this sink. A missing entry is treated
// as a bug (hence `expect`) — presumably it is inserted before rendering;
// TODO confirm the initialization site.
2314 let statistics = storage_state
2315 .aggregated_statistics
2316 .get_sink(&sink_id)
2317 .expect("statistics initialized")
2318 .clone();
2319
// Operator 1: mint batch descriptions. Produces the (possibly
// re-timestamped) input, the batch-description stream, a `table_ready`
// signal, a health-status stream, and a shutdown button.
2320 let connection_for_minter = self.clone();
2321 let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
2322 mint_batch_descriptions(
2323 format!("{sink_id}-iceberg-mint"),
2324 sink_id,
2325 input,
2326 sink,
2327 connection_for_minter,
2328 storage_state.storage_configuration.clone(),
2329 Arc::clone(&iceberg_schema),
2330 );
2331
// Operator 2: write data files, specialized on the sink envelope via the
// envelope-handler type parameter. Both arms are wired identically apart
// from the handler; Debezium is rejected with `unreachable!`, encoding the
// invariant stated in its message that only Upsert and Append are
// supported here.
2332 let connection_for_writer = self.clone();
2333 let (datafiles, write_status, write_button) = match sink.envelope {
2334 SinkEnvelope::Upsert => write_data_files::<UpsertEnvelopeHandler>(
2335 format!("{sink_id}-write-data-files"),
2336 minted_input,
2337 batch_descriptions.clone(),
2338 table_ready.clone(),
2339 sink.as_of.clone(),
2340 connection_for_writer,
2341 storage_state.storage_configuration.clone(),
2342 Arc::new(arrow_schema_with_ids.clone()),
2343 Arc::clone(&metrics),
2344 statistics.clone(),
2345 ),
2346 SinkEnvelope::Append => write_data_files::<AppendEnvelopeHandler>(
2347 format!("{sink_id}-write-data-files"),
2348 minted_input,
2349 batch_descriptions.clone(),
2350 table_ready.clone(),
2351 sink.as_of.clone(),
2352 connection_for_writer,
2353 storage_state.storage_configuration.clone(),
2354 Arc::new(arrow_schema_with_ids.clone()),
2355 Arc::clone(&metrics),
2356 statistics.clone(),
2357 ),
2358 SinkEnvelope::Debezium => {
2359 unreachable!("Iceberg sink only supports Upsert and Append envelopes")
2360 }
2361 };
2362
// Operator 3: commit the written data files to the Iceberg table. It
// receives the shared `write_frontier` and the un-awaited persist
// `write_handle` future built above.
2363 let connection_for_committer = self.clone();
2364 let (commit_status, commit_button) = commit_to_iceberg(
2365 format!("{sink_id}-commit-to-iceberg"),
2366 sink_id,
2367 sink.version,
2368 datafiles,
2369 batch_descriptions,
2370 table_ready,
2371 Rc::clone(&write_frontier),
2372 connection_for_committer,
2373 storage_state.storage_configuration.clone(),
2374 write_handle,
2375 Arc::clone(&metrics),
2376 statistics,
2377 );
2378
// A one-shot "running" status, merged with each operator's status stream
// into the single health output for this sink.
2379 let running_status = Some(HealthStatusMessage {
2380 id: None,
2381 update: HealthStatusUpdate::running(),
2382 namespace: StatusNamespace::Iceberg,
2383 })
2384 .to_stream(scope);
2385
2386 let statuses =
2387 scope.concatenate([running_status, mint_status, write_status, commit_status]);
2388
// The buttons keep the three operators alive; dropping them tears the
// sink down.
2389 (statuses, vec![mint_button, write_button, commit_button])
2390 }
2391}