1use std::collections::BTreeMap;
13use std::fmt::Debug;
14use std::ops::{Deref, DerefMut};
15use std::sync::Arc;
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
22use mz_ore::collections::HashSet;
23use mz_ore::instrument;
24use mz_persist_client::batch::Batch;
25use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
26use mz_persist_client::critical::SinceHandle;
27use mz_persist_client::write::WriteHandle;
28use mz_persist_client::{Diagnostics, PersistClient, ShardId};
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
31use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
32use timely::order::TotalOrder;
33use timely::progress::Timestamp;
34use tracing::debug;
35
36use crate::TxnsCodecDefault;
37use crate::metrics::Metrics;
38use crate::txn_cache::{TxnsCache, Unapplied};
39use crate::txn_write::Txn;
40
/// A handle for interacting with a txns shard and the data shards registered
/// to it: registering/forgetting data shards, beginning and committing txns,
/// applying committed txns, and compacting the txns shard.
///
/// `O` is the opaque token type of the critical since capability and `C`
/// encodes/decodes the entries written to the txns shard.
#[derive(Debug)]
pub struct TxnsHandle<K: Codec, V: Codec, T, D, O = u64, C: TxnsCodec = TxnsCodecDefault> {
    // Metrics for the txn operations issued through this handle.
    pub(crate) metrics: Arc<Metrics>,
    // In-memory view of the txns shard contents, advanced lazily as needed.
    pub(crate) txns_cache: TxnsCache<T, C>,
    // Write handle used to append register/forget/commit entries to the txns shard.
    pub(crate) txns_write: WriteHandle<C::Key, C::Val, T, i64>,
    // Critical since capability holding back compaction of the txns shard.
    pub(crate) txns_since: SinceHandle<C::Key, C::Val, T, i64, O>,
    // Cached write handles for the registered data shards.
    pub(crate) datas: DataHandles<K, V, T, D>,
}
114
115impl<K, V, T, D, O, C> TxnsHandle<K, V, T, D, O, C>
116where
117 K: Debug + Codec,
118 V: Debug + Codec,
119 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
120 D: Debug + Monoid + Ord + Codec64 + Send + Sync,
121 O: Opaque + Debug + Codec64,
122 C: TxnsCodec,
123{
    /// Opens a `TxnsHandle` on the txns shard `txns_id`, priming the
    /// in-memory [`TxnsCache`] so that reads at `init_ts` are possible.
    ///
    /// Opens a write handle, a read handle (consumed by the cache), and a
    /// critical since handle on the txns shard.
    pub async fn open(
        init_ts: T,
        client: PersistClient,
        dyncfgs: ConfigSet,
        metrics: Arc<Metrics>,
        txns_id: ShardId,
    ) -> Self {
        let (txns_key_schema, txns_val_schema) = C::schemas();
        let (mut txns_write, txns_read) = client
            .open(
                txns_id,
                Arc::new(txns_key_schema),
                Arc::new(txns_val_schema),
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
                USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
            )
            .await
            .expect("txns schema shouldn't change");
        let txns_since = client
            .open_critical_since(
                txns_id,
                // NOTE(review): reuses the controller's critical reader id for
                // the txns shard's since capability — confirm this is intended.
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                Diagnostics {
                    shard_name: "txns".to_owned(),
                    handle_purpose: "commit txns".to_owned(),
                },
            )
            .await
            .expect("txns schema shouldn't change");
        // The cache takes ownership of the read handle; the write handle is
        // borrowed during init (e.g. to ensure the shard is initialized).
        let txns_cache = TxnsCache::init(init_ts, txns_read, &mut txns_write).await;
        TxnsHandle {
            metrics,
            txns_cache,
            txns_write,
            txns_since,
            datas: DataHandles {
                dyncfgs,
                client: Arc::new(client),
                data_write_for_apply: BTreeMap::new(),
                data_write_for_commit: BTreeMap::new(),
            },
        }
    }
181
    /// Returns a new, empty transaction that can involve the data shards
    /// registered with this txns shard.
    pub fn begin(&self) -> Txn<K, V, T, D> {
        Txn::new()
    }
189
    /// Registers the given data shards for use with this txns shard as of
    /// `register_ts`, caching the provided write handles for later commits.
    ///
    /// Shards already registered at the current progress are skipped from the
    /// write (making the call idempotent), though their cached handle may
    /// still be replaced. Returns `Err` with the current txns upper if
    /// `register_ts` has already been passed; the caller should retry with a
    /// later timestamp.
    #[instrument(level = "debug", fields(ts = ?register_ts))]
    pub async fn register(
        &mut self,
        register_ts: T,
        data_writes: impl IntoIterator<Item = WriteHandle<K, V, T, D>>,
    ) -> Result<Tidy, T> {
        let op = &Arc::clone(&self.metrics).register;
        op.run(async {
            let mut data_writes = data_writes.into_iter().collect::<Vec<_>>();

            // Ensure every handle has a registered schema; the schema id is
            // read below when deciding whether to replace a cached handle.
            for data_write in &mut data_writes {
                data_write
                    .try_register_schema()
                    .await
                    .expect("schema should be registered");
            }

            let updates = data_writes
                .iter()
                .map(|data_write| {
                    let data_id = data_write.shard_id();
                    let entry = TxnsEntry::Register(data_id, T::encode(&register_ts));
                    (data_id, C::encode(entry))
                })
                .collect::<Vec<_>>();
            let data_ids_debug = || {
                data_writes
                    .iter()
                    .map(|x| format!("{:.9}", x.shard_id().to_string()))
                    .collect::<Vec<_>>()
                    .join(" ")
            };

            let mut txns_upper = self
                .txns_write
                .shared_upper()
                .into_option()
                .expect("txns should not be closed");
            // Compare-and-append retry loop: on upper mismatch, refresh the
            // cache and recompute which shards still need registration.
            loop {
                txns_upper = self.txns_cache.update_ge(&txns_upper).await.clone();
                // Only write entries for shards not already registered as of
                // the current upper.
                let updates = updates
                    .iter()
                    .flat_map(|(data_id, (key, val))| {
                        let registered =
                            self.txns_cache.registered_at_progress(data_id, &txns_upper);
                        (!registered).then_some(((key, val), &register_ts, 1))
                    })
                    .collect::<Vec<_>>();
                if register_ts < txns_upper {
                    debug!(
                        "txns register {} at {:?} mismatch current={:?}",
                        data_ids_debug(),
                        register_ts,
                        txns_upper,
                    );
                    return Err(txns_upper);
                }

                let res = crate::small_caa(
                    || format!("txns register {}", data_ids_debug()),
                    &mut self.txns_write,
                    &updates,
                    txns_upper,
                    register_ts.step_forward(),
                )
                .await;
                match res {
                    Ok(()) => {
                        debug!(
                            "txns register {} at {:?} success",
                            data_ids_debug(),
                            register_ts
                        );
                        break;
                    }
                    Err(new_txns_upper) => {
                        self.metrics.register.retry_count.inc();
                        txns_upper = new_txns_upper;
                        continue;
                    }
                }
            }
            // Cache the write handles for commit, never downgrading a cached
            // handle to an older schema.
            for data_write in data_writes {
                match self.datas.data_write_for_commit.get(&data_write.shard_id()) {
                    None => {
                        self.datas
                            .data_write_for_commit
                            .insert(data_write.shard_id(), DataWriteCommit(data_write));
                    }
                    Some(previous) => {
                        let new_schema_id = data_write.schema_id().expect("ensured above");

                        if let Some(prev_schema_id) = previous.schema_id()
                            && prev_schema_id > new_schema_id
                        {
                            // Keep the newer cached handle; dropping to an
                            // older schema would be a regression.
                            mz_ore::soft_panic_or_log!(
                                "tried registering a WriteHandle with an older SchemaId; \
                                prev_schema_id: {} new_schema_id: {} shard_id: {}",
                                prev_schema_id,
                                new_schema_id,
                                previous.shard_id(),
                            );
                            continue;
                        } else if previous.schema_id().is_none() {
                            mz_ore::soft_panic_or_log!(
                                "encountered data shard without a schema; shard_id: {}",
                                previous.shard_id(),
                            );
                        }

                        tracing::info!(
                            prev_schema_id = ?previous.schema_id(),
                            ?new_schema_id,
                            shard_id = %previous.shard_id(),
                            "replacing WriteHandle"
                        );
                        self.datas
                            .data_write_for_commit
                            .insert(data_write.shard_id(), DataWriteCommit(data_write));
                    }
                }
            }
            let tidy = self.apply_le(&register_ts).await;

            Ok(tidy)
        })
        .await
    }
351
    /// Removes the given data shards from use with this txns shard as of
    /// `forget_ts`, dropping their cached commit handles.
    ///
    /// Shards not registered at the current progress are skipped from the
    /// retraction (making the call idempotent). Returns `Err` with the
    /// current txns upper if `forget_ts` has already been passed; the caller
    /// should retry with a later timestamp.
    #[instrument(level = "debug", fields(ts = ?forget_ts))]
    pub async fn forget(
        &mut self,
        forget_ts: T,
        data_ids: impl IntoIterator<Item = ShardId>,
    ) -> Result<Tidy, T> {
        let op = &Arc::clone(&self.metrics).forget;
        op.run(async {
            let data_ids = data_ids.into_iter().collect::<Vec<_>>();
            let mut txns_upper = self
                .txns_write
                .shared_upper()
                .into_option()
                .expect("txns should not be closed");
            // Compare-and-append retry loop, mirroring `register`.
            loop {
                txns_upper = self.txns_cache.update_ge(&txns_upper).await.clone();

                let data_ids_debug = || {
                    data_ids
                        .iter()
                        .map(|x| format!("{:.9}", x.to_string()))
                        .collect::<Vec<_>>()
                        .join(" ")
                };
                // Retract each registration by writing the same Register
                // entry with a -1 diff; skip shards not currently registered.
                let updates = data_ids
                    .iter()
                    .filter(|data_id| self.txns_cache.registered_at_progress(data_id, &txns_upper))
                    .map(|data_id| C::encode(TxnsEntry::Register(*data_id, T::encode(&forget_ts))))
                    .collect::<Vec<_>>();
                let updates = updates
                    .iter()
                    .map(|(key, val)| ((key, val), &forget_ts, -1))
                    .collect::<Vec<_>>();

                if forget_ts < txns_upper {
                    debug!(
                        "txns forget {} at {:?} mismatch current={:?}",
                        data_ids_debug(),
                        forget_ts,
                        txns_upper,
                    );
                    return Err(txns_upper);
                }

                // Before forgetting, make sure the latest unapplied write to
                // any of these shards has been applied; `apply_le` at that ts
                // covers all earlier unapplied writes too.
                {
                    let data_ids: HashSet<_> = data_ids.iter().cloned().collect();
                    let data_latest_unapplied = self
                        .txns_cache
                        .unapplied_batches
                        .values()
                        .rev()
                        .find(|(x, _, _)| data_ids.contains(x));
                    if let Some((_, _, latest_write)) = data_latest_unapplied {
                        debug!(
                            "txns forget {} applying latest write {:?}",
                            data_ids_debug(),
                            latest_write,
                        );
                        let latest_write = latest_write.clone();
                        let _tidy = self.apply_le(&latest_write).await;
                    }
                }
                let res = crate::small_caa(
                    || format!("txns forget {}", data_ids_debug()),
                    &mut self.txns_write,
                    &updates,
                    txns_upper,
                    forget_ts.step_forward(),
                )
                .await;
                match res {
                    Ok(()) => {
                        debug!(
                            "txns forget {} at {:?} success",
                            data_ids_debug(),
                            forget_ts
                        );
                        break;
                    }
                    Err(new_txns_upper) => {
                        self.metrics.forget.retry_count.inc();
                        txns_upper = new_txns_upper;
                        continue;
                    }
                }
            }

            let tidy = self.apply_le(&forget_ts).await;
            // The shards are no longer registered; drop their commit handles.
            for data_id in &data_ids {
                self.datas.data_write_for_commit.remove(data_id);
            }

            Ok(tidy)
        })
        .await
    }
475
476 #[instrument(level = "debug", fields(ts = ?forget_ts))]
479 pub async fn forget_all(&mut self, forget_ts: T) -> Result<(Vec<ShardId>, Tidy), T> {
480 let op = &Arc::clone(&self.metrics).forget_all;
481 op.run(async {
482 let mut txns_upper = self
483 .txns_write
484 .shared_upper()
485 .into_option()
486 .expect("txns should not be closed");
487 let registered = loop {
488 txns_upper = self.txns_cache.update_ge(&txns_upper).await.clone();
489
490 let registered = self.txns_cache.all_registered_at_progress(&txns_upper);
491 let data_ids_debug = || {
492 registered
493 .iter()
494 .map(|x| format!("{:.9}", x.to_string()))
495 .collect::<Vec<_>>()
496 .join(" ")
497 };
498 let updates = registered
499 .iter()
500 .map(|data_id| {
501 C::encode(crate::TxnsEntry::Register(*data_id, T::encode(&forget_ts)))
502 })
503 .collect::<Vec<_>>();
504 let updates = updates
505 .iter()
506 .map(|(key, val)| ((key, val), &forget_ts, -1))
507 .collect::<Vec<_>>();
508
509 if forget_ts < txns_upper {
511 debug!(
512 "txns forget_all {} at {:?} mismatch current={:?}",
513 data_ids_debug(),
514 forget_ts,
515 txns_upper,
516 );
517 return Err(txns_upper);
518 }
519
520 let data_latest_unapplied = self.txns_cache.unapplied_batches.values().last();
528 if let Some((_, _, latest_write)) = data_latest_unapplied {
529 debug!(
530 "txns forget_all {} applying latest write {:?}",
531 data_ids_debug(),
532 latest_write,
533 );
534 let latest_write = latest_write.clone();
535 let _tidy = self.apply_le(&latest_write).await;
536 }
537 let res = crate::small_caa(
538 || format!("txns forget_all {}", data_ids_debug()),
539 &mut self.txns_write,
540 &updates,
541 txns_upper,
542 forget_ts.step_forward(),
543 )
544 .await;
545 match res {
546 Ok(()) => {
547 debug!(
548 "txns forget_all {} at {:?} success",
549 data_ids_debug(),
550 forget_ts
551 );
552 break registered;
553 }
554 Err(new_txns_upper) => {
555 self.metrics.forget_all.retry_count.inc();
556 txns_upper = new_txns_upper;
557 continue;
558 }
559 }
560 };
561
562 for data_id in registered.iter() {
563 self.datas.data_write_for_commit.remove(data_id);
564 }
565 let tidy = self.apply_le(&forget_ts).await;
566
567 Ok((registered, tidy))
568 })
569 .await
570 }
571
    /// Applies (forwards to their data shards) all committed-but-unapplied
    /// writes with timestamps `<= ts`, returning the [`Tidy`] work of
    /// retracting their entries from the txns shard.
    ///
    /// Work for distinct data shards is performed concurrently.
    #[instrument(level = "debug", fields(ts = ?ts))]
    pub async fn apply_le(&mut self, ts: &T) -> Tidy {
        let op = &self.metrics.apply_le;
        op.run(async {
            debug!("apply_le {:?}", ts);
            let _ = self.txns_cache.update_gt(ts).await;
            self.txns_cache.update_gauges(&self.metrics);

            // Group unapplied work by data shard so each shard's work can be
            // done by a single task holding that shard's write handle.
            // `unapplied()` yields entries ordered by ts, so we can stop at
            // the first one past `ts`.
            let mut unapplied_by_data = BTreeMap::<_, Vec<_>>::new();
            for (data_id, unapplied, unapplied_ts) in self.txns_cache.unapplied() {
                if ts < unapplied_ts {
                    break;
                }
                unapplied_by_data
                    .entry(*data_id)
                    .or_default()
                    .push((unapplied, unapplied_ts));
            }

            let retractions = FuturesUnordered::new();
            for (data_id, unapplied) in unapplied_by_data {
                let mut data_write = self.datas.take_write_for_apply(&data_id).await;
                retractions.push(async move {
                    let mut ret = Vec::new();
                    for (unapplied, unapplied_ts) in unapplied {
                        match unapplied {
                            Unapplied::RegisterForget => {
                                // Register/forget entries only need the data
                                // shard's upper advanced past their ts.
                                let () = crate::empty_caa(
                                    || {
                                        format!(
                                            "data {:.9} register/forget fill",
                                            data_id.to_string()
                                        )
                                    },
                                    &mut data_write,
                                    unapplied_ts.clone(),
                                )
                                .await;
                            }
                            Unapplied::Batch(batch_raws) => {
                                let batch_raws = batch_raws
                                    .into_iter()
                                    .map(|batch_raw| batch_raw.as_slice())
                                    .collect();
                                crate::apply_caa(
                                    &mut data_write,
                                    &batch_raws,
                                    unapplied_ts.clone(),
                                )
                                .await;
                                // Record each applied batch so its txns-shard
                                // entry can be retracted later via Tidy.
                                for batch_raw in batch_raws {
                                    ret.push((
                                        batch_raw.to_vec(),
                                        (T::encode(unapplied_ts), data_id),
                                    ));
                                }
                            }
                        }
                    }
                    // Return the write handle so it can be put back in the cache.
                    (data_write, ret)
                });
            }
            let retractions = retractions.collect::<Vec<_>>().await;
            let retractions = retractions
                .into_iter()
                .flat_map(|(data_write, retractions)| {
                    self.datas.put_write_for_apply(data_write);
                    retractions
                })
                .collect();

            self.txns_cache.mark_register_applied(ts);

            debug!("apply_le {:?} success", ts);
            Tidy { retractions }
        })
        .await
    }
666
667 #[cfg(test)]
672 pub async fn tidy_at(&mut self, tidy_ts: T, tidy: Tidy) -> Result<(), T> {
673 debug!("tidy at {:?}", tidy_ts);
674
675 let mut txn = self.begin();
676 txn.tidy(tidy);
677 let apply = txn.commit_at(self, tidy_ts.clone()).await?;
681 assert!(apply.is_empty());
682
683 debug!("tidy at {:?} success", tidy_ts);
684 Ok(())
685 }
686
687 pub async fn compact_to(&mut self, mut since_ts: T) {
693 let op = &self.metrics.compact_to;
694 op.run(async {
695 tracing::debug!("compact_to {:?}", since_ts);
696 let _ = self.txns_cache.update_gt(&since_ts).await;
697
698 let min_unapplied_ts = self.txns_cache.min_unapplied_ts();
702 if min_unapplied_ts < &since_ts {
703 since_ts.clone_from(min_unapplied_ts);
704 }
705 crate::cads::<T, O, C>(&mut self.txns_since, since_ts).await;
706 })
707 .await
708 }
709
710 pub async fn upgrade_version(&mut self) {
715 self.txns_since
716 .upgrade_version()
717 .await
718 .expect("invalid usage")
719 }
720
    /// Returns the [`ShardId`] of the txns shard backing this handle.
    pub fn txns_id(&self) -> ShardId {
        self.txns_write.shard_id()
    }
725
    /// Returns a read-only view of this handle's cache of txns shard state.
    pub fn read_cache(&self) -> &TxnsCache<T, C> {
        &self.txns_cache
    }
730}
731
/// A token representing maintenance work: retractions of already-applied
/// entries in the txns shard, produced by [`TxnsHandle::apply_le`] and
/// eventually written as part of a later txn (see `tidy_at`/`Txn::tidy`).
#[derive(Debug, Default)]
pub struct Tidy {
    // Maps the raw serialized batch to the (encoded commit ts, data shard)
    // identifying the txns-shard entry to retract.
    pub(crate) retractions: BTreeMap<Vec<u8>, ([u8; 8], ShardId)>,
}
741
742impl Tidy {
743 pub fn merge(&mut self, other: Tidy) {
745 self.retractions.extend(other.retractions)
746 }
747}
748
/// A cache of data-shard write handles used by a [`TxnsHandle`].
///
/// Handles used for applying batches and handles used for committing txns are
/// tracked in separate maps (see [`DataWriteApply`] and `DataWriteCommit`).
#[derive(Debug)]
pub(crate) struct DataHandles<K: Codec, V: Codec, T, D> {
    pub(crate) dyncfgs: ConfigSet,
    pub(crate) client: Arc<PersistClient>,
    // Handles used when applying (forwarding) committed batches; taken and
    // returned around each apply.
    data_write_for_apply: BTreeMap<ShardId, DataWriteApply<K, V, T, D>>,
    // Handles used when committing txns; populated by `register` and removed
    // by `forget`/`forget_all`.
    data_write_for_commit: BTreeMap<ShardId, DataWriteCommit<K, V, T, D>>,
}
768
769impl<K, V, T, D> DataHandles<K, V, T, D>
770where
771 K: Debug + Codec,
772 V: Debug + Codec,
773 T: Timestamp + Lattice + TotalOrder + Codec64 + Sync,
774 D: Monoid + Ord + Codec64 + Send + Sync,
775{
    /// Opens a fresh write handle on `data_id` for use when applying batches,
    /// using the shard's latest registered schema.
    async fn open_data_write_for_apply(&self, data_id: ShardId) -> DataWriteApply<K, V, T, D> {
        let diagnostics = Diagnostics::from_purpose("txn data");
        // A data shard must have a registered schema by the time anything is
        // applied to it.
        let schemas = self
            .client
            .latest_schema::<K, V, T, D>(data_id, diagnostics.clone())
            .await
            .expect("codecs have not changed");
        let (key_schema, val_schema) = match schemas {
            Some((_, key_schema, val_schema)) => (Arc::new(key_schema), Arc::new(val_schema)),
            None => unreachable!("data shard {} should have a schema", data_id),
        };
        let wrapped = self
            .client
            .open_writer(data_id, key_schema, val_schema, diagnostics)
            .await
            .expect("schema shouldn't change");
        DataWriteApply {
            apply_ensure_schema_match: APPLY_ENSURE_SCHEMA_MATCH.handle(&self.dyncfgs),
            client: Arc::clone(&self.client),
            wrapped,
        }
    }
800
801 pub(crate) async fn take_write_for_apply(
802 &mut self,
803 data_id: &ShardId,
804 ) -> DataWriteApply<K, V, T, D> {
805 if let Some(data_write) = self.data_write_for_apply.remove(data_id) {
806 return data_write;
807 }
808 self.open_data_write_for_apply(*data_id).await
809 }
810
811 pub(crate) fn put_write_for_apply(&mut self, data_write: DataWriteApply<K, V, T, D>) {
812 self.data_write_for_apply
813 .insert(data_write.shard_id(), data_write);
814 }
815
    /// Removes and returns the cached commit handle for `data_id`, if one
    /// exists (i.e. if the shard is currently registered with this handle).
    pub(crate) fn take_write_for_commit(
        &mut self,
        data_id: &ShardId,
    ) -> Option<DataWriteCommit<K, V, T, D>> {
        self.data_write_for_commit.remove(data_id)
    }
822
823 pub(crate) fn put_write_for_commit(&mut self, data_write: DataWriteCommit<K, V, T, D>) {
824 let prev = self
825 .data_write_for_commit
826 .insert(data_write.shard_id(), data_write);
827 assert!(prev.is_none());
828 }
829}
830
/// Newtype wrapper around a data-shard [`WriteHandle`] that is reserved for
/// committing txns (as opposed to applying them; see [`DataWriteApply`]).
#[derive(Debug)]
pub(crate) struct DataWriteCommit<K: Codec, V: Codec, T, D>(pub(crate) WriteHandle<K, V, T, D>);
842
843impl<K: Codec, V: Codec, T, D> Deref for DataWriteCommit<K, V, T, D> {
844 type Target = WriteHandle<K, V, T, D>;
845
846 fn deref(&self) -> &Self::Target {
847 &self.0
848 }
849}
850
851impl<K: Codec, V: Codec, T, D> DerefMut for DataWriteCommit<K, V, T, D> {
852 fn deref_mut(&mut self) -> &mut Self::Target {
853 &mut self.0
854 }
855}
856
/// Wrapper around a data-shard [`WriteHandle`] that is reserved for applying
/// (forwarding) already-committed batches.
///
/// Carries a persist client and a dyncfg handle so the wrapped writer can be
/// swapped for one matching a batch's schema; see
/// `maybe_replace_with_batch_schema`.
#[derive(Debug)]
pub(crate) struct DataWriteApply<K: Codec, V: Codec, T, D> {
    client: Arc<PersistClient>,
    apply_ensure_schema_match: ConfigValHandle<bool>,
    pub(crate) wrapped: WriteHandle<K, V, T, D>,
}
871
872impl<K: Codec, V: Codec, T, D> Deref for DataWriteApply<K, V, T, D> {
873 type Target = WriteHandle<K, V, T, D>;
874
875 fn deref(&self) -> &Self::Target {
876 &self.wrapped
877 }
878}
879
880impl<K: Codec, V: Codec, T, D> DerefMut for DataWriteApply<K, V, T, D> {
881 fn deref_mut(&mut self) -> &mut Self::Target {
882 &mut self.wrapped
883 }
884}
885
// Escape-hatch dyncfg: when set to false, `maybe_replace_with_batch_schema`
// becomes a no-op and the apply write handle is never swapped to match the
// batch's schema.
pub(crate) const APPLY_ENSURE_SCHEMA_MATCH: Config<bool> = Config::new(
    "txn_wal_apply_ensure_schema_match",
    true,
    "CYA to skip updating write handle to batch schema in apply",
);
891
892fn at_most_one_schema(
893 schemas: impl Iterator<Item = SchemaId>,
894) -> Result<Option<SchemaId>, (SchemaId, SchemaId)> {
895 let mut schema = None;
896 for s in schemas {
897 match schema {
898 None => schema = Some(s),
899 Some(x) if s != x => return Err((s, x)),
900 Some(_) => continue,
901 }
902 }
903 Ok(schema)
904}
905
impl<K, V, T, D> DataWriteApply<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    /// If the given batches were written with a schema other than the one on
    /// the wrapped write handle, replaces the handle with one opened at the
    /// batches' schema (so the batches can be appended with it).
    ///
    /// Panics if the batches themselves were written with more than one
    /// schema. No-op when disabled via [`APPLY_ENSURE_SCHEMA_MATCH`].
    pub(crate) async fn maybe_replace_with_batch_schema(&mut self, batches: &[Batch<K, V, T, D>]) {
        // Dyncfg escape hatch: skip the schema match entirely.
        if !self.apply_ensure_schema_match.get() {
            return;
        }
        let batch_schema = at_most_one_schema(batches.iter().flat_map(|x| x.schemas()));
        let batch_schema = batch_schema.unwrap_or_else(|_| {
            panic!(
                "txn-wal uses at most one schema to commit batches, got: {:?}",
                batches.iter().flat_map(|x| x.schemas()).collect::<Vec<_>>()
            )
        });
        // Only proceed when both schema ids are known and they differ.
        let (batch_schema, handle_schema) = match (batch_schema, self.wrapped.schema_id()) {
            (Some(batch_schema), Some(handle_schema)) if batch_schema != handle_schema => {
                (batch_schema, handle_schema)
            }
            _ => return,
        };

        let data_id = self.shard_id();
        let diagnostics = Diagnostics::from_purpose("txn data");
        let (key_schema, val_schema) = self
            .client
            .get_schema::<K, V, T, D>(data_id, batch_schema, diagnostics.clone())
            .await
            .expect("codecs shouldn't change")
            .expect("id must have been registered to create this batch");
        let new_data_write = self
            .client
            .open_writer(
                self.shard_id(),
                Arc::new(key_schema),
                Arc::new(val_schema),
                diagnostics,
            )
            .await
            .expect("codecs shouldn't change");
        tracing::info!(
            "updated {} write handle from {} to {} to apply batches",
            data_id,
            handle_schema,
            batch_schema
        );
        assert_eq!(new_data_write.schema_id(), Some(batch_schema));
        self.wrapped = new_data_write;
    }
}
961
962#[cfg(test)]
963mod tests {
964 use std::time::{Duration, UNIX_EPOCH};
965
966 use differential_dataflow::Hashable;
967 use futures::future::BoxFuture;
968 use mz_ore::assert_none;
969 use mz_ore::cast::CastFrom;
970 use mz_ore::collections::CollectionExt;
971 use mz_ore::metrics::MetricsRegistry;
972 use mz_persist_client::PersistLocation;
973 use mz_persist_client::cache::PersistClientCache;
974 use mz_persist_client::cfg::RetryParameters;
975 use rand::rngs::SmallRng;
976 use rand::{RngCore, SeedableRng};
977 use timely::progress::Antichain;
978 use tokio::sync::oneshot;
979 use tracing::{Instrument, info, info_span};
980
981 use crate::operator::DataSubscribe;
982 use crate::tests::{CommitLog, reader, write_directly, writer};
983
984 use super::*;
985
    /// Test-only conveniences over a concretely-typed `TxnsHandle`.
    impl TxnsHandle<String, (), u64, i64, u64, TxnsCodecDefault> {
        /// Opens a handle on a brand-new txns shard.
        pub(crate) async fn expect_open(client: PersistClient) -> Self {
            Self::expect_open_id(client, ShardId::new()).await
        }

        /// Opens a handle on the given txns shard, initialized at ts 0.
        pub(crate) async fn expect_open_id(client: PersistClient, txns_id: ShardId) -> Self {
            let dyncfgs = crate::all_dyncfgs(client.dyncfgs().clone());
            Self::open(
                0,
                client,
                dyncfgs,
                Arc::new(Metrics::new(&MetricsRegistry::new())),
                txns_id,
            )
            .await
        }

        /// Returns a `CommitLog` for recording and asserting data shard writes.
        pub(crate) fn new_log(&self) -> CommitLog {
            CommitLog::new((*self.datas.client).clone(), self.txns_id())
        }

        /// Registers one brand-new data shard at `register_ts`.
        pub(crate) async fn expect_register(&mut self, register_ts: u64) -> ShardId {
            self.expect_registers(register_ts, 1).await.into_element()
        }

        /// Registers `amount` brand-new data shards at `register_ts`.
        pub(crate) async fn expect_registers(
            &mut self,
            register_ts: u64,
            amount: usize,
        ) -> Vec<ShardId> {
            let data_ids: Vec<_> = (0..amount).map(|_| ShardId::new()).collect();
            let mut writers = Vec::new();
            for data_id in &data_ids {
                writers.push(writer(&self.datas.client, *data_id).await);
            }
            self.register(register_ts, writers).await.unwrap();
            data_ids
        }

        /// Commits `keys` to `data_id` at `commit_ts`, applies the txn, and
        /// records the writes in `log`.
        pub(crate) async fn expect_commit_at(
            &mut self,
            commit_ts: u64,
            data_id: ShardId,
            keys: &[&str],
            log: &CommitLog,
        ) -> Tidy {
            let mut txn = self.begin();
            for key in keys {
                txn.write(&data_id, (*key).into(), (), 1).await;
            }
            let tidy = txn
                .commit_at(self, commit_ts)
                .await
                .unwrap()
                .apply(self)
                .await;
            for key in keys {
                log.record((data_id, (*key).into(), commit_ts, 1));
            }
            tidy
        }
    }
1048
    // Tests the various registration cases: re-register, register in the
    // past, and registering a mix of new and existing shards.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn register_at() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(2).await;

        // Registering an already-registered shard at a later ts succeeds.
        txns.register(3, [writer(&client, d0).await]).await.unwrap();

        // Registering in the past fails, returning the current txns upper.
        let d1 = ShardId::new();
        assert_eq!(
            txns.register(2, [writer(&client, d1).await])
                .await
                .unwrap_err(),
            4
        );

        txns.expect_commit_at(4, d0, &["foo"], &log).await;
        txns.register(5, [writer(&client, d1).await]).await.unwrap();

        // One call may mix already-registered and new shards.
        let d2 = ShardId::new();
        txns.register(6, [writer(&client, d0).await, writer(&client, d2).await])
            .await
            .unwrap();

        let () = log.assert_snapshot(d0, 6).await;
        let () = log.assert_snapshot(d1, 6).await;
    }
1084
    // Writing to a registered shard both via txns and directly at the same ts
    // is incorrect usage; the snapshot assertion is expected to panic on the
    // duplicated write.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    #[should_panic(expected = "left: [(\"foo\", 2, 1)]\n right: [(\"foo\", 2, 2)]")]
    async fn incorrect_usage_register_write_same_time() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;
        let mut d0_write = writer(&client, d0).await;

        // Commit "foo" via the txn system at ts 2...
        let mut txn = txns.begin_test();
        txn.write(&d0, "foo".into(), (), 1).await;
        let apply = txn.commit_at(&mut txns, 2).await.unwrap();
        log.record_txn(2, &txn);
        // ...and (incorrectly!) also write "foo" directly at the same ts.
        let () = d0_write
            .compare_and_append(
                &[(("foo".to_owned(), ()), 2, 1)],
                Antichain::from_elem(2),
                Antichain::from_elem(3),
            )
            .await
            .unwrap()
            .unwrap();
        log.record((d0, "foo".into(), 2, 1));
        apply.apply(&mut txns).await;

        // The snapshot sees "foo" twice, tripping the should_panic above.
        log.assert_snapshot(d0, 2).await;
    }
1118
    // Tests the various forget cases: unregistered, registered, repeated,
    // forget in the past, and forget_all.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn forget_at() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();

        // Forgetting a never-registered shard succeeds (writes nothing).
        txns.forget(1, [ShardId::new()]).await.unwrap();

        // Same for a batch of never-registered shards.
        txns.forget(2, (0..5).map(|_| ShardId::new()))
            .await
            .unwrap();

        // Forget a registered shard, and a batch of registered shards.
        let d0 = txns.expect_register(3).await;
        txns.forget(4, [d0]).await.unwrap();

        let ds = txns.expect_registers(5, 5).await;
        txns.forget(6, ds.clone()).await.unwrap();

        // Forgetting already-forgotten shards succeeds as well.
        txns.forget(7, [d0]).await.unwrap();
        txns.forget(8, ds.clone()).await.unwrap();

        // Forgetting in the past fails, returning the current txns upper.
        let d1 = txns.expect_register(9).await;
        assert_eq!(txns.forget(9, [d1]).await.unwrap_err(), 10);

        // A forgotten shard can once again be written to directly.
        let mut d0_write = writer(&client, d0).await;
        txns.expect_commit_at(10, d1, &["d1"], &log).await;
        let updates = [(("d0".to_owned(), ()), 10, 1)];
        d0_write
            .compare_and_append(&updates, d0_write.shared_upper(), Antichain::from_elem(11))
            .await
            .unwrap()
            .unwrap();
        log.record((d0, "d0".into(), 10, 1));

        // Re-register d0, then forget everything at once.
        txns.register(11, [writer(&client, d0).await])
            .await
            .unwrap();
        let mut forget_expected = vec![d0, d1];
        forget_expected.sort();
        assert_eq!(txns.forget_all(12).await.unwrap().0, forget_expected);

        // Close d0's upper so the snapshots below can complete.
        d0_write
            .compare_and_append_batch(&mut [], d0_write.shared_upper(), Antichain::new(), true)
            .await
            .unwrap()
            .unwrap();

        let () = log.assert_snapshot(d0, 12).await;
        let () = log.assert_snapshot(d1, 12).await;

        for di in ds {
            let mut di_write = writer(&client, di).await;

            // Close each shard's upper so its snapshot can complete.
            di_write
                .compare_and_append_batch(&mut [], di_write.shared_upper(), Antichain::new(), true)
                .await
                .unwrap()
                .unwrap();

            let () = log.assert_snapshot(di, 8).await;
        }
    }
1193
    // Cycles a data shard through direct write / register / txn write /
    // forget, with subscribes started at every ts and periodic compaction,
    // then checks that every subscribe saw the same writes.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn register_forget() {
        // Steps a (ts-dependent, deterministic) subset of the subs past `ts`.
        async fn step_some_past(subs: &mut Vec<DataSubscribe>, ts: u64) {
            for (idx, sub) in subs.iter_mut().enumerate() {
                if usize::cast_from(ts) % (idx + 1) == 0 {
                    async {
                        info!("stepping sub {} past {}", idx, ts);
                        sub.step_past(ts).await;
                    }
                    .instrument(info_span!("sub", idx))
                    .await;
                }
            }
        }

        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = ShardId::new();
        let mut d0_write = writer(&client, d0).await;
        let mut subs = Vec::new();

        let mut ts = 0;
        while ts < 32 {
            // Direct write while unregistered (the empty txn advances the
            // txns shard past ts first).
            subs.push(txns.read_cache().expect_subscribe(&client, d0, ts));
            ts += 1;
            info!("{} direct", ts);
            txns.begin().commit_at(&mut txns, ts).await.unwrap();
            write_directly(ts, &mut d0_write, &[&format!("d{}", ts)], &log).await;
            step_some_past(&mut subs, ts).await;
            if ts % 11 == 0 {
                txns.compact_to(ts).await;
            }

            // Register.
            subs.push(txns.read_cache().expect_subscribe(&client, d0, ts));
            ts += 1;
            info!("{} register", ts);
            txns.register(ts, [writer(&client, d0).await])
                .await
                .unwrap();
            step_some_past(&mut subs, ts).await;
            if ts % 11 == 0 {
                txns.compact_to(ts).await;
            }

            // Write through the txn system.
            subs.push(txns.read_cache().expect_subscribe(&client, d0, ts));
            ts += 1;
            info!("{} txns", ts);
            txns.expect_commit_at(ts, d0, &[&format!("t{}", ts)], &log)
                .await;
            step_some_past(&mut subs, ts).await;
            if ts % 11 == 0 {
                txns.compact_to(ts).await;
            }

            // Forget again.
            subs.push(txns.read_cache().expect_subscribe(&client, d0, ts));
            ts += 1;
            info!("{} forget", ts);
            txns.forget(ts, [d0]).await.unwrap();
            step_some_past(&mut subs, ts).await;
            if ts % 11 == 0 {
                txns.compact_to(ts).await;
            }
        }

        // Every subscribe, regardless of when it started, must agree with the
        // log of writes.
        for mut sub in subs.into_iter() {
            sub.step_past(ts).await;
            log.assert_eq(d0, sub.as_of, sub.progress(), sub.output().clone());
        }
    }
1276
    // A re-registration racing with an as-yet-unapplied commit must not make
    // the commit visible early.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn race_data_shard_register_and_commit() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let d0 = txns.expect_register(1).await;

        // Commit at ts 2 but delay applying it.
        let mut txn = txns.begin();
        txn.write(&d0, "foo".into(), (), 1).await;
        let commit_apply = txn.commit_at(&mut txns, 2).await.unwrap();

        txns.register(3, [writer(&client, d0).await]).await.unwrap();

        // Before apply, a snapshot as of ts 1 sees nothing.
        let actual = txns.txns_cache.expect_snapshot(&client, d0, 1).await;
        assert_eq!(actual, Vec::<String>::new());

        // After apply, the committed write is visible as of ts 2.
        commit_apply.apply(&mut txns).await;
        let actual = txns.txns_cache.expect_snapshot(&client, d0, 2).await;
        assert_eq!(actual, vec!["foo".to_owned()]);
    }
1307
    // Commits at many timestamps without applying, then applies them all at
    // once and checks the resulting snapshot.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn apply_many_ts() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;

        // Commit at ts 2..9, deliberately dropping each returned apply token.
        for ts in 2..10 {
            let mut txn = txns.begin();
            txn.write(&d0, ts.to_string(), (), 1).await;
            let _apply = txn.commit_at(&mut txns, ts).await.unwrap();
            log.record((d0, ts.to_string(), ts, 1));
        }
        // An (applied) empty commit at ts 10 forces all prior txns through.
        txns.expect_commit_at(10, d0, &[], &log).await;

        log.assert_snapshot(d0, 10).await;
    }
1329
    // Per-worker state for the stress test: each step performs one random
    // operation (write, register, forget, cache update, read) at `ts`.
    struct StressWorker {
        // Worker index, used in keys and log output.
        idx: usize,
        data_ids: Vec<ShardId>,
        txns: TxnsHandle<String, (), u64, i64>,
        log: CommitLog,
        // Maintenance accumulated from applied txns, folded into later txns.
        tidy: Tidy,
        ts: u64,
        // Step counter, used in keys and log output.
        step: usize,
        rng: SmallRng,
        // In-flight reads started by `start_read`; presumably (ts sender,
        // shard, as_of, task handle) — confirm against `start_read`'s body.
        reads: Vec<(
            oneshot::Sender<u64>,
            ShardId,
            u64,
            mz_ore::task::JoinHandle<Vec<(String, u64, i64)>>,
        )>,
    }
1346
1347 impl StressWorker {
        /// Performs one randomly-chosen operation at the current `ts`.
        pub async fn step(&mut self) {
            debug!(
                "stress {} step {} START ts={}",
                self.idx, self.step, self.ts
            );
            // Pick a random data shard, then a random operation. Reads are
            // weighted twice as heavily as the other operations.
            let data_id =
                self.data_ids[usize::cast_from(self.rng.next_u64()) % self.data_ids.len()];
            match self.rng.next_u64() % 6 {
                0 => self.write(data_id).await,
                1 => self.register(data_id).await,
                2 => self.forget(data_id).await,
                3 => {
                    debug!("stress update {:.9} to {}", data_id.to_string(), self.ts);
                    let _ = self.txns.txns_cache.update_ge(&self.ts).await;
                }
                4 => self.start_read(data_id),
                5 => self.start_read(data_id),
                _ => unreachable!(""),
            }
            debug!("stress {} step {} DONE ts={}", self.idx, self.step, self.ts);
            self.step += 1;
        }
1372
        /// Returns a key unique to this worker and step.
        fn key(&self) -> String {
            format!("w{}s{}", self.idx, self.step)
        }
1376
        /// Advances `self.ts` to the cache's progress and reports whether
        /// `data_id` is registered as of that ts.
        async fn registered_at_progress_ts(&mut self, data_id: ShardId) -> bool {
            self.ts = *self.txns.txns_cache.update_ge(&self.ts).await;
            self.txns
                .txns_cache
                .registered_at_progress(&data_id, &self.ts)
        }
1383
        /// Writes to `data_id`, going through the txn system if the shard is
        /// registered at the current progress and directly otherwise,
        /// retrying on ts mismatches.
        async fn write(&mut self, data_id: ShardId) {
            self.retry_ts_err(&mut |w: &mut StressWorker| {
                Box::pin(async move {
                    if w.registered_at_progress_ts(data_id).await {
                        w.write_via_txns(data_id).await
                    } else {
                        w.write_direct(data_id).await
                    }
                })
            })
            .await
        }
1401
        /// Commits a single-key write to `data_id` through the txn system at
        /// `self.ts`, folding in any pending tidy work.
        async fn write_via_txns(&mut self, data_id: ShardId) -> Result<(), u64> {
            debug!(
                "stress write_via_txns {:.9} at {}",
                data_id.to_string(),
                self.ts
            );
            // The handle may not have a cached commit writer for this shard
            // (e.g. after a forget/re-register); ensure one exists.
            if !self.txns.datas.data_write_for_commit.contains_key(&data_id) {
                let x = writer(&self.txns.datas.client, data_id).await;
                self.txns
                    .datas
                    .data_write_for_commit
                    .insert(data_id, DataWriteCommit(x));
            }
            let mut txn = self.txns.begin_test();
            txn.tidy(std::mem::take(&mut self.tidy));
            txn.write(&data_id, self.key(), (), 1).await;
            let apply = txn.commit_at(&mut self.txns, self.ts).await?;
            debug!(
                "log {:.9} {} at {}",
                data_id.to_string(),
                self.key(),
                self.ts
            );
            self.log.record_txn(self.ts, &txn);
            // Sometimes apply immediately (keeping the tidy work); otherwise
            // leave the txn to be applied later.
            if self.rng.next_u64() % 3 == 0 {
                self.tidy.merge(apply.apply(&mut self.txns).await);
            }
            Ok(())
        }
1436
1437 async fn write_direct(&mut self, data_id: ShardId) -> Result<(), u64> {
1438 debug!(
1439 "stress write_direct {:.9} at {}",
1440 data_id.to_string(),
1441 self.ts
1442 );
1443 self.txns.begin().commit_at(&mut self.txns, self.ts).await?;
1446
1447 let mut write = writer(&self.txns.datas.client, data_id).await;
1448 let mut current = write.shared_upper().into_option().unwrap();
1449 loop {
1450 if !(current <= self.ts) {
1451 return Err(current);
1452 }
1453 let key = self.key();
1454 let updates = [((&key, &()), &self.ts, 1)];
1455 let res = crate::small_caa(
1456 || format!("data {:.9} direct", data_id.to_string()),
1457 &mut write,
1458 &updates,
1459 current,
1460 self.ts + 1,
1461 )
1462 .await;
1463 match res {
1464 Ok(()) => {
1465 debug!("log {:.9} {} at {}", data_id.to_string(), key, self.ts);
1466 self.log.record((data_id, key, self.ts, 1));
1467 return Ok(());
1468 }
1469 Err(new_current) => current = new_current,
1470 }
1471 }
1472 }
1473
1474 async fn register(&mut self, data_id: ShardId) {
1475 self.retry_ts_err(&mut |w: &mut StressWorker| {
1476 debug!("stress register {:.9} at {}", data_id.to_string(), w.ts);
1477 Box::pin(async move {
1478 let data_write = writer(&w.txns.datas.client, data_id).await;
1479 let _ = w.txns.register(w.ts, [data_write]).await?;
1480 Ok(())
1481 })
1482 })
1483 .await
1484 }
1485
1486 async fn forget(&mut self, data_id: ShardId) {
1487 self.retry_ts_err(&mut |w: &mut StressWorker| {
1488 debug!("stress forget {:.9} at {}", data_id.to_string(), w.ts);
1489 Box::pin(async move { w.txns.forget(w.ts, [data_id]).await.map(|_| ()) })
1490 })
1491 .await
1492 }
1493
        /// Starts a background read of `data_id` as of the current timestamp.
        ///
        /// The read runs on a blocking task; the returned oneshot sender is
        /// stashed in `self.reads` and is later used to tell the task which
        /// timestamp to read up to before it returns its captured output.
        fn start_read(&mut self, data_id: ShardId) {
            debug!(
                "stress start_read {:.9} at {}",
                data_id.to_string(),
                self.ts
            );
            let client = (*self.txns.datas.client).clone();
            let txns_id = self.txns.txns_id();
            let as_of = self.ts;
            debug!("start_read {:.9} as_of {}", data_id.to_string(), as_of);
            let (tx, mut rx) = oneshot::channel();
            let subscribe = mz_ore::task::spawn_blocking(
                || format!("{:.9}-{}", data_id.to_string(), as_of),
                move || {
                    // Empty `until`: the subscribe itself never self-limits;
                    // the oneshot below controls when we stop.
                    let mut subscribe = DataSubscribe::new(
                        "test",
                        client,
                        txns_id,
                        data_id,
                        as_of,
                        Antichain::new(),
                    );
                    let data_id = format!("{:.9}", data_id.to_string());
                    let _guard = info_span!("read_worker", %data_id, as_of).entered();
                    loop {
                        subscribe.worker.step_or_park(None);
                        subscribe.capture_output();
                        // Keep stepping until the main task sends the target
                        // timestamp. A closed channel means nobody will ever
                        // send one; use 0 so the inner loop exits immediately.
                        let until = match rx.try_recv() {
                            Ok(ts) => ts,
                            Err(oneshot::error::TryRecvError::Empty) => {
                                continue;
                            }
                            Err(oneshot::error::TryRecvError::Closed) => 0,
                        };
                        // Drive the dataflow until progress reaches `until`,
                        // then hand back everything captured so far.
                        while subscribe.progress() < until {
                            subscribe.worker.step_or_park(None);
                            subscribe.capture_output();
                        }
                        return subscribe.output().clone();
                    }
                },
            );
            self.reads.push((tx, data_id, as_of, subscribe));
        }
1538
1539 async fn retry_ts_err<W>(&mut self, work_fn: &mut W)
1540 where
1541 W: for<'b> FnMut(&'b mut Self) -> BoxFuture<'b, Result<(), u64>>,
1542 {
1543 loop {
1544 match work_fn(self).await {
1545 Ok(ret) => return ret,
1546 Err(new_ts) => self.ts = new_ts,
1547 }
1548 }
1549 }
1550 }
1551
    // End-to-end randomized stress test: several workers concurrently write,
    // register, forget, and read a small set of data shards, then the results
    // are validated against the recorded log.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] async fn stress_correctness() {
        const NUM_DATA_SHARDS: usize = 2;
        const NUM_WORKERS: usize = 2;
        const NUM_STEPS_PER_WORKER: usize = 100;
        // Print the seed so a failing run can be reproduced.
        let seed = UNIX_EPOCH.elapsed().unwrap().hashed();
        eprintln!("using seed {}", seed);

        let mut clients = PersistClientCache::new_no_metrics();
        // Keep listen retry backoffs at ~1ms so the test doesn't burn
        // wall-clock time sleeping.
        clients
            .cfg()
            .set_next_listen_batch_retryer(RetryParameters {
                fixed_sleep: Duration::ZERO,
                initial_backoff: Duration::from_millis(1),
                multiplier: 1,
                clamp: Duration::from_millis(1),
            });
        let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let data_ids = (0..NUM_DATA_SHARDS)
            .map(|_| ShardId::new())
            .collect::<Vec<_>>();
        let data_writes = data_ids
            .iter()
            .map(|data_id| writer(&client, *data_id))
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<_>>()
            .await;
        // Open (and keep alive) a reader per data shard for the duration of
        // the test; presumably this holds back the since — confirm.
        let _data_sinces = data_ids
            .iter()
            .map(|data_id| reader(&client, *data_id))
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<_>>()
            .await;
        let register_ts = 1;
        txns.register(register_ts, data_writes).await.unwrap();

        let mut workers = Vec::new();
        for idx in 0..NUM_WORKERS {
            // NOTE(review): clearing the state cache before each worker opens
            // its client appears intended to force fresh state fetches.
            clients.clear_state_cache();
            let client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
            let mut worker = StressWorker {
                idx,
                log: log.clone(),
                txns: TxnsHandle::expect_open_id(client.clone(), txns.txns_id()).await,
                data_ids: data_ids.clone(),
                tidy: Tidy::default(),
                ts: register_ts,
                step: 0,
                // Each worker's rng is derived from the printed seed, so runs
                // are reproducible per worker.
                rng: SmallRng::seed_from_u64(seed.wrapping_add(u64::cast_from(idx))),
                reads: Vec::new(),
            };
            let worker = async move {
                while worker.step < NUM_STEPS_PER_WORKER {
                    worker.step().await;
                }
                // Hand back the final timestamp and any in-flight reads.
                (worker.ts, worker.reads)
            }
            .instrument(info_span!("stress_worker", idx));
            workers.push(mz_ore::task::spawn(|| format!("worker-{}", idx), worker));
        }

        // Gather the max timestamp reached and all started reads.
        let mut max_ts = 0;
        let mut reads = Vec::new();
        for worker in workers {
            let (t, mut r) = worker.await;
            max_ts = std::cmp::max(max_ts, t);
            reads.append(&mut r);
        }

        // Bound the validation phase so a wedged read fails the test instead
        // of hanging it forever.
        tokio::time::timeout(Duration::from_secs(30), async {
            info!("finished with max_ts of {}", max_ts);
            txns.apply_le(&max_ts).await;
            for data_id in data_ids {
                info!("reading data shard {}", data_id);
                log.assert_snapshot(data_id, max_ts)
                    .instrument(info_span!("read_data", data_id = format!("{:.9}", data_id)))
                    .await;
            }
            info!("now waiting for reads {}", max_ts);
            for (tx, data_id, as_of, subscribe) in reads {
                // Tell each read worker to run up through max_ts + 1, then
                // check its captured output against the log.
                let _ = tx.send(max_ts + 1);
                let output = subscribe.await;
                log.assert_eq(data_id, as_of, max_ts + 1, output);
            }
        })
        .await
        .unwrap();
    }
1648
    // Verifies that commits advance the physical upper of only the data shard
    // they touch, leaving the other shard's upper untouched.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn advance_physical_uppers_past() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;
        let mut d0_write = writer(&client, d0).await;
        let d1 = txns.expect_register(2).await;
        let mut d1_write = writer(&client, d1).await;

        // After registration at ts 1 and 2, the uppers sit just past each
        // shard's register timestamp.
        assert_eq!(d0_write.fetch_recent_upper().await.elements(), &[2]);
        assert_eq!(d1_write.fetch_recent_upper().await.elements(), &[3]);

        // Committing to d0 at ts 3 advances d0's upper to 4; d1 is unchanged.
        txns.expect_commit_at(3, d0, &["0-2"], &log).await;
        assert_eq!(d0_write.fetch_recent_upper().await.elements(), &[4]);
        assert_eq!(d1_write.fetch_recent_upper().await.elements(), &[3]);

        // Symmetrically, committing to d1 at ts 4 advances only d1's upper.
        txns.expect_commit_at(4, d1, &["1-3"], &log).await;
        assert_eq!(d0_write.fetch_recent_upper().await.elements(), &[4]);
        assert_eq!(d1_write.fetch_recent_upper().await.elements(), &[5]);

        log.assert_snapshot(d0, 4).await;
        log.assert_snapshot(d1, 4).await;
    }
1678
1679 #[mz_ore::test(tokio::test)]
1680 #[cfg_attr(miri, ignore)]
1681 #[allow(clippy::unnecessary_get_then_check)] async fn schemas() {
1683 let client = PersistClient::new_for_tests().await;
1684 let mut txns0 = TxnsHandle::expect_open(client.clone()).await;
1685 let mut txns1 = TxnsHandle::expect_open_id(client.clone(), txns0.txns_id()).await;
1686 let log = txns0.new_log();
1687 let d0 = txns0.expect_register(1).await;
1688
1689 assert!(txns0.datas.data_write_for_commit.get(&d0).is_some());
1692 let mut txn = txns0.begin_test();
1693 txn.write(&d0, "foo".into(), (), 1).await;
1694 let apply = txn.commit_at(&mut txns0, 2).await.unwrap();
1695 log.record_txn(2, &txn);
1696
1697 assert!(txns1.datas.data_write_for_commit.get(&d0).is_none());
1699 let _tidy = apply.apply(&mut txns1).await;
1700
1701 assert!(txns1.datas.data_write_for_commit.get(&d0).is_none());
1703 let res = mz_ore::task::spawn(|| "test", async move {
1704 let mut txn = txns1.begin();
1705 txn.write(&d0, "bar".into(), (), 1).await;
1706 let _ = txn.commit_at(&mut txns1, 3).await;
1708 })
1709 .into_tokio_handle();
1710 assert!(res.await.is_err());
1711
1712 assert!(txns0.datas.data_write_for_commit.get(&d0).is_some());
1715 txns0.forget(3, [d0]).await.unwrap();
1716 assert_none!(txns0.datas.data_write_for_commit.get(&d0));
1717
1718 assert_none!(txns0.datas.data_write_for_commit.get(&d0));
1720 txns0.forget(4, [d0]).await.unwrap();
1721 assert_none!(txns0.datas.data_write_for_commit.get(&d0));
1722
1723 assert_none!(txns0.datas.data_write_for_commit.get(&d0));
1725 txns0
1726 .register(5, [writer(&client, d0).await])
1727 .await
1728 .unwrap();
1729 assert!(txns0.datas.data_write_for_commit.get(&d0).is_some());
1730 txns0.expect_commit_at(6, d0, &["baz"], &log).await;
1731
1732 log.assert_snapshot(d0, 6).await;
1733 }
1734}