1use std::cmp::Ordering;
75use std::collections::{BTreeMap, VecDeque};
76use std::{cell::RefCell, rc::Rc, sync::Arc};
77
78use anyhow::{Context, anyhow};
79use arrow::array::{ArrayRef, Int32Array, RecordBatch};
80use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
81use differential_dataflow::lattice::Lattice;
82use differential_dataflow::{AsCollection, Hashable, VecCollection};
83use futures::StreamExt;
84use iceberg::ErrorKind;
85use iceberg::arrow::{arrow_schema_to_schema, schema_to_arrow_schema};
86use iceberg::spec::{
87 DataFile, FormatVersion, Snapshot, StructType, read_data_files_from_avro,
88 write_data_files_to_avro,
89};
90use iceberg::spec::{Schema, SchemaRef};
91use iceberg::table::Table;
92use iceberg::transaction::{ApplyTransactionAction, Transaction};
93use iceberg::writer::base_writer::data_file_writer::DataFileWriter;
94use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
95use iceberg::writer::base_writer::equality_delete_writer::{
96 EqualityDeleteFileWriter, EqualityDeleteFileWriterBuilder, EqualityDeleteWriterConfig,
97};
98use iceberg::writer::base_writer::position_delete_writer::PositionDeleteFileWriter;
99use iceberg::writer::base_writer::position_delete_writer::{
100 PositionDeleteFileWriterBuilder, PositionDeleteWriterConfig,
101};
102use iceberg::writer::combined_writer::delta_writer::{DeltaWriter, DeltaWriterBuilder};
103use iceberg::writer::file_writer::ParquetWriterBuilder;
104use iceberg::writer::file_writer::location_generator::{
105 DefaultFileNameGenerator, DefaultLocationGenerator,
106};
107use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
108use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
109use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
110use itertools::Itertools;
111use mz_arrow_util::builder::{ARROW_EXTENSION_NAME_KEY, ArrowBuilder};
112use mz_interchange::avro::DiffPair;
113use mz_ore::cast::CastFrom;
114use mz_ore::error::ErrorExt;
115use mz_ore::future::InTask;
116use mz_ore::result::ResultExt;
117use mz_ore::retry::{Retry, RetryResult};
118use mz_persist_client::Diagnostics;
119use mz_persist_client::write::WriteHandle;
120use mz_persist_types::codec_impls::UnitSchema;
121use mz_repr::{Diff, GlobalId, Row, Timestamp};
122use mz_storage_types::StorageDiff;
123use mz_storage_types::configuration::StorageConfiguration;
124use mz_storage_types::controller::CollectionMetadata;
125use mz_storage_types::errors::DataflowError;
126use mz_storage_types::sinks::{IcebergSinkConnection, StorageSinkDesc};
127use mz_storage_types::sources::SourceData;
128use mz_timely_util::antichain::AntichainExt;
129use mz_timely_util::builder_async::{Event, OperatorBuilder, PressOnDropButton};
130use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
131use parquet::file::properties::WriterProperties;
132use serde::{Deserialize, Serialize};
133use timely::PartialOrder;
134use timely::container::CapacityContainerBuilder;
135use timely::dataflow::channels::pact::{Exchange, Pipeline};
136use timely::dataflow::operators::{Broadcast, CapabilitySet, Concatenate, Map, ToStream};
137use timely::dataflow::{Scope, Stream};
138use timely::progress::{Antichain, Timestamp as _};
139
140use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
141use crate::metrics::sink::iceberg::IcebergSinkMetrics;
142use crate::render::sinks::SinkRender;
143use crate::statistics::SinkStatistics;
144use crate::storage_state::StorageState;
145
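/// Initial item capacity used for the Arrow array builders that convert rows into record batches.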
146const DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY: usize = 1024;
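/// Initial data buffer capacity for the same builders.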
149const DEFAULT_ARRAY_BUILDER_DATA_CAPACITY: usize = 1024;
153
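/// Prefix for the Parquet files written by this sink.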
154const PARQUET_FILE_PREFIX: &str = "mz_data";
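/// How many batch descriptions the mint operator creates ahead of the observed frontier.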
156const INITIAL_DESCRIPTIONS_TO_MINT: u64 = 3;
159
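/// The concrete `DeltaWriter` used by this sink: a Parquet data file writer plus
/// position-delete and equality-delete writers.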
160type DeltaWriterType = DeltaWriter<
161 DataFileWriter<ParquetWriterBuilder, DefaultLocationGenerator, DefaultFileNameGenerator>,
162 PositionDeleteFileWriter<
163 ParquetWriterBuilder,
164 DefaultLocationGenerator,
165 DefaultFileNameGenerator,
166 >,
167 EqualityDeleteFileWriter<
168 ParquetWriterBuilder,
169 DefaultLocationGenerator,
170 DefaultFileNameGenerator,
171 >,
172>;
173
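/// Assigns sequential Parquet field IDs (starting at 1, in depth-first order) to
/// every field of the Arrow schema; the Arrow-to-Iceberg schema conversion below
/// relies on these IDs being present.
///
/// A minimal sketch of the effect (not a doctest):
///
/// ```ignore
/// let schema = ArrowSchema::new(vec![Field::new("a", DataType::Int64, false)]);
/// let with_ids = add_field_ids_to_arrow_schema(schema);
/// assert_eq!(with_ids.field(0).metadata()[PARQUET_FIELD_ID_META_KEY], "1");
/// ```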
174fn add_field_ids_to_arrow_schema(schema: ArrowSchema) -> ArrowSchema {
179 let mut next_field_id = 1i32;
180 let fields: Vec<Field> = schema
181 .fields()
182 .iter()
183 .map(|field| add_field_ids_recursive(field, &mut next_field_id))
184 .collect();
185 ArrowSchema::new(fields).with_metadata(schema.metadata().clone())
186}
187
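/// Tags `field` with the next available field ID and recurses into any nested
/// fields.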
188fn add_field_ids_recursive(field: &Field, next_id: &mut i32) -> Field {
190 let current_id = *next_id;
191 *next_id += 1;
192
193 let mut metadata = field.metadata().clone();
194 metadata.insert(
195 PARQUET_FIELD_ID_META_KEY.to_string(),
196 current_id.to_string(),
197 );
198
199 let new_data_type = add_field_ids_to_datatype(field.data_type(), next_id);
200
201 Field::new(field.name(), new_data_type, field.is_nullable()).with_metadata(metadata)
202}
203
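/// Recursively assigns field IDs to the children of struct, list, and map types;
/// all other types are returned unchanged.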
204fn add_field_ids_to_datatype(data_type: &DataType, next_id: &mut i32) -> DataType {
206 match data_type {
207 DataType::Struct(fields) => {
208 let new_fields: Vec<Field> = fields
209 .iter()
210 .map(|f| add_field_ids_recursive(f, next_id))
211 .collect();
212 DataType::Struct(new_fields.into())
213 }
214 DataType::List(element_field) => {
215 let new_element = add_field_ids_recursive(element_field, next_id);
216 DataType::List(Arc::new(new_element))
217 }
218 DataType::LargeList(element_field) => {
219 let new_element = add_field_ids_recursive(element_field, next_id);
220 DataType::LargeList(Arc::new(new_element))
221 }
222 DataType::Map(entries_field, sorted) => {
223 let new_entries = add_field_ids_recursive(entries_field, next_id);
224 DataType::Map(Arc::new(new_entries), *sorted)
225 }
226 _ => data_type.clone(),
227 }
228}
229
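/// Rebuilds the Iceberg-derived Arrow schema with Materialize's Arrow extension
/// metadata (`ARROW_EXTENSION_NAME_KEY`) copied over from the corresponding
/// fields of the Materialize schema.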
230fn merge_materialize_metadata_into_iceberg_schema(
235 materialize_arrow_schema: &ArrowSchema,
236 iceberg_schema: &Schema,
237) -> anyhow::Result<ArrowSchema> {
238 let iceberg_arrow_schema = schema_to_arrow_schema(iceberg_schema)
240 .context("Failed to convert Iceberg schema to Arrow schema")?;
241
242 let fields: Vec<Field> = iceberg_arrow_schema
244 .fields()
245 .iter()
246 .map(|iceberg_field| {
247 let mz_field = materialize_arrow_schema
249 .field_with_name(iceberg_field.name())
250 .with_context(|| {
251 format!(
252 "Field '{}' not found in Materialize schema",
253 iceberg_field.name()
254 )
255 })?;
256
257 merge_field_metadata_recursive(iceberg_field, Some(mz_field))
258 })
259 .collect::<anyhow::Result<Vec<_>>>()?;
260
261 Ok(ArrowSchema::new(fields).with_metadata(iceberg_arrow_schema.metadata().clone()))
262}
263
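/// Copies the extension-name metadata from a Materialize field onto the matching
/// Iceberg-derived field, recursing into structs, lists, and maps and failing on
/// a type mismatch.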
264fn merge_field_metadata_recursive(
266 iceberg_field: &Field,
267 mz_field: Option<&Field>,
268) -> anyhow::Result<Field> {
269 let mut metadata = iceberg_field.metadata().clone();
271
272 if let Some(mz_f) = mz_field {
274 if let Some(extension_name) = mz_f.metadata().get(ARROW_EXTENSION_NAME_KEY) {
275 metadata.insert(ARROW_EXTENSION_NAME_KEY.to_string(), extension_name.clone());
276 }
277 }
278
279 let new_data_type = match iceberg_field.data_type() {
281 DataType::Struct(iceberg_fields) => {
282 let mz_struct_fields = match mz_field {
283 Some(f) => match f.data_type() {
284 DataType::Struct(fields) => Some(fields),
285 other => anyhow::bail!(
286 "Type mismatch for field '{}': Iceberg schema has Struct, but Materialize schema has {:?}",
287 iceberg_field.name(),
288 other
289 ),
290 },
291 None => None,
292 };
293
294 let new_fields: Vec<Field> = iceberg_fields
295 .iter()
296 .map(|iceberg_inner| {
297 let mz_inner = mz_struct_fields.and_then(|fields| {
298 fields.iter().find(|f| f.name() == iceberg_inner.name())
299 });
300 merge_field_metadata_recursive(iceberg_inner, mz_inner.map(|f| f.as_ref()))
301 })
302 .collect::<anyhow::Result<Vec<_>>>()?;
303
304 DataType::Struct(new_fields.into())
305 }
306 DataType::List(iceberg_element) => {
307 let mz_element = match mz_field {
308 Some(f) => match f.data_type() {
309 DataType::List(element) => Some(element.as_ref()),
310 other => anyhow::bail!(
311 "Type mismatch for field '{}': Iceberg schema has List, but Materialize schema has {:?}",
312 iceberg_field.name(),
313 other
314 ),
315 },
316 None => None,
317 };
318 let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
319 DataType::List(Arc::new(new_element))
320 }
321 DataType::LargeList(iceberg_element) => {
322 let mz_element = match mz_field {
323 Some(f) => match f.data_type() {
324 DataType::LargeList(element) => Some(element.as_ref()),
325 other => anyhow::bail!(
326 "Type mismatch for field '{}': Iceberg schema has LargeList, but Materialize schema has {:?}",
327 iceberg_field.name(),
328 other
329 ),
330 },
331 None => None,
332 };
333 let new_element = merge_field_metadata_recursive(iceberg_element, mz_element)?;
334 DataType::LargeList(Arc::new(new_element))
335 }
336 DataType::Map(iceberg_entries, sorted) => {
337 let mz_entries = match mz_field {
338 Some(f) => match f.data_type() {
339 DataType::Map(entries, _) => Some(entries.as_ref()),
340 other => anyhow::bail!(
341 "Type mismatch for field '{}': Iceberg schema has Map, but Materialize schema has {:?}",
342 iceberg_field.name(),
343 other
344 ),
345 },
346 None => None,
347 };
348 let new_entries = merge_field_metadata_recursive(iceberg_entries, mz_entries)?;
349 DataType::Map(Arc::new(new_entries), *sorted)
350 }
351 other => other.clone(),
352 };
353
354 Ok(Field::new(
355 iceberg_field.name(),
356 new_data_type,
357 iceberg_field.is_nullable(),
358 )
359 .with_metadata(metadata))
360}
361
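/// Reloads the table from the catalog, erroring if its schema or default
/// partition spec changed, since neither kind of evolution is supported here.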
362async fn reload_table(
363 catalog: &dyn Catalog,
364 namespace: String,
365 table_name: String,
366 current_table: Table,
367) -> anyhow::Result<Table> {
368 let namespace_ident = NamespaceIdent::new(namespace.clone());
369 let table_ident = TableIdent::new(namespace_ident, table_name.clone());
370 let current_schema = current_table.metadata().current_schema_id();
371 let current_partition_spec = current_table.metadata().default_partition_spec_id();
372
373 match catalog.load_table(&table_ident).await {
374 Ok(table) => {
375 let reloaded_schema = table.metadata().current_schema_id();
376 let reloaded_partition_spec = table.metadata().default_partition_spec_id();
377 if reloaded_schema != current_schema {
378 return Err(anyhow::anyhow!(
"Iceberg table '{}' schema changed during the operation, but schema evolution isn't supported (expected schema ID {}, got {})",
380 table_name,
381 current_schema,
382 reloaded_schema
383 ));
384 }
385
386 if reloaded_partition_spec != current_partition_spec {
387 return Err(anyhow::anyhow!(
"Iceberg table '{}' partition spec changed during the operation, but partition spec evolution isn't supported (expected partition spec ID {}, got {})",
389 table_name,
390 current_partition_spec,
391 reloaded_partition_spec
392 ));
393 }
394
395 Ok(table)
396 }
397 Err(err) => Err(err).context("Failed to reload Iceberg table"),
398 }
399}
400
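/// Loads the Iceberg table, creating it with the given schema if it does not yet
/// exist.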
401async fn load_or_create_table(
403 catalog: &dyn Catalog,
404 namespace: String,
405 table_name: String,
406 schema: &Schema,
407) -> anyhow::Result<iceberg::table::Table> {
408 let namespace_ident = NamespaceIdent::new(namespace.clone());
409 let table_ident = TableIdent::new(namespace_ident.clone(), table_name.clone());
410
411 match catalog.load_table(&table_ident).await {
413 Ok(table) => {
414 Ok(table)
417 }
418 Err(err) => {
419 if matches!(err.kind(), ErrorKind::TableNotFound { .. })
420 || err
421 .message()
422 .contains("Tried to load a table that does not exist")
423 {
424 let table_creation = TableCreation::builder()
428 .name(table_name.clone())
429 .schema(schema.clone())
430 .build();
434
435 catalog
436 .create_table(&namespace_ident, table_creation)
437 .await
438 .with_context(|| {
439 format!(
440 "Failed to create Iceberg table '{}' in namespace '{}'",
441 table_name, namespace
442 )
443 })
444 } else {
445 Err(err).context("Failed to load Iceberg table")
447 }
448 }
449 }
450}
451
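/// Scans the table's snapshots from newest to oldest and returns the frontier and
/// sink version recorded by the most recent snapshot that carries the
/// `mz-frontier` property, or `None` if there is no such snapshot. A non-`replace`
/// snapshot without that property is treated as an inconsistent table.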
452fn retrieve_upper_from_snapshots(
457 snapshots: &mut [Arc<Snapshot>],
458) -> anyhow::Result<Option<(Antichain<Timestamp>, u64)>> {
459 snapshots.sort_by(|a, b| Ord::cmp(&b.sequence_number(), &a.sequence_number()));
460
461 for snapshot in snapshots {
462 let props = &snapshot.summary().additional_properties;
463 if let (Some(frontier_json), Some(sink_version_str)) =
464 (props.get("mz-frontier"), props.get("mz-sink-version"))
465 {
466 let frontier: Vec<Timestamp> = serde_json::from_str(frontier_json)
467 .context("Failed to deserialize frontier from snapshot properties")?;
468 let frontier = Antichain::from_iter(frontier);
469
470 let sink_version = sink_version_str
471 .parse::<u64>()
472 .context("Failed to parse mz-sink-version from snapshot properties")?;
473
474 return Ok(Some((frontier, sink_version)));
475 }
476 if snapshot.summary().operation.as_str() != "replace" {
477 anyhow::bail!(
"Iceberg table is in an inconsistent state: snapshot {} has operation '{}' but is missing the 'mz-frontier' property. Schema or partition spec evolution is not supported.",
483 snapshot.snapshot_id(),
484 snapshot.summary().operation.as_str(),
485 );
486 }
487 }
488
489 Ok(None)
490}
491
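/// Converts a `RelationDesc` into an Arrow schema annotated with field IDs plus
/// the equivalent Iceberg schema.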
492fn relation_desc_to_iceberg_schema(
496 desc: &mz_repr::RelationDesc,
497) -> anyhow::Result<(ArrowSchema, SchemaRef)> {
498 let arrow_schema = mz_arrow_util::builder::desc_to_schema(desc)
499 .context("Failed to convert RelationDesc to Arrow schema")?;
500
501 let arrow_schema_with_ids = add_field_ids_to_arrow_schema(arrow_schema);
502
503 let iceberg_schema = arrow_schema_to_schema(&arrow_schema_with_ids)
504 .context("Failed to convert Arrow schema to Iceberg schema")?;
505
506 Ok((arrow_schema_with_ids, Arc::new(iceberg_schema)))
507}
508
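/// Extends the Arrow schema with the `__op` column the delta writer uses to tell
/// inserts apart from deletes.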
509fn build_schema_with_op_column(schema: &ArrowSchema) -> ArrowSchema {
511 let mut fields: Vec<Arc<Field>> = schema.fields().iter().cloned().collect();
512 fields.push(Arc::new(Field::new("__op", DataType::Int32, false)));
513 ArrowSchema::new(fields)
514}
515
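/// Converts a `DiffPair` into a `RecordBatch`, appending an `__op` column that is
/// -1 for the retracted (`before`) row and +1 for the inserted (`after`) row.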
516fn row_to_recordbatch(
520 row: DiffPair<Row>,
521 schema: ArrowSchemaRef,
522 schema_with_op: ArrowSchemaRef,
523) -> anyhow::Result<RecordBatch> {
524 let mut builder = ArrowBuilder::new_with_schema(
525 Arc::clone(&schema),
526 DEFAULT_ARRAY_BUILDER_ITEM_CAPACITY,
527 DEFAULT_ARRAY_BUILDER_DATA_CAPACITY,
528 )
529 .context("Failed to create builder")?;
530
531 let mut op_values = Vec::new();
532
533 if let Some(before) = row.before {
534 builder
535 .add_row(&before)
536 .context("Failed to add delete row to builder")?;
op_values.push(-1i32);
}
539 if let Some(after) = row.after {
540 builder
541 .add_row(&after)
542 .context("Failed to add insert row to builder")?;
op_values.push(1i32);
}
545
546 let batch = builder
547 .to_record_batch()
548 .context("Failed to create record batch")?;
549
550 let mut columns: Vec<ArrayRef> = batch.columns().to_vec();
551 let op_column = Arc::new(Int32Array::from(op_values));
552 columns.push(op_column);
553
554 let batch_with_op = RecordBatch::try_new(schema_with_op, columns)
555 .context("Failed to create batch with op column")?;
556 Ok(batch_with_op)
557}
558
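/// Mints batch descriptions `(lower, upper)` that carve the timeline into
/// commit-interval-sized chunks. Only the worker chosen by hashing the sink id
/// mints descriptions; every other worker just forwards its input. The first
/// description covers `[resume_upper, observed_frontier)` when that range is
/// non-empty; after that, `INITIAL_DESCRIPTIONS_TO_MINT` descriptions are minted
/// ahead of the frontier and a new one is emitted whenever the oldest
/// outstanding description closes.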
559fn mint_batch_descriptions<G, D>(
564 name: String,
565 sink_id: GlobalId,
566 input: &VecCollection<G, D, Diff>,
567 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
568 connection: IcebergSinkConnection,
569 storage_configuration: StorageConfiguration,
570 initial_schema: SchemaRef,
571) -> (
572 VecCollection<G, D, Diff>,
573 Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
574 Stream<G, HealthStatusMessage>,
575 PressOnDropButton,
576)
577where
578 G: Scope<Timestamp = Timestamp>,
579 D: Clone + 'static,
580{
581 let scope = input.scope();
582 let name_for_error = name.clone();
583 let mut builder = OperatorBuilder::new(name, scope.clone());
584 let sink_version = sink.version;
585
586 let hashed_id = sink_id.hashed();
587 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
588 let (output, output_stream) = builder.new_output();
589 let (batch_desc_output, batch_desc_stream) =
590 builder.new_output::<CapacityContainerBuilder<_>>();
591 let mut input =
592 builder.new_input_for_many(&input.inner, Pipeline, [&output, &batch_desc_output]);
593
594 let as_of = sink.as_of.clone();
595 let commit_interval = sink
596 .commit_interval
597 .expect("the planner should have enforced this")
598 .clone();
599
600 let (button, errors): (_, Stream<G, Rc<anyhow::Error>>) = builder.build_fallible(move |caps| {
601 Box::pin(async move {
602 let [data_capset, capset]: &mut [_; 2] = caps.try_into().unwrap();
603 *data_capset = CapabilitySet::new();
604
605 if !is_active_worker {
606 *capset = CapabilitySet::new();
607 *data_capset = CapabilitySet::new();
608 while let Some(event) = input.next().await {
609 match event {
610 Event::Data([output_cap, _], mut data) => {
611 output.give_container(&output_cap, &mut data);
612 }
613 Event::Progress(_) => {}
614 }
615 }
616 return Ok(());
617 }
618
619 let catalog = connection
620 .catalog_connection
621 .connect(&storage_configuration, InTask::Yes)
622 .await
623 .context("Failed to connect to iceberg catalog")?;
624
625 let table = load_or_create_table(
626 catalog.as_ref(),
627 connection.namespace.clone(),
628 connection.table.clone(),
629 initial_schema.as_ref(),
630 )
631 .await?;
632
633 let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
634 let resume = retrieve_upper_from_snapshots(&mut snapshots)?;
635 let (resume_upper, resume_version) = match resume {
636 Some((f, v)) => (f, v),
637 None => (Antichain::from_elem(Timestamp::minimum()), 0),
638 };
639
640 let overcompacted =
642 *resume_upper != [Timestamp::minimum()] &&
644 PartialOrder::less_than(&resume_upper, &as_of);
646
647 if overcompacted {
648 let err = format!(
649 "{name_for_error}: input compacted past resume upper: as_of {}, resume_upper: {}",
650 as_of.pretty(),
651 resume_upper.pretty()
652 );
653 return Err(anyhow::anyhow!("{err}"));
657 };
658
659 if resume_version > sink_version {
660 anyhow::bail!("Fenced off by newer sink version: resume_version {}, sink_version {}", resume_version, sink_version);
661 }
662
663 let mut initialized = false;
664 let mut observed_frontier;
665 let mut minted_batches = VecDeque::new();
670 loop {
671 if let Some(event) = input.next().await {
672 match event {
673 Event::Data([output_cap, _], mut data) => {
674 output.give_container(&output_cap, &mut data);
675 continue;
676 }
677 Event::Progress(frontier) => {
678 observed_frontier = frontier;
679 }
680 }
681 } else {
682 return Ok(());
683 }
684
685 if !initialized {
686 if observed_frontier.is_empty() {
689 return Ok(());
691 }
692 if PartialOrder::less_equal(&observed_frontier, &resume_upper) || PartialOrder::less_equal(&observed_frontier, &as_of) {
693 continue;
694 }
695
696 let mut batch_descriptions = vec![];
697 let mut current_upper = observed_frontier.clone();
698 let current_upper_ts = current_upper.as_option().expect("frontier not empty").clone();
699
if PartialOrder::less_than(&resume_upper, &current_upper) {
702 let batch_description = (resume_upper.clone(), current_upper.clone());
703 batch_descriptions.push(batch_description);
704 }
705
for i in 1..=INITIAL_DESCRIPTIONS_TO_MINT {
708 let duration_millis = commit_interval.as_millis()
709 .checked_mul(u128::from(i))
710 .expect("commit interval multiplication overflow");
711 let duration_ts = Timestamp::new(u64::try_from(duration_millis)
712 .expect("commit interval too large for u64"));
713 let desired_batch_upper = Antichain::from_elem(current_upper_ts.step_forward_by(&duration_ts));
714
715 let batch_description = (current_upper.clone(), desired_batch_upper);
716 current_upper = batch_description.1.clone();
717 batch_descriptions.push(batch_description);
718 }
719
720 minted_batches.extend(batch_descriptions.clone());
721
722 for desc in batch_descriptions {
723 batch_desc_output.give(&capset[0], desc);
724 }
725
726 capset.downgrade(current_upper);
727
728 initialized = true;
729 } else {
730 while let Some(oldest_desc) = minted_batches.front() {
733 let oldest_upper = &oldest_desc.1;
734 if !PartialOrder::less_equal(oldest_upper, &observed_frontier) {
735 break;
736 }
737
738 let newest_upper = minted_batches.back().unwrap().1.clone();
739 let new_lower = newest_upper.clone();
740 let duration_ts = Timestamp::new(commit_interval.as_millis()
741 .try_into()
742 .expect("commit interval too large for u64"));
743 let new_upper = Antichain::from_elem(newest_upper
744 .as_option()
745 .unwrap()
746 .step_forward_by(&duration_ts));
747
748 let new_batch_description = (new_lower.clone(), new_upper.clone());
749 minted_batches.pop_front();
750 minted_batches.push_back(new_batch_description.clone());
751
752 batch_desc_output.give(&capset[0], new_batch_description);
753
754 capset.downgrade(new_upper);
755 }
756 }
757 }
758 })
759 });
760
761 let statuses = errors.map(|error| HealthStatusMessage {
762 id: None,
763 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
764 namespace: StatusNamespace::Iceberg,
765 });
766 (
767 output_stream.as_collection(),
768 batch_desc_stream,
769 statuses,
770 button.press_on_drop(),
771 )
772}
773
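/// A `DataFile` together with the schema it was written against. Serialization
/// round-trips through [`AvroDataFile`].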
774#[derive(Clone, Debug, Serialize, Deserialize)]
775#[serde(try_from = "AvroDataFile", into = "AvroDataFile")]
776struct SerializableDataFile {
777 pub data_file: DataFile,
778 pub schema: Schema,
779}
780
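/// Wire representation of [`SerializableDataFile`]: the data file is Avro-encoded
/// and the schema is JSON-encoded.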
781#[derive(Clone, Debug, Serialize, Deserialize)]
789struct AvroDataFile {
790 pub data_file: Vec<u8>,
791 pub schema: Vec<u8>,
793}
794
795impl From<SerializableDataFile> for AvroDataFile {
796 fn from(value: SerializableDataFile) -> Self {
797 let mut data_file = Vec::new();
798 write_data_files_to_avro(
799 &mut data_file,
800 [value.data_file],
801 &StructType::new(vec![]),
802 FormatVersion::V2,
803 )
804 .expect("serialization into buffer");
805 let schema = serde_json::to_vec(&value.schema).expect("schema serialization");
806 AvroDataFile { data_file, schema }
807 }
808}
809
810impl TryFrom<AvroDataFile> for SerializableDataFile {
811 type Error = String;
812
813 fn try_from(value: AvroDataFile) -> Result<Self, Self::Error> {
814 let schema: Schema = serde_json::from_slice(&value.schema)
815 .map_err(|e| format!("Failed to deserialize schema: {}", e))?;
816 let data_files = read_data_files_from_avro(
817 &mut &*value.data_file,
818 &schema,
819 0,
820 &StructType::new(vec![]),
821 FormatVersion::V2,
822 )
823 .map_err_to_string_with_causes()?;
824 let Some(data_file) = data_files.into_iter().next() else {
825 return Err("No DataFile found in Avro data".into());
826 };
827 Ok(SerializableDataFile { data_file, schema })
828 }
829}
830
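/// A written data file tagged with the batch description (lower/upper frontier
/// pair) it was produced for.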
831#[derive(Clone, Debug, Serialize, Deserialize)]
833struct BoundedDataFile {
834 pub data_file: SerializableDataFile,
835 pub batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
836}
837
838impl BoundedDataFile {
839 pub fn new(
840 file: DataFile,
841 schema: Schema,
842 batch_desc: (Antichain<Timestamp>, Antichain<Timestamp>),
843 ) -> Self {
844 Self {
845 data_file: SerializableDataFile {
846 data_file: file,
847 schema,
848 },
849 batch_desc,
850 }
851 }
852
853 pub fn batch_desc(&self) -> &(Antichain<Timestamp>, Antichain<Timestamp>) {
854 &self.batch_desc
855 }
856
857 pub fn data_file(&self) -> &DataFile {
858 &self.data_file.data_file
859 }
860
861 pub fn into_data_file(self) -> DataFile {
862 self.data_file.data_file
863 }
864}
865
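/// The set of data files collected for a single batch description.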
866#[derive(Clone, Debug, Default)]
868struct BoundedDataFileSet {
869 pub data_files: Vec<BoundedDataFile>,
870}
871
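/// Writes incoming rows into Parquet data and delete files, keeping one
/// `DeltaWriter` per in-flight batch description. Rows that arrive before their
/// batch description is known are stashed by timestamp and replayed once the
/// description arrives. Files for completed batches are emitted downstream as
/// `BoundedDataFile`s.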
872fn write_data_files<G>(
876 name: String,
877 input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
878 batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
879 connection: IcebergSinkConnection,
880 storage_configuration: StorageConfiguration,
881 materialize_arrow_schema: Arc<ArrowSchema>,
882 metrics: IcebergSinkMetrics,
883 statistics: SinkStatistics,
884) -> (
885 Stream<G, BoundedDataFile>,
886 Stream<G, HealthStatusMessage>,
887 PressOnDropButton,
888)
889where
890 G: Scope<Timestamp = Timestamp>,
891{
892 let scope = input.scope();
893 let mut builder = OperatorBuilder::new(name, scope.clone());
894
895 let (output, output_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
896
897 let mut batch_desc_input =
898 builder.new_input_for(&batch_desc_input.broadcast(), Pipeline, &output);
899 let mut input = builder.new_disconnected_input(&input.inner, Pipeline);
900
901 let (button, errors) = builder.build_fallible(|caps| {
902 Box::pin(async move {
903 let [capset]: &mut [_; 1] = caps.try_into().unwrap();
904 let catalog = connection
905 .catalog_connection
906 .connect(&storage_configuration, InTask::Yes)
907 .await
908 .context("Failed to connect to iceberg catalog")?;
909
910 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
911 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
912 let table = catalog
913 .load_table(&table_ident)
914 .await
915 .context("Failed to load Iceberg table")?;
916
917 let table_metadata = table.metadata().clone();
918 let current_schema = Arc::clone(table_metadata.current_schema());
919
920 let arrow_schema = Arc::new(
924 merge_materialize_metadata_into_iceberg_schema(
925 materialize_arrow_schema.as_ref(),
926 current_schema.as_ref(),
927 )
928 .context("Failed to merge Materialize metadata into Iceberg schema")?,
929 );
930
931 let schema_with_op = Arc::new(build_schema_with_op_column(&arrow_schema));
933
934 let location = table_metadata.location();
938 let corrected_location = match location.rsplit_once("/metadata/") {
939 Some((a, b)) if b.ends_with(".metadata.json") => a,
940 _ => location,
941 };
942
943 let data_location = format!("{}/data", corrected_location);
944 let location_generator = DefaultLocationGenerator::with_data_location(data_location);
945
946 let unique_suffix = format!("-{}", uuid::Uuid::new_v4());
948 let file_name_generator = DefaultFileNameGenerator::new(
949 PARQUET_FILE_PREFIX.to_string(),
950 Some(unique_suffix),
951 iceberg::spec::DataFileFormat::Parquet,
952 );
953
954 let file_io = table.file_io().clone();
955
956 let Some((_, equality_indices)) = connection.key_desc_and_indices else {
957 return Err(anyhow::anyhow!(
958 "Iceberg sink requires key columns for equality deletes"
959 ));
960 };
961
962 let equality_ids: Vec<i32> = equality_indices
963 .iter()
964 .map(|u| i32::try_from(*u).map(|v| v + 1))
965 .collect::<Result<Vec<i32>, _>>()
966 .context("Failed to convert equality index to i32 (index too large)")?;
967
968 let writer_properties = WriterProperties::new();
969
970 let create_delta_writer = || async {
971 let data_parquet_writer = ParquetWriterBuilder::new(
972 writer_properties.clone(),
Arc::clone(&current_schema),
974 )
975 .with_arrow_schema(Arc::clone(&arrow_schema))
976 .context("Arrow schema validation failed")?;
977 let data_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
978 data_parquet_writer,
Arc::clone(&current_schema),
980 file_io.clone(),
981 location_generator.clone(),
982 file_name_generator.clone(),
983 );
984 let data_writer_builder = DataFileWriterBuilder::new(data_rolling_writer);
985
986 let pos_arrow_schema = PositionDeleteWriterConfig::arrow_schema();
987 let pos_schema =
988 Arc::new(arrow_schema_to_schema(&pos_arrow_schema).context(
989 "Failed to convert position delete Arrow schema to Iceberg schema",
990 )?);
991 let pos_config = PositionDeleteWriterConfig::new(None, 0, None);
992 let pos_parquet_writer =
993 ParquetWriterBuilder::new(writer_properties.clone(), pos_schema);
994 let pos_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
995 pos_parquet_writer,
Arc::clone(&current_schema),
997 file_io.clone(),
998 location_generator.clone(),
999 file_name_generator.clone(),
1000 );
1001 let pos_delete_writer_builder =
1002 PositionDeleteFileWriterBuilder::new(pos_rolling_writer, pos_config);
1003
1004 let eq_config = EqualityDeleteWriterConfig::new(
1005 equality_ids.clone(),
Arc::clone(&current_schema),
1007 )
1008 .context("Failed to create EqualityDeleteWriterConfig")?;
1009
1010 let eq_schema = Arc::new(
1011 arrow_schema_to_schema(eq_config.projected_arrow_schema_ref()).context(
1012 "Failed to convert equality delete Arrow schema to Iceberg schema",
1013 )?,
1014 );
1015
1016 let eq_parquet_writer =
1017 ParquetWriterBuilder::new(writer_properties.clone(), eq_schema);
1018 let eq_rolling_writer = RollingFileWriterBuilder::new_with_default_file_size(
1019 eq_parquet_writer,
Arc::clone(&current_schema),
1021 file_io.clone(),
1022 location_generator.clone(),
1023 file_name_generator.clone(),
1024 );
1025 let eq_delete_writer_builder =
1026 EqualityDeleteFileWriterBuilder::new(eq_rolling_writer, eq_config);
1027
1028 DeltaWriterBuilder::new(
1029 data_writer_builder,
1030 pos_delete_writer_builder,
1031 eq_delete_writer_builder,
1032 equality_ids.clone(),
1033 )
1034 .build(None)
1035 .await
1036 .context("Failed to create DeltaWriter")
1037 };
1038
1039 let mut stashed_rows: BTreeMap<Timestamp, Vec<(Option<Row>, DiffPair<Row>)>> =
1042 BTreeMap::new();
1043
1044 #[allow(clippy::disallowed_types)]
1049 let mut in_flight_batches: std::collections::HashMap<
1050 (Antichain<Timestamp>, Antichain<Timestamp>),
1051 DeltaWriterType,
1052 > = std::collections::HashMap::new();
1053
1054 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1055 let mut processed_batch_description_frontier =
1056 Antichain::from_elem(Timestamp::minimum());
1057 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1058 let mut processed_input_frontier = Antichain::from_elem(Timestamp::minimum());
1059
1060 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1061 tokio::select! {
1062 _ = batch_desc_input.ready() => {},
1063 _ = input.ready() => {}
1064 }
1065
1066 while let Some(event) = batch_desc_input.next_sync() {
1067 match event {
1068 Event::Data(_cap, data) => {
1069 for batch_desc in data {
1070 let (lower, upper) = &batch_desc;
1071 let mut delta_writer = create_delta_writer().await?;
1072 let row_ts_keys: Vec<_> = stashed_rows.keys().cloned().collect();
1074 for row_ts in row_ts_keys {
1075 let ts = Antichain::from_elem(row_ts.clone());
1076 if PartialOrder::less_equal(lower, &ts)
1077 && PartialOrder::less_than(&ts, upper)
1078 {
1079 if let Some(rows) = stashed_rows.remove(&row_ts) {
1080 for (_row, diff_pair) in rows {
1081 metrics.stashed_rows.dec();
1082 let record_batch = row_to_recordbatch(
1083 diff_pair.clone(),
1084 Arc::clone(&arrow_schema),
1085 Arc::clone(&schema_with_op),
1086 )
1087 .context("failed to convert row to recordbatch")?;
1088 delta_writer.write(record_batch).await.context(
1089 "Failed to write row to DeltaWriter",
1090 )?;
1091 }
1092 }
1093 }
1094 }
1095 let prev =
1096 in_flight_batches.insert(batch_desc.clone(), delta_writer);
1097 if prev.is_some() {
1098 anyhow::bail!(
1099 "Duplicate batch description received for description {:?}",
1100 batch_desc
1101 );
1102 }
1103 }
1104 }
1105 Event::Progress(frontier) => {
1106 batch_description_frontier = frontier;
1107 }
1108 }
1109 }
1110
1111 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1112 for event in ready_events {
1113 match event {
1114 Event::Data(_cap, data) => {
1115 for ((row, diff_pair), ts, _diff) in data {
1116 let row_ts = ts.clone();
1117 let ts_antichain = Antichain::from_elem(row_ts.clone());
1118 let mut written = false;
1119 for (batch_desc, delta_writer) in in_flight_batches.iter_mut() {
1121 let (lower, upper) = batch_desc;
1122 if PartialOrder::less_equal(lower, &ts_antichain)
1123 && PartialOrder::less_than(&ts_antichain, upper)
1124 {
1125 let record_batch = row_to_recordbatch(
1126 diff_pair.clone(),
1127 Arc::clone(&arrow_schema),
1128 Arc::clone(&schema_with_op),
1129 )
1130 .context("failed to convert row to recordbatch")?;
1131 delta_writer
1132 .write(record_batch)
1133 .await
1134 .context("Failed to write row to DeltaWriter")?;
1135 written = true;
1136 break;
1137 }
1138 }
1139 if !written {
1140 let entry = stashed_rows.entry(row_ts).or_default();
1142 metrics.stashed_rows.inc();
1143 entry.push((row, diff_pair));
1144 }
1145 }
1146 }
1147 Event::Progress(frontier) => {
1148 input_frontier = frontier;
1149 }
1150 }
1151 }
1152
1153 if PartialOrder::less_equal(
1155 &processed_batch_description_frontier,
1156 &batch_description_frontier,
1157 ) || PartialOrder::less_than(&processed_input_frontier, &input_frontier)
1158 {
1159 let ready_batches: Vec<_> = in_flight_batches
1161 .extract_if(|(lower, upper), _| {
1162 PartialOrder::less_than(lower, &batch_description_frontier)
1163 && PartialOrder::less_than(upper, &input_frontier)
1164 })
1165 .collect();
1166
1167 if !ready_batches.is_empty() {
1168 let mut max_upper = Antichain::from_elem(Timestamp::minimum());
1169 for (desc, mut delta_writer) in ready_batches {
1170 let data_files = delta_writer
1171 .close()
1172 .await
1173 .context("Failed to close DeltaWriter")?;
1174 for data_file in data_files {
1175 match data_file.content_type() {
1176 iceberg::spec::DataContentType::Data => {
1177 metrics.data_files_written.inc();
1178 }
1179 iceberg::spec::DataContentType::PositionDeletes
1180 | iceberg::spec::DataContentType::EqualityDeletes => {
1181 metrics.delete_files_written.inc();
1182 }
1183 }
1184 statistics.inc_messages_staged_by(data_file.record_count());
1185 statistics.inc_bytes_staged_by(data_file.file_size_in_bytes());
1186 let file = BoundedDataFile::new(
1187 data_file,
1188 current_schema.as_ref().clone(),
1189 desc.clone(),
1190 );
1191 output.give(&capset[0], file);
1192 }
1193
1194 max_upper = max_upper.join(&desc.1);
1195 }
1196
1197 capset.downgrade(max_upper);
1198 }
1199 processed_batch_description_frontier.clone_from(&batch_description_frontier);
1200 processed_input_frontier.clone_from(&input_frontier);
1201 }
1202 }
1203 Ok(())
1204 })
1205 });
1206
1207 let statuses = errors.map(|error| HealthStatusMessage {
1208 id: None,
1209 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1210 namespace: StatusNamespace::Iceberg,
1211 });
1212 (output_stream, statuses, button.press_on_drop())
1213}
1214
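/// Gathers the data files for each completed batch on a single worker and commits
/// them to the Iceberg table, one transaction per batch, recording the batch's
/// upper frontier and the sink version as snapshot properties. After a successful
/// commit the frontier is also written to the sink's persist shard via an empty
/// `compare_and_append`.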
1215fn commit_to_iceberg<G>(
1219 name: String,
1220 sink_id: GlobalId,
1221 sink_version: u64,
1222 batch_input: &Stream<G, BoundedDataFile>,
1223 batch_desc_input: &Stream<G, (Antichain<Timestamp>, Antichain<Timestamp>)>,
1224 write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
1225 connection: IcebergSinkConnection,
1226 storage_configuration: StorageConfiguration,
1227 write_handle: impl Future<
1228 Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
1229 > + 'static,
1230 metrics: IcebergSinkMetrics,
1231 statistics: SinkStatistics,
1232) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
1233where
1234 G: Scope<Timestamp = Timestamp>,
1235{
1236 let scope = batch_input.scope();
1237 let mut builder = OperatorBuilder::new(name, scope.clone());
1238
1239 let hashed_id = sink_id.hashed();
1240 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
1241
1242 let mut input = builder.new_disconnected_input(batch_input, Exchange::new(move |_| hashed_id));
1243 let mut batch_desc_input =
1244 builder.new_disconnected_input(batch_desc_input, Exchange::new(move |_| hashed_id));
1245
1246 let (button, errors) = builder.build_fallible(move |_caps| {
1247 Box::pin(async move {
1248 if !is_active_worker {
1249 write_frontier.borrow_mut().clear();
1250 return Ok(());
1251 }
1252
1253 let catalog = connection
1254 .catalog_connection
1255 .connect(&storage_configuration, InTask::Yes)
1256 .await
1257 .context("Failed to connect to iceberg catalog")?;
1258
1259 let mut write_handle = write_handle.await?;
1260
1261 let namespace_ident = NamespaceIdent::new(connection.namespace.clone());
1262 let table_ident = TableIdent::new(namespace_ident, connection.table.clone());
1263 let mut table = catalog
1264 .load_table(&table_ident)
1265 .await
1266 .context("Failed to load Iceberg table")?;
1267
1268 #[allow(clippy::disallowed_types)]
1269 let mut batch_descriptions: std::collections::HashMap<
1270 (Antichain<Timestamp>, Antichain<Timestamp>),
1271 BoundedDataFileSet,
1272 > = std::collections::HashMap::new();
1273
1274
1275 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1276 let mut input_frontier = Antichain::from_elem(Timestamp::minimum());
1277
1278 while !(batch_description_frontier.is_empty() && input_frontier.is_empty()) {
1279 tokio::select! {
1280 _ = batch_desc_input.ready() => {},
1281 _ = input.ready() => {}
1282 }
1283
1284 while let Some(event) = batch_desc_input.next_sync() {
1285 match event {
1286 Event::Data(_cap, data) => {
1287 for batch_desc in data {
1288 let prev = batch_descriptions.insert(batch_desc, BoundedDataFileSet { data_files: vec![] });
1289 if let Some(prev) = prev {
1290 anyhow::bail!(
1291 "Duplicate batch description received in commit operator: {:?}",
1292 prev
1293 );
1294 }
1295 }
1296 }
1297 Event::Progress(frontier) => {
1298 batch_description_frontier = frontier;
1299 }
1300 }
1301 }
1302
1303 let ready_events = std::iter::from_fn(|| input.next_sync()).collect_vec();
1304 for event in ready_events {
1305 match event {
1306 Event::Data(_cap, data) => {
1307 for bounded_data_file in data {
1308 let entry = batch_descriptions.entry(bounded_data_file.batch_desc().clone()).or_default();
1309 entry.data_files.push(bounded_data_file);
1310 }
1311 }
1312 Event::Progress(frontier) => {
1313 input_frontier = frontier;
1314 }
1315 }
1316 }
1317
1318 let mut done_batches: Vec<_> = batch_descriptions
1319 .keys()
1320 .filter(|(lower, _upper)| {
1321 PartialOrder::less_than(lower, &input_frontier)
1322 })
1323 .cloned()
1324 .collect();
1325
1326 done_batches.sort_by(|a, b| {
1328 if PartialOrder::less_than(a, b) {
1329 Ordering::Less
1330 } else if PartialOrder::less_than(b, a) {
1331 Ordering::Greater
1332 } else {
1333 Ordering::Equal
1334 }
1335 });
1336
1337 for batch in done_batches {
1338 let file_set = batch_descriptions.remove(&batch).unwrap();
1339
1340 let mut data_files = vec![];
1341 let mut delete_files = vec![];
1342 let mut total_messages: u64 = 0;
1344 let mut total_bytes: u64 = 0;
1345 for file in file_set.data_files {
1346 total_messages += file.data_file().record_count();
1347 total_bytes += file.data_file().file_size_in_bytes();
1348 match file.data_file().content_type() {
1349 iceberg::spec::DataContentType::Data => {
1350 data_files.push(file.into_data_file());
1351 }
1352 iceberg::spec::DataContentType::PositionDeletes |
1353 iceberg::spec::DataContentType::EqualityDeletes => {
1354 delete_files.push(file.into_data_file());
1355 }
1356 }
1357 }
1358
1359 let frontier = batch.1.clone();
1360 let tx = Transaction::new(&table);
1361
1362 let frontier_json = serde_json::to_string(&frontier.elements())
1363 .context("Failed to serialize frontier to JSON")?;
1364
1365 let mut action = tx.row_delta().set_snapshot_properties(
1367 vec![
1368 ("mz-sink-id".to_string(), sink_id.to_string()),
1369 ("mz-frontier".to_string(), frontier_json),
1370 ("mz-sink-version".to_string(), sink_version.to_string()),
1371 ].into_iter().collect()
1372 );
1373
1374 if !data_files.is_empty() || !delete_files.is_empty() {
1375 action = action.add_data_files(data_files).add_delete_files(delete_files);
1376 }
1377
1378 let tx = action.apply(tx).context(
1379 "Failed to apply data file addition to iceberg table transaction",
1380 )?;
1381
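// Commit with retries. On a catalog commit conflict, reload the table and
// check whether another writer already advanced past this batch's frontier;
// if so, fail permanently rather than overwrite its progress.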
1382 table = Retry::default().max_tries(5).retry_async(|_| async {
1383 let new_table = tx.clone().commit(catalog.as_ref()).await;
1384 match new_table {
1385 Err(e) if matches!(e.kind(), ErrorKind::CatalogCommitConflicts) => {
1386 metrics.commit_conflicts.inc();
1387 let table = reload_table(
1388 catalog.as_ref(),
1389 connection.namespace.clone(),
1390 connection.table.clone(),
1391 table.clone(),
1392 ).await;
1393 let table = match table {
1394 Ok(table) => table,
1395 Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1396 };
1397
1398 let mut snapshots: Vec<_> = table.metadata().snapshots().cloned().collect();
1399 let last = retrieve_upper_from_snapshots(&mut snapshots);
1400 let last = match last {
1401 Ok(val) => val,
1402 Err(e) => return RetryResult::RetryableErr(anyhow!(e)),
1403 };
1404
1405 if let Some((last_frontier, _last_version)) = last {
1407 if PartialOrder::less_equal(&frontier, &last_frontier) {
1408 return RetryResult::FatalErr(anyhow!(
1409 "Iceberg table '{}' has been modified by another writer. Current frontier: {:?}, last frontier: {:?}.",
1410 connection.table,
1411 frontier,
1412 last_frontier,
1413 ));
1414 }
1415 }
1416
1417 RetryResult::Ok(table)
1418 }
1419 Err(e) => {
1420 metrics.commit_failures.inc();
1421 RetryResult::RetryableErr(anyhow!(e))
1422 },
1423 Ok(table) => RetryResult::Ok(table)
1424 }
1425 }).await.context("failed to commit to iceberg")?;
1426
1427 metrics.snapshots_committed.inc();
1428 statistics.inc_messages_committed_by(total_messages);
1429 statistics.inc_bytes_committed_by(total_bytes);
1430
1431 let mut expect_upper = write_handle.shared_upper();
1432 loop {
1433 if PartialOrder::less_equal(&frontier, &expect_upper) {
1434 break;
1436 }
1437
1438 const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1439 match write_handle
1440 .compare_and_append(EMPTY, expect_upper, frontier.clone())
1441 .await
1442 .expect("valid usage")
1443 {
1444 Ok(()) => break,
1445 Err(mismatch) => {
1446 expect_upper = mismatch.current;
1447 }
1448 }
1449 }
1450 write_frontier.borrow_mut().clone_from(&frontier);
1451
1452 }
1453 }
1454
1455
1456 Ok(())
1457 })
1458 });
1459
1460 let statuses = errors.map(|error| HealthStatusMessage {
1461 id: None,
1462 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
1463 namespace: StatusNamespace::Iceberg,
1464 });
1465
1466 (statuses, button.press_on_drop())
1467}
1468
1469impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for IcebergSinkConnection {
1470 fn get_key_indices(&self) -> Option<&[usize]> {
1471 self.key_desc_and_indices
1472 .as_ref()
1473 .map(|(_, indices)| indices.as_slice())
1474 }
1475
1476 fn get_relation_key_indices(&self) -> Option<&[usize]> {
1477 self.relation_key_indices.as_deref()
1478 }
1479
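/// Renders the Iceberg sink as a three-stage pipeline: mint batch descriptions,
/// write data files, and commit those files to the table.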
1480 fn render_sink(
1481 &self,
1482 storage_state: &mut StorageState,
1483 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
1484 sink_id: GlobalId,
1485 input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
1486 _err_collection: VecCollection<G, DataflowError, Diff>,
1487 ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
1488 let mut scope = input.scope();
1489
1490 let write_handle = {
1491 let persist = Arc::clone(&storage_state.persist_clients);
1492 let shard_meta = sink.to_storage_metadata.clone();
1493 async move {
1494 let client = persist.open(shard_meta.persist_location).await?;
1495 let handle = client
1496 .open_writer(
1497 shard_meta.data_shard,
1498 Arc::new(shard_meta.relation_desc),
1499 Arc::new(UnitSchema),
1500 Diagnostics::from_purpose("sink handle"),
1501 )
1502 .await?;
1503 Ok(handle)
1504 }
1505 };
1506
1507 let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
1508 storage_state
1509 .sink_write_frontiers
1510 .insert(sink_id, Rc::clone(&write_frontier));
1511
1512 let (arrow_schema_with_ids, iceberg_schema) =
1513 match relation_desc_to_iceberg_schema(&sink.from_desc) {
1514 Ok(schemas) => schemas,
1515 Err(err) => {
1516 let error_stream = std::iter::once(HealthStatusMessage {
1517 id: None,
1518 update: HealthStatusUpdate::halting(
1519 format!("{}", err.display_with_causes()),
1520 None,
1521 ),
1522 namespace: StatusNamespace::Iceberg,
1523 })
1524 .to_stream(&mut scope);
1525 return (error_stream, vec![]);
1526 }
1527 };
1528
1529 let metrics = storage_state
1530 .metrics
1531 .get_iceberg_sink_metrics(sink_id, scope.index());
1532
1533 let statistics = storage_state
1534 .aggregated_statistics
1535 .get_sink(&sink_id)
1536 .expect("statistics initialized")
1537 .clone();
1538
1539 let connection_for_minter = self.clone();
1540 let (minted_input, batch_descriptions, mint_status, mint_button) = mint_batch_descriptions(
1541 format!("{sink_id}-iceberg-mint"),
1542 sink_id,
1543 &input,
1544 sink,
1545 connection_for_minter,
1546 storage_state.storage_configuration.clone(),
1547 Arc::clone(&iceberg_schema),
1548 );
1549
1550 let connection_for_writer = self.clone();
1551 let (datafiles, write_status, write_button) = write_data_files(
1552 format!("{sink_id}-write-data-files"),
1553 minted_input,
1554 &batch_descriptions,
1555 connection_for_writer,
1556 storage_state.storage_configuration.clone(),
1557 Arc::new(arrow_schema_with_ids.clone()),
1558 metrics.clone(),
1559 statistics.clone(),
1560 );
1561
1562 let connection_for_committer = self.clone();
1563 let (commit_status, commit_button) = commit_to_iceberg(
1564 format!("{sink_id}-commit-to-iceberg"),
1565 sink_id,
1566 sink.version,
1567 &datafiles,
1568 &batch_descriptions,
1569 Rc::clone(&write_frontier),
1570 connection_for_committer,
1571 storage_state.storage_configuration.clone(),
1572 write_handle,
1573 metrics.clone(),
1574 statistics,
1575 );
1576
1577 let running_status = Some(HealthStatusMessage {
1578 id: None,
1579 update: HealthStatusUpdate::running(),
1580 namespace: StatusNamespace::Iceberg,
1581 })
1582 .to_stream(&mut scope);
1583
1584 let statuses =
1585 scope.concatenate([running_status, mint_status, write_status, commit_status]);
1586
1587 (statuses, vec![mint_button, write_button, commit_button])
1588 }
1589}