1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::Hashable;
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_ore::cast::CastFrom;
22use mz_ore::instrument;
23use mz_persist_client::ShardId;
24use mz_persist_client::batch::{Batch, ProtoBatch};
25use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
26use mz_persist_types::{Codec, Codec64, StepForward};
27use prost::Message;
28use timely::order::TotalOrder;
29use timely::progress::{Antichain, Timestamp};
30use tracing::debug;
31
32use crate::proto::ProtoIdBatch;
33use crate::txns::{Tidy, TxnsHandle};
34
/// Everything an in-progress [`Txn`] has staged against a single data shard.
///
/// The three fields hold the same kind of payload at different stages of
/// preparation; `Txn::commit_at` turns all of them into batches for the shard.
#[derive(Debug)]
pub(crate) struct TxnWrite<K, V, T, D> {
    /// Already-built batches. `Txn::commit_at` refills this field with the
    /// batches it produced when a commit attempt has to be retried.
    pub(crate) batches: Vec<Batch<K, V, T, D>>,
    /// Batches handed to the txn in transmittable (`ProtoBatch`) form via
    /// `Txn::write_batch`.
    pub(crate) staged: Vec<ProtoBatch>,
    /// Individual `(key, val, diff)` updates pushed by `Txn::write`. No
    /// timestamp is stored alongside them.
    pub(crate) writes: Vec<(K, V, D)>,
}
42
43impl<K, V, T, D> TxnWrite<K, V, T, D> {
44 pub fn merge(&mut self, other: Self) {
46 self.batches.extend(other.batches);
47 self.staged.extend(other.staged);
48 self.writes.extend(other.writes);
49 }
50}
51
52impl<K, V, T, D> Default for TxnWrite<K, V, T, D> {
53 fn default() -> Self {
54 Self {
55 batches: Vec::default(),
56 staged: Vec::default(),
57 writes: Vec::default(),
58 }
59 }
60}
61
/// An in-progress transaction: writes staged per data shard, plus tidy work
/// to be carried out when the transaction commits.
#[derive(Debug)]
pub struct Txn<K, V, T, D> {
    /// Staged writes, keyed by the data shard they target.
    pub(crate) writes: BTreeMap<ShardId, TxnWrite<K, V, T, D>>,
    /// Accumulated tidy work; `commit_at` reads its `retractions` and emits
    /// them alongside the new entries.
    tidy: Tidy,
}
68
69impl<K, V, T, D> Txn<K, V, T, D>
70where
71 K: Debug + Codec,
72 V: Debug + Codec,
73 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
74 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
75{
76 pub(crate) fn new() -> Self {
77 Txn {
78 writes: BTreeMap::default(),
79 tidy: Tidy::default(),
80 }
81 }
82
83 #[allow(clippy::unused_async)]
90 pub async fn write(&mut self, data_id: &ShardId, key: K, val: V, diff: D) {
91 self.writes
92 .entry(*data_id)
93 .or_default()
94 .writes
95 .push((key, val, diff))
96 }
97
98 pub fn write_batch(&mut self, data_id: &ShardId, batch: ProtoBatch) {
102 self.writes.entry(*data_id).or_default().staged.push(batch)
103 }
104
105 #[instrument(level = "debug", fields(ts = ?commit_ts))]
119 pub async fn commit_at<C>(
120 &mut self,
121 handle: &mut TxnsHandle<K, V, T, D, C>,
122 commit_ts: T,
123 ) -> Result<TxnApply<T>, T>
124 where
125 C: TxnsCodec,
126 {
127 let op = &Arc::clone(&handle.metrics).commit;
128 op.run(async {
129 let mut txns_upper = handle
130 .txns_write
131 .shared_upper()
132 .into_option()
133 .expect("txns shard should not be closed");
134
135 loop {
136 txns_upper = handle.txns_cache.update_ge(&txns_upper).await.clone();
137
138 if commit_ts < txns_upper {
143 debug!(
144 "commit_at {:?} mismatch current={:?}",
145 commit_ts, txns_upper
146 );
147 return Err(txns_upper);
148 }
149 for (data_id, _) in self.writes.iter() {
151 assert!(
152 handle
153 .txns_cache
154 .registered_at_progress(data_id, &txns_upper),
155 "{} should be registered to commit at {:?}",
156 data_id,
157 txns_upper,
158 );
159 }
160 debug!(
161 "commit_at {:?}: [{:?}, {:?}) begin",
162 commit_ts,
163 txns_upper,
164 commit_ts.step_forward(),
165 );
166
167 let txn_batches_updates = FuturesUnordered::new();
168 while let Some((data_id, updates)) = self.writes.pop_first() {
169 let data_write = handle.datas.take_write_for_commit(&data_id).unwrap_or_else(
170 || {
171 panic!(
172 "data shard {} must be registered with this Txn handle to commit",
173 data_id
174 )
175 },
176 );
177 let commit_ts = commit_ts.clone();
178 txn_batches_updates.push(async move {
179 let mut batches =
180 Vec::with_capacity(updates.staged.len() + updates.batches.len() + 1);
181
182 if !updates.writes.is_empty() {
184 let mut batch = data_write.builder(Antichain::from_elem(T::minimum()));
185 for (k, v, d) in updates.writes.iter() {
186 batch.add(k, v, &commit_ts, d).await.expect("valid usage");
187 }
188 let batch = batch
189 .finish(Antichain::from_elem(commit_ts.step_forward()))
190 .await
191 .expect("valid usage");
192 let batch = batch.into_transmittable_batch();
193 batches.push(batch);
194 }
195
196 batches.extend(updates.staged.into_iter());
198 batches.extend(
199 updates
200 .batches
201 .into_iter()
202 .map(|b| b.into_transmittable_batch()),
203 );
204
205 let batches: Vec<_> = batches
207 .into_iter()
208 .map(|batch| {
209 let mut batch = data_write.batch_from_transmittable_batch(batch);
210 batch
211 .rewrite_ts(
212 &Antichain::from_elem(commit_ts.clone()),
213 Antichain::from_elem(commit_ts.step_forward()),
214 )
215 .expect("invalid usage");
216 batch.into_transmittable_batch()
217 })
218 .collect();
219
220 let batch_updates = batches
221 .into_iter()
222 .map(|batch| {
223 let batch_raw = ProtoIdBatch::new(batch.clone()).encode_to_vec();
235 debug!(
236 "wrote {:.9} batch {}",
237 data_id.to_string(),
238 batch_raw.hashed(),
239 );
240 let update = C::encode(TxnsEntry::Append(
241 data_id,
242 T::encode(&commit_ts),
243 batch_raw,
244 ));
245 (batch, update)
246 })
247 .collect::<Vec<_>>();
248 (data_write, batch_updates)
249 })
250 }
251 let txn_batches_updates = txn_batches_updates.collect::<Vec<_>>().await;
252 let mut txns_updates = txn_batches_updates
253 .iter()
254 .flat_map(|(_, batch_updates)| batch_updates.iter().map(|(_, updates)| updates))
255 .map(|(key, val)| ((key, val), &commit_ts, 1))
256 .collect::<Vec<_>>();
257 let apply_is_empty = txns_updates.is_empty();
258
259 let filtered_retractions = handle
265 .read_cache()
266 .filter_retractions(&txns_upper, self.tidy.retractions.iter())
267 .map(|(batch_raw, (ts, data_id))| {
268 C::encode(TxnsEntry::Append(*data_id, *ts, batch_raw.clone()))
269 })
270 .collect::<Vec<_>>();
271 txns_updates.extend(
272 filtered_retractions
273 .iter()
274 .map(|(key, val)| ((key, val), &commit_ts, -1)),
275 );
276
277 let res = crate::small_caa(
278 || "txns commit",
279 &mut handle.txns_write,
280 &txns_updates,
281 txns_upper.clone(),
282 commit_ts.step_forward(),
283 )
284 .await;
285 match res {
286 Ok(()) => {
287 debug!(
288 "commit_at {:?}: [{:?}, {:?}) success",
289 commit_ts,
290 txns_upper,
291 commit_ts.step_forward(),
292 );
293 for (data_write, batch_updates) in txn_batches_updates {
296 for (batch, _) in batch_updates {
297 let batch = data_write
298 .batch_from_transmittable_batch(batch)
299 .into_hollow_batch();
300 handle.metrics.batches.commit_count.inc();
301 handle
302 .metrics
303 .batches
304 .commit_bytes
305 .inc_by(u64::cast_from(batch.encoded_size_bytes()));
306 }
307 handle.datas.put_write_for_commit(data_write);
308 }
309 return Ok(TxnApply {
310 is_empty: apply_is_empty,
311 commit_ts,
312 });
313 }
314 Err(new_txns_upper) => {
315 handle.metrics.commit.retry_count.inc();
316 assert!(txns_upper < new_txns_upper);
317 txns_upper = new_txns_upper;
318 for (data_write, batch_updates) in txn_batches_updates {
319 let batches = batch_updates
320 .into_iter()
321 .map(|(batch, _)| {
322 data_write.batch_from_transmittable_batch(batch.clone())
323 })
324 .collect();
325 let txn_write = TxnWrite {
326 writes: Vec::new(),
327 staged: Vec::new(),
328 batches,
329 };
330 self.writes.insert(data_write.shard_id(), txn_write);
331 handle.datas.put_write_for_commit(data_write);
332 }
333 let _ = handle.txns_cache.update_ge(&txns_upper).await;
334 continue;
335 }
336 }
337 }
338 })
339 .await
340 }
341
342 pub fn merge(&mut self, other: Self) {
344 for (data_id, writes) in other.writes {
345 self.writes.entry(data_id).or_default().merge(writes);
346 }
347 self.tidy.merge(other.tidy);
348 }
349
350 pub fn tidy(&mut self, tidy: Tidy) {
354 self.tidy.merge(tidy);
355 }
356
357 pub fn take_tidy(&mut self) -> Tidy {
360 std::mem::take(&mut self.tidy)
361 }
362}
363
/// The result of a successful [`Txn::commit_at`]: a handle for applying the
/// committed transaction.
///
/// `PartialEq` is derived only in test and debug-assertion builds.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct TxnApply<T> {
    /// True when the commit wrote no new batch entries to the txns shard.
    /// Retractions are not counted.
    is_empty: bool,
    /// The timestamp the transaction was committed at.
    pub(crate) commit_ts: T,
}
372
373impl<T> TxnApply<T> {
374 pub async fn apply<K, V, D, C>(self, handle: &mut TxnsHandle<K, V, T, D, C>) -> Tidy
376 where
377 K: Debug + Codec,
378 V: Debug + Codec,
379 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
380 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
381 C: TxnsCodec,
382 {
383 debug!("txn apply {:?}", self.commit_ts);
384 handle.apply_le(&self.commit_ts).await
385 }
386
387 pub fn is_empty(&self) -> bool {
392 self.is_empty
393 }
394}
395
#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use futures::StreamExt;
    use futures::stream::FuturesUnordered;
    use mz_ore::assert_err;
    use mz_persist_client::PersistClient;

    use crate::tests::writer;
    use crate::txn_cache::TxnsCache;

    use super::*;

    /// Commits merged, empty, and rejected-then-retried transactions and
    /// checks the resulting data shard contents.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn commit_at() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // Merge a second txn, which touches both d0 and d1, into the first
        // and commit the combined writes at ts 3.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d0, "1".into(), (), 1).await;
        other.write(&d1, "A".into(), (), 1).await;
        txn.merge(other);
        txn.commit_at(&mut txns, 3).await.unwrap();

        // A txn with nothing staged can be committed as well.
        txns.begin().commit_at(&mut txns, 5).await.unwrap();

        // After the commit at 5, a commit at 4 is rejected with the current
        // upper (6); the same txn then commits at 6.
        let mut txn = txns.begin();
        txn.write(&d0, "2".into(), (), 1).await;
        assert_eq!(txn.commit_at(&mut txns, 4).await, Err(6));
        txn.commit_at(&mut txns, 6).await.unwrap();
        txns.apply_le(&6).await;

        let expected_d0 = vec!["0".to_owned(), "1".to_owned(), "2".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 6).await;
        assert_eq!(actual_d0, expected_d0);

        let expected_d1 = vec!["A".to_owned()];
        let actual_d1 = cache.expect_snapshot(&client, d1, 6).await;
        assert_eq!(actual_d1, expected_d1);
    }

    /// Walks through commit, apply, and tidy in several orders and checks
    /// what the cache reports as unapplied after each step.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn apply_and_tidy() {
        let mut txns = TxnsHandle::expect_open(PersistClient::new_for_tests().await).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        // Commit a write at 2: the cache sees one unapplied entry at ts 2.
        let mut txn = txns.begin_test();
        txn.write(&d0, "2".into(), (), 1).await;
        let apply_2 = txn.commit_at(&mut txns, 2).await.unwrap();
        log.record_txn(2, &txn);
        assert!(!apply_2.is_empty());
        let _ = cache.update_gt(&2).await;
        cache.mark_register_applied(&2);
        assert_eq!(cache.min_unapplied_ts(), &2);
        assert_eq!(cache.unapplied().count(), 1);

        // Applying alone does not change what this cache reports.
        let tidy_2 = apply_2.apply(&mut txns).await;
        assert_eq!(cache.min_unapplied_ts(), &2);

        // Tidying at 3 does: nothing is left unapplied afterwards.
        txns.tidy_at(3, tidy_2).await.unwrap();
        let _ = cache.update_gt(&3).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        assert_eq!(cache.unapplied().count(), 0);

        // Tidy work can ride along with a later commit, including through a
        // merge of two txns.
        let tidy_4 = txns.expect_commit_at(4, d0, &["4"], &log).await;
        let _ = cache.update_gt(&4).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        let mut txn0 = txns.begin_test();
        txn0.write(&d0, "5".into(), (), 1).await;
        txn0.tidy(tidy_4);
        let mut txn1 = txns.begin_test();
        txn1.merge(txn0);
        let apply_5 = txn1.commit_at(&mut txns, 5).await.unwrap();
        log.record_txn(5, &txn1);
        let _ = cache.update_gt(&5).await;
        assert_eq!(cache.min_unapplied_ts(), &5);
        let tidy_5 = apply_5.apply(&mut txns).await;

        // Tidy the commit at 6 before the one at 5, i.e. out of order.
        let tidy_6 = txns.expect_commit_at(6, d0, &["6"], &log).await;
        txns.tidy_at(7, tidy_6).await.unwrap();
        let _ = cache.update_gt(&7).await;
        assert_eq!(cache.min_unapplied_ts(), &8);

        txns.tidy_at(8, tidy_5).await.unwrap();
        let _ = cache.update_gt(&8).await;
        assert_eq!(cache.min_unapplied_ts(), &9);

        // Tidy values can be merged, including one pulled back out of a txn
        // with `take_tidy`.
        let tidy_9 = txns.expect_commit_at(9, d0, &["9"], &log).await;
        let tidy_10 = txns.expect_commit_at(10, d0, &["10"], &log).await;
        let mut txn = txns.begin();
        txn.tidy(tidy_9);
        let mut tidy_9 = txn.take_tidy();
        tidy_9.merge(tidy_10);
        txns.tidy_at(11, tidy_9).await.unwrap();
        let _ = cache.update_gt(&11).await;
        assert_eq!(cache.min_unapplied_ts(), &12);

        // Tidying at a timestamp that has already been committed is rejected
        // with the current upper.
        let tidy_12 = txns.expect_commit_at(12, d0, &["12"], &log).await;
        assert_eq!(txns.tidy_at(12, tidy_12).await, Err(13));

        let () = log.assert_snapshot(d0, 12).await;
    }

    /// Runs many writers against one data shard concurrently, each retrying
    /// its register, commit, and tidy until they succeed, and checks that
    /// every write lands.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn conflicting_writes() {
        // A small varying number (0..20) taken from the wall clock's
        // sub-second microseconds.
        fn jitter() -> u64 {
            let time = SystemTime::UNIX_EPOCH.elapsed().unwrap();
            u64::from(time.subsec_micros() % 20)
        }

        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        const NUM_WRITES: usize = 25;
        let tasks = FuturesUnordered::new();
        for idx in 0..NUM_WRITES {
            let mut txn = txns.begin_test();
            txn.write(&d0, format!("{:05}", idx), (), 1).await;
            let (txns_id, client, log) = (txns.txns_id(), client.clone(), log.clone());

            let task = async move {
                // Each task works through its own handle on the shared txns
                // shard and registers the data shard there, retrying at the
                // timestamp handed back on failure.
                let mut txns = TxnsHandle::expect_open_id(client.clone(), txns_id).await;
                let mut register_ts = 1;
                loop {
                    let data_write = writer(&client, d0).await;
                    match txns.register(register_ts, [data_write]).await {
                        Ok(_) => {
                            debug!("{} registered at {}", idx, register_ts);
                            break;
                        }
                        Err(ts) => {
                            register_ts = ts;
                            continue;
                        }
                    }
                }

                // Commit, moving to the returned upper whenever the chosen
                // timestamp has already been passed.
                let jitter_ms = jitter();
                let mut commit_ts = register_ts + 1 + jitter_ms;
                let apply = loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, commit_ts).await {
                        Ok(apply) => break apply,
                        Err(new_commit_ts) => commit_ts = new_commit_ts,
                    }
                };
                debug!("{} committed at {}", idx, commit_ts);
                log.record_txn(commit_ts, &txn);

                let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                let tidy = apply.apply(&mut txns).await;

                // Tidy by committing an otherwise empty txn that carries the
                // tidy work, again retrying on conflict.
                let jitter_ms = jitter();
                let mut txn = txns.begin();
                txn.tidy(tidy);
                let mut tidy_ts = commit_ts + jitter_ms;
                loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, tidy_ts).await {
                        Ok(apply) => {
                            debug!("{} tidied at {}", idx, tidy_ts);
                            assert!(apply.is_empty());
                            return commit_ts;
                        }
                        Err(new_tidy_ts) => tidy_ts = new_tidy_ts,
                    }
                }
            };
            tasks.push(task)
        }

        let max_commit_ts = tasks
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .max()
            .unwrap_or_default();

        // The snapshot is compared against the keys in ascending order.
        let expected = (0..NUM_WRITES)
            .map(|x| format!("{:05}", x))
            .collect::<Vec<_>>();
        let actual = cache.expect_snapshot(&client, d0, max_commit_ts).await;
        assert_eq!(actual, expected);
        log.assert_snapshot(d0, max_commit_ts).await;
    }

    /// Tidies through two different handles, the older tidy last, and checks
    /// that a fresh cache still validates.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn tidy_race() {
        let client = PersistClient::new_for_tests().await;
        let mut txns0 = TxnsHandle::expect_open(client.clone()).await;
        let log = txns0.new_log();
        let d0 = txns0.expect_register(1).await;

        // Commit through the first handle, but hold on to its tidy for now.
        let tidy0 = txns0.expect_commit_at(2, d0, &["foo"], &log).await;

        // A second handle on the same txns shard commits and tidies in the
        // meantime.
        let mut txns1 = TxnsHandle::expect_open_id(client.clone(), txns0.txns_id()).await;
        let d1 = txns1.expect_register(3).await;
        let tidy1 = txns1.expect_commit_at(4, d1, &["foo"], &log).await;
        let () = txns1.tidy_at(5, tidy1).await.unwrap();

        // Only now does the first handle perform its tidy.
        let () = txns0.tidy_at(6, tidy0).await.unwrap();

        // A cache opened afterwards and caught up past 6 must validate.
        let mut cache = TxnsCache::expect_open(0, &txns0).await;
        let _ = cache.update_gt(&6).await;
        assert_eq!(cache.validate(), Ok(()));

        log.assert_snapshot(d0, 6).await;
        log.assert_snapshot(d1, 6).await;
    }

    /// Committing a write to a data shard that is not registered must fail.
    /// Both attempts run in spawned tasks and the tasks themselves are
    /// expected to error (presumably from the panics in `commit_at`).
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn commit_unregistered_table() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;

        // Case 1: the data shard was never registered.
        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&ShardId::new(), "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 1).await
            }
        })
        .into_tokio_handle();
        assert_err!(commit.await);

        // Case 2: the data shard was registered and then forgotten.
        let d0 = txns.expect_register(2).await;
        txns.forget(3, [d0]).await.unwrap();

        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&d0, "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 4).await
            }
        })
        .into_tokio_handle();
        assert_err!(commit.await);
    }

    /// A txn whose commit was rejected keeps its staged writes, accepts more,
    /// and can then be committed at a later timestamp.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn commit_retry() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // `other` takes ts 3 first, so `txn` is rejected at 3 and told that
        // the upper is now 4.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d1, "42".into(), (), 1).await;
        other.commit_at(&mut txns, 3).await.unwrap();
        let upper = txn.commit_at(&mut txns, 3).await.unwrap_err();
        assert_eq!(upper, 4);

        // Add another write to the rejected txn and commit it at 4.
        txn.write(&d0, "1".into(), (), 1).await;
        txn.commit_at(&mut txns, 4).await.unwrap();
        txns.apply_le(&4).await;

        // Both the original and the later write are present.
        let expected_d0 = vec!["0".to_owned(), "1".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 4).await;
        assert_eq!(actual_d0, expected_d0);
    }
}