1use std::any::Any;
13use std::fmt::Debug;
14use std::future::Future;
15use std::sync::mpsc::TryRecvError;
16use std::sync::{Arc, mpsc};
17use std::time::Duration;
18
19use differential_dataflow::Hashable;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use futures::StreamExt;
23use mz_dyncfg::{Config, ConfigSet};
24use mz_ore::cast::CastFrom;
25use mz_persist_client::cfg::RetryParameters;
26use mz_persist_client::operators::shard_source::{
27 ErrorHandler, FilterResult, SnapshotMode, shard_source,
28};
29use mz_persist_client::{Diagnostics, PersistClient, ShardId};
30use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
31use mz_persist_types::txn::TxnsCodec;
32use mz_persist_types::{Codec, Codec64, StepForward};
33use mz_timely_util::builder_async::{
34 AsyncInputHandle, Event as AsyncEvent, InputConnection,
35 OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
36};
37use timely::container::CapacityContainerBuilder;
38use timely::dataflow::channels::pact::Pipeline;
39use timely::dataflow::operators::capture::Event;
40use timely::dataflow::operators::{Broadcast, Capture, Leave, Map, Probe};
41use timely::dataflow::{ProbeHandle, Scope, Stream};
42use timely::order::TotalOrder;
43use timely::progress::{Antichain, Timestamp};
44use timely::worker::Worker;
45use timely::{Data, PartialOrder, WorkerConfig};
46use tracing::debug;
47
48use crate::TxnsCodecDefault;
49use crate::txn_cache::TxnsCache;
50use crate::txn_read::{DataRemapEntry, TxnsRead};
51
52pub fn txns_progress<K, V, T, D, P, C, F, G>(
95 passthrough: Stream<G, P>,
96 name: &str,
97 ctx: &TxnsContext,
98 client_fn: impl Fn() -> F,
99 txns_id: ShardId,
100 data_id: ShardId,
101 as_of: T,
102 until: Antichain<T>,
103 data_key_schema: Arc<K::Schema>,
104 data_val_schema: Arc<V::Schema>,
105) -> (Stream<G, P>, Vec<PressOnDropButton>)
106where
107 K: Debug + Codec + Send + Sync,
108 V: Debug + Codec + Send + Sync,
109 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
110 D: Debug + Data + Monoid + Ord + Codec64 + Send + Sync,
111 P: Debug + Data,
112 C: TxnsCodec + 'static,
113 F: Future<Output = PersistClient> + Send + 'static,
114 G: Scope<Timestamp = T>,
115{
116 let unique_id = (name, passthrough.scope().addr()).hashed();
117 let (remap, source_button) = txns_progress_source_global::<K, V, T, D, P, C, G>(
118 passthrough.scope(),
119 name,
120 ctx.clone(),
121 client_fn(),
122 txns_id,
123 data_id,
124 as_of,
125 data_key_schema,
126 data_val_schema,
127 unique_id,
128 );
129 let remap = remap.broadcast();
132 let (passthrough, frontiers_button) = txns_progress_frontiers::<K, V, T, D, P, C, G>(
133 remap,
134 passthrough,
135 name,
136 data_id,
137 until,
138 unique_id,
139 );
140 (passthrough, vec![source_button, frontiers_button])
141}
142
143fn txns_progress_source_global<K, V, T, D, P, C, G>(
157 scope: G,
158 name: &str,
159 ctx: TxnsContext,
160 client: impl Future<Output = PersistClient> + 'static,
161 txns_id: ShardId,
162 data_id: ShardId,
163 as_of: T,
164 data_key_schema: Arc<K::Schema>,
165 data_val_schema: Arc<V::Schema>,
166 unique_id: u64,
167) -> (Stream<G, DataRemapEntry<T>>, PressOnDropButton)
168where
169 K: Debug + Codec + Send + Sync,
170 V: Debug + Codec + Send + Sync,
171 T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
172 D: Debug + Data + Monoid + Ord + Codec64 + Send + Sync,
173 P: Debug + Data,
174 C: TxnsCodec + 'static,
175 G: Scope<Timestamp = T>,
176{
177 let worker_idx = scope.index();
178 let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();
179 let name = format!("txns_progress_source({})", name);
180 let mut builder = AsyncOperatorBuilder::new(name.clone(), scope);
181 let name = format!("{} [{}] {:.9}", name, unique_id, data_id.to_string());
182 let (remap_output, remap_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
183
184 let shutdown_button = builder.build(move |capabilities| async move {
185 if worker_idx != chosen_worker {
186 return;
187 }
188
189 let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output");
190 let client = client.await;
191 let txns_read = ctx.get_or_init::<T, C>(&client, txns_id).await;
192
193 let _ = txns_read.update_gt(as_of.clone()).await;
194 let data_write = client
195 .open_writer::<K, V, T, D>(
196 data_id,
197 Arc::clone(&data_key_schema),
198 Arc::clone(&data_val_schema),
199 Diagnostics::from_purpose("data read physical upper"),
200 )
201 .await
202 .expect("schema shouldn't change");
203 let mut rx = txns_read
204 .data_subscribe(data_id, as_of.clone(), data_write)
205 .await;
206 debug!("{} starting as_of={:?}", name, as_of);
207
208 let mut physical_upper = T::minimum();
209
210 while let Some(remap) = rx.recv().await {
211 assert!(physical_upper <= remap.physical_upper);
212 assert!(physical_upper < remap.logical_upper);
213
214 let logical_upper = remap.logical_upper.clone();
215 if remap.physical_upper != physical_upper {
220 physical_upper = remap.physical_upper.clone();
221 debug!("{} emitting {:?}", name, remap);
222 remap_output.give(&cap, remap);
223 } else {
224 debug!("{} not emitting {:?}", name, remap);
225 }
226 cap.downgrade(&logical_upper);
227 }
228 });
229 (remap_stream, shutdown_button.press_on_drop())
230}
231
/// Builds the operator that copies `passthrough` to its output while driving
/// the output capability from the `remap` stream.
///
/// The operator holds a single output capability, `cap`, and repeatedly does
/// one of two things:
/// - If the current remap entry's `physical_upper` is `<= cap.time()`, `cap`
///   is downgraded to the entry's `logical_upper` (when that is ahead) and the
///   operator waits for the next remap entry.
/// - Otherwise it reads one event from `passthrough`: data is re-emitted at
///   `cap`, and a progress event downgrades `cap` when it is ahead of
///   `cap.time()`.
///
/// The operator body returns once the passthrough frontier reaches `until` or
/// becomes empty.
///
/// Returns the output stream and a shutdown button that is pressed on drop.
///
/// # Panics
///
/// Panics if a remap entry has `physical_upper > logical_upper`.
///
/// NOTE(review): `K`, `V`, `D`, and `C` are not referenced by the body; they
/// appear to be kept only so the turbofish matches the sibling operators.
fn txns_progress_frontiers<K, V, T, D, P, C, G>(
    remap: Stream<G, DataRemapEntry<T>>,
    passthrough: Stream<G, P>,
    name: &str,
    data_id: ShardId,
    until: Antichain<T>,
    unique_id: u64,
) -> (Stream<G, P>, PressOnDropButton)
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + TotalOrder + StepForward + Codec64,
    D: Data + Monoid + Codec64 + Send + Sync,
    P: Debug + Data,
    C: TxnsCodec,
    G: Scope<Timestamp = T>,
{
    // The first `name` is the operator name; the second (shadowing) one is the
    // log prefix, which adds the unique id, worker index/peers, and a
    // truncated data shard id.
    let name = format!("txns_progress_frontiers({})", name);
    let mut builder = AsyncOperatorBuilder::new(name.clone(), passthrough.scope());
    let name = format!(
        "{} [{}] {}/{} {:.9}",
        name,
        unique_id,
        passthrough.scope().index(),
        passthrough.scope().peers(),
        data_id.to_string(),
    );
    let (passthrough_output, passthrough_stream) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    // Both inputs are disconnected from the output.
    // NOTE(review): presumably this is what lets `cap` alone determine the
    // output frontier — confirm against `AsyncOperatorBuilder`.
    let mut remap_input = builder.new_disconnected_input(&remap, Pipeline);
    let mut passthrough_input = builder.new_disconnected_input(&passthrough, Pipeline);

    let shutdown_button = builder.build(move |capabilities| async move {
        let [mut cap]: [_; 1] = capabilities.try_into().expect("one capability per output");

        // `None` means the remap input has been exhausted (see
        // `txns_progress_frontiers_read_remap_input`); from then on the loop
        // only copies the passthrough input.
        let mut remap = Some(DataRemapEntry {
            physical_upper: T::minimum(),
            logical_upper: T::minimum(),
        });
        // NB: `cap.time()` is what tracks how far the output has progressed.
        loop {
            debug!("{} remap {:?}", name, remap);
            if let Some(r) = remap.as_ref() {
                assert!(r.physical_upper <= r.logical_upper);
                // The output has reached this entry's physical upper, so it
                // may be advanced to the logical upper. After that, a new
                // remap entry is needed before anything else can happen:
                // - if it has the same physical upper, this branch is taken
                //   again and only the logical upper advances;
                // - if it has a larger physical upper, control falls through
                //   to the passthrough copy below until `cap` catches up.
                if r.physical_upper.less_equal(cap.time()) {
                    if cap.time() < &r.logical_upper {
                        cap.downgrade(&r.logical_upper);
                    }
                    remap = txns_progress_frontiers_read_remap_input(
                        &name,
                        &mut remap_input,
                        r.clone(),
                    )
                    .await;
                    continue;
                }
            }

            // An exhausted passthrough input is turned into an empty-frontier
            // progress event so that it reuses the shutdown path below.
            let event = passthrough_input
                .next()
                .await
                .unwrap_or_else(|| AsyncEvent::Progress(Antichain::new()));
            match event {
                // NB: The input's capability is ignored because the input is
                // disconnected; data is re-emitted at our own `cap`.
                AsyncEvent::Data(_data_cap, mut data) => {
                    debug!("{} emitting data {:?}", name, data);
                    passthrough_output.give_container(&cap, &mut data);
                }
                AsyncEvent::Progress(new_progress) => {
                    // Once the passthrough frontier is at or beyond `until`,
                    // stop: returning drops `cap`.
                    if PartialOrder::less_equal(&until, &new_progress) {
                        debug!(
                            "{} progress {:?} has passed until {:?}",
                            name,
                            new_progress.elements(),
                            until.elements()
                        );
                        return;
                    }
                    // The empty frontier: the passthrough input is done, so
                    // shut down.
                    let Some(new_progress) = new_progress.into_option() else {
                        return;
                    };

                    // Only ever move the capability forward; `cap` may already
                    // be ahead because of a remap entry's logical upper.
                    if cap.time() < &new_progress {
                        debug!("{} downgrading cap to {:?}", name, new_progress);
                        cap.downgrade(&new_progress);
                    }
                }
            }
        }
    });
    (passthrough_stream, shutdown_button.press_on_drop())
}
359
360async fn txns_progress_frontiers_read_remap_input<T, C>(
361 name: &str,
362 input: &mut AsyncInputHandle<T, Vec<DataRemapEntry<T>>, C>,
363 mut remap: DataRemapEntry<T>,
364) -> Option<DataRemapEntry<T>>
365where
366 T: Timestamp + TotalOrder,
367 C: InputConnection<T>,
368{
369 while let Some(event) = input.next().await {
370 let xs = match event {
371 AsyncEvent::Progress(logical_upper) => {
372 if let Some(logical_upper) = logical_upper.into_option() {
373 if remap.logical_upper < logical_upper {
374 remap.logical_upper = logical_upper;
375 return Some(remap);
376 }
377 }
378 continue;
379 }
380 AsyncEvent::Data(_cap, xs) => xs,
381 };
382 for x in xs {
383 debug!("{} got remap {:?}", name, x);
384 if remap.logical_upper < x.logical_upper {
386 assert!(
387 remap.physical_upper <= x.physical_upper,
388 "previous remap physical upper {:?} is ahead of new remap physical upper {:?}",
389 remap.physical_upper,
390 x.physical_upper,
391 );
392 remap = x;
403 }
404 }
405 return Some(remap);
406 }
407 None
409}
410
/// A cloneable handle to a lazily initialized, type-erased `TxnsRead`.
///
/// The cell lives behind an `Arc`, so all clones of a context share the same
/// value once it has been initialized. The value is stored as
/// `Box<dyn Any + Send + Sync>` and downcast back to a concrete `TxnsRead<T>`
/// in `get_or_init`.
#[derive(Default, Debug, Clone)]
pub struct TxnsContext {
    // Set at most once; holds a boxed `TxnsRead<T>` for some timestamp type.
    read: Arc<tokio::sync::OnceCell<Box<dyn Any + Send + Sync>>>,
}
416
417impl TxnsContext {
418 async fn get_or_init<T, C>(&self, client: &PersistClient, txns_id: ShardId) -> TxnsRead<T>
419 where
420 T: Timestamp + Lattice + Codec64 + TotalOrder + StepForward + Sync,
421 C: TxnsCodec + 'static,
422 {
423 let read = self
424 .read
425 .get_or_init(|| {
426 let client = client.clone();
427 async move {
428 let read: Box<dyn Any + Send + Sync> =
429 Box::new(TxnsRead::<T>::start::<C>(client, txns_id).await);
430 read
431 }
432 })
433 .await
434 .downcast_ref::<TxnsRead<T>>()
435 .expect("timestamp types should match");
436 assert_eq!(&txns_id, read.txns_id());
438 read.clone()
439 }
440}
441
/// Dynamic config: the initial backoff when polling for new batches from a
/// txns data shard persist_source. Defaults to 1024ms.
pub(crate) const DATA_SHARD_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_txns_data_shard_retryer_initial_backoff",
    Duration::from_millis(1024),
    "The initial backoff when polling for new batches from a txns data shard persist_source.",
);

/// Dynamic config: the backoff multiplier when polling for new batches from a
/// txns data shard persist_source. Defaults to 2.
pub(crate) const DATA_SHARD_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_txns_data_shard_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a txns data shard persist_source.",
);

/// Dynamic config: the backoff clamp duration when polling for new batches
/// from a txns data shard persist_source. Defaults to 16s.
pub(crate) const DATA_SHARD_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_txns_data_shard_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a txns data shard persist_source.",
);
462
463pub fn txns_data_shard_retry_params(cfg: &ConfigSet) -> RetryParameters {
466 RetryParameters {
467 fixed_sleep: Duration::ZERO,
468 initial_backoff: DATA_SHARD_RETRYER_INITIAL_BACKOFF.get(cfg),
469 multiplier: DATA_SHARD_RETRYER_MULTIPLIER.get(cfg),
470 clamp: DATA_SHARD_RETRYER_CLAMP.get(cfg),
471 }
472}
473
/// A subscription to a data shard, driven by its own single-threaded timely
/// worker.
///
/// The dataflow (built in `DataSubscribe::new`) reads the data shard, runs the
/// result through `txns_progress`, and captures the output. Nothing advances
/// unless the worker is stepped.
pub struct DataSubscribe {
    /// The `as_of` this subscription was created with.
    pub(crate) as_of: u64,
    /// The worker that runs the dataflow.
    pub(crate) worker: Worker<timely::communication::allocator::Thread>,
    /// Probe on the data stream before it enters `txns_progress`.
    data: ProbeHandle<u64>,
    /// Probe on the stream leaving `txns_progress`.
    txns: ProbeHandle<u64>,
    /// Receiving end of the dataflow's captured output.
    capture: mpsc::Receiver<Event<u64, Vec<(String, u64, i64)>>>,
    /// Updates drained from `capture` so far.
    output: Vec<(String, u64, i64)>,

    /// Buttons for the dataflow's operators, held for the lifetime of the
    /// subscription.
    // NOTE(review): presumably dropping these shuts the operators down.
    _tokens: Vec<PressOnDropButton>,
}
491
492impl std::fmt::Debug for DataSubscribe {
493 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
494 let DataSubscribe {
495 as_of,
496 worker: _,
497 data,
498 txns,
499 capture: _,
500 output,
501 _tokens: _,
502 } = self;
503 f.debug_struct("DataSubscribe")
504 .field("as_of", as_of)
505 .field("data", data)
506 .field("txns", txns)
507 .field("output", output)
508 .finish_non_exhaustive()
509 }
510}
511
512impl DataSubscribe {
513 pub fn new(
515 name: &str,
516 client: PersistClient,
517 txns_id: ShardId,
518 data_id: ShardId,
519 as_of: u64,
520 until: Antichain<u64>,
521 ) -> Self {
522 let mut worker = Worker::new(
523 WorkerConfig::default(),
524 timely::communication::allocator::Thread::default(),
525 Some(std::time::Instant::now()),
526 );
527 let (data, txns, capture, tokens) = worker.dataflow::<u64, _, _>(|scope| {
528 let (data_stream, shard_source_token) = scope.scoped::<u64, _, _>("hybrid", |scope| {
529 let client = client.clone();
530 let (data_stream, token) = shard_source::<String, (), u64, i64, _, _, _>(
531 scope,
532 name,
533 move || std::future::ready(client.clone()),
534 data_id,
535 Some(Antichain::from_elem(as_of)),
536 SnapshotMode::Include,
537 until.clone(),
538 false.then_some(|_, _: &_, _| unreachable!()),
539 Arc::new(StringSchema),
540 Arc::new(UnitSchema),
541 FilterResult::keep_all,
542 false.then_some(|| unreachable!()),
543 async {},
544 ErrorHandler::Halt("data_subscribe"),
545 );
546 (data_stream.leave(), token)
547 });
548 let (data, txns) = (ProbeHandle::new(), ProbeHandle::new());
549 let data_stream = data_stream.flat_map(|part| {
550 let part = part.parse();
551 part.part.map(|((k, ()), t, d)| (k, t, d))
552 });
553 let data_stream = data_stream.probe_with(&data);
554 let (data_stream, mut txns_progress_token) =
555 txns_progress::<String, (), u64, i64, _, TxnsCodecDefault, _, _>(
556 data_stream,
557 name,
558 &TxnsContext::default(),
559 || std::future::ready(client.clone()),
560 txns_id,
561 data_id,
562 as_of,
563 until,
564 Arc::new(StringSchema),
565 Arc::new(UnitSchema),
566 );
567 let data_stream = data_stream.probe_with(&txns);
568 let mut tokens = shard_source_token;
569 tokens.append(&mut txns_progress_token);
570 (data, txns, data_stream.capture(), tokens)
571 });
572 Self {
573 as_of,
574 worker,
575 data,
576 txns,
577 capture,
578 output: Vec::new(),
579 _tokens: tokens,
580 }
581 }
582
583 pub fn progress(&self) -> u64 {
585 self.txns
586 .with_frontier(|f| *f.as_option().unwrap_or(&u64::MAX))
587 }
588
589 pub fn step(&mut self) {
591 self.worker.step();
592 self.capture_output()
593 }
594
595 pub(crate) fn capture_output(&mut self) {
596 loop {
597 let event = match self.capture.try_recv() {
598 Ok(x) => x,
599 Err(TryRecvError::Empty) | Err(TryRecvError::Disconnected) => break,
600 };
601 match event {
602 Event::Progress(_) => {}
603 Event::Messages(_, mut msgs) => self.output.append(&mut msgs),
604 }
605 }
606 }
607
608 #[cfg(test)]
610 pub async fn step_past(&mut self, ts: u64) {
611 while self.txns.less_equal(&ts) {
612 tracing::trace!(
613 "progress at {:?}",
614 self.txns.with_frontier(|x| x.to_owned()).elements()
615 );
616 self.step();
617 tokio::task::yield_now().await;
618 }
619 }
620
621 pub fn output(&self) -> &Vec<(String, u64, i64)> {
623 &self.output
624 }
625}
626
/// A handle to a [`DataSubscribe`] that runs on a blocking task.
///
/// Requests are sent to the task over `tx`; each request carries a oneshot
/// sender on which the task replies with newly produced output and its current
/// progress.
#[derive(Debug)]
pub struct DataSubscribeTask {
    /// Request channel to the task. Each request is an optional timestamp to
    /// step past, plus the channel for the reply.
    tx: std::sync::mpsc::Sender<(
        Option<u64>,
        tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
    )>,
    /// The blocking task; it resolves to all output the subscribe produced.
    task: mz_ore::task::JoinHandle<Vec<(String, u64, i64)>>,
    /// Output received from the task so far.
    output: Vec<(String, u64, i64)>,
    /// The most recent progress reported by the task; starts at 0.
    progress: u64,
}
640
641impl DataSubscribeTask {
642 pub async fn new(
644 client: PersistClient,
645 txns_id: ShardId,
646 data_id: ShardId,
647 as_of: u64,
648 ) -> Self {
649 let cache = TxnsCache::open(&client, txns_id, Some(data_id)).await;
650 let (tx, rx) = std::sync::mpsc::channel();
651 let task = mz_ore::task::spawn_blocking(
652 || "data_subscribe task",
653 move || Self::task(client, cache, data_id, as_of, rx),
654 );
655 DataSubscribeTask {
656 tx,
657 task,
658 output: Vec::new(),
659 progress: 0,
660 }
661 }
662
663 #[cfg(test)]
664 async fn step(&mut self) {
665 self.send(None).await;
666 }
667
668 pub async fn step_past(&mut self, ts: u64) -> u64 {
670 self.send(Some(ts)).await;
671 self.progress
672 }
673
674 pub fn output(&self) -> &Vec<(String, u64, i64)> {
676 &self.output
677 }
678
679 async fn send(&mut self, ts: Option<u64>) {
680 let (tx, rx) = tokio::sync::oneshot::channel();
681 self.tx.send((ts, tx)).expect("task should be running");
682 let (mut new_output, new_progress) = rx.await.expect("task should be running");
683 self.output.append(&mut new_output);
684 assert!(self.progress <= new_progress);
685 self.progress = new_progress;
686 }
687
688 pub async fn finish(self) -> Vec<(String, u64, i64)> {
693 drop(self.tx);
695 self.task.await
696 }
697
698 fn task(
699 client: PersistClient,
700 cache: TxnsCache<u64>,
701 data_id: ShardId,
702 as_of: u64,
703 rx: std::sync::mpsc::Receiver<(
704 Option<u64>,
705 tokio::sync::oneshot::Sender<(Vec<(String, u64, i64)>, u64)>,
706 )>,
707 ) -> Vec<(String, u64, i64)> {
708 let mut subscribe = DataSubscribe::new(
709 "DataSubscribeTask",
710 client.clone(),
711 cache.txns_id(),
712 data_id,
713 as_of,
714 Antichain::new(),
715 );
716 let mut output = Vec::new();
717 loop {
718 let (ts, tx) = match rx.try_recv() {
719 Ok(x) => x,
720 Err(TryRecvError::Empty) => {
721 subscribe.step();
723 continue;
724 }
725 Err(TryRecvError::Disconnected) => {
726 return output;
728 }
729 };
730 subscribe.step();
732 if let Some(ts) = ts {
734 while subscribe.progress() <= ts {
735 subscribe.step();
736 }
737 }
738 let new_output = std::mem::take(&mut subscribe.output);
739 output.extend(new_output.iter().cloned());
740 let _ = tx.send((new_output, subscribe.progress()));
741 }
742 }
743}
744
#[cfg(test)]
mod tests {
    use itertools::{Either, Itertools};
    use mz_persist_types::Opaque;

    use crate::tests::writer;
    use crate::txns::TxnsHandle;

    use super::*;

    impl<K, V, T, D, O, C> TxnsHandle<K, V, T, D, O, C>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + TotalOrder + StepForward + Codec64 + Sync,
        D: Debug + Monoid + Ord + Codec64 + Send + Sync,
        O: Opaque + Debug + Codec64,
        C: TxnsCodec,
    {
        /// Test helper: starts a [`DataSubscribeTask`] on `data_id` against
        /// this handle's txns shard.
        async fn subscribe_task(
            &self,
            client: &PersistClient,
            data_id: ShardId,
            as_of: u64,
        ) -> DataSubscribeTask {
            DataSubscribeTask::new(client.clone(), self.txns_id(), data_id, as_of).await
        }
    }

    /// Starts a subscription (all with `as_of` 5) at every stage of a data
    /// shard's life and checks that each one ends up with the same result.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn data_subscribe() {
        // Steps every subscription started so far once.
        async fn step(subs: &mut Vec<DataSubscribeTask>) {
            for sub in subs.iter_mut() {
                sub.step().await;
            }
        }

        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = ShardId::new();

        // Start a subscription before the shard is registered.
        let mut subs = Vec::new();
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Register the shard. Then start another subscription and step the
        // existing ones; this is repeated after every step below.
        txns.register(1, [writer(&client, d0).await]).await.unwrap();
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Write to an unrelated shard.
        let d1 = txns.expect_register(2).await;
        txns.expect_commit_at(3, d1, &["nope"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Write to our shard before the as_of.
        txns.expect_commit_at(4, d0, &["4"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Write to our shard at the as_of.
        txns.expect_commit_at(5, d0, &["5"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Write to our shard past the as_of.
        txns.expect_commit_at(6, d0, &["6"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Write to the unrelated shard again.
        txns.expect_commit_at(7, d1, &["nope"], &log).await;
        subs.push(txns.subscribe_task(&client, d0, 5).await);
        step(&mut subs).await;

        // Every subscription, no matter when it was started, must be able to
        // progress past the last write (7) and must have read the same data.
        for mut sub in subs {
            let progress = sub.step_past(7).await;
            assert_eq!(progress, 8);
            log.assert_eq(d0, 5, 8, sub.finish().await);
        }
    }

    /// Follows a subscription through a shard being forgotten, written to
    /// directly, and then closed to further writes.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn subscribe_shard_finalize() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();
        let d0 = txns.expect_register(1).await;

        // Start the subscription as of the register ts.
        let mut sub = txns.read_cache().expect_subscribe(&client, d0, 1);
        sub.step_past(1).await;

        // Write to the shard via txns.
        txns.expect_commit_at(2, d0, &["foo"], &log).await;
        sub.step_past(2).await;

        // Forget (unregister) the shard.
        txns.forget(3, [d0]).await.unwrap();
        sub.step_past(3).await;

        // Commit an empty txn at 7.
        // NOTE(review): presumably this is here to advance the txns shard
        // past the direct write below — confirm whether the rest of the test
        // would pass without it.
        txns.begin().commit_at(&mut txns, 7).await.unwrap();

        // Write directly to the data shard (not via txns): one update at time
        // 5, moving the shard's upper from 4 to 6. The subscription is
        // expected to still see it.
        let mut d0_write = writer(&client, d0).await;
        let key = "bar".to_owned();
        crate::small_caa(|| "test", &mut d0_write, &[((&key, &()), &5, 1)], 4, 6)
            .await
            .unwrap();
        log.record((d0, key, 5, 1));
        sub.step_past(4).await;

        // Advance the data shard's upper from 6 to the empty antichain, which
        // closes it to further writes.
        let () = d0_write
            .compare_and_append_batch(&mut [], Antichain::from_elem(6), Antichain::new(), true)
            .await
            .unwrap()
            .unwrap();
        // Step until the subscription's output frontier is no longer below
        // `u64::MAX`.
        while sub.txns.less_than(&u64::MAX) {
            sub.step();
            tokio::task::yield_now().await;
        }

        // The long-running subscription read the right things.
        log.assert_eq(d0, 1, u64::MAX, sub.output().clone());

        // So do fresh subscriptions started after the forget: one as of 4
        // (before the direct write's new upper) and one as of 6 (at it).
        log.assert_subscribe(d0, 4, u64::MAX).await;
        log.assert_subscribe(d0, 6, u64::MAX).await;
    }

    /// Checks that a subscription's progress advances past both the register
    /// and the forget timestamps of its shard.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn subscribe_shard_register_forget() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let d0 = ShardId::new();

        // Start the subscription before the shard is registered; nothing has
        // been stepped yet, so progress is still 0.
        let mut sub = txns.read_cache().expect_subscribe(&client, d0, 0);
        assert_eq!(sub.progress(), 0);

        // Register the shard at 10.
        txns.register(10, [writer(&client, d0).await])
            .await
            .unwrap();
        sub.step_past(10).await;
        assert!(
            sub.progress() > 10,
            "operator should advance past 10 when shard is registered"
        );

        // Forget the shard at 20.
        txns.forget(20, [d0]).await.unwrap();
        sub.step_past(20).await;
        assert!(
            sub.progress() > 20,
            "operator should advance past 20 when shard is forgotten"
        );
    }

    /// Checks the data and progress emitted for a subscription with both an
    /// `as_of` (3) and an `until` (5).
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn as_of_until() {
        let client = PersistClient::new_for_tests().await;
        let mut txns = TxnsHandle::expect_open(client.clone()).await;
        let log = txns.new_log();

        let d0 = txns.expect_register(1).await;
        txns.expect_commit_at(2, d0, &["2"], &log).await;
        txns.expect_commit_at(3, d0, &["3"], &log).await;
        txns.expect_commit_at(4, d0, &["4"], &log).await;
        txns.expect_commit_at(5, d0, &["5"], &log).await;
        txns.expect_commit_at(6, d0, &["6"], &log).await;
        txns.expect_commit_at(7, d0, &["7"], &log).await;

        let until = 5;
        let mut sub = DataSubscribe::new(
            "as_of_until",
            client,
            txns.txns_id(),
            d0,
            3,
            Antichain::from_elem(until),
        );
        // Step the worker directly instead of calling `sub.step()`:
        // `sub.step()` would drain the capture channel into `sub.output`, and
        // the raw events are inspected below.
        while sub.txns.less_equal(&5) {
            sub.worker.step();
            tokio::task::yield_now().await;
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
        // Split the captured events into progress updates and data messages.
        let (actual_progresses, actual_events): (Vec<_>, Vec<_>) =
            sub.capture.into_iter().partition_map(|event| match event {
                Event::Progress(progress) => Either::Left(progress),
                Event::Messages(ts, data) => Either::Right((ts, data)),
            });
        // Writes at or before the as_of show up at time 3, the write at 4
        // shows up at 4, and nothing at or beyond the until is emitted.
        let expected = vec![
            (3, vec![("2".to_owned(), 3, 1), ("3".to_owned(), 3, 1)]),
            (3, vec![("4".to_owned(), 4, 1)]),
        ];
        assert_eq!(actual_events, expected);

        // Any captured progress must stay below the until. There may be no
        // progress events at all, in which case nothing is asserted.
        if let Some(max_progress_ts) = actual_progresses
            .into_iter()
            .flatten()
            .map(|(ts, _diff)| ts)
            .max()
        {
            assert!(max_progress_ts < until, "{max_progress_ts} < {until}");
        }
    }
}