1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::Hashable;
17use differential_dataflow::difference::Semigroup;
18use differential_dataflow::lattice::Lattice;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_ore::cast::CastFrom;
22use mz_ore::instrument;
23use mz_persist_client::ShardId;
24use mz_persist_client::batch::{Batch, ProtoBatch};
25use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
26use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
27use prost::Message;
28use timely::order::TotalOrder;
29use timely::progress::{Antichain, Timestamp};
30use tracing::debug;
31
32use crate::proto::ProtoIdBatch;
33use crate::txns::{Tidy, TxnsHandle};
34
35#[derive(Debug)]
37pub(crate) struct TxnWrite<K, V, T, D> {
38 pub(crate) batches: Vec<Batch<K, V, T, D>>,
39 pub(crate) staged: Vec<ProtoBatch>,
40 pub(crate) writes: Vec<(K, V, D)>,
41}
42
43impl<K, V, T, D> TxnWrite<K, V, T, D> {
44 pub fn merge(&mut self, other: Self) {
46 self.batches.extend(other.batches);
47 self.staged.extend(other.staged);
48 self.writes.extend(other.writes);
49 }
50}
51
52impl<K, V, T, D> Default for TxnWrite<K, V, T, D> {
53 fn default() -> Self {
54 Self {
55 batches: Vec::default(),
56 staged: Vec::default(),
57 writes: Vec::default(),
58 }
59 }
60}
61
/// An in-progress transaction: buffered writes to one or more data shards,
/// plus any [`Tidy`] work to carry along with the commit.
///
/// Writes are buffered with [`Txn::write`] / [`Txn::write_batch`] and are
/// committed together by [`Txn::commit_at`], which records all of them (and
/// the tidy retractions) in a single `small_caa` call against the txns shard.
#[derive(Debug)]
pub struct Txn<K, V, T, D> {
    /// Pending writes, keyed by the data shard they target.
    pub(crate) writes: BTreeMap<ShardId, TxnWrite<K, V, T, D>>,
    /// Tidy work whose `retractions` are written (with diff -1) to the txns
    /// shard as part of the next commit.
    tidy: Tidy,
}
68
69impl<K, V, T, D> Txn<K, V, T, D>
70where
71 K: Debug + Codec,
72 V: Debug + Codec,
73 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
74 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
75{
76 pub(crate) fn new() -> Self {
77 Txn {
78 writes: BTreeMap::default(),
79 tidy: Tidy::default(),
80 }
81 }
82
83 #[allow(clippy::unused_async)]
90 pub async fn write(&mut self, data_id: &ShardId, key: K, val: V, diff: D) {
91 self.writes
92 .entry(*data_id)
93 .or_default()
94 .writes
95 .push((key, val, diff))
96 }
97
98 pub fn write_batch(&mut self, data_id: &ShardId, batch: ProtoBatch) {
102 self.writes.entry(*data_id).or_default().staged.push(batch)
103 }
104
105 #[instrument(level = "debug", fields(ts = ?commit_ts))]
119 pub async fn commit_at<O, C>(
120 &mut self,
121 handle: &mut TxnsHandle<K, V, T, D, O, C>,
122 commit_ts: T,
123 ) -> Result<TxnApply<T>, T>
124 where
125 O: Opaque + Debug + Codec64,
126 C: TxnsCodec,
127 {
128 let op = &Arc::clone(&handle.metrics).commit;
129 op.run(async {
130 let mut txns_upper = handle
131 .txns_write
132 .shared_upper()
133 .into_option()
134 .expect("txns shard should not be closed");
135
136 loop {
137 txns_upper = handle.txns_cache.update_ge(&txns_upper).await.clone();
138
139 if commit_ts < txns_upper {
144 debug!(
145 "commit_at {:?} mismatch current={:?}",
146 commit_ts, txns_upper
147 );
148 return Err(txns_upper);
149 }
150 for (data_id, _) in self.writes.iter() {
152 assert!(
153 handle
154 .txns_cache
155 .registered_at_progress(data_id, &txns_upper),
156 "{} should be registered to commit at {:?}",
157 data_id,
158 txns_upper,
159 );
160 }
161 debug!(
162 "commit_at {:?}: [{:?}, {:?}) begin",
163 commit_ts,
164 txns_upper,
165 commit_ts.step_forward(),
166 );
167
168 let txn_batches_updates = FuturesUnordered::new();
169 while let Some((data_id, updates)) = self.writes.pop_first() {
170 let data_write = handle.datas.take_write_for_commit(&data_id).unwrap_or_else(
171 || {
172 panic!(
173 "data shard {} must be registered with this Txn handle to commit",
174 data_id
175 )
176 },
177 );
178 let commit_ts = commit_ts.clone();
179 txn_batches_updates.push(async move {
180 let mut batches =
181 Vec::with_capacity(updates.staged.len() + updates.batches.len() + 1);
182
183 if !updates.writes.is_empty() {
185 let mut batch = data_write.builder(Antichain::from_elem(T::minimum()));
186 for (k, v, d) in updates.writes.iter() {
187 batch.add(k, v, &commit_ts, d).await.expect("valid usage");
188 }
189 let batch = batch
190 .finish(Antichain::from_elem(commit_ts.step_forward()))
191 .await
192 .expect("valid usage");
193 let batch = batch.into_transmittable_batch();
194 batches.push(batch);
195 }
196
197 batches.extend(updates.staged.into_iter());
199 batches.extend(
200 updates
201 .batches
202 .into_iter()
203 .map(|b| b.into_transmittable_batch()),
204 );
205
206 let batches: Vec<_> = batches
208 .into_iter()
209 .map(|batch| {
210 let mut batch = data_write.batch_from_transmittable_batch(batch);
211 batch
212 .rewrite_ts(
213 &Antichain::from_elem(commit_ts.clone()),
214 Antichain::from_elem(commit_ts.step_forward()),
215 )
216 .expect("invalid usage");
217 batch.into_transmittable_batch()
218 })
219 .collect();
220
221 let batch_updates = batches
222 .into_iter()
223 .map(|batch| {
224 let batch_raw = ProtoIdBatch::new(batch.clone()).encode_to_vec();
236 debug!(
237 "wrote {:.9} batch {}",
238 data_id.to_string(),
239 batch_raw.hashed(),
240 );
241 let update = C::encode(TxnsEntry::Append(
242 data_id,
243 T::encode(&commit_ts),
244 batch_raw,
245 ));
246 (batch, update)
247 })
248 .collect::<Vec<_>>();
249 (data_write, batch_updates)
250 })
251 }
252 let txn_batches_updates = txn_batches_updates.collect::<Vec<_>>().await;
253 let mut txns_updates = txn_batches_updates
254 .iter()
255 .flat_map(|(_, batch_updates)| batch_updates.iter().map(|(_, updates)| updates))
256 .map(|(key, val)| ((key, val), &commit_ts, 1))
257 .collect::<Vec<_>>();
258 let apply_is_empty = txns_updates.is_empty();
259
260 let filtered_retractions = handle
266 .read_cache()
267 .filter_retractions(&txns_upper, self.tidy.retractions.iter())
268 .map(|(batch_raw, (ts, data_id))| {
269 C::encode(TxnsEntry::Append(*data_id, *ts, batch_raw.clone()))
270 })
271 .collect::<Vec<_>>();
272 txns_updates.extend(
273 filtered_retractions
274 .iter()
275 .map(|(key, val)| ((key, val), &commit_ts, -1)),
276 );
277
278 let res = crate::small_caa(
279 || "txns commit",
280 &mut handle.txns_write,
281 &txns_updates,
282 txns_upper.clone(),
283 commit_ts.step_forward(),
284 )
285 .await;
286 match res {
287 Ok(()) => {
288 debug!(
289 "commit_at {:?}: [{:?}, {:?}) success",
290 commit_ts,
291 txns_upper,
292 commit_ts.step_forward(),
293 );
294 for (data_write, batch_updates) in txn_batches_updates {
297 for (batch, _) in batch_updates {
298 let batch = data_write
299 .batch_from_transmittable_batch(batch)
300 .into_hollow_batch();
301 handle.metrics.batches.commit_count.inc();
302 handle
303 .metrics
304 .batches
305 .commit_bytes
306 .inc_by(u64::cast_from(batch.encoded_size_bytes()));
307 }
308 handle.datas.put_write_for_commit(data_write);
309 }
310 return Ok(TxnApply {
311 is_empty: apply_is_empty,
312 commit_ts,
313 });
314 }
315 Err(new_txns_upper) => {
316 handle.metrics.commit.retry_count.inc();
317 assert!(txns_upper < new_txns_upper);
318 txns_upper = new_txns_upper;
319 for (data_write, batch_updates) in txn_batches_updates {
320 let batches = batch_updates
321 .into_iter()
322 .map(|(batch, _)| {
323 data_write.batch_from_transmittable_batch(batch.clone())
324 })
325 .collect();
326 let txn_write = TxnWrite {
327 writes: Vec::new(),
328 staged: Vec::new(),
329 batches,
330 };
331 self.writes.insert(data_write.shard_id(), txn_write);
332 handle.datas.put_write_for_commit(data_write);
333 }
334 let _ = handle.txns_cache.update_ge(&txns_upper).await;
335 continue;
336 }
337 }
338 }
339 })
340 .await
341 }
342
343 pub fn merge(&mut self, other: Self) {
345 for (data_id, writes) in other.writes {
346 self.writes.entry(data_id).or_default().merge(writes);
347 }
348 self.tidy.merge(other.tidy);
349 }
350
351 pub fn tidy(&mut self, tidy: Tidy) {
355 self.tidy.merge(tidy);
356 }
357
358 pub fn take_tidy(&mut self) -> Tidy {
361 std::mem::take(&mut self.tidy)
362 }
363}
364
365#[derive(Debug)]
368#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
369pub struct TxnApply<T> {
370 is_empty: bool,
371 pub(crate) commit_ts: T,
372}
373
374impl<T> TxnApply<T> {
375 pub async fn apply<K, V, D, O, C>(self, handle: &mut TxnsHandle<K, V, T, D, O, C>) -> Tidy
377 where
378 K: Debug + Codec,
379 V: Debug + Codec,
380 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
381 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
382 O: Opaque + Debug + Codec64,
383 C: TxnsCodec,
384 {
385 debug!("txn apply {:?}", self.commit_ts);
386 handle.apply_le(&self.commit_ts).await
387 }
388
389 pub fn is_empty(&self) -> bool {
394 self.is_empty
395 }
396}
397
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use futures::StreamExt;
    use futures::stream::FuturesUnordered;
    use mz_ore::assert_err;
    use mz_persist_client::PersistClient;

    use crate::tests::writer;
    use crate::txn_cache::TxnsCache;

    use super::*;

    /// Commits merged txns touching two data shards. Checks that committing
    /// below the current txns upper fails with that upper, and that every
    /// committed write is readable after applying.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn commit_at() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // Merge two txns (covering d0 and d1) and commit them together at 3.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d0, "1".into(), (), 1).await;
        other.write(&d1, "A".into(), (), 1).await;
        txn.merge(other);
        txn.commit_at(&mut txns, 3).await.unwrap();

        // An empty commit still advances the txns upper (to 6, as asserted
        // below).
        txns.begin().commit_at(&mut txns, 5).await.unwrap();

        // 4 is now below the upper, so the commit fails and reports the
        // current upper. Retrying at that upper succeeds.
        let mut txn = txns.begin();
        txn.write(&d0, "2".into(), (), 1).await;
        assert_eq!(txn.commit_at(&mut txns, 4).await, Err(6));
        txn.commit_at(&mut txns, 6).await.unwrap();
        txns.apply_le(&6).await;

        let expected_d0 = vec!["0".to_owned(), "1".to_owned(), "2".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 6).await;
        assert_eq!(actual_d0, expected_d0);

        let expected_d1 = vec!["A".to_owned()];
        let actual_d1 = cache.expect_snapshot(&client, d1, 6).await;
        assert_eq!(actual_d1, expected_d1);
    }

    /// Walks commits through apply and tidy, checking the cache's view of the
    /// minimum unapplied timestamp after each step.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn apply_and_tidy() {
        let mut txns = TxnsHandle::expect_open(PersistClient::new_for_tests().await).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        // A commit with data at 2 is non-empty and shows up as unapplied.
        let mut txn = txns.begin_test();
        txn.write(&d0, "2".into(), (), 1).await;
        let apply_2 = txn.commit_at(&mut txns, 2).await.unwrap();
        log.record_txn(2, &txn);
        assert_eq!(apply_2.is_empty(), false);
        let _ = cache.update_gt(&2).await;
        cache.mark_register_applied(&2);
        assert_eq!(cache.min_unapplied_ts(), &2);
        assert_eq!(cache.unapplied().count(), 1);

        // The cache hasn't been advanced since the apply, so it still reports
        // 2 as the minimum unapplied timestamp.
        let tidy_2 = apply_2.apply(&mut txns).await;
        assert_eq!(cache.min_unapplied_ts(), &2);

        // After committing tidy_2 at 3, nothing is left unapplied.
        txns.tidy_at(3, tidy_2).await.unwrap();
        let _ = cache.update_gt(&3).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        assert_eq!(cache.unapplied().count(), 0);

        // Tidy work can ride along with a normal data commit, including
        // through a merged txn.
        let tidy_4 = txns.expect_commit_at(4, d0, &["4"], &log).await;
        let _ = cache.update_gt(&4).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        let mut txn0 = txns.begin_test();
        txn0.write(&d0, "5".into(), (), 1).await;
        txn0.tidy(tidy_4);
        let mut txn1 = txns.begin_test();
        txn1.merge(txn0);
        let apply_5 = txn1.commit_at(&mut txns, 5).await.unwrap();
        log.record_txn(5, &txn1);
        let _ = cache.update_gt(&5).await;
        assert_eq!(cache.min_unapplied_ts(), &5);
        let tidy_5 = apply_5.apply(&mut txns).await;

        // Tidies may be committed out of order: tidy_6 lands at 7, and the
        // earlier tidy_5 lands after it at 8.
        let tidy_6 = txns.expect_commit_at(6, d0, &["6"], &log).await;
        txns.tidy_at(7, tidy_6).await.unwrap();
        let _ = cache.update_gt(&7).await;
        assert_eq!(cache.min_unapplied_ts(), &8);

        txns.tidy_at(8, tidy_5).await.unwrap();
        let _ = cache.update_gt(&8).await;
        assert_eq!(cache.min_unapplied_ts(), &9);

        // Multiple tidies can be merged and committed together. tidy_9 is
        // also round-tripped through `Txn::tidy` / `Txn::take_tidy`.
        let tidy_9 = txns.expect_commit_at(9, d0, &["9"], &log).await;
        let tidy_10 = txns.expect_commit_at(10, d0, &["10"], &log).await;
        let mut txn = txns.begin();
        txn.tidy(tidy_9);
        let mut tidy_9 = txn.take_tidy();
        tidy_9.merge(tidy_10);
        txns.tidy_at(11, tidy_9).await.unwrap();
        let _ = cache.update_gt(&11).await;
        assert_eq!(cache.min_unapplied_ts(), &12);

        // Tidying at a timestamp that has already been written fails with the
        // current upper.
        let tidy_12 = txns.expect_commit_at(12, d0, &["12"], &log).await;
        assert_eq!(txns.tidy_at(12, tidy_12).await, Err(13));

        let () = log.assert_snapshot(d0, 12).await;
    }

    /// Many concurrent tasks register, commit, apply, and tidy against the same
    /// data shard with jittered timestamps and sleeps. The final snapshot must
    /// contain every task's write.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn conflicting_writes() {
        // Cheap clock-derived jitter in 0..20 (from the sub-second micros).
        // It is used both as a sleep in ms and as a timestamp offset.
        fn jitter() -> u64 {
            let time = SystemTime::UNIX_EPOCH.elapsed().unwrap();
            u64::from(time.subsec_micros() % 20)
        }

        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        const NUM_WRITES: usize = 25;
        let tasks = FuturesUnordered::new();
        for idx in 0..NUM_WRITES {
            let mut txn = txns.begin_test();
            txn.write(&d0, format!("{:05}", idx), (), 1).await;
            let (txns_id, client, log) = (txns.txns_id(), client.clone(), log.clone());

            let task = async move {
                // Each task uses its own handle and (re-)registers d0, retrying
                // at whatever timestamp the failed attempt reports.
                let mut txns = TxnsHandle::expect_open_id(client.clone(), txns_id).await;
                let mut register_ts = 1;
                loop {
                    let data_write = writer(&client, d0).await;
                    match txns.register(register_ts, [data_write]).await {
                        Ok(_) => {
                            debug!("{} registered at {}", idx, register_ts);
                            break;
                        }
                        Err(ts) => {
                            register_ts = ts;
                            continue;
                        }
                    }
                }

                // Jitter the commit timestamp (to leave gaps) and the sleeps (to
                // vary interleavings); retry at the reported upper on conflict.
                let jitter_ms = jitter();
                let mut commit_ts = register_ts + 1 + jitter_ms;
                let apply = loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, commit_ts).await {
                        Ok(apply) => break apply,
                        Err(new_commit_ts) => commit_ts = new_commit_ts,
                    }
                };
                debug!("{} committed at {}", idx, commit_ts);
                log.record_txn(commit_ts, &txn);

                // Also sleep before applying.
                let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                let tidy = apply.apply(&mut txns).await;

                // Commit the tidy (racing the other tasks' commits). A tidy-only
                // commit has no data, so its apply must be empty.
                let jitter_ms = jitter();
                let mut txn = txns.begin();
                txn.tidy(tidy);
                let mut tidy_ts = commit_ts + jitter_ms;
                loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, tidy_ts).await {
                        Ok(apply) => {
                            debug!("{} tidied at {}", idx, tidy_ts);
                            assert!(apply.is_empty());
                            return commit_ts;
                        }
                        Err(new_tidy_ts) => tidy_ts = new_tidy_ts,
                    }
                }
            };
            tasks.push(task)
        }

        let max_commit_ts = tasks
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .max()
            .unwrap_or_default();

        // Every task's write must be present exactly as written.
        let expected = (0..NUM_WRITES)
            .map(|x| format!("{:05}", x))
            .collect::<Vec<_>>();
        let actual = cache.expect_snapshot(&client, d0, max_commit_ts).await;
        assert_eq!(actual, expected);
        log.assert_snapshot(d0, max_commit_ts).await;
    }

    /// Two handles on the same txns shard each commit and then tidy. The
    /// handle that committed first tidies last. The txns shard must still
    /// pass `TxnsCache::validate`, and both data shards must read back
    /// correctly.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn tidy_race() {
        let client = PersistClient::new_for_tests().await;
        let mut txns0 = TxnsHandle::expect_open(client.clone()).await;
        let log = txns0.new_log();
        let d0 = txns0.expect_register(1).await;

        // txns0 commits at 2 but holds onto its tidy for now.
        let tidy0 = txns0.expect_commit_at(2, d0, &["foo"], &log).await;

        // A second handle registers, commits, and tidies in the meantime.
        let mut txns1 = TxnsHandle::expect_open_id(client.clone(), txns0.txns_id()).await;
        let d1 = txns1.expect_register(3).await;
        let tidy1 = txns1.expect_commit_at(4, d1, &["foo"], &log).await;
        let () = txns1.tidy_at(5, tidy1).await.unwrap();

        // Now txns0 commits its (older) tidy.
        let () = txns0.tidy_at(6, tidy0).await.unwrap();

        // A fresh cache reading the whole txns shard must find it consistent.
        let mut cache = TxnsCache::expect_open(0, &txns0).await;
        let _ = cache.update_gt(&6).await;
        assert_eq!(cache.validate(), Ok(()));

        log.assert_snapshot(d0, 6).await;
        log.assert_snapshot(d1, 6).await;
    }

    /// Committing to a data shard that was never registered, or that has been
    /// forgotten, must fail. It runs on a spawned task so the resulting panic
    /// surfaces as a task error.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn commit_unregistered_table() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;

        // Never registered.
        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&ShardId::new(), "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 1).await
            }
        });
        assert_err!(commit.await);

        // Registered at 2, then forgotten at 3.
        let d0 = txns.expect_register(2).await;
        txns.forget(3, [d0]).await.unwrap();

        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&d0, "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 4).await
            }
        });
        assert_err!(commit.await);
    }

    /// A txn whose commit loses a race keeps its writes. More can be added,
    /// and a retry at the reported upper commits both the old and new writes.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn commit_retry() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // `other` takes timestamp 3 first, so `txn`'s commit at 3 fails with
        // upper 4.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d1, "42".into(), (), 1).await;
        other.commit_at(&mut txns, 3).await.unwrap();
        let upper = txn.commit_at(&mut txns, 3).await.unwrap_err();
        assert_eq!(upper, 4);

        // Add another write after the failure, then retry at the new upper.
        txn.write(&d0, "1".into(), (), 1).await;
        txn.commit_at(&mut txns, 4).await.unwrap();
        txns.apply_le(&4).await;

        let expected_d0 = vec!["0".to_owned(), "1".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 4).await;
        assert_eq!(actual_d0, expected_d0);
    }
}