use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::convert::Infallible;
use std::time::Instant;
use std::{cell::RefCell, rc::Rc, sync::Arc};

use anyhow::{Context, anyhow};
use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use iceberg::ErrorKind;
use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
use iceberg::spec::{
    DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
    write_data_files_to_avro,
};
use iceberg::spec::{Schema, SchemaRef};
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::base_writer::equality_delete_writer::{
    EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
};
use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
use iceberg::writer::base_writer::position_delete_writer::{
    PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
};
use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
    DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
use itertools::Itertools;
use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
use mz_interchange::avro::DiffPair;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::result::ResultExt;
use mz_ore::retry::{Retry, RetryResult};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};
use tracing::debug;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::iceberg::IcebergSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;

/// Initial item capacity used when allocating the Arrow array builders.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
/// Initial data (byte) capacity used when allocating the Arrow array builders.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

/// Prefix used for the Parquet files written by this sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
/// Number of batch descriptions minted up front when the sink initializes.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;

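/// The concrete `DeltaWriter` type used by this sink: a Parquet-backed data
/// file writer combined with position-delete and equality-delete writers, all
/// using the default location and file name generators.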
type DeltaWriterType = DeltaWriter<
    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
    PositionDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
    EqualityDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
>;

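/// Assigns sequential Parquet field IDs (starting at 1) to every field of the
/// given Arrow schema, recursing into nested types, so the schema can later be
/// converted into an Iceberg schema (which requires field IDs).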
fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
    let mut next_field_id = 1i32;
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .map(|field| add_field_ids_recursive(field, &mut next_field_id))
        .collect();
    ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
}

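/// Assigns the next available field ID to `field`, records it in the field's
/// metadata under `PARQUET_FIELD_ID_META_KEY`, and recurses into the field's
/// data type to number any nested fields.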
fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
    let current_id = *next_id;
    *next_id += 1;

    let mut metadata = field.metadata().clone();
    metadata.insert(
        PARQUET_FIELD_ID_META_KEY.to_string(),
        current_id.to_string(),
    );

    let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);

    Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
}

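/// Recurses into nested Arrow data types (structs, lists, large lists, and
/// maps) so that their inner fields also receive field IDs; all other data
/// types are returned unchanged.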
fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
    match data_type {
        DataType::Struct(fields) => {
            let new_fields: Vec<Field> = fields
                .iter()
                .map(|f| add_field_ids_recursive(f, next_id))
                .collect();
            DataType::Struct(new_fields.into())
        }
        DataType::List(element_field) => {
            let new_element = add_field_ids_recursive(element_field, next_id);
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(element_field) => {
            let new_element = add_field_ids_recursive(element_field, next_id);
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(entries_field, sorted) => {
            let new_entries = add_field_ids_recursive(entries_field, next_id);
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        _ => data_type.clone(),
    }
}

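/// Converts the Iceberg table's schema back to Arrow and copies the
/// `ARROW_EXTENSION_NAME_KEY` metadata from the Materialize-derived Arrow
/// schema onto the matching fields. Fails if a field of the Iceberg schema is
/// missing from the Materialize schema or if the two disagree on a field's
/// structure.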
fn merge_materialize_metadata_into_iceberg_schema(
    materialize_arrow_schema: &ArrowSchema,
    iceberg_schema: &Schema,
) -> anyhow::Result<ArrowSchema> {
    let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
        .context("Failed to convert Iceberg schema to Arrow schema")?;

    let fields: Vec<Field> = iceberg_arrow_schema
        .fields()
        .iter()
        .map(|iceberg_field| {
            let mz_field = materialize_arrow_schema
                .field_with_name(iceberg_field.name())
                .with_context(|| {
                    format!(
                        "Field '{}' not found in Materialize schema",
                        iceberg_field.name()
                    )
                })?;

            merge_field_metadata_recursive(iceberg_field, Some(mz_field))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
}

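/// Copies the `ARROW_EXTENSION_NAME_KEY` metadata from a Materialize field
/// onto the corresponding Iceberg-derived field, recursing through structs,
/// lists, large lists, and maps. Errors on structurally incompatible types.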
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    let mut metadata = iceberg_field.metadata().clone();

    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        other => other.clone(),
    };

    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}

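/// Reloads the Iceberg table from the catalog and verifies that neither its
/// current schema ID nor its default partition spec ID changed since
/// `current_table` was loaded, since schema and partition spec evolution are
/// not supported by this sink.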
async fn reload_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    current_table: Table,
) -> anyhow::Result<Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident, table_name.clone());
    let current_schema = current_table.metadata().current_schema_id();
    let current_partition_spec = current_table.metadata().default_partition_spec_id();

    match catalog.load_table(&table_ident).await {
        Ok(table) => {
            let reloaded_schema = table.metadata().current_schema_id();
            let reloaded_partition_spec = table.metadata().default_partition_spec_id();
            if reloaded_schema != current_schema {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
                    table_name,
                    current_schema,
                    reloaded_schema
                ));
            }

            if reloaded_partition_spec != current_partition_spec {
                return Err(anyhow::anyhow!(
                    "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
                    table_name,
                    current_partition_spec,
                    reloaded_partition_spec
                ));
            }

            Ok(table)
        }
        Err(err) => Err(err).context("Failed to reload Iceberg table"),
    }
}

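/// Loads the Iceberg table from the catalog, creating it with the provided
/// schema if it does not exist yet. Any other load error is returned to the
/// caller.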
async fn load_or_create_table(
    catalog: &dyn Catalog,
    namespace: String,
    table_name: String,
    schema: &Schema,
) -> anyhow::Result<iceberg::table::Table> {
    let namespace_ident = NamespaceIdent::new(namespace.clone());
    let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());

    match catalog.load_table(&table_ident).await {
        Ok(table) => Ok(table),
        Err(err) => {
            if matches!(err.kind(), ErrorKind::TableNotFound { .. })
                || err
                    .message()
                    .contains("Tried to load a table that does not exist")
            {
                let table_creation = TableCreation::builder()
                    .name(table_name.clone())
                    .schema(schema.clone())
                    .build();

                catalog
                    .create_table(&namespace_ident, table_creation)
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to create Iceberg table '{}' in namespace '{}'",
                            table_name, namespace
                        )
                    })
            } else {
                Err(err).context("Failed to load Iceberg table")
            }
        }
    }
}

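/// Sorts the snapshots from newest to oldest and returns the frontier and sink
/// version recorded in the `mz-frontier` and `mz-sink-version` snapshot
/// properties of the most recent snapshot that carries them. Returns `None` if
/// no snapshot does, and errors if a snapshot without these properties has an
/// operation other than "replace", which indicates a table state this sink
/// does not support.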
fn retrieve_upper_from_snapshots(
    snapshots: &mut [Arc<Snapshot>],
) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
    snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));

    for snapshot in snapshots {
        let props = &snapshot.summary().additional_properties;
        if let (Some(frontier_json), Some(sink_version_str)) =
            (props.get("mz-frontier"), props.get("mz-sink-version"))
        {
            let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
                .context("Failed to deserialize frontier from snapshot properties")?;
            let frontier = Antichain::from_iter(frontier);

            let sink_version = sink_version_str
                .parse::<u64>()
                .context("Failed to parse mz-sink-version from snapshot properties")?;

            return Ok(Some((frontier, sink_version)));
        }
        if snapshot.summary().operation.as_str() != "replace" {
            anyhow::bail!(
                "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
                snapshot.snapshot_id(),
                snapshot.summary().operation.as_str(),
            );
        }
    }

    Ok(None)
}

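/// Converts a Materialize `RelationDesc` into an Arrow schema annotated with
/// Parquet field IDs, along with the equivalent Iceberg schema.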
fn relation_desc_to_iceberg_schema(
    desc: &mz_repr::RelationDesc,
) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
    let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
        .context("Failed to convert RelationDesc to Arrow schema")?;

    let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);

    let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
        .context("Failed to convert Arrow schema to Iceberg schema")?;

    Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
}

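/// Extends the given Arrow schema with a non-nullable Int32 `__op` column used
/// to tag each row as an insert or a delete.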
fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
    let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
    fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
    ArrowSchema::new(fields)
}

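/// Converts a `DiffPair` of rows into a single `RecordBatch`: the `before` row
/// (if any) is emitted with `__op = -1` (delete) and the `after` row (if any)
/// with `__op = 1` (insert).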
fn row_to_recordbatch(
    row: DiffPair<Row>,
    schema: ArrowSchemaRef,
    schema_with_op: ArrowSchemaRef,
) -> anyhow::Result<RecordBatch> {
    let mut builder = ArrowBuilder::new_with_schema(
        Arc::clone(&schema),
        DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
        DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
    )
    .context("Failed to create builder")?;

    let mut op_values = Vec::new();

    if let Some(before) = row.before {
        builder
            .add_row(&before)
            .context("Failed to add delete row to builder")?;
        op_values.push(-1i32);
    }
    if let Some(after) = row.after {
        builder
            .add_row(&after)
            .context("Failed to add insert row to builder")?;
        op_values.push(1i32);
    }

    let batch = builder
        .to_record_batch()
        .context("Failed to create record batch")?;

    let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
    let op_column = Arc::new(Int32Array::from(op_values));
    columns.push(op_column);

    let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
        .context("Failed to create batch with op column")?;
    Ok(batch_with_op)
}

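/// Renders the "mint" operator of the Iceberg sink. A single active worker
/// loads (or creates) the target table, recovers the resume frontier and sink
/// version from previously committed snapshots, and then mints batch
/// descriptions `(lower, upper)`: an initial catch-up batch followed by
/// batches sized to the commit interval. The data input is forwarded unchanged
/// on one output, batch descriptions are emitted on another, and a third,
/// empty output signals to downstream operators that the table exists.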
fn mint_batch_descriptions<G, D>(
    name: String,
    sink_id: GlobalId,
    input: &VecCollection<G, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<G, D, Diff>,
    Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input =
        builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
            *data_capset = CapabilitySet::new();

            if !is_active_worker {
                *capset = CapabilitySet::new();
                *data_capset = CapabilitySet::new();
                *table_ready_capset = CapabilitySet::new();
                while let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            output.give_container(&output_cap, &mut data);
                        }
                        Event::Progress(_) => {}
                    }
                }
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let table = load_or_create_table(
                catalog.as_ref(),
                connection.namespace.clone(),
                connection.table.clone(),
                initial_schema.as_ref(),
            )
            .await?;
            debug!(
                ?sink_id,
                %name_for_logging,
                namespace = %connection.namespace,
                table = %connection.table,
                "iceberg mint loaded/created table"
            );

            *table_ready_capset = CapabilitySet::new();

            let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
            let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
            let (resume_upper, resume_version) = match resume {
                Some((f, v)) => (f, v),
                None => (Antichain::from_elem(Timestamp::minimum()), 0),
            };
            debug!(
                ?sink_id,
                %name_for_logging,
                resume_upper = %resume_upper.pretty(),
                resume_version,
                as_of = %as_of.pretty(),
                "iceberg mint resume position loaded"
            );

            let overcompacted = *resume_upper != [Timestamp::minimum()]
                && PartialOrder::less_than(&resume_upper, &as_of);

            if overcompacted {
                let err = format!(
                    "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                return Err(anyhow::anyhow!("{err}"));
            }

            if resume_version > sink_version {
                anyhow::bail!(
                    "Fenced off by newer sink version: resume_version {}, sink_version {}",
                    resume_version,
                    sink_version
                );
            }

            let mut initialized = false;
            let mut observed_frontier;
            let mut max_seen_ts: Option<Timestamp> = None;
            let mut minted_batches = VecDeque::new();
            loop {
                if let Some(event) = input.next().await {
                    match event {
                        Event::Data([output_cap, _], mut data) => {
                            if !initialized {
                                for (_, ts, _) in data.iter() {
                                    match max_seen_ts.as_mut() {
                                        Some(max) => {
                                            if max.less_than(ts) {
                                                *max = ts.clone();
                                            }
                                        }
                                        None => {
                                            max_seen_ts = Some(ts.clone());
                                        }
                                    }
                                }
                            }
                            output.give_container(&output_cap, &mut data);
                            continue;
                        }
                        Event::Progress(frontier) => {
                            observed_frontier = frontier;
                        }
                    }
                } else {
                    return Ok(());
                }

                if !initialized {
                    if observed_frontier.is_empty() {
                        if let Some(max_ts) = max_seen_ts.as_ref() {
                            let synthesized_upper =
                                Antichain::from_elem(max_ts.step_forward());
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                max_seen_ts = %max_ts,
                                synthesized_upper = %synthesized_upper.pretty(),
                                "iceberg mint input closed before initialization; using max seen ts"
                            );
                            observed_frontier = synthesized_upper;
                        } else {
                            debug!(
                                ?sink_id,
                                %name_for_logging,
                                "iceberg mint input closed before initialization with no data"
                            );
                            return Ok(());
                        }
                    }

                    if PartialOrder::less_than(&observed_frontier, &resume_upper)
                        || PartialOrder::less_than(&observed_frontier, &as_of)
                    {
                        continue;
                    }

                    let mut batch_descriptions = vec![];
                    let mut current_upper = observed_frontier.clone();
                    let current_upper_ts =
                        current_upper.as_option().expect("frontier not empty").clone();

                    let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
                        as_of.clone()
                    } else {
                        resume_upper.clone()
                    };

                    if batch_lower == current_upper {
                        current_upper = Antichain::from_elem(current_upper_ts.step_forward());
                    }

                    let batch_description = (batch_lower.clone(), current_upper.clone());
                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        batch_lower = %batch_lower.pretty(),
                        current_upper = %current_upper.pretty(),
                        "iceberg mint initializing (catch-up batch)"
                    );
                    debug!(
                        "{}: creating catch-up batch [{}, {})",
                        name_for_logging,
                        batch_lower.pretty(),
                        current_upper.pretty()
                    );
                    batch_descriptions.push(batch_description);
                    for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
                        let duration_millis = commit_interval
                            .as_millis()
                            .checked_mul(u128::from(i))
                            .expect("commit interval multiplication overflow");
                        let duration_ts = Timestamp::new(
                            u64::try_from(duration_millis)
                                .expect("commit interval too large for u64"),
                        );
                        let desired_batch_upper = Antichain::from_elem(
                            current_upper_ts.step_forward_by(&duration_ts),
                        );

                        let batch_description =
                            (current_upper.clone(), desired_batch_upper.clone());
                        debug!(
                            "{}: minting future batch {}/{} [{}, {})",
                            name_for_logging,
                            i,
                            INITIAL_DESCRIPTIONS_TO_MINT,
                            current_upper.pretty(),
                            desired_batch_upper.pretty()
                        );
                        current_upper = batch_description.1.clone();
                        batch_descriptions.push(batch_description);
                    }

                    minted_batches.extend(batch_descriptions.clone());

                    for desc in batch_descriptions {
                        batch_desc_output.give(&capset[0], desc);
                    }

                    capset.downgrade(current_upper);

                    initialized = true;
                } else {
                    if observed_frontier.is_empty() {
                        return Ok(());
                    }
                    while let Some(oldest_desc) = minted_batches.front() {
                        let oldest_upper = &oldest_desc.1;
                        if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                            break;
                        }

                        let newest_upper = minted_batches.back().unwrap().1.clone();
                        let new_lower = newest_upper.clone();
                        let duration_ts = Timestamp::new(
                            commit_interval
                                .as_millis()
                                .try_into()
                                .expect("commit interval too large for u64"),
                        );
                        let new_upper = Antichain::from_elem(
                            newest_upper
                                .as_option()
                                .unwrap()
                                .step_forward_by(&duration_ts),
                        );

                        let new_batch_description = (new_lower.clone(), new_upper.clone());
                        minted_batches.pop_front();
                        minted_batches.push_back(new_batch_description.clone());

                        batch_desc_output.give(&capset[0], new_batch_description);

                        capset.downgrade(new_upper);
                    }
                }
            }
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        table_ready_stream,
        statuses,
        button.press_on_drop(),
    )
}

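/// A `DataFile` paired with the schema it was written against, made
/// serializable by round-tripping through [`AvroDataFile`].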
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    pub data_file: DataFile,
    pub schema: Schema,
}

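/// Wire representation of [`SerializableDataFile`]: the `DataFile` is encoded
/// with Iceberg's Avro helpers and the schema as JSON.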
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    pub data_file: Vec<u8>,
    pub schema: Vec<u8>,
}

impl From<SerializableDataFile> for AvroDataFile {
    fn from(value: SerializableDataFile) -> Self {
        let mut data_file = Vec::new();
        write_data_files_to_avro(
            &mut data_file,
            [value.data_file],
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .expect("serialization into buffer");
        let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
        AvroDataFile { data_file, schema }
    }
}

impl TryFrom<AvroDataFile> for SerializableDataFile {
    type Error = String;

    fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
        let schema: Schema = serde_json::from_slice(&value.schema)
            .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
        let data_files = read_data_files_from_avro(
            &mut &*value.data_file,
            &schema,
            0,
            &StructType::new(vec![]),
            FormatVersion::V2,
        )
        .map_err_to_string_with_causes()?;
        let Some(data_file) = data_files.into_iter().next() else {
            return Err("No DataFile found in Avro data".into());
        };
        Ok(SerializableDataFile { data_file, schema })
    }
}

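/// A data or delete file tagged with the batch description `(lower, upper)`
/// whose updates it contains, so the commit operator can group files by batch.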
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}

impl BoundedDataFile {
    pub fn new(
        file: DataFile,
        schema: Schema,
        batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
    ) -> Self {
        Self {
            data_file: SerializableDataFile {
                data_file: file,
                schema,
            },
            batch_desc,
        }
    }

    pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
        &self.batch_desc
    }

    pub fn data_file(&self) -> &DataFile {
        &self.data_file.data_file
    }

    pub fn into_data_file(self) -> DataFile {
        self.data_file.data_file
    }
}

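/// The set of files written for a single batch description.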
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}

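/// Renders the "write" operator of the Iceberg sink. Every worker receives the
/// broadcast batch descriptions, opens a `DeltaWriter` per in-flight batch,
/// and writes its share of the input rows into Parquet data and delete files.
/// Rows that arrive before a matching batch description are stashed and
/// drained once the description shows up; rows older than the first minted
/// batch are dropped as already committed. When a batch's interval has been
/// fully observed its writers are closed and the resulting files are emitted
/// downstream as [`BoundedDataFile`]s.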
fn write_data_files<G>(
    name: String,
    input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: &Stream<G, Infallible>,
    as_of: Antichain<Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    materialize_arrow_schema: Arc<ArrowSchema>,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (
    Stream<G, BoundedDataFile>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = input.scope();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
    let mut batch_desc_input =
        builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
    let mut input = builder.new_disconnected_input(&input.inner, Pipeline);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            while let Some(_) = table_ready_input.next().await {}
            let table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            let table_metadata = table.metadata().clone();
            let current_schema = Arc::clone(table_metadata.current_schema());

            let arrow_schema = Arc::new(
                merge_materialize_metadata_into_iceberg_schema(
                    materialize_arrow_schema.as_ref(),
                    current_schema.as_ref(),
                )
                .context("Failed to merge Materialize metadata into Iceberg schema")?,
            );

            let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));

            let location = table_metadata.location();
            let corrected_location = match location.rsplit_once("/metadata/") {
                Some((a, b)) if b.ends_with(".metadata.json") => a,
                _ => location,
            };

            let data_location = format!("{}/data", corrected_location);
            let location_generator = DefaultLocationGenerator::with_data_location(data_location);

            let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
            let file_name_generator = DefaultFileNameGenerator::new(
                PARQUET_FILE_PREFIX.to_string(),
                Some(unique_suffix),
                iceberg::spec::DataFileFormat::Parquet,
            );

            let file_io = table.file_io().clone();

            let Some((_, equality_indices)) = connection.key_desc_and_indices else {
                return Err(anyhow::anyhow!(
                    "Iceberg sink requires key columns for equality deletes"
                ));
            };

            let equality_ids: Vec<i32> = equality_indices
                .iter()
                .map(|u| i32::try_from(*u).map(|v| v + 1))
                .collect::<Result<Vec<i32>, _>>()
                .context("Failed to convert equality index to i32 (index too large)")?;

            let writer_properties = WriterProperties::new();

            let arrow_schema_for_closure = Arc::clone(&arrow_schema);
            let current_schema_for_closure = Arc::clone(&current_schema);
            let file_io_for_closure = file_io.clone();
            let location_generator_for_closure = location_generator.clone();
            let file_name_generator_for_closure = file_name_generator.clone();
            let equality_ids_for_closure = equality_ids.clone();
            let writer_properties_for_closure = writer_properties.clone();

            let create_delta_writer = move |disable_seen_rows: bool| {
                let arrow_schema = Arc::clone(&arrow_schema_for_closure);
                let current_schema = Arc::clone(&current_schema_for_closure);
                let file_io = file_io_for_closure.clone();
                let location_generator = location_generator_for_closure.clone();
                let file_name_generator = file_name_generator_for_closure.clone();
                let equality_ids = equality_ids_for_closure.clone();
                let writer_properties = writer_properties_for_closure.clone();

                async move {
                    let data_parquet_writer = ParquetWriterBuilder::new(
                        writer_properties.clone(),
                        Arc::clone(&current_schema),
                    )
                    .with_arrow_schema(Arc::clone(&arrow_schema))
                    .context("Arrow schema validation failed")?;
                    let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                        data_parquet_writer,
                        Arc::clone(&current_schema),
                        file_io.clone(),
                        location_generator.clone(),
                        file_name_generator.clone(),
                    );
                    let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

                    let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
                    let pos_schema = Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
                        "Failed to convert position delete Arrow schema to Iceberg schema",
                    )?);
                    let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
                    let pos_parquet_writer =
                        ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
                    let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                        pos_parquet_writer,
                        Arc::clone(&current_schema),
                        file_io.clone(),
                        location_generator.clone(),
                        file_name_generator.clone(),
                    );
                    let pos_delete_writer_builder =
                        PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

                    let eq_config = EqualityDeleteWriterConfig::new(
                        equality_ids.clone(),
                        Arc::clone(&current_schema),
                    )
                    .context("Failed to create EqualityDeleteWriterConfig")?;

                    let eq_schema = Arc::new(
                        arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
                            "Failed to convert equality delete Arrow schema to Iceberg schema",
                        )?,
                    );

                    let eq_parquet_writer =
                        ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
                    let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                        eq_parquet_writer,
                        Arc::clone(&current_schema),
                        file_io.clone(),
                        location_generator.clone(),
                        file_name_generator.clone(),
                    );
                    let eq_delete_writer_builder =
                        EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);

                    let mut builder = DeltaWriterBuilder::new(
                        data_writer_builder,
                        pos_delete_writer_builder,
                        eq_delete_writer_builder,
                        equality_ids.clone(),
                    );

                    if disable_seen_rows {
                        builder = builder.with_max_seen_rows(0);
                    }

                    builder
                        .build(None)
                        .await
                        .context("Failed to create DeltaWriter")
                }
            };

            let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
                BTreeMap::new();

            #[allow(clippy::disallowed_types)]
            let mut in_flight_batches: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                DeltaWriterType,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_batch_description_frontier =
                Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());

            let mut min_batch_lower: Option<Antichain<Timestamp>> = None;

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                let mut staged_messages_since_flush: u64 = 0;
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let (lower, upper) = &batch_desc;

                                if min_batch_lower.is_none() {
                                    min_batch_lower = Some(lower.clone());
                                    debug!(
                                        "{}: set min_batch_lower to {}",
                                        name_for_logging,
                                        lower.pretty()
                                    );

                                    let to_remove: Vec<_> = stashed_rows
                                        .keys()
                                        .filter(|ts| {
                                            let ts_antichain = Antichain::from_elem((*ts).clone());
                                            PartialOrder::less_than(&ts_antichain, lower)
                                        })
                                        .cloned()
                                        .collect();

                                    if !to_remove.is_empty() {
                                        let mut removed_count = 0;
                                        for ts in to_remove {
                                            if let Some(rows) = stashed_rows.remove(&ts) {
                                                removed_count += rows.len();
                                                for _ in &rows {
                                                    metrics.stashed_rows.dec();
                                                }
                                            }
                                        }
                                        debug!(
                                            "{}: pruned {} already-committed rows (< min_batch_lower)",
                                            name_for_logging,
                                            removed_count
                                        );
                                    }
                                }

                                let is_snapshot = lower == &as_of;
                                debug!(
                                    "{}: received batch description [{}, {}), snapshot={}",
                                    name_for_logging,
                                    lower.pretty(),
                                    upper.pretty(),
                                    is_snapshot
                                );
                                let mut delta_writer = create_delta_writer(is_snapshot).await?;
                                let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
                                let mut drained_count = 0;
                                for row_ts in row_ts_keys {
                                    let ts = Antichain::from_elem(row_ts.clone());
                                    if PartialOrder::less_equal(lower, &ts)
                                        && PartialOrder::less_than(&ts, upper)
                                    {
                                        if let Some(rows) = stashed_rows.remove(&row_ts) {
                                            drained_count += rows.len();
                                            for (_row, diff_pair) in rows {
                                                metrics.stashed_rows.dec();
                                                let record_batch = row_to_recordbatch(
                                                    diff_pair.clone(),
                                                    Arc::clone(&arrow_schema),
                                                    Arc::clone(&schema_with_op),
                                                )
                                                .context("failed to convert row to recordbatch")?;
                                                delta_writer.write(record_batch).await.context(
                                                    "Failed to write row to DeltaWriter",
                                                )?;
                                                staged_messages_since_flush += 1;
                                                if staged_messages_since_flush >= 10_000 {
                                                    statistics.inc_messages_staged_by(
                                                        staged_messages_since_flush,
                                                    );
                                                    staged_messages_since_flush = 0;
                                                }
                                            }
                                        }
                                    }
                                }
                                if drained_count > 0 {
                                    debug!(
                                        "{}: drained {} stashed rows into batch [{}, {})",
                                        name_for_logging,
                                        drained_count,
                                        lower.pretty(),
                                        upper.pretty()
                                    );
                                }
                                let prev =
                                    in_flight_batches.insert(batch_desc.clone(), delta_writer);
                                if prev.is_some() {
                                    anyhow::bail!(
                                        "Duplicate batch description received for description {:?}",
                                        batch_desc
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            let mut dropped_per_time = BTreeMap::new();
                            let mut stashed_per_time = BTreeMap::new();
                            for ((row, diff_pair), ts, _diff) in data {
                                let row_ts = ts.clone();
                                let ts_antichain = Antichain::from_elem(row_ts.clone());
                                let mut written = false;
                                for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
                                    let (lower, upper) = batch_desc;
                                    if PartialOrder::less_equal(lower, &ts_antichain)
                                        && PartialOrder::less_than(&ts_antichain, upper)
                                    {
                                        let record_batch = row_to_recordbatch(
                                            diff_pair.clone(),
                                            Arc::clone(&arrow_schema),
                                            Arc::clone(&schema_with_op),
                                        )
                                        .context("failed to convert row to recordbatch")?;
                                        delta_writer
                                            .write(record_batch)
                                            .await
                                            .context("Failed to write row to DeltaWriter")?;
                                        staged_messages_since_flush += 1;
                                        if staged_messages_since_flush >= 10_000 {
                                            statistics.inc_messages_staged_by(
                                                staged_messages_since_flush,
                                            );
                                            staged_messages_since_flush = 0;
                                        }
                                        written = true;
                                        break;
                                    }
                                }
                                if !written {
                                    if let Some(ref min_lower) = min_batch_lower {
                                        if PartialOrder::less_than(&ts_antichain, min_lower) {
                                            dropped_per_time
                                                .entry(ts_antichain.into_option().unwrap())
                                                .and_modify(|c| *c += 1)
                                                .or_insert(1);
                                            continue;
                                        }
                                    }

                                    stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
                                    let entry = stashed_rows.entry(row_ts).or_default();
                                    metrics.stashed_rows.inc();
                                    entry.push((row, diff_pair));
                                }
                            }

                            for (ts, count) in dropped_per_time {
                                debug!(
                                    "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
                                    name_for_logging, count, ts
                                );
                            }

                            for (ts, count) in stashed_per_time {
                                debug!(
                                    "{}: stashed {} rows at timestamp {} (waiting for batch description)",
                                    name_for_logging, count, ts
                                );
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }
                if staged_messages_since_flush > 0 {
                    statistics.inc_messages_staged_by(staged_messages_since_flush);
                }

                if PartialOrder::less_than(
                    &processed_batch_description_frontier,
                    &batch_description_frontier,
                ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
                {
                    let ready_batches: Vec<_> = in_flight_batches
                        .extract_if(|(lower, upper), _| {
                            PartialOrder::less_than(lower, &batch_description_frontier)
                                && PartialOrder::less_equal(upper, &input_frontier)
                        })
                        .collect();

                    if !ready_batches.is_empty() {
                        debug!(
                            "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
                            name_for_logging,
                            ready_batches.len(),
                            batch_description_frontier.pretty(),
                            input_frontier.pretty()
                        );
                        let mut max_upper = Antichain::from_elem(Timestamp::minimum());
                        for (desc, mut delta_writer) in ready_batches {
                            let close_started_at = Instant::now();
                            let data_files = delta_writer.close().await;
                            metrics
                                .writer_close_duration_seconds
                                .observe(close_started_at.elapsed().as_secs_f64());
                            let data_files = data_files.context("Failed to close DeltaWriter")?;
                            debug!(
                                "{}: closed batch [{}, {}), wrote {} files",
                                name_for_logging,
                                desc.0.pretty(),
                                desc.1.pretty(),
                                data_files.len()
                            );
                            for data_file in data_files {
                                match data_file.content_type() {
                                    iceberg::spec::DataContentType::Data => {
                                        metrics.data_files_written.inc();
                                    }
                                    iceberg::spec::DataContentType::PositionDeletes
                                    | iceberg::spec::DataContentType::EqualityDeletes => {
                                        metrics.delete_files_written.inc();
                                    }
                                }
                                statistics.inc_messages_staged_by(data_file.record_count());
                                statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
                                let file = BoundedDataFile::new(
                                    data_file,
                                    current_schema.as_ref().clone(),
                                    desc.clone(),
                                );
                                output.give(&capset[0], file);
                            }

                            max_upper = max_upper.join(&desc.1);
                        }

                        capset.downgrade(max_upper);
                    }
                    processed_batch_description_frontier.clone_from(&batch_description_frontier);
                    processed_input_frontier.clone_from(&input_frontier);
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (output_stream, statuses, button.press_on_drop())
}

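/// Renders the "commit" operator of the Iceberg sink. A single active worker
/// groups incoming files by batch description and, once a batch is complete,
/// appends them to the table in one Iceberg transaction whose snapshot
/// properties record the sink id, the batch's upper frontier, and the sink
/// version. Commit conflicts are retried after reloading the table, and the
/// operator fails if another writer has already advanced the committed
/// frontier past this batch. After each successful commit the persist shard's
/// upper and the shared write frontier are advanced to the batch's upper.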
fn commit_to_iceberg<G>(
    name: String,
    sink_id: GlobalId,
    sink_version: u64,
    batch_input: &Stream<G, BoundedDataFile>,
    batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    table_ready_stream: &Stream<G, Infallible>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    metrics: Arc<IcebergSinkMetrics>,
    statistics: SinkStatistics,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
where
    G: Scope<Timestamp = Timestamp>,
{
    let scope = batch_input.scope();
    let mut builder = OperatorBuilder::new(name, scope.clone());

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let name_for_logging = format!("{sink_id}-commit-to-iceberg");

    let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
    let mut batch_desc_input =
        builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
    let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);

    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            let catalog = connection
                .catalog_connection
                .connect(&storage_configuration, InTask::Yes)
                .await
                .context("Failed to connect to iceberg catalog")?;

            let mut write_handle = write_handle.await?;

            let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
            let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
            while let Some(_) = table_ready_input.next().await {}
            let mut table = catalog
                .load_table(&table_ident)
                .await
                .context("Failed to load Iceberg table")?;

            #[allow(clippy::disallowed_types)]
            let mut batch_descriptions: std::collections::HashMap<
                (Antichain<Timestamp>, Antichain<Timestamp>),
                BoundedDataFileSet,
            > = std::collections::HashMap::new();

            let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
            let mut input_frontier = Antichain::from_elem(Timestamp::minimum());

            while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
                tokio::select! {
                    _ = batch_desc_input.ready() => {},
                    _ = input.ready() => {}
                }

                while let Some(event) = batch_desc_input.next_sync() {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch_desc in data {
                                let prev = batch_descriptions.insert(
                                    batch_desc,
                                    BoundedDataFileSet { data_files: vec![] },
                                );
                                if let Some(prev) = prev {
                                    anyhow::bail!(
                                        "Duplicate batch description received \
                                         in commit operator: {:?}",
                                        prev
                                    );
                                }
                            }
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }

                let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
                for event in ready_events {
                    match event {
                        Event::Data(_cap, data) => {
                            for bounded_data_file in data {
                                let entry = batch_descriptions
                                    .entry(bounded_data_file.batch_desc().clone())
                                    .or_default();
                                entry.data_files.push(bounded_data_file);
                            }
                        }
                        Event::Progress(frontier) => {
                            input_frontier = frontier;
                        }
                    }
                }

                let mut done_batches: Vec<_> = batch_descriptions
                    .keys()
                    .filter(|(lower, _upper)| {
                        PartialOrder::less_than(lower, &input_frontier)
                    })
                    .cloned()
                    .collect();

                done_batches.sort_by(|a, b| {
                    if PartialOrder::less_than(a, b) {
                        Ordering::Less
                    } else if PartialOrder::less_than(b, a) {
                        Ordering::Greater
                    } else {
                        Ordering::Equal
                    }
                });

                for batch in done_batches {
                    let file_set = batch_descriptions.remove(&batch).unwrap();

                    let mut data_files = vec![];
                    let mut delete_files = vec![];
                    let mut total_messages: u64 = 0;
                    let mut total_bytes: u64 = 0;
                    for file in file_set.data_files {
                        total_messages += file.data_file().record_count();
                        total_bytes += file.data_file().file_size_in_bytes();
                        match file.data_file().content_type() {
                            iceberg::spec::DataContentType::Data => {
                                data_files.push(file.into_data_file());
                            }
                            iceberg::spec::DataContentType::PositionDeletes
                            | iceberg::spec::DataContentType::EqualityDeletes => {
                                delete_files.push(file.into_data_file());
                            }
                        }
                    }

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        data_files = data_files.len(),
                        delete_files = delete_files.len(),
                        total_messages,
                        total_bytes,
                        "iceberg commit applying batch"
                    );

                    let instant = Instant::now();

                    let frontier = batch.1.clone();
                    let tx = Transaction::new(&table);

                    let frontier_json = serde_json::to_string(&frontier.elements())
                        .context("Failed to serialize frontier to JSON")?;

                    let mut action = tx
                        .row_delta()
                        .set_snapshot_properties(
                            vec![
                                ("mz-sink-id".to_string(), sink_id.to_string()),
                                ("mz-frontier".to_string(), frontier_json),
                                ("mz-sink-version".to_string(), sink_version.to_string()),
                            ]
                            .into_iter()
                            .collect(),
                        )
                        .with_check_duplicate(false);

                    if !data_files.is_empty() || !delete_files.is_empty() {
                        action = action.add_data_files(data_files).add_delete_files(delete_files);
                    }

                    let tx = action.apply(tx).context(
                        "Failed to apply data file addition to iceberg table transaction",
                    )?;

                    let commit_result = Retry::default()
                        .max_tries(5)
                        .retry_async(|_| async {
                            let new_table = tx.clone().commit(catalog.as_ref()).await;
                            match new_table {
                                Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
                                    metrics.commit_conflicts.inc();
                                    let table = reload_table(
                                        catalog.as_ref(),
                                        connection.namespace.clone(),
                                        connection.table.clone(),
                                        table.clone(),
                                    )
                                    .await;
                                    let table = match table {
                                        Ok(table) => table,
                                        Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                    };

                                    let mut snapshots: Vec<_> =
                                        table.metadata().snapshots().cloned().collect();
                                    let last = retrieve_upper_from_snapshots(&mut snapshots);
                                    let last = match last {
                                        Ok(val) => val,
                                        Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
                                    };

                                    if let Some((last_frontier, _last_version)) = last {
                                        if PartialOrder::less_equal(&frontier, &last_frontier) {
                                            return RetryResult::FatalErr(anyhow!(
                                                "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
                                                connection.table,
                                                frontier,
                                                last_frontier,
                                            ));
                                        }
                                    }

                                    RetryResult::Ok(table)
                                }
                                Err(e) => {
                                    metrics.commit_failures.inc();
                                    RetryResult::RetryableErr(anyhow!(e))
                                }
                                Ok(table) => RetryResult::Ok(table),
                            }
                        })
                        .await
                        .context("failed to commit to iceberg");
                    let duration = instant.elapsed();
                    metrics
                        .commit_duration_seconds
                        .observe(duration.as_secs_f64());
                    table = commit_result?;

                    debug!(
                        ?sink_id,
                        %name_for_logging,
                        lower = %batch.0.pretty(),
                        upper = %batch.1.pretty(),
                        total_messages,
                        total_bytes,
                        ?duration,
                        "iceberg commit applied batch"
                    );

                    metrics.snapshots_committed.inc();
                    statistics.inc_messages_committed_by(total_messages);
                    statistics.inc_bytes_committed_by(total_bytes);

                    let mut expect_upper = write_handle.shared_upper();
                    loop {
                        if PartialOrder::less_equal(&frontier, &expect_upper) {
                            break;
                        }

                        const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                        match write_handle
                            .compare_and_append(EMPTY, expect_upper, frontier.clone())
                            .await
                            .expect("valid usage")
                        {
                            Ok(()) => break,
                            Err(mismatch) => {
                                expect_upper = mismatch.current;
                            }
                        }
                    }
                    write_frontier.borrow_mut().clone_from(&frontier);
                }
            }

            Ok(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });

    (statuses, button.press_on_drop())
}

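/// Renders the complete Iceberg sink dataflow by chaining the mint, write, and
/// commit operators and merging their health status streams.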
impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (arrow_schema_with_ids, iceberg_schema) =
            match relation_desc_to_iceberg_schema(&sink.from_desc) {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(&mut scope);
                    return (error_stream, vec![]);
                }
            };

        let metrics = Arc::new(
            storage_state
                .metrics
                .get_iceberg_sink_metrics(sink_id, scope.index()),
        );

        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
            mint_batch_descriptions(
                format!("{sink_id}-iceberg-mint"),
                sink_id,
                &input,
                sink,
                connection_for_minter,
                storage_state.storage_configuration.clone(),
                Arc::clone(&iceberg_schema),
            );

        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = write_data_files(
            format!("{sink_id}-write-data-files"),
            minted_input,
            &batch_descriptions,
            &table_ready,
            sink.as_of.clone(),
            connection_for_writer,
            storage_state.storage_configuration.clone(),
            Arc::new(arrow_schema_with_ids.clone()),
            Arc::clone(&metrics),
            statistics.clone(),
        );

        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            &datafiles,
            &batch_descriptions,
            &table_ready,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
            Arc::clone(&metrics),
            statistics,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(&mut scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (statuses, vec![mint_button, write_button, commit_button])
    }
}