Skip to main content

iceberg/transaction/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! This module contains the transaction API.
//!
//! The transaction API enables changes to be made to an existing table.
//!
//! Note that applying and committing actions may have side effects, such as
//! producing new manifest files.
//!
//! Below is a basic example using the "fast-append" action:
//!
//! ```ignore
//! use iceberg::transaction::{ApplyTransactionAction, Transaction};
//! use iceberg::Catalog;
//!
//! // Create a transaction.
//! let tx = Transaction::new(&my_table);
//!
//! // Create a `FastAppendAction`, which adds the data files in a new
//! // manifest without rewriting any existing manifests.
//! let action = tx.fast_append().add_data_files(my_data_files);
//!
//! // Apply the fast-append action to the given transaction, returning
//! // the newly updated `Transaction`.
//! let tx = action.apply(tx).unwrap();
//!
//! // End the transaction by committing to an `iceberg::Catalog`
//! // implementation. This will cause a table update to occur.
//! let table = tx
//!     .commit(&some_catalog_impl)
//!     .await
//!     .unwrap();
//! ```
50
/// Transaction actions, including the `ApplyTransactionAction` trait that
/// allows users to apply a transaction action to a `Transaction`.
53mod action;
54
55pub use action::*;
56mod append;
57mod row_delta;
58mod snapshot;
59mod sort_order;
60mod update_location;
61mod update_properties;
62mod update_statistics;
63mod upgrade_format_version;
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
69
70use crate::error::Result;
71use crate::spec::TableProperties;
72use crate::table::Table;
73use crate::transaction::action::BoxedTransactionAction;
74use crate::transaction::append::FastAppendAction;
75use crate::transaction::row_delta::RowDeltaAction;
76use crate::transaction::sort_order::ReplaceSortOrderAction;
77use crate::transaction::update_location::UpdateLocationAction;
78use crate::transaction::update_properties::UpdatePropertiesAction;
79use crate::transaction::update_statistics::UpdateStatisticsAction;
80use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
81use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
82
/// Table transaction: a set of pending changes to a single [`Table`].
///
/// Actions are accumulated on the transaction and are only sent to a catalog
/// when [`Transaction::commit`] is called.
#[derive(Clone)]
pub struct Transaction {
    /// The base table that actions are applied on top of. A commit attempt
    /// replaces it with the catalog's copy when it is found to be stale.
    table: Table,
    /// Actions that are re-applied, in order, on every commit attempt.
    actions: Vec<BoxedTransactionAction>,
}
89
90impl Transaction {
91    /// Creates a new transaction.
92    pub fn new(table: &Table) -> Self {
93        Self {
94            table: table.clone(),
95            actions: vec![],
96        }
97    }
98
99    fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
100        let mut metadata_builder = table.metadata().clone().into_builder(None);
101        for update in updates {
102            metadata_builder = update.clone().apply(metadata_builder)?;
103        }
104
105        Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
106    }
107
108    /// Applies an [`ActionCommit`] to the given [`Table`], returning a new [`Table`] with updated metadata.
109    /// Also appends any derived [`TableUpdate`]s and [`TableRequirement`]s to the provided vectors.
110    fn apply(
111        table: Table,
112        mut action_commit: ActionCommit,
113        existing_updates: &mut Vec<TableUpdate>,
114        existing_requirements: &mut Vec<TableRequirement>,
115    ) -> Result<Table> {
116        let updates = action_commit.take_updates();
117        let requirements = action_commit.take_requirements();
118
119        for requirement in &requirements {
120            requirement.check(Some(table.metadata()))?;
121        }
122
123        let updated_table = Self::update_table_metadata(table, &updates)?;
124
125        existing_updates.extend(updates);
126        existing_requirements.extend(requirements);
127
128        Ok(updated_table)
129    }
130
131    /// Sets table to a new version.
132    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
133        UpgradeFormatVersionAction::new()
134    }
135
136    /// Update table's property.
137    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
138        UpdatePropertiesAction::new()
139    }
140
141    /// Creates a fast append action.
142    pub fn fast_append(&self) -> FastAppendAction {
143        FastAppendAction::new()
144    }
145
146    /// Creates a row delta action for row-level changes.
147    ///
148    /// Use this action for:
149    /// - CDC (Change Data Capture) ingestion
150    /// - Upsert operations
151    /// - Adding delete files (position or equality deletes)
152    pub fn row_delta(&self) -> RowDeltaAction {
153        RowDeltaAction::new()
154    }
155
156    /// Creates replace sort order action.
157    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
158        ReplaceSortOrderAction::new()
159    }
160
161    /// Set the location of table
162    pub fn update_location(&self) -> UpdateLocationAction {
163        UpdateLocationAction::new()
164    }
165
166    /// Update the statistics of table
167    pub fn update_statistics(&self) -> UpdateStatisticsAction {
168        UpdateStatisticsAction::new()
169    }
170
171    /// Commit transaction.
172    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
173        if self.actions.is_empty() {
174            // nothing to commit
175            return Ok(self.table);
176        }
177
178        let table_props = self.table.metadata().table_properties()?;
179
180        let backoff = Self::build_backoff(table_props)?;
181        let tx = self;
182
183        (|mut tx: Transaction| async {
184            let result = tx.do_commit(catalog).await;
185            (tx, result)
186        })
187        .retry(backoff)
188        .sleep(tokio::time::sleep)
189        .context(tx)
190        .when(|e| e.retryable())
191        .await
192        .1
193    }
194
195    fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
196        Ok(ExponentialBuilder::new()
197            .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
198            .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
199            .with_total_delay(Some(Duration::from_millis(
200                props.commit_total_retry_timeout_ms,
201            )))
202            .with_max_times(props.commit_num_retries)
203            .with_factor(2.0)
204            .build())
205    }
206
207    async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
208        let refreshed = catalog.load_table(self.table.identifier()).await?;
209
210        if self.table.metadata() != refreshed.metadata()
211            || self.table.metadata_location() != refreshed.metadata_location()
212        {
213            // current base is stale, use refreshed as base and re-apply transaction actions
214            self.table = refreshed.clone();
215        }
216
217        let mut current_table = self.table.clone();
218        let mut existing_updates: Vec<TableUpdate> = vec![];
219        let mut existing_requirements: Vec<TableRequirement> = vec![];
220
221        for action in &self.actions {
222            let action_commit = Arc::clone(action).commit(&current_table).await?;
223            // apply action commit to current_table
224            current_table = Self::apply(
225                current_table,
226                action_commit,
227                &mut existing_updates,
228                &mut existing_requirements,
229            )?;
230        }
231
232        let table_commit = TableCommit::builder()
233            .ident(self.table.identifier().to_owned())
234            .updates(existing_updates)
235            .requirements(existing_requirements)
236            .build();
237
238        catalog.update_table(table_commit).await
239    }
240}
241
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::fs::File;
    use std::io::BufReader;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU32, Ordering};

    use crate::catalog::MockCatalog;
    use crate::io::FileIO;
    use crate::spec::TableMetadata;
    use crate::table::Table;
    use crate::transaction::{ApplyTransactionAction, Transaction};
    use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};

    /// Loads and deserializes `testdata/table_metadata/<file_name>` relative to the
    /// crate root.
    ///
    /// Panics with the offending path if the fixture is missing or malformed.
    fn load_metadata_fixture(file_name: &str) -> TableMetadata {
        let path = format!(
            "{}/testdata/table_metadata/{}",
            env!("CARGO_MANIFEST_DIR"),
            file_name
        );
        let file = File::open(&path)
            .unwrap_or_else(|e| panic!("failed to open metadata fixture {path}: {e}"));
        serde_json::from_reader(BufReader::new(file))
            .unwrap_or_else(|e| panic!("failed to parse metadata fixture {path}: {e}"))
    }

    /// Builds an in-memory table identified as `ns1.test1` from a metadata fixture.
    fn make_table_from_fixture(file_name: &str) -> Table {
        Table::builder()
            .metadata(load_metadata_fixture(file_name))
            .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
            .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
            .file_io(FileIO::new_with_memory())
            .build()
            .unwrap()
    }

    /// Builds a table from the `TableMetadataV1Valid.json` fixture.
    pub fn make_v1_table() -> Table {
        make_table_from_fixture("TableMetadataV1Valid.json")
    }

    /// Builds a table from the `TableMetadataV2Valid.json` fixture.
    pub fn make_v2_table() -> Table {
        make_table_from_fixture("TableMetadataV2Valid.json")
    }

    /// Builds a table from the `TableMetadataV2ValidMinimal.json` fixture.
    pub fn make_v2_minimal_table() -> Table {
        make_table_from_fixture("TableMetadataV2ValidMinimal.json")
    }

    /// Creates a format-v3 table in `catalog` with the schema, partition spec and sort
    /// order of the `TableMetadataV3ValidMinimal.json` fixture.
    ///
    /// The namespace name contains a random UUID so that repeated calls against the
    /// same catalog do not collide.
    pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
        let table_ident =
            TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
                .unwrap();

        catalog
            .create_namespace(table_ident.namespace(), HashMap::new())
            .await
            .unwrap();

        let base_metadata = load_metadata_fixture("TableMetadataV3ValidMinimal.json");

        let table_creation = TableCreation::builder()
            .schema((**base_metadata.current_schema()).clone())
            .partition_spec((**base_metadata.default_partition_spec()).clone())
            .sort_order((**base_metadata.default_sort_order()).clone())
            .name(table_ident.name().to_string())
            .format_version(crate::spec::FormatVersion::V3)
            .build();

        catalog
            .create_table(table_ident.namespace(), table_creation)
            .await
            .unwrap()
    }

    /// Helper function to create a test table with retry properties.
    ///
    /// Uses short waits (10ms min, 100ms max, 1s total) so that retry tests run quickly;
    /// `num_retries` sets `commit.retry.num-retries`.
    pub(super) fn setup_test_table(num_retries: &str) -> Table {
        let table = make_v2_table();

        let props = HashMap::from([
            ("commit.retry.min-wait-ms".to_string(), "10".to_string()),
            ("commit.retry.max-wait-ms".to_string(), "100".to_string()),
            (
                "commit.retry.total-timeout-ms".to_string(),
                "1000".to_string(),
            ),
            (
                "commit.retry.num-retries".to_string(),
                num_retries.to_string(),
            ),
        ]);

        let metadata = table
            .metadata()
            .clone()
            .into_builder(None)
            .set_properties(props)
            .unwrap()
            .build()
            .unwrap()
            .metadata;

        table.with_metadata(Arc::new(metadata))
    }

    /// Helper function to create a transaction with a simple update action.
    fn create_test_transaction(table: &Table) -> Transaction {
        let tx = Transaction::new(table);
        tx.update_table_properties()
            .set("test.key".to_string(), "test.value".to_string())
            .apply(tx)
            .unwrap()
    }

    /// Returns the retryable commit-conflict error that the mock catalogs fail with.
    fn commit_conflict_error() -> Error {
        Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict").with_retryable(true)
    }

    /// Sets up a mock catalog whose `update_table` fails with a retryable commit conflict.
    ///
    /// With `Some(n)`, the first `n` calls fail and later calls succeed. With `None`,
    /// every call fails. `update_table` must be called exactly `expected_calls` times.
    fn setup_mock_catalog_with_retryable_errors(
        success_after_attempts: Option<u32>,
        expected_calls: usize,
    ) -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        let attempts = AtomicU32::new(0);
        mock_catalog
            .expect_update_table()
            .times(expected_calls)
            .returning_st(move |_| {
                // `fetch_add` returns the previous value, so this is the 1-based attempt
                // number, obtained atomically with the increment.
                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                let should_fail = match success_after_attempts {
                    Some(failures) => attempt <= failures,
                    None => true,
                };

                if should_fail {
                    Box::pin(async move { Err(commit_conflict_error()) })
                } else {
                    Box::pin(async move { Ok(make_v2_table()) })
                }
            });

        mock_catalog
    }

    /// Sets up a mock catalog whose `update_table` fails once with a non-retryable error.
    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
        let mut mock_catalog = MockCatalog::new();

        mock_catalog
            .expect_load_table()
            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));

        mock_catalog
            .expect_update_table()
            .times(1) // Should only be called once since error is not retryable
            .returning_st(move |_| {
                Box::pin(async move {
                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
                        .with_retryable(false))
                })
            });

        mock_catalog
    }

    #[tokio::test]
    async fn test_commit_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);

        // The catalog fails twice, then succeeds on the third call.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);

        let result = tx.commit(&mock_catalog).await;

        assert!(result.is_ok(), "Transaction should eventually succeed");
    }

    #[tokio::test]
    async fn test_commit_non_retryable_error() {
        let table = setup_test_table("3");
        let tx = create_test_transaction(&table);
        let mock_catalog = setup_mock_catalog_with_non_retryable_error();

        let result = tx.commit(&mock_catalog).await;

        let Err(err) = result else {
            panic!("Transaction should fail immediately");
        };
        assert_eq!(err.kind(), ErrorKind::Unexpected);
        assert_eq!(err.message(), "Non-retryable error");
        assert!(!err.retryable(), "Error should not be retryable");
    }

    #[tokio::test]
    async fn test_commit_max_retries_exceeded() {
        // Only allow 2 retries.
        let table = setup_test_table("2");
        let tx = create_test_transaction(&table);

        // Always fails: initial attempt + 2 retries = 3 total attempts.
        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);

        let result = tx.commit(&mock_catalog).await;

        let Err(err) = result else {
            panic!("Transaction should fail after max retries");
        };
        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
        assert_eq!(err.message(), "Commit conflict");
        assert!(err.retryable(), "Error should be retryable");
    }
}
514
#[cfg(test)]
mod test_row_lineage {
    use crate::memory::tests::new_memory_catalog;
    use crate::spec::{
        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
    };
    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
    use crate::transaction::{ApplyTransactionAction, Transaction};

    /// Builds a Parquet data-file entry at `test/<rows>.parquet` that reports `rows`
    /// records, in partition `{0}` of spec 0. Only the metadata entry is built here.
    fn data_file_with_rows(rows: u64) -> DataFile {
        DataFileBuilder::default()
            .content(DataContentType::Data)
            .file_path(format!("test/{rows}.parquet"))
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(rows)
            .partition(Struct::from_iter([Some(Literal::long(0))]))
            .partition_spec_id(0)
            .build()
            .unwrap()
    }

    #[tokio::test]
    async fn test_fast_append_with_row_lineage() {
        let catalog = new_memory_catalog().await;
        let mut table = make_v3_minimal_table_in_catalog(&catalog).await;

        // Before any append, next_row_id should be 0.
        assert_eq!(table.metadata().next_row_id(), 0);

        // Each round is one fast append: (row counts of the appended files,
        // expected first row id of the new snapshot, expected next_row_id afterwards).
        let rounds: [(&[u64], _, _); 2] = [(&[30], 0, 30), (&[17, 11], 30, 30 + 17 + 11)];

        for (round, (row_counts, first_row_id, next_row_id)) in rounds.into_iter().enumerate() {
            let tx = Transaction::new(&table);
            let files: Vec<DataFile> = row_counts
                .iter()
                .map(|&rows| data_file_with_rows(rows))
                .collect();
            let tx = tx.fast_append().add_data_files(files).apply(tx).unwrap();
            table = tx.commit(&catalog).await.unwrap();

            // Snapshot and table row-lineage state after this append.
            let snapshot = table.metadata().current_snapshot().unwrap();
            assert_eq!(snapshot.first_row_id(), Some(first_row_id));
            assert_eq!(table.metadata().next_row_id(), next_row_id);

            // Every append adds one manifest; the one written in this round
            // carries the snapshot's first row id.
            let manifest_list = snapshot
                .load_manifest_list(table.file_io(), table.metadata())
                .await
                .unwrap();
            assert_eq!(manifest_list.entries().len(), round + 1);
            assert_eq!(
                manifest_list.entries()[round].first_row_id,
                Some(first_row_id)
            );
        }
    }
}
594        assert_eq!(manifest_list.entries().len(), 2);
595        let manifest_file = &manifest_list.entries()[1];
596        assert_eq!(manifest_file.first_row_id, Some(30));
597    }
598}