iceberg/transaction/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the transaction API.
19//!
20//! The transaction API enables changes to be made to an existing table.
21//!
22//! Note that this may also have side effects, such as producing new manifest
23//! files.
24//!
25//! Below is a basic example using the "fast-append" action:
26//!
27//! ```ignore
28//! use iceberg::transaction::{ApplyTransactionAction, Transaction};
29//! use iceberg::Catalog;
30//!
31//! // Create a transaction.
//! let tx = Transaction::new(&my_table);
33//!
34//! // Create a `FastAppendAction` which will not rewrite or append
35//! // to existing metadata. This will create a new manifest.
36//! let action = tx.fast_append().add_data_files(my_data_files);
37//!
38//! // Apply the fast-append action to the given transaction, returning
39//! // the newly updated `Transaction`.
40//! let tx = action.apply(tx).unwrap();
//!
43//! // End the transaction by committing to an `iceberg::Catalog`
44//! // implementation. This will cause a table update to occur.
45//! let table = tx
46//!     .commit(&some_catalog_impl)
47//!     .await
48//!     .unwrap();
49//! ```
50
51/// The `ApplyTransactionAction` trait provides an `apply` method
52/// that allows users to apply a transaction action to a `Transaction`.
53mod action;
54
55use std::collections::HashMap;
56
57pub use action::*;
58mod append;
59mod row_delta;
60mod snapshot;
61mod sort_order;
62mod update_location;
63mod update_properties;
64mod update_statistics;
65mod upgrade_format_version;
66
67use std::sync::Arc;
68use std::time::Duration;
69
70use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
71
72use crate::error::Result;
73use crate::spec::{
74    PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
75    PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
76    PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
77    PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
78};
79use crate::table::Table;
80use crate::transaction::action::BoxedTransactionAction;
81use crate::transaction::append::FastAppendAction;
82use crate::transaction::row_delta::RowDeltaAction;
83use crate::transaction::sort_order::ReplaceSortOrderAction;
84use crate::transaction::update_location::UpdateLocationAction;
85use crate::transaction::update_properties::UpdatePropertiesAction;
86use crate::transaction::update_statistics::UpdateStatisticsAction;
87use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
88use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
89
90/// Table transaction.
91#[derive(Clone)]
92pub struct Transaction {
93    table: Table,
94    actions: Vec<BoxedTransactionAction>,
95}
96
97impl Transaction {
98    /// Creates a new transaction.
99    pub fn new(table: &Table) -> Self {
100        Self {
101            table: table.clone(),
102            actions: vec![],
103        }
104    }
105
106    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
107        let mut metadata_builder = table.metadata().clone().into_builder(None);
108        for update in updates {
109            metadata_builder = update.clone().apply(metadata_builder)?;
110        }
111
112        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
113    }
114
115    /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
116    /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
117    fn apply(
118        table: Table,
119        mut action_commit: ActionCommit,
120        existing_updates: &mut Vec<TableUpdate>,
121        existing_requirements: &mut Vec<TableRequirement>,
122    ) -> Result<Table> {
123        let updates = action_commit.take_updates();
124        let requirements = action_commit.take_requirements();
125
126        for requirement in &requirements {
127            requirement.check(Some(table.metadata()))?;
128        }
129
130        let updated_table = Self::update_table_metadata(table, &updates)?;
131
132        existing_updates.extend(updates);
133        existing_requirements.extend(requirements);
134
135        Ok(updated_table)
136    }
137
138    /// Sets table to a new version.
139    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
140        UpgradeFormatVersionAction::new()
141    }
142
143    /// Update table's property.
144    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
145        UpdatePropertiesAction::new()
146    }
147
148    /// Creates a fast append action.
149    pub fn fast_append(&self) -> FastAppendAction {
150        FastAppendAction::new()
151    }
152
153    /// Creates a row delta action for row-level changes.
154    ///
155    /// Use this action for:
156    /// - CDC (Change Data Capture) ingestion
157    /// - Upsert operations
158    /// - Adding delete files (position or equality deletes)
159    pub fn row_delta(&self) -> RowDeltaAction {
160        RowDeltaAction::new()
161    }
162
163    /// Creates replace sort order action.
164    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
165        ReplaceSortOrderAction::new()
166    }
167
168    /// Set the location of table
169    pub fn update_location(&self) -> UpdateLocationAction {
170        UpdateLocationAction::new()
171    }
172
173    /// Update the statistics of table
174    pub fn update_statistics(&self) -> UpdateStatisticsAction {
175        UpdateStatisticsAction::new()
176    }
177
178    /// Commit transaction.
179    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
180        if self.actions.is_empty() {
181            // nothing to commit
182            return Ok(self.table);
183        }
184
185        let backoff = Self::build_backoff(self.table.metadata().properties())?;
186        let tx = self;
187
188        (|mut tx: Transaction| async {
189            let result = tx.do_commit(catalog).await;
190            (tx, result)
191        })
192        .retry(backoff)
193        .sleep(tokio::time::sleep)
194        .context(tx)
195        .when(|e| e.retryable())
196        .await
197        .1
198    }
199
200    fn build_backoff(props: &HashMap<String, String>) -> Result<ExponentialBackoff> {
201        let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) {
202            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
203                Error::new(
204                    ErrorKind::DataInvalid,
205                    "Invalid value for commit.retry.min-wait-ms",
206                )
207                .with_source(e)
208            })?,
209            None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
210        };
211        let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) {
212            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
213                Error::new(
214                    ErrorKind::DataInvalid,
215                    "Invalid value for commit.retry.max-wait-ms",
216                )
217                .with_source(e)
218            })?,
219            None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
220        };
221        let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) {
222            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
223                Error::new(
224                    ErrorKind::DataInvalid,
225                    "Invalid value for commit.retry.total-timeout-ms",
226                )
227                .with_source(e)
228            })?,
229            None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
230        };
231        let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) {
232            Some(value_str) => value_str.parse::<usize>().map_err(|e| {
233                Error::new(
234                    ErrorKind::DataInvalid,
235                    "Invalid value for commit.retry.num-retries",
236                )
237                .with_source(e)
238            })?,
239            None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
240        };
241
242        Ok(ExponentialBuilder::new()
243            .with_min_delay(Duration::from_millis(min_delay))
244            .with_max_delay(Duration::from_millis(max_delay))
245            .with_total_delay(Some(Duration::from_millis(total_delay)))
246            .with_max_times(max_times)
247            .with_factor(2.0)
248            .build())
249    }
250
251    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
252        let refreshed = catalog.load_table(self.table.identifier()).await?;
253
254        if self.table.metadata() != refreshed.metadata()
255            || self.table.metadata_location() != refreshed.metadata_location()
256        {
257            // current base is stale, use refreshed as base and re-apply transaction actions
258            self.table = refreshed.clone();
259        }
260
261        let mut current_table = self.table.clone();
262        let mut existing_updates: Vec<TableUpdate> = vec![];
263        let mut existing_requirements: Vec<TableRequirement> = vec![];
264
265        for action in &self.actions {
266            let action_commit = Arc::clone(action).commit(&current_table).await?;
267            // apply action commit to current_table
268            current_table = Self::apply(
269                current_table,
270                action_commit,
271                &mut existing_updates,
272                &mut existing_requirements,
273            )?;
274        }
275
276        let table_commit = TableCommit::builder()
277            .ident(self.table.identifier().to_owned())
278            .updates(existing_updates)
279            .requirements(existing_requirements)
280            .build();
281
282        catalog.update_table(table_commit).await
283    }
284}
285
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIOBuilder;
    use crate::spec::TableMetadata;
    use crate::table::Table;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Error, ErrorKind, TableIdent};

    /// Builds the `ns1.test1` test table from a JSON metadata fixture.
    ///
    /// The fixture is read from `testdata/table_metadata/`, and the table is backed by an
    /// in-memory `FileIO`.
    fn make_table_from_fixture(fixture: &str) -> Table {
        let path = format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            fixture
        );
        let file = File::open(&path).unwrap_or_else(|e| panic!("failed to open {path}: {e}"));
        let metadata = serde_json::from_reader::<_, TableMetadata>(BufReader::new(file)).unwrap();

        Table::builder()
            .metadata(metadata)
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIOBuilder::new("memory").build().unwrap())
            .build()
            .unwrap()
    }

    /// Test table loaded from `TableMetadataV1Valid.json`.
    pub fn make_v1_table() -> Table {
        make_table_from_fixture("TableMetadataV1Valid.json")
    }

    /// Test table loaded from `TableMetadataV2Valid.json`.
    pub fn make_v2_table() -> Table {
        make_table_from_fixture("TableMetadataV2Valid.json")
    }

    /// Test table loaded from `TableMetadataV2ValidMinimal.json`.
    pub fn make_v2_minimal_table() -> Table {
        make_table_from_fixture("TableMetadataV2ValidMinimal.json")
    }

    /// Helper function to create a test table with retry properties
    fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        // Short waits keep the retry tests fast.
        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Helper function to create a transaction with a simple update action
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// The retryable commit-conflict error injected by the mock catalog.
    fn retryable_commit_conflict() -> Error {
        Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict").with_retryable(true)
    }

    /// Builds a mock catalog whose `update_table` fails with a retryable conflict.
    ///
    /// With `Some(n)`, the first `n` calls fail and later calls succeed. With `None`, every
    /// call fails. `update_table` is expected to be called exactly `expected_calls` times.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // `fetch_add` returns the previous count, so `+ 1` is this call's 1-based attempt.
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let should_fail = match success_after_attempts {
                    Some(limit) => attempt <= limit,
                    None => true,
                };

                if should_fail {
                    Box::pin(async move { Err(retryable_commit_conflict()) })
                } else {
                    Box::pin(async move { Ok(make_v2_table()) })
                }
            });

        mock_catalog
    }

    /// Helper function to set up a mock catalog with non-retryable error
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1) // Should only be called once since error is not retryable
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        // Fails twice, then succeeds on the third call.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let result = tx.commit(&mock_catalog).await;

        let Err(err) = result else {
            panic!("Transaction should fail immediately");
        };
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert_eq!(err.message(), "Non-retryable error");
        assert!(!err.retryable(), "Error should not be retryable");
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        // Only allow 2 retries.
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);

        // Always fails: initial attempt + 2 retries = 3 total calls.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let result = tx.commit(&mock_catalog).await;

        let Err(err) = result else {
            panic!("Transaction should fail after max retries");
        };
        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
        assert_eq!(err.message(), "Commit conflict");
        assert!(err.retryable(), "Error should be retryable");
    }
}