1mod action;
54
55use std::collections::HashMap;
56
57pub use action::*;
58mod append;
59mod row_delta;
60mod snapshot;
61mod sort_order;
62mod update_location;
63mod update_properties;
64mod update_statistics;
65mod upgrade_format_version;
66
67use std::sync::Arc;
68use std::time::Duration;
69
70use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
71
72use crate::error::Result;
73use crate::spec::{
74 PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
75 PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
76 PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
77 PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
78};
79use crate::table::Table;
80use crate::transaction::action::BoxedTransactionAction;
81use crate::transaction::append::FastAppendAction;
82use crate::transaction::row_delta::RowDeltaAction;
83use crate::transaction::sort_order::ReplaceSortOrderAction;
84use crate::transaction::update_location::UpdateLocationAction;
85use crate::transaction::update_properties::UpdatePropertiesAction;
86use crate::transaction::update_statistics::UpdateStatisticsAction;
87use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
88use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
89
/// A table transaction: a table together with the actions queued against it.
#[derive(Clone)]
pub struct Transaction {
    /// The table this transaction operates on; `do_commit` may replace it with a
    /// freshly loaded copy from the catalog.
    table: Table,
    /// Pending actions; `do_commit` runs them in vector order.
    actions: Vec<BoxedTransactionAction>,
}
96
97impl Transaction {
98 pub fn new(table: &Table) -> Self {
100 Self {
101 table: table.clone(),
102 actions: vec![],
103 }
104 }
105
106 fn update_table_metadata(table: Table, updates: &[TableUpdate]) -> Result<Table> {
107 let mut metadata_builder = table.metadata().clone().into_builder(None);
108 for update in updates {
109 metadata_builder = update.clone().apply(metadata_builder)?;
110 }
111
112 Ok(table.with_metadata(Arc::new(metadata_builder.build()?.metadata)))
113 }
114
115 fn apply(
118 table: Table,
119 mut action_commit: ActionCommit,
120 existing_updates: &mut Vec<TableUpdate>,
121 existing_requirements: &mut Vec<TableRequirement>,
122 ) -> Result<Table> {
123 let updates = action_commit.take_updates();
124 let requirements = action_commit.take_requirements();
125
126 for requirement in &requirements {
127 requirement.check(Some(table.metadata()))?;
128 }
129
130 let updated_table = Self::update_table_metadata(table, &updates)?;
131
132 existing_updates.extend(updates);
133 existing_requirements.extend(requirements);
134
135 Ok(updated_table)
136 }
137
    /// Returns a new [`UpgradeFormatVersionAction`].
    ///
    /// This only constructs the action; the transaction itself is not modified.
    pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction {
        UpgradeFormatVersionAction::new()
    }
142
    /// Returns a new [`UpdatePropertiesAction`].
    ///
    /// This only constructs the action; the transaction itself is not modified.
    pub fn update_table_properties(&self) -> UpdatePropertiesAction {
        UpdatePropertiesAction::new()
    }
147
    /// Returns a new [`FastAppendAction`].
    ///
    /// This only constructs the action; the transaction itself is not modified.
    pub fn fast_append(&self) -> FastAppendAction {
        FastAppendAction::new()
    }
152
    /// Returns a new [`RowDeltaAction`].
    ///
    /// This only constructs the action; the transaction itself is not modified.
    pub fn row_delta(&self) -> RowDeltaAction {
        RowDeltaAction::new()
    }
162
    /// Returns a new [`ReplaceSortOrderAction`].
    ///
    /// This only constructs the action; the transaction itself is not modified.
    pub fn replace_sort_order(&self) -> ReplaceSortOrderAction {
        ReplaceSortOrderAction::new()
    }
167
    /// Returns a new [`UpdateLocationAction`].
    ///
    /// This only constructs the action; the transaction itself is not modified.
    pub fn update_location(&self) -> UpdateLocationAction {
        UpdateLocationAction::new()
    }
172
    /// Returns a new [`UpdateStatisticsAction`].
    ///
    /// This only constructs the action; the transaction itself is not modified.
    pub fn update_statistics(&self) -> UpdateStatisticsAction {
        UpdateStatisticsAction::new()
    }
177
    /// Commits the queued actions to `catalog`, retrying while the error is retryable.
    ///
    /// When no actions are queued the transaction's table is returned as-is and the
    /// catalog is not contacted. Otherwise a backoff policy is built from the table's
    /// properties and `do_commit` is run under it, retrying only when the returned
    /// error reports `retryable()`.
    ///
    /// # Errors
    ///
    /// Returns an error if a retry property cannot be parsed, or the error produced by
    /// `do_commit` once retrying stops.
    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
        if self.actions.is_empty() {
            // Nothing to commit.
            return Ok(self.table);
        }

        let backoff = Self::build_backoff(self.table.metadata().properties())?;
        let tx = self;

        // The transaction is threaded through the retry loop as context: each attempt
        // receives it by value and hands it back next to the attempt's result.
        (|mut tx: Transaction| async {
            let result = tx.do_commit(catalog).await;
            (tx, result)
        })
        .retry(backoff)
        .sleep(tokio::time::sleep)
        .context(tx)
        .when(|e| e.retryable())
        .await
        // The awaited value is `(transaction, result)`; only the result is returned.
        .1
    }
199
200 fn build_backoff(props: &HashMap<String, String>) -> Result<ExponentialBackoff> {
201 let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) {
202 Some(value_str) => value_str.parse::<u64>().map_err(|e| {
203 Error::new(
204 ErrorKind::DataInvalid,
205 "Invalid value for commit.retry.min-wait-ms",
206 )
207 .with_source(e)
208 })?,
209 None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
210 };
211 let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) {
212 Some(value_str) => value_str.parse::<u64>().map_err(|e| {
213 Error::new(
214 ErrorKind::DataInvalid,
215 "Invalid value for commit.retry.max-wait-ms",
216 )
217 .with_source(e)
218 })?,
219 None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
220 };
221 let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) {
222 Some(value_str) => value_str.parse::<u64>().map_err(|e| {
223 Error::new(
224 ErrorKind::DataInvalid,
225 "Invalid value for commit.retry.total-timeout-ms",
226 )
227 .with_source(e)
228 })?,
229 None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
230 };
231 let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) {
232 Some(value_str) => value_str.parse::<usize>().map_err(|e| {
233 Error::new(
234 ErrorKind::DataInvalid,
235 "Invalid value for commit.retry.num-retries",
236 )
237 .with_source(e)
238 })?,
239 None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
240 };
241
242 Ok(ExponentialBuilder::new()
243 .with_min_delay(Duration::from_millis(min_delay))
244 .with_max_delay(Duration::from_millis(max_delay))
245 .with_total_delay(Some(Duration::from_millis(total_delay)))
246 .with_max_times(max_times)
247 .with_factor(2.0)
248 .build())
249 }
250
251 async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
252 let refreshed = catalog.load_table(self.table.identifier()).await?;
253
254 if self.table.metadata() != refreshed.metadata()
255 || self.table.metadata_location() != refreshed.metadata_location()
256 {
257 self.table = refreshed.clone();
259 }
260
261 let mut current_table = self.table.clone();
262 let mut existing_updates: Vec<TableUpdate> = vec![];
263 let mut existing_requirements: Vec<TableRequirement> = vec![];
264
265 for action in &self.actions {
266 let action_commit = Arc::clone(action).commit(¤t_table).await?;
267 current_table = Self::apply(
269 current_table,
270 action_commit,
271 &mut existing_updates,
272 &mut existing_requirements,
273 )?;
274 }
275
276 let table_commit = TableCommit::builder()
277 .ident(self.table.identifier().to_owned())
278 .updates(existing_updates)
279 .requirements(existing_requirements)
280 .build();
281
282 catalog.update_table(table_commit).await
283 }
284}
285
286#[cfg(test)]
287mod tests {
288 use std::collections::HashMap;
289 use std::fs::File;
290 use std::io::BufReader;
291 use std::sync::Arc;
292 use std::sync::atomic::{AtomicU32, Ordering};
293
294 use crate::catalog::MockCatalog;
295 use crate::io::FileIOBuilder;
296 use crate::spec::TableMetadata;
297 use crate::table::Table;
298 use crate::transaction::{ApplyTransactionAction, Transaction};
299 use crate::{Error, ErrorKind, TableIdent};
300
301 pub fn make_v1_table() -> Table {
302 let file = File::open(format!(
303 "{}/testdata/table_metadata/{}",
304 env!("CARGO_MANIFEST_DIR"),
305 "TableMetadataV1Valid.json"
306 ))
307 .unwrap();
308 let reader = BufReader::new(file);
309 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
310
311 Table::builder()
312 .metadata(resp)
313 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
314 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
315 .file_io(FileIOBuilder::new("memory").build().unwrap())
316 .build()
317 .unwrap()
318 }
319
320 pub fn make_v2_table() -> Table {
321 let file = File::open(format!(
322 "{}/testdata/table_metadata/{}",
323 env!("CARGO_MANIFEST_DIR"),
324 "TableMetadataV2Valid.json"
325 ))
326 .unwrap();
327 let reader = BufReader::new(file);
328 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
329
330 Table::builder()
331 .metadata(resp)
332 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
333 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
334 .file_io(FileIOBuilder::new("memory").build().unwrap())
335 .build()
336 .unwrap()
337 }
338
339 pub fn make_v2_minimal_table() -> Table {
340 let file = File::open(format!(
341 "{}/testdata/table_metadata/{}",
342 env!("CARGO_MANIFEST_DIR"),
343 "TableMetadataV2ValidMinimal.json"
344 ))
345 .unwrap();
346 let reader = BufReader::new(file);
347 let resp = serde_json::from_reader::<_, TableMetadata>(reader).unwrap();
348
349 Table::builder()
350 .metadata(resp)
351 .metadata_location("s3://bucket/test/location/metadata/v1.json".to_string())
352 .identifier(TableIdent::from_strs(["ns1", "test1"]).unwrap())
353 .file_io(FileIOBuilder::new("memory").build().unwrap())
354 .build()
355 .unwrap()
356 }
357
358 fn setup_test_table(num_retries: &str) -> Table {
360 let table = make_v2_table();
361
362 let mut props = HashMap::new();
364 props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string());
365 props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string());
366 props.insert(
367 "commit.retry.total-timeout-ms".to_string(),
368 "1000".to_string(),
369 );
370 props.insert(
371 "commit.retry.num-retries".to_string(),
372 num_retries.to_string(),
373 );
374
375 let metadata = table
377 .metadata()
378 .clone()
379 .into_builder(None)
380 .set_properties(props)
381 .unwrap()
382 .build()
383 .unwrap()
384 .metadata;
385
386 table.with_metadata(Arc::new(metadata))
387 }
388
389 fn create_test_transaction(table: &Table) -> Transaction {
391 let tx = Transaction::new(table);
392 tx.update_table_properties()
393 .set("test.key".to_string(), "test.value".to_string())
394 .apply(tx)
395 .unwrap()
396 }
397
398 fn setup_mock_catalog_with_retryable_errors(
400 success_after_attempts: Option<u32>,
401 expected_calls: usize,
402 ) -> MockCatalog {
403 let mut mock_catalog = MockCatalog::new();
404
405 mock_catalog
406 .expect_load_table()
407 .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
408
409 let attempts = AtomicU32::new(0);
410 mock_catalog
411 .expect_update_table()
412 .times(expected_calls)
413 .returning_st(move |_| {
414 if let Some(success_after_attempts) = success_after_attempts {
415 attempts.fetch_add(1, Ordering::SeqCst);
416 if attempts.load(Ordering::SeqCst) <= success_after_attempts {
417 Box::pin(async move {
418 Err(
419 Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
420 .with_retryable(true),
421 )
422 })
423 } else {
424 Box::pin(async move { Ok(make_v2_table()) })
425 }
426 } else {
427 Box::pin(async move {
429 Err(
430 Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
431 .with_retryable(true),
432 )
433 })
434 }
435 });
436
437 mock_catalog
438 }
439
440 fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
442 let mut mock_catalog = MockCatalog::new();
443
444 mock_catalog
445 .expect_load_table()
446 .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
447
448 mock_catalog
449 .expect_update_table()
450 .times(1) .returning_st(move |_| {
452 Box::pin(async move {
453 Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
454 .with_retryable(false))
455 })
456 });
457
458 mock_catalog
459 }
460
461 #[tokio::test]
462 async fn test_commit_retryable_error() {
463 let table = setup_test_table("3");
465
466 let tx = create_test_transaction(&table);
468
469 let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);
471
472 let result = tx.commit(&mock_catalog).await;
474
475 assert!(result.is_ok(), "Transaction should eventually succeed");
477 }
478
479 #[tokio::test]
480 async fn test_commit_non_retryable_error() {
481 let table = setup_test_table("3");
483
484 let tx = create_test_transaction(&table);
486
487 let mock_catalog = setup_mock_catalog_with_non_retryable_error();
489
490 let result = tx.commit(&mock_catalog).await;
492
493 assert!(result.is_err(), "Transaction should fail immediately");
495 if let Err(err) = result {
496 assert_eq!(err.kind(), ErrorKind::Unexpected);
497 assert_eq!(err.message(), "Non-retryable error");
498 assert!(!err.retryable(), "Error should not be retryable");
499 }
500 }
501
502 #[tokio::test]
503 async fn test_commit_max_retries_exceeded() {
504 let table = setup_test_table("2");
506
507 let tx = create_test_transaction(&table);
509
510 let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); let result = tx.commit(&mock_catalog).await;
515
516 assert!(result.is_err(), "Transaction should fail after max retries");
518 if let Err(err) = result {
519 assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
520 assert_eq!(err.message(), "Commit conflict");
521 assert!(err.retryable(), "Error should be retryable");
522 }
523 }
524}