1use std::cmp::Ordering;
86use std::collections::{BTreeMap, VecDeque};
87use std::convert::Infallible;
88use std::future::Future;
89use std::time::Instant;
90use std::{cell::RefCell, rc::Rc, sync::Arc};
91
92use anyhow::{Context, anyhow};
93use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch};
94use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
95use differential_dataflow::lattice::Lattice;
96use differential_dataflow::{AsCollection, Hashable, VecCollection};
97use futures::StreamExt;
98use iceberg::ErrorKind;
99use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
100use iceberg::spec::{
101 DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
102 write_data_files_to_avro,
103};
104use iceberg::spec::{Schema, SchemaRef};
105use iceberg::table::Table;
106use iceberg::transaction::{ApplyTransactionAction, Transaction};
107use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
108use iceberg::writer::base_writer::equality_delete_writer::{
109 EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
110};
111use iceberg::writer::base_writer::position_delete_writer::{
112 PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
113};
114use iceberg::writer::combined_writer::delta_writer::DeltaWriterBuilder;
115use iceberg::writer::file_writer::ParquetWriterBuilder;
116use iceberg::writer::file_writer::location_generator::{
117 DefaultFileNameGenerator, DefaultLocationGenerator,
118};
119use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
120use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
121use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
122use itertools::Itertools;
123use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
124use mz_interchange::avro::DiffPair;
125use mz_interchange::envelopes::for_each_diff_pair;
126use mz_ore::cast::CastFrom;
127use mz_ore::error::ErrorExt;
128use mz_ore::future::InTask;
129use mz_ore::result::ResultExt;
130use mz_ore::retry::{Retry, RetryResult};
131use mz_persist_client::Diagnostics;
132use mz_persist_client::write::WriteHandle;
133use mz_persist_types::codec_impls::UnitSchema;
134use mz_repr::{Diff, GlobalId, Row, Timestamp};
135use mz_storage_types::StorageDiff;
136use mz_storage_types::configuration::StorageConfiguration;
137use mz_storage_types::controller::CollectionMetadata;
138use mz_storage_types::errors::DataflowError;
139use mz_storage_types::sinks::{
140 IcebergSinkConnection, SinkEnvelope, StorageSinkDesc, iceberg_type_overrides,
141};
142use mz_storage_types::sources::SourceData;
143use mz_timely_util::antichain::AntichainExt;
144use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
145use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
146use parquet::file::properties::WriterProperties;
147use serde::{Deserialize, Serialize};
148use timely::PartialOrder;
149use timely::container::CapacityContainerBuilder;
150use timely::dataflow::StreamVec;
151use timely::dataflow::channels::pact::{Exchange, Pipeline};
152use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
153use timely::dataflow::operators::{CapabilitySet, Concatenate};
154use timely::progress::{Antichain, Timestamp as _};
155use tracing::debug;
156
157use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
158use crate::metrics::sink::iceberg::IcebergSinkMetrics;
159use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
160use crate::statistics::SinkStatistics;
161use crate::storage_state::StorageState;
162
/// Initial per-column item capacity for `ArrowBuilder`s that convert rows into
/// record batches.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Initial per-column byte (data) capacity for `ArrowBuilder`s.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// File name prefix for Parquet files produced by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// Number of batch descriptions minted eagerly when the mint operator
/// initializes (one catch-up batch plus future batches spaced by the commit
/// interval).
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
176
/// Shared state needed to construct Iceberg file writers for this sink.
struct WriterContext {
    /// Arrow schema used to encode record batches and validate Parquet writes.
    arrow_schema: Arc<ArrowSchema>,
    /// The Iceberg table's current schema.
    current_schema: Arc<Schema>,
    /// Handle for file I/O against the table's storage location.
    file_io: iceberg::io::FileIO,
    /// Generates storage locations for newly written files.
    location_generator: DefaultLocationGenerator,
    /// Generates unique names for newly written files.
    file_name_generator: DefaultFileNameGenerator,
    /// Parquet writer settings applied to all files produced by this sink.
    writer_properties: WriterProperties,
}
192
/// Envelope-specific strategy for writing this sink's output to Iceberg.
///
/// Implementations decide which Iceberg writer stack to build (plain data
/// files vs. delta writers with delete files) and how a Materialize diff pair
/// is encoded into an Arrow `RecordBatch`.
trait EnvelopeHandler: Send {
    /// Builds a handler for the given sink connection.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self>
    where
        Self: Sized;

    /// Creates a fresh Iceberg writer. `is_snapshot` lets implementations
    /// configure the writer differently for the initial snapshot vs.
    /// steady-state writes.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>>;

    /// Encodes one diff pair (optional retraction plus optional insertion) at
    /// timestamp `ts` into a single Arrow record batch.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch>;
}
213
/// Envelope handler for upsert sinks: writes data files together with
/// equality and position delete files so updates/deletes are applied by key.
struct UpsertEnvelopeHandler {
    ctx: WriterContext,
    /// Iceberg field IDs of the key columns used for equality deletes.
    equality_ids: Vec<i32>,
    /// Iceberg schema of position delete files.
    pos_schema: Arc<Schema>,
    /// Iceberg schema of equality delete files (projected key columns).
    eq_schema: Arc<Schema>,
    /// Configuration for the equality delete writer.
    eq_config: EqualityDeleteWriterConfig,
    /// `ctx.arrow_schema` extended with an `__op` Int32 column (-1 for
    /// retractions, +1 for insertions) that drives the delta writer.
    schema_with_op: Arc<ArrowSchema>,
}
229
impl EnvelopeHandler for UpsertEnvelopeHandler {
    /// Derives the equality-delete configuration (key field IDs, delete-file
    /// schemas) from the sink connection and prepares the `__op`-extended
    /// Arrow schema used for delta batches.
    fn new(
        ctx: WriterContext,
        connection: &IcebergSinkConnection,
        materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Upsert semantics require key columns to address rows for deletion.
        let Some((_, equality_indices)) = &connection.key_desc_and_indices else {
            return Err(anyhow::anyhow!(
                "Iceberg sink requires key columns for equality deletes"
            ));
        };

        // Map key column indices to Iceberg field IDs for equality deletes.
        let equality_ids = equality_ids_for_indices(
            ctx.current_schema.as_ref(),
            materialize_arrow_schema.as_ref(),
            equality_indices,
        )?;

        // Position delete files use a writer-defined schema.
        let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
        let pos_schema = Arc::new(
            arrow_schema_to_schema(&pos_arrow_schema)
                .context("Failed to convert position delete Arrow schema to Iceberg schema")?,
        );

        // Equality delete files contain only the projected key columns.
        let eq_config =
            EqualityDeleteWriterConfig::new(equality_ids.clone(), Arc::clone(&ctx.current_schema))
                .context("Failed to create EqualityDeleteWriterConfig")?;
        let eq_schema = Arc::new(
            arrow_schema_to_schema(eq_config.projected_arrow_schema_ref())
                .context("Failed to convert equality delete Arrow schema to Iceberg schema")?,
        );

        // Batches handed to the delta writer carry an extra `__op` column.
        let schema_with_op = Arc::new(build_schema_with_op_column(&ctx.arrow_schema));

        Ok(Self {
            ctx,
            equality_ids,
            pos_schema,
            eq_schema,
            eq_config,
            schema_with_op,
        })
    }

    /// Builds a delta writer composed of three rolling Parquet writer stacks:
    /// one for data files, one for position delete files, and one for
    /// equality delete files.
    async fn create_writer(&self, is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        // Data file writer stack.
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

        // Position delete writer stack.
        let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
        let pos_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.pos_schema),
        );
        let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            pos_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let pos_delete_writer_builder =
            PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

        // Equality delete writer stack (projected key columns only).
        let eq_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.eq_schema),
        );
        let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            eq_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        let eq_delete_writer_builder =
            EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, self.eq_config.clone());

        let mut builder = DeltaWriterBuilder::new(
            data_writer_builder,
            pos_delete_writer_builder,
            eq_delete_writer_builder,
            self.equality_ids.clone(),
        );

        // NOTE(review): `max_seen_rows` presumably bounds how many rows the
        // delta writer tracks for position deletes — 0 for snapshots (no
        // in-batch retractions possible), unbounded otherwise. TODO confirm
        // against DeltaWriterBuilder's documentation.
        builder = if is_snapshot {
            builder.with_max_seen_rows(0)
        } else {
            builder.with_max_seen_rows(usize::MAX)
        };

        Ok(Box::new(
            builder
                .build(None)
                .await
                .context("Failed to create DeltaWriter")?,
        ))
    }

    /// Encodes a diff pair as up to two rows tagged with an `__op` column:
    /// -1 for the retracted `before` row, +1 for the inserted `after` row.
    /// The timestamp is unused for the upsert envelope.
    fn row_to_batch(
        &self,
        diff_pair: DiffPair<Row>,
        _ts: Timestamp,
    ) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.ctx.arrow_schema),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        // One op value per emitted row, in the same order rows are added.
        let mut op_values = Vec::new();

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add delete row to builder")?;
            op_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add insert row to builder")?;
            op_values.push(1i32);
        }

        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        // Append the `__op` column and rewrap under the extended schema.
        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(op_values)));

        RecordBatch::try_new(Arc::clone(&self.schema_with_op), columns)
            .context("Failed to create batch with op column")
    }
}
392
/// Envelope handler for append-only sinks: every update is written as a plain
/// data row annotated with diff and timestamp columns.
struct AppendEnvelopeHandler {
    ctx: WriterContext,
    /// `ctx.arrow_schema` minus the two trailing sink-maintained columns
    /// (diff and timestamp); used to encode the user-visible row data before
    /// those columns are appended.
    user_schema_for_append: Arc<ArrowSchema>,
}
399
impl EnvelopeHandler for AppendEnvelopeHandler {
    /// Derives the user-facing schema by dropping the trailing columns from
    /// the full sink schema; no key/equality configuration is needed.
    fn new(
        ctx: WriterContext,
        _connection: &IcebergSinkConnection,
        _materialize_arrow_schema: &Arc<ArrowSchema>,
    ) -> anyhow::Result<Self> {
        // Assumes the last two fields of `ctx.arrow_schema` are the
        // sink-appended diff and timestamp columns (see
        // `build_schema_with_append_columns`); everything before them is the
        // user schema.
        let n = ctx.arrow_schema.fields().len().saturating_sub(2);
        let user_schema_for_append =
            Arc::new(ArrowSchema::new(ctx.arrow_schema.fields()[..n].to_vec()));

        Ok(Self {
            ctx,
            user_schema_for_append,
        })
    }

    /// Builds a plain data-file writer; the append envelope never emits
    /// delete files, so `is_snapshot` is irrelevant.
    async fn create_writer(&self, _is_snapshot: bool) -> anyhow::Result<Box<dyn IcebergWriter>> {
        let data_parquet_writer = ParquetWriterBuilder::new(
            self.ctx.writer_properties.clone(),
            Arc::clone(&self.ctx.current_schema),
        )
        .with_arrow_schema(Arc::clone(&self.ctx.arrow_schema))
        .context("Arrow schema validation failed")?;
        let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
            data_parquet_writer,
            Arc::clone(&self.ctx.current_schema),
            self.ctx.file_io.clone(),
            self.ctx.location_generator.clone(),
            self.ctx.file_name_generator.clone(),
        );
        Ok(Box::new(
            DataFileWriterBuilder::new(data_rolling_writer)
                .build(None)
                .await
                .context("Failed to create DataFileWriter")?,
        ))
    }

    /// Encodes a diff pair as up to two rows with appended diff (-1/+1) and
    /// timestamp columns, wrapped in the full sink schema.
    fn row_to_batch(&self, diff_pair: DiffPair<Row>, ts: Timestamp) -> anyhow::Result<RecordBatch> {
        let mut builder = ArrowBuilder::new_with_schema(
            Arc::clone(&self.user_schema_for_append),
            DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
            DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
        )
        .context("Failed to create builder")?;

        let mut diff_values: Vec<i32> = Vec::new();
        // Timestamps beyond i64::MAX are clamped rather than rejected.
        let ts_i64 = i64::try_from(u64::from(ts)).unwrap_or(i64::MAX);

        if let Some(before) = diff_pair.before {
            builder
                .add_row(&before)
                .context("Failed to add before row to builder")?;
            diff_values.push(-1i32);
        }
        if let Some(after) = diff_pair.after {
            builder
                .add_row(&after)
                .context("Failed to add after row to builder")?;
            diff_values.push(1i32);
        }

        // Every emitted row shares the same timestamp value.
        let n = diff_values.len();
        let batch = builder
            .to_record_batch()
            .context("Failed to create record batch")?;

        let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
        columns.push(Arc::new(Int32Array::from(diff_values)));
        columns.push(Arc::new(Int64Array::from(vec![ts_i64; n])));

        RecordBatch::try_new(Arc::clone(&self.ctx.arrow_schema), columns)
            .context("Failed to create append record batch")
    }
}
479
480fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
485 let mut next_field_id = 1i32;
486 let fields: Vec<Field> = schema
487 .fields()
488 .iter()
489 .map(|field| add_field_ids_recursive(field, &mut next_field_id))
490 .collect();
491 ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
492}
493
494fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
496 let current_id = *next_id;
497 *next_id += 1;
498
499 let mut metadata = field.metadata().clone();
500 metadata.insert(
501 PARQUET_FIELD_ID_META_KEY.to_string(),
502 current_id.to_string(),
503 );
504
505 let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
506
507 Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
508}
509
510fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
512 match data_type {
513 DataType::Struct(fields) => {
514 let new_fields: Vec<Field> = fields
515 .iter()
516 .map(|f| add_field_ids_recursive(f, next_id))
517 .collect();
518 DataType::Struct(new_fields.into())
519 }
520 DataType::List(element_field) => {
521 let new_element = add_field_ids_recursive(element_field, next_id);
522 DataType::List(Arc::new(new_element))
523 }
524 DataType::LargeList(element_field) => {
525 let new_element = add_field_ids_recursive(element_field, next_id);
526 DataType::LargeList(Arc::new(new_element))
527 }
528 DataType::Map(entries_field, sorted) => {
529 let new_entries = add_field_ids_recursive(entries_field, next_id);
530 DataType::Map(Arc::new(new_entries), *sorted)
531 }
532 _ => data_type.clone(),
533 }
534}
535
536fn merge_materialize_metadata_into_iceberg_schema(
541 materialize_arrow_schema: &ArrowSchema,
542 iceberg_schema: &Schema,
543) -> anyhow::Result<ArrowSchema> {
544 let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
546 .context("Failed to convert Iceberg schema to Arrow schema")?;
547
548 let fields: Vec<Field> = iceberg_arrow_schema
550 .fields()
551 .iter()
552 .map(|iceberg_field| {
553 let mz_field = materialize_arrow_schema
555 .field_with_name(iceberg_field.name())
556 .with_context(|| {
557 format!(
558 "Field '{}' not found in Materialize schema",
559 iceberg_field.name()
560 )
561 })?;
562
563 merge_field_metadata_recursive(iceberg_field, Some(mz_field))
564 })
565 .collect::<anyhow::Result<Vec<_>>>()?;
566
567 Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
568}
569
/// Recursively copies Materialize's `ARROW_EXTENSION_NAME_KEY` metadata onto
/// an Iceberg-derived Arrow field, preserving the Iceberg field's name, type
/// shape, nullability, and other metadata.
///
/// `mz_field` is `None` when the Materialize side has no counterpart (e.g. a
/// nested field present only in the Iceberg schema); metadata is then left
/// as-is. Errors if the two schemas disagree on the container kind
/// (Struct/List/LargeList/Map) of a matched field.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    // Start from the Iceberg metadata (field IDs etc.) and layer the
    // Materialize extension-type name on top, if present.
    let mut metadata = iceberg_field.metadata().clone();

    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    // Recurse into container types so nested fields get their extension
    // metadata too; the Materialize counterpart must be the same container
    // kind when it exists.
    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Match nested struct fields by name; unmatched ones recurse
            // with `None` and keep their Iceberg metadata.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
667
668async fn reload_table(
669 catalog: &dyn Catalog,
670 namespace: String,
671 table_name: String,
672 current_table: Table,
673) -> anyhow::Result<Table> {
674 let namespace_ident = NamespaceIdent::new(namespace.clone());
675 let table_ident = TableIdent::new(namespace_ident, table_name.clone());
676 let current_schema = current_table.metadata().current_schema_id();
677 let current_partition_spec = current_table.metadata().default_partition_spec_id();
678
679 match catalog.load_table(&table_ident).await {
680 Ok(table) => {
681 let reloaded_schema = table.metadata().current_schema_id();
682 let reloaded_partition_spec = table.metadata().default_partition_spec_id();
683 if reloaded_schema != current_schema {
684 return Err(anyhow::anyhow!(
685 "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
686 table_name,
687 current_schema,
688 reloaded_schema
689 ));
690 }
691
692 if reloaded_partition_spec != current_partition_spec {
693 return Err(anyhow::anyhow!(
694 "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
695 table_name,
696 current_partition_spec,
697 reloaded_partition_spec
698 ));
699 }
700
701 Ok(table)
702 }
703 Err(err) => Err(err).context("Failed to reload Iceberg table"),
704 }
705}
706
/// Attempts to commit one batch of data/delete files to the Iceberg table as
/// a single row-delta transaction, returning the (possibly reloaded) table
/// handle and a `RetryResult` describing how the caller should proceed.
///
/// Conflict handling:
/// - On commit conflict, the table is reloaded and the latest Materialize
///   checkpoint in its snapshots is inspected. A newer sink version or an
///   already-advanced frontier means another writer owns the table — fatal.
///   Otherwise the conflict is retryable.
/// - Apply failures and other commit errors are retryable; the table handle
///   is refreshed where possible so the retry starts from fresh metadata.
async fn try_commit_batch(
    mut table: Table,
    snapshot_properties: Vec<(String, String)>,
    data_files: Vec<DataFile>,
    delete_files: Vec<DataFile>,
    catalog: &dyn Catalog,
    conn_namespace: &str,
    conn_table: &str,
    sink_version: u64,
    frontier: &Antichain<Timestamp>,
    batch_lower: &Antichain<Timestamp>,
    batch_upper: &Antichain<Timestamp>,
    metrics: &IcebergSinkMetrics,
) -> (Table, RetryResult<(), anyhow::Error>) {
    let tx = Transaction::new(&table);
    // Snapshot properties carry the Materialize frontier/version checkpoint;
    // duplicate checking is disabled since batches are tracked externally.
    let mut action = tx
        .row_delta()
        .set_snapshot_properties(snapshot_properties.into_iter().collect())
        .with_check_duplicate(false);

    // An empty commit (no files) still produces a snapshot carrying the
    // updated properties.
    if !data_files.is_empty() || !delete_files.is_empty() {
        action = action
            .add_data_files(data_files)
            .add_delete_files(delete_files);
    }

    let tx = match action
        .apply(tx)
        .context("Failed to apply data file addition to iceberg table transaction")
    {
        Ok(tx) => tx,
        Err(e) => {
            // Refresh table metadata before signalling a retry so the next
            // attempt starts from the latest state.
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(reload_err) => {
                    return (table, RetryResult::RetryableErr(anyhow!(reload_err)));
                }
            }
            return (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Failed to apply data file addition to iceberg table transaction: {}",
                    e
                )),
            );
        }
    };

    let new_table = tx.commit(catalog).await;
    match new_table {
        Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
            metrics.commit_conflicts.inc();
            // Reload to see what the conflicting writer committed.
            match reload_table(
                catalog,
                conn_namespace.to_string(),
                conn_table.to_string(),
                table.clone(),
            )
            .await
            {
                Ok(reloaded) => table = reloaded,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            // Inspect the newest Materialize checkpoint in the reloaded
            // table to distinguish a benign conflict from a foreign writer.
            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let last = retrieve_upper_from_snapshots(&mut snapshots);
            let last = match last {
                Ok(val) => val,
                Err(e) => {
                    return (table, RetryResult::RetryableErr(anyhow!(e)));
                }
            };

            if let Some((last_frontier, last_version)) = last {
                // A higher sink version means we have been fenced out.
                if last_version > sink_version {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer \
                             with version {}. Current sink version: {}. \
                             Frontiers may be out of sync, aborting to avoid data loss.",
                            conn_table,
                            last_version,
                            sink_version,
                        )),
                    );
                }
                // The table already advanced at least as far as this batch:
                // some other writer committed past us.
                if PartialOrder::less_equal(frontier, &last_frontier) {
                    return (
                        table,
                        RetryResult::FatalErr(anyhow!(
                            "Iceberg table '{}' has been modified by another writer. \
                             Current frontier: {:?}, last frontier: {:?}.",
                            conn_table,
                            frontier,
                            last_frontier,
                        )),
                    );
                }
            }

            // Benign conflict (e.g. concurrent maintenance): retry the batch.
            (
                table,
                RetryResult::RetryableErr(anyhow!(
                    "Commit conflict detected when committing batch [{}, {}) \
                     to Iceberg table '{}.{}'. Retrying...",
                    batch_lower.pretty(),
                    batch_upper.pretty(),
                    conn_namespace,
                    conn_table
                )),
            )
        }
        Err(e) => {
            metrics.commit_failures.inc();
            (table, RetryResult::RetryableErr(anyhow!(e)))
        }
        Ok(new_table) => (new_table, RetryResult::Ok(())),
    }
}
840
841async fn load_or_create_table(
843 catalog: &dyn Catalog,
844 namespace: String,
845 table_name: String,
846 schema: &Schema,
847) -> anyhow::Result<iceberg::table::Table> {
848 let namespace_ident = NamespaceIdent::new(namespace.clone());
849 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
850
851 match catalog.load_table(&table_ident).await {
853 Ok(table) => {
854 Ok(table)
857 }
858 Err(err) => {
859 if matches!(err.kind(), ErrorKind::TableNotFound { .. })
860 || err
861 .message()
862 .contains("Tried to load a table that does not exist")
863 {
864 let table_creation = TableCreation::builder()
868 .name(table_name.clone())
869 .schema(schema.clone())
870 .build();
874
875 catalog
876 .create_table(&namespace_ident, table_creation)
877 .await
878 .with_context(|| {
879 format!(
880 "Failed to create Iceberg table '{}' in namespace '{}'",
881 table_name, namespace
882 )
883 })
884 } else {
885 Err(err).context("Failed to load Iceberg table")
887 }
888 }
889 }
890}
891
892fn retrieve_upper_from_snapshots(
897 snapshots: &mut [Arc<Snapshot>],
898) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
899 snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
900
901 for snapshot in snapshots {
902 let props = &snapshot.summary().additional_properties;
903 if let (Some(frontier_json), Some(sink_version_str)) =
904 (props.get("mz-frontier"), props.get("mz-sink-version"))
905 {
906 let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
907 .context("Failed to deserialize frontier from snapshot properties")?;
908 let frontier = Antichain::from_iter(frontier);
909
910 let sink_version = sink_version_str
911 .parse::<u64>()
912 .context("Failed to parse mz-sink-version from snapshot properties")?;
913
914 return Ok(Some((frontier, sink_version)));
915 }
916 if snapshot.summary().operation.as_str() != "replace" {
917 anyhow::bail!(
922 "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
923 snapshot.snapshot_id(),
924 snapshot.summary().operation.as_str(),
925 );
926 }
927 }
928
929 Ok(None)
930}
931
932fn relation_desc_to_iceberg_schema(
942 desc: &mz_repr::RelationDesc,
943) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
944 let arrow_schema =
945 mz_arrow_util::builder::desc_to_schema_with_overrides(desc, iceberg_type_overrides)
946 .context("Failed to convert RelationDesc to Iceberg-compatible Arrow schema")?;
947
948 let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
949
950 let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
951 .context("Failed to convert Arrow schema to Iceberg schema")?;
952
953 Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
954}
955
956fn equality_ids_for_indices(
961 current_schema: &Schema,
962 materialize_arrow_schema: &ArrowSchema,
963 equality_indices: &[usize],
964) -> anyhow::Result<Vec<i32>> {
965 let top_level_fields = current_schema.as_struct();
966
967 equality_indices
968 .iter()
969 .map(|index| {
970 let mz_field = materialize_arrow_schema
971 .fields()
972 .get(*index)
973 .with_context(|| format!("Equality delete key index {index} is out of bounds"))?;
974 let field_name = mz_field.name();
975 let iceberg_field = top_level_fields
976 .field_by_name(field_name)
977 .with_context(|| {
978 format!(
979 "Equality delete key column '{}' not found in Iceberg table schema",
980 field_name
981 )
982 })?;
983 Ok(iceberg_field.id)
984 })
985 .collect()
986}
987
988fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
990 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
991 fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
992 ArrowSchema::new(fields)
993}
994
995#[allow(clippy::disallowed_types)]
1000fn build_schema_with_append_columns(schema: &ArrowSchema) -> ArrowSchema {
1001 use mz_storage_types::sinks::{ICEBERG_APPEND_DIFF_COLUMN, ICEBERG_APPEND_TIMESTAMP_COLUMN};
1002 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
1003 fields.push(Arc::new(Field::new(
1004 ICEBERG_APPEND_DIFF_COLUMN,
1005 DataType::Int32,
1006 false,
1007 )));
1008 fields.push(Arc::new(Field::new(
1009 ICEBERG_APPEND_TIMESTAMP_COLUMN,
1010 DataType::Int64,
1011 false,
1012 )));
1013
1014 add_field_ids_to_arrow_schema(ArrowSchema::new(fields).with_metadata(schema.metadata().clone()))
1015}
1016
1017fn mint_batch_descriptions<'scope, D>(
1022 name: String,
1023 sink_id: GlobalId,
1024 input: VecCollection<'scope, Timestamp, D, Diff>,
1025 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1026 connection: IcebergSinkConnection,
1027 storage_configuration: StorageConfiguration,
1028 initial_schema: SchemaRef,
1029) -> (
1030 VecCollection<'scope, Timestamp, D, Diff>,
1031 StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1032 StreamVec<'scope, Timestamp, Infallible>,
1033 StreamVec<'scope, Timestamp, HealthStatusMessage>,
1034 PressOnDropButton,
1035)
1036where
1037 D: Clone + 'static,
1038{
1039 let scope = input.scope();
1040 let name_for_error = name.clone();
1041 let name_for_logging = name.clone();
1042 let mut builder = OperatorBuilder::new(name, scope.clone());
1043 let sink_version = sink.version;
1044
1045 let hashed_id = sink_id.hashed();
1046 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1047 let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1048 let (output, output_stream) = builder.new_output();
1049 let (batch_desc_output, batch_desc_stream) =
1050 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1051 let mut input =
1052 builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);
1053
1054 let as_of = sink.as_of.clone();
1055 let commit_interval = sink
1056 .commit_interval
1057 .expect("the planner should have enforced this")
1058 .clone();
1059
1060 let (button, errors): (_, StreamVec<'scope, Timestamp, Rc<anyhow::Error>>) =
1061 builder.build_fallible(move |caps| {
1062 Box::pin(async move {
1063 let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
1064 *data_capset = CapabilitySet::new();
1065
1066 if !is_active_worker {
1067 *capset = CapabilitySet::new();
1068 *data_capset = CapabilitySet::new();
1069 *table_ready_capset = CapabilitySet::new();
1070 while let Some(event) = input.next().await {
1071 match event {
1072 Event::Data([output_cap, _], mut data) => {
1073 output.give_container(&output_cap, &mut data);
1074 }
1075 Event::Progress(_) => {}
1076 }
1077 }
1078 return Ok(());
1079 }
1080
1081 let catalog = connection
1082 .catalog_connection
1083 .connect(&storage_configuration, InTask::Yes)
1084 .await
1085 .with_context(|| {
1086 format!(
1087 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1088 connection.catalog_connection.uri, connection.namespace, connection.table
1089 )
1090 })?;
1091
1092 let table = load_or_create_table(
1093 catalog.as_ref(),
1094 connection.namespace.clone(),
1095 connection.table.clone(),
1096 initial_schema.as_ref(),
1097 )
1098 .await?;
1099 debug!(
1100 ?sink_id,
1101 %name_for_logging,
1102 namespace = %connection.namespace,
1103 table = %connection.table,
1104 "iceberg mint loaded/created table"
1105 );
1106
1107 *table_ready_capset = CapabilitySet::new();
1108
1109 let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1110 let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
1111 let (resume_upper, resume_version) = match resume {
1112 Some((f, v)) => (f, v),
1113 None => (Antichain::from_elem(Timestamp::minimum()), 0),
1114 };
1115 debug!(
1116 ?sink_id,
1117 %name_for_logging,
1118 resume_upper = %resume_upper.pretty(),
1119 resume_version,
1120 as_of = %as_of.pretty(),
1121 "iceberg mint resume position loaded"
1122 );
1123
1124 let overcompacted =
1126 *resume_upper != [Timestamp::minimum()] &&
1128 PartialOrder::less_than(&resume_upper, &as_of);
1130
1131 if overcompacted {
1132 let err = format!(
1133 "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
1134 as_of.pretty(),
1135 resume_upper.pretty()
1136 );
1137 return Err(anyhow::anyhow!("{err}"));
1141 };
1142
1143 if resume_version > sink_version {
1144 anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
1145 }
1146
1147 let mut initialized = false;
1148 let mut observed_frontier;
1149 let mut max_seen_ts: Option<Timestamp> = None;
1150 let mut minted_batches = VecDeque::new();
1155 loop {
1156 if let Some(event) = input.next().await {
1157 match event {
1158 Event::Data([output_cap, _], mut data) => {
1159 if !initialized {
1160 for (_, ts, _) in data.iter() {
1161 match max_seen_ts.as_mut() {
1162 Some(max) => {
1163 if max.less_than(ts) {
1164 *max = ts.clone();
1165 }
1166 }
1167 None => {
1168 max_seen_ts = Some(ts.clone());
1169 }
1170 }
1171 }
1172 }
1173 output.give_container(&output_cap, &mut data);
1174 continue;
1175 }
1176 Event::Progress(frontier) => {
1177 observed_frontier = frontier;
1178 }
1179 }
1180 } else {
1181 return Ok(());
1182 }
1183
1184 if !initialized {
1185 if observed_frontier.is_empty() {
1186 if let Some(max_ts) = max_seen_ts.as_ref() {
1192 let synthesized_upper =
1193 Antichain::from_elem(max_ts.step_forward());
1194 debug!(
1195 ?sink_id,
1196 %name_for_logging,
1197 max_seen_ts = %max_ts,
1198 synthesized_upper = %synthesized_upper.pretty(),
1199 "iceberg mint input closed before initialization; using max seen ts"
1200 );
1201 observed_frontier = synthesized_upper;
1202 } else {
1203 debug!(
1204 ?sink_id,
1205 %name_for_logging,
1206 "iceberg mint input closed before initialization with no data"
1207 );
1208 return Ok(());
1210 }
1211 }
1212
1213 if PartialOrder::less_than(&observed_frontier, &resume_upper)
1216 || PartialOrder::less_than(&observed_frontier, &as_of)
1217 {
1218 continue;
1219 }
1220
1221 let mut batch_descriptions = vec![];
1222 let mut current_upper = observed_frontier.clone();
1223 let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
1224
1225 let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
1230 as_of.clone()
1231 } else {
1232 resume_upper.clone()
1233 };
1234
1235 if batch_lower == current_upper {
1236 current_upper = Antichain::from_elem(current_upper_ts.step_forward());
1239 }
1240
1241 let batch_description = (batch_lower.clone(), current_upper.clone());
1242 debug!(
1243 ?sink_id,
1244 %name_for_logging,
1245 batch_lower = %batch_lower.pretty(),
1246 current_upper = %current_upper.pretty(),
1247 "iceberg mint initializing (catch-up batch)"
1248 );
1249 debug!(
1250 "{}: creating catch-up batch [{}, {})",
1251 name_for_logging,
1252 batch_lower.pretty(),
1253 current_upper.pretty()
1254 );
1255 batch_descriptions.push(batch_description);
1256 for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
1258 let duration_millis = commit_interval.as_millis()
1259 .checked_mul(u128::from(i))
1260 .expect("commit interval multiplication overflow");
1261 let duration_ts = Timestamp::new(
1262 u64::try_from(duration_millis)
1263 .expect("commit interval too large for u64"),
1264 );
1265 let desired_batch_upper = Antichain::from_elem(
1266 current_upper_ts.step_forward_by(&duration_ts),
1267 );
1268
1269 let batch_description =
1270 (current_upper.clone(), desired_batch_upper.clone());
1271 debug!(
1272 "{}: minting future batch {}/{} [{}, {})",
1273 name_for_logging,
1274 i,
1275 INITIAL_DESCRIPTIONS_TO_MINT,
1276 current_upper.pretty(),
1277 desired_batch_upper.pretty()
1278 );
1279 current_upper = batch_description.1.clone();
1280 batch_descriptions.push(batch_description);
1281 }
1282
1283 minted_batches.extend(batch_descriptions.clone());
1284
1285 for desc in batch_descriptions {
1286 batch_desc_output.give(&capset[0], desc);
1287 }
1288
1289 capset.downgrade(current_upper);
1290
1291 initialized = true;
1292 } else {
1293 if observed_frontier.is_empty() {
1294 return Ok(());
1296 }
1297 while let Some(oldest_desc) = minted_batches.front() {
1300 let oldest_upper = &oldest_desc.1;
1301 if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
1302 break;
1303 }
1304
1305 let newest_upper = minted_batches.back().unwrap().1.clone();
1306 let new_lower = newest_upper.clone();
1307 let duration_ts = Timestamp::new(commit_interval.as_millis()
1308 .try_into()
1309 .expect("commit interval too large for u64"));
1310 let new_upper = Antichain::from_elem(newest_upper
1311 .as_option()
1312 .unwrap()
1313 .step_forward_by(&duration_ts));
1314
1315 let new_batch_description = (new_lower.clone(), new_upper.clone());
1316 minted_batches.pop_front();
1317 minted_batches.push_back(new_batch_description.clone());
1318
1319 batch_desc_output.give(&capset[0], new_batch_description);
1320
1321 capset.downgrade(new_upper);
1322 }
1323 }
1324 }
1325 })
1326 });
1327
1328 let statuses = errors.map(|error| HealthStatusMessage {
1329 id: None,
1330 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1331 namespace: StatusNamespace::Iceberg,
1332 });
1333 (
1334 output_stream.as_collection(),
1335 batch_desc_stream,
1336 table_ready_stream,
1337 statuses,
1338 button.press_on_drop(),
1339 )
1340}
1341
/// A [`DataFile`] paired with the Iceberg [`Schema`] it was written against.
///
/// `DataFile` has no self-contained serde representation, so (de)serialization
/// round-trips through [`AvroDataFile`] via the `try_from`/`into` container
/// attributes below.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    // The Iceberg data (or delete) file descriptor.
    pub data_file: DataFile,
    // The schema the file was written with; needed to decode the Avro bytes.
    pub schema: Schema,
}
1348
/// Wire format for [`SerializableDataFile`]: the `DataFile` encoded with
/// Iceberg's Avro manifest-entry encoding, and the `Schema` encoded as JSON.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    // Avro-encoded `DataFile` (see `write_data_files_to_avro`).
    pub data_file: Vec<u8>,
    // JSON-encoded Iceberg `Schema`.
    pub schema: Vec<u8>,
}
1362
1363impl From<SerializableDataFile> for AvroDataFile {
1364 fn from(value: SerializableDataFile) -> Self {
1365 let mut data_file = Vec::new();
1366 write_data_files_to_avro(
1367 &mut data_file,
1368 [value.data_file],
1369 &StructType::new(vec![]),
1370 FormatVersion::V2,
1371 )
1372 .expect("serialization into buffer");
1373 let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
1374 AvroDataFile { data_file, schema }
1375 }
1376}
1377
1378impl TryFrom<AvroDataFile> for SerializableDataFile {
1379 type Error = String;
1380
1381 fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
1382 let schema: Schema = serde_json::from_slice(&value.schema)
1383 .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
1384 let data_files = read_data_files_from_avro(
1385 &mut &*value.data_file,
1386 &schema,
1387 0,
1388 &StructType::new(vec![]),
1389 FormatVersion::V2,
1390 )
1391 .map_err_to_string_with_causes()?;
1392 let Some(data_file) = data_files.into_iter().next() else {
1393 return Err("No DataFile found in Avro data".into());
1394 };
1395 Ok(SerializableDataFile { data_file, schema })
1396 }
1397}
1398
/// A serializable [`DataFile`] tagged with the `[lower, upper)` batch
/// description (a pair of timestamp frontiers) it was written for, so the
/// commit operator can group files by batch.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    // The file plus the schema it was written with.
    pub data_file: SerializableDataFile,
    // `(lower, upper)` frontiers of the batch this file belongs to.
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
1405
1406impl BoundedDataFile {
1407 pub fn new(
1408 file: DataFile,
1409 schema: Schema,
1410 batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
1411 ) -> Self {
1412 Self {
1413 data_file: SerializableDataFile {
1414 data_file: file,
1415 schema,
1416 },
1417 batch_desc,
1418 }
1419 }
1420
1421 pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
1422 &self.batch_desc
1423 }
1424
1425 pub fn data_file(&self) -> &DataFile {
1426 &self.data_file.data_file
1427 }
1428
1429 pub fn into_data_file(self) -> DataFile {
1430 self.data_file.data_file
1431 }
1432}
1433
/// The set of files accumulated for a single batch description in the commit
/// operator; created empty (`Default`) when the description arrives and
/// filled in as [`BoundedDataFile`]s stream in.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}
1439
1440fn write_data_files<'scope, H: EnvelopeHandler + 'static>(
1446 name: String,
1447 input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
1448 batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1449 table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
1450 as_of: Antichain<Timestamp>,
1451 connection: IcebergSinkConnection,
1452 storage_configuration: StorageConfiguration,
1453 materialize_arrow_schema: Arc<ArrowSchema>,
1454 metrics: Arc<IcebergSinkMetrics>,
1455 statistics: SinkStatistics,
1456) -> (
1457 StreamVec<'scope, Timestamp, BoundedDataFile>,
1458 StreamVec<'scope, Timestamp, HealthStatusMessage>,
1459 PressOnDropButton,
1460) {
1461 let scope = input.scope();
1462 let name_for_logging = name.clone();
1463 let mut builder = OperatorBuilder::new(name, scope.clone());
1464
1465 let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1466
1467 let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1468 let mut batch_desc_input =
1469 builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
1470 let mut input = builder.new_disconnected_input(input.inner, Pipeline);
1471
1472 let (button, errors) = builder.build_fallible(move |caps| {
1473 Box::pin(async move {
1474 let [capset]: &mut [_; 1] = caps.try_into().unwrap();
1475 let catalog = connection
1476 .catalog_connection
1477 .connect(&storage_configuration, InTask::Yes)
1478 .await
1479 .with_context(|| {
1480 format!(
1481 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1482 connection.catalog_connection.uri, connection.namespace, connection.table
1483 )
1484 })?;
1485
1486 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1487 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1488 while let Some(_) = table_ready_input.next().await {
1489 }
1491 let table = catalog
1492 .load_table(&table_ident)
1493 .await
1494 .with_context(|| {
1495 format!(
1496 "Failed to load Iceberg table '{}.{}' in write_data_files operator",
1497 connection.namespace, connection.table
1498 )
1499 })?;
1500
1501 let table_metadata = table.metadata().clone();
1502 let current_schema = Arc::clone(table_metadata.current_schema());
1503
1504 let arrow_schema = Arc::new(
1508 merge_materialize_metadata_into_iceberg_schema(
1509 materialize_arrow_schema.as_ref(),
1510 current_schema.as_ref(),
1511 )
1512 .context("Failed to merge Materialize metadata into Iceberg schema")?,
1513 );
1514
1515 let location = table_metadata.location();
1519 let corrected_location = match location.rsplit_once("/metadata/") {
1520 Some((a, b)) if b.ends_with(".metadata.json") => a,
1521 _ => location,
1522 };
1523
1524 let data_location = format!("{}/data", corrected_location);
1525 let location_generator = DefaultLocationGenerator::with_data_location(data_location);
1526
1527 let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
1529 let file_name_generator = DefaultFileNameGenerator::new(
1530 PARQUET_FILE_PREFIX.to_string(),
1531 Some(unique_suffix),
1532 iceberg::spec::DataFileFormat::Parquet,
1533 );
1534
1535 let file_io = table.file_io().clone();
1536
1537 let writer_properties = WriterProperties::new();
1538
1539 let ctx = WriterContext {
1540 arrow_schema,
1541 current_schema: Arc::clone(¤t_schema),
1542 file_io,
1543 location_generator,
1544 file_name_generator,
1545 writer_properties,
1546 };
1547 let handler = H::new(ctx, &connection, &materialize_arrow_schema)?;
1548
1549 let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1552 BTreeMap::new();
1553
1554 #[allow(clippy::disallowed_types)]
1559 let mut in_flight_batches: std::collections::HashMap<
1560 (Antichain<Timestamp>, Antichain<Timestamp>),
1561 Box<dyn IcebergWriter>,
1562 > = std::collections::HashMap::new();
1563
1564 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1565 let mut processed_batch_description_frontier =
1566 Antichain::from_elem(Timestamp::minimum());
1567 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1568 let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1569
1570 let mut min_batch_lower: Option<Antichain<Timestamp>> = None;
1572
1573 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1574 let mut staged_messages_since_flush: u64 = 0;
1575 tokio::select! {
1576 _ = batch_desc_input.ready() => {},
1577 _ = input.ready() => {}
1578 }
1579
1580 while let Some(event) = batch_desc_input.next_sync() {
1581 match event {
1582 Event::Data(_cap, data) => {
1583 for batch_desc in data {
1584 let (lower, upper) = &batch_desc;
1585
1586 if min_batch_lower.is_none() {
1588 min_batch_lower = Some(lower.clone());
1589 debug!(
1590 "{}: set min_batch_lower to {}",
1591 name_for_logging,
1592 lower.pretty()
1593 );
1594
1595 let to_remove: Vec<_> = stashed_rows
1597 .keys()
1598 .filter(|ts| {
1599 let ts_antichain = Antichain::from_elem((*ts).clone());
1600 PartialOrder::less_than(&ts_antichain, lower)
1601 })
1602 .cloned()
1603 .collect();
1604
1605 if !to_remove.is_empty() {
1606 let mut removed_count = 0;
1607 for ts in to_remove {
1608 if let Some(rows) = stashed_rows.remove(&ts) {
1609 removed_count += rows.len();
1610 for _ in &rows {
1611 metrics.stashed_rows.dec();
1612 }
1613 }
1614 }
1615 debug!(
1616 "{}: pruned {} already-committed rows (< min_batch_lower)",
1617 name_for_logging,
1618 removed_count
1619 );
1620 }
1621 }
1622
1623 let is_snapshot = lower == &as_of;
1625 debug!(
1626 "{}: received batch description [{}, {}), snapshot={}",
1627 name_for_logging,
1628 lower.pretty(),
1629 upper.pretty(),
1630 is_snapshot
1631 );
1632 let mut batch_writer =
1633 handler.create_writer(is_snapshot).await?;
1634 let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1636 let mut drained_count = 0;
1637 for row_ts in row_ts_keys {
1638 let ts = Antichain::from_elem(row_ts.clone());
1639 if PartialOrder::less_equal(lower, &ts)
1640 && PartialOrder::less_than(&ts, upper)
1641 {
1642 if let Some(rows) = stashed_rows.remove(&row_ts) {
1643 drained_count += rows.len();
1644 for (_row, diff_pair) in rows {
1645 metrics.stashed_rows.dec();
1646 let record_batch = handler.row_to_batch(
1647 diff_pair.clone(),
1648 row_ts.clone(),
1649 )
1650 .context("failed to convert row to recordbatch")?;
1651 batch_writer.write(record_batch).await?;
1652 staged_messages_since_flush += 1;
1653 if staged_messages_since_flush >= 10_000 {
1654 statistics.inc_messages_staged_by(
1655 staged_messages_since_flush,
1656 );
1657 staged_messages_since_flush = 0;
1658 }
1659 }
1660 }
1661 }
1662 }
1663 if drained_count > 0 {
1664 debug!(
1665 "{}: drained {} stashed rows into batch [{}, {})",
1666 name_for_logging,
1667 drained_count,
1668 lower.pretty(),
1669 upper.pretty()
1670 );
1671 }
1672 let prev =
1673 in_flight_batches.insert(batch_desc.clone(), batch_writer);
1674 if prev.is_some() {
1675 anyhow::bail!(
1676 "Duplicate batch description received for description {:?}",
1677 batch_desc
1678 );
1679 }
1680 }
1681 }
1682 Event::Progress(frontier) => {
1683 batch_description_frontier = frontier;
1684 }
1685 }
1686 }
1687
1688 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1689 for event in ready_events {
1690 match event {
1691 Event::Data(_cap, data) => {
1692 let mut dropped_per_time = BTreeMap::new();
1693 let mut stashed_per_time = BTreeMap::new();
1694 for ((row, diff_pair), ts, _diff) in data {
1695 let row_ts = ts.clone();
1696 let ts_antichain = Antichain::from_elem(row_ts.clone());
1697 let mut written = false;
1698 for (batch_desc, batch_writer) in in_flight_batches.iter_mut() {
1700 let (lower, upper) = batch_desc;
1701 if PartialOrder::less_equal(lower, &ts_antichain)
1702 && PartialOrder::less_than(&ts_antichain, upper)
1703 {
1704 let record_batch = handler.row_to_batch(
1705 diff_pair.clone(),
1706 row_ts.clone(),
1707 )
1708 .context("failed to convert row to recordbatch")?;
1709 batch_writer.write(record_batch).await?;
1710 staged_messages_since_flush += 1;
1711 if staged_messages_since_flush >= 10_000 {
1712 statistics.inc_messages_staged_by(
1713 staged_messages_since_flush,
1714 );
1715 staged_messages_since_flush = 0;
1716 }
1717 written = true;
1718 break;
1719 }
1720 }
1721 if !written {
1722 if let Some(ref min_lower) = min_batch_lower {
1724 if PartialOrder::less_than(&ts_antichain, min_lower) {
1725 dropped_per_time
1726 .entry(ts_antichain.into_option().unwrap())
1727 .and_modify(|c| *c += 1)
1728 .or_insert(1);
1729 continue;
1730 }
1731 }
1732
1733 stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
1734 let entry = stashed_rows.entry(row_ts).or_default();
1735 metrics.stashed_rows.inc();
1736 entry.push((row, diff_pair));
1737 }
1738 }
1739
1740 for (ts, count) in dropped_per_time {
1741 debug!(
1742 "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
1743 name_for_logging, count, ts
1744 );
1745 }
1746
1747 for (ts, count) in stashed_per_time {
1748 debug!(
1749 "{}: stashed {} rows at timestamp {} (waiting for batch description)",
1750 name_for_logging, count, ts
1751 );
1752 }
1753 }
1754 Event::Progress(frontier) => {
1755 input_frontier = frontier;
1756 }
1757 }
1758 }
1759 if staged_messages_since_flush > 0 {
1760 statistics.inc_messages_staged_by(staged_messages_since_flush);
1761 }
1762
1763 if PartialOrder::less_than(
1765 &processed_batch_description_frontier,
1766 &batch_description_frontier,
1767 ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1768 {
1769 let ready_batches: Vec<_> = in_flight_batches
1775 .extract_if(|(lower, upper), _| {
1776 PartialOrder::less_than(lower, &batch_description_frontier)
1777 && PartialOrder::less_equal(upper, &input_frontier)
1778 })
1779 .collect();
1780
1781 if !ready_batches.is_empty() {
1782 debug!(
1783 "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
1784 name_for_logging,
1785 ready_batches.len(),
1786 batch_description_frontier.pretty(),
1787 input_frontier.pretty()
1788 );
1789 let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1790 for (desc, mut batch_writer) in ready_batches {
1791 let close_started_at = Instant::now();
1792 let data_files = batch_writer.close().await;
1793 metrics
1794 .writer_close_duration_seconds
1795 .observe(close_started_at.elapsed().as_secs_f64());
1796 let data_files = data_files.context("Failed to close batch writer")?;
1797 debug!(
1798 "{}: closed batch [{}, {}), wrote {} files",
1799 name_for_logging,
1800 desc.0.pretty(),
1801 desc.1.pretty(),
1802 data_files.len()
1803 );
1804 for data_file in data_files {
1805 match data_file.content_type() {
1806 iceberg::spec::DataContentType::Data => {
1807 metrics.data_files_written.inc();
1808 }
1809 iceberg::spec::DataContentType::PositionDeletes
1810 | iceberg::spec::DataContentType::EqualityDeletes => {
1811 metrics.delete_files_written.inc();
1812 }
1813 }
1814 statistics.inc_messages_staged_by(data_file.record_count());
1815 statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1816 let file = BoundedDataFile::new(
1817 data_file,
1818 current_schema.as_ref().clone(),
1819 desc.clone(),
1820 );
1821 output.give(&capset[0], file);
1822 }
1823
1824 max_upper = max_upper.join(&desc.1);
1825 }
1826
1827 capset.downgrade(max_upper);
1828 }
1829 processed_batch_description_frontier.clone_from(&batch_description_frontier);
1830 processed_input_frontier.clone_from(&input_frontier);
1831 }
1832 }
1833 Ok(())
1834 })
1835 });
1836
1837 let statuses = errors.map(|error| HealthStatusMessage {
1838 id: None,
1839 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1840 namespace: StatusNamespace::Iceberg,
1841 });
1842 (output_stream, statuses, button.press_on_drop())
1843}
1844
#[cfg(test)]
mod tests {
    use super::*;
    use iceberg::spec::{PrimitiveType, Type};
    use mz_repr::SqlScalarType;
    use mz_storage_types::sinks::ICEBERG_UINT64_DECIMAL_PRECISION;

    /// Unsigned and mz-specific scalar types must be widened to
    /// Iceberg-compatible arrow types; plain signed/string/bool types take no
    /// override.
    #[mz_ore::test]
    fn test_iceberg_type_overrides() {
        let widened = [
            (SqlScalarType::UInt16, DataType::Int32),
            (SqlScalarType::UInt32, DataType::Int64),
            (
                SqlScalarType::UInt64,
                DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
            ),
            (
                SqlScalarType::MzTimestamp,
                DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0),
            ),
        ];
        for (ty, expected) in widened {
            assert_eq!(iceberg_type_overrides(&ty).unwrap().0, expected);
        }

        for ty in [
            SqlScalarType::Int32,
            SqlScalarType::String,
            SqlScalarType::Bool,
        ] {
            assert!(iceberg_type_overrides(&ty).is_none());
        }
    }

    /// The UInt64 override must apply inside nested element types, not just
    /// at the top level.
    #[mz_ore::test]
    fn test_iceberg_schema_with_nested_uint64() {
        let desc = mz_repr::RelationDesc::builder()
            .with_column(
                "items",
                SqlScalarType::List {
                    element_type: Box::new(SqlScalarType::UInt64),
                    custom_id: None,
                }
                .nullable(true),
            )
            .finish();

        let schema =
            mz_arrow_util::builder::desc_to_schema_with_overrides(&desc, iceberg_type_overrides)
                .expect("schema conversion should succeed");

        match schema.field(0).data_type() {
            DataType::List(element) => assert_eq!(
                element.data_type(),
                &DataType::Decimal128(ICEBERG_UINT64_DECIMAL_PRECISION, 0)
            ),
            _ => panic!("Expected List type"),
        }
    }

    /// Intervals map to strings, both in the arrow schema override and in the
    /// derived Iceberg schema.
    #[mz_ore::test]
    fn test_iceberg_interval_override() {
        assert_eq!(
            iceberg_type_overrides(&SqlScalarType::Interval).unwrap().0,
            DataType::LargeUtf8
        );

        let desc = mz_repr::RelationDesc::builder()
            .with_column("id", SqlScalarType::Int32.nullable(false))
            .with_column("dur", SqlScalarType::Interval.nullable(true))
            .finish();

        let (arrow_schema, iceberg_schema) =
            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");

        assert_eq!(arrow_schema.field(1).data_type(), &DataType::LargeUtf8);

        let dur = iceberg_schema
            .as_struct()
            .field_by_name("dur")
            .expect("field should exist");
        assert_eq!(*dur.field_type, Type::Primitive(PrimitiveType::String));
    }

    /// Range columns are represented as struct fields in the Iceberg schema.
    #[mz_ore::test]
    fn test_iceberg_range_schema() {
        let desc = mz_repr::RelationDesc::builder()
            .with_column("id", SqlScalarType::Int32.nullable(false))
            .with_column(
                "r",
                SqlScalarType::Range {
                    element_type: Box::new(SqlScalarType::Int32),
                }
                .nullable(true),
            )
            .finish();

        let (_arrow_schema, iceberg_schema) =
            relation_desc_to_iceberg_schema(&desc).expect("schema conversion should succeed");

        let range_field = iceberg_schema
            .as_struct()
            .field_by_name("r")
            .expect("field should exist");
        assert!(
            matches!(&*range_field.field_type, Type::Struct(_)),
            "range should be struct, got: {:?}",
            range_field.field_type
        );
    }

    /// Equality-delete ids must be the Iceberg-assigned field ids, which
    /// diverge from positional indices when map/struct columns consume extra
    /// nested ids.
    #[mz_ore::test]
    fn equality_ids_follow_iceberg_field_ids() {
        let entries = Field::new(
            "entries",
            DataType::Struct(
                vec![
                    Field::new("key", DataType::Utf8, false),
                    Field::new("value", DataType::Utf8, true),
                ]
                .into(),
            ),
            false,
        );
        let schema_without_ids = ArrowSchema::new(vec![
            Field::new("attrs", DataType::Map(Arc::new(entries), false), true),
            Field::new("key_col", DataType::Int32, false),
        ]);
        let materialize_arrow_schema = add_field_ids_to_arrow_schema(schema_without_ids);
        let iceberg_schema = arrow_schema_to_schema(&materialize_arrow_schema)
            .expect("schema conversion should succeed");

        let equality_ids =
            equality_ids_for_indices(&iceberg_schema, &materialize_arrow_schema, &[1])
                .expect("field lookup should succeed");

        let expected_id = iceberg_schema
            .as_struct()
            .field_by_name("key_col")
            .expect("top-level field should exist")
            .id;
        assert_eq!(equality_ids, vec![expected_id]);
        // The map's nested key/value fields consume ids, so `key_col`'s id
        // is not simply its positional index + 1.
        assert_ne!(expected_id, 2);
    }
}
2001
2002fn commit_to_iceberg<'scope>(
2006 name: String,
2007 sink_id: GlobalId,
2008 sink_version: u64,
2009 batch_input: StreamVec<'scope, Timestamp, BoundedDataFile>,
2010 batch_desc_input: StreamVec<'scope, Timestamp, (Antichain<Timestamp>, Antichain<Timestamp>)>,
2011 table_ready_stream: StreamVec<'scope, Timestamp, Infallible>,
2012 write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
2013 connection: IcebergSinkConnection,
2014 storage_configuration: StorageConfiguration,
2015 write_handle: impl Future<
2016 Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
2017 > + 'static,
2018 metrics: Arc<IcebergSinkMetrics>,
2019 statistics: SinkStatistics,
2020) -> (
2021 StreamVec<'scope, Timestamp, HealthStatusMessage>,
2022 PressOnDropButton,
2023) {
2024 let scope = batch_input.scope();
2025 let mut builder = OperatorBuilder::new(name, scope.clone());
2026
2027 let hashed_id = sink_id.hashed();
2028 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
2029 let name_for_logging = format!("{sink_id}-commit-to-iceberg");
2030
2031 let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
2032 let mut batch_desc_input =
2033 builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
2034 let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
2035
2036 let (button, errors) = builder.build_fallible(move |_caps| {
2037 Box::pin(async move {
2038 if !is_active_worker {
2039 write_frontier.borrow_mut().clear();
2040 return Ok(());
2041 }
2042
2043 let catalog = connection
2044 .catalog_connection
2045 .connect(&storage_configuration, InTask::Yes)
2046 .await
2047 .with_context(|| {
2048 format!(
2049 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
2050 connection.catalog_connection.uri, connection.namespace, connection.table
2051 )
2052 })?;
2053
2054 let mut write_handle = write_handle.await?;
2055
2056 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
2057 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
2058 while let Some(_) = table_ready_input.next().await {
2059 }
2061 let mut table = catalog.load_table(&table_ident).await.with_context(|| {
2062 format!(
2063 "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
2064 connection.namespace, connection.table
2065 )
2066 })?;
2067
2068 #[allow(clippy::disallowed_types)]
2069 let mut batch_descriptions: std::collections::HashMap<
2070 (Antichain<Timestamp>, Antichain<Timestamp>),
2071 BoundedDataFileSet,
2072 > = std::collections::HashMap::new();
2073
2074 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
2075 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
2076
2077 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
2078 tokio::select! {
2079 _ = batch_desc_input.ready() => {},
2080 _ = input.ready() => {}
2081 }
2082
2083 while let Some(event) = batch_desc_input.next_sync() {
2084 match event {
2085 Event::Data(_cap, data) => {
2086 for batch_desc in data {
2087 let prev = batch_descriptions
2088 .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
2089 if let Some(prev) = prev {
2090 anyhow::bail!(
2091 "Duplicate batch description received \
2092 in commit operator: {:?}",
2093 prev
2094 );
2095 }
2096 }
2097 }
2098 Event::Progress(frontier) => {
2099 batch_description_frontier = frontier;
2100 }
2101 }
2102 }
2103
2104 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
2105 for event in ready_events {
2106 match event {
2107 Event::Data(_cap, data) => {
2108 for bounded_data_file in data {
2109 let entry = batch_descriptions
2110 .entry(bounded_data_file.batch_desc().clone())
2111 .or_default();
2112 entry.data_files.push(bounded_data_file);
2113 }
2114 }
2115 Event::Progress(frontier) => {
2116 input_frontier = frontier;
2117 }
2118 }
2119 }
2120
2121 let mut done_batches: Vec<_> = batch_descriptions
2127 .keys()
2128 .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
2129 .cloned()
2130 .collect();
2131
2132 done_batches.sort_by(|a, b| {
2134 if PartialOrder::less_than(a, b) {
2135 Ordering::Less
2136 } else if PartialOrder::less_than(b, a) {
2137 Ordering::Greater
2138 } else {
2139 Ordering::Equal
2140 }
2141 });
2142
2143 for batch in done_batches {
2144 let file_set = batch_descriptions.remove(&batch).unwrap();
2145
2146 let mut data_files = vec![];
2147 let mut delete_files = vec![];
2148 let mut total_messages: u64 = 0;
2150 let mut total_bytes: u64 = 0;
2151 for file in file_set.data_files {
2152 total_messages += file.data_file().record_count();
2153 total_bytes += file.data_file().file_size_in_bytes();
2154 match file.data_file().content_type() {
2155 iceberg::spec::DataContentType::Data => {
2156 data_files.push(file.into_data_file());
2157 }
2158 iceberg::spec::DataContentType::PositionDeletes
2159 | iceberg::spec::DataContentType::EqualityDeletes => {
2160 delete_files.push(file.into_data_file());
2161 }
2162 }
2163 }
2164
2165 debug!(
2166 ?sink_id,
2167 %name_for_logging,
2168 lower = %batch.0.pretty(),
2169 upper = %batch.1.pretty(),
2170 data_files = data_files.len(),
2171 delete_files = delete_files.len(),
2172 total_messages,
2173 total_bytes,
2174 "iceberg commit applying batch"
2175 );
2176
2177 let instant = Instant::now();
2178
2179 let frontier = batch.1.clone();
2180 let frontier_json = serde_json::to_string(&frontier.elements())
2181 .context("Failed to serialize frontier to JSON")?;
2182 let snapshot_properties = vec![
2183 ("mz-sink-id".to_string(), sink_id.to_string()),
2184 ("mz-frontier".to_string(), frontier_json),
2185 ("mz-sink-version".to_string(), sink_version.to_string()),
2186 ];
2187
2188 let (table_state, commit_result) = Retry::default()
2189 .max_tries(5)
2190 .retry_async_with_state(table, |_, table| {
2191 let snapshot_properties = snapshot_properties.clone();
2192 let data_files = data_files.clone();
2193 let delete_files = delete_files.clone();
2194 let metrics = Arc::clone(&metrics);
2195 let catalog = Arc::clone(&catalog);
2196 let conn_namespace = connection.namespace.clone();
2197 let conn_table = connection.table.clone();
2198 let frontier = frontier.clone();
2199 let batch_lower = batch.0.clone();
2200 let batch_upper = batch.1.clone();
2201 async move {
2202 try_commit_batch(
2203 table,
2204 snapshot_properties,
2205 data_files,
2206 delete_files,
2207 catalog.as_ref(),
2208 &conn_namespace,
2209 &conn_table,
2210 sink_version,
2211 &frontier,
2212 &batch_lower,
2213 &batch_upper,
2214 &metrics,
2215 )
2216 .await
2217 }
2218 })
2219 .await;
2220 let commit_result = commit_result.with_context(|| {
2221 format!(
2222 "failed to commit batch to Iceberg table '{}.{}'",
2223 connection.namespace, connection.table
2224 )
2225 });
2226 table = table_state;
2227 let duration = instant.elapsed();
2228 metrics
2229 .commit_duration_seconds
2230 .observe(duration.as_secs_f64());
2231 commit_result?;
2232
2233 debug!(
2234 ?sink_id,
2235 %name_for_logging,
2236 lower = %batch.0.pretty(),
2237 upper = %batch.1.pretty(),
2238 total_messages,
2239 total_bytes,
2240 ?duration,
2241 "iceberg commit applied batch"
2242 );
2243
2244 metrics.snapshots_committed.inc();
2245 statistics.inc_messages_committed_by(total_messages);
2246 statistics.inc_bytes_committed_by(total_bytes);
2247
2248 let mut expect_upper = write_handle.shared_upper();
2249 loop {
2250 if PartialOrder::less_equal(&frontier, &expect_upper) {
2251 break;
2253 }
2254
2255 const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
2256 match write_handle
2257 .compare_and_append(EMPTY, expect_upper, frontier.clone())
2258 .await
2259 .expect("valid usage")
2260 {
2261 Ok(()) => break,
2262 Err(mismatch) => {
2263 expect_upper = mismatch.current;
2264 }
2265 }
2266 }
2267 write_frontier.borrow_mut().clone_from(&frontier);
2268 }
2269 }
2270
2271 Ok(())
2272 })
2273 });
2274
2275 let statuses = errors.map(|error| HealthStatusMessage {
2276 id: None,
2277 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
2278 namespace: StatusNamespace::Iceberg,
2279 });
2280
2281 (statuses, button.press_on_drop())
2282}
2283
impl<'scope> SinkRender<'scope> for IcebergSinkConnection {
    /// Returns the indices of the user-declared key columns, if a key
    /// description was configured for this sink.
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    /// Returns the indices of the relation's key columns, if known.
    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    /// Renders the complete Iceberg sink dataflow.
    ///
    /// The pipeline is assembled in four stages, each an operator with its own
    /// health-status stream and shutdown button:
    /// 1. `walk_sink_arrangement` — unpacks incoming sink batches into
    ///    (key, diff-pair) updates.
    /// 2. `mint_batch_descriptions` — mints batch descriptions and signals
    ///    table readiness.
    /// 3. `write_data_files` — writes data files, specialized on the sink
    ///    envelope (Upsert vs. Append; Debezium is unsupported).
    /// 4. `commit_to_iceberg` — commits the written files and advances the
    ///    shared write frontier via the persist write handle.
    ///
    /// Returns the concatenated health-status stream and the buttons that
    /// tear the operators down when dropped.
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        batches: SinkBatchStream<'scope>,
        key_is_synthetic: bool,
        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
        let scope = batches.scope();

        // Stage 1: unpack the arranged sink batches into individual updates.
        let (input, walker_button) = walk_sink_arrangement(
            format!("{sink_id}-iceberg-walker"),
            batches,
            sink_id,
            sink.from,
            key_is_synthetic,
        );

        // Lazily-constructed future that opens a persist write handle for the
        // sink's storage shard; it is awaited later by the commit operator.
        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        // Shared frontier, registered with the storage state so frontier
        // progress of this sink is externally observable.
        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        // Derive the Arrow/Iceberg schemas from the relation description.
        // For the Append envelope, extra append-only columns are added to
        // both schemas. Schema-derivation failures halt the sink by returning
        // a single halting health status instead of rendering the dataflow.
        let (arrow_schema_with_ids, iceberg_schema) =
            match (|| -> Result<(ArrowSchema, Arc<Schema>), anyhow::Error> {
                let (arrow_schema_with_ids, iceberg_schema) =
                    relation_desc_to_iceberg_schema(&sink.from_desc)?;

                Ok(if sink.envelope == SinkEnvelope::Append {
                    let extended_arrow = build_schema_with_append_columns(&arrow_schema_with_ids);
                    let extended_iceberg = Arc::new(
                        arrow_schema_to_schema(&extended_arrow)
                            .context("Failed to build Iceberg schema with append columns")?,
                    );
                    (extended_arrow, extended_iceberg)
                } else {
                    (arrow_schema_with_ids, iceberg_schema)
                })
            })() {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(scope);
                    return (error_stream, vec![]);
                }
            };

        // Per-sink, per-worker metrics shared by the writer and committer.
        let metrics = Arc::new(
            storage_state
                .metrics
                .get_iceberg_sink_metrics(sink_id, scope.index()),
        );

        // Statistics are initialized before rendering; a missing entry is a
        // bug, hence the expect.
        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        // Stage 2: mint batch descriptions and establish table readiness.
        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
            mint_batch_descriptions(
                format!("{sink_id}-iceberg-mint"),
                sink_id,
                input,
                sink,
                connection_for_minter,
                storage_state.storage_configuration.clone(),
                Arc::clone(&iceberg_schema),
            );

        // Stage 3: write data files, dispatching on the sink envelope. Both
        // arms are identical apart from the envelope handler type parameter.
        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = match sink.envelope {
            SinkEnvelope::Upsert => write_data_files::<UpsertEnvelopeHandler>(
                format!("{sink_id}-write-data-files"),
                minted_input,
                batch_descriptions.clone(),
                table_ready.clone(),
                sink.as_of.clone(),
                connection_for_writer,
                storage_state.storage_configuration.clone(),
                Arc::new(arrow_schema_with_ids.clone()),
                Arc::clone(&metrics),
                statistics.clone(),
            ),
            SinkEnvelope::Append => write_data_files::<AppendEnvelopeHandler>(
                format!("{sink_id}-write-data-files"),
                minted_input,
                batch_descriptions.clone(),
                table_ready.clone(),
                sink.as_of.clone(),
                connection_for_writer,
                storage_state.storage_configuration.clone(),
                Arc::new(arrow_schema_with_ids.clone()),
                Arc::clone(&metrics),
                statistics.clone(),
            ),
            SinkEnvelope::Debezium => {
                // Planning is expected to reject this combination upstream.
                unreachable!("Iceberg sink only supports Upsert and Append envelopes")
            }
        };

        // Stage 4: commit the written data files to the Iceberg table and
        // advance the shared write frontier.
        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            datafiles,
            batch_descriptions,
            table_ready,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
            Arc::clone(&metrics),
            statistics,
        );

        // An unconditional "running" status, concatenated with the status
        // streams of every stage.
        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (
            statuses,
            vec![walker_button, mint_button, write_button, commit_button],
        )
    }
}
2461
2462fn walk_sink_arrangement<'scope>(
2469 name: String,
2470 batches: SinkBatchStream<'scope>,
2471 sink_id: GlobalId,
2472 from_id: GlobalId,
2473 key_is_synthetic: bool,
2474) -> (
2475 VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
2476 PressOnDropButton,
2477) {
2478 let mut builder = OperatorBuilder::new(name, batches.scope());
2479 let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
2480 let mut input = builder.new_input_for(batches, Pipeline, &output);
2481
2482 let button = builder.build(move |_caps| async move {
2483 let mut pk_warner = (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));
2484
2485 while let Some(event) = input.next().await {
2486 if let Event::Data(cap, mut batches) = event {
2487 for batch in batches.drain(..) {
2488 for_each_diff_pair(&batch, |key, time, diff_pair| {
2489 if let Some(warner) = pk_warner.as_mut() {
2490 warner.observe(key, time);
2491 }
2492 output.give(&cap, ((None, diff_pair), time, Diff::ONE));
2496 });
2497 if let Some(warner) = pk_warner.as_mut() {
2501 warner.flush();
2502 }
2503 }
2504 }
2505 }
2506 });
2507
2508 (stream.as_collection(), button.press_on_drop())
2509}