1mod action;
54
55pub use action::*;
56mod append;
57mod row_delta;
58mod snapshot;
59mod sort_order;
60mod update_location;
61mod update_properties;
62mod update_statistics;
63mod upgrade_format_version;
64
65use std::sync::Arc;
66use std::time::Duration;
67
68use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
69
70use crate::error::Result;
71use crate::spec::TableProperties;
72use crate::table::Table;
73use crate::transaction::action::BoxedTransactionAction;
74use crate::transaction::append::FastAppendAction;
75use crate::transaction::row_delta::RowDeltaAction;
76use crate::transaction::sort_order::ReplaceSortOrderAction;
77use crate::transaction::update_location::UpdateLocationAction;
78use crate::transaction::update_properties::UpdatePropertiesAction;
79use crate::transaction::update_statistics::UpdateStatisticsAction;
80use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
81use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
82
/// A set of pending changes against a single table.
///
/// Holds its own clone of the table it was created from, together with the
/// ordered list of actions that `commit` replays against a catalog.
#[derive(Clone)]
pub struct Transaction {
    /// Table the actions are applied on top of; `do_commit` replaces it with
    /// the catalog's copy when the two differ.
    table: Table,
    /// Actions run, in order, by `do_commit`.
    actions: Vec<BoxedTransactionAction>,
}
89
90impl Transaction {
91 pub fn new(table: &Table) -> Self {
93 Self {
94 table: table.clone(),
95 actions: vec![],
96 }
97 }
98
99 fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
100 let mut metadata_builder = table.metadata().clone().into_builder(None);
101 for update in updates {
102 metadata_builder = update.clone().apply(metadata_builder)?;
103 }
104
105 Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
106 }
107
108 fn apply(
111 table: Table,
112 mut action_commit: ActionCommit,
113 existing_updates: &mut Vec<TableUpdate>,
114 existing_requirements: &mut Vec<TableRequirement>,
115 ) -> Result<Table> {
116 let updates = action_commit.take_updates();
117 let requirements = action_commit.take_requirements();
118
119 for requirement in &requirements {
120 requirement.check(Some(table.metadata()))?;
121 }
122
123 let updated_table = Self::update_table_metadata(table, &updates)?;
124
125 existing_updates.extend(updates);
126 existing_requirements.extend(requirements);
127
128 Ok(updated_table)
129 }
130
/// Returns a fresh [`UpgradeFormatVersionAction`].
///
/// This only constructs the action; `self` is not read or modified here.
pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
    UpgradeFormatVersionAction::new()
}
135
/// Returns a fresh [`UpdatePropertiesAction`].
///
/// This only constructs the action; `self` is not read or modified here.
/// The tests in this file attach it by calling `apply(tx)` on the result.
pub fn update_table_properties(&self) -> UpdatePropertiesAction {
    UpdatePropertiesAction::new()
}
140
/// Returns a fresh [`FastAppendAction`].
///
/// This only constructs the action; `self` is not read or modified here.
/// The tests in this file add data files to it and then call `apply(tx)`.
pub fn fast_append(&self) -> FastAppendAction {
    FastAppendAction::new()
}
145
/// Returns a fresh [`RowDeltaAction`].
///
/// This only constructs the action; `self` is not read or modified here.
// NOTE(review): presumably used for row-level changes — confirm against
// the `row_delta` module.
pub fn row_delta(&self) -> RowDeltaAction {
    RowDeltaAction::new()
}
155
/// Returns a fresh [`ReplaceSortOrderAction`].
///
/// This only constructs the action; `self` is not read or modified here.
pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
    ReplaceSortOrderAction::new()
}
160
/// Returns a fresh [`UpdateLocationAction`].
///
/// This only constructs the action; `self` is not read or modified here.
pub fn update_location(&self) -> UpdateLocationAction {
    UpdateLocationAction::new()
}
165
/// Returns a fresh [`UpdateStatisticsAction`].
///
/// This only constructs the action; `self` is not read or modified here.
pub fn update_statistics(&self) -> UpdateStatisticsAction {
    UpdateStatisticsAction::new()
}
170
171 pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
173 if self.actions.is_empty() {
174 return Ok(self.table);
176 }
177
178 let table_props =
179 TableProperties::try_from(self.table.metadata().properties()).map_err(|e| {
180 Error::new(ErrorKind::DataInvalid, "Invalid table properties").with_source(e)
181 })?;
182
183 let backoff = Self::build_backoff(table_props)?;
184 let tx = self;
185
186 (|mut tx: Transaction| async {
187 let result = tx.do_commit(catalog).await;
188 (tx, result)
189 })
190 .retry(backoff)
191 .sleep(tokio::time::sleep)
192 .context(tx)
193 .when(|e| e.retryable())
194 .await
195 .1
196 }
197
198 fn build_backoff(props: TableProperties) -> Result<ExponentialBackoff> {
199 Ok(ExponentialBuilder::new()
200 .with_min_delay(Duration::from_millis(props.commit_min_retry_wait_ms))
201 .with_max_delay(Duration::from_millis(props.commit_max_retry_wait_ms))
202 .with_total_delay(Some(Duration::from_millis(
203 props.commit_total_retry_timeout_ms,
204 )))
205 .with_max_times(props.commit_num_retries)
206 .with_factor(2.0)
207 .build())
208 }
209
210 async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
211 let refreshed = catalog.load_table(self.table.identifier()).await?;
212
213 if self.table.metadata() != refreshed.metadata()
214 || self.table.metadata_location() != refreshed.metadata_location()
215 {
216 self.table = refreshed.clone();
218 }
219
220 let mut current_table = self.table.clone();
221 let mut existing_updates: Vec<TableUpdate> = vec![];
222 let mut existing_requirements: Vec<TableRequirement> = vec![];
223
224 for action in &self.actions {
225 let action_commit = Arc::clone(action).commit(¤t_table).await?;
226 current_table = Self::apply(
228 current_table,
229 action_commit,
230 &mut existing_updates,
231 &mut existing_requirements,
232 )?;
233 }
234
235 let table_commit = TableCommit::builder()
236 .ident(self.table.identifier().to_owned())
237 .updates(existing_updates)
238 .requirements(existing_requirements)
239 .build();
240
241 catalog.update_table(table_commit).await
242 }
243}
244
245#[cfg(test)]
246mod tests {
247 use std::collections::HashMap;
248 use std::fs::File;
249 use std::io::BufReader;
250 use std::sync::Arc;
251 use std::sync::atomic::{AtomicU32, Ordering};
252
253 use crate::catalog::MockCatalog;
254 use crate::io::FileIOBuilder;
255 use crate::spec::TableMetadata;
256 use crate::table::Table;
257 use crate::transaction::{ApplyTransactionAction, Transaction};
258 use crate::{Catalog, Error, ErrorKind, TableCreation, TableIdent};
259
260 pub fn make_v1_table() -> Table {
261 let file = File::open(format!(
262 "{}/testdata/table_metadata/{}",
263 env!("CARGO_MANIFEST_DIR"),
264 "TableMetadataV1Valid.json"
265 ))
266 .unwrap();
267 let reader = BufReader::new(file);
268 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
269
270 Table::builder()
271 .metadata(resp)
272 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
273 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
274 .file_io(FileIOBuilder::new("memory").build().unwrap())
275 .build()
276 .unwrap()
277 }
278
279 pub fn make_v2_table() -> Table {
280 let file = File::open(format!(
281 "{}/testdata/table_metadata/{}",
282 env!("CARGO_MANIFEST_DIR"),
283 "TableMetadataV2Valid.json"
284 ))
285 .unwrap();
286 let reader = BufReader::new(file);
287 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
288
289 Table::builder()
290 .metadata(resp)
291 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
292 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
293 .file_io(FileIOBuilder::new("memory").build().unwrap())
294 .build()
295 .unwrap()
296 }
297
298 pub fn make_v2_minimal_table() -> Table {
299 let file = File::open(format!(
300 "{}/testdata/table_metadata/{}",
301 env!("CARGO_MANIFEST_DIR"),
302 "TableMetadataV2ValidMinimal.json"
303 ))
304 .unwrap();
305 let reader = BufReader::new(file);
306 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
307
308 Table::builder()
309 .metadata(resp)
310 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
311 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
312 .file_io(FileIOBuilder::new("memory").build().unwrap())
313 .build()
314 .unwrap()
315 }
316
317 pub(crate) async fn make_v3_minimal_table_in_catalog(catalog: &impl Catalog) -> Table {
318 let table_ident =
319 TableIdent::from_strs([format!("ns1-{}", uuid::Uuid::new_v4()), "test1".to_string()])
320 .unwrap();
321
322 catalog
323 .create_namespace(table_ident.namespace(), HashMap::new())
324 .await
325 .unwrap();
326
327 let file = File::open(format!(
328 "{}/testdata/table_metadata/{}",
329 env!("CARGO_MANIFEST_DIR"),
330 "TableMetadataV3ValidMinimal.json"
331 ))
332 .unwrap();
333 let reader = BufReader::new(file);
334 let base_metadata = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
335
336 let table_creation = TableCreation::builder()
337 .schema((**base_metadata.current_schema()).clone())
338 .partition_spec((**base_metadata.default_partition_spec()).clone())
339 .sort_order((**base_metadata.default_sort_order()).clone())
340 .name(table_ident.name().to_string())
341 .format_version(crate::spec::FormatVersion::V3)
342 .build();
343
344 catalog
345 .create_table(table_ident.namespace(), table_creation)
346 .await
347 .unwrap()
348 }
349
350 pub(super) fn setup_test_table(num_retries: &str) -> Table {
352 let table = make_v2_table();
353
354 let mut props = HashMap::new();
356 props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string());
357 props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string());
358 props.insert(
359 "commit.retry.total-timeout-ms".to_string(),
360 "1000".to_string(),
361 );
362 props.insert(
363 "commit.retry.num-retries".to_string(),
364 num_retries.to_string(),
365 );
366
367 let metadata = table
369 .metadata()
370 .clone()
371 .into_builder(None)
372 .set_properties(props)
373 .unwrap()
374 .build()
375 .unwrap()
376 .metadata;
377
378 table.with_metadata(Arc::new(metadata))
379 }
380
381 fn create_test_transaction(table: &Table) -> Transaction {
383 let tx = Transaction::new(table);
384 tx.update_table_properties()
385 .set("test.key".to_string(), "test.value".to_string())
386 .apply(tx)
387 .unwrap()
388 }
389
390 fn setup_mock_catalog_with_retryable_errors(
392 success_after_attempts: Option<u32>,
393 expected_calls: usize,
394 ) -> MockCatalog {
395 let mut mock_catalog = MockCatalog::new();
396
397 mock_catalog
398 .expect_load_table()
399 .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
400
401 let attempts = AtomicU32::new(0);
402 mock_catalog
403 .expect_update_table()
404 .times(expected_calls)
405 .returning_st(move |_| {
406 if let Some(success_after_attempts) = success_after_attempts {
407 attempts.fetch_add(1, Ordering::SeqCst);
408 if attempts.load(Ordering::SeqCst) <= success_after_attempts {
409 Box::pin(async move {
410 Err(
411 Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
412 .with_retryable(true),
413 )
414 })
415 } else {
416 Box::pin(async move { Ok(make_v2_table()) })
417 }
418 } else {
419 Box::pin(async move {
421 Err(
422 Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
423 .with_retryable(true),
424 )
425 })
426 }
427 });
428
429 mock_catalog
430 }
431
432 fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
434 let mut mock_catalog = MockCatalog::new();
435
436 mock_catalog
437 .expect_load_table()
438 .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
439
440 mock_catalog
441 .expect_update_table()
442 .times(1) .returning_st(move |_| {
444 Box::pin(async move {
445 Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
446 .with_retryable(false))
447 })
448 });
449
450 mock_catalog
451 }
452
453 #[tokio::test]
454 async fn test_commit_retryable_error() {
455 let table = setup_test_table("3");
457
458 let tx = create_test_transaction(&table);
460
461 let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);
463
464 let result = tx.commit(&mock_catalog).await;
466
467 assert!(result.is_ok(), "Transaction should eventually succeed");
469 }
470
471 #[tokio::test]
472 async fn test_commit_non_retryable_error() {
473 let table = setup_test_table("3");
475
476 let tx = create_test_transaction(&table);
478
479 let mock_catalog = setup_mock_catalog_with_non_retryable_error();
481
482 let result = tx.commit(&mock_catalog).await;
484
485 assert!(result.is_err(), "Transaction should fail immediately");
487 if let Err(err) = result {
488 assert_eq!(err.kind(), ErrorKind::Unexpected);
489 assert_eq!(err.message(), "Non-retryable error");
490 assert!(!err.retryable(), "Error should not be retryable");
491 }
492 }
493
494 #[tokio::test]
495 async fn test_commit_max_retries_exceeded() {
496 let table = setup_test_table("2");
498
499 let tx = create_test_transaction(&table);
501
502 let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); let result = tx.commit(&mock_catalog).await;
507
508 assert!(result.is_err(), "Transaction should fail after max retries");
510 if let Err(err) = result {
511 assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
512 assert_eq!(err.message(), "Commit conflict");
513 assert!(err.retryable(), "Error should be retryable");
514 }
515 }
516}
517
518#[cfg(test)]
519mod test_row_lineage {
520 use crate::memory::tests::new_memory_catalog;
521 use crate::spec::{
522 DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Struct,
523 };
524 use crate::transaction::tests::make_v3_minimal_table_in_catalog;
525 use crate::transaction::{ApplyTransactionAction, Transaction};
526
527 #[tokio::test]
528 async fn test_fast_append_with_row_lineage() {
529 fn file_with_rows(record_count: u64) -> DataFile {
531 DataFileBuilder::default()
532 .content(DataContentType::Data)
533 .file_path(format!("test/{record_count}.parquet"))
534 .file_format(DataFileFormat::Parquet)
535 .file_size_in_bytes(100)
536 .record_count(record_count)
537 .partition(Struct::from_iter([Some(Literal::long(0))]))
538 .partition_spec_id(0)
539 .build()
540 .unwrap()
541 }
542 let catalog = new_memory_catalog().await;
543
544 let table = make_v3_minimal_table_in_catalog(&catalog).await;
545
546 assert_eq!(table.metadata().next_row_id(), 0);
548
549 let tx = Transaction::new(&table);
551 let data_file_30 = file_with_rows(30);
552 let action = tx.fast_append().add_data_files(vec![data_file_30]);
553 let tx = action.apply(tx).unwrap();
554 let table = tx.commit(&catalog).await.unwrap();
555
556 let snapshot = table.metadata().current_snapshot().unwrap();
558 assert_eq!(snapshot.first_row_id(), Some(0));
559 assert_eq!(table.metadata().next_row_id(), 30);
560
561 let manifest_list = table
563 .metadata()
564 .current_snapshot()
565 .unwrap()
566 .load_manifest_list(table.file_io(), table.metadata())
567 .await
568 .unwrap();
569
570 assert_eq!(manifest_list.entries().len(), 1);
571 let manifest_file = &manifest_list.entries()[0];
572 assert_eq!(manifest_file.first_row_id, Some(0));
573
574 let tx = Transaction::new(&table);
576 let data_file_17 = file_with_rows(17);
577 let data_file_11 = file_with_rows(11);
578 let action = tx
579 .fast_append()
580 .add_data_files(vec![data_file_17, data_file_11]);
581 let tx = action.apply(tx).unwrap();
582 let table = tx.commit(&catalog).await.unwrap();
583
584 let snapshot = table.metadata().current_snapshot().unwrap();
586 assert_eq!(snapshot.first_row_id(), Some(30));
587 assert_eq!(table.metadata().next_row_id(), 30 + 17 + 11);
588
589 let manifest_list = table
591 .metadata()
592 .current_snapshot()
593 .unwrap()
594 .load_manifest_list(table.file_io(), table.metadata())
595 .await
596 .unwrap();
597 assert_eq!(manifest_list.entries().len(), 2);
598 let manifest_file = &manifest_list.entries()[1];
599 assert_eq!(manifest_file.first_row_id, Some(30));
600 }
601}