use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::{cell::RefCell, rc::Rc, sync::Arc};

use anyhow::{Context, anyhow};
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field};
use arrow::{
    array::RecordBatch, datatypes::Schema as ArrowSchema, datatypes::SchemaRef as ArrowSchemaRef,
};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use iceberg::ErrorKind;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
    write_data_files_to_avro,
};
use iceberg::spec::{Schema, SchemaRef};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
use iceberg::writer::base_writer::position_delete_writer::{
    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
};
use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use mz_arrow_util::builder::ArrowBuilder;
use mz_interchange::avro::DiffPair;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::result::ResultExt;
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::iceberg::IcebergSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;

/// Initial item capacity used when constructing an `ArrowBuilder` for a record batch.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Initial data (byte) capacity used when constructing an `ArrowBuilder` for a record batch.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// Prefix for Parquet data files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// Number of batch descriptions minted ahead of time when the sink starts up.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;

/// The concrete delta writer used by the sink: a data file writer plus position- and
/// equality-delete writers, all backed by rolling Parquet writers.
type DeltaWriterType = DeltaWriter<
    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
    PositionDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
    EqualityDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
>;

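/// Annotates every field of the given Arrow schema with a 1-based Parquet field ID, which
/// Iceberg uses to map Parquet columns back to its own schema fields.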
fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(i, field)| {
            let mut metadata = field.metadata().clone();
            metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), (i + 1).to_string());
            Field::new(field.name(), field.data_type().clone(), field.is_nullable())
                .with_metadata(metadata)
        })
        .collect();

    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
}

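/// Rebuilds the Iceberg table's Arrow schema, copying any `ARROW:extension:name` metadata
/// from the corresponding Materialize fields so extension type information is preserved.
/// Errors if a field of the Iceberg schema is missing from the Materialize schema.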
fn merge_materialize_metadata_into_iceberg_schema(
    materialize_arrow_schema: &ArrowSchema,
    iceberg_schema: &Schema,
) -> anyhow::Result<ArrowSchema> {
    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;

    let fields: Vec<Field> = iceberg_arrow_schema
        .fields()
        .iter()
        .map(|iceberg_field| {
            let mz_field = materialize_arrow_schema
                .field_with_name(iceberg_field.name())
                .with_context(|| {
                    format!(
                        "Field '{}' not found in Materialize schema",
                        iceberg_field.name()
                    )
                })?;

            let mut metadata = iceberg_field.metadata().clone();

            if let Some(extension_name) = mz_field.metadata().get("ARROW:extension:name") {
                metadata.insert("ARROW:extension:name".to_string(), extension_name.clone());
            }

            Ok(Field::new(
                iceberg_field.name(),
                iceberg_field.data_type().clone(),
                iceberg_field.is_nullable(),
            )
            .with_metadata(metadata))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
}

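/// Reloads the table from the catalog, erroring if its current schema ID or default
/// partition spec ID changed, since schema and partition-spec evolution are not supported.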
async fn reload_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    current_table: Table,
) -> anyhow::Result<Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
    let current_schema = current_table.metadata().current_schema_id();
    let current_partition_spec = current_table.metadata().default_partition_spec_id();

    match catalog.load_table(&table_ident).await {
        Ok(table) => {
            let reloaded_schema = table.metadata().current_schema_id();
            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
            if reloaded_schema != current_schema {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
                    table_name,
                    current_schema,
                    reloaded_schema
                ));
            }

            if reloaded_partition_spec != current_partition_spec {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
                    table_name,
                    current_partition_spec,
                    reloaded_partition_spec
                ));
            }

            Ok(table)
        }
        Err(err) => Err(err).context("Failed to reload Iceberg table"),
    }
}

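/// Loads the table from the catalog, creating it with the given schema if it does not
/// exist yet.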
async fn load_or_create_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    schema: &Schema,
) -> anyhow::Result<iceberg::table::Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());

    match catalog.load_table(&table_ident).await {
        Ok(table) => Ok(table),
        Err(err) => {
            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
                || err
                    .message()
                    .contains("Tried to load a table that does not exist")
            {
                let table_creation = TableCreation::builder()
                    .name(table_name.clone())
                    .schema(schema.clone())
                    .build();

                catalog
                    .create_table(&namespace_ident, table_creation)
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to create Iceberg table '{}' in namespace '{}'",
                            table_name, namespace
                        )
                    })
            } else {
                Err(err).context("Failed to load Iceberg table")
            }
        }
    }
}

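/// Walks the table's snapshots from newest to oldest and returns the frontier and sink
/// version recorded by this sink in the snapshot summary properties (`mz-frontier` and
/// `mz-sink-version`). Returns `None` if no snapshot carries these properties, and errors
/// if a snapshot without them has an operation other than `replace`, which indicates the
/// table was modified in a way the sink cannot reconcile.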
fn retrieve_upper_from_snapshots(
    snapshots: &mut [Arc<Snapshot>],
) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));

    for snapshot in snapshots {
        let props = &snapshot.summary().additional_properties;
        if let (Some(frontier_json), Some(sink_version_str)) =
            (props.get("mz-frontier"), props.get("mz-sink-version"))
        {
            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
                .context("Failed to deserialize frontier from snapshot properties")?;
            let frontier = Antichain::from_iter(frontier);

            let sink_version = sink_version_str
                .parse::<u64>()
                .context("Failed to parse mz-sink-version from snapshot properties")?;

            return Ok(Some((frontier, sink_version)));
        }
        if snapshot.summary().operation.as_str() != "replace" {
            anyhow::bail!(
                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
                snapshot.snapshot_id(),
                snapshot.summary().operation.as_str(),
            );
        }
    }

    Ok(None)
}

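/// Converts a `RelationDesc` into an Arrow schema annotated with Parquet field IDs and the
/// equivalent Iceberg schema.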
fn relation_desc_to_iceberg_schema(
    desc: &mz_repr::RelationDesc,
) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
        .context("Failed to convert RelationDesc to Arrow schema")?;

    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);

    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
        .context("Failed to convert Arrow schema to Iceberg schema")?;

    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
}

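/// Extends the given schema with a trailing non-nullable `__op` column (`Int32`);
/// `row_to_recordbatch` fills it with 1 for inserts and -1 for retractions before handing
/// batches to the delta writer.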
fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
    let mut fields: Vec<Field> = schema.fields().iter().map(|f| f.as_ref().clone()).collect();
    fields.push(Field::new("__op", DataType::Int32, false));
    ArrowSchema::new(fields)
}

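/// Converts a `DiffPair` into a `RecordBatch` carrying the extra `__op` column: the
/// `before` row (if any) is emitted with op -1 and the `after` row (if any) with op 1.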
fn row_to_recordbatch(
    row: DiffPair<Row>,
    schema: ArrowSchemaRef,
    schema_with_op: ArrowSchemaRef,
) -> anyhow::Result<RecordBatch> {
    let mut builder = ArrowBuilder::new_with_schema(
        Arc::clone(&schema),
        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
    )
    .context("Failed to create builder")?;

    let mut op_values = Vec::new();

    if let Some(before) = row.before {
        builder
            .add_row(&before)
            .context("Failed to add delete row to builder")?;
        // A `before` row is a retraction, encoded as op = -1.
        op_values.push(-1i32);
    }
    if let Some(after) = row.after {
        builder
            .add_row(&after)
            .context("Failed to add insert row to builder")?;
        // An `after` row is an insertion, encoded as op = 1.
        op_values.push(1i32);
    }

    let batch = builder
        .to_record_batch()
        .context("Failed to create record batch")?;

    let mut columns = Vec::with_capacity(batch.columns().len() + 1);
    columns.extend_from_slice(batch.columns());
    let op_column = Arc::new(Int32Array::from(op_values));
    columns.push(op_column);

    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
        .context("Failed to create batch with op column")?;
    Ok(batch_with_op)
}

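/// Operator that mints the `(lower, upper)` batch descriptions that drive the rest of the
/// sink. On the active worker it loads (or creates) the Iceberg table, determines the
/// resume upper from the table's snapshot properties, and emits an initial set of
/// `INITIAL_DESCRIPTIONS_TO_MINT` descriptions spaced one commit interval apart, minting a
/// new description whenever the oldest outstanding one closes. All workers pass the data
/// input through unchanged on the first output.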
fn mint_batch_descriptions<G, D>(
    name: String,
    sink_id: GlobalId,
    input: &VecCollection<G, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<G, D, Diff>,
    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input =
        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [data_capset, capset]: &mut [_; 2] = caps.try_into().unwrap();
            *data_capset = CapabilitySet::new();

            // Only the active worker mints batch descriptions; every other worker simply
            // passes its input data through.
            if !is_active_worker {
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;

            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };

            let overcompacted = *resume_upper != [Timestamp::minimum()]
                && PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                return Err(anyhow::anyhow!("{err}"));
            };

            if resume_version > sink_version {
                anyhow::bail!(
                    "Fenced off by newer sink version: resume_version {}, sink_version {}",
                    resume_version,
                    sink_version
                );
            }

            let mut initialized = false;
            let mut observed_frontier;
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    // Wait until the input has advanced past both the resume upper and the
                    // as_of before minting the initial set of descriptions.
                    if PartialOrder::less_equal(&observed_frontier, &resume_upper)
                        || PartialOrder::less_equal(&observed_frontier, &as_of)
                    {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts = current_upper.as_option().unwrap().clone();

                    if PartialOrder::less_than(&resume_upper, &current_upper) {
                        let batch_description = (resume_upper.clone(), current_upper.clone());
                        batch_descriptions.push(batch_description);
                    }

                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT + 1 {
                        let duration_millis = commit_interval
                            .as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(
                            u64::try_from(duration_millis)
                                .expect("commit interval too large for u64"),
                        );
                        let desired_batch_upper =
                            Antichain::from_elem(current_upper_ts.step_forward_by(&duration_ts));

                        let batch_description = (current_upper.clone(), desired_batch_upper);
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    // Whenever the oldest outstanding batch closes, mint a new description
                    // one commit interval past the newest outstanding upper.
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(
                            commit_interval
                                .as_millis()
                                .try_into()
                                .expect("commit interval too large for u64"),
                        );
                        let new_upper = Antichain::from_elem(
                            newest_upper
                                .as_option()
                                .unwrap()
                                .step_forward_by(&duration_ts),
                        );

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        statuses,
        button.press_on_drop(),
    )
}

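/// A `DataFile` bundled with the schema it was written against, serialized by
/// round-tripping through `AvroDataFile`.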
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}

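/// Wire format for `SerializableDataFile`: the data file encoded via
/// `write_data_files_to_avro` and the schema as JSON bytes.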
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    pub data_file: Vec<u8>,
    pub schema: Vec<u8>,
}

impl From<SerializableDataFile> for AvroDataFile {
    fn from(value: SerializableDataFile) -> Self {
        let mut data_file = Vec::new();
        write_data_files_to_avro(
            &mut data_file,
            [value.data_file],
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .expect("serialization into buffer");
        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
        AvroDataFile { data_file, schema }
    }
}

impl TryFrom<AvroDataFile> for SerializableDataFile {
    type Error = String;

    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
        let schema: Schema = serde_json::from_slice(&value.schema)
            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
        let data_files = read_data_files_from_avro(
            &mut &*value.data_file,
            &schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .map_err_to_string_with_causes()?;
        let Some(data_file) = data_files.into_iter().next() else {
            return Err("No DataFile found in Avro data".into());
        };
        Ok(SerializableDataFile { data_file, schema })
    }
}

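/// A written data (or delete) file tagged with the batch description it belongs to, so the
/// commit operator can group files by batch.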
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}

impl BoundedDataFile {
    pub fn new(
        file: DataFile,
        schema: Schema,
        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
    ) -> Self {
        Self {
            data_file: SerializableDataFile {
                data_file: file,
                schema,
            },
            batch_desc,
        }
    }

    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
        &self.batch_desc
    }

    pub fn data_file(&self) -> &DataFile {
        &self.data_file.data_file
    }

    pub fn into_data_file(self) -> DataFile {
        self.data_file.data_file
    }
}

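/// The set of files written for a single batch description.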
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}

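/// Operator that writes incoming rows into Iceberg data and delete files. Every worker
/// receives the broadcast batch descriptions, opens a `DeltaWriter` per in-flight batch,
/// stashes rows whose batch description has not arrived yet, and closes writers (emitting
/// `BoundedDataFile`s downstream) once both the description and data frontiers have passed
/// a batch's bounds.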
fn write_data_files<G>(
    name: String,
    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
    metrics: IcebergSinkMetrics,
    statistics: SinkStatistics,
) -> (
    Stream<G, BoundedDataFile>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut batch_desc_input =
        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(|caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            let table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));

            // Some catalogs report the metadata file path as the table location; strip the
            // `/metadata/*.metadata.json` suffix so data files land under `<table>/data`.
            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
                return Err(anyhow::anyhow!(
                    "Iceberg sink requires key columns for equality deletes"
                ));
            };

            // Iceberg field IDs are 1-based, while the key indices are 0-based.
            let equality_ids: Vec<i32> = equality_indices
                .iter()
                .map(|u| i32::try_from(*u).map(|v| v + 1))
                .collect::<Result<Vec<i32>, _>>()
                .context("Failed to convert equality index to i32 (index too large)")?;

            let writer_properties = WriterProperties::new();

            let create_delta_writer = || async {
                let data_parquet_writer = ParquetWriterBuilder::new(
                    writer_properties.clone(),
                    Arc::clone(&current_schema),
                );
                let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    data_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

                let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
                let pos_schema =
                    Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
                        "Failed to convert position delete Arrow schema to Iceberg schema",
                    )?);
                let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
                let pos_parquet_writer =
                    ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
                let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    pos_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let pos_delete_writer_builder =
                    PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

                let eq_config = EqualityDeleteWriterConfig::new(
                    equality_ids.clone(),
                    Arc::clone(&current_schema),
                )
                .context("Failed to create EqualityDeleteWriterConfig")?;

                let eq_schema = Arc::new(
                    arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
                        "Failed to convert equality delete Arrow schema to Iceberg schema",
                    )?,
                );

                let eq_parquet_writer =
                    ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
                let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                    eq_parquet_writer,
                    Arc::clone(&current_schema),
                    file_io.clone(),
                    location_generator.clone(),
                    file_name_generator.clone(),
                );
                let eq_delete_writer_builder =
                    EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);

                DeltaWriterBuilder::new(
                    data_writer_builder,
                    pos_delete_writer_builder,
                    eq_delete_writer_builder,
                    equality_ids.clone(),
                )
                .build(None)
                .await
                .context("Failed to create DeltaWriter")
            };

            // Rows that arrived before their batch description, keyed by timestamp.
            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                DeltaWriterType,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;
                                let mut delta_writer = create_delta_writer().await?;
                                // Flush any stashed rows that fall into this new batch.
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            for (_row, diff_pair) in rows {
                                                metrics.stashed_rows.dec();
                                                let record_batch = row_to_recordbatch(
                                                    diff_pair.clone(),
                                                    Arc::clone(&arrow_schema),
                                                    Arc::clone(&schema_with_op),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                delta_writer.write(record_batch).await.context(
                                                    "Failed to write row to DeltaWriter",
                                                )?;
                                            }
                                        }
                                    }
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                // Write the row into the in-flight batch whose bounds contain it.
                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = row_to_recordbatch(
                                            diff_pair.clone(),
                                            Arc::clone(&arrow_schema),
                                            Arc::clone(&schema_with_op),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        delta_writer
                                            .write(record_batch)
                                            .await
                                            .context("Failed to write row to DeltaWriter")?;
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    // No matching batch yet; stash until its description arrives.
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    metrics.stashed_rows.inc();
                                    entry.push((row, diff_pair));
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                if PartialOrder::less_equal(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    // A batch is ready once both frontiers have passed its bounds.
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_than(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut delta_writer) in ready_batches {
                            let data_files = delta_writer
                                .close()
                                .await
                                .context("Failed to close DeltaWriter")?;
                            for data_file in data_files {
                                match data_file.content_type() {
                                    iceberg::spec::DataContentType::Data => {
                                        metrics.data_files_written.inc();
                                    }
                                    iceberg::spec::DataContentType::PositionDeletes
                                    | iceberg::spec::DataContentType::EqualityDeletes => {
                                        metrics.delete_files_written.inc();
                                    }
                                }
                                statistics.inc_messages_staged_by(data_file.record_count());
                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            max_upper = max_upper.join(&desc.1);
                        }

                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}

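/// Operator that commits finished batches to Iceberg on a single worker. Data and delete
/// files are grouped by batch description, appended in a `row_delta` transaction whose
/// snapshot properties record the sink id, frontier, and sink version, retried on catalog
/// commit conflicts, and finally mirrored into the sink's persist progress shard via an
/// empty `compare_and_append`.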
fn commit_to_iceberg<G>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: &Stream<G, BoundedDataFile>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    metrics: IcebergSinkMetrics,
    statistics: SinkStatistics,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            // Only the active worker commits; other workers report an empty frontier.
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            let mut table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let prev = batch_descriptions
                                    .insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                // A batch is complete once the data-file frontier has passed its lower bound.
                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| PartialOrder::less_than(lower, &input_frontier))
                    .cloned()
                    .collect();

                // Commit batches in frontier order.
                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    let mut total_messages: u64 = 0;
                    let mut total_bytes: u64 = 0;
                    for file in file_set.data_files {
                        total_messages += file.data_file().record_count();
                        total_bytes += file.data_file().file_size_in_bytes();
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    let frontier = batch.1.clone();
                    let tx = Transaction::new(&table);

                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;

                    // Record the sink's progress in the snapshot properties so a restarted
                    // sink can resume from the last committed frontier.
                    let mut action = tx.row_delta().set_snapshot_properties(
                        vec![
                            ("mz-sink-id".to_string(), sink_id.to_string()),
                            ("mz-frontier".to_string(), frontier_json),
                            ("mz-sink-version".to_string(), sink_version.to_string()),
                        ]
                        .into_iter()
                        .collect(),
                    );

                    if !data_files.is_empty() || !delete_files.is_empty() {
                        action = action
                            .add_data_files(data_files)
                            .add_delete_files(delete_files);
                    }

                    let tx = action.apply(tx).context(
                        "Failed to apply data file addition to iceberg table transaction",
                    )?;

                    table = Retry::default().max_tries(5).retry_async(|_| async {
                        let new_table = tx.clone().commit(catalog.as_ref()).await;
                        match new_table {
                            Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
                                metrics.commit_conflicts.inc();
                                let table = reload_table(
                                    catalog.as_ref(),
                                    connection.namespace.clone(),
                                    connection.table.clone(),
                                    table.clone(),
                                ).await;
                                let table = match table {
                                    Ok(table) => table,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                let mut snapshots: Vec<_> =
                                    table.metadata().snapshots().cloned().collect();
                                let last = retrieve_upper_from_snapshots(&mut snapshots);
                                let last = match last {
                                    Ok(val) => val,
                                    Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                };

                                // If another writer has already advanced past our frontier,
                                // we have been fenced off and must not retry.
                                if let Some((last_frontier, _last_version)) = last {
                                    if PartialOrder::less_equal(&frontier, &last_frontier) {
                                        return RetryResult::FatalErr(anyhow!(
                                            "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
                                            connection.table,
                                            frontier,
                                            last_frontier,
                                        ));
                                    }
                                }

                                RetryResult::Ok(table)
                            }
                            Err(e) => {
                                metrics.commit_failures.inc();
                                RetryResult::RetryableErr(anyhow!(e))
                            },
                            Ok(table) => RetryResult::Ok(table)
                        }
                    }).await.context("failed to commit to iceberg")?;

                    metrics.snapshots_committed.inc();
                    statistics.inc_messages_committed_by(total_messages);
                    statistics.inc_bytes_committed_by(total_bytes);

                    // Advance the progress shard to the committed frontier with an empty append.
                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}

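// Renders the Iceberg sink as three operators, wired together by the minted batch
// descriptions and the written data files: `mint_batch_descriptions` -> `write_data_files`
// -> `commit_to_iceberg`.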
impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (arrow_schema_with_ids, iceberg_schema) =
            match relation_desc_to_iceberg_schema(&sink.from_desc) {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(&mut scope);
                    return (error_stream, vec![]);
                }
            };

        let metrics = storage_state
            .metrics
            .get_iceberg_sink_metrics(sink_id, scope.index());

        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
            format!("{sink_id}-iceberg-mint"),
            sink_id,
            &input,
            sink,
            connection_for_minter,
            storage_state.storage_configuration.clone(),
            Arc::clone(&iceberg_schema),
        );

        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = write_data_files(
            format!("{sink_id}-write-data-files"),
            minted_input,
            &batch_descriptions,
            connection_for_writer,
            storage_state.storage_configuration.clone(),
            Arc::new(arrow_schema_with_ids.clone()),
            metrics.clone(),
            statistics.clone(),
        );

        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            &datafiles,
            &batch_descriptions,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
            metrics.clone(),
            statistics,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(&mut scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (statuses, vec![mint_button, write_button, commit_button])
    }
}