iceberg/transaction/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the transaction API.
//!
//! The transaction API enables changes to be made to an existing table.
//!
//! Note that this may also have side effects, such as producing new manifest
//! files.
//!
//! Below is a basic example using the "fast-append" action:
//!
//! ```ignore
//! use iceberg::transaction::{ApplyTransactionAction, Transaction};
//! use iceberg::Catalog;
//!
//! // Create a transaction. `Transaction::new` takes the table by reference.
//! let tx = Transaction::new(&my_table);
//!
//! // Create a `FastAppendAction`, which does not rewrite or append to
//! // existing manifest files. It writes the added data files into a new manifest.
//! let action = tx.fast_append().add_data_files(my_data_files);
//!
//! // Apply the fast-append action to the given transaction, returning
//! // the newly updated `Transaction`.
//! let tx = action.apply(tx).unwrap();
//!
//! // End the transaction by committing to an `iceberg::Catalog`
//! // implementation. This will cause a table update to occur.
//! let table = tx
//!     .commit(&some_catalog_impl)
//!     .await
//!     .unwrap();
//! ```
50
/// Transaction actions, including the `ApplyTransactionAction` trait, whose `apply`
/// method applies a transaction action to a `Transaction` and returns the updated transaction.
53mod action;
54
55pub use action::*;
56mod append;
57mod row_delta;
58mod snapshot;
59mod sort_order;
60mod update_location;
61mod update_properties;
62mod update_statistics;
63mod upgrade_format_version;
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
69
70use crate::error::Result;
71use crate::spec::TableProperties;
72use crate::table::Table;
73use crate::transaction::action::BoxedTransactionAction;
74use crate::transaction::append::FastAppendAction;
75use crate::transaction::row_delta::RowDeltaAction;
76use crate::transaction::sort_order::ReplaceSortOrderAction;
77use crate::transaction::update_location::UpdateLocationAction;
78use crate::transaction::update_properties::UpdatePropertiesAction;
79use crate::transaction::update_statistics::UpdateStatisticsAction;
80use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
81use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
82
83/// Table transaction.
84#[derive(Clone)]
85pub struct Transaction {
86    table: Table,
87    actions: Vec<BoxedTransactionAction>,
88}
89
90impl Transaction {
91    /// Creates a new transaction.
92    pub fn new(table: &Table) -> Self {
93        Self {
94            table: table.clone(),
95            actions: vec![],
96        }
97    }
98
99    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
100        let mut metadata_builder = table.metadata().clone().into_builder(None);
101        for update in updates {
102            metadata_builder = update.clone().apply(metadata_builder)?;
103        }
104
105        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
106    }
107
108    /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
109    /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
110    fn apply(
111        table: Table,
112        mut action_commit: ActionCommit,
113        existing_updates: &mut Vec<TableUpdate>,
114        existing_requirements: &mut Vec<TableRequirement>,
115    ) -> Result<Table> {
116        let updates = action_commit.take_updates();
117        let requirements = action_commit.take_requirements();
118
119        for requirement in &requirements {
120            requirement.check(Some(table.metadata()))?;
121        }
122
123        let updated_table = Self::update_table_metadata(table, &updates)?;
124
125        existing_updates.extend(updates);
126        existing_requirements.extend(requirements);
127
128        Ok(updated_table)
129    }
130
131    /// Sets table to a new version.
132    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
133        UpgradeFormatVersionAction::new()
134    }
135
136    /// Update table's property.
137    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
138        UpdatePropertiesAction::new()
139    }
140
141    /// Creates a fast append action.
142    pub fn fast_append(&self) -> FastAppendAction {
143        FastAppendAction::new()
144    }
145
146    /// Creates a row delta action for row-level changes.
147    ///
148    /// Use this action for:
149    /// - CDC (Change Data Capture) ingestion
150    /// - Upsert operations
151    /// - Adding delete files (position or equality deletes)
152    pub fn row_delta(&self) -> RowDeltaAction {
153        RowDeltaAction::new()
154    }
155
156    /// Creates replace sort order action.
157    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
158        ReplaceSortOrderAction::new()
159    }
160
161    /// Set the location of table
162    pub fn update_location(&self) -> UpdateLocationAction {
163        UpdateLocationAction::new()
164    }
165
166    /// Update the statistics of table
167    pub fn update_statistics(&self) -> UpdateStatisticsAction {
168        UpdateStatisticsAction::new()
169    }
170
171    /// Commit transaction.
172    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
173        if self.actions.is_empty() {
174            // nothing to commit
175            return Ok(self.table);
176        }
177
178        let table_props =
179            TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
180                Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
181            })?;
182
183        let backoff = Self::build_backoff(table_props)?;
184        let tx = self;
185
186        (|mut tx: Transaction| async {
187            let result = tx.do_commit(catalog).await;
188            (tx, result)
189        })
190        .retry(backoff)
191        .sleep(tokio::time::sleep)
192        .context(tx)
193        .when(|e| e.retryable())
194        .await
195        .1
196    }
197
198    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
199        Ok(ExponentialBuilder::new()
200            .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
201            .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
202            .with_total_delay(Some(Duration::from_millis(
203                props.commit_total_retry_timeout_ms,
204            )))
205            .with_max_times(props.commit_num_retries)
206            .with_factor(2.0)
207            .build())
208    }
209
210    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
211        let refreshed = catalog.load_table(self.table.identifier()).await?;
212
213        if self.table.metadata() != refreshed.metadata()
214            || self.table.metadata_location() != refreshed.metadata_location()
215        {
216            // current base is stale, use refreshed as base and re-apply transaction actions
217            self.table = refreshed.clone();
218        }
219
220        let mut current_table = self.table.clone();
221        let mut existing_updates: Vec<TableUpdate> = vec![];
222        let mut existing_requirements: Vec<TableRequirement> = vec![];
223
224        for action in &self.actions {
225            let action_commit = Arc::clone(action).commit(&current_table).await?;
226            // apply action commit to current_table
227            current_table = Self::apply(
228                current_table,
229                action_commit,
230                &mut existing_updates,
231                &mut existing_requirements,
232            )?;
233        }
234
235        let table_commit = TableCommit::builder()
236            .ident(self.table.identifier().to_owned())
237            .updates(existing_updates)
238            .requirements(existing_requirements)
239            .build();
240
241        catalog.update_table(table_commit).await
242    }
243}
244
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIOBuilder;
    use crate::spec::TableMetadata;
    use crate::table::Table;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Reads and deserializes `testdata/table_metadata/<file_name>` from this crate.
    ///
    /// Panics, naming the fixture path, if the file cannot be opened or parsed.
    fn load_metadata_fixture(file_name: &str) -> TableMetadata {
        let path = format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        );
        let file = File::open(&path)
            .unwrap_or_else(|e| panic!("failed to open metadata fixture {path}: {e}"));
        serde_json::from_reader::<_, TableMetadata>(BufReader::new(file))
            .unwrap_or_else(|e| panic!("failed to parse metadata fixture {path}: {e}"))
    }

    /// Builds the `ns1.test1` table, backed by an in-memory FileIO, from a metadata fixture.
    fn make_table_from_fixture(file_name: &str) -> Table {
        Table::builder()
            .metadata(load_metadata_fixture(file_name))
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap()
    }

    /// Test table built from `TableMetadataV1Valid.json`.
    pub fn make_v1_table() -> Table {
        make_table_from_fixture("TableMetadataV1Valid.json")
    }

    /// Test table built from `TableMetadataV2Valid.json`.
    pub fn make_v2_table() -> Table {
        make_table_from_fixture("TableMetadataV2Valid.json")
    }

    /// Test table built from `TableMetadataV2ValidMinimal.json`.
    pub fn make_v2_minimal_table() -> Table {
        make_table_from_fixture("TableMetadataV2ValidMinimal.json")
    }

    /// Creates a format-v3 table in `catalog`, using the schema, partition spec and sort
    /// order from `TableMetadataV3ValidMinimal.json`.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        // A random namespace suffix makes every call create a distinct table.
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = load_metadata_fixture("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Helper function to create a test table with retry properties.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        // Short waits keep the retry tests fast. `num_retries` sets the retry budget.
        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Helper function to create a transaction with a simple update action.
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// Mock catalog whose `update_table` fails with a retryable commit conflict.
    ///
    /// With `Some(n)`, the first `n` calls fail and later calls succeed. With `None`,
    /// every call fails. `update_table` is expected to be called exactly
    /// `expected_calls` times.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // `fetch_add` returns the previous value, so `+ 1` is this call's
                // 1-based attempt number.
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let should_fail = match success_after_attempts {
                    Some(limit) => attempt <= limit,
                    None => true,
                };

                if should_fail {
                    Box::pin(async move {
                        Err(
                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
                                .with_retryable(true),
                        )
                    })
                } else {
                    Box::pin(async move { Ok(make_v2_table()) })
                }
            });

        mock_catalog
    }

    /// Mock catalog whose `update_table` fails once with a non-retryable error.
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1) // Should only be called once since error is not retryable
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        // Fails twice, then succeeds on the third call.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let result = tx.commit(&mock_catalog).await;

        let Err(err) = result else {
            panic!("Transaction should fail immediately");
        };
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert_eq!(err.message(), "Non-retryable error");
        assert!(!err.retryable(), "Error should not be retryable");
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        // Only allow 2 retries.
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);

        // Always fails. Initial attempt + 2 retries = 3 total attempts.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let result = tx.commit(&mock_catalog).await;

        let Err(err) = result else {
            panic!("Transaction should fail after max retries");
        };
        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
        assert_eq!(err.message(), "Commit conflict");
        assert!(err.retryable(), "Error should be retryable");
    }
}
517
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Builds a Parquet data file with `record_count` rows, in partition `0` of spec `0`.
    /// The row count is embedded in the path so that every file is distinct.
    fn data_file_with_rows(record_count: u64) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(format!("test/{record_count}.parquet"))
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(record_count)
            .partition(Struct::from_iter([Some(Literal::long(0))]))
            .partition_spec_id(0)
            .build()
            .unwrap()
    }

    /// Successive fast appends on a v3 table must assign row IDs contiguously. Each new
    /// snapshot, and the manifest it writes, starts at the previous `next_row_id`.
    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        let catalog = new_memory_catalog().await;
        let table = make_v3_minimal_table_in_catalog(&catalog).await;

        // A freshly created table has not handed out any row IDs yet.
        assert_eq!(table.metadata().next_row_id(), 0);

        // Round 1: a single file holding 30 rows.
        let first_tx = Transaction::new(&table);
        let first_append = first_tx
            .fast_append()
            .add_data_files(vec![data_file_with_rows(30)]);
        let table = first_append
            .apply(first_tx)
            .unwrap()
            .commit(&catalog)
            .await
            .unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(0));
        assert_eq!(table.metadata().next_row_id(), 30);

        let manifests = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifests.entries().len(), 1);
        assert_eq!(manifests.entries()[0].first_row_id, Some(0));

        // Round 2: two files holding 17 and 11 rows.
        let second_tx = Transaction::new(&table);
        let second_append = second_tx
            .fast_append()
            .add_data_files(vec![data_file_with_rows(17), data_file_with_rows(11)]);
        let table = second_append
            .apply(second_tx)
            .unwrap()
            .commit(&catalog)
            .await
            .unwrap();

        let snapshot = table.metadata().current_snapshot().unwrap();
        assert_eq!(snapshot.first_row_id(), Some(30));
        assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);

        // The manifest list now has two entries. The second one (index 1) starts at row 30.
        let manifests = snapshot
            .load_manifest_list(table.file_io(), table.metadata())
            .await
            .unwrap();
        assert_eq!(manifests.entries().len(), 2);
        assert_eq!(manifests.entries()[1].first_row_id, Some(30));
    }
}