1use std::cmp::Ordering;
13use std::collections::{BTreeMap, BTreeSet};
14use std::fmt::Debug;
15use std::future::Future;
16use std::sync::Arc;
17
18use differential_dataflow::difference::Semigroup;
19use differential_dataflow::lattice::Lattice;
20use futures::Stream;
21use mz_ore::instrument;
22use mz_ore::task::AbortOnDropHandle;
23use mz_persist_client::cfg::USE_CRITICAL_SINCE_TXN;
24use mz_persist_client::critical::SinceHandle;
25use mz_persist_client::read::{Cursor, LazyPartStats, ListenEvent, ReadHandle, Since, Subscribe};
26use mz_persist_client::stats::{SnapshotPartsStats, SnapshotStats};
27use mz_persist_client::write::WriteHandle;
28use mz_persist_client::{Diagnostics, PersistClient, ShardId};
29use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
30use mz_persist_types::{Codec, Codec64, Opaque, StepForward};
31use timely::order::TotalOrder;
32use timely::progress::{Antichain, Timestamp};
33use tokio::sync::{mpsc, oneshot};
34use tracing::{debug, warn};
35use uuid::Uuid;
36
37use crate::TxnsCodecDefault;
38use crate::txn_cache::{TxnsCache, TxnsCacheState};
39
40#[derive(Debug)]
47#[cfg_attr(test, derive(PartialEq))]
48pub struct DataSnapshot<T> {
49 pub(crate) data_id: ShardId,
51 pub(crate) latest_write: Option<T>,
54 pub(crate) as_of: T,
56 pub(crate) empty_to: T,
59}
60
61impl<T: Timestamp + Lattice + TotalOrder + Codec64 + Sync> DataSnapshot<T> {
62 #[instrument(level = "debug", fields(shard = %self.data_id, ts = ?self.as_of, empty_to = ?self.empty_to))]
65 pub(crate) async fn unblock_read<K, V, D>(&self, mut data_write: WriteHandle<K, V, T, D>)
66 where
67 K: Debug + Codec,
68 V: Debug + Codec,
69 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
70 {
71 debug!(
72 "unblock_read latest_write={:?} as_of={:?} for {:.9}",
73 self.latest_write,
74 self.as_of,
75 self.data_id.to_string()
76 );
77 if let Some(latest_write) = self.latest_write.as_ref() {
79 let () = data_write
80 .wait_for_upper_past(&Antichain::from_elem(latest_write.clone()))
81 .await;
82 }
83
84 let Some(mut data_upper) = data_write.shared_upper().into_option() else {
96 debug!(
99 "CaA data snapshot {:.9} shard finalized",
100 self.data_id.to_string(),
101 );
102 return;
103 };
104
105 while data_upper < self.empty_to {
109 if let Some(latest_write) = self.latest_write.as_ref() {
113 assert!(latest_write < &data_upper);
114 }
115 assert!(self.as_of < self.empty_to);
116 let res = crate::small_caa(
117 || format!("data {:.9} unblock reads", self.data_id.to_string()),
118 &mut data_write,
119 &[],
120 data_upper.clone(),
121 self.empty_to.clone(),
122 )
123 .await;
124 match res {
125 Ok(()) => {
126 data_write.expire().await;
130 break;
131 }
132 Err(new_data_upper) => {
133 data_upper = new_data_upper;
134 continue;
135 }
136 }
137 }
138 }
139
140 pub async fn snapshot_and_fetch<K, V, D>(
142 &self,
143 data_read: &mut ReadHandle<K, V, T, D>,
144 ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>>
145 where
146 K: Debug + Codec + Ord,
147 V: Debug + Codec + Ord,
148 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
149 {
150 let data_write = WriteHandle::from_read(data_read, "unblock_read");
151 self.unblock_read(data_write).await;
152 data_read
153 .snapshot_and_fetch(Antichain::from_elem(self.as_of.clone()))
154 .await
155 }
156
157 pub async fn snapshot_cursor<K, V, D>(
159 &self,
160 data_read: &mut ReadHandle<K, V, T, D>,
161 should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
162 ) -> Result<Cursor<K, V, T, D>, Since<T>>
163 where
164 K: Debug + Codec + Ord,
165 V: Debug + Codec + Ord,
166 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
167 {
168 let data_write = WriteHandle::from_read(data_read, "unblock_read");
169 self.unblock_read(data_write).await;
170 data_read
171 .snapshot_cursor(Antichain::from_elem(self.as_of.clone()), should_fetch_part)
172 .await
173 }
174
175 pub async fn snapshot_and_stream<K, V, D>(
177 &self,
178 data_read: &mut ReadHandle<K, V, T, D>,
179 ) -> Result<
180 impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
181 Since<T>,
182 >
183 where
184 K: Debug + Codec + Ord,
185 V: Debug + Codec + Ord,
186 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
187 {
188 let data_write = WriteHandle::from_read(data_read, "unblock_read");
189 self.unblock_read(data_write).await;
190 data_read
191 .snapshot_and_stream(Antichain::from_elem(self.as_of.clone()))
192 .await
193 }
194
195 pub fn snapshot_stats_from_critical<K, V, D, O>(
197 &self,
198 data_since: &SinceHandle<K, V, T, D, O>,
199 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
200 where
201 K: Debug + Codec + Ord,
202 V: Debug + Codec + Ord,
203 D: Semigroup + Codec64 + Send + Sync,
204 O: Opaque + Codec64,
205 {
206 let as_of = self.latest_write.clone().map(Antichain::from_elem);
223 data_since.snapshot_stats(as_of)
224 }
225
226 pub fn snapshot_stats_from_leased<K, V, D>(
228 &self,
229 data_since: &ReadHandle<K, V, T, D>,
230 ) -> impl Future<Output = Result<SnapshotStats, Since<T>>> + Send + 'static
231 where
232 K: Debug + Codec + Ord,
233 V: Debug + Codec + Ord,
234 D: Ord + Semigroup + Codec64 + Send + Sync,
235 {
236 let as_of = self.latest_write.clone().map(Antichain::from_elem);
253 data_since.snapshot_stats(as_of)
254 }
255
256 pub async fn snapshot_parts_stats<K, V, D>(
258 &self,
259 data_read: &ReadHandle<K, V, T, D>,
260 ) -> Result<SnapshotPartsStats, Since<T>>
261 where
262 K: Debug + Codec + Ord,
263 V: Debug + Codec + Ord,
264 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
265 {
266 let data_write = WriteHandle::from_read(data_read, "unblock_read");
267 self.unblock_read(data_write).await;
268 data_read
269 .snapshot_parts_stats(Antichain::from_elem(self.as_of.clone()))
270 .await
271 }
272
273 pub(crate) fn validate(&self) -> Result<(), String> {
274 if let Some(latest_write) = self.latest_write.as_ref() {
275 if !(latest_write <= &self.as_of) {
276 return Err(format!(
277 "latest_write {:?} not <= as_of {:?}",
278 self.latest_write, self.as_of
279 ));
280 }
281 }
282 if !(self.as_of < self.empty_to) {
283 return Err(format!(
284 "as_of {:?} not < empty_to {:?}",
285 self.as_of, self.empty_to
286 ));
287 }
288 Ok(())
289 }
290}
291
292#[derive(Debug)]
296#[cfg_attr(any(test, debug_assertions), derive(PartialEq))]
297pub enum DataListenNext<T> {
298 ReadDataTo(T),
301 EmitLogicalProgress(T),
305 WaitForTxnsProgress,
309}
310
311#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
319pub struct DataRemapEntry<T> {
320 pub physical_upper: T,
322 pub logical_upper: T,
325}
326
327#[derive(Debug)]
329pub(crate) struct DataSubscribe<T> {
330 pub(crate) data_id: ShardId,
332 pub(crate) snapshot: Option<DataSnapshot<T>>,
335 pub(crate) remap: DataRemapEntry<T>,
337}
338
339#[derive(Debug)]
341pub struct DataSubscription<T> {
342 subscribe: DataSubscribe<T>,
344 tx: mpsc::UnboundedSender<DataRemapEntry<T>>,
346}
347
348#[async_trait::async_trait]
349pub(crate) trait UnblockRead<T>: Debug + Send {
350 async fn unblock_read(self: Box<Self>, snapshot: DataSnapshot<T>);
351}
352
353#[async_trait::async_trait]
354impl<K, V, T, D> UnblockRead<T> for WriteHandle<K, V, T, D>
355where
356 K: Debug + Codec + Send + Sync,
357 V: Debug + Codec + Send + Sync,
358 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
359 D: Debug + Semigroup + Ord + Codec64 + Send + Sync,
360{
361 async fn unblock_read(self: Box<Self>, snapshot: DataSnapshot<T>) {
362 snapshot.unblock_read(*self).await;
363 }
364}
365
366#[derive(Debug, Clone)]
368pub struct TxnsRead<T> {
369 txns_id: ShardId,
370 tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
371 _read_task: Arc<AbortOnDropHandle<()>>,
372 _subscribe_task: Arc<AbortOnDropHandle<()>>,
373}
374
375impl<T: Timestamp + Lattice + Codec64 + Sync> TxnsRead<T> {
376 pub async fn start<C>(client: PersistClient, txns_id: ShardId) -> Self
378 where
379 T: TotalOrder + StepForward,
380 C: TxnsCodec + 'static,
381 {
382 let (tx, rx) = mpsc::unbounded_channel();
383
384 let (mut subscribe_task, cache) =
385 TxnsSubscribeTask::<T, C>::open(&client, txns_id, None, tx.clone()).await;
386
387 let mut task = TxnsReadTask {
388 rx,
389 cache,
390 pending_waits_by_ts: BTreeSet::new(),
391 pending_waits_by_id: BTreeMap::new(),
392 data_subscriptions: Vec::new(),
393 };
394
395 let read_task =
396 mz_ore::task::spawn(|| "txn-wal::read_task", async move { task.run().await });
397
398 let subscribe_task = mz_ore::task::spawn(|| "txn-wal::subscribe_task", async move {
399 subscribe_task.run().await
400 });
401
402 TxnsRead {
403 txns_id,
404 tx,
405 _read_task: Arc::new(read_task.abort_on_drop()),
406 _subscribe_task: Arc::new(subscribe_task.abort_on_drop()),
407 }
408 }
409
410 pub fn txns_id(&self) -> &ShardId {
412 &self.txns_id
413 }
414
415 pub async fn data_snapshot(&self, data_id: ShardId, as_of: T) -> DataSnapshot<T> {
417 self.send(|tx| TxnsReadCmd::DataSnapshot { data_id, as_of, tx })
418 .await
419 }
420
421 pub(crate) async fn data_subscribe(
425 &self,
426 data_id: ShardId,
427 as_of: T,
428 unblock: Box<dyn UnblockRead<T>>,
429 ) -> mpsc::UnboundedReceiver<DataRemapEntry<T>> {
430 self.send(|tx| TxnsReadCmd::DataSubscribe {
431 data_id,
432 as_of,
433 unblock,
434 tx,
435 })
436 .await
437 }
438
439 pub async fn update_ge(&self, ts: T) {
441 let wait = WaitTs::GreaterEqual(ts);
442 self.update(wait).await
443 }
444
445 pub async fn update_gt(&self, ts: T) {
447 let wait = WaitTs::GreaterThan(ts);
448 self.update(wait).await
449 }
450
451 async fn update(&self, wait: WaitTs<T>) {
452 let id = Uuid::new_v4();
453 let res = self.send(|tx| TxnsReadCmd::Wait {
454 id: id.clone(),
455 ts: wait,
456 tx,
457 });
458
459 let mut cancel_guard = CancelWaitOnDrop {
462 id,
463 tx: Some(self.tx.clone()),
464 };
465
466 let res = res.await;
467
468 cancel_guard.complete();
470
471 res
472 }
473
474 async fn send<R: std::fmt::Debug>(
475 &self,
476 cmd: impl FnOnce(oneshot::Sender<R>) -> TxnsReadCmd<T>,
477 ) -> R {
478 let (tx, rx) = oneshot::channel();
479 let req = cmd(tx);
480 let () = self.tx.send(req).expect("task unexpectedly shut down");
481 rx.await.expect("task unexpectedly shut down")
482 }
483}
484
485struct CancelWaitOnDrop<T> {
488 id: Uuid,
489 tx: Option<mpsc::UnboundedSender<TxnsReadCmd<T>>>,
490}
491
492impl<T> CancelWaitOnDrop<T> {
493 pub fn complete(&mut self) {
496 self.tx.take();
497 }
498}
499
500impl<T> Drop for CancelWaitOnDrop<T> {
501 fn drop(&mut self) {
502 let tx = match self.tx.take() {
503 Some(tx) => tx,
504 None => {
505 return;
507 }
508 };
509
510 let _ = tx.send(TxnsReadCmd::CancelWait {
511 id: self.id.clone(),
512 });
513 }
514}
515
516#[derive(Debug)]
517enum TxnsReadCmd<T> {
518 Updates {
519 entries: Vec<(TxnsEntry, T, i64)>,
520 frontier: T,
521 },
522 DataSnapshot {
523 data_id: ShardId,
524 as_of: T,
525 tx: oneshot::Sender<DataSnapshot<T>>,
526 },
527 DataSubscribe {
528 data_id: ShardId,
529 as_of: T,
530 unblock: Box<dyn UnblockRead<T>>,
531 tx: oneshot::Sender<mpsc::UnboundedReceiver<DataRemapEntry<T>>>,
532 },
533 Wait {
534 id: Uuid,
535 ts: WaitTs<T>,
536 tx: oneshot::Sender<()>,
537 },
538 CancelWait {
539 id: Uuid,
540 },
541}
542
543#[derive(Debug, PartialEq, Eq, Clone)]
544enum WaitTs<T> {
545 GreaterEqual(T),
546 GreaterThan(T),
547}
548
549impl<T: Ord> Ord for WaitTs<T> {
557 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
558 let self_ts = match self {
559 WaitTs::GreaterEqual(ts) => ts,
560 WaitTs::GreaterThan(ts) => ts,
561 };
562 let other_ts = match other {
563 WaitTs::GreaterEqual(ts) => ts,
564 WaitTs::GreaterThan(ts) => ts,
565 };
566
567 if self_ts < other_ts {
568 Ordering::Less
569 } else if *self_ts > *other_ts {
570 Ordering::Greater
571 } else if matches!(self, WaitTs::GreaterEqual(_)) && matches!(other, WaitTs::GreaterThan(_))
572 {
573 Ordering::Less
574 } else if matches!(self, WaitTs::GreaterThan(_)) && matches!(other, WaitTs::GreaterEqual(_))
575 {
576 Ordering::Greater
577 } else {
578 Ordering::Equal
579 }
580 }
581}
582
583impl<T: Ord> PartialOrd for WaitTs<T> {
584 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
585 Some(self.cmp(other))
586 }
587}
588
589impl<T: Timestamp + Lattice> WaitTs<T> {
590 fn is_ready(&self, frontier: &T) -> bool {
592 match &self {
593 WaitTs::GreaterEqual(ts) => {
594 if frontier >= ts {
595 return true;
596 }
597 }
598 WaitTs::GreaterThan(ts) => {
599 if frontier > ts {
600 return true;
601 }
602 }
603 };
604
605 false
606 }
607}
608
609#[derive(Debug)]
610struct TxnsReadTask<T> {
611 rx: mpsc::UnboundedReceiver<TxnsReadCmd<T>>,
612 cache: TxnsCacheState<T>,
613 pending_waits_by_ts: BTreeSet<(WaitTs<T>, Uuid)>,
614 pending_waits_by_id: BTreeMap<Uuid, PendingWait<T>>,
615 data_subscriptions: Vec<DataSubscription<T>>,
616}
617
618#[derive(Debug)]
621struct PendingWait<T> {
622 ts: WaitTs<T>,
623 tx: Option<oneshot::Sender<()>>,
624}
625
626impl<T: Timestamp + Lattice + Codec64> PendingWait<T> {
627 fn maybe_complete(&mut self, frontier: &T) -> bool {
632 if self.tx.is_none() {
633 return true;
635 }
636
637 if self.ts.is_ready(frontier) {
638 let _ = self.tx.take().expect("known to exist").send(());
639 return true;
640 }
641
642 if let Some(tx) = self.tx.as_ref() {
643 if tx.is_closed() {
644 self.tx.take();
646 return true;
647 }
648 }
649
650 false
651 }
652}
653
654impl<T> TxnsReadTask<T>
655where
656 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
657{
658 async fn run(&mut self) {
659 while let Some(cmd) = self.rx.recv().await {
660 match cmd {
661 TxnsReadCmd::Updates { entries, frontier } => {
662 tracing::trace!(
663 "updates from subscribe task at ({:?}): {:?}",
664 frontier,
665 entries
666 );
667
668 self.cache.push_entries(entries.clone(), frontier.clone());
669
670 self.data_subscriptions
671 .retain(|subscription| !subscription.tx.is_closed());
672 for subscription in &mut self.data_subscriptions {
673 Self::update_subscription(subscription, &self.cache);
674 }
675
676 loop {
680 let first_wait = self.pending_waits_by_ts.first();
681
682 let (wait_ts, id) = match first_wait {
683 Some(wait) => wait,
684 None => break,
685 };
686
687 let completed = wait_ts.is_ready(&frontier);
688
689 if completed {
690 let mut wait = self
691 .pending_waits_by_id
692 .remove(id)
693 .expect("wait must be in map");
694
695 let really_completed = wait.maybe_complete(&frontier);
696 assert!(really_completed);
697
698 self.pending_waits_by_ts.pop_first();
699 } else {
700 break;
703 }
704 }
705 }
706 TxnsReadCmd::DataSnapshot { data_id, as_of, tx } => {
707 let res = self.cache.data_snapshot(data_id, as_of.clone());
708 let _ = tx.send(res);
709 }
710 TxnsReadCmd::DataSubscribe {
711 data_id,
712 as_of,
713 unblock,
714 tx,
715 } => {
716 let mut subscribe = self.cache.data_subscribe(data_id, as_of.clone());
717 if let Some(snapshot) = subscribe.snapshot.take() {
718 mz_ore::task::spawn(
719 || "txn-wal::unblock_subscribe",
720 unblock.unblock_read(snapshot),
721 );
722 }
723 let (sub_tx, sub_rx) = mpsc::unbounded_channel();
724 sub_tx
726 .send(subscribe.remap.clone())
727 .expect("receiver still in scope");
728 let mut subscription = DataSubscription {
729 subscribe,
730 tx: sub_tx,
731 };
732 Self::update_subscription(&mut subscription, &self.cache);
734 self.data_subscriptions.push(subscription);
735 let _ = tx.send(sub_rx);
736 }
737 TxnsReadCmd::Wait { id, ts, tx } => {
738 let mut pending_wait = PendingWait { ts, tx: Some(tx) };
739 let completed = pending_wait.maybe_complete(&self.cache.progress_exclusive);
740 if !completed {
741 let wait_ts = pending_wait.ts.clone();
742 self.pending_waits_by_ts.insert((wait_ts, id.clone()));
743 self.pending_waits_by_id.insert(id, pending_wait);
744 }
745 }
746 TxnsReadCmd::CancelWait { id } => {
747 if let Some(pending_wait) = self.pending_waits_by_id.remove(&id) {
752 self.pending_waits_by_ts.remove(&(pending_wait.ts, id));
753 }
754 }
755 }
756 }
757 warn!("TxnsReadTask shutting down");
758 }
759
760 fn update_subscription(subscription: &mut DataSubscription<T>, cache: &TxnsCacheState<T>) {
761 loop {
762 match cache.data_listen_next(
763 &subscription.subscribe.data_id,
764 &subscription.subscribe.remap.logical_upper,
765 ) {
766 DataListenNext::ReadDataTo(new_upper) => {
768 subscription.subscribe.remap.physical_upper = new_upper.clone();
770 subscription.subscribe.remap.logical_upper = new_upper.clone();
771 }
772 DataListenNext::EmitLogicalProgress(new_progress) => {
775 assert!(subscription.subscribe.remap.physical_upper < new_progress);
776 assert!(subscription.subscribe.remap.logical_upper < new_progress);
777
778 subscription.subscribe.remap.logical_upper = new_progress.clone();
779 }
780 DataListenNext::WaitForTxnsProgress => break,
783 };
784 let _ = subscription.tx.send(subscription.subscribe.remap.clone());
786 }
787 assert_eq!(
788 cache.progress_exclusive, subscription.subscribe.remap.logical_upper,
789 "we should update the subscription up to the current progress_exclusive"
790 );
791 }
792}
793
794#[derive(Debug)]
797struct TxnsSubscribeTask<T, C: TxnsCodec = TxnsCodecDefault> {
798 txns_subscribe: Subscribe<C::Key, C::Val, T, i64>,
799
800 buf: Vec<(TxnsEntry, T, i64)>,
803
804 tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
806
807 only_data_id: Option<ShardId>,
814}
815
816impl<T, C> TxnsSubscribeTask<T, C>
817where
818 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
819 C: TxnsCodec,
820{
821 pub async fn open(
833 client: &PersistClient,
834 txns_id: ShardId,
835 only_data_id: Option<ShardId>,
836 tx: mpsc::UnboundedSender<TxnsReadCmd<T>>,
837 ) -> (Self, TxnsCacheState<T>) {
838 let (txns_key_schema, txns_val_schema) = C::schemas();
839 let txns_read: ReadHandle<<C as TxnsCodec>::Key, <C as TxnsCodec>::Val, T, i64> = client
840 .open_leased_reader(
841 txns_id,
842 Arc::new(txns_key_schema),
843 Arc::new(txns_val_schema),
844 Diagnostics {
845 shard_name: "txns".to_owned(),
846 handle_purpose: "read txns".to_owned(),
847 },
848 USE_CRITICAL_SINCE_TXN.get(client.dyncfgs()),
849 )
850 .await
851 .expect("txns schema shouldn't change");
852 let (state, txns_subscribe) = TxnsCacheState::init::<C>(only_data_id, txns_read).await;
853 let subscribe_task = TxnsSubscribeTask {
854 txns_subscribe,
855 buf: Vec::new(),
856 tx,
857 only_data_id,
858 };
859
860 (subscribe_task, state)
861 }
862
863 async fn run(&mut self) {
864 loop {
865 let events = self.txns_subscribe.next(None).await;
866 for event in events {
867 match event {
868 ListenEvent::Progress(frontier) => {
869 let frontier_ts = frontier
870 .into_option()
871 .expect("nothing should close the txns shard");
872 let entries = std::mem::take(&mut self.buf);
873 let res = self.tx.send(TxnsReadCmd::Updates {
874 entries,
875 frontier: frontier_ts,
876 });
877 if let Err(e) = res {
878 warn!("TxnsSubscribeTask shutting down: {}", e);
879 return;
880 }
881 }
882 ListenEvent::Updates(parts) => {
883 TxnsCache::<T, C>::fetch_parts(
884 self.only_data_id.clone(),
885 &mut self.txns_subscribe,
886 parts,
887 &mut self.buf,
888 )
889 .await;
890 }
891 };
892 }
893 }
894 }
895}
896
897#[cfg(test)]
898mod tests {
899 use super::WaitTs;
900
901 #[mz_ore::test]
902 fn wait_ts_ord() {
903 let mut waits = vec![
904 WaitTs::GreaterThan(3),
905 WaitTs::GreaterThan(2),
906 WaitTs::GreaterEqual(2),
907 WaitTs::GreaterThan(1),
908 ];
909
910 waits.sort();
911
912 let expected = vec![
913 WaitTs::GreaterThan(1),
914 WaitTs::GreaterEqual(2),
915 WaitTs::GreaterThan(2),
916 WaitTs::GreaterThan(3),
917 ];
918
919 assert_eq!(waits, expected);
920 }
921}