use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::{cell::RefCell, rc::Rc, sync::Arc};

use anyhow::{Context, anyhow};
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field};
use arrow::{
    array::RecordBatch, datatypes::Schema as ArrowSchema, datatypes::SchemaRef as ArrowSchemaRef,
};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use iceberg::ErrorKind;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
    write_data_files_to_avro,
};
use iceberg::spec::{Schema, SchemaRef};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
use iceberg::writer::base_writer::position_delete_writer::{
    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
};
use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use mz_arrow_util::builder::ArrowBuilder;
use mz_interchange::avro::DiffPair;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::result::ResultExt;
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::render::sinks::SinkRender;
use crate::storage_state::StorageState;

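/// Default item capacity passed to the [`ArrowBuilder`]s created by this sink.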
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
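/// Default data buffer capacity passed to the [`ArrowBuilder`]s created by this sink.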
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

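/// Prefix for the names of Parquet files written by this sink.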
const PARQUET_FILE_PREFIX: &str = "mz_data";
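/// Number of future batch descriptions the mint operator creates ahead of the input frontier.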
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;

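// Writer type aliases: every file writer is Parquet-backed, and the delta writer fans out to
// data, position delete, and equality delete file writers.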
type ParquetWriterType = ParquetWriterBuilder<DefaultLocationGenerator, DefaultFileNameGenerator>;
type DeltaWriterType = DeltaWriter<
    DataFileWriter<ParquetWriterType>,
    PositionDeleteFileWriter<ParquetWriterType>,
    EqualityDeleteFileWriter<ParquetWriterType>,
>;

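/// Annotates every field of an Arrow schema with a Parquet field ID
/// (`PARQUET_FIELD_ID_META_KEY`), assigning IDs sequentially starting at 1, as required for
/// converting the schema to an Iceberg schema.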
fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| {
            let mut metadata = field.metadata().clone();
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), (i + 1).to_string());
            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
}

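/// Builds an Arrow schema from the Iceberg table's current schema, copying the
/// `ARROW:extension:name` metadata of the corresponding Materialize field onto each field so
/// that Arrow extension types survive the round trip through the Iceberg schema.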
fn merge_materialize_metadata_into_iceberg_schema(
    materialize_arrow_schema: &ArrowSchema,
    iceberg_schema: &Schema,
) -> anyhow::Result<ArrowSchema> {
    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;

    let fields: Vec<Field> = iceberg_arrow_schema
        .fields()
        .iter()
        .map(|iceberg_field| {
            let mz_field = materialize_arrow_schema
                .field_with_name(iceberg_field.name())
                .with_context(|| {
                    format!(
                        "Field '{}' not found in Materialize schema",
                        iceberg_field.name()
                    )
                })?;

            let mut metadata = iceberg_field.metadata().clone();

            if let Some(extension_name) = mz_field.metadata().get("ARROW:extension:name") {
                metadata.insert("ARROW:extension:name".to_string(), extension_name.clone());
            }

            Ok(Field::new(
                iceberg_field.name(),
                iceberg_field.data_type().clone(),
                iceberg_field.is_nullable(),
            )
            .with_metadata(metadata))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
}

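/// Reloads the table from the catalog, returning an error if its current schema ID or default
/// partition spec ID changed, since schema and partition spec evolution are not supported.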
async fn reload_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    current_table: Table,
) -> anyhow::Result<Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
    let current_schema = current_table.metadata().current_schema_id();
    let current_partition_spec = current_table.metadata().default_partition_spec_id();

    match catalog.load_table(&table_ident).await {
        Ok(table) => {
            let reloaded_schema = table.metadata().current_schema_id();
            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
            if reloaded_schema != current_schema {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
                    table_name,
                    current_schema,
                    reloaded_schema
                ));
            }

            if reloaded_partition_spec != current_partition_spec {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
                    table_name,
                    current_partition_spec,
                    reloaded_partition_spec
                ));
            }

            Ok(table)
        }
        Err(err) => Err(err).context("Failed to reload Iceberg table"),
    }
}

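/// Loads the Iceberg table from the catalog, creating it with the given schema if it does not
/// exist yet.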
async fn load_or_create_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    schema: &Schema,
) -> anyhow::Result<iceberg::table::Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());

    match catalog.load_table(&table_ident).await {
        Ok(table) => Ok(table),
        Err(err) => {
            if matches!(err.kind(), ErrorKind::TableNotFound { .. }) {
                let table_creation = TableCreation::builder()
                    .name(table_name.clone())
                    .schema(schema.clone())
                    .build();

                catalog
                    .create_table(&namespace_ident, table_creation)
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to create Iceberg table '{}' in namespace '{}'",
                            table_name, namespace
                        )
                    })
            } else {
                Err(err).context("Failed to load Iceberg table")
            }
        }
    }
}

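/// Scans the table's snapshots from newest to oldest for the `mz-frontier` and `mz-sink-version`
/// properties written by this sink, returning the most recent resume frontier and sink version.
///
/// Returns `Ok(None)` if no snapshot carries these properties, and an error if a snapshot whose
/// operation is not `replace` lacks them, since that indicates the table was modified outside of
/// this sink.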
fn retrieve_upper_from_snapshots(
    snapshots: &mut [Arc<Snapshot>],
) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));

    for snapshot in snapshots {
        let props = &snapshot.summary().additional_properties;
        if let (Some(frontier_json), Some(sink_version_str)) =
            (props.get("mz-frontier"), props.get("mz-sink-version"))
        {
            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
                .context("Failed to deserialize frontier from snapshot properties")?;
            let frontier = Antichain::from_iter(frontier);

            let sink_version = sink_version_str
                .parse::<u64>()
                .context("Failed to parse mz-sink-version from snapshot properties")?;

            return Ok(Some((frontier, sink_version)));
        }
        if snapshot.summary().operation.as_str() != "replace" {
            anyhow::bail!(
                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
                snapshot.snapshot_id(),
                snapshot.summary().operation.as_str(),
            );
        }
    }

    Ok(None)
}

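/// Converts a [`mz_repr::RelationDesc`] into an Arrow schema annotated with Parquet field IDs and
/// the corresponding Iceberg schema.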
fn relation_desc_to_iceberg_schema(
    desc: &mz_repr::RelationDesc,
) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
        .context("Failed to convert RelationDesc to Arrow schema")?;

    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);

    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
        .context("Failed to convert Arrow schema to Iceberg schema")?;

    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
}

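/// Extends an Arrow schema with a non-nullable `__op` Int32 column, used by this sink to mark
/// each row as an insert (`1`) or delete (`-1`).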
fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
    fields.push(Field::new("__op", DataType::Int32, false));
    ArrowSchema::new(fields)
}

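/// Converts a [`DiffPair`] of rows into a [`RecordBatch`] with a trailing `__op` column: the
/// `before` row (if any) is emitted with op `-1` (delete) and the `after` row (if any) with op
/// `1` (insert).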
fn row_to_recordbatch(
    row: DiffPair<Row>,
    schema: ArrowSchemaRef,
    schema_with_op: ArrowSchemaRef,
) -> anyhow::Result<RecordBatch> {
    let mut builder = ArrowBuilder::new_with_schema(
        Arc::clone(&schema),
        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
    )
    .context("Failed to create builder")?;

    let mut op_values = Vec::new();

    if let Some(before) = row.before {
        builder
            .add_row(&before)
            .context("Failed to add delete row to builder")?;
        op_values.push(-1i32);
    }
    if let Some(after) = row.after {
        builder
            .add_row(&after)
            .context("Failed to add insert row to builder")?;
        op_values.push(1i32);
    }

    let batch = builder
        .to_record_batch()
        .context("Failed to create record batch")?;

    let mut columns = Vec::with_capacity(batch.columns().len() + 1);
    columns.extend_from_slice(batch.columns());
    let op_column = Arc::new(Int32Array::from(op_values));
    columns.push(op_column);

    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
        .context("Failed to create batch with op column")?;
    Ok(batch_with_op)
}

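/// Renders an operator that mints `(lower, upper)` batch descriptions: the timestamp ranges that
/// downstream operators turn into data files and commit as individual Iceberg transactions.
///
/// Only one worker (chosen by hashing the sink id) mints descriptions; every worker forwards the
/// data unchanged. On startup the active worker loads (or creates) the table, recovers the resume
/// upper and sink version from its snapshots, and fails if the input has been compacted past the
/// resume upper or a newer sink version has already written to the table. It then mints a
/// description from the resume upper to the observed frontier (when the frontier is ahead), plus
/// `INITIAL_DESCRIPTIONS_TO_MINT` future descriptions spaced `commit_interval` apart, and appends
/// a new description each time the input frontier passes the upper of the oldest outstanding one.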
fn mint_batch_descriptions<G, D>(
    name: String,
    sink_id: GlobalId,
    input: &VecCollection<G, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<G, D, Diff>,
    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input =
        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [data_capset, capset]: &mut [_; 2] = caps.try_into().unwrap();
            *data_capset = CapabilitySet::new();

            if !is_active_worker {
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;

            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };

            let overcompacted = *resume_upper != [Timestamp::minimum()]
                && PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                return Err(anyhow::anyhow!("{err}"));
            };

            if resume_version > sink_version {
                anyhow::bail!(
                    "Fenced off by newer sink version: resume_version {}, sink_version {}",
                    resume_version,
                    sink_version
                );
            }

            let mut initialized = false;
            let mut observed_frontier;
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    if PartialOrder::less_equal(&observed_frontier, &resume_upper)
                        || PartialOrder::less_equal(&observed_frontier, &as_of)
                    {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts = current_upper.as_option().unwrap().clone();

                    if PartialOrder::less_than(&resume_upper, &current_upper) {
                        let batch_description = (resume_upper.clone(), current_upper.clone());
                        batch_descriptions.push(batch_description);
                    }

                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT + 1 {
                        let duration_millis = commit_interval
                            .as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(
                            u64::try_from(duration_millis)
                                .expect("commit interval too large for u64"),
                        );
                        let desired_batch_upper =
                            Antichain::from_elem(current_upper_ts.step_forward_by(&duration_ts));

                        let batch_description = (current_upper.clone(), desired_batch_upper);
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(
                            commit_interval
                                .as_millis()
                                .try_into()
                                .expect("commit interval too large for u64"),
                        );
                        let new_upper = Antichain::from_elem(
                            newest_upper
                                .as_option()
                                .unwrap()
                                .step_forward_by(&duration_ts),
                        );

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        statuses,
        button.press_on_drop(),
    )
}

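/// A [`DataFile`] together with the schema it was written against, made serde-serializable by
/// round-tripping the data file through Iceberg's Avro encoding (see [`AvroDataFile`]).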
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}

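/// Serde-friendly representation of a [`SerializableDataFile`]: the data file metadata encoded as
/// Avro bytes (using an empty partition [`StructType`] and format version 2), alongside the
/// schema needed to decode it again.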
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    pub data_file: Vec<u8>,
    pub schema: Schema,
}

impl From<SerializableDataFile> for AvroDataFile {
    fn from(value: SerializableDataFile) -> Self {
        let mut data_file = Vec::new();
        write_data_files_to_avro(
            &mut data_file,
            [value.data_file],
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .expect("serialization into buffer");
        AvroDataFile {
            data_file,
            schema: value.schema,
        }
    }
}

impl TryFrom<AvroDataFile> for SerializableDataFile {
    type Error = String;

    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
        let data_files = read_data_files_from_avro(
            &mut &*value.data_file,
            &value.schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .map_err_to_string_with_causes()?;
        let Some(data_file) = data_files.into_iter().next() else {
            return Err("No DataFile found in Avro data".into());
        };
        Ok(SerializableDataFile {
            data_file,
            schema: value.schema,
        })
    }
}

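/// A data file annotated with the `(lower, upper)` batch description whose updates it contains,
/// so the commit operator can group files into a single Iceberg transaction per batch.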
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}

impl BoundedDataFile {
    pub fn new(
        file: DataFile,
        schema: Schema,
        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
    ) -> Self {
        Self {
            data_file: SerializableDataFile {
                data_file: file,
                schema,
            },
            batch_desc,
        }
    }

    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
        &self.batch_desc
    }

    pub fn data_file(&self) -> &DataFile {
        &self.data_file.data_file
    }

    pub fn into_data_file(self) -> DataFile {
        self.data_file.data_file
    }
}

#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}

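/// Renders an operator that writes the updates belonging to each batch description into Iceberg
/// data, position delete, and equality delete files using one [`DeltaWriter`] per batch.
///
/// Batch descriptions are broadcast to every worker; rows that arrive before their batch
/// description is known are stashed by timestamp and replayed once the matching description
/// shows up. When both the batch description frontier and the input frontier have passed a
/// batch, its writer is closed and the resulting files are emitted as [`BoundedDataFile`]s.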
fn write_data_files<G>(
    name: String,
    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
) -> (
    Stream<G, BoundedDataFile>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut batch_desc_input =
        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(|caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            let table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));

            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
                return Err(anyhow::anyhow!(
                    "Iceberg sink requires key columns for equality deletes"
                ));
            };

            let equality_ids: Vec<i32> = equality_indices
                .iter()
                .map(|u| i32::try_from(*u).map(|v| v + 1))
                .collect::<Result<Vec<i32>, _>>()
                .context("Failed to convert equality index to i32 (index too large)")?;

            let writer_properties = WriterProperties::new();

            let create_delta_writer = || async {
                let data_parquet_writer = ParquetWriterBuilder::new(
                    writer_properties.clone(),
                    Arc::clone(&current_schema),
                    None,
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let data_writer_builder = DataFileWriterBuilder::new(data_parquet_writer, None, 0);

                let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
                let pos_schema =
                    Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
                        "Failed to convert position delete Arrow schema to Iceberg schema",
                    )?);
                let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
                let pos_parquet_writer = ParquetWriterBuilder::new(
                    writer_properties.clone(),
                    pos_schema,
                    None,
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let pos_delete_writer_builder =
                    PositionDeleteFileWriterBuilder::new(pos_parquet_writer, pos_config);

                let eq_config = EqualityDeleteWriterConfig::new(
                    equality_ids.clone(),
                    Arc::clone(&current_schema),
                    None,
                    0,
                )
                .context("Failed to create EqualityDeleteWriterConfig")?;

                let eq_schema = Arc::new(
                    arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
                        "Failed to convert equality delete Arrow schema to Iceberg schema",
                    )?,
                );

                let eq_parquet_writer = ParquetWriterBuilder::new(
                    writer_properties.clone(),
                    eq_schema,
                    None,
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let eq_delete_writer_builder =
                    EqualityDeleteFileWriterBuilder::new(eq_parquet_writer, eq_config);

                DeltaWriterBuilder::new(
                    data_writer_builder,
                    pos_delete_writer_builder,
                    eq_delete_writer_builder,
                    equality_ids.clone(),
                )
                .build()
                .await
                .context("Failed to create DeltaWriter")
            };

            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                DeltaWriterType,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;
                                let mut delta_writer = create_delta_writer().await?;
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            for (_row, diff_pair) in rows {
                                                let record_batch = row_to_recordbatch(
                                                    diff_pair.clone(),
                                                    Arc::clone(&arrow_schema),
                                                    Arc::clone(&schema_with_op),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                delta_writer.write(record_batch).await.context(
                                                    "Failed to write row to DeltaWriter",
                                                )?;
                                            }
                                        }
                                    }
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = row_to_recordbatch(
                                            diff_pair.clone(),
                                            Arc::clone(&arrow_schema),
                                            Arc::clone(&schema_with_op),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        delta_writer
                                            .write(record_batch)
                                            .await
                                            .context("Failed to write row to DeltaWriter")?;
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    entry.push((row, diff_pair));
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                if PartialOrder::less_equal(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_than(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut delta_writer) in ready_batches {
                            let data_files = delta_writer
                                .close()
                                .await
                                .context("Failed to close DeltaWriter")?;
                            for data_file in data_files {
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            max_upper = max_upper.join(&desc.1);
                        }

                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}

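/// Renders an operator that commits finished batches to the Iceberg table.
///
/// A single worker collects the data and delete files for each batch description and, once the
/// input frontier has passed a batch's lower bound, commits them in one Iceberg transaction whose
/// snapshot properties record the sink id, the batch upper as `mz-frontier`, and the sink
/// version. Commit conflicts are retried after reloading the table, with a fatal error if another
/// writer has already advanced the recorded frontier. After each successful commit the persist
/// write handle and the shared write frontier are advanced to the batch upper.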
fn commit_to_iceberg<G>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: &Stream<G, BoundedDataFile>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            let mut table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let prev = batch_descriptions
                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
                    .cloned()
                    .collect();

                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    for file in file_set.data_files {
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    let frontier = batch.1.clone();
                    let tx = Transaction::new(&table);

                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;

                    let mut action = tx.row_delta().set_snapshot_properties(
                        vec![
                            ("mz-sink-id".to_string(), sink_id.to_string()),
                            ("mz-frontier".to_string(), frontier_json),
                            ("mz-sink-version".to_string(), sink_version.to_string()),
                        ]
                        .into_iter()
                        .collect(),
                    );

                    if !data_files.is_empty() || !delete_files.is_empty() {
                        action = action
                            .add_data_files(data_files)
                            .add_delete_files(delete_files);
                    }

                    let tx = action.apply(tx).context(
                        "Failed to apply data file addition to iceberg table transaction",
                    )?;

                    table = Retry::default()
                        .max_tries(5)
                        .retry_async(|_| async {
                            let new_table = tx.clone().commit(catalog.as_ref()).await;
                            match new_table {
                                Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
                                    let table = reload_table(
                                        catalog.as_ref(),
                                        connection.namespace.clone(),
                                        connection.table.clone(),
                                        table.clone(),
                                    )
                                    .await;
                                    let table = match table {
                                        Ok(table) => table,
                                        Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                    };

                                    let mut snapshots: Vec<_> =
                                        table.metadata().snapshots().cloned().collect();
                                    let last = retrieve_upper_from_snapshots(&mut snapshots);
                                    let last = match last {
                                        Ok(val) => val,
                                        Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                    };

                                    if let Some((last_frontier, _last_version)) = last {
                                        if PartialOrder::less_equal(&frontier, &last_frontier) {
                                            return RetryResult::FatalErr(anyhow!(
                                                "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
                                                connection.table,
                                                frontier,
                                                last_frontier,
                                            ));
                                        }
                                    }

                                    RetryResult::Ok(table)
                                }
                                Err(e) => RetryResult::RetryableErr(anyhow!(e)),
                                Ok(table) => RetryResult::Ok(table),
                            }
                        })
                        .await
                        .context("failed to commit to iceberg")?;

                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}

impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (arrow_schema_with_ids, iceberg_schema) =
            match relation_desc_to_iceberg_schema(&sink.from_desc) {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(&mut scope);
                    return (error_stream, vec![]);
                }
            };

        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
            format!("{sink_id}-iceberg-mint"),
            sink_id,
            &input,
            sink,
            connection_for_minter,
            storage_state.storage_configuration.clone(),
            Arc::clone(&iceberg_schema),
        );

        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = write_data_files(
            format!("{sink_id}-write-data-files"),
            minted_input,
            &batch_descriptions,
            connection_for_writer,
            storage_state.storage_configuration.clone(),
            Arc::new(arrow_schema_with_ids.clone()),
        );

        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            &datafiles,
            &batch_descriptions,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(&mut scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (statuses, vec![mint_button, write_button, commit_button])
    }
}