1mod action;
54
55pub use action::*;
56mod append;
57mod row_delta;
58mod snapshot;
59mod sort_order;
60mod update_location;
61mod update_properties;
62mod update_statistics;
63mod upgrade_format_version;
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
69
70use crate::error::Result;
71use crate::spec::TableProperties;
72use crate::table::Table;
73use crate::transaction::action::BoxedTransactionAction;
74use crate::transaction::append::FastAppendAction;
75use crate::transaction::row_delta::RowDeltaAction;
76use crate::transaction::sort_order::ReplaceSortOrderAction;
77use crate::transaction::update_location::UpdateLocationAction;
78use crate::transaction::update_properties::UpdatePropertiesAction;
79use crate::transaction::update_statistics::UpdateStatisticsAction;
80use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
81use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
82
83#[derive(Clone)]
85pub struct Transaction {
86 table: Table,
87 actions: Vec<BoxedTransactionAction>,
88}
89
90impl Transaction {
91 pub fn new(table: &Table) -> Self {
93 Self {
94 table: table.clone(),
95 actions: vec![],
96 }
97 }
98
99 fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
100 let mut metadata_builder = table.metadata().clone().into_builder(None);
101 for update in updates {
102 metadata_builder = update.clone().apply(metadata_builder)?;
103 }
104
105 Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
106 }
107
108 fn apply(
111 table: Table,
112 mut action_commit: ActionCommit,
113 existing_updates: &mut Vec<TableUpdate>,
114 existing_requirements: &mut Vec<TableRequirement>,
115 ) -> Result<Table> {
116 let updates = action_commit.take_updates();
117 let requirements = action_commit.take_requirements();
118
119 for requirement in &requirements {
120 requirement.check(Some(table.metadata()))?;
121 }
122
123 let updated_table = Self::update_table_metadata(table, &updates)?;
124
125 existing_updates.extend(updates);
126 existing_requirements.extend(requirements);
127
128 Ok(updated_table)
129 }
130
131 pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
133 UpgradeFormatVersionAction::new()
134 }
135
136 pub fn update_table_properties(&self) -> UpdatePropertiesAction {
138 UpdatePropertiesAction::new()
139 }
140
141 pub fn fast_append(&self) -> FastAppendAction {
143 FastAppendAction::new()
144 }
145
146 pub fn row_delta(&self) -> RowDeltaAction {
153 RowDeltaAction::new()
154 }
155
156 pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
158 ReplaceSortOrderAction::new()
159 }
160
161 pub fn update_location(&self) -> UpdateLocationAction {
163 UpdateLocationAction::new()
164 }
165
166 pub fn update_statistics(&self) -> UpdateStatisticsAction {
168 UpdateStatisticsAction::new()
169 }
170
171 pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
173 if self.actions.is_empty() {
174 return Ok(self.table);
176 }
177
178 let table_props = self.table.metadata().table_properties()?;
179
180 let backoff = Self::build_backoff(table_props)?;
181 let tx = self;
182
183 (|mut tx: Transaction| async {
184 let result = tx.do_commit(catalog).await;
185 (tx, result)
186 })
187 .retry(backoff)
188 .sleep(tokio::time::sleep)
189 .context(tx)
190 .when(|e| e.retryable())
191 .await
192 .1
193 }
194
195 fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
196 Ok(ExponentialBuilder::new()
197 .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
198 .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
199 .with_total_delay(Some(Duration::from_millis(
200 props.commit_total_retry_timeout_ms,
201 )))
202 .with_max_times(props.commit_num_retries)
203 .with_factor(2.0)
204 .build())
205 }
206
207 async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
208 let refreshed = catalog.load_table(self.table.identifier()).await?;
209
210 if self.table.metadata() != refreshed.metadata()
211 || self.table.metadata_location() != refreshed.metadata_location()
212 {
213 self.table = refreshed.clone();
215 }
216
217 let mut current_table = self.table.clone();
218 let mut existing_updates: Vec<TableUpdate> = vec![];
219 let mut existing_requirements: Vec<TableRequirement> = vec![];
220
221 for action in &self.actions {
222 let action_commit = Arc::clone(action).commit(¤t_table).await?;
223 current_table = Self::apply(
225 current_table,
226 action_commit,
227 &mut existing_updates,
228 &mut existing_requirements,
229 )?;
230 }
231
232 let table_commit = TableCommit::builder()
233 .ident(self.table.identifier().to_owned())
234 .updates(existing_updates)
235 .requirements(existing_requirements)
236 .build();
237
238 catalog.update_table(table_commit).await
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use std::collections::HashMap;
245 use std::fs::File;
246 use std::io::BufReader;
247 use std::sync::Arc;
248 use std::sync::atomic::{AtomicU32, Ordering};
249
250 use crate::catalog::MockCatalog;
251 use crate::io::FileIO;
252 use crate::spec::TableMetadata;
253 use crate::table::Table;
254 use crate::transaction::{ApplyTransactionAction, Transaction};
255 use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};
256
257 pub fn make_v1_table() -> Table {
258 let file = File::open(format!(
259 "{}/testdata/table_metadata/{}",
260 env!("CARGO_MANIFEST_DIR"),
261 "TableMetadataV1Valid.json"
262 ))
263 .unwrap();
264 let reader = BufReader::new(file);
265 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
266
267 Table::builder()
268 .metadata(resp)
269 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
270 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
271 .file_io(FileIO::new_with_memory())
272 .build()
273 .unwrap()
274 }
275
276 pub fn make_v2_table() -> Table {
277 let file = File::open(format!(
278 "{}/testdata/table_metadata/{}",
279 env!("CARGO_MANIFEST_DIR"),
280 "TableMetadataV2Valid.json"
281 ))
282 .unwrap();
283 let reader = BufReader::new(file);
284 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
285
286 Table::builder()
287 .metadata(resp)
288 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
289 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
290 .file_io(FileIO::new_with_memory())
291 .build()
292 .unwrap()
293 }
294
295 pub fn make_v2_minimal_table() -> Table {
296 let file = File::open(format!(
297 "{}/testdata/table_metadata/{}",
298 env!("CARGO_MANIFEST_DIR"),
299 "TableMetadataV2ValidMinimal.json"
300 ))
301 .unwrap();
302 let reader = BufReader::new(file);
303 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
304
305 Table::builder()
306 .metadata(resp)
307 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
308 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
309 .file_io(FileIO::new_with_memory())
310 .build()
311 .unwrap()
312 }
313
314 pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
315 let table_ident =
316 TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
317 .unwrap();
318
319 catalog
320 .create_namespace(table_ident.namespace(), HashMap::new())
321 .await
322 .unwrap();
323
324 let file = File::open(format!(
325 "{}/testdata/table_metadata/{}",
326 env!("CARGO_MANIFEST_DIR"),
327 "TableMetadataV3ValidMinimal.json"
328 ))
329 .unwrap();
330 let reader = BufReader::new(file);
331 let base_metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
332
333 let table_creation = TableCreation::builder()
334 .schema((**base_metadata.current_schema()).clone())
335 .partition_spec((**base_metadata.default_partition_spec()).clone())
336 .sort_order((**base_metadata.default_sort_order()).clone())
337 .name(table_ident.name().to_string())
338 .format_version(crate::spec::FormatVersion::V3)
339 .build();
340
341 catalog
342 .create_table(table_ident.namespace(), table_creation)
343 .await
344 .unwrap()
345 }
346
347 pub(super) fn setup_test_table(num_retries: &str) -> Table {
349 let table = make_v2_table();
350
351 let mut props = HashMap::new();
353 props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string());
354 props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string());
355 props.insert(
356 "commit.retry.total-timeout-ms".to_string(),
357 "1000".to_string(),
358 );
359 props.insert(
360 "commit.retry.num-retries".to_string(),
361 num_retries.to_string(),
362 );
363
364 let metadata = table
366 .metadata()
367 .clone()
368 .into_builder(None)
369 .set_properties(props)
370 .unwrap()
371 .build()
372 .unwrap()
373 .metadata;
374
375 table.with_metadata(Arc::new(metadata))
376 }
377
378 fn create_test_transaction(table: &Table) -> Transaction {
380 let tx = Transaction::new(table);
381 tx.update_table_properties()
382 .set("test.key".to_string(), "test.value".to_string())
383 .apply(tx)
384 .unwrap()
385 }
386
387 fn setup_mock_catalog_with_retryable_errors(
389 success_after_attempts: Option<u32>,
390 expected_calls: usize,
391 ) -> MockCatalog {
392 let mut mock_catalog = MockCatalog::new();
393
394 mock_catalog
395 .expect_load_table()
396 .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
397
398 let attempts = AtomicU32::new(0);
399 mock_catalog
400 .expect_update_table()
401 .times(expected_calls)
402 .returning_st(move |_| {
403 if let Some(success_after_attempts) = success_after_attempts {
404 attempts.fetch_add(1, Ordering::SeqCst);
405 if attempts.load(Ordering::SeqCst) <= success_after_attempts {
406 Box::pin(async move {
407 Err(
408 Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
409 .with_retryable(true),
410 )
411 })
412 } else {
413 Box::pin(async move { Ok(make_v2_table()) })
414 }
415 } else {
416 Box::pin(async move {
418 Err(
419 Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
420 .with_retryable(true),
421 )
422 })
423 }
424 });
425
426 mock_catalog
427 }
428
429 fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
431 let mut mock_catalog = MockCatalog::new();
432
433 mock_catalog
434 .expect_load_table()
435 .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
436
437 mock_catalog
438 .expect_update_table()
439 .times(1) .returning_st(move |_| {
441 Box::pin(async move {
442 Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
443 .with_retryable(false))
444 })
445 });
446
447 mock_catalog
448 }
449
450 #[tokio::test]
451 async fn test_commit_retryable_error() {
452 let table = setup_test_table("3");
454
455 let tx = create_test_transaction(&table);
457
458 let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);
460
461 let result = tx.commit(&mock_catalog).await;
463
464 assert!(result.is_ok(), "Transaction should eventually succeed");
466 }
467
468 #[tokio::test]
469 async fn test_commit_non_retryable_error() {
470 let table = setup_test_table("3");
472
473 let tx = create_test_transaction(&table);
475
476 let mock_catalog = setup_mock_catalog_with_non_retryable_error();
478
479 let result = tx.commit(&mock_catalog).await;
481
482 assert!(result.is_err(), "Transaction should fail immediately");
484 if let Err(err) = result {
485 assert_eq!(err.kind(), ErrorKind::Unexpected);
486 assert_eq!(err.message(), "Non-retryable error");
487 assert!(!err.retryable(), "Error should not be retryable");
488 }
489 }
490
491 #[tokio::test]
492 async fn test_commit_max_retries_exceeded() {
493 let table = setup_test_table("2");
495
496 let tx = create_test_transaction(&table);
498
499 let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); let result = tx.commit(&mock_catalog).await;
504
505 assert!(result.is_err(), "Transaction should fail after max retries");
507 if let Err(err) = result {
508 assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
509 assert_eq!(err.message(), "Commit conflict");
510 assert!(err.retryable(), "Error should be retryable");
511 }
512 }
513}
514
515#[cfg(test)]
516mod test_row_lineage {
517 use crate::memory::tests::new_memory_catalog;
518 use crate::spec::{
519 DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
520 };
521 use crate::transaction::tests::make_v3_minimal_table_in_catalog;
522 use crate::transaction::{ApplyTransactionAction, Transaction};
523
524 #[tokio::test]
525 async fn test_fast_append_with_row_lineage() {
526 fn file_with_rows(record_count: u64) -> DataFile {
528 DataFileBuilder::default()
529 .content(DataContentType::Data)
530 .file_path(format!("test/{record_count}.parquet"))
531 .file_format(DataFileFormat::Parquet)
532 .file_size_in_bytes(100)
533 .record_count(record_count)
534 .partition(Struct::from_iter([Some(Literal::long(0))]))
535 .partition_spec_id(0)
536 .build()
537 .unwrap()
538 }
539 let catalog = new_memory_catalog().await;
540
541 let table = make_v3_minimal_table_in_catalog(&catalog).await;
542
543 assert_eq!(table.metadata().next_row_id(), 0);
545
546 let tx = Transaction::new(&table);
548 let data_file_30 = file_with_rows(30);
549 let action = tx.fast_append().add_data_files(vec![data_file_30]);
550 let tx = action.apply(tx).unwrap();
551 let table = tx.commit(&catalog).await.unwrap();
552
553 let snapshot = table.metadata().current_snapshot().unwrap();
555 assert_eq!(snapshot.first_row_id(), Some(0));
556 assert_eq!(table.metadata().next_row_id(), 30);
557
558 let manifest_list = table
560 .metadata()
561 .current_snapshot()
562 .unwrap()
563 .load_manifest_list(table.file_io(), table.metadata())
564 .await
565 .unwrap();
566
567 assert_eq!(manifest_list.entries().len(), 1);
568 let manifest_file = &manifest_list.entries()[0];
569 assert_eq!(manifest_file.first_row_id, Some(0));
570
571 let tx = Transaction::new(&table);
573 let data_file_17 = file_with_rows(17);
574 let data_file_11 = file_with_rows(11);
575 let action = tx
576 .fast_append()
577 .add_data_files(vec![data_file_17, data_file_11]);
578 let tx = action.apply(tx).unwrap();
579 let table = tx.commit(&catalog).await.unwrap();
580
581 let snapshot = table.metadata().current_snapshot().unwrap();
583 assert_eq!(snapshot.first_row_id(), Some(30));
584 assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);
585
586 let manifest_list = table
588 .metadata()
589 .current_snapshot()
590 .unwrap()
591 .load_manifest_list(table.file_io(), table.metadata())
592 .await
593 .unwrap();
594 assert_eq!(manifest_list.entries().len(), 2);
595 let manifest_file = &manifest_list.entries()[1];
596 assert_eq!(manifest_file.first_row_id, Some(30));
597 }
598}