mz_txn_wal/
txn_write.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Interfaces for writing txn shards as well as data shards.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::Hashable;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use mz_ore::cast::CastFrom;
use mz_ore::instrument;
use mz_persist_client::ShardId;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
use prost::Message;
use timely::order::TotalOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::proto::ProtoIdBatch;
use crate::txns::{Tidy, TxnsHandle};

/// Pending writes to a shard for an in-progress transaction.
#[derive(Debug)]
pub(crate) struct TxnWrite<K, V, T, D> {
    pub(crate) batches: Vec<Batch<K, V, T, D>>,
    pub(crate) staged: Vec<ProtoBatch>,
    pub(crate) writes: Vec<(K, V, D)>,
}

impl<K, V, T, D> TxnWrite<K, V, T, D> {
    /// Merges the staged writes in `other` into this.
    pub fn merge(&mut self, other: Self) {
        self.batches.extend(other.batches);
        self.staged.extend(other.staged);
        self.writes.extend(other.writes);
    }
}

impl<K, V, T, D> Default for TxnWrite<K, V, T, D> {
    fn default() -> Self {
        Self {
            batches: Vec::default(),
            staged: Vec::default(),
            writes: Vec::default(),
        }
    }
}

/// An in-progress transaction.
#[derive(Debug)]
pub struct Txn<K, V, T, D> {
    pub(crate) writes: BTreeMap<ShardId, TxnWrite<K, V, T, D>>,
    tidy: Tidy,
}

impl<K, V, T, D> Txn<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
    D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
{
    pub(crate) fn new() -> Self {
        Txn {
            writes: BTreeMap::default(),
            tidy: Tidy::default(),
        }
    }

    /// Stage a write to the in-progress txn.
    ///
    /// The timestamp will be assigned at commit time.
    ///
    /// TODO: Allow this to spill to s3 (for bounded memory) once persist can
    /// make the ts rewrite op efficient.
    #[allow(clippy::unused_async)]
    pub async fn write(&mut self, data_id: &ShardId, key: K, val: V, diff: D) {
        self.writes
            .entry(*data_id)
            .or_default()
            .writes
            .push((key, val, diff))
    }

    /// Stage a [`Batch`] to the in-progress txn.
    ///
    /// The timestamp will be assigned at commit time.
    pub fn write_batch(&mut self, data_id: &ShardId, batch: ProtoBatch) {
        self.writes.entry(*data_id).or_default().staged.push(batch)
    }

    /// Commit this transaction at `commit_ts`.
    ///
    /// This either atomically commits all staged writes or, if that's no longer
    /// possible at the requested timestamp, returns an error with the earliest
    /// timestamp at which the commit could be retried.
    ///
    /// On success, a token is returned representing apply work that the caller
    /// is expected to perform promptly. At this point, the txn is durable and
    /// it's safe to bubble up success, but reads at the commit timestamp will
    /// block until this apply work finishes. In the event of a crash, neither
    /// correctness nor liveness require that this followup be done.
    ///
    /// Panics if any involved data shard was not registered before the commit
    /// ts.
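    ///
    /// A rough usage sketch, modeled on the tests in this module (`txns` is
    /// assumed to be an open [`TxnsHandle`], `d0` a registered data shard, and
    /// `initial_commit_ts` a caller-chosen timestamp):
    ///
    /// ```ignore
    /// let mut txn = txns.begin();
    /// txn.write(&d0, key, val, 1).await;
    /// // If the commit races with another writer, retry at the suggested
    /// // timestamp.
    /// let mut commit_ts = initial_commit_ts;
    /// let apply = loop {
    ///     match txn.commit_at(&mut txns, commit_ts).await {
    ///         Ok(apply) => break apply,
    ///         Err(new_commit_ts) => commit_ts = new_commit_ts,
    ///     }
    /// };
    /// // The txn is now durable; applying it unblocks reads at the commit ts.
    /// let _tidy = apply.apply(&mut txns).await;
    /// ```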
    #[instrument(level = "debug", fields(ts = ?commit_ts))]
    pub async fn commit_at<O, C>(
        &mut self,
        handle: &mut TxnsHandle<K, V, T, D, O, C>,
        commit_ts: T,
    ) -> Result<TxnApply<T>, T>
    where
        O: Opaque + Debug + Codec64,
        C: TxnsCodec,
    {
        let op = &Arc::clone(&handle.metrics).commit;
        op.run(async {
            let mut txns_upper = handle
                .txns_write
                .shared_upper()
                .into_option()
                .expect("txns shard should not be closed");

            loop {
                txns_upper = handle.txns_cache.update_ge(&txns_upper).await.clone();

                // txns_upper is the (inclusive) minimum timestamp at which we
                // could possibly write. If our requested commit timestamp is before
                // that, then it's no longer possible to write and the caller needs
                // to decide what to do.
                if commit_ts < txns_upper {
                    debug!(
                        "commit_at {:?} mismatch current={:?}",
                        commit_ts, txns_upper
                    );
                    return Err(txns_upper);
                }
                // Validate that the involved data shards are all registered.
                for (data_id, _) in self.writes.iter() {
                    assert!(
                        handle
                            .txns_cache
                            .registered_at_progress(data_id, &txns_upper),
                        "{} should be registered to commit at {:?}",
                        data_id,
                        txns_upper,
                    );
                }
                debug!(
                    "commit_at {:?}: [{:?}, {:?}) begin",
                    commit_ts,
                    txns_upper,
                    commit_ts.step_forward(),
                );

                let txn_batches_updates = FuturesUnordered::new();
                while let Some((data_id, updates)) = self.writes.pop_first() {
                    let data_write = handle.datas.take_write_for_commit(&data_id).unwrap_or_else(
                        || {
                            panic!(
                                "data shard {} must be registered with this Txn handle to commit",
                                data_id
                            )
                        },
                    );
                    let commit_ts = commit_ts.clone();
                    txn_batches_updates.push(async move {
                        let mut batches =
                            Vec::with_capacity(updates.staged.len() + updates.batches.len() + 1);

                        // Form batches for any Row data.
                        if !updates.writes.is_empty() {
                            let mut batch = data_write.builder(Antichain::from_elem(T::minimum()));
                            for (k, v, d) in updates.writes.iter() {
                                batch.add(k, v, &commit_ts, d).await.expect("valid usage");
                            }
                            let batch = batch
                                .finish(Antichain::from_elem(commit_ts.step_forward()))
                                .await
                                .expect("valid usage");
                            let batch = batch.into_transmittable_batch();
                            batches.push(batch);
                        }

                        // Append any already staged Batches.
                        batches.extend(updates.staged.into_iter());
                        batches.extend(
                            updates
                                .batches
                                .into_iter()
                                .map(|b| b.into_transmittable_batch()),
                        );

                        // Rewrite the timestamp for them all.
                        let batches: Vec<_> = batches
                            .into_iter()
                            .map(|batch| {
                                let mut batch = data_write.batch_from_transmittable_batch(batch);
                                batch
                                    .rewrite_ts(
                                        &Antichain::from_elem(commit_ts.clone()),
                                        Antichain::from_elem(commit_ts.step_forward()),
                                    )
                                    .expect("invalid usage");
                                batch.into_transmittable_batch()
                            })
                            .collect();

                        let batch_updates = batches
                            .into_iter()
                            .map(|batch| {
                                // The code to handle retracting applied batches assumes
                                // that the encoded representation of each is unique (it
                                // works by retracting and cancelling out the raw
                                // bytes). It's possible to make that code handle any
                                // diff value but the complexity isn't worth it.
                                //
                                // So ensure that every committed batch has a unique
                                // serialization. Technically, I'm pretty sure that
                                // they're naturally unique but the justification is
                                // long, subtle, and brittle. Instead, just slap a
                                // random uuid on it.
                                let batch_raw = ProtoIdBatch::new(batch.clone()).encode_to_vec();
                                debug!(
                                    "wrote {:.9} batch {}",
                                    data_id.to_string(),
                                    batch_raw.hashed(),
                                );
                                let update = C::encode(TxnsEntry::Append(
                                    data_id,
                                    T::encode(&commit_ts),
                                    batch_raw,
                                ));
                                (batch, update)
                            })
                            .collect::<Vec<_>>();
                        (data_write, batch_updates)
                    })
                }
                let txn_batches_updates = txn_batches_updates.collect::<Vec<_>>().await;
                let mut txns_updates = txn_batches_updates
                    .iter()
                    .flat_map(|(_, batch_updates)| batch_updates.iter().map(|(_, updates)| updates))
                    .map(|(key, val)| ((key, val), &commit_ts, 1))
                    .collect::<Vec<_>>();
                let apply_is_empty = txns_updates.is_empty();

                // Tidy guarantees that anything in retractions has been applied,
                // but races mean someone else may have written the retraction. If
                // the following CaA goes through, then the `update_ge(txns_upper)`
                // above means that anything the cache thinks is still unapplied
                // but we know is applied indeed still needs to be retracted.
                let filtered_retractions = handle
                    .read_cache()
                    .filter_retractions(&txns_upper, self.tidy.retractions.iter())
                    .map(|(batch_raw, (ts, data_id))| {
                        C::encode(TxnsEntry::Append(*data_id, *ts, batch_raw.clone()))
                    })
                    .collect::<Vec<_>>();
                txns_updates.extend(
                    filtered_retractions
                        .iter()
                        .map(|(key, val)| ((key, val), &commit_ts, -1)),
                );

                let res = crate::small_caa(
                    || "txns commit",
                    &mut handle.txns_write,
                    &txns_updates,
                    txns_upper.clone(),
                    commit_ts.step_forward(),
                )
                .await;
                match res {
                    Ok(()) => {
                        debug!(
                            "commit_at {:?}: [{:?}, {:?}) success",
                            commit_ts,
                            txns_upper,
                            commit_ts.step_forward(),
                        );
                        // The batch we wrote at commit_ts did commit. Mark it as
                        // such to avoid a WARN in the logs.
                        for (data_write, batch_updates) in txn_batches_updates {
                            for (batch, _) in batch_updates {
                                let batch = data_write
                                    .batch_from_transmittable_batch(batch)
                                    .into_hollow_batch();
                                handle.metrics.batches.commit_count.inc();
                                handle
                                    .metrics
                                    .batches
                                    .commit_bytes
                                    .inc_by(u64::cast_from(batch.encoded_size_bytes()));
                            }
                            handle.datas.put_write_for_commit(data_write);
                        }
                        return Ok(TxnApply {
                            is_empty: apply_is_empty,
                            commit_ts,
                        });
                    }
                    Err(new_txns_upper) => {
                        handle.metrics.commit.retry_count.inc();
                        assert!(txns_upper < new_txns_upper);
                        txns_upper = new_txns_upper;
                        for (data_write, batch_updates) in txn_batches_updates {
                            let batches = batch_updates
                                .into_iter()
                                .map(|(batch, _)| {
                                    data_write.batch_from_transmittable_batch(batch.clone())
                                })
                                .collect();
                            let txn_write = TxnWrite {
                                writes: Vec::new(),
                                staged: Vec::new(),
                                batches,
                            };
                            self.writes.insert(data_write.shard_id(), txn_write);
                            handle.datas.put_write_for_commit(data_write);
                        }
                        let _ = handle.txns_cache.update_ge(&txns_upper).await;
                        continue;
                    }
                }
            }
        })
        .await
    }

    /// Merges the staged writes in the other txn into this one.
    pub fn merge(&mut self, other: Self) {
        for (data_id, writes) in other.writes {
            self.writes.entry(data_id).or_default().merge(writes);
        }
        self.tidy.merge(other.tidy);
    }

    /// Merges the work represented by the given tidy into this txn.
    ///
    /// If this txn commits, the tidy work will be written at the commit ts.
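    ///
    /// A sketch of typical use, modeled on the tests in this module (`tidy` is
    /// assumed to have been returned by a previous [`TxnApply::apply`]):
    ///
    /// ```ignore
    /// let mut txn = txns.begin();
    /// txn.write(&d0, key, val, 1).await;
    /// // Piggy-back the outstanding tidy work on this txn's commit.
    /// txn.tidy(tidy);
    /// let apply = txn.commit_at(&mut txns, commit_ts).await.unwrap();
    /// ```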
    pub fn tidy(&mut self, tidy: Tidy) {
        self.tidy.merge(tidy);
    }

    /// Extracts any tidy work that has been merged into this txn with
    /// [Self::tidy].
    pub fn take_tidy(&mut self) -> Tidy {
        std::mem::take(&mut self.tidy)
    }
}

/// A token representing the asynchronous "apply" work expected to be promptly
/// performed by a txn committer.
#[derive(Debug)]
#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
pub struct TxnApply<T> {
    is_empty: bool,
    pub(crate) commit_ts: T,
}

impl<T> TxnApply<T> {
    /// Applies the txn, unblocking reads at the timestamp it was committed at.
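    ///
    /// A sketch of the expected follow-up, modeled on the tests in this module
    /// (`tidy_ts` is a hypothetical later timestamp; the returned [`Tidy`] can
    /// instead be folded into a later txn via [`Txn::tidy`]):
    ///
    /// ```ignore
    /// let tidy = apply.apply(&mut txns).await;
    /// // Retire the tidy work directly at some later timestamp.
    /// txns.tidy_at(tidy_ts, tidy).await.unwrap();
    /// ```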
    pub async fn apply<K, V, D, O, C>(self, handle: &mut TxnsHandle<K, V, T, D, O, C>) -> Tidy
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
        D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
        O: Opaque + Debug + Codec64,
        C: TxnsCodec,
    {
        debug!("txn apply {:?}", self.commit_ts);
        handle.apply_le(&self.commit_ts).await
    }

    /// Returns whether the apply represents a txn with no non-tidy writes.
    ///
    /// If this returns true, the apply is essentially a no-op and safe to
    /// discard.
    pub fn is_empty(&self) -> bool {
        self.is_empty
    }
}

#[cfg(test)]
mod tests {
    use std::time::{Duration, SystemTime};

    use futures::StreamExt;
    use futures::stream::FuturesUnordered;
    use mz_ore::assert_err;
    use mz_persist_client::PersistClient;

    use crate::tests::writer;
    use crate::txn_cache::TxnsCache;

    use super::*;

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn commit_at() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // Can merge two txns. Can have multiple data shards in a txn.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d0, "1".into(), (), 1).await;
        other.write(&d1, "A".into(), (), 1).await;
        txn.merge(other);
        txn.commit_at(&mut txns, 3).await.unwrap();

        // Can commit an empty txn. Can "skip" timestamps.
        txns.begin().commit_at(&mut txns, 5).await.unwrap();

        // Txn cannot be committed at a closed out time. The Err includes the
        // earliest committable time. Failed txn can commit on retry.
        let mut txn = txns.begin();
        txn.write(&d0, "2".into(), (), 1).await;
        assert_eq!(txn.commit_at(&mut txns, 4).await, Err(6));
        txn.commit_at(&mut txns, 6).await.unwrap();
        txns.apply_le(&6).await;

        let expected_d0 = vec!["0".to_owned(), "1".to_owned(), "2".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 6).await;
        assert_eq!(actual_d0, expected_d0);

        let expected_d1 = vec!["A".to_owned()];
        let actual_d1 = cache.expect_snapshot(&client, d1, 6).await;
        assert_eq!(actual_d1, expected_d1);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn apply_and_tidy() {
        let mut txns = TxnsHandle::expect_open(PersistClient::new_for_tests().await).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        // Non-empty txn means non-empty apply. Min unapplied ts is the commit
        // ts.
        let mut txn = txns.begin_test();
        txn.write(&d0, "2".into(), (), 1).await;
        let apply_2 = txn.commit_at(&mut txns, 2).await.unwrap();
        log.record_txn(2, &txn);
        assert_eq!(apply_2.is_empty(), false);
        let _ = cache.update_gt(&2).await;
        cache.mark_register_applied(&2);
        assert_eq!(cache.min_unapplied_ts(), &2);
        assert_eq!(cache.unapplied().count(), 1);

        // Running the apply unblocks reads but does not advance the min
        // unapplied ts.
        let tidy_2 = apply_2.apply(&mut txns).await;
        assert_eq!(cache.min_unapplied_ts(), &2);

        // Running the tidy advances the min unapplied ts.
        txns.tidy_at(3, tidy_2).await.unwrap();
        let _ = cache.update_gt(&3).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        assert_eq!(cache.unapplied().count(), 0);

        // We can also sneak the tidy into a normal txn. Tidies copy across txn
        // merges.
        let tidy_4 = txns.expect_commit_at(4, d0, &["4"], &log).await;
        let _ = cache.update_gt(&4).await;
        assert_eq!(cache.min_unapplied_ts(), &4);
        let mut txn0 = txns.begin_test();
        txn0.write(&d0, "5".into(), (), 1).await;
        txn0.tidy(tidy_4);
        let mut txn1 = txns.begin_test();
        txn1.merge(txn0);
        let apply_5 = txn1.commit_at(&mut txns, 5).await.unwrap();
        log.record_txn(5, &txn1);
        let _ = cache.update_gt(&5).await;
        assert_eq!(cache.min_unapplied_ts(), &5);
        let tidy_5 = apply_5.apply(&mut txns).await;

        // It's fine to drop a tidy; someone else will do it eventually.
        let tidy_6 = txns.expect_commit_at(6, d0, &["6"], &log).await;
        txns.tidy_at(7, tidy_6).await.unwrap();
        let _ = cache.update_gt(&7).await;
        assert_eq!(cache.min_unapplied_ts(), &8);

        // Also fine if we don't drop it, but instead do it late (no-op but
        // consumes a ts).
        txns.tidy_at(8, tidy_5).await.unwrap();
        let _ = cache.update_gt(&8).await;
        assert_eq!(cache.min_unapplied_ts(), &9);

        // Tidies can be merged and also can be stolen back out of a txn.
        let tidy_9 = txns.expect_commit_at(9, d0, &["9"], &log).await;
        let tidy_10 = txns.expect_commit_at(10, d0, &["10"], &log).await;
        let mut txn = txns.begin();
        txn.tidy(tidy_9);
        let mut tidy_9 = txn.take_tidy();
        tidy_9.merge(tidy_10);
        txns.tidy_at(11, tidy_9).await.unwrap();
        let _ = cache.update_gt(&11).await;
        assert_eq!(cache.min_unapplied_ts(), &12);

        // Can't tidy at an already committed ts.
        let tidy_12 = txns.expect_commit_at(12, d0, &["12"], &log).await;
        assert_eq!(txns.tidy_at(12, tidy_12).await, Err(13));

        let () = log.assert_snapshot(d0, 12).await;
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // too slow
    async fn conflicting_writes() {
        fn jitter() -> u64 {
            // We could also use something like `rand`.
            let time = SystemTime::UNIX_EPOCH.elapsed().unwrap();
            u64::from(time.subsec_micros() % 20)
        }

        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;

        const NUM_WRITES: usize = 25;
        let tasks = FuturesUnordered::new();
        for idx in 0..NUM_WRITES {
            let mut txn = txns.begin_test();
            txn.write(&d0, format!("{:05}", idx), (), 1).await;
            let (txns_id, client, log) = (txns.txns_id(), client.clone(), log.clone());

            let task = async move {
                let mut txns = TxnsHandle::expect_open_id(client.clone(), txns_id).await;
                let mut register_ts = 1;
                loop {
                    let data_write = writer(&client, d0).await;
                    match txns.register(register_ts, [data_write]).await {
                        Ok(_) => {
                            debug!("{} registered at {}", idx, register_ts);
                            break;
                        }
                        Err(ts) => {
                            register_ts = ts;
                            continue;
                        }
                    }
                }

                // Add some jitter to the commit timestamps (to create gaps) and
                // to the execution (to create interleaving).
                let jitter_ms = jitter();
                let mut commit_ts = register_ts + 1 + jitter_ms;
                let apply = loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, commit_ts).await {
                        Ok(apply) => break apply,
                        Err(new_commit_ts) => commit_ts = new_commit_ts,
                    }
                };
                debug!("{} committed at {}", idx, commit_ts);
                log.record_txn(commit_ts, &txn);

                // Ditto sleep before apply.
                let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                let tidy = apply.apply(&mut txns).await;

                // Ditto jitter the tidy timestamps and execution.
                let jitter_ms = jitter();
                let mut txn = txns.begin();
                txn.tidy(tidy);
                let mut tidy_ts = commit_ts + jitter_ms;
                loop {
                    let () = tokio::time::sleep(Duration::from_millis(jitter_ms)).await;
                    match txn.commit_at(&mut txns, tidy_ts).await {
                        Ok(apply) => {
                            debug!("{} tidied at {}", idx, tidy_ts);
                            assert!(apply.is_empty());
                            return commit_ts;
                        }
                        Err(new_tidy_ts) => tidy_ts = new_tidy_ts,
                    }
                }
            };
            tasks.push(task)
        }

        let max_commit_ts = tasks
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .max()
            .unwrap_or_default();

        // Also manually create expected as a failsafe in case we ever end up
        // with a bug in CommitLog.
        let expected = (0..NUM_WRITES)
            .map(|x| format!("{:05}", x))
            .collect::<Vec<_>>();
        let actual = cache.expect_snapshot(&client, d0, max_commit_ts).await;
        assert_eq!(actual, expected);
        log.assert_snapshot(d0, max_commit_ts).await;
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn tidy_race() {
        let client = PersistClient::new_for_tests().await;
        let mut txns0 = TxnsHandle::expect_open(client.clone()).await;
        let log = txns0.new_log();
        let d0 = txns0.expect_register(1).await;

        // Commit something and apply it, but don't tidy yet.
        let tidy0 = txns0.expect_commit_at(2, d0, &["foo"], &log).await;

        // Now open an independent TxnsHandle, commit, apply, and tidy.
        let mut txns1 = TxnsHandle::expect_open_id(client.clone(), txns0.txns_id()).await;
        let d1 = txns1.expect_register(3).await;
        let tidy1 = txns1.expect_commit_at(4, d1, &["foo"], &log).await;
        let () = txns1.tidy_at(5, tidy1).await.unwrap();

        // Now try the original tidy0. tidy1 has already done the retraction for
        // it, so this needs to be careful not to double-retract.
        let () = txns0.tidy_at(6, tidy0).await.unwrap();

        // Replay a cache from the beginning and make sure we don't see a
        // double retraction.
        let mut cache = TxnsCache::expect_open(0, &txns0).await;
        let _ = cache.update_gt(&6).await;
        assert_eq!(cache.validate(), Ok(()));

        log.assert_snapshot(d0, 6).await;
        log.assert_snapshot(d1, 6).await;
    }

    // Regression test for a bug caught during code review, where it was
    // possible to commit to an unregistered data shard.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn commit_unregistered_table() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;

        // This panics because the commit ts is before the register ts.
        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&ShardId::new(), "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 1).await
            }
        });
        assert_err!(commit.await);

        let d0 = txns.expect_register(2).await;
        txns.forget(3, [d0]).await.unwrap();

        // This panics because the commit ts is after the forget ts.
        let commit = mz_ore::task::spawn(|| "", {
            let (txns_id, client) = (txns.txns_id(), client.clone());
            async move {
                let mut txns = TxnsHandle::expect_open_id(client, txns_id).await;
                let mut txn = txns.begin();
                txn.write(&d0, "foo".into(), (), 1).await;
                txn.commit_at(&mut txns, 4).await
            }
        });
        assert_err!(commit.await);
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn commit_retry() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let mut cache = TxnsCache::expect_open(0, &txns).await;
        let d0 = txns.expect_register(1).await;
        let d1 = txns.expect_register(2).await;

        // `txn` commit is interrupted by `other` commit.
        let mut txn = txns.begin();
        txn.write(&d0, "0".into(), (), 1).await;
        let mut other = txns.begin();
        other.write(&d1, "42".into(), (), 1).await;
        other.commit_at(&mut txns, 3).await.unwrap();
        let upper = txn.commit_at(&mut txns, 3).await.unwrap_err();
        assert_eq!(upper, 4);

        // Add more writes to `txn` and try again.
        txn.write(&d0, "1".into(), (), 1).await;
        txn.commit_at(&mut txns, 4).await.unwrap();
        txns.apply_le(&4).await;

        let expected_d0 = vec!["0".to_owned(), "1".to_owned()];
        let actual_d0 = cache.expect_snapshot(&client, d0, 4).await;
        assert_eq!(actual_d0, expected_d0);
    }
}