// mz_txn_wal/src/txn_write.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Interfaces for writing txn shards as well as data shards.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::Hashable;
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist_client::ShardId;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{Codec, Codec64, StepForward};
use prost::Message;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::proto::ProtoIdBatch;
use crate::txns::{Tidy, TxnsHandle};

/// Pending writes to a shard for an in-progress transaction.
#[derive(Debug)]
pub(crate) struct TxnWrite<K, V, T, D> {
    /// Batches carried over from a previous, failed commit attempt, retained
    /// so they can be re-submitted (with a rewritten timestamp) on retry.
    pub(crate) batches: Vec<Batch<K, V, T, D>>,
    /// Pre-built batches staged via [`Txn::write_batch`], in transmittable
    /// proto form.
    pub(crate) staged: Vec<ProtoBatch>,
    /// Raw row writes staged via [`Txn::write`]; turned into a batch at
    /// commit time, when the timestamp is known.
    pub(crate) writes: Vec<(K, V, D)>,
}

43impl<K, V, T, D> TxnWrite<K, V, T, D> {
44    /// Merges the staged writes in `other` into this.
45    pub fn merge(&mut self, other: Self) {
46        self.batches.extend(other.batches);
47        self.staged.extend(other.staged);
48        self.writes.extend(other.writes);
49    }
50}
51
52impl<K, V, T, D> Default for TxnWrite<K, V, T, D> {
53    fn default() -> Self {
54        Self {
55            batches: Vec::default(),
56            staged: Vec::default(),
57            writes: Vec::default(),
58        }
59    }
60}
61
/// An in-progress transaction.
#[derive(Debug)]
pub struct Txn<K, V, T, D> {
    /// Pending writes, keyed by the data shard they target.
    pub(crate) writes: BTreeMap<ShardId, TxnWrite<K, V, T, D>>,
    /// Tidy (retraction) work folded into this txn via [`Txn::tidy`]; if the
    /// txn commits, it is written out at the commit timestamp.
    tidy: Tidy,
}

impl<K, V, T, D> Txn<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Debug + Monoid + Ord + Codec64 + Send + Sync,
{
    /// Returns a new, empty transaction with no staged writes and no tidy
    /// work.
    pub(crate) fn new() -> Self {
        Txn {
            writes: BTreeMap::default(),
            tidy: Tidy::default(),
        }
    }

    /// Stage a write to the in-progress txn.
    ///
    /// The timestamp will be assigned at commit time.
    ///
    /// TODO: Allow this to spill to s3 (for bounded memory) once persist can
    /// make the ts rewrite op efficient.
    #[allow(clippy::unused_async)]
    pub async fn write(&mut self, data_id: &ShardId, key: K, val: V, diff: D) {
        self.writes
            .entry(*data_id)
            .or_default()
            .writes
            .push((key, val, diff))
    }

    /// Stage a [`Batch`] to the in-progress txn.
    ///
    /// The timestamp will be assigned at commit time.
    pub fn write_batch(&mut self, data_id: &ShardId, batch: ProtoBatch) {
        self.writes.entry(*data_id).or_default().staged.push(batch)
    }

    /// Commit this transaction at `commit_ts`.
    ///
    /// This either atomically commits all staged writes or, if that's no longer
    /// possible at the requested timestamp, returns an error with the least
    /// commit-able timestamp.
    ///
    /// On success a token is returned representing apply work expected to be
    /// promptly performed by the caller. At this point, the txn is durable and
    /// it's safe to bubble up success, but reads at the commit timestamp will
    /// block until this apply work finishes. In the event of a crash, neither
    /// correctness nor liveness require this followup be done.
    ///
    /// Panics if any involved data shards were not registered before commit ts.
    #[instrument(level = "debug", fields(ts = ?commit_ts))]
    pub async fn commit_at<C>(
        &mut self,
        handle: &mut TxnsHandle<K, V, T, D, C>,
        commit_ts: T,
    ) -> Result<TxnApply<T>, T>
    where
        C: TxnsCodec,
    {
        let op = &Arc::clone(&handle.metrics).commit;
        op.run(async {
            let mut txns_upper = handle
                .txns_write
                .shared_upper()
                .into_option()
                .expect("txns shard should not be closed");

            // Compare-and-append retry loop: if the CaA at the bottom loses a
            // race, we refresh `txns_upper` and try again at the same
            // `commit_ts` (unless it has since become uncommittable).
            loop {
                txns_upper = handle.txns_cache.update_ge(&txns_upper).await.clone();

                // txns_upper is the (inclusive) minimum timestamp at which we
                // could possibly write. If our requested commit timestamp is before
                // that, then it's no longer possible to write and the caller needs
                // to decide what to do.
                if commit_ts < txns_upper {
                    debug!(
                        "commit_at {:?} mismatch current={:?}",
                        commit_ts, txns_upper
                    );
                    return Err(txns_upper);
                }
                // Validate that the involved data shards are all registered.
                for (data_id, _) in self.writes.iter() {
                    assert!(
                        handle
                            .txns_cache
                            .registered_at_progress(data_id, &txns_upper),
                        "{} should be registered to commit at {:?}",
                        data_id,
                        txns_upper,
                    );
                }
                debug!(
                    "commit_at {:?}: [{:?}, {:?}) begin",
                    commit_ts,
                    txns_upper,
                    commit_ts.step_forward(),
                );

                // Write batches to each involved data shard concurrently, one
                // future per shard. Note that this drains `self.writes`; the
                // Err arm below re-inserts the batches so a retry can reuse
                // them.
                let txn_batches_updates = FuturesUnordered::new();
                while let Some((data_id, updates)) = self.writes.pop_first() {
                    let data_write = handle.datas.take_write_for_commit(&data_id).unwrap_or_else(
                        || {
                            panic!(
                                "data shard {} must be registered with this Txn handle to commit",
                                data_id
                            )
                        },
                    );
                    let commit_ts = commit_ts.clone();
                    txn_batches_updates.push(async move {
                        // +1 for the batch (at most one) formed from raw writes.
                        let mut batches =
                            Vec::with_capacity(updates.staged.len() + updates.batches.len() + 1);

                        // Form batches for any Row data.
                        if !updates.writes.is_empty() {
                            let mut batch = data_write.builder(Antichain::from_elem(T::minimum()));
                            for (k, v, d) in updates.writes.iter() {
                                batch.add(k, v, &commit_ts, d).await.expect("valid usage");
                            }
                            let batch = batch
                                .finish(Antichain::from_elem(commit_ts.step_forward()))
                                .await
                                .expect("valid usage");
                            let batch = batch.into_transmittable_batch();
                            batches.push(batch);
                        }

                        // Append any already staged Batches.
                        batches.extend(updates.staged.into_iter());
                        batches.extend(
                            updates
                                .batches
                                .into_iter()
                                .map(|b| b.into_transmittable_batch()),
                        );

                        // Rewrite the timestamp for them all.
                        let batches: Vec<_> = batches
                            .into_iter()
                            .map(|batch| {
                                let mut batch = data_write.batch_from_transmittable_batch(batch);
                                batch
                                    .rewrite_ts(
                                        &Antichain::from_elem(commit_ts.clone()),
                                        Antichain::from_elem(commit_ts.step_forward()),
                                    )
                                    .expect("invalid usage");
                                batch.into_transmittable_batch()
                            })
                            .collect();

                        let batch_updates = batches
                            .into_iter()
                            .map(|batch| {
                                // The code to handle retracting applied batches assumes
                                // that the encoded representation of each is unique (it
                                // works by retracting and cancelling out the raw
                                // bytes). It's possible to make that code handle any
                                // diff value but the complexity isn't worth it.
                                //
                                // So ensure that every committed batch has a unique
                                // serialization. Technically, I'm pretty sure that
                                // they're naturally unique but the justification is
                                // long, subtle, and brittle. Instead, just slap a
                                // random uuid on it.
                                let batch_raw = ProtoIdBatch::new(batch.clone()).encode_to_vec();
                                debug!(
                                    "wrote {:.9} batch {}",
                                    data_id.to_string(),
                                    batch_raw.hashed(),
                                );
                                let update = C::encode(TxnsEntry::Append(
                                    data_id,
                                    T::encode(&commit_ts),
                                    batch_raw,
                                ));
                                (batch, update)
                            })
                            .collect::<Vec<_>>();
                        (data_write, batch_updates)
                    })
                }
                let txn_batches_updates = txn_batches_updates.collect::<Vec<_>>().await;
                // The txns-shard entries to insert: one Append per written
                // batch, each at commit_ts with a +1 diff.
                let mut txns_updates = txn_batches_updates
                    .iter()
                    .flat_map(|(_, batch_updates)| batch_updates.iter().map(|(_, updates)| updates))
                    .map(|(key, val)| ((key, val), &commit_ts, 1))
                    .collect::<Vec<_>>();
                // NB: captured before the tidy retractions are appended below,
                // so this reflects only data-shard batch appends.
                let apply_is_empty = txns_updates.is_empty();

                // Tidy guarantees that anything in retractions has been applied,
                // but races mean someone else may have written the retraction. If
                // the following CaA goes through, then the `update_ge(txns_upper)`
                // above means that anything the cache thinks is still unapplied
                // but we know is applied indeed still needs to be retracted.
                let filtered_retractions = handle
                    .read_cache()
                    .filter_retractions(&txns_upper, self.tidy.retractions.iter())
                    .map(|(batch_raw, (ts, data_id))| {
                        C::encode(TxnsEntry::Append(*data_id, *ts, batch_raw.clone()))
                    })
                    .collect::<Vec<_>>();
                txns_updates.extend(
                    filtered_retractions
                        .iter()
                        .map(|(key, val)| ((key, val), &commit_ts, -1)),
                );

                // Atomically add all appends and retractions to the txns shard
                // for the interval [txns_upper, commit_ts+1).
                let res = crate::small_caa(
                    || "txns commit",
                    &mut handle.txns_write,
                    &txns_updates,
                    txns_upper.clone(),
                    commit_ts.step_forward(),
                )
                .await;
                match res {
                    Ok(()) => {
                        debug!(
                            "commit_at {:?}: [{:?}, {:?}) success",
                            commit_ts,
                            txns_upper,
                            commit_ts.step_forward(),
                        );
                        // The batch we wrote at commit_ts did commit. Mark it as
                        // such to avoid a WARN in the logs.
                        for (data_write, batch_updates) in txn_batches_updates {
                            for (batch, _) in batch_updates {
                                let batch = data_write
                                    .batch_from_transmittable_batch(batch)
                                    .into_hollow_batch();
                                handle.metrics.batches.commit_count.inc();
                                handle
                                    .metrics
                                    .batches
                                    .commit_bytes
                                    .inc_by(u64::cast_from(batch.encoded_size_bytes()));
                            }
                            // Return the write handle for reuse by later commits.
                            handle.datas.put_write_for_commit(data_write);
                        }
                        return Ok(TxnApply {
                            is_empty: apply_is_empty,
                            commit_ts,
                        });
                    }
                    Err(new_txns_upper) => {
                        // Lost the CaA race. Put the already-written batches
                        // back into `self.writes` (their ts will be rewritten
                        // on the next attempt) and retry from the new upper.
                        handle.metrics.commit.retry_count.inc();
                        assert!(txns_upper < new_txns_upper);
                        txns_upper = new_txns_upper;
                        for (data_write, batch_updates) in txn_batches_updates {
                            let batches = batch_updates
                                .into_iter()
                                .map(|(batch, _)| {
                                    data_write.batch_from_transmittable_batch(batch.clone())
                                })
                                .collect();
                            let txn_write = TxnWrite {
                                writes: Vec::new(),
                                staged: Vec::new(),
                                batches,
                            };
                            self.writes.insert(data_write.shard_id(), txn_write);
                            handle.datas.put_write_for_commit(data_write);
                        }
                        let _ = handle.txns_cache.update_ge(&txns_upper).await;
                        continue;
                    }
                }
            }
        })
        .await
    }

    /// Merges the staged writes in the other txn into this one.
    pub fn merge(&mut self, other: Self) {
        for (data_id, writes) in other.writes {
            self.writes.entry(data_id).or_default().merge(writes);
        }
        self.tidy.merge(other.tidy);
    }

    /// Merges the work represented by given tidy into this txn.
    ///
    /// If this txn commits, the tidy work will be written at the commit ts.
    pub fn tidy(&mut self, tidy: Tidy) {
        self.tidy.merge(tidy);
    }

    /// Extracts any tidy work that has been merged into this txn with
    /// [Self::tidy].
    pub fn take_tidy(&mut self) -> Tidy {
        std::mem::take(&mut self.tidy)
    }
}

/// A token representing the asynchronous "apply" work expected to be promptly
/// performed by a txn committer.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct TxnApply<T> {
    /// Whether the committed txn wrote no data-shard batches (i.e. it carried
    /// only tidy retractions, or nothing at all).
    is_empty: bool,
    /// The timestamp the txn committed at.
    pub(crate) commit_ts: T,
}

impl<T> TxnApply<T> {
    /// Applies the txn, unblocking reads at timestamp it was committed at.
    ///
    /// Returns the [`Tidy`] retraction work produced by the apply, which the
    /// caller may fold into a later txn via [`Txn::tidy`].
    pub async fn apply<K, V, D, C>(self, handle: &mut TxnsHandle<K, V, T, D, C>) -> Tidy
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
        C: TxnsCodec,
    {
        debug!("txn apply {:?}", self.commit_ts);
        handle.apply_le(&self.commit_ts).await
    }

    /// Returns whether the apply represents a txn with no non-tidy writes.
    ///
    /// If this returns true, the apply is essentially a no-op and safe to
    /// discard.
    pub fn is_empty(&self) -> bool {
        self.is_empty
    }
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use futures::StreamExt;
    use futures::stream::FuturesUnordered;
    use mz_ore::assert_err;
    use mz_persist_client::PersistClient;

    use crate::tests::writer;
    use crate::txn_cache::TxnsCache;

    use super::*;

    // Basic commit_at contract: merging txns, multi-shard txns, empty txns,
    // skipped timestamps, and retry after a closed-out ts.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn commit_at() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // Can merge two txns. Can have multiple data shards in a txn.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d0, "1".into(), (), 1).await;
        other.write(&d1, "A".into(), (), 1).await;
        txn.merge(other);
        txn.commit_at(&mut txns, 3).await.unwrap();

        // Can commit an empty txn. Can "skip" timestamps.
        txns.begin().commit_at(&mut txns, 5).await.unwrap();

        // Txn cannot be committed at a closed out time. The Err includes the
        // earliest committable time. Failed txn can commit on retry.
        let mut txn = txns.begin();
        txn.write(&d0, "2".into(), (), 1).await;
        assert_eq!(txn.commit_at(&mut txns, 4).await, Err(6));
        txn.commit_at(&mut txns, 6).await.unwrap();
        txns.apply_le(&6).await;

        let expected_d0 = vec!["0".to_owned(), "1".to_owned(), "2".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 6).await;
        assert_eq!(actual_d0, expected_d0);

        let expected_d1 = vec!["A".to_owned()];
        let actual_d1 = cache.expect_snapshot(&client, d1, 6).await;
        assert_eq!(actual_d1, expected_d1);
    }

    // Lifecycle of the TxnApply token and Tidy work: how apply and tidy each
    // affect the min unapplied ts, and how tidies merge/drop/replay.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn apply_and_tidy() {
        let mut txns = TxnsHandle::expect_open(PersistClient::new_for_tests().await).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        // Non-empty txn means non-empty apply. Min unapplied ts is the commit
        // ts.
        let mut txn = txns.begin_test();
        txn.write(&d0, "2".into(), (), 1).await;
        let apply_2 = txn.commit_at(&mut txns, 2).await.unwrap();
        log.record_txn(2, &txn);
        assert_eq!(apply_2.is_empty(), false);
        let _ = cache.update_gt(&2).await;
        cache.mark_register_applied(&2);
        assert_eq!(cache.min_unapplied_ts(), &2);
        assert_eq!(cache.unapplied().count(), 1);

        // Running the apply unblocks reads but does not advance the min
        // unapplied ts.
        let tidy_2 = apply_2.apply(&mut txns).await;
        assert_eq!(cache.min_unapplied_ts(), &2);

        // Running the tidy advances the min unapplied ts.
        txns.tidy_at(3, tidy_2).await.unwrap();
        let _ = cache.update_gt(&3).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        assert_eq!(cache.unapplied().count(), 0);

        // We can also sneak the tidy into a normal txn. Tidies copy across txn
        // merges.
        let tidy_4 = txns.expect_commit_at(4, d0, &["4"], &log).await;
        let _ = cache.update_gt(&4).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        let mut txn0 = txns.begin_test();
        txn0.write(&d0, "5".into(), (), 1).await;
        txn0.tidy(tidy_4);
        let mut txn1 = txns.begin_test();
        txn1.merge(txn0);
        let apply_5 = txn1.commit_at(&mut txns, 5).await.unwrap();
        log.record_txn(5, &txn1);
        let _ = cache.update_gt(&5).await;
        assert_eq!(cache.min_unapplied_ts(), &5);
        let tidy_5 = apply_5.apply(&mut txns).await;

        // It's fine to drop a tidy, someone else will do it eventually.
        let tidy_6 = txns.expect_commit_at(6, d0, &["6"], &log).await;
        txns.tidy_at(7, tidy_6).await.unwrap();
        let _ = cache.update_gt(&7).await;
        assert_eq!(cache.min_unapplied_ts(), &8);

        // Also fine if we don't drop it, but instead do it late (no-op but
        // consumes a ts).
        txns.tidy_at(8, tidy_5).await.unwrap();
        let _ = cache.update_gt(&8).await;
        assert_eq!(cache.min_unapplied_ts(), &9);

        // Tidies can be merged and also can be stolen back out of a txn.
        let tidy_9 = txns.expect_commit_at(9, d0, &["9"], &log).await;
        let tidy_10 = txns.expect_commit_at(10, d0, &["10"], &log).await;
        let mut txn = txns.begin();
        txn.tidy(tidy_9);
        let mut tidy_9 = txn.take_tidy();
        tidy_9.merge(tidy_10);
        txns.tidy_at(11, tidy_9).await.unwrap();
        let _ = cache.update_gt(&11).await;
        assert_eq!(cache.min_unapplied_ts(), &12);

        // Can't tidy at an already committed ts.
        let tidy_12 = txns.expect_commit_at(12, d0, &["12"], &log).await;
        assert_eq!(txns.tidy_at(12, tidy_12).await, Err(13));

        let () = log.assert_snapshot(d0, 12).await;
    }

    // Stress test: many concurrent committers racing register/commit/apply/
    // tidy against the same txns shard, with jittered timestamps and sleeps
    // to create gaps and interleavings.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // too slow
    async fn conflicting_writes() {
        fn jitter() -> u64 {
            // We could also use something like `rand`.
            let time = SystemTime::UNIX_EPOCH.elapsed().unwrap();
            u64::from(time.subsec_micros() % 20)
        }

        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        const NUM_WRITES: usize = 25;
        let tasks = FuturesUnordered::new();
        for idx in 0..NUM_WRITES {
            let mut txn = txns.begin_test();
            txn.write(&d0, format!("{:05}", idx), (), 1).await;
            let (txns_id, client, log) = (txns.txns_id(), client.clone(), log.clone());

            let task = async move {
                let mut txns = TxnsHandle::expect_open_id(client.clone(), txns_id).await;
                let mut register_ts = 1;
                loop {
                    let data_write = writer(&client, d0).await;
                    match txns.register(register_ts, [data_write]).await {
                        Ok(_) => {
                            debug!("{} registered at {}", idx, register_ts);
                            break;
                        }
                        Err(ts) => {
                            register_ts = ts;
                            continue;
                        }
                    }
                }

                // Add some jitter to the commit timestamps (to create gaps) and
                // to the execution (to create interleaving).
                let jitter_ms = jitter();
                let mut commit_ts = register_ts + 1 + jitter_ms;
                let apply = loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, commit_ts).await {
                        Ok(apply) => break apply,
                        Err(new_commit_ts) => commit_ts = new_commit_ts,
                    }
                };
                debug!("{} committed at {}", idx, commit_ts);
                log.record_txn(commit_ts, &txn);

                // Ditto sleep before apply.
                let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                let tidy = apply.apply(&mut txns).await;

                // Ditto jitter the tidy timestamps and execution.
                let jitter_ms = jitter();
                let mut txn = txns.begin();
                txn.tidy(tidy);
                let mut tidy_ts = commit_ts + jitter_ms;
                loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, tidy_ts).await {
                        Ok(apply) => {
                            debug!("{} tidied at {}", idx, tidy_ts);
                            assert!(apply.is_empty());
                            return commit_ts;
                        }
                        Err(new_tidy_ts) => tidy_ts = new_tidy_ts,
                    }
                }
            };
            tasks.push(task)
        }

        let max_commit_ts = tasks
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .max()
            .unwrap_or_default();

        // Also manually create expected as a failsafe in case we ever end up
        // with a bug in CommitLog.
        let expected = (0..NUM_WRITES)
            .map(|x| format!("{:05}", x))
            .collect::<Vec<_>>();
        let actual = cache.expect_snapshot(&client, d0, max_commit_ts).await;
        assert_eq!(actual, expected);
        log.assert_snapshot(d0, max_commit_ts).await;
    }

    // Two independent handles race to tidy the same applied batch; the loser
    // must not double-retract.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn tidy_race() {
        let client = PersistClient::new_for_tests().await;
        let mut txns0 = TxnsHandle::expect_open(client.clone()).await;
        let log = txns0.new_log();
        let d0 = txns0.expect_register(1).await;

        // Commit something and apply it, but don't tidy yet.
        let tidy0 = txns0.expect_commit_at(2, d0, &["foo"], &log).await;

        // Now open an independent TxnsHandle, commit, apply, and tidy.
        let mut txns1 = TxnsHandle::expect_open_id(client.clone(), txns0.txns_id()).await;
        let d1 = txns1.expect_register(3).await;
        let tidy1 = txns1.expect_commit_at(4, d1, &["foo"], &log).await;
        let () = txns1.tidy_at(5, tidy1).await.unwrap();

        // Now try the original tidy0. tidy1 has already done the retraction for
        // it, so this needs to be careful not to double-retract.
        let () = txns0.tidy_at(6, tidy0).await.unwrap();

        // Replay a cache from the beginning and make sure we don't see a
        // double retraction.
        let mut cache = TxnsCache::expect_open(0, &txns0).await;
        let _ = cache.update_gt(&6).await;
        assert_eq!(cache.validate(), Ok(()));

        log.assert_snapshot(d0, 6).await;
        log.assert_snapshot(d1, 6).await;
    }

    // Regression test for a bug caught during code review, where it was
    // possible to commit to an unregistered data shard.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn commit_unregistered_table() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;

        // This panics because the commit ts is before the register ts.
        // (Spawned in a task so the panic surfaces as a JoinError.)
        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&ShardId::new(), "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 1).await
            }
        })
        .into_tokio_handle();
        assert_err!(commit.await);

        let d0 = txns.expect_register(2).await;
        txns.forget(3, [d0]).await.unwrap();

        // This panics because the commit ts is after the forget ts.
        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&d0, "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 4).await
            }
        })
        .into_tokio_handle();
        assert_err!(commit.await);
    }

    // A txn that loses the CaA race keeps its staged writes and can commit at
    // a later ts, optionally with more writes added in between.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn commit_retry() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // `txn` commit is interrupted by `other` commit.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d1, "42".into(), (), 1).await;
        other.commit_at(&mut txns, 3).await.unwrap();
        let upper = txn.commit_at(&mut txns, 3).await.unwrap_err();
        assert_eq!(upper, 4);

        // Add more writes to `txn` and try again.
        txn.write(&d0, "1".into(), (), 1).await;
        txn.commit_at(&mut txns, 4).await.unwrap();
        txns.apply_le(&4).await;

        let expected_d0 = vec!["0".to_owned(), "1".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 4).await;
        assert_eq!(actual_d0, expected_d0);
    }
}