1use std::cmp::Ordering;
86use std::collections::{BTreeMap, VecDeque};
87use std::convert::Infallible;
88use std::future::Future;
89use std::time::Instant;
90use std::{cell::RefCell, rc::Rc, sync::Arc};
91
92use anyhow::{Context, anyhow};
93use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
94use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
95use differential_dataflow::lattice::Lattice;
96use differential_dataflow::{AsCollection, Hashable, VecCollection};
97use futures::StreamExt;
98use iceberg::ErrorKind;
99use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
100use iceberg::spec::{
101 DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
102 write_data_files_to_avro,
103};
104use iceberg::spec::{Schema, SchemaRef};
105use iceberg::table::Table;
106use iceberg::transaction::{ApplyTransactionAction, Transaction};
107use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
108use iceberg::writer::base_writer::equality_delete_writer::{
109 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
110};
111use iceberg::writer::base_writer::position_delete_writer::{
112 PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
113};
114use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
115use iceberg::writer::file_writer::ParquetWriterBuilder;
116use iceberg::writer::file_writer::location_generator::{
117 DefaultFileNameGenerator, DefaultLocationGenerator,
118};
119use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
120use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
121use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
122use itertools::Itertools;
123use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
124use mz_interchange::avro::DiffPair;
125use mz_interchange::envelopes::for_each_diff_pair;
126use mz_ore::cast::CastFrom;
127use mz_ore::error::ErrorExt;
128use mz_ore::future::InTask;
129use mz_ore::result::ResultExt;
130use mz_ore::retry::{Retry, RetryResult};
131use mz_persist_client::Diagnostics;
132use mz_persist_client::write::WriteHandle;
133use mz_persist_types::codec_impls::UnitSchema;
134use mz_repr::{Diff, GlobalId, Row, Timestamp};
135use mz_storage_types::StorageDiff;
136use mz_storage_types::configuration::StorageConfiguration;
137use mz_storage_types::controller::CollectionMetadata;
138use mz_storage_types::errors::DataflowError;
139use mz_storage_types::sinks::{
140 IcebergSinkConnection, SinkEnvelope, StorageSinkDesc, iceberg_type_overrides,
141};
142use mz_storage_types::sources::SourceData;
143use mz_timely_util::antichain::AntichainExt;
144use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
145use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
146use parquet::file::properties::WriterProperties;
147use serde::{Deserialize, Serialize};
148use timely::PartialOrder;
149use timely::container::CapacityContainerBuilder;
150use timely::dataflow::StreamVec;
151use timely::dataflow::channels::pact::{Exchange, Pipeline};
152use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
153use timely::dataflow::operators::{CapabilitySet, Concatenate};
154use timely::progress::{Antichain, Timestamp as _};
155use tracing::debug;
156
157use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
158use crate::metrics::sink::iceberg::IcebergSinkMetrics;
159use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
160use crate::statistics::SinkStatistics;
161use crate::storage_state::StorageState;
162
/// Initial per-column item capacity passed to `ArrowBuilder::new_with_schema` when a diff pair
/// is converted into a record batch.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1 << 10;
/// Initial per-column data (byte) capacity passed to `ArrowBuilder::new_with_schema` alongside
/// `DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY`.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1 << 10;

/// NOTE(review): presumably the file-name prefix for Parquet files written by this sink; its
/// use is not visible in this part of the file — confirm.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// NOTE(review): presumably the number of batch descriptions minted up front when the sink
/// initializes; its use is not visible in this part of the file — confirm.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
176
/// State shared by every envelope handler for building Iceberg file writers.
struct WriterContext {
    /// Arrow schema that data files are written with (passed to
    /// `ParquetWriterBuilder::with_arrow_schema`); handlers derive their batch schemas from it.
    arrow_schema: Arc<ArrowSchema>,
    /// Iceberg schema of the target table; key columns are resolved against it.
    current_schema: Arc<Schema>,
    /// File IO handle handed to each rolling file writer.
    file_io: iceberg::io::FileIO,
    /// File location generator, cloned into each rolling file writer.
    location_generator: DefaultLocationGenerator,
    /// File name generator, cloned into each rolling file writer.
    file_name_generator: DefaultFileNameGenerator,
    /// Parquet writer properties, cloned into each Parquet writer builder.
    writer_properties: WriterProperties,
}
192
193trait EnvelopeHandler: Send {
195 fn new(
197 ctx: WriterContext,
198 connection: &IcebergSinkConnection,
199 materialize_arrow_schema: &Arc<ArrowSchema>,
200 ) -> anyhow::Result<Self>
201 where
202 Self: Sized;
203
204 async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;
210
211 fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
212}
213
/// Envelope handler for the upsert envelope: writes through an Iceberg delta writer that
/// produces data files plus position-delete and equality-delete files keyed on the sink's
/// key columns.
struct UpsertEnvelopeHandler {
    /// Shared schemas, IO and file naming used by every writer this handler builds.
    ctx: WriterContext,
    /// Iceberg field IDs of the key columns that equality deletes are expressed over.
    equality_ids: Vec<i32>,
    /// Iceberg schema of position-delete files.
    pos_schema: Arc<Schema>,
    /// Iceberg schema of equality-delete files (the projection described by `eq_config`).
    eq_schema: Arc<Schema>,
    /// Equality-delete writer configuration built from `equality_ids`.
    eq_config: EqualityDeleteWriterConfig,
    /// `ctx.arrow_schema` plus a trailing non-nullable Int32 `__op` column
    /// (-1 for the `before` row, 1 for the `after` row).
    schema_with_op: Arc<ArrowSchema>,
}
229
impl EnvelopeHandler for UpsertEnvelopeHandler {
    /// Prepares everything the delta writer needs.
    ///
    /// Resolves the connection's key column indices to Iceberg field IDs, derives the Iceberg
    /// schemas of position- and equality-delete files, and builds the batch schema with the
    /// trailing `__op` column emitted by `row_to_batch`.
    ///
    /// # Errors
    /// Fails if the connection has no key columns, if a key column cannot be resolved, or if a
    /// schema conversion or the equality-delete config construction fails.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Equality deletes are expressed over the key columns, so a key is mandatory.
        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
            return Err(anyhow::anyhow!(
                "Iceberg sink requires key columns for equality deletes"
            ));
        };

        let equality_ids = equality_ids_for_indices(
            ctx.current_schema.as_ref(),
            materialize_arrow_schema.as_ref(),
            equality_indices,
        )?;

        // The position-delete layout is provided by the config type itself (no arguments).
        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let pos_schema = Arc::new(
            arrow_schema_to_schema(&pos_arrow_schema)
                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
        );

        let eq_config =
            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
                .context("Failed to create EqualityDeleteWriterConfig")?;
        // Equality-delete files contain only the projected key columns.
        let eq_schema = Arc::new(
            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
        );

        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));

        Ok(Self {
            ctx,
            equality_ids,
            pos_schema,
            eq_schema,
            eq_config,
            schema_with_op,
        })
    }

    /// Builds a `DeltaWriter` made of a data-file writer, a position-delete writer and an
    /// equality-delete writer. All three share the context's file IO, location generator and
    /// file name generator.
    ///
    /// `is_snapshot` selects the delta writer's `max_seen_rows`: 0 while snapshotting,
    /// `usize::MAX` otherwise.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Data files: validated against the context's Arrow schema.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

        // Position-delete files.
        // NOTE(review): the meaning of the `(None, 0, None)` arguments is not visible here —
        // confirm against `PositionDeleteWriterConfig::new`.
        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
        let pos_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.pos_schema),
        );
        // NOTE(review): the Parquet writer uses `pos_schema`, but the rolling writer is given
        // the table schema (`current_schema`); confirm this is intended.
        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            pos_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let pos_delete_writer_builder =
            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

        // Equality-delete files (key-column projection).
        let eq_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.eq_schema),
        );
        // NOTE(review): as above, the rolling writer receives `current_schema` rather than
        // `eq_schema`; confirm this is intended.
        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            eq_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let eq_delete_writer_builder =
            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());

        let mut builder = DeltaWriterBuilder::new(
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            self.equality_ids.clone(),
        );

        // NOTE(review): presumably `max_seen_rows` bounds how many rows written by this writer
        // are remembered so later deletes of them can become position deletes; 0 disables
        // that tracking during the snapshot — confirm against the DeltaWriter docs.
        builder = if is_snapshot {
            builder.with_max_seen_rows(0)
        } else {
            builder.with_max_seen_rows(usize::MAX)
        };

        Ok(Box::new(
            builder
                .build(None)
                .await
                .context("Failed to create DeltaWriter")?,
        ))
    }

    /// Encodes a diff pair as a batch of up to two rows, with an `__op` column appended: the
    /// `before` row (if any) with op -1, then the `after` row (if any) with op 1.
    /// The timestamp is not used by the upsert envelope.
    fn row_to_batch(
        &self,
        diff_pair: DiffPair<Row>,
        _ts: Timestamp,
    ) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.ctx.arrow_schema),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        // One op value per row added, in the same order as the rows.
        let mut op_values = Vec::new();

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add delete row to builder")?;
            op_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add insert row to builder")?;
            op_values.push(1i32);
        }

        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the `__op` column to match `schema_with_op`.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(op_values)));

        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
            .context("Failed to create batch with op column")
    }
}
392
/// Envelope handler for the append envelope: every change is written as a new row carrying
/// its diff and timestamp, using a plain data-file writer (no delete files).
struct AppendEnvelopeHandler {
    /// Shared schemas, IO and file naming used by the writer this handler builds.
    ctx: WriterContext,
    /// `ctx.arrow_schema` without its last two columns (diff, timestamp), i.e. the user
    /// columns that are filled from a `Row`.
    user_schema_for_append: Arc<ArrowSchema>,
}
399
400impl EnvelopeHandler for AppendEnvelopeHandler {
401 fn new(
402 ctx: WriterContext,
403 _connection: &IcebergSinkConnection,
404 _materialize_arrow_schema: &Arc<ArrowSchema>,
405 ) -> anyhow::Result<Self> {
406 let n = ctx.arrow_schema.fields().len().saturating_sub(2);
409 let user_schema_for_append =
410 Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));
411
412 Ok(Self {
413 ctx,
414 user_schema_for_append,
415 })
416 }
417
418 async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
419 let data_parquet_writer = ParquetWriterBuilder::new(
420 self.ctx.writer_properties.clone(),
421 Arc::clone(&self.ctx.current_schema),
422 )
423 .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
424 .context("Arrow schema validation failed")?;
425 let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
426 data_parquet_writer,
427 Arc::clone(&self.ctx.current_schema),
428 self.ctx.file_io.clone(),
429 self.ctx.location_generator.clone(),
430 self.ctx.file_name_generator.clone(),
431 );
432 Ok(Box::new(
433 DataFileWriterBuilder::new(data_rolling_writer)
434 .build(None)
435 .await
436 .context("Failed to create DataFileWriter")?,
437 ))
438 }
439
440 fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
443 let mut builder = ArrowBuilder::new_with_schema(
444 Arc::clone(&self.user_schema_for_append),
445 DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
446 DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
447 )
448 .context("Failed to create builder")?;
449
450 let mut diff_values: Vec<i32> = Vec::new();
451 let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);
452
453 if let Some(before) = diff_pair.before {
454 builder
455 .add_row(&before)
456 .context("Failed to add before row to builder")?;
457 diff_values.push(-1i32);
458 }
459 if let Some(after) = diff_pair.after {
460 builder
461 .add_row(&after)
462 .context("Failed to add after row to builder")?;
463 diff_values.push(1i32);
464 }
465
466 let n = diff_values.len();
467 let batch = builder
468 .to_record_batch()
469 .context("Failed to create record batch")?;
470
471 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
472 columns.push(Arc::new(Int32Array::from(diff_values)));
473 columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));
474
475 RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
476 .context("Failed to create append record batch")
477 }
478}
479
480fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
485 let mut next_field_id = 1i32;
486 let fields: Vec<Field> = schema
487 .fields()
488 .iter()
489 .map(|field| add_field_ids_recursive(field, &mut next_field_id))
490 .collect();
491 ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
492}
493
494fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
496 let current_id = *next_id;
497 *next_id += 1;
498
499 let mut metadata = field.metadata().clone();
500 metadata.insert(
501 PARQUET_FIELD_ID_META_KEY.to_string(),
502 current_id.to_string(),
503 );
504
505 let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
506
507 Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
508}
509
510fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
512 match data_type {
513 DataType::Struct(fields) => {
514 let new_fields: Vec<Field> = fields
515 .iter()
516 .map(|f| add_field_ids_recursive(f, next_id))
517 .collect();
518 DataType::Struct(new_fields.into())
519 }
520 DataType::List(element_field) => {
521 let new_element = add_field_ids_recursive(element_field, next_id);
522 DataType::List(Arc::new(new_element))
523 }
524 DataType::LargeList(element_field) => {
525 let new_element = add_field_ids_recursive(element_field, next_id);
526 DataType::LargeList(Arc::new(new_element))
527 }
528 DataType::Map(entries_field, sorted) => {
529 let new_entries = add_field_ids_recursive(entries_field, next_id);
530 DataType::Map(Arc::new(new_entries), *sorted)
531 }
532 _ => data_type.clone(),
533 }
534}
535
536fn merge_materialize_metadata_into_iceberg_schema(
541 materialize_arrow_schema: &ArrowSchema,
542 iceberg_schema: &Schema,
543) -> anyhow::Result<ArrowSchema> {
544 let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
546 .context("Failed to convert Iceberg schema to Arrow schema")?;
547
548 let fields: Vec<Field> = iceberg_arrow_schema
550 .fields()
551 .iter()
552 .map(|iceberg_field| {
553 let mz_field = materialize_arrow_schema
555 .field_with_name(iceberg_field.name())
556 .with_context(|| {
557 format!(
558 "Field '{}' not found in Materialize schema",
559 iceberg_field.name()
560 )
561 })?;
562
563 merge_field_metadata_recursive(iceberg_field, Some(mz_field))
564 })
565 .collect::<anyhow::Result<Vec<_>>>()?;
566
567 Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
568}
569
/// Rebuilds `iceberg_field`, copying Materialize's Arrow extension name
/// (`ARROW_EXTENSION_NAME_KEY`) from `mz_field` onto it and recursing into nested types.
///
/// The Iceberg field's name, type structure, nullability and existing metadata are kept; only
/// the extension-name entry is added or overwritten. When `mz_field` is `None` (no Materialize
/// counterpart), the field and its children are rebuilt without merging anything.
///
/// Children are paired as follows: struct children by name, list/large-list elements directly,
/// and map entries via `merge_map_entries_metadata`. Non-container types are not compared.
///
/// # Errors
/// Fails if a Materialize counterpart exists but has a different container type (Struct,
/// List, LargeList or Map) than the Iceberg field, or if a nested merge fails.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    let mut metadata = iceberg_field.metadata().clone();

    // Only the extension name is taken from Materialize; all other metadata stays Iceberg's.
    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            // The Materialize side must also be a struct (when present).
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Struct children are paired by name; unmatched children merge with `None`.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            // Without a Materialize counterpart the entries field is kept exactly as-is.
            let new_entries = match mz_entries {
                Some(mz_entries) => merge_map_entries_metadata(iceberg_entries, mz_entries)?,
                None => iceberg_entries.as_ref().clone(),
            };
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
674
675fn merge_map_entries_metadata(
697 iceberg_entries: &Field,
698 mz_entries: &Field,
699) -> anyhow::Result<Field> {
700 let mut metadata = iceberg_entries.metadata().clone();
701 if let Some(extension_name) = mz_entries.metadata().get(ARROW_EXTENSION_NAME_KEY) {
702 metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
703 }
704
705 let iceberg_fields = match iceberg_entries.data_type() {
706 DataType::Struct(fields) => fields,
707 other => anyhow::bail!(
708 "Iceberg map entries field '{}' is not a Struct: {:?}",
709 iceberg_entries.name(),
710 other
711 ),
712 };
713 let mz_fields = match mz_entries.data_type() {
714 DataType::Struct(fields) => fields,
715 other => anyhow::bail!(
716 "Materialize map entries field '{}' is not a Struct: {:?}",
717 mz_entries.name(),
718 other
719 ),
720 };
721
722 let new_fields: Vec<Field> = iceberg_fields
723 .iter()
724 .enumerate()
725 .map(|(idx, iceberg_inner)| {
726 let mz_inner = mz_fields.get(idx).map(|f| f.as_ref());
727 merge_field_metadata_recursive(iceberg_inner, mz_inner)
728 })
729 .collect::<anyhow::Result<Vec<_>>>()?;
730
731 Ok(Field::new(
732 iceberg_entries.name(),
733 DataType::Struct(new_fields.into()),
734 iceberg_entries.is_nullable(),
735 )
736 .with_metadata(metadata))
737}
738
739async fn reload_table(
740 catalog: &dyn Catalog,
741 namespace: String,
742 table_name: String,
743 current_table: Table,
744) -> anyhow::Result<Table> {
745 let namespace_ident = NamespaceIdent::new(namespace.clone());
746 let table_ident = TableIdent::new(namespace_ident, table_name.clone());
747 let current_schema = current_table.metadata().current_schema_id();
748 let current_partition_spec = current_table.metadata().default_partition_spec_id();
749
750 match catalog.load_table(&table_ident).await {
751 Ok(table) => {
752 let reloaded_schema = table.metadata().current_schema_id();
753 let reloaded_partition_spec = table.metadata().default_partition_spec_id();
754 if reloaded_schema != current_schema {
755 return Err(anyhow::anyhow!(
756 "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
757 table_name,
758 current_schema,
759 reloaded_schema
760 ));
761 }
762
763 if reloaded_partition_spec != current_partition_spec {
764 return Err(anyhow::anyhow!(
765 "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
766 table_name,
767 current_partition_spec,
768 reloaded_partition_spec
769 ));
770 }
771
772 Ok(table)
773 }
774 Err(err) => Err(err).context("Failed to reload Iceberg table"),
775 }
776}
777
778async fn try_commit_batch(
782 mut table: Table,
783 snapshot_properties: Vec<(String, String)>,
784 data_files: Vec<DataFile>,
785 delete_files: Vec<DataFile>,
786 catalog: &dyn Catalog,
787 conn_namespace: &str,
788 conn_table: &str,
789 sink_version: u64,
790 frontier: &Antichain<Timestamp>,
791 batch_lower: &Antichain<Timestamp>,
792 batch_upper: &Antichain<Timestamp>,
793 metrics: &IcebergSinkMetrics,
794) -> (Table, RetryResult<(), anyhow::Error>) {
795 let tx = Transaction::new(&table);
796 let mut action = tx
797 .row_delta()
798 .set_snapshot_properties(snapshot_properties.into_iter().collect())
799 .with_check_duplicate(false);
800
801 if !data_files.is_empty() || !delete_files.is_empty() {
802 action = action
803 .add_data_files(data_files)
804 .add_delete_files(delete_files);
805 }
806
807 let tx = match action
808 .apply(tx)
809 .context("Failed to apply data file addition to iceberg table transaction")
810 {
811 Ok(tx) => tx,
812 Err(e) => {
813 match reload_table(
814 catalog,
815 conn_namespace.to_string(),
816 conn_table.to_string(),
817 table.clone(),
818 )
819 .await
820 {
821 Ok(reloaded) => table = reloaded,
822 Err(reload_err) => {
823 return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
824 }
825 }
826 return (
827 table,
828 RetryResult::RetryableErr(anyhow!(
829 "Failed to apply data file addition to iceberg table transaction: {}",
830 e
831 )),
832 );
833 }
834 };
835
836 let new_table = tx.commit(catalog).await;
837 match new_table {
838 Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
839 metrics.commit_conflicts.inc();
840 match reload_table(
841 catalog,
842 conn_namespace.to_string(),
843 conn_table.to_string(),
844 table.clone(),
845 )
846 .await
847 {
848 Ok(reloaded) => table = reloaded,
849 Err(e) => {
850 return (table, RetryResult::RetryableErr(anyhow!(e)));
851 }
852 };
853
854 let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
855 let last = retrieve_upper_from_snapshots(&mut snapshots);
856 let last = match last {
857 Ok(val) => val,
858 Err(e) => {
859 return (table, RetryResult::RetryableErr(anyhow!(e)));
860 }
861 };
862
863 if let Some((last_frontier, last_version)) = last {
865 if last_version > sink_version {
866 return (
867 table,
868 RetryResult::FatalErr(anyhow!(
869 "Iceberg table '{}' has been modified by another writer \
870 with version {}. Current sink version: {}. \
871 Frontiers may be out of sync, aborting to avoid data loss.",
872 conn_table,
873 last_version,
874 sink_version,
875 )),
876 );
877 }
878 if PartialOrder::less_equal(frontier, &last_frontier) {
879 return (
880 table,
881 RetryResult::FatalErr(anyhow!(
882 "Iceberg table '{}' has been modified by another writer. \
883 Current frontier: {:?}, last frontier: {:?}.",
884 conn_table,
885 frontier,
886 last_frontier,
887 )),
888 );
889 }
890 }
891
892 (
893 table,
894 RetryResult::RetryableErr(anyhow!(
895 "Commit conflict detected when committing batch [{}, {}) \
896 to Iceberg table '{}.{}'. Retrying...",
897 batch_lower.pretty(),
898 batch_upper.pretty(),
899 conn_namespace,
900 conn_table
901 )),
902 )
903 }
904 Err(e) => {
905 metrics.commit_failures.inc();
906 (table, RetryResult::RetryableErr(anyhow!(e)))
907 }
908 Ok(new_table) => (new_table, RetryResult::Ok(())),
909 }
910}
911
912async fn load_or_create_table(
914 catalog: &dyn Catalog,
915 namespace: String,
916 table_name: String,
917 schema: &Schema,
918) -> anyhow::Result<iceberg::table::Table> {
919 let namespace_ident = NamespaceIdent::new(namespace.clone());
920 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
921
922 match catalog.load_table(&table_ident).await {
924 Ok(table) => {
925 Ok(table)
928 }
929 Err(err) => {
930 if matches!(err.kind(), ErrorKind::TableNotFound { .. })
931 || err
932 .message()
933 .contains("Tried to load a table that does not exist")
934 {
935 let table_creation = TableCreation::builder()
939 .name(table_name.clone())
940 .schema(schema.clone())
941 .build();
945
946 catalog
947 .create_table(&namespace_ident, table_creation)
948 .await
949 .with_context(|| {
950 format!(
951 "Failed to create Iceberg table '{}' in namespace '{}'",
952 table_name, namespace
953 )
954 })
955 } else {
956 Err(err).context("Failed to load Iceberg table")
958 }
959 }
960 }
961}
962
963fn retrieve_upper_from_snapshots(
968 snapshots: &mut [Arc<Snapshot>],
969) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
970 snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
971
972 for snapshot in snapshots {
973 let props = &snapshot.summary().additional_properties;
974 if let (Some(frontier_json), Some(sink_version_str)) =
975 (props.get("mz-frontier"), props.get("mz-sink-version"))
976 {
977 let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
978 .context("Failed to deserialize frontier from snapshot properties")?;
979 let frontier = Antichain::from_iter(frontier);
980
981 let sink_version = sink_version_str
982 .parse::<u64>()
983 .context("Failed to parse mz-sink-version from snapshot properties")?;
984
985 return Ok(Some((frontier, sink_version)));
986 }
987 if snapshot.summary().operation.as_str() != "replace" {
988 anyhow::bail!(
993 "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
994 snapshot.snapshot_id(),
995 snapshot.summary().operation.as_str(),
996 );
997 }
998 }
999
1000 Ok(None)
1001}
1002
1003fn relation_desc_to_iceberg_schema(
1013 desc: &mz_repr::RelationDesc,
1014) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
1015 let arrow_schema =
1016 mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
1017 .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
1018
1019 let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
1020
1021 let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
1022 .context("Failed to convert Arrow schema to Iceberg schema")?;
1023
1024 Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
1025}
1026
1027fn equality_ids_for_indices(
1032 current_schema: &Schema,
1033 materialize_arrow_schema: &ArrowSchema,
1034 equality_indices: &[usize],
1035) -> anyhow::Result<Vec<i32>> {
1036 let top_level_fields = current_schema.as_struct();
1037
1038 equality_indices
1039 .iter()
1040 .map(|index| {
1041 let mz_field = materialize_arrow_schema
1042 .fields()
1043 .get(*index)
1044 .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
1045 let field_name = mz_field.name();
1046 let iceberg_field = top_level_fields
1047 .field_by_name(field_name)
1048 .with_context(|| {
1049 format!(
1050 "Equality delete key column '{}' not found in Iceberg table schema",
1051 field_name
1052 )
1053 })?;
1054 Ok(iceberg_field.id)
1055 })
1056 .collect()
1057}
1058
1059fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
1061 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1062 fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
1063 ArrowSchema::new(fields)
1064}
1065
1066#[allow(clippy::disallowed_types)]
1071fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1072 use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1073 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1074 fields.push(Arc::new(Field::new(
1075 ICEBERG_APPEND_DIFF_COLUMN,
1076 DataType::Int32,
1077 false,
1078 )));
1079 fields.push(Arc::new(Field::new(
1080 ICEBERG_APPEND_TIMESTAMP_COLUMN,
1081 DataType::Int64,
1082 false,
1083 )));
1084
1085 add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1086}
1087
1088fn mint_batch_descriptions<'scope, D>(
1093 name: String,
1094 sink_id: GlobalId,
1095 input: VecCollection<'scope, Timestamp, D, Diff>,
1096 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1097 connection: IcebergSinkConnection,
1098 storage_configuration: StorageConfiguration,
1099 initial_schema: SchemaRef,
1100) -> (
1101 VecCollection<'scope, Timestamp, D, Diff>,
1102 StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1103 StreamVec<'scope, Timestamp, Infallible>,
1104 StreamVec<'scope, Timestamp, HealthStatusMessage>,
1105 PressOnDropButton,
1106)
1107where
1108 D: Clone + 'static,
1109{
1110 let scope = input.scope();
1111 let name_for_error = name.clone();
1112 let name_for_logging = name.clone();
1113 let mut builder = OperatorBuilder::new(name, scope.clone());
1114 let sink_version = sink.version;
1115
1116 let hashed_id = sink_id.hashed();
1117 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1118 let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1119 let (output, output_stream) = builder.new_output();
1120 let (batch_desc_output, batch_desc_stream) =
1121 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1122 let mut input =
1123 builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);
1124
1125 let as_of = sink.as_of.clone();
1126 let commit_interval = sink
1127 .commit_interval
1128 .expect("the planner should have enforced this")
1129 .clone();
1130
1131 let (button, errors): (_, StreamVec<'scope, Timestamp, Rc<anyhow::Error>>) =
1132 builder.build_fallible(move |caps| {
1133 Box::pin(async move {
1134 let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
1135 *data_capset = CapabilitySet::new();
1136
1137 if !is_active_worker {
1138 *capset = CapabilitySet::new();
1139 *data_capset = CapabilitySet::new();
1140 *table_ready_capset = CapabilitySet::new();
1141 while let Some(event) = input.next().await {
1142 match event {
1143 Event::Data([output_cap, _], mut data) => {
1144 output.give_container(&output_cap, &mut data);
1145 }
1146 Event::Progress(_) => {}
1147 }
1148 }
1149 return Ok(());
1150 }
1151
1152 let catalog = connection
1153 .catalog_connection
1154 .connect(&storage_configuration, InTask::Yes)
1155 .await
1156 .with_context(|| {
1157 format!(
1158 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1159 connection.catalog_connection.uri, connection.namespace, connection.table
1160 )
1161 })?;
1162
1163 let table = load_or_create_table(
1164 catalog.as_ref(),
1165 connection.namespace.clone(),
1166 connection.table.clone(),
1167 initial_schema.as_ref(),
1168 )
1169 .await?;
1170 debug!(
1171 ?sink_id,
1172 %name_for_logging,
1173 namespace = %connection.namespace,
1174 table = %connection.table,
1175 "iceberg mint loaded/created table"
1176 );
1177
1178 *table_ready_capset = CapabilitySet::new();
1179
1180 let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1181 let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
1182 let (resume_upper, resume_version) = match resume {
1183 Some((f, v)) => (f, v),
1184 None => (Antichain::from_elem(Timestamp::minimum()), 0),
1185 };
1186 debug!(
1187 ?sink_id,
1188 %name_for_logging,
1189 resume_upper = %resume_upper.pretty(),
1190 resume_version,
1191 as_of = %as_of.pretty(),
1192 "iceberg mint resume position loaded"
1193 );
1194
1195 let overcompacted =
1197 *resume_upper != [Timestamp::minimum()] &&
1199 PartialOrder::less_than(&resume_upper, &as_of);
1201
1202 if overcompacted {
1203 let err = format!(
1204 "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
1205 as_of.pretty(),
1206 resume_upper.pretty()
1207 );
1208 return Err(anyhow::anyhow!("{err}"));
1212 };
1213
1214 if resume_version > sink_version {
1215 anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
1216 }
1217
1218 let mut initialized = false;
1219 let mut observed_frontier;
1220 let mut max_seen_ts: Option<Timestamp> = None;
1221 let mut minted_batches = VecDeque::new();
1226 loop {
1227 if let Some(event) = input.next().await {
1228 match event {
1229 Event::Data([output_cap, _], mut data) => {
1230 if !initialized {
1231 for (_, ts, _) in data.iter() {
1232 match max_seen_ts.as_mut() {
1233 Some(max) => {
1234 if max.less_than(ts) {
1235 *max = ts.clone();
1236 }
1237 }
1238 None => {
1239 max_seen_ts = Some(ts.clone());
1240 }
1241 }
1242 }
1243 }
1244 output.give_container(&output_cap, &mut data);
1245 continue;
1246 }
1247 Event::Progress(frontier) => {
1248 observed_frontier = frontier;
1249 }
1250 }
1251 } else {
1252 return Ok(());
1253 }
1254
1255 if !initialized {
1256 if observed_frontier.is_empty() {
1257 if let Some(max_ts) = max_seen_ts.as_ref() {
1263 let synthesized_upper =
1264 Antichain::from_elem(max_ts.step_forward());
1265 debug!(
1266 ?sink_id,
1267 %name_for_logging,
1268 max_seen_ts = %max_ts,
1269 synthesized_upper = %synthesized_upper.pretty(),
1270 "iceberg mint input closed before initialization; using max seen ts"
1271 );
1272 observed_frontier = synthesized_upper;
1273 } else {
1274 debug!(
1275 ?sink_id,
1276 %name_for_logging,
1277 "iceberg mint input closed before initialization with no data"
1278 );
1279 return Ok(());
1281 }
1282 }
1283
1284 if PartialOrder::less_than(&observed_frontier, &resume_upper)
1287 || PartialOrder::less_than(&observed_frontier, &as_of)
1288 {
1289 continue;
1290 }
1291
1292 let mut batch_descriptions = vec![];
1293 let mut current_upper = observed_frontier.clone();
1294 let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
1295
1296 let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
1301 as_of.clone()
1302 } else {
1303 resume_upper.clone()
1304 };
1305
1306 if batch_lower == current_upper {
1307 current_upper = Antichain::from_elem(current_upper_ts.step_forward());
1310 }
1311
1312 let batch_description = (batch_lower.clone(), current_upper.clone());
1313 debug!(
1314 ?sink_id,
1315 %name_for_logging,
1316 batch_lower = %batch_lower.pretty(),
1317 current_upper = %current_upper.pretty(),
1318 "iceberg mint initializing (catch-up batch)"
1319 );
1320 debug!(
1321 "{}: creating catch-up batch [{}, {})",
1322 name_for_logging,
1323 batch_lower.pretty(),
1324 current_upper.pretty()
1325 );
1326 batch_descriptions.push(batch_description);
1327 for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
1329 let duration_millis = commit_interval.as_millis()
1330 .checked_mul(u128::from(i))
1331 .expect("commit interval multiplication overflow");
1332 let duration_ts = Timestamp::new(
1333 u64::try_from(duration_millis)
1334 .expect("commit interval too large for u64"),
1335 );
1336 let desired_batch_upper = Antichain::from_elem(
1337 current_upper_ts.step_forward_by(&duration_ts),
1338 );
1339
1340 let batch_description =
1341 (current_upper.clone(), desired_batch_upper.clone());
1342 debug!(
1343 "{}: minting future batch {}/{} [{}, {})",
1344 name_for_logging,
1345 i,
1346 INITIAL_DESCRIPTIONS_TO_MINT,
1347 current_upper.pretty(),
1348 desired_batch_upper.pretty()
1349 );
1350 current_upper = batch_description.1.clone();
1351 batch_descriptions.push(batch_description);
1352 }
1353
1354 minted_batches.extend(batch_descriptions.clone());
1355
1356 for desc in batch_descriptions {
1357 batch_desc_output.give(&capset[0], desc);
1358 }
1359
1360 capset.downgrade(current_upper);
1361
1362 initialized = true;
1363 } else {
1364 if observed_frontier.is_empty() {
1365 return Ok(());
1367 }
1368 while let Some(oldest_desc) = minted_batches.front() {
1371 let oldest_upper = &oldest_desc.1;
1372 if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
1373 break;
1374 }
1375
1376 let newest_upper = minted_batches.back().unwrap().1.clone();
1377 let new_lower = newest_upper.clone();
1378 let duration_ts = Timestamp::new(commit_interval.as_millis()
1379 .try_into()
1380 .expect("commit interval too large for u64"));
1381 let new_upper = Antichain::from_elem(newest_upper
1382 .as_option()
1383 .unwrap()
1384 .step_forward_by(&duration_ts));
1385
1386 let new_batch_description = (new_lower.clone(), new_upper.clone());
1387 minted_batches.pop_front();
1388 minted_batches.push_back(new_batch_description.clone());
1389
1390 batch_desc_output.give(&capset[0], new_batch_description);
1391
1392 capset.downgrade(new_upper);
1393 }
1394 }
1395 }
1396 })
1397 });
1398
1399 let statuses = errors.map(|error| HealthStatusMessage {
1400 id: None,
1401 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1402 namespace: StatusNamespace::Iceberg,
1403 });
1404 (
1405 output_stream.as_collection(),
1406 batch_desc_stream,
1407 table_ready_stream,
1408 statuses,
1409 button.press_on_drop(),
1410 )
1411}
1412
/// An Iceberg [`DataFile`] bundled with the [`Schema`] needed to decode it again.
///
/// Serde does not (de)serialize this type field-by-field: the `try_from`/`into`
/// attributes route both directions through [`AvroDataFile`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    /// The written data or delete file.
    pub data_file: DataFile,
    /// Schema passed to `read_data_files_from_avro` when decoding `data_file`.
    pub schema: Schema,
}
1419
/// Byte-level wire form of [`SerializableDataFile`].
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    /// Avro bytes produced by `write_data_files_to_avro` for a single `DataFile`.
    pub data_file: Vec<u8>,
    /// JSON bytes of the Iceberg `Schema`, produced by `serde_json::to_vec`.
    pub schema: Vec<u8>,
}
1433
1434impl From<SerializableDataFile> for AvroDataFile {
1435 fn from(value: SerializableDataFile) -> Self {
1436 let mut data_file = Vec::new();
1437 write_data_files_to_avro(
1438 &mut data_file,
1439 [value.data_file],
1440 &StructType::new(vec![]),
1441 FormatVersion::V2,
1442 )
1443 .expect("serialization into buffer");
1444 let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1445 AvroDataFile { data_file, schema }
1446 }
1447}
1448
1449impl TryFrom<AvroDataFile> for SerializableDataFile {
1450 type Error = String;
1451
1452 fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1453 let schema: Schema = serde_json::from_slice(&value.schema)
1454 .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1455 let data_files = read_data_files_from_avro(
1456 &mut &*value.data_file,
1457 &schema,
1458 0,
1459 &StructType::new(vec![]),
1460 FormatVersion::V2,
1461 )
1462 .map_err_to_string_with_causes()?;
1463 let Some(data_file) = data_files.into_iter().next() else {
1464 return Err("No DataFile found in Avro data".into());
1465 };
1466 Ok(SerializableDataFile { data_file, schema })
1467 }
1468}
1469
/// A written Iceberg file tagged with the `(lower, upper)` batch description it was
/// written for.
///
/// `write_data_files` produces these, and `commit_to_iceberg` groups them by
/// `batch_desc`.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
1476
1477impl BoundedDataFile {
1478 pub fn new(
1479 file: DataFile,
1480 schema: Schema,
1481 batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1482 ) -> Self {
1483 Self {
1484 data_file: SerializableDataFile {
1485 data_file: file,
1486 schema,
1487 },
1488 batch_desc,
1489 }
1490 }
1491
1492 pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
1493 &self.batch_desc
1494 }
1495
1496 pub fn data_file(&self) -> &DataFile {
1497 &self.data_file.data_file
1498 }
1499
1500 pub fn into_data_file(self) -> DataFile {
1501 self.data_file.data_file
1502 }
1503}
1504
/// The files collected so far for a single batch description (used by `commit_to_iceberg`).
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}
1510
1511fn write_data_files<'scope, H: EnvelopeHandler + 'static>(
1517 name: String,
1518 input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
1519 batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1520 table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
1521 as_of: Antichain<Timestamp>,
1522 connection: IcebergSinkConnection,
1523 storage_configuration: StorageConfiguration,
1524 materialize_arrow_schema: Arc<ArrowSchema>,
1525 metrics: Arc<IcebergSinkMetrics>,
1526 statistics: SinkStatistics,
1527) -> (
1528 StreamVec<'scope, Timestamp, BoundedDataFile>,
1529 StreamVec<'scope, Timestamp, HealthStatusMessage>,
1530 PressOnDropButton,
1531) {
1532 let scope = input.scope();
1533 let name_for_logging = name.clone();
1534 let mut builder = OperatorBuilder::new(name, scope.clone());
1535
1536 let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1537
1538 let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1539 let mut batch_desc_input =
1540 builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
1541 let mut input = builder.new_disconnected_input(input.inner, Pipeline);
1542
1543 let (button, errors) = builder.build_fallible(move |caps| {
1544 Box::pin(async move {
1545 let [capset]: &mut [_; 1] = caps.try_into().unwrap();
1546 let catalog = connection
1547 .catalog_connection
1548 .connect(&storage_configuration, InTask::Yes)
1549 .await
1550 .with_context(|| {
1551 format!(
1552 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1553 connection.catalog_connection.uri, connection.namespace, connection.table
1554 )
1555 })?;
1556
1557 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1558 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1559 while let Some(_) = table_ready_input.next().await {
1560 }
1562 let table = catalog
1563 .load_table(&table_ident)
1564 .await
1565 .with_context(|| {
1566 format!(
1567 "Failed to load Iceberg table '{}.{}' in write_data_files operator",
1568 connection.namespace, connection.table
1569 )
1570 })?;
1571
1572 let table_metadata = table.metadata().clone();
1573 let current_schema = Arc::clone(table_metadata.current_schema());
1574
1575 let arrow_schema = Arc::new(
1579 merge_materialize_metadata_into_iceberg_schema(
1580 materialize_arrow_schema.as_ref(),
1581 current_schema.as_ref(),
1582 )
1583 .context("Failed to merge Materialize metadata into Iceberg schema")?,
1584 );
1585
1586 let location = table_metadata.location();
1590 let corrected_location = match location.rsplit_once("/metadata/") {
1591 Some((a, b)) if b.ends_with(".metadata.json") => a,
1592 _ => location,
1593 };
1594
1595 let data_location = format!("{}/data", corrected_location);
1596 let location_generator = DefaultLocationGenerator::with_data_location(data_location);
1597
1598 let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
1600 let file_name_generator = DefaultFileNameGenerator::new(
1601 PARQUET_FILE_PREFIX.to_string(),
1602 Some(unique_suffix),
1603 iceberg::spec::DataFileFormat::Parquet,
1604 );
1605
1606 let file_io = table.file_io().clone();
1607
1608 let writer_properties = WriterProperties::new();
1609
1610 let ctx = WriterContext {
1611 arrow_schema,
1612 current_schema: Arc::clone(¤t_schema),
1613 file_io,
1614 location_generator,
1615 file_name_generator,
1616 writer_properties,
1617 };
1618 let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;
1619
1620 let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1623 BTreeMap::new();
1624
1625 #[allow(clippy::disallowed_types)]
1630 let mut in_flight_batches: std::collections::HashMap<
1631 (Antichain<Timestamp>, Antichain<Timestamp>),
1632 Box<dyn IcebergWriter>,
1633 > = std::collections::HashMap::new();
1634
1635 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1636 let mut processed_batch_description_frontier =
1637 Antichain::from_elem(Timestamp::minimum());
1638 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1639 let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1640
1641 let mut min_batch_lower: Option<Antichain<Timestamp>> = None;
1643
1644 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1645 let mut staged_messages_since_flush: u64 = 0;
1646 tokio::select! {
1647 _ = batch_desc_input.ready() => {},
1648 _ = input.ready() => {}
1649 }
1650
1651 while let Some(event) = batch_desc_input.next_sync() {
1652 match event {
1653 Event::Data(_cap, data) => {
1654 for batch_desc in data {
1655 let (lower, upper) = &batch_desc;
1656
1657 if min_batch_lower.is_none() {
1659 min_batch_lower = Some(lower.clone());
1660 debug!(
1661 "{}: set min_batch_lower to {}",
1662 name_for_logging,
1663 lower.pretty()
1664 );
1665
1666 let to_remove: Vec<_> = stashed_rows
1668 .keys()
1669 .filter(|ts| {
1670 let ts_antichain = Antichain::from_elem((*ts).clone());
1671 PartialOrder::less_than(&ts_antichain, lower)
1672 })
1673 .cloned()
1674 .collect();
1675
1676 if !to_remove.is_empty() {
1677 let mut removed_count = 0;
1678 for ts in to_remove {
1679 if let Some(rows) = stashed_rows.remove(&ts) {
1680 removed_count += rows.len();
1681 for _ in &rows {
1682 metrics.stashed_rows.dec();
1683 }
1684 }
1685 }
1686 debug!(
1687 "{}: pruned {} already-committed rows (< min_batch_lower)",
1688 name_for_logging,
1689 removed_count
1690 );
1691 }
1692 }
1693
1694 let is_snapshot = lower == &as_of;
1696 debug!(
1697 "{}: received batch description [{}, {}), snapshot={}",
1698 name_for_logging,
1699 lower.pretty(),
1700 upper.pretty(),
1701 is_snapshot
1702 );
1703 let mut batch_writer =
1704 handler.create_writer(is_snapshot).await?;
1705 let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1707 let mut drained_count = 0;
1708 for row_ts in row_ts_keys {
1709 let ts = Antichain::from_elem(row_ts.clone());
1710 if PartialOrder::less_equal(lower, &ts)
1711 && PartialOrder::less_than(&ts, upper)
1712 {
1713 if let Some(rows) = stashed_rows.remove(&row_ts) {
1714 drained_count += rows.len();
1715 for (_row, diff_pair) in rows {
1716 metrics.stashed_rows.dec();
1717 let record_batch = handler.row_to_batch(
1718 diff_pair.clone(),
1719 row_ts.clone(),
1720 )
1721 .context("failed to convert row to recordbatch")?;
1722 batch_writer.write(record_batch).await?;
1723 staged_messages_since_flush += 1;
1724 if staged_messages_since_flush >= 10_000 {
1725 statistics.inc_messages_staged_by(
1726 staged_messages_since_flush,
1727 );
1728 staged_messages_since_flush = 0;
1729 }
1730 }
1731 }
1732 }
1733 }
1734 if drained_count > 0 {
1735 debug!(
1736 "{}: drained {} stashed rows into batch [{}, {})",
1737 name_for_logging,
1738 drained_count,
1739 lower.pretty(),
1740 upper.pretty()
1741 );
1742 }
1743 let prev =
1744 in_flight_batches.insert(batch_desc.clone(), batch_writer);
1745 if prev.is_some() {
1746 anyhow::bail!(
1747 "Duplicate batch description received for description {:?}",
1748 batch_desc
1749 );
1750 }
1751 }
1752 }
1753 Event::Progress(frontier) => {
1754 batch_description_frontier = frontier;
1755 }
1756 }
1757 }
1758
1759 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1760 for event in ready_events {
1761 match event {
1762 Event::Data(_cap, data) => {
1763 let mut dropped_per_time = BTreeMap::new();
1764 let mut stashed_per_time = BTreeMap::new();
1765 for ((row, diff_pair), ts, _diff) in data {
1766 let row_ts = ts.clone();
1767 let ts_antichain = Antichain::from_elem(row_ts.clone());
1768 let mut written = false;
1769 for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
1771 let (lower, upper) = batch_desc;
1772 if PartialOrder::less_equal(lower, &ts_antichain)
1773 && PartialOrder::less_than(&ts_antichain, upper)
1774 {
1775 let record_batch = handler.row_to_batch(
1776 diff_pair.clone(),
1777 row_ts.clone(),
1778 )
1779 .context("failed to convert row to recordbatch")?;
1780 batch_writer.write(record_batch).await?;
1781 staged_messages_since_flush += 1;
1782 if staged_messages_since_flush >= 10_000 {
1783 statistics.inc_messages_staged_by(
1784 staged_messages_since_flush,
1785 );
1786 staged_messages_since_flush = 0;
1787 }
1788 written = true;
1789 break;
1790 }
1791 }
1792 if !written {
1793 if let Some(ref min_lower) = min_batch_lower {
1795 if PartialOrder::less_than(&ts_antichain, min_lower) {
1796 dropped_per_time
1797 .entry(ts_antichain.into_option().unwrap())
1798 .and_modify(|c| *c += 1)
1799 .or_insert(1);
1800 continue;
1801 }
1802 }
1803
1804 stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
1805 let entry = stashed_rows.entry(row_ts).or_default();
1806 metrics.stashed_rows.inc();
1807 entry.push((row, diff_pair));
1808 }
1809 }
1810
1811 for (ts, count) in dropped_per_time {
1812 debug!(
1813 "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
1814 name_for_logging, count, ts
1815 );
1816 }
1817
1818 for (ts, count) in stashed_per_time {
1819 debug!(
1820 "{}: stashed {} rows at timestamp {} (waiting for batch description)",
1821 name_for_logging, count, ts
1822 );
1823 }
1824 }
1825 Event::Progress(frontier) => {
1826 input_frontier = frontier;
1827 }
1828 }
1829 }
1830 if staged_messages_since_flush > 0 {
1831 statistics.inc_messages_staged_by(staged_messages_since_flush);
1832 }
1833
1834 if PartialOrder::less_than(
1836 &processed_batch_description_frontier,
1837 &batch_description_frontier,
1838 ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1839 {
1840 let ready_batches: Vec<_> = in_flight_batches
1846 .extract_if(|(lower, upper), _| {
1847 PartialOrder::less_than(lower, &batch_description_frontier)
1848 && PartialOrder::less_equal(upper, &input_frontier)
1849 })
1850 .collect();
1851
1852 if !ready_batches.is_empty() {
1853 debug!(
1854 "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
1855 name_for_logging,
1856 ready_batches.len(),
1857 batch_description_frontier.pretty(),
1858 input_frontier.pretty()
1859 );
1860 let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1861 for (desc, mut batch_writer) in ready_batches {
1862 let close_started_at = Instant::now();
1863 let data_files = batch_writer.close().await;
1864 metrics
1865 .writer_close_duration_seconds
1866 .observe(close_started_at.elapsed().as_secs_f64());
1867 let data_files = data_files.context("Failed to close batch writer")?;
1868 debug!(
1869 "{}: closed batch [{}, {}), wrote {} files",
1870 name_for_logging,
1871 desc.0.pretty(),
1872 desc.1.pretty(),
1873 data_files.len()
1874 );
1875 for data_file in data_files {
1876 match data_file.content_type() {
1877 iceberg::spec::DataContentType::Data => {
1878 metrics.data_files_written.inc();
1879 }
1880 iceberg::spec::DataContentType::PositionDeletes
1881 | iceberg::spec::DataContentType::EqualityDeletes => {
1882 metrics.delete_files_written.inc();
1883 }
1884 }
1885 statistics.inc_messages_staged_by(data_file.record_count());
1886 statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1887 let file = BoundedDataFile::new(
1888 data_file,
1889 current_schema.as_ref().clone(),
1890 desc.clone(),
1891 );
1892 output.give(&capset[0], file);
1893 }
1894
1895 max_upper = max_upper.join(&desc.1);
1896 }
1897
1898 capset.downgrade(max_upper);
1899 }
1900 processed_batch_description_frontier.clone_from(&batch_description_frontier);
1901 processed_input_frontier.clone_from(&input_frontier);
1902 }
1903 }
1904 Ok(())
1905 })
1906 });
1907
1908 let statuses = errors.map(|error| HealthStatusMessage {
1909 id: None,
1910 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1911 namespace: StatusNamespace::Iceberg,
1912 });
1913 (output_stream, statuses, button.press_on_drop())
1914}
1915
// Unit tests for the Iceberg type overrides, schema conversion, equality-id lookup and
// metadata-merging helpers defined elsewhere in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use iceberg::spec::{PrimitiveType, Type};
    use mz_repr::SqlScalarType;
    use mz_storage_types::sinks::ICEBERG_UINT64_DECIMAL_PRECISION;

    /// Unsigned types and `MzTimestamp` get overridden Arrow types; natively supported
    /// types get no override.
    #[mz_ore::test]
    fn test_iceberg_type_overrides() {
        // UInt16 maps to Int32.
        let result = iceberg_type_overrides(&SqlScalarType::UInt16);
        assert_eq!(result.unwrap().0, DataType::Int32);

        // UInt32 maps to Int64.
        let result = iceberg_type_overrides(&SqlScalarType::UInt32);
        assert_eq!(result.unwrap().0, DataType::Int64);

        // UInt64 maps to a scale-0 decimal.
        let result = iceberg_type_overrides(&SqlScalarType::UInt64);
        assert_eq!(
            result.unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // MzTimestamp uses the same decimal representation as UInt64.
        let result = iceberg_type_overrides(&SqlScalarType::MzTimestamp);
        assert_eq!(
            result.unwrap().0,
            DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
        );

        // Types that need no override.
        assert!(iceberg_type_overrides(&SqlScalarType::Int32).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::String).is_none());
        assert!(iceberg_type_overrides(&SqlScalarType::Bool).is_none());
    }

    /// Overrides are applied recursively: a UInt64 list element becomes a decimal.
    #[mz_ore::test]
    fn test_iceberg_schema_with_nested_uint64() {
        let desc = mz_repr::RelationDesc::builder()
            .with_column(
                "items",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt64),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let schema =
            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
                .expect("schema conversion should succeed");

        if let DataType::List(field) = schema.field(0).data_type() {
            // The list's element field carries the overridden type.
            assert_eq!(
                field.data_type(),
                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            );
        } else {
            panic!("Expected List type");
        }
    }

    /// Interval columns become `LargeUtf8` in Arrow and `String` in Iceberg.
    #[mz_ore::test]
    fn test_iceberg_interval_override() {
        let result = iceberg_type_overrides(&SqlScalarType::Interval);
        assert_eq!(result.unwrap().0, DataType::LargeUtf8);

        let desc = mz_repr::RelationDesc::builder()
            .with_column("id", SqlScalarType::Int32.nullable(false))
            .with_column("dur", SqlScalarType::Interval.nullable(true))
            .finish();

        let (arrow_schema, iceberg_schema) =
            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");

        // Column index 1 is `dur`.
        assert_eq!(arrow_schema.field(1).data_type(), &DataType::LargeUtf8);

        let field = iceberg_schema
            .as_struct()
            .field_by_name("dur")
            .expect("field should exist");
        assert_eq!(*field.field_type, Type::Primitive(PrimitiveType::String));
    }

    /// Range columns convert to an Iceberg struct type.
    #[mz_ore::test]
    fn test_iceberg_range_schema() {
        let desc = mz_repr::RelationDesc::builder()
            .with_column("id", SqlScalarType::Int32.nullable(false))
            .with_column(
                "r",
                SqlScalarType::Range {
                    element_type: Box::new(SqlScalarType::Int32),
                }
                .nullable(true),
            )
            .finish();

        let (_arrow_schema, iceberg_schema) =
            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");

        let field = iceberg_schema
            .as_struct()
            .field_by_name("r")
            .expect("field should exist");
        assert!(
            matches!(&*field.field_type, Type::Struct(_)),
            "range should be struct, got: {:?}",
            field.field_type
        );
    }

    /// Equality ids are the Iceberg field ids of the selected columns, not ids derived
    /// from column positions.
    #[mz_ore::test]
    fn equality_ids_follow_iceberg_field_ids() {
        // A map column placed before the key column, so its nested fields also receive ids.
        let map_entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let materialize_arrow_schema = ArrowSchema::new(vec![
            Field::new("attrs", DataType::Map(Arc::new(map_entries), false), true),
            Field::new("key_col", DataType::Int32, false),
        ]);
        let materialize_arrow_schema = add_field_ids_to_arrow_schema(materialize_arrow_schema);
        let iceberg_schema = arrow_schema_to_schema(&materialize_arrow_schema)
            .expect("schema conversion should succeed");

        // Select `key_col` (Arrow index 1) as the equality column.
        let equality_ids =
            equality_ids_for_indices(&iceberg_schema, &materialize_arrow_schema, &[1])
                .expect("field lookup should succeed");

        let expected_id = iceberg_schema
            .as_struct()
            .field_by_name("key_col")
            .expect("top-level field should exist")
            .id;
        assert_eq!(equality_ids, vec![expected_id]);
        // Sanity check that this schema can tell the two approaches apart: the real id is
        // not the naive position-based id (index 1 -> id 2).
        assert_ne!(expected_id, 2);
    }

    /// Merging a Materialize map field into an Iceberg map field keeps Iceberg's entry
    /// field names and carries over the extension metadata on the value field.
    #[mz_ore::test]
    // Arrow's `Field::with_metadata` takes a std `HashMap`.
    #[allow(clippy::disallowed_types)]
    fn merge_map_entries_preserves_value_extension_metadata() {
        use std::collections::HashMap;

        // Materialize side: `entries { keys, values }` with extension metadata on `values`.
        let mz_value_metadata = HashMap::from([(
            ARROW_EXTENSION_NAME_KEY.to_string(),
            "materialize.v1.string".to_string(),
        )]);
        let mz_entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("keys", DataType::Utf8, false),
                    Field::new("values", DataType::Utf8, true).with_metadata(mz_value_metadata),
                ]
                .into(),
            ),
            false,
        );
        let mz_map = Field::new("m", DataType::Map(Arc::new(mz_entries), false), true)
            .with_metadata(HashMap::from([(
                ARROW_EXTENSION_NAME_KEY.to_string(),
                "materialize.v1.map".to_string(),
            )]));

        // Iceberg side: `key_value { key, value }` with no metadata.
        let iceberg_entries = Field::new(
            "key_value",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let iceberg_map = Field::new("m", DataType::Map(Arc::new(iceberg_entries), false), true);

        let merged = merge_field_metadata_recursive(&iceberg_map, Some(&mz_map))
            .expect("merge should succeed");

        let entries = match merged.data_type() {
            DataType::Map(entries, _) => entries.as_ref(),
            other => panic!("expected Map, got {other:?}"),
        };
        let entry_fields = match entries.data_type() {
            DataType::Struct(fields) => fields,
            other => panic!("expected Struct, got {other:?}"),
        };
        // Iceberg's field names win...
        assert_eq!(entry_fields[0].name(), "key");
        assert_eq!(entry_fields[1].name(), "value");
        // ...while Materialize's extension metadata is preserved on the value field.
        assert_eq!(
            entry_fields[1].metadata().get(ARROW_EXTENSION_NAME_KEY),
            Some(&"materialize.v1.string".to_string()),
        );
    }
}
2137
2138fn commit_to_iceberg<'scope>(
2142 name: String,
2143 sink_id: GlobalId,
2144 sink_version: u64,
2145 batch_input: StreamVec<'scope, Timestamp, BoundedDataFile>,
2146 batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
2147 table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
2148 write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
2149 connection: IcebergSinkConnection,
2150 storage_configuration: StorageConfiguration,
2151 write_handle: impl Future<
2152 Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
2153 > + 'static,
2154 metrics: Arc<IcebergSinkMetrics>,
2155 statistics: SinkStatistics,
2156) -> (
2157 StreamVec<'scope, Timestamp, HealthStatusMessage>,
2158 PressOnDropButton,
2159) {
2160 let scope = batch_input.scope();
2161 let mut builder = OperatorBuilder::new(name, scope.clone());
2162
2163 let hashed_id = sink_id.hashed();
2164 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
2165 let name_for_logging = format!("{sink_id}-commit-to-iceberg");
2166
2167 let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
2168 let mut batch_desc_input =
2169 builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
2170 let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
2171
2172 let (button, errors) = builder.build_fallible(move |_caps| {
2173 Box::pin(async move {
2174 if !is_active_worker {
2175 write_frontier.borrow_mut().clear();
2176 return Ok(());
2177 }
2178
2179 let catalog = connection
2180 .catalog_connection
2181 .connect(&storage_configuration, InTask::Yes)
2182 .await
2183 .with_context(|| {
2184 format!(
2185 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
2186 connection.catalog_connection.uri, connection.namespace, connection.table
2187 )
2188 })?;
2189
2190 let mut write_handle = write_handle.await?;
2191
2192 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
2193 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
2194 while let Some(_) = table_ready_input.next().await {
2195 }
2197 let mut table = catalog.load_table(&table_ident).await.with_context(|| {
2198 format!(
2199 "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
2200 connection.namespace, connection.table
2201 )
2202 })?;
2203
2204 #[allow(clippy::disallowed_types)]
2205 let mut batch_descriptions: std::collections::HashMap<
2206 (Antichain<Timestamp>, Antichain<Timestamp>),
2207 BoundedDataFileSet,
2208 > = std::collections::HashMap::new();
2209
2210 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
2211 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
2212
2213 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
2214 tokio::select! {
2215 _ = batch_desc_input.ready() => {},
2216 _ = input.ready() => {}
2217 }
2218
2219 while let Some(event) = batch_desc_input.next_sync() {
2220 match event {
2221 Event::Data(_cap, data) => {
2222 for batch_desc in data {
2223 let prev = batch_descriptions
2224 .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
2225 if let Some(prev) = prev {
2226 anyhow::bail!(
2227 "Duplicate batch description received \
2228 in commit operator: {:?}",
2229 prev
2230 );
2231 }
2232 }
2233 }
2234 Event::Progress(frontier) => {
2235 batch_description_frontier = frontier;
2236 }
2237 }
2238 }
2239
2240 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
2241 for event in ready_events {
2242 match event {
2243 Event::Data(_cap, data) => {
2244 for bounded_data_file in data {
2245 let entry = batch_descriptions
2246 .entry(bounded_data_file.batch_desc().clone())
2247 .or_default();
2248 entry.data_files.push(bounded_data_file);
2249 }
2250 }
2251 Event::Progress(frontier) => {
2252 input_frontier = frontier;
2253 }
2254 }
2255 }
2256
2257 let mut done_batches: Vec<_> = batch_descriptions
2263 .keys()
2264 .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
2265 .cloned()
2266 .collect();
2267
2268 done_batches.sort_by(|a, b| {
2270 if PartialOrder::less_than(a, b) {
2271 Ordering::Less
2272 } else if PartialOrder::less_than(b, a) {
2273 Ordering::Greater
2274 } else {
2275 Ordering::Equal
2276 }
2277 });
2278
2279 for batch in done_batches {
2280 let file_set = batch_descriptions.remove(&batch).unwrap();
2281
2282 let mut data_files = vec![];
2283 let mut delete_files = vec![];
2284 let mut total_messages: u64 = 0;
2286 let mut total_bytes: u64 = 0;
2287 for file in file_set.data_files {
2288 total_messages += file.data_file().record_count();
2289 total_bytes += file.data_file().file_size_in_bytes();
2290 match file.data_file().content_type() {
2291 iceberg::spec::DataContentType::Data => {
2292 data_files.push(file.into_data_file());
2293 }
2294 iceberg::spec::DataContentType::PositionDeletes
2295 | iceberg::spec::DataContentType::EqualityDeletes => {
2296 delete_files.push(file.into_data_file());
2297 }
2298 }
2299 }
2300
2301 debug!(
2302 ?sink_id,
2303 %name_for_logging,
2304 lower = %batch.0.pretty(),
2305 upper = %batch.1.pretty(),
2306 data_files = data_files.len(),
2307 delete_files = delete_files.len(),
2308 total_messages,
2309 total_bytes,
2310 "iceberg commit applying batch"
2311 );
2312
2313 let instant = Instant::now();
2314
2315 let frontier = batch.1.clone();
2316 let frontier_json = serde_json::to_string(&frontier.elements())
2317 .context("Failed to serialize frontier to JSON")?;
2318 let snapshot_properties = vec![
2319 ("mz-sink-id".to_string(), sink_id.to_string()),
2320 ("mz-frontier".to_string(), frontier_json),
2321 ("mz-sink-version".to_string(), sink_version.to_string()),
2322 ];
2323
2324 let (table_state, commit_result) = Retry::default()
2325 .max_tries(5)
2326 .retry_async_with_state(table, |_, table| {
2327 let snapshot_properties = snapshot_properties.clone();
2328 let data_files = data_files.clone();
2329 let delete_files = delete_files.clone();
2330 let metrics = Arc::clone(&metrics);
2331 let catalog = Arc::clone(&catalog);
2332 let conn_namespace = connection.namespace.clone();
2333 let conn_table = connection.table.clone();
2334 let frontier = frontier.clone();
2335 let batch_lower = batch.0.clone();
2336 let batch_upper = batch.1.clone();
2337 async move {
2338 try_commit_batch(
2339 table,
2340 snapshot_properties,
2341 data_files,
2342 delete_files,
2343 catalog.as_ref(),
2344 &conn_namespace,
2345 &conn_table,
2346 sink_version,
2347 &frontier,
2348 &batch_lower,
2349 &batch_upper,
2350 &metrics,
2351 )
2352 .await
2353 }
2354 })
2355 .await;
2356 let commit_result = commit_result.with_context(|| {
2357 format!(
2358 "failed to commit batch to Iceberg table '{}.{}'",
2359 connection.namespace, connection.table
2360 )
2361 });
2362 table = table_state;
2363 let duration = instant.elapsed();
2364 metrics
2365 .commit_duration_seconds
2366 .observe(duration.as_secs_f64());
2367 commit_result?;
2368
2369 debug!(
2370 ?sink_id,
2371 %name_for_logging,
2372 lower = %batch.0.pretty(),
2373 upper = %batch.1.pretty(),
2374 total_messages,
2375 total_bytes,
2376 ?duration,
2377 "iceberg commit applied batch"
2378 );
2379
2380 metrics.snapshots_committed.inc();
2381 statistics.inc_messages_committed_by(total_messages);
2382 statistics.inc_bytes_committed_by(total_bytes);
2383
2384 let mut expect_upper = write_handle.shared_upper();
2385 loop {
2386 if PartialOrder::less_equal(&frontier, &expect_upper) {
2387 break;
2389 }
2390
2391 const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
2392 match write_handle
2393 .compare_and_append(EMPTY, expect_upper, frontier.clone())
2394 .await
2395 .expect("valid usage")
2396 {
2397 Ok(()) => break,
2398 Err(mismatch) => {
2399 expect_upper = mismatch.current;
2400 }
2401 }
2402 }
2403 write_frontier.borrow_mut().clone_from(&frontier);
2404 }
2405 }
2406
2407 Ok(())
2408 })
2409 });
2410
2411 let statuses = errors.map(|error| HealthStatusMessage {
2412 id: None,
2413 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
2414 namespace: StatusNamespace::Iceberg,
2415 });
2416
2417 (statuses, button.press_on_drop())
2418}
2419
/// Renders an `IcebergSinkConnection` into a chain of timely operators
/// (walker -> minter -> data-file writer -> committer).
impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
    /// Returns the sink's key column indices, taken from `key_desc_and_indices`
    /// when a key description is present; `None` otherwise.
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    /// Returns the upstream relation's key column indices, if any are recorded.
    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    /// Builds the Iceberg sink dataflow.
    ///
    /// Operators are created in this order, each feeding the next:
    /// 1. `walk_sink_arrangement` flattens `batches` into a collection of
    ///    `(Option<Row>, DiffPair<Row>)` updates.
    /// 2. `mint_batch_descriptions` produces the minted input plus the shared
    ///    `batch_descriptions` / `table_ready` streams.
    /// 3. `write_data_files`, instantiated with the handler for the sink's
    ///    envelope (Upsert or Append), writes data files.
    /// 4. `commit_to_iceberg` commits those files and advances `write_frontier`.
    ///
    /// Returns the merged health-status stream (an initial `running` status plus
    /// the minter, writer and committer statuses) together with the operators'
    /// `PressOnDropButton`s.
    ///
    /// If the Arrow/Iceberg schema cannot be derived from `sink.from_desc`, it
    /// returns a stream with a single halting status and an empty button list
    /// instead.
    ///
    /// # Panics
    ///
    /// - If aggregated statistics were not initialized for `sink_id`.
    /// - If `sink.envelope` is `SinkEnvelope::Debezium`.
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        batches: SinkBatchStream<'scope>,
        key_is_synthetic: bool,
        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
        let scope = batches.scope();

        // Stage 1: flatten the incoming sink batches into diff-pair updates.
        let (input, walker_button) = walk_sink_arrangement(
            format!("{sink_id}-iceberg-walker"),
            batches,
            sink_id,
            sink.from,
            key_is_synthetic,
        );

        // An `async` block does nothing until awaited: this is a deferred
        // future that opens a persist writer on the sink's output shard. It is
        // handed to `commit_to_iceberg` below.
        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        // The sink's write frontier starts at the minimum timestamp.
        // It is shared with the committer and registered under `sink_id`
        // in `storage_state.sink_write_frontiers`.
        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        // Derive the Arrow schema (with field IDs) and the Iceberg schema from
        // the sink's input relation. For the Append envelope, both schemas are
        // rebuilt with the extra append columns.
        //
        // On failure, a single halting status is reported and no further
        // operators are rendered.
        // NOTE(review): on that early return, `walker_button` is dropped (which,
        // going by its type name, presumably shuts down the walker built above)
        // and `write_frontier` stays registered at its minimum. Confirm this is
        // intended.
        let (arrow_schema_with_ids, iceberg_schema) =
            match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
                let (arrow_schema_with_ids, iceberg_schema) =
                    relation_desc_to_iceberg_schema(&sink.from_desc)?;

                Ok(if sink.envelope == SinkEnvelope::Append {
                    let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
                    let extended_iceberg = Arc::new(
                        arrow_schema_to_schema(&extended_arrow)
                            .context("Failed to build Iceberg schema with append columns")?,
                    );
                    (extended_arrow, extended_iceberg)
                } else {
                    (arrow_schema_with_ids, iceberg_schema)
                })
            })() {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(scope);
                    return (error_stream, vec![]);
                }
            };

        // Iceberg sink metrics, keyed by `sink_id` and this scope's `index()`.
        let metrics = Arc::new(
            storage_state
                .metrics
                .get_iceberg_sink_metrics(sink_id, scope.index()),
        );

        // The `expect` encodes the assumption that statistics are registered
        // for every sink before it is rendered.
        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        // Stage 2: mint batch descriptions over the walked input.
        // `batch_descriptions` and `table_ready` are also consumed by the
        // writer and committer below.
        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
            mint_batch_descriptions(
                format!("{sink_id}-iceberg-mint"),
                sink_id,
                input,
                sink,
                connection_for_minter,
                storage_state.storage_configuration.clone(),
                Arc::clone(&iceberg_schema),
            );

        // Stage 3: write data files. Both supported envelopes take identical
        // arguments; only the envelope handler type parameter differs.
        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = match sink.envelope {
            SinkEnvelope::Upsert => write_data_files::<UpsertEnvelopeHandler>(
                format!("{sink_id}-write-data-files"),
                minted_input,
                batch_descriptions.clone(),
                table_ready.clone(),
                sink.as_of.clone(),
                connection_for_writer,
                storage_state.storage_configuration.clone(),
                Arc::new(arrow_schema_with_ids.clone()),
                Arc::clone(&metrics),
                statistics.clone(),
            ),
            SinkEnvelope::Append => write_data_files::<AppendEnvelopeHandler>(
                format!("{sink_id}-write-data-files"),
                minted_input,
                batch_descriptions.clone(),
                table_ready.clone(),
                sink.as_of.clone(),
                connection_for_writer,
                storage_state.storage_configuration.clone(),
                Arc::new(arrow_schema_with_ids.clone()),
                Arc::clone(&metrics),
                statistics.clone(),
            ),
            // Any other envelope reaching this point is an invariant violation.
            SinkEnvelope::Debezium => {
                unreachable!("Iceberg sink only supports Upsert and Append envelopes")
            }
        };

        // Stage 4: commit the written files. The committer also receives the
        // persist writer future and the shared `write_frontier`.
        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            datafiles,
            batch_descriptions,
            table_ready,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
            Arc::clone(&metrics),
            statistics,
        );

        // Emit one `running` status up front, then merge it with the status
        // streams from the three downstream stages.
        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        // One button per rendered operator. The caller presumably holds them
        // for the sink's lifetime.
        (
            statuses,
            vec![walker_button, mint_button, write_button, commit_button],
        )
    }
}
2597
2598fn walk_sink_arrangement<'scope>(
2605 name: String,
2606 batches: SinkBatchStream<'scope>,
2607 sink_id: GlobalId,
2608 from_id: GlobalId,
2609 key_is_synthetic: bool,
2610) -> (
2611 VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
2612 PressOnDropButton,
2613) {
2614 let mut builder = OperatorBuilder::new(name, batches.scope());
2615 let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
2616 let mut input = builder.new_input_for(batches, Pipeline, &output);
2617
2618 let button = builder.build(move |_caps| async move {
2619 let mut pk_warner = (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));
2620
2621 while let Some(event) = input.next().await {
2622 if let Event::Data(cap, mut batches) = event {
2623 for batch in batches.drain(..) {
2624 for_each_diff_pair(&batch, |key, time, diff_pair| {
2625 if let Some(warner) = pk_warner.as_mut() {
2626 warner.observe(key, time);
2627 }
2628 output.give(&cap, ((None, diff_pair), time, Diff::ONE));
2632 });
2633 if let Some(warner) = pk_warner.as_mut() {
2637 warner.flush();
2638 }
2639 }
2640 }
2641 }
2642 });
2643
2644 (stream.as_collection(), button.press_on_drop())
2645}