1use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::convert::Infallible;
77use std::time::Instant;
78use std::{cell::RefCell, rc::Rc, sync::Arc};
79
80use anyhow::{Context, anyhow};
81use arrow::array::{ArrayRef, Int32Array, RecordBatch};
82use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
83use differential_dataflow::lattice::Lattice;
84use differential_dataflow::{AsCollection, Hashable, VecCollection};
85use futures::StreamExt;
86use iceberg::ErrorKind;
87use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
88use iceberg::spec::{
89 DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
90 write_data_files_to_avro,
91};
92use iceberg::spec::{Schema, SchemaRef};
93use iceberg::table::Table;
94use iceberg::transaction::{ApplyTransactionAction, Transaction};
95use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
96use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
97use iceberg::writer::base_writer::equality_delete_writer::{
98 EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
99};
100use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
101use iceberg::writer::base_writer::position_delete_writer::{
102 PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
103};
104use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
105use iceberg::writer::file_writer::ParquetWriterBuilder;
106use iceberg::writer::file_writer::location_generator::{
107 DefaultFileNameGenerator, DefaultLocationGenerator,
108};
109use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
110use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
111use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
112use itertools::Itertools;
113use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
114use mz_interchange::avro::DiffPair;
115use mz_ore::cast::CastFrom;
116use mz_ore::error::ErrorExt;
117use mz_ore::future::InTask;
118use mz_ore::result::ResultExt;
119use mz_ore::retry::{Retry, RetryResult};
120use mz_persist_client::Diagnostics;
121use mz_persist_client::write::WriteHandle;
122use mz_persist_types::codec_impls::UnitSchema;
123use mz_repr::{Diff, GlobalId, Row, Timestamp};
124use mz_storage_types::StorageDiff;
125use mz_storage_types::configuration::StorageConfiguration;
126use mz_storage_types::controller::CollectionMetadata;
127use mz_storage_types::errors::DataflowError;
128use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
129use mz_storage_types::sources::SourceData;
130use mz_timely_util::antichain::AntichainExt;
131use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
132use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
133use parquet::file::properties::WriterProperties;
134use serde::{Deserialize, Serialize};
135use timely::PartialOrder;
136use timely::container::CapacityContainerBuilder;
137use timely::dataflow::channels::pact::{Exchange, Pipeline};
138use timely::dataflow::operators::vec::{Broadcast, Map, ToStream};
139use timely::dataflow::operators::{CapabilitySet, Concatenate};
140use timely::dataflow::{Scope, StreamVec};
141use timely::progress::{Antichain, Timestamp as _};
142use tracing::debug;
143
144use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
145use crate::metrics::sink::iceberg::IcebergSinkMetrics;
146use crate::render::sinks::SinkRender;
147use crate::statistics::SinkStatistics;
148use crate::storage_state::StorageState;
149
// Initial per-column item capacity for Arrow array builders used by this sink.
const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
// Initial per-column data (byte) capacity for Arrow array builders used by this sink.
const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;

// Filename prefix for Parquet data files written by the sink.
const PARQUET_FILE_PREFIX: &str = "mz_data";
// How many batch descriptions the mint operator emits up front on initialization.
const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
163
/// Concrete `DeltaWriter` instantiation used by this sink: a Parquet-backed
/// data-file writer plus position-delete and equality-delete writers, all
/// using the default location and file-name generators.
type DeltaWriterType = DeltaWriter<
    DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
    PositionDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
    EqualityDeleteFileWriter<
        ParquetWriterBuilder,
        DefaultLocationGenerator,
        DefaultFileNameGenerator,
    >,
>;
177
178fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
183 let mut next_field_id = 1i32;
184 let fields: Vec<Field> = schema
185 .fields()
186 .iter()
187 .map(|field| add_field_ids_recursive(field, &mut next_field_id))
188 .collect();
189 ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
190}
191
192fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
194 let current_id = *next_id;
195 *next_id += 1;
196
197 let mut metadata = field.metadata().clone();
198 metadata.insert(
199 PARQUET_FIELD_ID_META_KEY.to_string(),
200 current_id.to_string(),
201 );
202
203 let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
204
205 Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
206}
207
208fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
210 match data_type {
211 DataType::Struct(fields) => {
212 let new_fields: Vec<Field> = fields
213 .iter()
214 .map(|f| add_field_ids_recursive(f, next_id))
215 .collect();
216 DataType::Struct(new_fields.into())
217 }
218 DataType::List(element_field) => {
219 let new_element = add_field_ids_recursive(element_field, next_id);
220 DataType::List(Arc::new(new_element))
221 }
222 DataType::LargeList(element_field) => {
223 let new_element = add_field_ids_recursive(element_field, next_id);
224 DataType::LargeList(Arc::new(new_element))
225 }
226 DataType::Map(entries_field, sorted) => {
227 let new_entries = add_field_ids_recursive(entries_field, next_id);
228 DataType::Map(Arc::new(new_entries), *sorted)
229 }
230 _ => data_type.clone(),
231 }
232}
233
234fn merge_materialize_metadata_into_iceberg_schema(
239 materialize_arrow_schema: &ArrowSchema,
240 iceberg_schema: &Schema,
241) -> anyhow::Result<ArrowSchema> {
242 let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
244 .context("Failed to convert Iceberg schema to Arrow schema")?;
245
246 let fields: Vec<Field> = iceberg_arrow_schema
248 .fields()
249 .iter()
250 .map(|iceberg_field| {
251 let mz_field = materialize_arrow_schema
253 .field_with_name(iceberg_field.name())
254 .with_context(|| {
255 format!(
256 "Field '{}' not found in Materialize schema",
257 iceberg_field.name()
258 )
259 })?;
260
261 merge_field_metadata_recursive(iceberg_field, Some(mz_field))
262 })
263 .collect::<anyhow::Result<Vec<_>>>()?;
264
265 Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
266}
267
/// Recursively rebuilds `iceberg_field`, copying the Materialize Arrow
/// extension-name metadata (if any) from the corresponding `mz_field` onto it
/// and onto all nested fields.
///
/// `mz_field` is `None` for nested positions that have no named counterpart
/// (e.g. synthetic list/map entry fields); in that case only the Iceberg-side
/// metadata is kept. Errors if the Iceberg and Materialize sides disagree on
/// the container kind (Struct/List/LargeList/Map) at any level.
fn merge_field_metadata_recursive(
    iceberg_field: &Field,
    mz_field: Option<&Field>,
) -> anyhow::Result<Field> {
    // Start from the Iceberg field's metadata (e.g. its field id) ...
    let mut metadata = iceberg_field.metadata().clone();

    // ... and layer the Materialize extension-name annotation on top, if present.
    if let Some(mz_f) = mz_field {
        if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
            metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
        }
    }

    let new_data_type = match iceberg_field.data_type() {
        DataType::Struct(iceberg_fields) => {
            // Both sides must agree that this position is a struct.
            let mz_struct_fields = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Struct(fields) => Some(fields),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };

            // Pair up struct children by name; unmatched children recurse with `None`.
            let new_fields: Vec<Field> = iceberg_fields
                .iter()
                .map(|iceberg_inner| {
                    let mz_inner = mz_struct_fields.and_then(|fields| {
                        fields.iter().find(|f| f.name() == iceberg_inner.name())
                    });
                    merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
                })
                .collect::<anyhow::Result<Vec<_>>>()?;

            DataType::Struct(new_fields.into())
        }
        DataType::List(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::List(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::List(Arc::new(new_element))
        }
        DataType::LargeList(iceberg_element) => {
            let mz_element = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::LargeList(element) => Some(element.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
            DataType::LargeList(Arc::new(new_element))
        }
        DataType::Map(iceberg_entries, sorted) => {
            // Note: the Materialize map's `sorted` flag is ignored; the
            // Iceberg side's flag is preserved.
            let mz_entries = match mz_field {
                Some(f) => match f.data_type() {
                    DataType::Map(entries, _) => Some(entries.as_ref()),
                    other => anyhow::bail!(
                        "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
                        iceberg_field.name(),
                        other
                    ),
                },
                None => None,
            };
            let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
            DataType::Map(Arc::new(new_entries), *sorted)
        }
        // Leaf types: nothing nested to merge.
        other => other.clone(),
    };

    // Rebuild the field with the merged metadata; name and nullability come
    // from the Iceberg side.
    Ok(Field::new(
        iceberg_field.name(),
        new_data_type,
        iceberg_field.is_nullable(),
    )
    .with_metadata(metadata))
}
365
366async fn reload_table(
367 catalog: &dyn Catalog,
368 namespace: String,
369 table_name: String,
370 current_table: Table,
371) -> anyhow::Result<Table> {
372 let namespace_ident = NamespaceIdent::new(namespace.clone());
373 let table_ident = TableIdent::new(namespace_ident, table_name.clone());
374 let current_schema = current_table.metadata().current_schema_id();
375 let current_partition_spec = current_table.metadata().default_partition_spec_id();
376
377 match catalog.load_table(&table_ident).await {
378 Ok(table) => {
379 let reloaded_schema = table.metadata().current_schema_id();
380 let reloaded_partition_spec = table.metadata().default_partition_spec_id();
381 if reloaded_schema != current_schema {
382 return Err(anyhow::anyhow!(
383 "Iceberg table '{}' schema changed during operation but schema evolution isn't supported, expected schema ID {}, got {}",
384 table_name,
385 current_schema,
386 reloaded_schema
387 ));
388 }
389
390 if reloaded_partition_spec != current_partition_spec {
391 return Err(anyhow::anyhow!(
392 "Iceberg table '{}' partition spec changed during operation but partition spec evolution isn't supported, expected partition spec ID {}, got {}",
393 table_name,
394 current_partition_spec,
395 reloaded_partition_spec
396 ));
397 }
398
399 Ok(table)
400 }
401 Err(err) => Err(err).context("Failed to reload Iceberg table"),
402 }
403}
404
405async fn load_or_create_table(
407 catalog: &dyn Catalog,
408 namespace: String,
409 table_name: String,
410 schema: &Schema,
411) -> anyhow::Result<iceberg::table::Table> {
412 let namespace_ident = NamespaceIdent::new(namespace.clone());
413 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
414
415 match catalog.load_table(&table_ident).await {
417 Ok(table) => {
418 Ok(table)
421 }
422 Err(err) => {
423 if matches!(err.kind(), ErrorKind::TableNotFound { .. })
424 || err
425 .message()
426 .contains("Tried to load a table that does not exist")
427 {
428 let table_creation = TableCreation::builder()
432 .name(table_name.clone())
433 .schema(schema.clone())
434 .build();
438
439 catalog
440 .create_table(&namespace_ident, table_creation)
441 .await
442 .with_context(|| {
443 format!(
444 "Failed to create Iceberg table '{}' in namespace '{}'",
445 table_name, namespace
446 )
447 })
448 } else {
449 Err(err).context("Failed to load Iceberg table")
451 }
452 }
453 }
454}
455
456fn retrieve_upper_from_snapshots(
461 snapshots: &mut [Arc<Snapshot>],
462) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
463 snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
464
465 for snapshot in snapshots {
466 let props = &snapshot.summary().additional_properties;
467 if let (Some(frontier_json), Some(sink_version_str)) =
468 (props.get("mz-frontier"), props.get("mz-sink-version"))
469 {
470 let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
471 .context("Failed to deserialize frontier from snapshot properties")?;
472 let frontier = Antichain::from_iter(frontier);
473
474 let sink_version = sink_version_str
475 .parse::<u64>()
476 .context("Failed to parse mz-sink-version from snapshot properties")?;
477
478 return Ok(Some((frontier, sink_version)));
479 }
480 if snapshot.summary().operation.as_str() != "replace" {
481 anyhow::bail!(
486 "Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing 'mz-frontier' property. Schema or partition spec evolution is not supported.",
487 snapshot.snapshot_id(),
488 snapshot.summary().operation.as_str(),
489 );
490 }
491 }
492
493 Ok(None)
494}
495
496fn relation_desc_to_iceberg_schema(
500 desc: &mz_repr::RelationDesc,
501) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
502 let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
503 .context("Failed to convert RelationDesc to Arrow schema")?;
504
505 let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
506
507 let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
508 .context("Failed to convert Arrow schema to Iceberg schema")?;
509
510 Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
511}
512
513fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
515 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
516 fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
517 ArrowSchema::new(fields)
518}
519
520fn row_to_recordbatch(
524 row: DiffPair<Row>,
525 schema: ArrowSchemaRef,
526 schema_with_op: ArrowSchemaRef,
527) -> anyhow::Result<RecordBatch> {
528 let mut builder = ArrowBuilder::new_with_schema(
529 Arc::clone(&schema),
530 DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
531 DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
532 )
533 .context("Failed to create builder")?;
534
535 let mut op_values = Vec::new();
536
537 if let Some(before) = row.before {
538 builder
539 .add_row(&before)
540 .context("Failed to add delete row to builder")?;
541 op_values.push(-1i32); }
543 if let Some(after) = row.after {
544 builder
545 .add_row(&after)
546 .context("Failed to add insert row to builder")?;
547 op_values.push(1i32); }
549
550 let batch = builder
551 .to_record_batch()
552 .context("Failed to create record batch")?;
553
554 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
555 let op_column = Arc::new(Int32Array::from(op_values));
556 columns.push(op_column);
557
558 let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
559 .context("Failed to create batch with op column")?;
560 Ok(batch_with_op)
561}
562
/// Renders the "mint" operator of the Iceberg sink.
///
/// A single active worker (chosen by hashing `sink_id`) decides the
/// `[lower, upper)` timestamp ranges ("batch descriptions") that the sink
/// will commit; all workers forward the data collection through unchanged.
///
/// Returns, in order:
/// * the input collection forwarded untouched,
/// * a stream of minted `(lower, upper)` batch descriptions,
/// * an empty stream whose frontier signals the table has been loaded/created,
/// * a stream of health status updates derived from operator errors,
/// * a drop-button that shuts the operator down.
fn mint_batch_descriptions<G, D>(
    name: String,
    sink_id: GlobalId,
    input: VecCollection<G, D, Diff>,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    connection: IcebergSinkConnection,
    storage_configuration: StorageConfiguration,
    initial_schema: SchemaRef,
) -> (
    VecCollection<G, D, Diff>,
    StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
    StreamVec<G, Infallible>,
    StreamVec<G, HealthStatusMessage>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = Timestamp>,
    D: Clone + 'static,
{
    let scope = input.scope();
    let name_for_error = name.clone();
    let name_for_logging = name.clone();
    let mut builder = OperatorBuilder::new(name, scope.clone());
    let sink_version = sink.version;

    // Exactly one worker mints descriptions; the rest only forward data.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let (_, table_ready_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (output, output_stream) = builder.new_output();
    let (batch_desc_output, batch_desc_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    // Data is forwarded on `output`; capabilities for `batch_desc_output` are
    // tied to the same input.
    let mut input =
        builder.new_input_for_many(input.inner, Pipeline, [&output, &batch_desc_output]);

    let as_of = sink.as_of.clone();
    let commit_interval = sink
        .commit_interval
        .expect("the planner should have enforced this")
        .clone();

    let (button, errors): (_, StreamVec<G, Rc<anyhow::Error>>) =
        builder.build_fallible(move |caps| {
            Box::pin(async move {
                let [table_ready_capset, data_capset, capset]: &mut [_; 3] = caps.try_into().unwrap();
                // The data output follows the input's capabilities; we never
                // need to hold one ourselves.
                *data_capset = CapabilitySet::new();

                if !is_active_worker {
                    // Passive workers drop all capabilities and only shuttle
                    // data through to the output.
                    *capset = CapabilitySet::new();
                    *data_capset = CapabilitySet::new();
                    *table_ready_capset = CapabilitySet::new();
                    while let Some(event) = input.next().await {
                        match event {
                            Event::Data([output_cap, _], mut data) => {
                                output.give_container(&output_cap, &mut data);
                            }
                            Event::Progress(_) => {}
                        }
                    }
                    return Ok(());
                }

                let catalog = connection
                    .catalog_connection
                    .connect(&storage_configuration, InTask::Yes)
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
                            connection.catalog_connection.uri, connection.namespace, connection.table
                        )
                    })?;

                let table = load_or_create_table(
                    catalog.as_ref(),
                    connection.namespace.clone(),
                    connection.table.clone(),
                    initial_schema.as_ref(),
                )
                .await?;
                debug!(
                    ?sink_id,
                    %name_for_logging,
                    namespace = %connection.namespace,
                    table = %connection.table,
                    "iceberg mint loaded/created table"
                );

                // Dropping this capability closes the table-ready stream's
                // frontier, signalling downstream operators that the table
                // now exists.
                *table_ready_capset = CapabilitySet::new();

                // Recover the resume position (frontier + sink version) from
                // the table's snapshot properties, if any.
                let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
                let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
                let (resume_upper, resume_version) = match resume {
                    Some((f, v)) => (f, v),
                    None => (Antichain::from_elem(Timestamp::minimum()), 0),
                };
                debug!(
                    ?sink_id,
                    %name_for_logging,
                    resume_upper = %resume_upper.pretty(),
                    resume_version,
                    as_of = %as_of.pretty(),
                    "iceberg mint resume position loaded"
                );

                // If the input has been compacted beyond where we need to
                // resume, we cannot produce correct output.
                let overcompacted =
                    *resume_upper != [Timestamp::minimum()] &&
                    PartialOrder::less_than(&resume_upper, &as_of);

                if overcompacted {
                    let err = format!(
                        "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
                        as_of.pretty(),
                        resume_upper.pretty()
                    );
                    return Err(anyhow::anyhow!("{err}"));
                };

                // A newer sink incarnation already wrote to this table.
                if resume_version > sink_version {
                    anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
                }

                let mut initialized = false;
                let mut observed_frontier;
                // Largest timestamp seen before initialization; used to
                // synthesize an upper if the input closes early.
                let mut max_seen_ts: Option<Timestamp> = None;
                // FIFO of outstanding (lower, upper) descriptions.
                let mut minted_batches = VecDeque::new();
                loop {
                    if let Some(event) = input.next().await {
                        match event {
                            Event::Data([output_cap, _], mut data) => {
                                if !initialized {
                                    // Track the max timestamp in case the
                                    // input closes before we see a frontier.
                                    for (_, ts, _) in data.iter() {
                                        match max_seen_ts.as_mut() {
                                            Some(max) => {
                                                if max.less_than(ts) {
                                                    *max = ts.clone();
                                                }
                                            }
                                            None => {
                                                max_seen_ts = Some(ts.clone());
                                            }
                                        }
                                    }
                                }
                                output.give_container(&output_cap, &mut data);
                                continue;
                            }
                            Event::Progress(frontier) => {
                                observed_frontier = frontier;
                            }
                        }
                    } else {
                        return Ok(());
                    }

                    if !initialized {
                        if observed_frontier.is_empty() {
                            // Input closed before initialization; fall back to
                            // a synthesized upper one tick past the max seen
                            // timestamp, or give up if no data arrived.
                            if let Some(max_ts) = max_seen_ts.as_ref() {
                                let synthesized_upper =
                                    Antichain::from_elem(max_ts.step_forward());
                                debug!(
                                    ?sink_id,
                                    %name_for_logging,
                                    max_seen_ts = %max_ts,
                                    synthesized_upper = %synthesized_upper.pretty(),
                                    "iceberg mint input closed before initialization; using max seen ts"
                                );
                                observed_frontier = synthesized_upper;
                            } else {
                                debug!(
                                    ?sink_id,
                                    %name_for_logging,
                                    "iceberg mint input closed before initialization with no data"
                                );
                                return Ok(());
                            }
                        }

                        // Wait until the input has caught up to both the
                        // resume point and the as_of before minting.
                        if PartialOrder::less_than(&observed_frontier, &resume_upper)
                            || PartialOrder::less_than(&observed_frontier, &as_of)
                        {
                            continue;
                        }

                        let mut batch_descriptions = vec![];
                        let mut current_upper = observed_frontier.clone();
                        let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();

                        // The first batch starts wherever we left off, but
                        // never before the as_of.
                        let batch_lower = if PartialOrder::less_than(&resume_upper, &as_of) {
                            as_of.clone()
                        } else {
                            resume_upper.clone()
                        };

                        // Avoid an empty [x, x) batch by bumping the upper.
                        if batch_lower == current_upper {
                            current_upper = Antichain::from_elem(current_upper_ts.step_forward());
                        }

                        let batch_description = (batch_lower.clone(), current_upper.clone());
                        debug!(
                            ?sink_id,
                            %name_for_logging,
                            batch_lower = %batch_lower.pretty(),
                            current_upper = %current_upper.pretty(),
                            "iceberg mint initializing (catch-up batch)"
                        );
                        debug!(
                            "{}: creating catch-up batch [{}, {})",
                            name_for_logging,
                            batch_lower.pretty(),
                            current_upper.pretty()
                        );
                        batch_descriptions.push(batch_description);
                        // Pre-mint future batches, each one commit interval
                        // wide, so downstream can work ahead.
                        for i in 1..INITIAL_DESCRIPTIONS_TO_MINT {
                            let duration_millis = commit_interval.as_millis()
                                .checked_mul(u128::from(i))
                                .expect("commit interval multiplication overflow");
                            let duration_ts = Timestamp::new(
                                u64::try_from(duration_millis)
                                    .expect("commit interval too large for u64"),
                            );
                            let desired_batch_upper = Antichain::from_elem(
                                current_upper_ts.step_forward_by(&duration_ts),
                            );

                            let batch_description =
                                (current_upper.clone(), desired_batch_upper.clone());
                            debug!(
                                "{}: minting future batch {}/{} [{}, {})",
                                name_for_logging,
                                i,
                                INITIAL_DESCRIPTIONS_TO_MINT,
                                current_upper.pretty(),
                                desired_batch_upper.pretty()
                            );
                            current_upper = batch_description.1.clone();
                            batch_descriptions.push(batch_description);
                        }

                        minted_batches.extend(batch_descriptions.clone());

                        for desc in batch_descriptions {
                            batch_desc_output.give(&capset[0], desc);
                        }

                        // Hold capabilities only at the newest minted upper.
                        capset.downgrade(current_upper);

                        initialized = true;
                    } else {
                        if observed_frontier.is_empty() {
                            return Ok(());
                        }
                        // Steady state: whenever the oldest outstanding batch
                        // is fully covered by the input frontier, retire it
                        // and mint a fresh batch after the newest one.
                        while let Some(oldest_desc) = minted_batches.front() {
                            let oldest_upper = &oldest_desc.1;
                            if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
                                break;
                            }

                            let newest_upper = minted_batches.back().unwrap().1.clone();
                            let new_lower = newest_upper.clone();
                            let duration_ts = Timestamp::new(commit_interval.as_millis()
                                .try_into()
                                .expect("commit interval too large for u64"));
                            let new_upper = Antichain::from_elem(newest_upper
                                .as_option()
                                .unwrap()
                                .step_forward_by(&duration_ts));

                            let new_batch_description = (new_lower.clone(), new_upper.clone());
                            minted_batches.pop_front();
                            minted_batches.push_back(new_batch_description.clone());

                            batch_desc_output.give(&capset[0], new_batch_description);

                            capset.downgrade(new_upper);
                        }
                    }
                }
            })
        });

    // Surface any operator error as a halting health status.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Iceberg,
    });
    (
        output_stream.as_collection(),
        batch_desc_stream,
        table_ready_stream,
        statuses,
        button.press_on_drop(),
    )
}
888
/// An Iceberg `DataFile` together with the schema it was written against,
/// made serde-(de)serializable by round-tripping through [`AvroDataFile`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
struct SerializableDataFile {
    // The Iceberg data file descriptor.
    pub data_file: DataFile,
    // Schema the file was written with; required to decode the Avro form.
    pub schema: Schema,
}
895
/// Wire form of [`SerializableDataFile`]: the `DataFile` encoded as Avro
/// bytes and the schema encoded as JSON bytes.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct AvroDataFile {
    // Avro-encoded `DataFile` (produced by `write_data_files_to_avro`).
    pub data_file: Vec<u8>,
    // JSON-encoded Iceberg `Schema`.
    pub schema: Vec<u8>,
}
909
910impl From<SerializableDataFile> for AvroDataFile {
911 fn from(value: SerializableDataFile) -> Self {
912 let mut data_file = Vec::new();
913 write_data_files_to_avro(
914 &mut data_file,
915 [value.data_file],
916 &StructType::new(vec![]),
917 FormatVersion::V2,
918 )
919 .expect("serialization into buffer");
920 let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
921 AvroDataFile { data_file, schema }
922 }
923}
924
925impl TryFrom<AvroDataFile> for SerializableDataFile {
926 type Error = String;
927
928 fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
929 let schema: Schema = serde_json::from_slice(&value.schema)
930 .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
931 let data_files = read_data_files_from_avro(
932 &mut &*value.data_file,
933 &schema,
934 0,
935 &StructType::new(vec![]),
936 FormatVersion::V2,
937 )
938 .map_err_to_string_with_causes()?;
939 let Some(data_file) = data_files.into_iter().next() else {
940 return Err("No DataFile found in Avro data".into());
941 };
942 Ok(SerializableDataFile { data_file, schema })
943 }
944}
945
/// A written data file tagged with the `[lower, upper)` batch description it
/// belongs to, so downstream operators can commit it at the right time.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct BoundedDataFile {
    pub data_file: SerializableDataFile,
    pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
}
952
953impl BoundedDataFile {
954 pub fn new(
955 file: DataFile,
956 schema: Schema,
957 batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
958 ) -> Self {
959 Self {
960 data_file: SerializableDataFile {
961 data_file: file,
962 schema,
963 },
964 batch_desc,
965 }
966 }
967
968 pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
969 &self.batch_desc
970 }
971
972 pub fn data_file(&self) -> &DataFile {
973 &self.data_file.data_file
974 }
975
976 pub fn into_data_file(self) -> DataFile {
977 self.data_file.data_file
978 }
979}
980
/// A set of [`BoundedDataFile`]s accumulated for a single commit.
#[derive(Clone, Debug, Default)]
struct BoundedDataFileSet {
    pub data_files: Vec<BoundedDataFile>,
}
986
987fn write_data_files<G>(
991 name: String,
992 input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
993 batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
994 table_ready_stream: StreamVec<G, Infallible>,
995 as_of: Antichain<Timestamp>,
996 connection: IcebergSinkConnection,
997 storage_configuration: StorageConfiguration,
998 materialize_arrow_schema: Arc<ArrowSchema>,
999 metrics: Arc<IcebergSinkMetrics>,
1000 statistics: SinkStatistics,
1001) -> (
1002 StreamVec<G, BoundedDataFile>,
1003 StreamVec<G, HealthStatusMessage>,
1004 PressOnDropButton,
1005)
1006where
1007 G: Scope<Timestamp = Timestamp>,
1008{
1009 let scope = input.scope();
1010 let name_for_logging = name.clone();
1011 let mut builder = OperatorBuilder::new(name, scope.clone());
1012
1013 let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1014
1015 let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1016 let mut batch_desc_input =
1017 builder.new_input_for(batch_desc_input.broadcast(), Pipeline, &output);
1018 let mut input = builder.new_disconnected_input(input.inner, Pipeline);
1019
1020 let (button, errors) = builder.build_fallible(move |caps| {
1021 Box::pin(async move {
1022 let [capset]: &mut [_; 1] = caps.try_into().unwrap();
1023 let catalog = connection
1024 .catalog_connection
1025 .connect(&storage_configuration, InTask::Yes)
1026 .await
1027 .with_context(|| {
1028 format!(
1029 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1030 connection.catalog_connection.uri, connection.namespace, connection.table
1031 )
1032 })?;
1033
1034 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1035 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1036 while let Some(_) = table_ready_input.next().await {
1037 }
1039 let table = catalog
1040 .load_table(&table_ident)
1041 .await
1042 .with_context(|| {
1043 format!(
1044 "Failed to load Iceberg table '{}.{}' in write_data_files operator",
1045 connection.namespace, connection.table
1046 )
1047 })?;
1048
1049 let table_metadata = table.metadata().clone();
1050 let current_schema = Arc::clone(table_metadata.current_schema());
1051
1052 let arrow_schema = Arc::new(
1056 merge_materialize_metadata_into_iceberg_schema(
1057 materialize_arrow_schema.as_ref(),
1058 current_schema.as_ref(),
1059 )
1060 .context("Failed to merge Materialize metadata into Iceberg schema")?,
1061 );
1062
1063 let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));
1065
1066 let location = table_metadata.location();
1070 let corrected_location = match location.rsplit_once("/metadata/") {
1071 Some((a, b)) if b.ends_with(".metadata.json") => a,
1072 _ => location,
1073 };
1074
1075 let data_location = format!("{}/data", corrected_location);
1076 let location_generator = DefaultLocationGenerator::with_data_location(data_location);
1077
1078 let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
1080 let file_name_generator = DefaultFileNameGenerator::new(
1081 PARQUET_FILE_PREFIX.to_string(),
1082 Some(unique_suffix),
1083 iceberg::spec::DataFileFormat::Parquet,
1084 );
1085
1086 let file_io = table.file_io().clone();
1087
1088 let Some((_, equality_indices)) = connection.key_desc_and_indices else {
1089 return Err(anyhow::anyhow!(
1090 "Iceberg sink requires key columns for equality deletes"
1091 ));
1092 };
1093
1094 let equality_ids: Vec<i32> = equality_indices
1095 .iter()
1096 .map(|u| i32::try_from(*u).map(|v| v + 1))
1097 .collect::<Result<Vec<i32>, _>>()
1098 .context("Failed to convert equality index to i32 (index too large)")?;
1099
1100 let writer_properties = WriterProperties::new();
1101
1102 let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
1104 let pos_schema = Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
1105 "Failed to convert position delete Arrow schema to Iceberg schema",
1106 )?);
1107
1108 let eq_config = EqualityDeleteWriterConfig::new(
1109 equality_ids.clone(),
1110 Arc::clone(¤t_schema),
1111 )
1112 .context("Failed to create EqualityDeleteWriterConfig")?;
1113 let eq_schema = Arc::new(
1114 arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
1115 "Failed to convert equality delete Arrow schema to Iceberg schema",
1116 )?,
1117 );
1118
1119 let arrow_schema_for_closure = Arc::clone(&arrow_schema);
1120 let current_schema_for_closure = Arc::clone(¤t_schema);
1121
            // Factory that builds a fresh `DeltaWriter` — a combined writer that
            // multiplexes data files, position-delete files, and equality-delete
            // files behind a single `write(RecordBatch)` interface. One writer is
            // created per in-flight batch description.
            //
            // `disable_seen_rows == true` sets `max_seen_rows` to 0, disabling the
            // writer's seen-row tracking; callers pass `is_snapshot` here, so this
            // presumably applies to initial-snapshot batches where deletes cannot
            // refer to earlier rows — NOTE(review): confirm against DeltaWriter docs.
            let create_delta_writer = move |disable_seen_rows: bool| {
                // Clone every captured handle up front so the returned future is
                // self-contained and the factory can be invoked repeatedly.
                let arrow_schema = Arc::clone(&arrow_schema_for_closure);
                let current_schema = Arc::clone(&current_schema_for_closure);
                let file_io = file_io.clone();
                let location_generator = location_generator.clone();
                let file_name_generator = file_name_generator.clone();
                let equality_ids = equality_ids.clone();
                let writer_properties = writer_properties.clone();
                let pos_schema = Arc::clone(&pos_schema);
                let eq_schema = Arc::clone(&eq_schema);
                let eq_config = eq_config.clone();

                async move {
                    // Data-file pipeline: Parquet writer (validated against the
                    // Arrow schema) wrapped in a size-based rolling writer.
                    let data_parquet_writer = ParquetWriterBuilder::new(
                        writer_properties.clone(),
                        Arc::clone(&current_schema),
                    )
                    .with_arrow_schema(Arc::clone(&arrow_schema))
                    .context("Arrow schema validation failed")?;
                    let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                        data_parquet_writer,
                        Arc::clone(&current_schema),
                        file_io.clone(),
                        location_generator.clone(),
                        file_name_generator.clone(),
                    );
                    let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);

                    // Position-delete pipeline, using the dedicated position-delete
                    // schema computed above.
                    let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
                    let pos_parquet_writer =
                        ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
                    let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                        pos_parquet_writer,
                        Arc::clone(&current_schema),
                        file_io.clone(),
                        location_generator.clone(),
                        file_name_generator.clone(),
                    );
                    let pos_delete_writer_builder =
                        PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);

                    // Equality-delete pipeline, projected to the equality-id columns.
                    let eq_parquet_writer =
                        ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
                    let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
                        eq_parquet_writer,
                        Arc::clone(&current_schema),
                        file_io.clone(),
                        location_generator.clone(),
                        file_name_generator.clone(),
                    );
                    let eq_delete_writer_builder =
                        EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);

                    let mut builder = DeltaWriterBuilder::new(
                        data_writer_builder,
                        pos_delete_writer_builder,
                        eq_delete_writer_builder,
                        equality_ids.clone(),
                    );

                    if disable_seen_rows {
                        builder = builder.with_max_seen_rows(0);
                    }

                    builder
                        .build(None)
                        .await
                        .context("Failed to create DeltaWriter")
                }
            };
1192
1193 let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1196 BTreeMap::new();
1197
1198 #[allow(clippy::disallowed_types)]
1203 let mut in_flight_batches: std::collections::HashMap<
1204 (Antichain<Timestamp>, Antichain<Timestamp>),
1205 Box<DeltaWriterType>,
1206 > = std::collections::HashMap::new();
1207
1208 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1209 let mut processed_batch_description_frontier =
1210 Antichain::from_elem(Timestamp::minimum());
1211 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1212 let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1213
1214 let mut min_batch_lower: Option<Antichain<Timestamp>> = None;
1216
1217 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1218 let mut staged_messages_since_flush: u64 = 0;
1219 tokio::select! {
1220 _ = batch_desc_input.ready() => {},
1221 _ = input.ready() => {}
1222 }
1223
1224 while let Some(event) = batch_desc_input.next_sync() {
1225 match event {
1226 Event::Data(_cap, data) => {
1227 for batch_desc in data {
1228 let (lower, upper) = &batch_desc;
1229
1230 if min_batch_lower.is_none() {
1232 min_batch_lower = Some(lower.clone());
1233 debug!(
1234 "{}: set min_batch_lower to {}",
1235 name_for_logging,
1236 lower.pretty()
1237 );
1238
1239 let to_remove: Vec<_> = stashed_rows
1241 .keys()
1242 .filter(|ts| {
1243 let ts_antichain = Antichain::from_elem((*ts).clone());
1244 PartialOrder::less_than(&ts_antichain, lower)
1245 })
1246 .cloned()
1247 .collect();
1248
1249 if !to_remove.is_empty() {
1250 let mut removed_count = 0;
1251 for ts in to_remove {
1252 if let Some(rows) = stashed_rows.remove(&ts) {
1253 removed_count += rows.len();
1254 for _ in &rows {
1255 metrics.stashed_rows.dec();
1256 }
1257 }
1258 }
1259 debug!(
1260 "{}: pruned {} already-committed rows (< min_batch_lower)",
1261 name_for_logging,
1262 removed_count
1263 );
1264 }
1265 }
1266
1267 let is_snapshot = lower == &as_of;
1269 debug!(
1270 "{}: received batch description [{}, {}), snapshot={}",
1271 name_for_logging,
1272 lower.pretty(),
1273 upper.pretty(),
1274 is_snapshot
1275 );
1276 let mut delta_writer = create_delta_writer(is_snapshot).await?;
1277 let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1279 let mut drained_count = 0;
1280 for row_ts in row_ts_keys {
1281 let ts = Antichain::from_elem(row_ts.clone());
1282 if PartialOrder::less_equal(lower, &ts)
1283 && PartialOrder::less_than(&ts, upper)
1284 {
1285 if let Some(rows) = stashed_rows.remove(&row_ts) {
1286 drained_count += rows.len();
1287 for (_row, diff_pair) in rows {
1288 metrics.stashed_rows.dec();
1289 let record_batch = row_to_recordbatch(
1290 diff_pair.clone(),
1291 Arc::clone(&arrow_schema),
1292 Arc::clone(&schema_with_op),
1293 )
1294 .context("failed to convert row to recordbatch")?;
1295 delta_writer.write(record_batch).await.context(
1296 "Failed to write row to DeltaWriter",
1297 )?;
1298 staged_messages_since_flush += 1;
1299 if staged_messages_since_flush >= 10_000 {
1300 statistics.inc_messages_staged_by(
1301 staged_messages_since_flush,
1302 );
1303 staged_messages_since_flush = 0;
1304 }
1305 }
1306 }
1307 }
1308 }
1309 if drained_count > 0 {
1310 debug!(
1311 "{}: drained {} stashed rows into batch [{}, {})",
1312 name_for_logging,
1313 drained_count,
1314 lower.pretty(),
1315 upper.pretty()
1316 );
1317 }
1318 let prev =
1319 in_flight_batches.insert(
1320 batch_desc.clone(), Box::new(delta_writer)
1321 );
1322 if prev.is_some() {
1323 anyhow::bail!(
1324 "Duplicate batch description received for description {:?}",
1325 batch_desc
1326 );
1327 }
1328 }
1329 }
1330 Event::Progress(frontier) => {
1331 batch_description_frontier = frontier;
1332 }
1333 }
1334 }
1335
1336 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1337 for event in ready_events {
1338 match event {
1339 Event::Data(_cap, data) => {
1340 let mut dropped_per_time = BTreeMap::new();
1341 let mut stashed_per_time = BTreeMap::new();
1342 for ((row, diff_pair), ts, _diff) in data {
1343 let row_ts = ts.clone();
1344 let ts_antichain = Antichain::from_elem(row_ts.clone());
1345 let mut written = false;
1346 for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
1348 let (lower, upper) = batch_desc;
1349 if PartialOrder::less_equal(lower, &ts_antichain)
1350 && PartialOrder::less_than(&ts_antichain, upper)
1351 {
1352 let record_batch = row_to_recordbatch(
1353 diff_pair.clone(),
1354 Arc::clone(&arrow_schema),
1355 Arc::clone(&schema_with_op),
1356 )
1357 .context("failed to convert row to recordbatch")?;
1358 delta_writer
1359 .write(record_batch)
1360 .await
1361 .context("Failed to write row to DeltaWriter")?;
1362 staged_messages_since_flush += 1;
1363 if staged_messages_since_flush >= 10_000 {
1364 statistics.inc_messages_staged_by(
1365 staged_messages_since_flush,
1366 );
1367 staged_messages_since_flush = 0;
1368 }
1369 written = true;
1370 break;
1371 }
1372 }
1373 if !written {
1374 if let Some(ref min_lower) = min_batch_lower {
1376 if PartialOrder::less_than(&ts_antichain, min_lower) {
1377 dropped_per_time
1378 .entry(ts_antichain.into_option().unwrap())
1379 .and_modify(|c| *c += 1)
1380 .or_insert(1);
1381 continue;
1382 }
1383 }
1384
1385 stashed_per_time.entry(ts).and_modify(|c| *c += 1).or_insert(1);
1386 let entry = stashed_rows.entry(row_ts).or_default();
1387 metrics.stashed_rows.inc();
1388 entry.push((row, diff_pair));
1389 }
1390 }
1391
1392 for (ts, count) in dropped_per_time {
1393 debug!(
1394 "{}: dropped {} rows at timestamp {} (< min_batch_lower, already committed)",
1395 name_for_logging, count, ts
1396 );
1397 }
1398
1399 for (ts, count) in stashed_per_time {
1400 debug!(
1401 "{}: stashed {} rows at timestamp {} (waiting for batch description)",
1402 name_for_logging, count, ts
1403 );
1404 }
1405 }
1406 Event::Progress(frontier) => {
1407 input_frontier = frontier;
1408 }
1409 }
1410 }
1411 if staged_messages_since_flush > 0 {
1412 statistics.inc_messages_staged_by(staged_messages_since_flush);
1413 }
1414
1415 if PartialOrder::less_than(
1417 &processed_batch_description_frontier,
1418 &batch_description_frontier,
1419 ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1420 {
1421 let ready_batches: Vec<_> = in_flight_batches
1427 .extract_if(|(lower, upper), _| {
1428 PartialOrder::less_than(lower, &batch_description_frontier)
1429 && PartialOrder::less_equal(upper, &input_frontier)
1430 })
1431 .collect();
1432
1433 if !ready_batches.is_empty() {
1434 debug!(
1435 "{}: closing {} batches (batch_frontier: {}, input_frontier: {})",
1436 name_for_logging,
1437 ready_batches.len(),
1438 batch_description_frontier.pretty(),
1439 input_frontier.pretty()
1440 );
1441 let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1442 for (desc, mut delta_writer) in ready_batches {
1443 let close_started_at = Instant::now();
1444 let data_files = delta_writer.close().await;
1445 metrics
1446 .writer_close_duration_seconds
1447 .observe(close_started_at.elapsed().as_secs_f64());
1448 let data_files = data_files.context("Failed to close DeltaWriter")?;
1449 debug!(
1450 "{}: closed batch [{}, {}), wrote {} files",
1451 name_for_logging,
1452 desc.0.pretty(),
1453 desc.1.pretty(),
1454 data_files.len()
1455 );
1456 for data_file in data_files {
1457 match data_file.content_type() {
1458 iceberg::spec::DataContentType::Data => {
1459 metrics.data_files_written.inc();
1460 }
1461 iceberg::spec::DataContentType::PositionDeletes
1462 | iceberg::spec::DataContentType::EqualityDeletes => {
1463 metrics.delete_files_written.inc();
1464 }
1465 }
1466 statistics.inc_messages_staged_by(data_file.record_count());
1467 statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1468 let file = BoundedDataFile::new(
1469 data_file,
1470 current_schema.as_ref().clone(),
1471 desc.clone(),
1472 );
1473 output.give(&capset[0], file);
1474 }
1475
1476 max_upper = max_upper.join(&desc.1);
1477 }
1478
1479 capset.downgrade(max_upper);
1480 }
1481 processed_batch_description_frontier.clone_from(&batch_description_frontier);
1482 processed_input_frontier.clone_from(&input_frontier);
1483 }
1484 }
1485 Ok(())
1486 })
1487 });
1488
1489 let statuses = errors.map(|error| HealthStatusMessage {
1490 id: None,
1491 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1492 namespace: StatusNamespace::Iceberg,
1493 });
1494 (output_stream, statuses, button.press_on_drop())
1495}
1496
1497fn commit_to_iceberg<G>(
1501 name: String,
1502 sink_id: GlobalId,
1503 sink_version: u64,
1504 batch_input: StreamVec<G, BoundedDataFile>,
1505 batch_desc_input: StreamVec<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1506 table_ready_stream: StreamVec<G, Infallible>,
1507 write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1508 connection: IcebergSinkConnection,
1509 storage_configuration: StorageConfiguration,
1510 write_handle: impl Future<
1511 Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1512 > + 'static,
1513 metrics: Arc<IcebergSinkMetrics>,
1514 statistics: SinkStatistics,
1515) -> (StreamVec<G, HealthStatusMessage>, PressOnDropButton)
1516where
1517 G: Scope<Timestamp = Timestamp>,
1518{
1519 let scope = batch_input.scope();
1520 let mut builder = OperatorBuilder::new(name, scope.clone());
1521
1522 let hashed_id = sink_id.hashed();
1523 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1524 let name_for_logging = format!("{sink_id}-commit-to-iceberg");
1525
1526 let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1527 let mut batch_desc_input =
1528 builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1529 let mut table_ready_input = builder.new_disconnected_input(table_ready_stream, Pipeline);
1530
1531 let (button, errors) = builder.build_fallible(move |_caps| {
1532 Box::pin(async move {
1533 if !is_active_worker {
1534 write_frontier.borrow_mut().clear();
1535 return Ok(());
1536 }
1537
1538 let catalog = connection
1539 .catalog_connection
1540 .connect(&storage_configuration, InTask::Yes)
1541 .await
1542 .with_context(|| {
1543 format!(
1544 "Failed to connect to Iceberg catalog '{}' for table '{}.{}'",
1545 connection.catalog_connection.uri, connection.namespace, connection.table
1546 )
1547 })?;
1548
1549 let mut write_handle = write_handle.await?;
1550
1551 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1552 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1553 while let Some(_) = table_ready_input.next().await {
1554 }
1556 let mut table = catalog
1557 .load_table(&table_ident)
1558 .await
1559 .with_context(|| {
1560 format!(
1561 "Failed to load Iceberg table '{}.{}' in commit_to_iceberg operator",
1562 connection.namespace, connection.table
1563 )
1564 })?;
1565
1566 #[allow(clippy::disallowed_types)]
1567 let mut batch_descriptions: std::collections::HashMap<
1568 (Antichain<Timestamp>, Antichain<Timestamp>),
1569 BoundedDataFileSet,
1570 > = std::collections::HashMap::new();
1571
1572
1573 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1574 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1575
1576 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1577 tokio::select! {
1578 _ = batch_desc_input.ready() => {},
1579 _ = input.ready() => {}
1580 }
1581
1582 while let Some(event) = batch_desc_input.next_sync() {
1583 match event {
1584 Event::Data(_cap, data) => {
1585 for batch_desc in data {
1586 let prev = batch_descriptions.insert(
1587 batch_desc,
1588 BoundedDataFileSet { data_files: vec![] },
1589 );
1590 if let Some(prev) = prev {
1591 anyhow::bail!(
1592 "Duplicate batch description received \
1593 in commit operator: {:?}",
1594 prev
1595 );
1596 }
1597 }
1598 }
1599 Event::Progress(frontier) => {
1600 batch_description_frontier = frontier;
1601 }
1602 }
1603 }
1604
1605 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1606 for event in ready_events {
1607 match event {
1608 Event::Data(_cap, data) => {
1609 for bounded_data_file in data {
1610 let entry = batch_descriptions
1611 .entry(bounded_data_file.batch_desc().clone())
1612 .or_default();
1613 entry.data_files.push(bounded_data_file);
1614 }
1615 }
1616 Event::Progress(frontier) => {
1617 input_frontier = frontier;
1618 }
1619 }
1620 }
1621
1622 let mut done_batches: Vec<_> = batch_descriptions
1628 .keys()
1629 .filter(|(lower, _upper)| {
1630 PartialOrder::less_than(lower, &input_frontier)
1631 })
1632 .cloned()
1633 .collect();
1634
1635 done_batches.sort_by(|a, b| {
1637 if PartialOrder::less_than(a, b) {
1638 Ordering::Less
1639 } else if PartialOrder::less_than(b, a) {
1640 Ordering::Greater
1641 } else {
1642 Ordering::Equal
1643 }
1644 });
1645
1646 for batch in done_batches {
1647 let file_set = batch_descriptions.remove(&batch).unwrap();
1648
1649 let mut data_files = vec![];
1650 let mut delete_files = vec![];
1651 let mut total_messages: u64 = 0;
1653 let mut total_bytes: u64 = 0;
1654 for file in file_set.data_files {
1655 total_messages += file.data_file().record_count();
1656 total_bytes += file.data_file().file_size_in_bytes();
1657 match file.data_file().content_type() {
1658 iceberg::spec::DataContentType::Data => {
1659 data_files.push(file.into_data_file());
1660 }
1661 iceberg::spec::DataContentType::PositionDeletes |
1662 iceberg::spec::DataContentType::EqualityDeletes => {
1663 delete_files.push(file.into_data_file());
1664 }
1665 }
1666 }
1667
1668 debug!(
1669 ?sink_id,
1670 %name_for_logging,
1671 lower = %batch.0.pretty(),
1672 upper = %batch.1.pretty(),
1673 data_files = data_files.len(),
1674 delete_files = delete_files.len(),
1675 total_messages,
1676 total_bytes,
1677 "iceberg commit applying batch"
1678 );
1679
1680 let instant = Instant::now();
1681
1682 let frontier = batch.1.clone();
1683 let tx = Transaction::new(&table);
1684
1685 let frontier_json = serde_json::to_string(&frontier.elements())
1686 .context("Failed to serialize frontier to JSON")?;
1687
1688 let mut action = tx.row_delta().set_snapshot_properties(
1690 vec![
1691 ("mz-sink-id".to_string(), sink_id.to_string()),
1692 ("mz-frontier".to_string(), frontier_json),
1693 ("mz-sink-version".to_string(), sink_version.to_string()),
1694 ].into_iter().collect()
1695 ).with_check_duplicate(false);
1696
1697 if !data_files.is_empty() || !delete_files.is_empty() {
1698 action = action.add_data_files(data_files).add_delete_files(delete_files);
1699 }
1700
1701 let tx = action.apply(tx).context(
1702 "Failed to apply data file addition to iceberg table transaction",
1703 )?;
1704
1705 let commit_result = Retry::default().max_tries(5).retry_async(|_| async {
1706 let new_table = tx.clone().commit(catalog.as_ref()).await;
1707 match new_table {
1708 Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
1709 metrics.commit_conflicts.inc();
1710 let table = reload_table(
1711 catalog.as_ref(),
1712 connection.namespace.clone(),
1713 connection.table.clone(),
1714 table.clone(),
1715 ).await;
1716 let table = match table {
1717 Ok(table) => table,
1718 Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1719 };
1720
1721 let mut snapshots: Vec<_> =
1722 table.metadata().snapshots().cloned().collect();
1723 let last = retrieve_upper_from_snapshots(&mut snapshots);
1724 let last = match last {
1725 Ok(val) => val,
1726 Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1727 };
1728
1729 if let Some((last_frontier, _last_version)) = last {
1731 if PartialOrder::less_equal(&frontier, &last_frontier) {
1732 return RetryResult::FatalErr(anyhow!(
1733 "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
1734 connection.table,
1735 frontier,
1736 last_frontier,
1737 ));
1738 }
1739 }
1740
1741 RetryResult::Ok(table)
1742 }
1743 Err(e) => {
1744 metrics.commit_failures.inc();
1745 RetryResult::RetryableErr(anyhow!(e))
1746 },
1747 Ok(table) => RetryResult::Ok(table)
1748 }
1749 })
1750 .await
1751 .with_context(|| {
1752 format!(
1753 "failed to commit batch to Iceberg table '{}.{}'",
1754 connection.namespace, connection.table
1755 )
1756 });
1757 let duration = instant.elapsed();
1758 metrics
1759 .commit_duration_seconds
1760 .observe(duration.as_secs_f64());
1761 table = commit_result?;
1762
1763 debug!(
1764 ?sink_id,
1765 %name_for_logging,
1766 lower = %batch.0.pretty(),
1767 upper = %batch.1.pretty(),
1768 total_messages,
1769 total_bytes,
1770 ?duration,
1771 "iceberg commit applied batch"
1772 );
1773
1774 metrics.snapshots_committed.inc();
1775 statistics.inc_messages_committed_by(total_messages);
1776 statistics.inc_bytes_committed_by(total_bytes);
1777
1778 let mut expect_upper = write_handle.shared_upper();
1779 loop {
1780 if PartialOrder::less_equal(&frontier, &expect_upper) {
1781 break;
1783 }
1784
1785 const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1786 match write_handle
1787 .compare_and_append(EMPTY, expect_upper, frontier.clone())
1788 .await
1789 .expect("valid usage")
1790 {
1791 Ok(()) => break,
1792 Err(mismatch) => {
1793 expect_upper = mismatch.current;
1794 }
1795 }
1796 }
1797 write_frontier.borrow_mut().clone_from(&frontier);
1798
1799 }
1800 }
1801
1802
1803 Ok(())
1804 })
1805 });
1806
1807 let statuses = errors.map(|error| HealthStatusMessage {
1808 id: None,
1809 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1810 namespace: StatusNamespace::Iceberg,
1811 });
1812
1813 (statuses, button.press_on_drop())
1814}
1815
impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
    /// Returns the column indices of the sink's declared key within the value
    /// relation, if a key was configured.
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_, indices)| indices.as_slice())
    }

    /// Returns the indices of the upstream relation's key columns, if known.
    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    /// Wires up the full Iceberg sink pipeline:
    /// mint batch descriptions -> write data files -> commit to Iceberg,
    /// and returns the merged health-status stream plus shutdown buttons
    /// (one per operator).
    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<G, DataflowError, Diff>,
    ) -> (StreamVec<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

        // Lazily open a persist write handle on the sink's storage shard; the
        // commit operator awaits this future on its active worker only.
        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        // Shared frontier cell: the commit operator advances it; registered
        // here so storage state can observe this sink's progress.
        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        // Derive the Arrow (with field ids) and Iceberg schemas from the
        // source relation; a failure here halts the sink before any operator
        // is built.
        let (arrow_schema_with_ids, iceberg_schema) =
            match relation_desc_to_iceberg_schema(&sink.from_desc) {
                Ok(schemas) => schemas,
                Err(err) => {
                    let error_stream = std::iter::once(HealthStatusMessage {
                        id: None,
                        update: HealthStatusUpdate::halting(
                            format!("{}", err.display_with_causes()),
                            None,
                        ),
                        namespace: StatusNamespace::Iceberg,
                    })
                    .to_stream(&mut scope);
                    return (error_stream, vec![]);
                }
            };

        let metrics = Arc::new(
            storage_state
                .metrics
                .get_iceberg_sink_metrics(sink_id, scope.index()),
        );

        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        // Operator 1: mint batch descriptions and signal table readiness.
        let connection_for_minter = self.clone();
        let (minted_input, batch_descriptions, table_ready, mint_status, mint_button) =
            mint_batch_descriptions(
                format!("{sink_id}-iceberg-mint"),
                sink_id,
                input,
                sink,
                connection_for_minter,
                storage_state.storage_configuration.clone(),
                Arc::clone(&iceberg_schema),
            );

        // Operator 2: write rows into Iceberg data/delete files, bounded by
        // the minted batch descriptions.
        let connection_for_writer = self.clone();
        let (datafiles, write_status, write_button) = write_data_files(
            format!("{sink_id}-write-data-files"),
            minted_input,
            batch_descriptions.clone(),
            table_ready.clone(),
            sink.as_of.clone(),
            connection_for_writer,
            storage_state.storage_configuration.clone(),
            Arc::new(arrow_schema_with_ids.clone()),
            Arc::clone(&metrics),
            statistics.clone(),
        );

        // Operator 3: commit the written files to the Iceberg catalog and
        // advance the shared write frontier.
        let connection_for_committer = self.clone();
        let (commit_status, commit_button) = commit_to_iceberg(
            format!("{sink_id}-commit-to-iceberg"),
            sink_id,
            sink.version,
            datafiles,
            batch_descriptions,
            table_ready,
            Rc::clone(&write_frontier),
            connection_for_committer,
            storage_state.storage_configuration.clone(),
            write_handle,
            Arc::clone(&metrics),
            statistics,
        );

        // Emit an initial "running" status alongside each operator's stream.
        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::running(),
            namespace: StatusNamespace::Iceberg,
        })
        .to_stream(&mut scope);

        let statuses =
            scope.concatenate([running_status, mint_status, write_status, commit_status]);

        (statuses, vec![mint_button, write_button, commit_button])
    }
}