1use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::convert::Infallible;
77use std::future::Future;
78use std::time::Instant;
79use std::{cell::RefCell, rc::Rc, sync::Arc};
80
81use anyhow::{Context, anyhow};
82use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
83use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
84use differential_dataflow::lattice::Lattice;
85use differential_dataflow::{AsCollection, Hashable, VecCollection};
86use futures::StreamExt;
87use iceberg::ErrorKind;
88use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
89use iceberg::spec::{
90 DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
91 write_data_files_to_avro,
92};
93use iceberg::spec::{Schema, SchemaRef};
94use iceberg::table::Table;
95use iceberg::transaction::{ApplyTransactionAction, Transaction};
96use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
97use iceberg::writer::base_writer::equality_delete_writer::{
98 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
99};
100use iceberg::writer::base_writer::position_delete_writer::{
101 PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
102};
103use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
104use iceberg::writer::file_writer::ParquetWriterBuilder;
105use iceberg::writer::file_writer::location_generator::{
106 DefaultFileNameGenerator, DefaultLocationGenerator,
107};
108use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
109use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
110use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
111use itertools::Itertools;
112use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
113use mz_interchange::avro::DiffPair;
114use mz_ore::cast::CastFrom;
115use mz_ore::error::ErrorExt;
116use mz_ore::future::InTask;
117use mz_ore::result::ResultExt;
118use mz_ore::retry::{Retry, RetryResult};
119use mz_persist_client::Diagnostics;
120use mz_persist_client::write::WriteHandle;
121use mz_persist_types::codec_impls::UnitSchema;
122use mz_repr::{Diff, GlobalId, Row, Timestamp};
123use mz_storage_types::StorageDiff;
124use mz_storage_types::configuration::StorageConfiguration;
125use mz_storage_types::controller::CollectionMetadata;
126use mz_storage_types::errors::DataflowError;
127use mz_storage_types::sinks::{IcebergSinkConnection, SinkEnvelope, StorageSinkDesc};
128use mz_storage_types::sources::SourceData;
129use mz_timely_util::antichain::AntichainExt;
130use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
131use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
132use parquet::file::properties::WriterProperties;
133use serde::{Deserialize, Serialize};
134use timely::PartialOrder;
135use timely::container::CapacityContainerBuilder;
136use timely::dataflow::channels::pact::{Exchange, Pipeline};
137use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
138use timely::dataflow::operators::{CapabilitySet, Concatenate};
139use timely::dataflow::{Scope, StreamVec};
140use timely::progress::{Antichain, Timestamp as _};
141use tracing::debug;
142
143use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
144use crate::metrics::sink::iceberg::IcebergSinkMetrics;
145use crate::render::sinks::SinkRender;
146use crate::statistics::SinkStatistics;
147use crate::storage_state::StorageState;
148
/// Initial item capacity used when constructing `ArrowBuilder`s for record batches.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Initial data (byte) capacity used when constructing `ArrowBuilder`s for record batches.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// File name prefix for Parquet data files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// How many batch descriptions are minted up front when the sink initializes.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
162
/// Shared state needed to construct Iceberg file writers for a single table.
struct WriterContext {
    /// Arrow schema (including Parquet field IDs) used to build record batches.
    arrow_schema: Arc<ArrowSchema>,
    /// The Iceberg table's current schema.
    current_schema: Arc<Schema>,
    /// File IO handle used to read and write table files.
    file_io: iceberg::io::FileIO,
    /// Generates file locations within the table's layout.
    location_generator: DefaultLocationGenerator,
    /// Generates unique names for written files.
    file_name_generator: DefaultFileNameGenerator,
    /// Parquet writer settings applied to every file this sink writes.
    writer_properties: WriterProperties,
}
178
/// Abstraction over the sink envelope (upsert vs. append-only): which kind of
/// Iceberg writer is created and how diff pairs become record batches.
trait EnvelopeHandler: Send {
    /// Builds a handler from the writer context, the sink connection, and the
    /// Materialize-side Arrow schema.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self>
    where
        Self: Sized;

    /// Creates a fresh Iceberg writer. `is_snapshot` signals the initial
    /// snapshot phase; implementations may tune the writer accordingly.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;

    /// Converts a before/after diff pair (and its timestamp) into a record
    /// batch in the shape this envelope's writer expects.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
}
199
/// Envelope handler for upsert sinks: retractions are written as delete
/// records and insertions as data records, via a delta writer.
struct UpsertEnvelopeHandler {
    ctx: WriterContext,
    /// Iceberg field IDs of the key columns used for equality deletes.
    equality_ids: Vec<i32>,
    /// Iceberg schema for position delete files.
    pos_schema: Arc<Schema>,
    /// Iceberg schema for equality delete files (projected key columns).
    eq_schema: Arc<Schema>,
    /// Configuration for the equality delete writer.
    eq_config: EqualityDeleteWriterConfig,
    /// `ctx.arrow_schema` extended with the `__op` column consumed by the
    /// delta writer.
    schema_with_op: Arc<ArrowSchema>,
}
215
impl EnvelopeHandler for UpsertEnvelopeHandler {
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // The sink's key columns double as the equality-delete columns, so
        // they are mandatory for the upsert envelope.
        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
            return Err(anyhow::anyhow!(
                "Iceberg sink requires key columns for equality deletes"
            ));
        };

        // Resolve the key column indices to Iceberg field IDs by name.
        let equality_ids = equality_ids_for_indices(
            ctx.current_schema.as_ref(),
            materialize_arrow_schema.as_ref(),
            equality_indices,
        )?;

        // Position delete files use the fixed schema supplied by the writer
        // config, converted to an Iceberg schema.
        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let pos_schema = Arc::new(
            arrow_schema_to_schema(&pos_arrow_schema)
                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
        );

        // Equality delete files contain only the projected key columns.
        let eq_config =
            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
                .context("Failed to create EqualityDeleteWriterConfig")?;
        let eq_schema = Arc::new(
            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
        );

        // Batches handed to the delta writer carry an extra `__op` column
        // (+1 insert, -1 delete); see `row_to_batch`.
        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));

        Ok(Self {
            ctx,
            equality_ids,
            pos_schema,
            eq_schema,
            eq_config,
            schema_with_op,
        })
    }

    /// Builds a delta writer composed of a data file writer, a position delete
    /// writer, and an equality delete writer, all rolling Parquet files.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Data file writer over the full table schema.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

        // Position delete writer over the fixed position-delete schema.
        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
        let pos_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.pos_schema),
        );
        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            pos_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let pos_delete_writer_builder =
            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

        // Equality delete writer over the projected key columns.
        let eq_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.eq_schema),
        );
        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            eq_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let eq_delete_writer_builder =
            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());

        let mut builder = DeltaWriterBuilder::new(
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            self.equality_ids.clone(),
        );

        if is_snapshot {
            // NOTE(review): during the initial snapshot no previously-written
            // rows can need deleting; presumably `with_max_seen_rows(0)`
            // disables delete tracking in that case — confirm against the
            // `DeltaWriterBuilder` documentation.
            builder = builder.with_max_seen_rows(0);
        }

        Ok(Box::new(
            builder
                .build(None)
                .await
                .context("Failed to create DeltaWriter")?,
        ))
    }

    /// Encodes a diff pair as a record batch with a trailing `__op` column:
    /// the `before` row (if present) gets op -1, the `after` row op +1. The
    /// timestamp is unused for the upsert envelope.
    fn row_to_batch(
        &self,
        diff_pair: DiffPair<Row>,
        _ts: Timestamp,
    ) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.ctx.arrow_schema),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        let mut op_values = Vec::new();

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add delete row to builder")?;
            op_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add insert row to builder")?;
            op_values.push(1i32);
        }

        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the op column and re-wrap under the op-extended schema.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(op_values)));

        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
            .context("Failed to create batch with op column")
    }
}
364
/// Envelope handler for append-only sinks: every update is appended as a data
/// row, annotated with explicit diff and timestamp columns.
struct AppendEnvelopeHandler {
    ctx: WriterContext,
    /// The user-visible schema: `ctx.arrow_schema` minus the two trailing
    /// sink-added (diff and timestamp) columns.
    user_schema_for_append: Arc<ArrowSchema>,
}
371
impl EnvelopeHandler for AppendEnvelopeHandler {
    fn new(
        ctx: WriterContext,
        _connection: &IcebergSinkConnection,
        _materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // `ctx.arrow_schema` ends with the two sink-added columns (diff and
        // timestamp); strip them to recover the schema incoming rows match.
        let n = ctx.arrow_schema.fields().len().saturating_sub(2);
        let user_schema_for_append =
            Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));

        Ok(Self {
            ctx,
            user_schema_for_append,
        })
    }

    /// Builds a plain data file writer; append-only sinks never write deletes,
    /// so `is_snapshot` is irrelevant here.
    async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        Ok(Box::new(
            DataFileWriterBuilder::new(data_rolling_writer)
                .build(None)
                .await
                .context("Failed to create DataFileWriter")?,
        ))
    }

    /// Encodes a diff pair as user columns plus a diff column (-1 for the
    /// `before` row, +1 for the `after` row) and a timestamp column holding
    /// `ts` for every emitted row.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.user_schema_for_append),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        let mut diff_values: Vec<i32> = Vec::new();
        // The timestamp is u64 internally; values beyond i64::MAX are clamped
        // rather than failing the batch.
        let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add before row to builder")?;
            diff_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add after row to builder")?;
            diff_values.push(1i32);
        }

        let n = diff_values.len();
        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the diff and timestamp columns and re-wrap under the full
        // (sink-extended) schema.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(diff_values)));
        columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));

        RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
            .context("Failed to create append record batch")
    }
}
451
/// Decimal precision used to represent unsigned 64-bit values in Iceberg,
/// which has no unsigned integer types; 20 digits are enough for `u64::MAX`.
const ICEBERG_UINT64_DECIMAL_PRECISION: u8 = 20;
455
456fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
461 let mut next_field_id = 1i32;
462 let fields: Vec<Field> = schema
463 .fields()
464 .iter()
465 .map(|field| add_field_ids_recursive(field, &mut next_field_id))
466 .collect();
467 ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
468}
469
470fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
472 let current_id = *next_id;
473 *next_id += 1;
474
475 let mut metadata = field.metadata().clone();
476 metadata.insert(
477 PARQUET_FIELD_ID_META_KEY.to_string(),
478 current_id.to_string(),
479 );
480
481 let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
482
483 Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
484}
485
486fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
488 match data_type {
489 DataType::Struct(fields) => {
490 let new_fields: Vec<Field> = fields
491 .iter()
492 .map(|f| add_field_ids_recursive(f, next_id))
493 .collect();
494 DataType::Struct(new_fields.into())
495 }
496 DataType::List(element_field) => {
497 let new_element = add_field_ids_recursive(element_field, next_id);
498 DataType::List(Arc::new(new_element))
499 }
500 DataType::LargeList(element_field) => {
501 let new_element = add_field_ids_recursive(element_field, next_id);
502 DataType::LargeList(Arc::new(new_element))
503 }
504 DataType::Map(entries_field, sorted) => {
505 let new_entries = add_field_ids_recursive(entries_field, next_id);
506 DataType::Map(Arc::new(new_entries), *sorted)
507 }
508 _ => data_type.clone(),
509 }
510}
511
512fn merge_materialize_metadata_into_iceberg_schema(
517 materialize_arrow_schema: &ArrowSchema,
518 iceberg_schema: &Schema,
519) -> anyhow::Result<ArrowSchema> {
520 let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
522 .context("Failed to convert Iceberg schema to Arrow schema")?;
523
524 let fields: Vec<Field> = iceberg_arrow_schema
526 .fields()
527 .iter()
528 .map(|iceberg_field| {
529 let mz_field = materialize_arrow_schema
531 .field_with_name(iceberg_field.name())
532 .with_context(|| {
533 format!(
534 "Field '{}' not found in Materialize schema",
535 iceberg_field.name()
536 )
537 })?;
538
539 merge_field_metadata_recursive(iceberg_field, Some(mz_field))
540 })
541 .collect::<anyhow::Result<Vec<_>>>()?;
542
543 Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
544}
545
/// Rebuilds `iceberg_field`, copying the Materialize Arrow extension-type
/// annotation (if any) from the matching `mz_field` into the field metadata,
/// and recursing through struct/list/map children. Bails on a structural type
/// mismatch between the two schemas.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    // Start from the Iceberg-side metadata (which carries field IDs).
    let mut metadata = iceberg_field.metadata().clone();

    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    // Recurse into nested types, pairing struct children by name and list/map
    // children positionally. A `None` mz side means there is no counterpart
    // to take metadata from.
    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
643
644async fn reload_table(
645 catalog: &dyn Catalog,
646 namespace: String,
647 table_name: String,
648 current_table: Table,
649) -> anyhow::Result<Table> {
650 let namespace_ident = NamespaceIdent::new(namespace.clone());
651 let table_ident = TableIdent::new(namespace_ident, table_name.clone());
652 let current_schema = current_table.metadata().current_schema_id();
653 let current_partition_spec = current_table.metadata().default_partition_spec_id();
654
655 match catalog.load_table(&table_ident).await {
656 Ok(table) => {
657 let reloaded_schema = table.metadata().current_schema_id();
658 let reloaded_partition_spec = table.metadata().default_partition_spec_id();
659 if reloaded_schema != current_schema {
660 return Err(anyhow::anyhow!(
661 "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
662 table_name,
663 current_schema,
664 reloaded_schema
665 ));
666 }
667
668 if reloaded_partition_spec != current_partition_spec {
669 return Err(anyhow::anyhow!(
670 "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
671 table_name,
672 current_partition_spec,
673 reloaded_partition_spec
674 ));
675 }
676
677 Ok(table)
678 }
679 Err(err) => Err(err).context("Failed to reload Iceberg table"),
680 }
681}
682
/// Attempts to commit one batch of data and delete files to the Iceberg table
/// as a single row-delta transaction.
///
/// Always returns the (possibly reloaded) table alongside the outcome so the
/// caller can retry against fresh metadata. On a commit conflict the table's
/// snapshots are inspected: if another writer with a newer sink version or a
/// frontier at/past ours has committed, the error is fatal (to avoid data
/// loss); otherwise the conflict is retryable.
async fn try_commit_batch(
    mut table: Table,
    snapshot_properties: Vec<(String, String)>,
    data_files: Vec<DataFile>,
    delete_files: Vec<DataFile>,
    catalog: &dyn Catalog,
    conn_namespace: &str,
    conn_table: &str,
    sink_version: u64,
    frontier: &Antichain<Timestamp>,
    batch_lower: &Antichain<Timestamp>,
    batch_upper: &Antichain<Timestamp>,
    metrics: &IcebergSinkMetrics,
) -> (Table, RetryResult<(), anyhow::Error>) {
    let tx = Transaction::new(&table);
    let mut action = tx
        .row_delta()
        .set_snapshot_properties(snapshot_properties.into_iter().collect())
        .with_check_duplicate(false);

    // An empty batch still commits (to advance the recorded frontier), but
    // file additions are only attached when there is something to add.
    if !data_files.is_empty() || !delete_files.is_empty() {
        action = action
            .add_data_files(data_files)
            .add_delete_files(delete_files);
    }

    let tx = match action
        .apply(tx)
        .context("Failed to apply data file addition to iceberg table transaction")
    {
        Ok(tx) => tx,
        Err(e) => {
            // Applying can fail against stale table metadata; reload so the
            // retry starts from fresh state.
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(reload_err) => {
                    return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
                }
            }
            return (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Failed to apply data file addition to iceberg table transaction: {}",
                    e
                )),
            );
        }
    };

    let new_table = tx.commit(catalog).await;
    match new_table {
        Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
            // Another writer committed concurrently. Reload and decide whether
            // the conflict is benign (retry) or a fencing condition (fatal).
            metrics.commit_conflicts.inc();
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            // Find the most recent Materialize-written snapshot to see who
            // else has been writing and how far they got.
            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let last = retrieve_upper_from_snapshots(&mut snapshots);
            let last = match last {
                Ok(val) => val,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            if let Some((last_frontier, last_version)) = last {
                // A newer sink version means we have been fenced off.
                if last_version > sink_version {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer \
                            with version {}. Current sink version: {}. \
                            Frontiers may be out of sync, aborting to avoid data loss.",
                            conn_table,
                            last_version,
                            sink_version,
                        )),
                    );
                }
                // A committed frontier at/past ours means another instance of
                // this sink is ahead of us.
                if PartialOrder::less_equal(frontier, &last_frontier) {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer. \
                            Current frontier: {:?}, last frontier: {:?}.",
                            conn_table,
                            frontier,
                            last_frontier,
                        )),
                    );
                }
            }

            (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Commit conflict detected when committing batch [{}, {}) \
                    to Iceberg table '{}.{}'. Retrying...",
                    batch_lower.pretty(),
                    batch_upper.pretty(),
                    conn_namespace,
                    conn_table
                )),
            )
        }
        Err(e) => {
            metrics.commit_failures.inc();
            (table, RetryResult::RetryableErr(anyhow!(e)))
        }
        Ok(new_table) => (new_table, RetryResult::Ok(())),
    }
}
816
817async fn load_or_create_table(
819 catalog: &dyn Catalog,
820 namespace: String,
821 table_name: String,
822 schema: &Schema,
823) -> anyhow::Result<iceberg::table::Table> {
824 let namespace_ident = NamespaceIdent::new(namespace.clone());
825 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
826
827 match catalog.load_table(&table_ident).await {
829 Ok(table) => {
830 Ok(table)
833 }
834 Err(err) => {
835 if matches!(err.kind(), ErrorKind::TableNotFound { .. })
836 || err
837 .message()
838 .contains("Tried to load a table that does not exist")
839 {
840 let table_creation = TableCreation::builder()
844 .name(table_name.clone())
845 .schema(schema.clone())
846 .build();
850
851 catalog
852 .create_table(&namespace_ident, table_creation)
853 .await
854 .with_context(|| {
855 format!(
856 "Failed to create Iceberg table '{}' in namespace '{}'",
857 table_name, namespace
858 )
859 })
860 } else {
861 Err(err).context("Failed to load Iceberg table")
863 }
864 }
865 }
866}
867
868fn retrieve_upper_from_snapshots(
873 snapshots: &mut [Arc<Snapshot>],
874) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
875 snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
876
877 for snapshot in snapshots {
878 let props = &snapshot.summary().additional_properties;
879 if let (Some(frontier_json), Some(sink_version_str)) =
880 (props.get("mz-frontier"), props.get("mz-sink-version"))
881 {
882 let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
883 .context("Failed to deserialize frontier from snapshot properties")?;
884 let frontier = Antichain::from_iter(frontier);
885
886 let sink_version = sink_version_str
887 .parse::<u64>()
888 .context("Failed to parse mz-sink-version from snapshot properties")?;
889
890 return Ok(Some((frontier, sink_version)));
891 }
892 if snapshot.summary().operation.as_str() != "replace" {
893 anyhow::bail!(
898 "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
899 snapshot.snapshot_id(),
900 snapshot.summary().operation.as_str(),
901 );
902 }
903 }
904
905 Ok(None)
906}
907
908fn iceberg_type_overrides(scalar_type: &mz_repr::SqlScalarType) -> Option<(DataType, String)> {
916 use mz_repr::SqlScalarType;
917 match scalar_type {
918 SqlScalarType::UInt16 => Some((DataType::Int32, "uint2".to_string())),
919 SqlScalarType::UInt32 => Some((DataType::Int64, "uint4".to_string())),
920 SqlScalarType::UInt64 => Some((
921 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
922 "uint8".to_string(),
923 )),
924 SqlScalarType::MzTimestamp => Some((
925 DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
926 "mz_timestamp".to_string(),
927 )),
928 _ => None,
929 }
930}
931
932fn relation_desc_to_iceberg_schema(
942 desc: &mz_repr::RelationDesc,
943) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
944 let arrow_schema =
945 mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
946 .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
947
948 let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
949
950 let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
951 .context("Failed to convert Arrow schema to Iceberg schema")?;
952
953 Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
954}
955
956fn equality_ids_for_indices(
961 current_schema: &Schema,
962 materialize_arrow_schema: &ArrowSchema,
963 equality_indices: &[usize],
964) -> anyhow::Result<Vec<i32>> {
965 let top_level_fields = current_schema.as_struct();
966
967 equality_indices
968 .iter()
969 .map(|index| {
970 let mz_field = materialize_arrow_schema
971 .fields()
972 .get(*index)
973 .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
974 let field_name = mz_field.name();
975 let iceberg_field = top_level_fields
976 .field_by_name(field_name)
977 .with_context(|| {
978 format!(
979 "Equality delete key column '{}' not found in Iceberg table schema",
980 field_name
981 )
982 })?;
983 Ok(iceberg_field.id)
984 })
985 .collect()
986}
987
988fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
990 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
991 fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
992 ArrowSchema::new(fields)
993}
994
995#[allow(clippy::disallowed_types)]
1000fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1001 use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1002 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1003 fields.push(Arc::new(Field::new(
1004 ICEBERG_APPEND_DIFF_COLUMN,
1005 DataType::Int32,
1006 false,
1007 )));
1008 fields.push(Arc::new(Field::new(
1009 ICEBERG_APPEND_TIMESTAMP_COLUMN,
1010 DataType::Int64,
1011 false,
1012 )));
1013
1014 add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1015}
1016
1017fn mint_batch_descriptions<G, D>(
1022 name: String,
1023 sink_id: GlobalId,
1024 input: VecCollection<G, D, Diff>,
1025 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1026 connection: IcebergSinkConnection,
1027 storage_configuration: StorageConfiguration,
1028 initial_schema: SchemaRef,
1029) -> (
1030 VecCollection<G, D, Diff>,
1031 StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1032 StreamVec<G, Infallible>,
1033 StreamVec<G, HealthStatusMessage>,
1034 PressOnDropButton,
1035)
1036where
1037 G: Scope<Timestamp = Timestamp>,
1038 D: Clone + 'static,
1039{
1040 let scope = input.scope();
1041 let name_for_error = name.clone();
1042 let name_for_logging = name.clone();
1043 let mut builder = OperatorBuilder::new(name, scope.clone());
1044 let sink_version = sink.version;
1045
1046 let hashed_id = sink_id.hashed();
1047 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1048 let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1049 let (output, output_stream) = builder.new_output();
1050 let (batch_desc_output, batch_desc_stream) =
1051 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1052 let mut input =
1053 builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);
1054
1055 let as_of = sink.as_of.clone();
1056 let commit_interval = sink
1057 .commit_interval
1058 .expect("the planner should have enforced this")
1059 .clone();
1060
1061 let (button, errors): (_, StreamVec<G, Rc<anyhow::Error>>) =
1062 builder.build_fallible(move |caps| {
1063 Box::pin(async move {
1064 let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
1065 *data_capset = CapabilitySet::new();
1066
1067 if !is_active_worker {
1068 *capset = CapabilitySet::new();
1069 *data_capset = CapabilitySet::new();
1070 *table_ready_capset = CapabilitySet::new();
1071 while let Some(event) = input.next().await {
1072 match event {
1073 Event::Data([output_cap, _], mut data) => {
1074 output.give_container(&output_cap, &mut data);
1075 }
1076 Event::Progress(_) => {}
1077 }
1078 }
1079 return Ok(());
1080 }
1081
1082 let catalog = connection
1083 .catalog_connection
1084 .connect(&storage_configuration, InTask::Yes)
1085 .await
1086 .with_context(|| {
1087 format!(
1088 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1089 connection.catalog_connection.uri, connection.namespace, connection.table
1090 )
1091 })?;
1092
1093 let table = load_or_create_table(
1094 catalog.as_ref(),
1095 connection.namespace.clone(),
1096 connection.table.clone(),
1097 initial_schema.as_ref(),
1098 )
1099 .await?;
1100 debug!(
1101 ?sink_id,
1102 %name_for_logging,
1103 namespace = %connection.namespace,
1104 table = %connection.table,
1105 "iceberg mint loaded/created table"
1106 );
1107
1108 *table_ready_capset = CapabilitySet::new();
1109
1110 let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1111 let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
1112 let (resume_upper, resume_version) = match resume {
1113 Some((f, v)) => (f, v),
1114 None => (Antichain::from_elem(Timestamp::minimum()), 0),
1115 };
1116 debug!(
1117 ?sink_id,
1118 %name_for_logging,
1119 resume_upper = %resume_upper.pretty(),
1120 resume_version,
1121 as_of = %as_of.pretty(),
1122 "iceberg mint resume position loaded"
1123 );
1124
1125 let overcompacted =
1127 *resume_upper != [Timestamp::minimum()] &&
1129 PartialOrder::less_than(&resume_upper, &as_of);
1131
1132 if overcompacted {
1133 let err = format!(
1134 "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
1135 as_of.pretty(),
1136 resume_upper.pretty()
1137 );
1138 return Err(anyhow::anyhow!("{err}"));
1142 };
1143
1144 if resume_version > sink_version {
1145 anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
1146 }
1147
1148 let mut initialized = false;
1149 let mut observed_frontier;
1150 let mut max_seen_ts: Option<Timestamp> = None;
1151 let mut minted_batches = VecDeque::new();
1156 loop {
1157 if let Some(event) = input.next().await {
1158 match event {
1159 Event::Data([output_cap, _], mut data) => {
1160 if !initialized {
1161 for (_, ts, _) in data.iter() {
1162 match max_seen_ts.as_mut() {
1163 Some(max) => {
1164 if max.less_than(ts) {
1165 *max = ts.clone();
1166 }
1167 }
1168 None => {
1169 max_seen_ts = Some(ts.clone());
1170 }
1171 }
1172 }
1173 }
1174 output.give_container(&output_cap, &mut data);
1175 continue;
1176 }
1177 Event::Progress(frontier) => {
1178 observed_frontier = frontier;
1179 }
1180 }
1181 } else {
1182 return Ok(());
1183 }
1184
1185 if !initialized {
1186 if observed_frontier.is_empty() {
1187 if let Some(max_ts) = max_seen_ts.as_ref() {
1193 let synthesized_upper =
1194 Antichain::from_elem(max_ts.step_forward());
1195 debug!(
1196 ?sink_id,
1197 %name_for_logging,
1198 max_seen_ts = %max_ts,
1199 synthesized_upper = %synthesized_upper.pretty(),
1200 "iceberg mint input closed before initialization; using max seen ts"
1201 );
1202 observed_frontier = synthesized_upper;
1203 } else {
1204 debug!(
1205 ?sink_id,
1206 %name_for_logging,
1207 "iceberg mint input closed before initialization with no data"
1208 );
1209 return Ok(());
1211 }
1212 }
1213
1214 if PartialOrder::less_than(&observed_frontier, &resume_upper)
1217 || PartialOrder::less_than(&observed_frontier, &as_of)
1218 {
1219 continue;
1220 }
1221
1222 let mut batch_descriptions = vec![];
1223 let mut current_upper = observed_frontier.clone();
1224 let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
1225
1226 let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
1231 as_of.clone()
1232 } else {
1233 resume_upper.clone()
1234 };
1235
1236 if batch_lower == current_upper {
1237 current_upper = Antichain::from_elem(current_upper_ts.step_forward());
1240 }
1241
1242 let batch_description = (batch_lower.clone(), current_upper.clone());
1243 debug!(
1244 ?sink_id,
1245 %name_for_logging,
1246 batch_lower = %batch_lower.pretty(),
1247 current_upper = %current_upper.pretty(),
1248 "iceberg mint initializing (catch-up batch)"
1249 );
1250 debug!(
1251 "{}: creating catch-up batch [{}, {})",
1252 name_for_logging,
1253 batch_lower.pretty(),
1254 current_upper.pretty()
1255 );
1256 batch_descriptions.push(batch_description);
1257 for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
1259 let duration_millis = commit_interval.as_millis()
1260 .checked_mul(u128::from(i))
1261 .expect("commit interval multiplication overflow");
1262 let duration_ts = Timestamp::new(
1263 u64::try_from(duration_millis)
1264 .expect("commit interval too large for u64"),
1265 );
1266 let desired_batch_upper = Antichain::from_elem(
1267 current_upper_ts.step_forward_by(&duration_ts),
1268 );
1269
1270 let batch_description =
1271 (current_upper.clone(), desired_batch_upper.clone());
1272 debug!(
1273 "{}: minting future batch {}/{} [{}, {})",
1274 name_for_logging,
1275 i,
1276 INITIAL_DESCRIPTIONS_TO_MINT,
1277 current_upper.pretty(),
1278 desired_batch_upper.pretty()
1279 );
1280 current_upper = batch_description.1.clone();
1281 batch_descriptions.push(batch_description);
1282 }
1283
1284 minted_batches.extend(batch_descriptions.clone());
1285
1286 for desc in batch_descriptions {
1287 batch_desc_output.give(&capset[0], desc);
1288 }
1289
1290 capset.downgrade(current_upper);
1291
1292 initialized = true;
1293 } else {
1294 if observed_frontier.is_empty() {
1295 return Ok(());
1297 }
1298 while let Some(oldest_desc) = minted_batches.front() {
1301 let oldest_upper = &oldest_desc.1;
1302 if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
1303 break;
1304 }
1305
1306 let newest_upper = minted_batches.back().unwrap().1.clone();
1307 let new_lower = newest_upper.clone();
1308 let duration_ts = Timestamp::new(commit_interval.as_millis()
1309 .try_into()
1310 .expect("commit interval too large for u64"));
1311 let new_upper = Antichain::from_elem(newest_upper
1312 .as_option()
1313 .unwrap()
1314 .step_forward_by(&duration_ts));
1315
1316 let new_batch_description = (new_lower.clone(), new_upper.clone());
1317 minted_batches.pop_front();
1318 minted_batches.push_back(new_batch_description.clone());
1319
1320 batch_desc_output.give(&capset[0], new_batch_description);
1321
1322 capset.downgrade(new_upper);
1323 }
1324 }
1325 }
1326 })
1327 });
1328
1329 let statuses = errors.map(|error| HealthStatusMessage {
1330 id: None,
1331 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1332 namespace: StatusNamespace::Iceberg,
1333 });
1334 (
1335 output_stream.as_collection(),
1336 batch_desc_stream,
1337 table_ready_stream,
1338 statuses,
1339 button.press_on_drop(),
1340 )
1341}
1342
/// An Iceberg [`DataFile`] paired with the [`Schema`] it was written against.
///
/// Serde round-trips through [`AvroDataFile`] (see the `try_from`/`into`
/// attributes): `DataFile` is encoded via Iceberg's Avro manifest format and
/// the schema via JSON, rather than deriving serde directly.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    // The Iceberg data-file metadata being transported.
    pub data_file: DataFile,
    // Schema needed to decode the Avro-encoded `DataFile` on the way back in.
    pub schema: Schema,
}
1349
/// Wire form of [`SerializableDataFile`]: the `DataFile` as Avro bytes and
/// the Iceberg `Schema` as JSON bytes.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    // Avro-encoded `DataFile` (produced by `write_data_files_to_avro`).
    pub data_file: Vec<u8>,
    // JSON-encoded Iceberg `Schema`.
    pub schema: Vec<u8>,
}
1363
1364impl From<SerializableDataFile> for AvroDataFile {
1365 fn from(value: SerializableDataFile) -> Self {
1366 let mut data_file = Vec::new();
1367 write_data_files_to_avro(
1368 &mut data_file,
1369 [value.data_file],
1370 &StructType::new(vec![]),
1371 FormatVersion::V2,
1372 )
1373 .expect("serialization into buffer");
1374 let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1375 AvroDataFile { data_file, schema }
1376 }
1377}
1378
1379impl TryFrom<AvroDataFile> for SerializableDataFile {
1380 type Error = String;
1381
1382 fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1383 let schema: Schema = serde_json::from_slice(&value.schema)
1384 .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1385 let data_files = read_data_files_from_avro(
1386 &mut &*value.data_file,
1387 &schema,
1388 0,
1389 &StructType::new(vec![]),
1390 FormatVersion::V2,
1391 )
1392 .map_err_to_string_with_causes()?;
1393 let Some(data_file) = data_files.into_iter().next() else {
1394 return Err("No DataFile found in Avro data".into());
1395 };
1396 Ok(SerializableDataFile { data_file, schema })
1397 }
1398}
1399
/// A serializable data file tagged with the `[lower, upper)` batch
/// description it was written for, so the commit operator can group files
/// belonging to the same batch.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    // The file plus the schema it was written with.
    pub data_file: SerializableDataFile,
    // The `(lower, upper)` frontier pair identifying the owning batch.
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
1406
1407impl BoundedDataFile {
1408 pub fn new(
1409 file: DataFile,
1410 schema: Schema,
1411 batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1412 ) -> Self {
1413 Self {
1414 data_file: SerializableDataFile {
1415 data_file: file,
1416 schema,
1417 },
1418 batch_desc,
1419 }
1420 }
1421
1422 pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
1423 &self.batch_desc
1424 }
1425
1426 pub fn data_file(&self) -> &DataFile {
1427 &self.data_file.data_file
1428 }
1429
1430 pub fn into_data_file(self) -> DataFile {
1431 self.data_file.data_file
1432 }
1433}
1434
/// The set of files received so far for one batch description, accumulated
/// by the commit operator until the batch can receive no more files.
/// `Default` yields an empty set.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    // Data and delete files produced for a single `[lower, upper)` batch.
    pub data_files: Vec<BoundedDataFile>,
}
1440
/// Renders the operator that turns sink rows into Iceberg data files.
///
/// Rows arriving on `input` are routed into the half-open `[lower, upper)`
/// batch descriptions minted upstream (delivered on `batch_desc_input`); rows
/// whose description has not arrived yet are stashed. Once the frontiers show
/// a batch is complete, its writer is closed and the resulting files are
/// emitted as [`BoundedDataFile`]s for the commit operator.
///
/// Returns the stream of written files, a health-status stream fed by
/// operator errors, and a button that shuts the operator down when dropped.
fn write_data_files<G, H: EnvelopeHandler + 'static>(
    name: String,
    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<G, Infallible>,
    as_of: Antichain<Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    StreamVec<G, BoundedDataFile>,
    StreamVec<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = input.scope();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // Batch descriptions are broadcast so every worker sees every description.
    // The row input and table-ready signal are disconnected from the output
    // frontier; the output capability is managed manually through `capset`.
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
    let mut batch_desc_input =
        builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // Wait for the table-ready stream to close; upstream drops its
            // capability once the table has been loaded/created.
            while let Some(_) = table_ready_input.next().await {
            }
            let table = catalog
                .load_table(&table_ident)
                .await
                .with_context(|| {
                    format!(
                        "Failed to load Iceberg table '{}.{}' in write_data_files operator",
                        connection.namespace, connection.table
                    )
                })?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            // Carry Materialize-specific Arrow metadata over onto the table's
            // current Iceberg schema so writers see both.
            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            // Some catalogs report the location of the current metadata file
            // rather than the table root; strip a trailing
            // `/metadata/*.metadata.json` so data files land under
            // `<table root>/data`.
            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            // A fresh UUID suffix keeps file names from colliding across
            // workers and restarts.
            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let writer_properties = WriterProperties::new();

            let ctx = WriterContext {
                arrow_schema,
                // NOTE(review): source had mojibake (`&curren` HTML entity)
                // here; restored to `&current_schema`.
                current_schema: Arc::clone(&current_schema),
                file_io,
                location_generator,
                file_name_generator,
                writer_properties,
            };
            // The envelope handler encapsulates upsert- vs append-specific
            // writer construction and row-to-RecordBatch conversion.
            let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;

            // Rows that arrived before their batch description, keyed by
            // timestamp so they can be drained range-by-range later.
            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            // Open writers for batches whose upper has not yet been reached.
            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                Box<dyn IcebergWriter>,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Lower bound of the first batch description ever received; rows
            // strictly below it were committed before a restart and are safe
            // to drop.
            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;

            // Run until both inputs have closed (empty frontiers).
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                let mut staged_messages_since_flush: u64 = 0;
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                // Drain any newly-minted batch descriptions.
                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;

                                if min_batch_lower.is_none() {
                                    min_batch_lower = Some(lower.clone());
                                    debug!(
                                        "{}: set min_batch_lower to {}",
                                        name_for_logging,
                                        lower.pretty()
                                    );

                                    // Prune stashed rows that predate the very
                                    // first batch: they belong to data already
                                    // committed before this restart.
                                    let to_remove: Vec<_> = stashed_rows
                                        .keys()
                                        .filter(|ts| {
                                            let ts_antichain = Antichain::from_elem((*ts).clone());
                                            PartialOrder::less_than(&ts_antichain, lower)
                                        })
                                        .cloned()
                                        .collect();

                                    if !to_remove.is_empty() {
                                        let mut removed_count = 0;
                                        for ts in to_remove {
                                            if let Some(rows) = stashed_rows.remove(&ts) {
                                                removed_count += rows.len();
                                                for _ in &rows {
                                                    metrics.stashed_rows.dec();
                                                }
                                            }
                                        }
                                        debug!(
                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
                                            name_for_logging,
                                            removed_count
                                        );
                                    }
                                }

                                // A batch whose lower equals the as-of is the
                                // initial snapshot batch.
                                let is_snapshot = lower == &as_of;
                                debug!(
                                    "{}: received batch description [{}, {}), snapshot={}",
                                    name_for_logging,
                                    lower.pretty(),
                                    upper.pretty(),
                                    is_snapshot
                                );
                                let mut batch_writer =
                                    handler.create_writer(is_snapshot).await?;
                                // Drain stashed rows that fall inside this
                                // batch's bounds into the fresh writer.
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                let mut drained_count = 0;
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            drained_count += rows.len();
                                            for (_row, diff_pair) in rows {
                                                metrics.stashed_rows.dec();
                                                let record_batch = handler.row_to_batch(
                                                    diff_pair.clone(),
                                                    row_ts.clone(),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                batch_writer.write(record_batch).await?;
                                                staged_messages_since_flush += 1;
                                                // Flush staged-message stats in
                                                // chunks to bound counter lag.
                                                if staged_messages_since_flush >= 10_000 {
                                                    statistics.inc_messages_staged_by(
                                                        staged_messages_since_flush,
                                                    );
                                                    staged_messages_since_flush = 0;
                                                }
                                            }
                                        }
                                    }
                                }
                                if drained_count > 0 {
                                    debug!(
                                        "{}: drained {} stashed rows into batch [{}, {})",
                                        name_for_logging,
                                        drained_count,
                                        lower.pretty(),
                                        upper.pretty()
                                    );
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), batch_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                // Drain all currently-ready row input.
                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            let mut dropped_per_time = BTreeMap::new();
                            let mut stashed_per_time = BTreeMap::new();
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                // Route the row to an in-flight batch whose
                                // bounds contain its timestamp (descriptions
                                // are expected to be disjoint, so the first
                                // match wins).
                                for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = handler.row_to_batch(
                                            diff_pair.clone(),
                                            row_ts.clone(),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        batch_writer.write(record_batch).await?;
                                        staged_messages_since_flush += 1;
                                        if staged_messages_since_flush >= 10_000 {
                                            statistics.inc_messages_staged_by(
                                                staged_messages_since_flush,
                                            );
                                            staged_messages_since_flush = 0;
                                        }
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    // Rows below the first batch's lower were
                                    // already committed: drop them. Otherwise
                                    // stash until the description arrives.
                                    if let Some(ref min_lower) = min_batch_lower {
                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
                                            dropped_per_time
                                                .entry(ts_antichain.into_option().unwrap())
                                                .and_modify(|c| *c += 1)
                                                .or_insert(1);
                                            continue;
                                        }
                                    }

                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    metrics.stashed_rows.inc();
                                    entry.push((row, diff_pair));
                                }
                            }

                            for (ts, count) in dropped_per_time {
                                debug!(
                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
                                    name_for_logging, count, ts
                                );
                            }

                            for (ts, count) in stashed_per_time {
                                debug!(
                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
                                    name_for_logging, count, ts
                                );
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }
                // Flush any remaining staged-message count from this turn.
                if staged_messages_since_flush > 0 {
                    statistics.inc_messages_staged_by(staged_messages_since_flush);
                }

                // If either frontier advanced, look for batches that can now
                // be closed.
                if PartialOrder::less_than(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    // A batch is ready once its description can no longer be
                    // re-minted (lower < description frontier) and no more
                    // rows can arrive for it (upper <= input frontier).
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_equal(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        debug!(
                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
                            name_for_logging,
                            ready_batches.len(),
                            batch_description_frontier.pretty(),
                            input_frontier.pretty()
                        );
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut batch_writer) in ready_batches {
                            let close_started_at = Instant::now();
                            let data_files = batch_writer.close().await;
                            metrics
                                .writer_close_duration_seconds
                                .observe(close_started_at.elapsed().as_secs_f64());
                            let data_files = data_files.context("Failed to close batch writer")?;
                            debug!(
                                "{}: closed batch [{}, {}), wrote {} files",
                                name_for_logging,
                                desc.0.pretty(),
                                desc.1.pretty(),
                                data_files.len()
                            );
                            for data_file in data_files {
                                // Tally per-kind file metrics before handing
                                // the file downstream.
                                match data_file.content_type() {
                                    iceberg::spec::DataContentType::Data => {
                                        metrics.data_files_written.inc();
                                    }
                                    iceberg::spec::DataContentType::PositionDeletes
                                    | iceberg::spec::DataContentType::EqualityDeletes => {
                                        metrics.delete_files_written.inc();
                                    }
                                }
                                statistics.inc_messages_staged_by(data_file.record_count());
                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            max_upper = max_upper.join(&desc.1);
                        }

                        // Release output capabilities up to the newest closed
                        // batch's upper.
                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    // Surface operator errors as halting health-status updates.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}
1848
#[cfg(test)]
mod tests {
    use super::*;
    use mz_repr::SqlScalarType;

    /// Unsigned Materialize types have no exact Iceberg counterpart, so they
    /// are widened or re-encoded as decimals; types with a native mapping get
    /// no override at all.
    #[mz_ore::test]
    fn test_iceberg_type_overrides() {
        // UInt16 widens to a signed 32-bit integer.
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt16).unwrap().0,
            DataType::Int32
        );

        // UInt32 widens to a signed 64-bit integer.
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt32).unwrap().0,
            DataType::Int64
        );

        // UInt64 exceeds i64 range and becomes a decimal.
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::UInt64).unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // MzTimestamp is u64-backed and gets the same decimal encoding.
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::MzTimestamp).unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // Types with a direct Iceberg representation need no override.
        assert!(iceberg_type_overrides(&SqlScalarType::Int32).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::String).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::Bool).is_none());
    }

    /// The UInt64 override must apply to element types nested inside
    /// containers, not just top-level columns.
    #[mz_ore::test]
    fn test_iceberg_schema_with_nested_uint64() {
        let desc = mz_repr::RelationDesc::builder()
            .with_column(
                "items",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt64),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let schema =
            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
                .expect("schema conversion should succeed");

        // The list's element type must carry the decimal override.
        match schema.field(0).data_type() {
            DataType::List(element) => assert_eq!(
                element.data_type(),
                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            ),
            _ => panic!("Expected List type"),
        }
    }

    /// Equality-delete ids must come from the Iceberg schema's assigned field
    /// ids (which also number nested map/struct fields), not from the flat
    /// top-level column position.
    #[mz_ore::test]
    fn equality_ids_follow_iceberg_field_ids() {
        // A map column ahead of the key column forces its nested fields to
        // claim intermediate field ids, so "key_col" cannot sit at id 2.
        let map_entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let materialize_arrow_schema = add_field_ids_to_arrow_schema(ArrowSchema::new(vec![
            Field::new("attrs", DataType::Map(Arc::new(map_entries), false), true),
            Field::new("key_col", DataType::Int32, false),
        ]));
        let iceberg_schema = arrow_schema_to_schema(&materialize_arrow_schema)
            .expect("schema conversion should succeed");

        let equality_ids =
            equality_ids_for_indices(&iceberg_schema, &materialize_arrow_schema, &[1])
                .expect("field lookup should succeed");

        // The id reported for "key_col" must match the Iceberg schema's own
        // assignment, which is shifted past the map's nested fields.
        let expected_id = iceberg_schema
            .as_struct()
            .field_by_name("key_col")
            .expect("top-level field should exist")
            .id;
        assert_eq!(equality_ids, vec![expected_id]);
        assert_ne!(expected_id, 2);
    }
}
1948
/// Renders the operator that commits finished data files to the Iceberg
/// table and records progress in the sink's persist shard.
///
/// Files from `batch_input` are grouped by the batch description they belong
/// to; once the input frontier shows a batch can receive no more files, the
/// batch is committed (with retries) as a single Iceberg snapshot carrying
/// `mz-sink-id`/`mz-frontier`/`mz-sink-version` properties, and then the
/// persist `write_handle` upper and the shared `write_frontier` are advanced
/// to the batch's upper. Only one worker — selected by hashing `sink_id` —
/// performs commits.
fn commit_to_iceberg<G>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: StreamVec<G, BoundedDataFile>,
    batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: StreamVec<G, Infallible>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (StreamVec<G, HealthStatusMessage>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    // Commits must be serialized, so all inputs are exchanged to the single
    // worker selected by the sink id's hash.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let name_for_logging = format!("{sink_id}-commit-to-iceberg");

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            // Inactive workers report an empty (closed) write frontier and
            // exit immediately.
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .with_context(|| {
                    format!(
                        "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                        connection.catalog_connection.uri, connection.namespace, connection.table
                    )
                })?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            // Wait for the table-ready stream to close; upstream drops its
            // capability once the table has been loaded/created.
            while let Some(_) = table_ready_input.next().await {
            }
            let mut table = catalog.load_table(&table_ident).await.with_context(|| {
                format!(
                    "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
                    connection.namespace, connection.table
                )
            })?;

            // Files received so far, grouped by their batch description.
            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            // Run until both inputs have closed (empty frontiers).
            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                // Register newly-minted batch descriptions with empty file
                // sets.
                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let prev = batch_descriptions
                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received \
                                        in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                // Collect incoming files under their batch description.
                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                // A batch is committable once the input frontier has passed
                // its lower bound — the upstream writer emits all of a batch's
                // files before releasing capabilities past it.
                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
                    .cloned()
                    .collect();

                // Commit in frontier order; descriptions are expected to form
                // a chain, so comparing by PartialOrder yields a total order.
                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    // Partition files by content type (data vs position/
                    // equality deletes), tallying message/byte totals.
                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    let mut total_messages: u64 = 0;
                    let mut total_bytes: u64 = 0;
                    for file in file_set.data_files {
                        total_messages += file.data_file().record_count();
                        total_bytes += file.data_file().file_size_in_bytes();
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        data_files = data_files.len(),
                        delete_files = delete_files.len(),
                        total_messages,
                        total_bytes,
                        "iceberg commit applying batch"
                    );

                    let instant = Instant::now();

                    // Snapshot properties record the sink id, frontier, and
                    // version so a restarted sink can recover its resume
                    // position (and detect fencing) from table metadata.
                    let frontier = batch.1.clone();
                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;
                    let snapshot_properties = vec![
                        ("mz-sink-id".to_string(), sink_id.to_string()),
                        ("mz-frontier".to_string(), frontier_json),
                        ("mz-sink-version".to_string(), sink_version.to_string()),
                    ];

                    // Commit with up to 5 attempts; the `Table` handle is
                    // threaded through the retry state so each attempt works
                    // against the freshest metadata.
                    let (table_state, commit_result) = Retry::default()
                        .max_tries(5)
                        .retry_async_with_state(table, |_, table| {
                            let snapshot_properties = snapshot_properties.clone();
                            let data_files = data_files.clone();
                            let delete_files = delete_files.clone();
                            let metrics = Arc::clone(&metrics);
                            let catalog = Arc::clone(&catalog);
                            let conn_namespace = connection.namespace.clone();
                            let conn_table = connection.table.clone();
                            let frontier = frontier.clone();
                            let batch_lower = batch.0.clone();
                            let batch_upper = batch.1.clone();
                            async move {
                                try_commit_batch(
                                    table,
                                    snapshot_properties,
                                    data_files,
                                    delete_files,
                                    catalog.as_ref(),
                                    &conn_namespace,
                                    &conn_table,
                                    sink_version,
                                    &frontier,
                                    &batch_lower,
                                    &batch_upper,
                                    &metrics,
                                )
                                .await
                            }
                        })
                        .await;
                    let commit_result = commit_result.with_context(|| {
                        format!(
                            "failed to commit batch to Iceberg table '{}.{}'",
                            connection.namespace, connection.table
                        )
                    });
                    // Restore the table handle before propagating any error so
                    // the duration metric is still recorded.
                    table = table_state;
                    let duration = instant.elapsed();
                    metrics
                        .commit_duration_seconds
                        .observe(duration.as_secs_f64());
                    commit_result?;

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        total_messages,
                        total_bytes,
                        ?duration,
                        "iceberg commit applied batch"
                    );

                    metrics.snapshots_committed.inc();
                    statistics.inc_messages_committed_by(total_messages);
                    statistics.inc_bytes_committed_by(total_bytes);

                    // Advance the sink's persist shard upper to the committed
                    // frontier via an empty compare-and-append, retrying from
                    // the reported current upper on mismatch.
                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            // Shard is already at or past our frontier.
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    // Publish progress for frontier reporting.
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    // Surface operator errors as halting health-status updates.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}
2230
2231impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
2232 fn get_key_indices(&self) -> Option<&[usize]> {
2233 self.key_desc_and_indices
2234 .as_ref()
2235 .map(|(_, indices)| indices.as_slice())
2236 }
2237
2238 fn get_relation_key_indices(&self) -> Option<&[usize]> {
2239 self.relation_key_indices.as_deref()
2240 }
2241
2242 fn render_sink(
2243 &self,
2244 storage_state: &mut StorageState,
2245 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
2246 sink_id: GlobalId,
// NOTE(review): this span is the tail of a sink-rendering method whose
// signature (name, generic scope parameter `G`, and leading parameters such
// as `self`, `sink_id`, `sink`, `storage_state`) begins above this excerpt;
// only the trailing parameters are visible here.
2247 input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
// Accepted for interface parity but currently unused by this sink.
2248 _err_collection: VecCollection<G, DataflowError, Diff>,
2249 ) -> (StreamVec<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
2250 let mut scope = input.scope();
2251
// Build — but do not await here — a future that opens a persist write
// handle on the sink's target shard. The unpolled future is handed to
// `commit_to_iceberg` below, which is presumably responsible for driving
// it inside its async operator (TODO confirm at the callee).
2252 let write_handle = {
2253 let persist = Arc::clone(&storage_state.persist_clients);
2254 let shard_meta = sink.to_storage_metadata.clone();
2255 async move {
2256 let client = persist.open(shard_meta.persist_location).await?;
2257 let handle = client
2258 .open_writer(
2259 shard_meta.data_shard,
2260 Arc::new(shard_meta.relation_desc),
2261 Arc::new(UnitSchema),
2262 Diagnostics::from_purpose("sink handle"),
2263 )
2264 .await?;
2265 Ok(handle)
2266 }
2267 };
2268
// Shared mutable frontier for this sink, initialized to the minimum
// timestamp and registered in `storage_state.sink_write_frontiers` so
// progress is observable outside this dataflow fragment. The committer
// below receives a clone and advances it.
2269 let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
2270 storage_state
2271 .sink_write_frontiers
2272 .insert(sink_id, Rc::clone(&write_frontier));
2273
// Derive the Arrow schema (annotated with Iceberg field ids) and the
// Iceberg schema from the sink's relation description. For the Append
// envelope, extra columns are first added to the Arrow schema and the
// Iceberg schema is re-derived from that extended Arrow schema. The
// fallible work is wrapped in an immediately-invoked closure so that any
// error is converted into a single halting health-status message and an
// early return, instead of a panic during dataflow rendering.
2274 let (arrow_schema_with_ids, iceberg_schema) =
2275 match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
2276 let (arrow_schema_with_ids, iceberg_schema) =
2277 relation_desc_to_iceberg_schema(&sink.from_desc)?;
2278
2279 Ok(if sink.envelope == SinkEnvelope::Append {
2280 let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
2285 let extended_iceberg = Arc::new(
2286 arrow_schema_to_schema(&extended_arrow)
2287 .context("Failed to build Iceberg schema with append columns")?,
2288 );
2289 (extended_arrow, extended_iceberg)
2290 } else {
2291 (arrow_schema_with_ids, iceberg_schema)
2292 })
2293 })() {
2294 Ok(schemas) => schemas,
2295 Err(err) => {
// Schema derivation failed: surface a halting status on the
// health stream and return no operator buttons, so nothing
// else of this sink is rendered.
2296 let error_stream = std::iter::once(HealthStatusMessage {
2297 id: None,
2298 update: HealthStatusUpdate::halting(
2299 format!("{}", err.display_with_causes()),
2300 None,
2301 ),
2302 namespace: StatusNamespace::Iceberg,
2303 })
2304 .to_stream(&mut scope);
2305 return (error_stream, vec![]);
2306 }
2307 };
2308
// Per-sink, per-worker metrics (keyed by sink id and worker index).
2309 let metrics = Arc::new(
2310 storage_state
2311 .metrics
2312 .get_iceberg_sink_metrics(sink_id, scope.index()),
2313 );
2314
// Statistics are expected to have been registered for this sink before
// rendering; a missing entry is an invariant violation, hence `expect`.
2315 let statistics = storage_state
2316 .aggregated_statistics
2317 .get_sink(&sink_id)
2318 .expect("statistics initialized")
2319 .clone();
2320
// Stage 1: mint batch descriptions. Consumes the input collection and
// yields the re-timestamped input, the batch-description stream, a
// "table ready" signal, a health-status stream, and a shutdown button.
2321 let connection_for_minter = self.clone();
2322 let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
2323 mint_batch_descriptions(
2324 format!("{sink_id}-iceberg-mint"),
2325 sink_id,
2326 input,
2327 sink,
2328 connection_for_minter,
2329 storage_state.storage_configuration.clone(),
2330 Arc::clone(&iceberg_schema),
2331 );
2332
// Stage 2: write data files, with the envelope selecting the handler
// type parameter. The two arms pass identical arguments; only the
// handler differs.
2333 let connection_for_writer = self.clone();
2334 let (datafiles, write_status, write_button) = match sink.envelope {
2335 SinkEnvelope::Upsert => write_data_files::<_, UpsertEnvelopeHandler>(
2336 format!("{sink_id}-write-data-files"),
2337 minted_input,
2338 batch_descriptions.clone(),
2339 table_ready.clone(),
2340 sink.as_of.clone(),
2341 connection_for_writer,
2342 storage_state.storage_configuration.clone(),
2343 Arc::new(arrow_schema_with_ids.clone()),
2344 Arc::clone(&metrics),
2345 statistics.clone(),
2346 ),
2347 SinkEnvelope::Append => write_data_files::<_, AppendEnvelopeHandler>(
2348 format!("{sink_id}-write-data-files"),
2349 minted_input,
2350 batch_descriptions.clone(),
2351 table_ready.clone(),
2352 sink.as_of.clone(),
2353 connection_for_writer,
2354 storage_state.storage_configuration.clone(),
2355 Arc::new(arrow_schema_with_ids.clone()),
2356 Arc::clone(&metrics),
2357 statistics.clone(),
2358 ),
// Presumably rejected before rendering (e.g. at planning /
// validation time) — TODO confirm where Debezium is filtered out.
2359 SinkEnvelope::Debezium => {
2360 unreachable!("Iceberg sink only supports Upsert and Append envelopes")
2361 }
2362 };
2363
// Stage 3: commit the written data files to Iceberg. Receives the
// shared write frontier (to advance it) and the not-yet-awaited persist
// write-handle future built above.
2364 let connection_for_committer = self.clone();
2365 let (commit_status, commit_button) = commit_to_iceberg(
2366 format!("{sink_id}-commit-to-iceberg"),
2367 sink_id,
2368 sink.version,
2369 datafiles,
2370 batch_descriptions,
2371 table_ready,
2372 Rc::clone(&write_frontier),
2373 connection_for_committer,
2374 storage_state.storage_configuration.clone(),
2375 write_handle,
2376 Arc::clone(&metrics),
2377 statistics,
2378 );
2379
// A one-shot "running" status emitted at startup; later stage statuses
// (halting, etc.) are merged in below and supersede it downstream.
2380 let running_status = Some(HealthStatusMessage {
2381 id: None,
2382 update: HealthStatusUpdate::running(),
2383 namespace: StatusNamespace::Iceberg,
2384 })
2385 .to_stream(&mut scope);
2386
// Merge the health streams of every stage into a single output stream.
2387 let statuses =
2388 scope.concatenate([running_status, mint_status, write_status, commit_status]);
2389
// Return the merged health stream plus the buttons that tear down each
// stage's operator when dropped.
2390 (statuses, vec![mint_button, write_button, commit_button])
2391 }
2392}