use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::{cell::RefCell, rc::Rc, sync::Arc};

use anyhow::{Context, anyhow};
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field};
use arrow::{
    array::RecordBatch, datatypes::Schema as ArrowSchema, datatypes::SchemaRef as ArrowSchemaRef,
};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use iceberg::ErrorKind;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
    write_data_files_to_avro,
};
use iceberg::spec::{Schema, SchemaRef};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
use iceberg::writer::base_writer::position_delete_writer::{
    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
};
use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use mz_arrow_util::builder::ArrowBuilder;
use mz_interchange::avro::DiffPair;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::result::ResultExt;
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::render::sinks::SinkRender;
use crate::storage_state::StorageState;

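/// Default item capacity hint passed to `ArrowBuilder::new_with_schema` when encoding rows.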
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
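/// Default data (byte) capacity hint passed to `ArrowBuilder::new_with_schema` when encoding rows.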
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

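/// File name prefix for the Parquet files written by this sink.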
const PARQUET_FILE_PREFIX: &str = "mz_data";
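/// How many batch descriptions the mint operator emits up front once it first observes
/// an input frontier beyond the resume upper.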
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;

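/// The concrete `DeltaWriter` used by the write operator: a Parquet data file writer plus
/// position-delete and equality-delete writers, all using the default location and file
/// name generators.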
type DeltaWriterType = DeltaWriter<
    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
    PositionDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
    EqualityDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
>;

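/// Annotates every field of the given Arrow schema with a Parquet field ID
/// (`PARQUET_FIELD_ID_META_KEY`), assigning IDs sequentially starting at 1 so the schema
/// can be converted into an Iceberg schema with stable field IDs.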
fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| {
            let mut metadata = field.metadata().clone();
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), (i + 1).to_string());
            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
}

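/// Rebuilds the Iceberg table's Arrow schema, copying any `ARROW:extension:name` metadata
/// from the Materialize-derived Arrow schema onto the matching fields. Fails if the
/// Iceberg schema contains a field that is missing from the Materialize schema.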
fn merge_materialize_metadata_into_iceberg_schema(
    materialize_arrow_schema: &ArrowSchema,
    iceberg_schema: &Schema,
) -> anyhow::Result<ArrowSchema> {
    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;

    let fields: Vec<Field> = iceberg_arrow_schema
        .fields()
        .iter()
        .map(|iceberg_field| {
            let mz_field = materialize_arrow_schema
                .field_with_name(iceberg_field.name())
                .with_context(|| {
                    format!(
                        "Field '{}' not found in Materialize schema",
                        iceberg_field.name()
                    )
                })?;

            let mut metadata = iceberg_field.metadata().clone();

            if let Some(extension_name) = mz_field.metadata().get("ARROW:extension:name") {
                metadata.insert("ARROW:extension:name".to_string(), extension_name.clone());
            }

            Ok(Field::new(
                iceberg_field.name(),
                iceberg_field.data_type().clone(),
                iceberg_field.is_nullable(),
            )
            .with_metadata(metadata))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
}

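/// Reloads the table from the catalog, failing if its current schema ID or default
/// partition spec ID differs from `current_table`, since schema and partition spec
/// evolution are not supported.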
async fn reload_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    current_table: Table,
) -> anyhow::Result<Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
    let current_schema = current_table.metadata().current_schema_id();
    let current_partition_spec = current_table.metadata().default_partition_spec_id();

    match catalog.load_table(&table_ident).await {
        Ok(table) => {
            let reloaded_schema = table.metadata().current_schema_id();
            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
            if reloaded_schema != current_schema {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
                    table_name,
                    current_schema,
                    reloaded_schema
                ));
            }

            if reloaded_partition_spec != current_partition_spec {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
                    table_name,
                    current_partition_spec,
                    reloaded_partition_spec
                ));
            }

            Ok(table)
        }
        Err(err) => Err(err).context("Failed to reload Iceberg table"),
    }
}

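/// Loads the Iceberg table from the catalog, creating it with the given schema if it
/// does not exist yet.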
async fn load_or_create_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    schema: &Schema,
) -> anyhow::Result<iceberg::table::Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());

    match catalog.load_table(&table_ident).await {
        Ok(table) => Ok(table),
        Err(err) => {
            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
                || err
                    .message()
                    .contains("Tried to load a table that does not exist")
            {
                let table_creation = TableCreation::builder()
                    .name(table_name.clone())
                    .schema(schema.clone())
                    .build();

                catalog
                    .create_table(&namespace_ident, table_creation)
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to create Iceberg table '{}' in namespace '{}'",
                            table_name, namespace
                        )
                    })
            } else {
                Err(err).context("Failed to load Iceberg table")
            }
        }
    }
}

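/// Scans the table's snapshots from newest to oldest and returns the frontier and sink
/// version recorded in the most recent snapshot carrying the `mz-frontier` and
/// `mz-sink-version` properties, or `None` if no snapshot has them. Fails if it
/// encounters a snapshot without these properties whose operation is not `"replace"`,
/// which the sink treats as an inconsistent table state.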
fn retrieve_upper_from_snapshots(
    snapshots: &mut [Arc<Snapshot>],
) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));

    for snapshot in snapshots {
        let props = &snapshot.summary().additional_properties;
        if let (Some(frontier_json), Some(sink_version_str)) =
            (props.get("mz-frontier"), props.get("mz-sink-version"))
        {
            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
                .context("Failed to deserialize frontier from snapshot properties")?;
            let frontier = Antichain::from_iter(frontier);

            let sink_version = sink_version_str
                .parse::<u64>()
                .context("Failed to parse mz-sink-version from snapshot properties")?;

            return Ok(Some((frontier, sink_version)));
        }
        if snapshot.summary().operation.as_str() != "replace" {
            anyhow::bail!(
                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
                snapshot.snapshot_id(),
                snapshot.summary().operation.as_str(),
            );
        }
    }

    Ok(None)
}

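/// Converts a `RelationDesc` into an Arrow schema annotated with Parquet field IDs,
/// together with the equivalent Iceberg schema.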
fn relation_desc_to_iceberg_schema(
    desc: &mz_repr::RelationDesc,
) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
        .context("Failed to convert RelationDesc to Arrow schema")?;

    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);

    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
        .context("Failed to convert Arrow schema to Iceberg schema")?;

    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
}

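/// Returns a copy of the schema with a non-nullable `__op` Int32 column appended, used to
/// tag each row as an insert (`1`) or a delete (`-1`).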
fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
    fields.push(Field::new("__op", DataType::Int32, false));
    ArrowSchema::new(fields)
}

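/// Encodes a `DiffPair` as a `RecordBatch`: the `before` row (if any) becomes a delete
/// (`__op = -1`) and the `after` row (if any) becomes an insert (`__op = 1`).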
fn row_to_recordbatch(
    row: DiffPair<Row>,
    schema: ArrowSchemaRef,
    schema_with_op: ArrowSchemaRef,
) -> anyhow::Result<RecordBatch> {
    let mut builder = ArrowBuilder::new_with_schema(
        Arc::clone(&schema),
        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
    )
    .context("Failed to create builder")?;

    let mut op_values = Vec::new();

    if let Some(before) = row.before {
        builder
            .add_row(&before)
            .context("Failed to add delete row to builder")?;
        op_values.push(-1i32);
    }
    if let Some(after) = row.after {
        builder
            .add_row(&after)
            .context("Failed to add insert row to builder")?;
        op_values.push(1i32);
    }

    let batch = builder
        .to_record_batch()
        .context("Failed to create record batch")?;

    let mut columns = Vec::with_capacity(batch.columns().len() + 1);
    columns.extend_from_slice(batch.columns());
    let op_column = Arc::new(Int32Array::from(op_values));
    columns.push(op_column);

    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
        .context("Failed to create batch with op column")?;
    Ok(batch_with_op)
}

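/// Mints batch descriptions: `(lower, upper)` frontier pairs that partition the input
/// into batches which are written and committed as a unit.
///
/// Only the active worker (chosen by hashing the sink id) mints descriptions; every
/// worker forwards the data unchanged. On startup the active worker determines the
/// resume upper from the table's snapshots, emits a catch-up description plus
/// `INITIAL_DESCRIPTIONS_TO_MINT` future descriptions spaced by the commit interval, and
/// from then on mints one new description whenever the input frontier closes the oldest
/// outstanding one.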
fn mint_batch_descriptions<G, D>(
    name: String,
    sink_id: GlobalId,
    input: &VecCollection<G, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<G, D, Diff>,
    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input =
        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [data_capset, capset]: &mut [_; 2] = caps.try_into().unwrap();
            *data_capset = CapabilitySet::new();

            if !is_active_worker {
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;

            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };

            let overcompacted = *resume_upper != [Timestamp::minimum()]
                && PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                return Err(anyhow::anyhow!("{err}"));
            };

            if resume_version > sink_version {
                anyhow::bail!(
                    "Fenced off by newer sink version: resume_version {}, sink_version {}",
                    resume_version,
                    sink_version
                );
            }

            let mut initialized = false;
            let mut observed_frontier;
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    if PartialOrder::less_equal(&observed_frontier, &resume_upper)
                        || PartialOrder::less_equal(&observed_frontier, &as_of)
                    {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts = current_upper.as_option().unwrap().clone();

                    if PartialOrder::less_than(&resume_upper, &current_upper) {
                        let batch_description = (resume_upper.clone(), current_upper.clone());
                        batch_descriptions.push(batch_description);
                    }

                    for i in 1..=INITIAL_DESCRIPTIONS_TO_MINT {
                        let duration_millis = commit_interval
                            .as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(
                            u64::try_from(duration_millis)
                                .expect("commit interval too large for u64"),
                        );
                        let desired_batch_upper =
                            Antichain::from_elem(current_upper_ts.step_forward_by(&duration_ts));

                        let batch_description = (current_upper.clone(), desired_batch_upper);
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(
                            commit_interval
                                .as_millis()
                                .try_into()
                                .expect("commit interval too large for u64"),
                        );
                        let new_upper = Antichain::from_elem(
                            newest_upper
                                .as_option()
                                .unwrap()
                                .step_forward_by(&duration_ts),
                        );

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        statuses,
        button.press_on_drop(),
    )
}

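/// A `DataFile` paired with the schema it was written against, made serde-compatible by
/// round-tripping the `DataFile` through its Avro representation (see `AvroDataFile`).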
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}

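/// Wire format for `SerializableDataFile`: the `DataFile` is carried as the bytes
/// produced by `write_data_files_to_avro`.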
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    pub data_file: Vec<u8>,
    pub schema: Schema,
}

impl From<SerializableDataFile> for AvroDataFile {
    fn from(value: SerializableDataFile) -> Self {
        let mut data_file = Vec::new();
        write_data_files_to_avro(
            &mut data_file,
            [value.data_file],
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .expect("serialization into buffer");
        AvroDataFile {
            data_file,
            schema: value.schema,
        }
    }
}

impl TryFrom<AvroDataFile> for SerializableDataFile {
    type Error = String;

    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
        let data_files = read_data_files_from_avro(
            &mut &*value.data_file,
            &value.schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .map_err_to_string_with_causes()?;
        let Some(data_file) = data_files.into_iter().next() else {
            return Err("No DataFile found in Avro data".into());
        };
        Ok(SerializableDataFile {
            data_file,
            schema: value.schema,
        })
    }
}

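/// A data file tagged with the batch description (`(lower, upper)` frontier pair) it was
/// written for.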
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}

impl BoundedDataFile {
    pub fn new(
        file: DataFile,
        schema: Schema,
        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
    ) -> Self {
        Self {
            data_file: SerializableDataFile {
                data_file: file,
                schema,
            },
            batch_desc,
        }
    }

    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
        &self.batch_desc
    }

    pub fn data_file(&self) -> &DataFile {
        &self.data_file.data_file
    }

    pub fn into_data_file(self) -> DataFile {
        self.data_file.data_file
    }
}

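/// The set of data files collected for a single batch description.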
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}

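/// Writes the rows belonging to each batch description into Iceberg data, position-delete,
/// and equality-delete files via a `DeltaWriter`, and emits the resulting files tagged
/// with their batch description.
///
/// Batch descriptions are broadcast to all workers, so every worker writes files for the
/// rows it receives. Rows that arrive before their batch description are stashed until a
/// matching description shows up; once the description and input frontiers have both
/// passed a batch, its writer is closed and the finished files are emitted downstream.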
fn write_data_files<G>(
    name: String,
    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
) -> (
    Stream<G, BoundedDataFile>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut batch_desc_input =
        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(|caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            let table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));

            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
                return Err(anyhow::anyhow!(
                    "Iceberg sink requires key columns for equality deletes"
                ));
            };

            let equality_ids: Vec<i32> = equality_indices
                .iter()
                .map(|u| i32::try_from(*u).map(|v| v + 1))
                .collect::<Result<Vec<i32>, _>>()
                .context("Failed to convert equality index to i32 (index too large)")?;

            let writer_properties = WriterProperties::new();

            let create_delta_writer = || async {
                let data_parquet_writer = ParquetWriterBuilder::new(
                    writer_properties.clone(),
                    Arc::clone(&current_schema),
                );
                let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    data_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

                let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
                let pos_schema =
                    Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
                        "Failed to convert position delete Arrow schema to Iceberg schema",
                    )?);
                let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
                let pos_parquet_writer =
                    ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
                let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    pos_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let pos_delete_writer_builder =
                    PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

                let eq_config = EqualityDeleteWriterConfig::new(
                    equality_ids.clone(),
                    Arc::clone(&current_schema),
                )
                .context("Failed to create EqualityDeleteWriterConfig")?;

                let eq_schema = Arc::new(
                    arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
                        "Failed to convert equality delete Arrow schema to Iceberg schema",
                    )?,
                );

                let eq_parquet_writer =
                    ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
                let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    eq_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let eq_delete_writer_builder =
                    EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);

                DeltaWriterBuilder::new(
                    data_writer_builder,
                    pos_delete_writer_builder,
                    eq_delete_writer_builder,
                    equality_ids.clone(),
                )
                .build(None)
                .await
                .context("Failed to create DeltaWriter")
            };

            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                DeltaWriterType,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;
                                let mut delta_writer = create_delta_writer().await?;
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            for (_row, diff_pair) in rows {
                                                let record_batch = row_to_recordbatch(
                                                    diff_pair.clone(),
                                                    Arc::clone(&arrow_schema),
                                                    Arc::clone(&schema_with_op),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                delta_writer.write(record_batch).await.context(
                                                    "Failed to write row to DeltaWriter",
                                                )?;
                                            }
                                        }
                                    }
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = row_to_recordbatch(
                                            diff_pair.clone(),
                                            Arc::clone(&arrow_schema),
                                            Arc::clone(&schema_with_op),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        delta_writer
                                            .write(record_batch)
                                            .await
                                            .context("Failed to write row to DeltaWriter")?;
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    entry.push((row, diff_pair));
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                if PartialOrder::less_equal(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_than(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut delta_writer) in ready_batches {
                            let data_files = delta_writer
                                .close()
                                .await
                                .context("Failed to close DeltaWriter")?;
                            for data_file in data_files {
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            max_upper = max_upper.join(&desc.1);
                        }

                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}

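/// Commits the data files of each finished batch to the Iceberg table, running only on
/// the single active worker.
///
/// Each commit records the batch's upper frontier and the sink version as snapshot
/// properties (`mz-frontier`, `mz-sink-version`) so that a restarted sink can resume.
/// Commits are retried on catalog commit conflicts, fencing out with a fatal error if
/// another writer has already advanced the recorded frontier, and after each successful
/// commit the sink's persist shard upper is advanced to the committed frontier.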
fn commit_to_iceberg<G>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: &Stream<G, BoundedDataFile>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            let mut table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let prev = batch_descriptions
                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
                    .cloned()
                    .collect();

                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    for file in file_set.data_files {
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    let frontier = batch.1.clone();
                    let tx = Transaction::new(&table);

                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;

                    let mut action = tx.row_delta().set_snapshot_properties(
                        vec![
                            ("mz-sink-id".to_string(), sink_id.to_string()),
                            ("mz-frontier".to_string(), frontier_json),
                            ("mz-sink-version".to_string(), sink_version.to_string()),
                        ]
                        .into_iter()
                        .collect(),
                    );

                    if !data_files.is_empty() || !delete_files.is_empty() {
                        action = action
                            .add_data_files(data_files)
                            .add_delete_files(delete_files);
                    }

                    let tx = action.apply(tx).context(
                        "Failed to apply data file addition to iceberg table transaction",
                    )?;

                    table = Retry::default().max_tries(5).retry_async(|_| async {
                        let new_table = tx.clone().commit(catalog.as_ref()).await;
                        match new_table {
                            Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
                                let table = reload_table(
                                    catalog.as_ref(),
                                    connection.namespace.clone(),
                                    connection.table.clone(),
                                    table.clone(),
                                ).await;
                                let table = match table {
                                    Ok(table) => table,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                let mut snapshots: Vec<_> =
                                    table.metadata().snapshots().cloned().collect();
                                let last = retrieve_upper_from_snapshots(&mut snapshots);
                                let last = match last {
                                    Ok(val) => val,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                if let Some((last_frontier, _last_version)) = last {
                                    if PartialOrder::less_equal(&frontier, &last_frontier) {
                                        return RetryResult::FatalErr(anyhow!(
                                            "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
                                            connection.table,
                                            frontier,
                                            last_frontier,
                                        ));
                                    }
                                }

                                RetryResult::Ok(table)
                            }
                            Err(e) => RetryResult::RetryableErr(anyhow!(e)),
                            Ok(table) => RetryResult::Ok(table),
                        }
                    }).await.context("failed to commit to iceberg")?;

                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}

impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (arrow_schema_with_ids, iceberg_schema) =
            match relation_desc_to_iceberg_schema(&sink.from_desc) {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(&mut scope);
                    return (error_stream, vec![]);
                }
            };

        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
            format!("{sink_id}-iceberg-mint"),
            sink_id,
            &input,
            sink,
            connection_for_minter,
            storage_state.storage_configuration.clone(),
            Arc::clone(&iceberg_schema),
        );

        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = write_data_files(
            format!("{sink_id}-write-data-files"),
            minted_input,
            &batch_descriptions,
            connection_for_writer,
            storage_state.storage_configuration.clone(),
            Arc::new(arrow_schema_with_ids.clone()),
        );

        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            &datafiles,
            &batch_descriptions,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(&mut scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (statuses, vec![mint_button, write_button, commit_button])
    }
}