1use std::cell::RefCell;
13use std::collections::BTreeMap;
14use std::collections::hash_map::DefaultHasher;
15use std::convert::Infallible;
16use std::fmt::{Debug, Formatter};
17use std::future::Future;
18use std::hash::{Hash, Hasher};
19use std::pin::pin;
20use std::rc::Rc;
21use std::sync::Arc;
22use std::time::Instant;
23
24use anyhow::anyhow;
25use arrow::array::ArrayRef;
26use differential_dataflow::Hashable;
27use differential_dataflow::difference::Monoid;
28use differential_dataflow::lattice::Lattice;
29use futures_util::StreamExt;
30use mz_ore::cast::CastFrom;
31use mz_ore::collections::CollectionExt;
32use mz_persist_types::stats::PartStats;
33use mz_persist_types::{Codec, Codec64};
34use mz_timely_util::builder_async::{
35 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
36};
37use timely::PartialOrder;
38use timely::container::CapacityContainerBuilder;
39use timely::dataflow::channels::pact::{Exchange, Pipeline};
40use timely::dataflow::operators::{CapabilitySet, ConnectLoop, Enter, Feedback, Leave};
41use timely::dataflow::{Scope, StreamVec};
42use timely::order::TotalOrder;
43use timely::progress::frontier::AntichainRef;
44use timely::progress::{Antichain, Timestamp, timestamp::Refines};
45use tracing::{debug, trace};
46
47use crate::batch::BLOB_TARGET_SIZE;
48use crate::cfg::{RetryParameters, USE_CRITICAL_SINCE_SOURCE};
49use crate::fetch::{ExchangeableBatchPart, FetchedBlob, Lease};
50use crate::internal::state::BatchPart;
51use crate::stats::{STATS_AUDIT_PERCENT, STATS_FILTER_ENABLED};
52use crate::{Diagnostics, PersistClient, ShardId};
53
/// The verdict of the `filter_fn` passed to [`shard_source`] after inspecting the stats of a
/// part.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum FilterResult {
    /// Emit the part unchanged. This is the default, and it is also what the source assumes for
    /// inline parts and for parts that carry no stats.
    #[default]
    Keep,
    /// The part is not needed. The source skips it unless it is selected for a stats audit.
    Discard,
    /// Keep the part, handing `key` and `val` to `maybe_optimize` on the part before it is
    /// emitted.
    // NOTE(review): presumably these arrays stand in for the part's contents so that a fetch can
    // be avoided; confirm against `maybe_optimize`.
    ReplaceWith {
        /// Key array passed to `maybe_optimize`.
        key: ArrayRef,
        /// Value array passed to `maybe_optimize`.
        val: ArrayRef,
    },
}
71
72impl FilterResult {
73 pub fn keep_all<T>(_stats: &PartStats, _frontier: AntichainRef<T>) -> FilterResult {
75 Self::Keep
76 }
77}
78
/// How the source operators react to an error they cannot recover from.
#[derive(Clone)]
pub enum ErrorHandler {
    /// Halt via `mz_ore::halt!`. The string is included in the halt message to identify where
    /// the error came from.
    Halt(&'static str),
    /// Hand the error to the wrapped callback. The future that reported the error never resolves
    /// afterwards.
    Signal(Rc<dyn Fn(anyhow::Error) + 'static>),
}
96
97impl Debug for ErrorHandler {
98 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
99 match self {
100 ErrorHandler::Halt(name) => f.debug_tuple("ErrorHandler::Halt").field(name).finish(),
101 ErrorHandler::Signal(_) => f.write_str("ErrorHandler::Signal"),
102 }
103 }
104}
105
106impl ErrorHandler {
107 pub fn signal(signal_fn: impl Fn(anyhow::Error) + 'static) -> Self {
109 Self::Signal(Rc::new(signal_fn))
110 }
111
112 pub async fn report_and_stop(&self, error: anyhow::Error) -> ! {
116 match self {
117 ErrorHandler::Halt(name) => {
118 mz_ore::halt!("unhandled error in {name}: {error:#}")
119 }
120 ErrorHandler::Signal(callback) => {
121 let () = callback(error);
122 std::future::pending().await
123 }
124 }
125 }
126}
127
128pub fn shard_source<'inner, 'outer, K, V, T, D, DT, TOuter, C>(
142 outer: Scope<'outer, TOuter>,
143 scope: Scope<'inner, T>,
144 name: &str,
145 client: impl Fn() -> C,
146 shard_id: ShardId,
147 as_of: Option<Antichain<TOuter>>,
148 snapshot_mode: SnapshotMode,
149 until: Antichain<TOuter>,
150 desc_transformer: Option<DT>,
151 key_schema: Arc<K::Schema>,
152 val_schema: Arc<V::Schema>,
153 filter_fn: impl FnMut(&PartStats, AntichainRef<TOuter>) -> FilterResult + 'static,
154 listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
156 start_signal: impl Future<Output = ()> + 'static,
157 error_handler: ErrorHandler,
158) -> (
159 StreamVec<'inner, T, FetchedBlob<K, V, TOuter, D>>,
160 Vec<PressOnDropButton>,
161)
162where
163 K: Debug + Codec,
164 V: Debug + Codec,
165 D: Monoid + Codec64 + Send + Sync,
166 TOuter: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
168 T: Refines<TOuter>,
169 DT: FnOnce(
170 Scope<'inner, T>,
171 StreamVec<'inner, T, (usize, ExchangeableBatchPart<TOuter>)>,
172 usize,
173 ) -> (
174 StreamVec<'inner, T, (usize, ExchangeableBatchPart<TOuter>)>,
175 Vec<PressOnDropButton>,
176 ),
177 C: Future<Output = PersistClient> + Send + 'static,
178{
179 let chosen_worker = usize::cast_from(name.hashed()) % scope.peers();
195
196 let mut tokens = vec![];
197
198 let (completed_fetches_feedback_handle, completed_fetches_feedback_stream) =
201 scope.feedback(T::Summary::default());
202
203 let is_transient = !until.is_empty();
207
208 let (descs, descs_token) = shard_source_descs::<K, V, D, TOuter>(
209 outer,
210 name,
211 client(),
212 shard_id.clone(),
213 as_of,
214 snapshot_mode,
215 until,
216 completed_fetches_feedback_stream.leave(outer),
217 chosen_worker,
218 Arc::clone(&key_schema),
219 Arc::clone(&val_schema),
220 filter_fn,
221 listen_sleep,
222 start_signal,
223 error_handler.clone(),
224 );
225 tokens.push(descs_token);
226
227 let descs = descs.enter(scope);
228 let descs = match desc_transformer {
229 Some(desc_transformer) => {
230 let (descs, extra_tokens) = desc_transformer(scope, descs, chosen_worker);
231 tokens.extend(extra_tokens);
232 descs
233 }
234 None => descs,
235 };
236
237 let (parts, completed_fetches_stream, fetch_token) = shard_source_fetch::<K, V, TOuter, D, T>(
238 descs,
239 name,
240 client(),
241 shard_id,
242 key_schema,
243 val_schema,
244 is_transient,
245 error_handler,
246 );
247 completed_fetches_stream.connect_loop(completed_fetches_feedback_handle);
248 tokens.push(fetch_token);
249
250 (parts, tokens)
251}
252
/// Whether the source emits the contents of the shard as of the `as_of`.
#[derive(Debug, Clone, Copy)]
pub enum SnapshotMode {
    /// Take a snapshot at the `as_of` and emit its parts together with the first listen batch.
    Include,
    /// Skip the snapshot; only parts produced by the listen are emitted.
    Exclude,
}
261
/// Holds the leases of emitted parts, grouped by the time each part was emitted at.
#[derive(Debug)]
struct LeaseManager<T> {
    /// Leases keyed by time. The map is ordered, which lets `advance_to` release the smallest
    /// times first.
    leases: BTreeMap<T, Vec<Lease>>,
}
266
267impl<T: Timestamp + Codec64> LeaseManager<T> {
268 fn new() -> Self {
269 Self {
270 leases: BTreeMap::new(),
271 }
272 }
273
274 fn push_at(&mut self, time: T, lease: Lease) {
276 self.leases.entry(time).or_default().push(lease);
277 }
278
279 fn advance_to(&mut self, frontier: AntichainRef<T>)
281 where
282 T: TotalOrder,
284 {
285 while let Some(first) = self.leases.first_entry() {
286 if frontier.less_equal(first.key()) {
287 break; }
289 drop(first.remove());
290 }
291 }
292}
293
294pub(crate) fn shard_source_descs<'outer, K, V, D, TOuter>(
295 scope: Scope<'outer, TOuter>,
296 name: &str,
297 client: impl Future<Output = PersistClient> + Send + 'static,
298 shard_id: ShardId,
299 as_of: Option<Antichain<TOuter>>,
300 snapshot_mode: SnapshotMode,
301 until: Antichain<TOuter>,
302 completed_fetches_stream: StreamVec<'outer, TOuter, Infallible>,
303 chosen_worker: usize,
304 key_schema: Arc<K::Schema>,
305 val_schema: Arc<V::Schema>,
306 mut filter_fn: impl FnMut(&PartStats, AntichainRef<TOuter>) -> FilterResult + 'static,
307 listen_sleep: Option<impl Fn() -> RetryParameters + 'static>,
309 start_signal: impl Future<Output = ()> + 'static,
310 error_handler: ErrorHandler,
311) -> (
312 StreamVec<'outer, TOuter, (usize, ExchangeableBatchPart<TOuter>)>,
313 PressOnDropButton,
314)
315where
316 K: Debug + Codec,
317 V: Debug + Codec,
318 D: Monoid + Codec64 + Send + Sync,
319 TOuter: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
321{
322 let worker_index = scope.index();
323 let num_workers = scope.peers();
324
325 let name_owned = name.to_owned();
328
329 let listen_handle = Rc::new(RefCell::new(None));
331 let return_listen_handle = Rc::clone(&listen_handle);
332
333 let (tx, rx) = tokio::sync::oneshot::channel::<Rc<RefCell<LeaseManager<TOuter>>>>();
335 let mut builder = AsyncOperatorBuilder::new(
336 format!("shard_source_descs_return({})", name),
337 scope.clone(),
338 );
339 let mut completed_fetches = builder.new_disconnected_input(completed_fetches_stream, Pipeline);
340 builder.build(move |_caps| async move {
343 let Ok(leases) = rx.await else {
344 return;
347 };
348 while let Some(event) = completed_fetches.next().await {
349 let Event::Progress(frontier) = event else {
350 continue;
351 };
352 leases.borrow_mut().advance_to(frontier.borrow());
353 }
354 drop(return_listen_handle);
356 });
357
358 let mut builder =
359 AsyncOperatorBuilder::new(format!("shard_source_descs({})", name), scope.clone());
360 let (descs_output, descs_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
361
362 #[allow(clippy::await_holding_refcell_ref)]
363 let shutdown_button = builder.build(move |caps| async move {
364 let mut cap_set = CapabilitySet::from_elem(caps.into_element());
365
366 if worker_index != chosen_worker {
368 trace!(
369 "We are not the chosen worker ({}), exiting...",
370 chosen_worker
371 );
372 return;
373 }
374
375 let mut read = mz_ore::task::spawn(|| format!("shard_source_reader({})", name_owned), {
385 let diagnostics = Diagnostics {
386 handle_purpose: format!("shard_source({})", name_owned),
387 shard_name: name_owned.clone(),
388 };
389 async move {
390 let client = client.await;
391 client
392 .open_leased_reader::<K, V, TOuter, D>(
393 shard_id,
394 key_schema,
395 val_schema,
396 diagnostics,
397 USE_CRITICAL_SINCE_SOURCE.get(client.dyncfgs()),
398 )
399 .await
400 }
401 })
402 .await
403 .expect("could not open persist shard");
404
405 let () = start_signal.await;
409
410 let cfg = read.cfg.clone();
411 let metrics = Arc::clone(&read.metrics);
412
413 let as_of = as_of.unwrap_or_else(|| read.since().clone());
414
415 cap_set.downgrade(as_of.clone());
431
432 let mut snapshot_parts =
433 match snapshot_mode {
434 SnapshotMode::Include => match read.snapshot(as_of.clone()).await {
435 Ok(parts) => parts,
436 Err(e) => error_handler
437 .report_and_stop(anyhow!(
438 "{name_owned}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}"
439 ))
440 .await,
441 },
442 SnapshotMode::Exclude => vec![],
443 };
444
445 let leases = Rc::new(RefCell::new(LeaseManager::new()));
449 tx.send(Rc::clone(&leases))
450 .expect("lease returner exited before desc producer");
451
452 let mut listen = listen_handle.borrow_mut();
455 let listen = match read.listen(as_of.clone()).await {
456 Ok(handle) => listen.insert(handle),
457 Err(e) => {
458 error_handler
459 .report_and_stop(anyhow!(
460 "{name_owned}: {shard_id} cannot serve requested as_of {as_of:?}: {e:?}"
461 ))
462 .await
463 }
464 };
465
466 let listen_retry = listen_sleep.as_ref().map(|retry| retry());
467
468 let listen_head = if !snapshot_parts.is_empty() {
470 let (mut parts, progress) = listen.next(listen_retry).await;
471 snapshot_parts.append(&mut parts);
472 futures::stream::iter(Some((snapshot_parts, progress)))
473 } else {
474 futures::stream::iter(None)
475 };
476
477 let listen_tail = futures::stream::unfold(listen, |listen| async move {
479 Some((listen.next(listen_retry).await, listen))
480 });
481
482 let mut shard_stream = pin!(listen_head.chain(listen_tail));
483
484 let mut audit_budget_bytes = u64::cast_from(BLOB_TARGET_SIZE.get(&cfg).saturating_mul(2));
488
489 let mut current_frontier = as_of.clone();
491
492 while !PartialOrder::less_equal(&until, ¤t_frontier) {
495 let (parts, progress) = shard_stream.next().await.expect("infinite stream");
496
497 let current_ts = current_frontier
500 .as_option()
501 .expect("until should always be <= the empty frontier");
502 let session_cap = cap_set.delayed(current_ts);
503
504 for mut part_desc in parts {
505 if STATS_FILTER_ENABLED.get(&cfg) {
508 let filter_result = match &part_desc.part {
509 BatchPart::Hollow(x) => {
510 let should_fetch =
511 x.stats.as_ref().map_or(FilterResult::Keep, |stats| {
512 filter_fn(&stats.decode(), current_frontier.borrow())
513 });
514 should_fetch
515 }
516 BatchPart::Inline { .. } => FilterResult::Keep,
517 };
518 let bytes = u64::cast_from(part_desc.encoded_size_bytes());
520 match filter_result {
521 FilterResult::Keep => {
522 audit_budget_bytes = audit_budget_bytes.saturating_add(bytes);
523 }
524 FilterResult::Discard => {
525 metrics.pushdown.parts_filtered_count.inc();
526 metrics.pushdown.parts_filtered_bytes.inc_by(bytes);
527 let should_audit = match &part_desc.part {
528 BatchPart::Hollow(x) => {
529 let mut h = DefaultHasher::new();
530 x.key.hash(&mut h);
531 usize::cast_from(h.finish()) % 100
532 < STATS_AUDIT_PERCENT.get(&cfg)
533 }
534 BatchPart::Inline { .. } => false,
535 };
536 if should_audit && bytes < audit_budget_bytes {
537 audit_budget_bytes -= bytes;
538 metrics.pushdown.parts_audited_count.inc();
539 metrics.pushdown.parts_audited_bytes.inc_by(bytes);
540 part_desc.request_filter_pushdown_audit();
541 } else {
542 debug!(
543 "skipping part because of stats filter {:?}",
544 part_desc.part.stats()
545 );
546 continue;
547 }
548 }
549 FilterResult::ReplaceWith { key, val } => {
550 part_desc.maybe_optimize(&cfg, key, val);
551 audit_budget_bytes = audit_budget_bytes.saturating_add(bytes);
552 }
553 }
554 let bytes = u64::cast_from(part_desc.encoded_size_bytes());
555 if part_desc.part.is_inline() {
556 metrics.pushdown.parts_inline_count.inc();
557 metrics.pushdown.parts_inline_bytes.inc_by(bytes);
558 } else {
559 metrics.pushdown.parts_fetched_count.inc();
560 metrics.pushdown.parts_fetched_bytes.inc_by(bytes);
561 }
562 }
563
564 let worker_idx = usize::cast_from(Instant::now().hashed()) % num_workers;
571 let (part, lease) = part_desc.into_exchangeable_part();
572 leases.borrow_mut().push_at(current_ts.clone(), lease);
573 descs_output.give(&session_cap, (worker_idx, part));
574 }
575
576 current_frontier.join_assign(&progress);
577 cap_set.downgrade(progress.iter());
578 }
579 });
580
581 (descs_stream, shutdown_button.press_on_drop())
582}
583
584pub(crate) fn shard_source_fetch<'inner, K, V, T, D, TInner>(
585 descs: StreamVec<'inner, TInner, (usize, ExchangeableBatchPart<T>)>,
586 name: &str,
587 client: impl Future<Output = PersistClient> + Send + 'static,
588 shard_id: ShardId,
589 key_schema: Arc<K::Schema>,
590 val_schema: Arc<V::Schema>,
591 is_transient: bool,
592 error_handler: ErrorHandler,
593) -> (
594 StreamVec<'inner, TInner, FetchedBlob<K, V, T, D>>,
595 StreamVec<'inner, TInner, Infallible>,
596 PressOnDropButton,
597)
598where
599 K: Debug + Codec,
600 V: Debug + Codec,
601 T: Timestamp + Lattice + Codec64 + Sync,
602 D: Monoid + Codec64 + Send + Sync,
603 TInner: Timestamp + Refines<T>,
604{
605 let mut builder =
606 AsyncOperatorBuilder::new(format!("shard_source_fetch({})", name), descs.scope());
607 let (fetched_output, fetched_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
608 let (completed_fetches_output, completed_fetches_stream) =
609 builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
610 let mut descs_input = builder.new_input_for_many(
611 descs,
612 Exchange::new(|&(i, _): &(usize, _)| u64::cast_from(i)),
613 [&fetched_output, &completed_fetches_output],
614 );
615 let name_owned = name.to_owned();
616
617 let shutdown_button = builder.build(move |_capabilities| async move {
618 let mut fetcher = mz_ore::task::spawn(|| format!("shard_source_fetch({})", name_owned), {
619 let diagnostics = Diagnostics {
620 shard_name: name_owned.clone(),
621 handle_purpose: format!("shard_source_fetch batch fetcher {}", name_owned),
622 };
623 async move {
624 client
625 .await
626 .create_batch_fetcher::<K, V, T, D>(
627 shard_id,
628 key_schema,
629 val_schema,
630 is_transient,
631 diagnostics,
632 )
633 .await
634 }
635 })
636 .await
637 .expect("shard codecs should not change");
638
639 while let Some(event) = descs_input.next().await {
640 if let Event::Data([fetched_cap, _completed_fetches_cap], data) = event {
641 for (_idx, part) in data {
644 let reader_id = part.reader_id().clone();
645 let fetched = fetcher
646 .fetch_leased_part(part)
647 .await
648 .expect("shard_id should match across all workers");
649 let fetched = match fetched {
650 Ok(fetched) => fetched,
651 Err(blob_key) => {
652 let diagnostics = fetcher.missing_blob_diagnostics(&reader_id).await;
662 error_handler
663 .report_and_stop(anyhow!(
664 "batch fetcher could not fetch batch part {}: {}",
665 blob_key,
666 diagnostics
667 ))
668 .await
669 }
670 };
671 {
672 fetched_output.give(&fetched_cap, fetched);
678 }
679 }
680 }
681 }
682 });
683
684 (
685 fetched_stream,
686 completed_fetches_stream,
687 shutdown_button.press_on_drop(),
688 )
689}
690
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    use mz_persist::location::SeqNo;
    use timely::dataflow::operators::Leave;
    use timely::dataflow::operators::Probe;
    use timely::dataflow::operators::probe::Handle as ProbeHandle;
    use timely::progress::Antichain;

    use crate::operators::shard_source::shard_source;
    use crate::{Diagnostics, ShardId};

    /// Leases are released as the frontier advances, and never when it regresses.
    #[mz_ore::test]
    fn test_lease_manager() {
        let lease = Lease::new(SeqNo::minimum());
        let mut manager = LeaseManager::new();
        (0u64..10).for_each(|t| manager.push_at(t, lease.clone()));
        // Ten clones held by the manager plus the original.
        assert_eq!(lease.count(), 11);

        // Each step advances the manager to `frontier` and checks the remaining lease count.
        for (frontier, expected_count) in [(5, 6), (3, 6), (9, 2), (10, 1)] {
            manager.advance_to(AntichainRef::new(&[frontier]));
            assert_eq!(lease.count(), expected_count);
        }
    }

    /// With no explicit `as_of`, the source's frontier ends up at the shard's since.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn test_shard_source_implicit_initial_as_of() {
        let persist_client = PersistClient::new_for_tests().await;

        let expected_frontier = 42;
        let shard_id = ShardId::new();

        initialize_shard(
            &persist_client,
            shard_id,
            Antichain::from_elem(expected_frontier),
        )
        .await;

        let res = probe_source_frontier(persist_client, shard_id, None, expected_frontier);

        assert_eq!(res, Antichain::from_elem(expected_frontier));
    }

    /// With an explicit `as_of`, the source's frontier ends up at that `as_of`.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn test_shard_source_explicit_initial_as_of() {
        let persist_client = PersistClient::new_for_tests().await;

        let expected_frontier = 42;
        let shard_id = ShardId::new();

        initialize_shard(
            &persist_client,
            shard_id,
            Antichain::from_elem(expected_frontier),
        )
        .await;

        let as_of = Antichain::from_elem(expected_frontier);
        let res = probe_source_frontier(persist_client, shard_id, Some(as_of), expected_frontier);

        assert_eq!(res, Antichain::from_elem(expected_frontier));
    }

    /// Runs a `shard_source` over `shard_id` on a single timely worker, steps the worker until
    /// the source's output frontier is no longer less than `expected_frontier`, and returns the
    /// frontier observed at that point.
    fn probe_source_frontier(
        persist_client: PersistClient,
        shard_id: ShardId,
        as_of: Option<Antichain<u64>>,
        expected_frontier: u64,
    ) -> Antichain<u64> {
        timely::execute::execute_directly(move |worker| {
            let until = Antichain::new();

            let (probe, _token) = worker.dataflow::<u64, _, _>(|outer| {
                let (stream, token) = outer.scoped::<u64, _, _>("hybrid", |scope| {
                    // Identity transformer: pass the descriptions through untouched.
                    let transformer = move |_, descs, _| (descs, vec![]);
                    let (stream, tokens) = shard_source::<String, String, u64, u64, _, _, _>(
                        outer,
                        scope,
                        "test_source",
                        move || std::future::ready(persist_client.clone()),
                        shard_id,
                        as_of,
                        SnapshotMode::Include,
                        until,
                        Some(transformer),
                        Arc::new(
                            <std::string::String as mz_persist_types::Codec>::Schema::default(),
                        ),
                        Arc::new(
                            <std::string::String as mz_persist_types::Codec>::Schema::default(),
                        ),
                        FilterResult::keep_all,
                        false.then_some(|| unreachable!()),
                        async {},
                        ErrorHandler::Halt("test"),
                    );
                    (stream.leave(outer), tokens)
                });

                let probe = ProbeHandle::new();
                let _stream = stream.probe_with(&probe);

                (probe, token)
            });

            while probe.less_than(&expected_frontier) {
                worker.step();
            }

            let mut probe_frontier = Antichain::new();
            probe.with_frontier(|f| probe_frontier.extend(f.iter().cloned()));

            probe_frontier
        })
    }

    /// Opens a leased reader on `shard_id` and downgrades its since to `since`.
    async fn initialize_shard(
        persist_client: &PersistClient,
        shard_id: ShardId,
        since: Antichain<u64>,
    ) {
        let key_schema =
            Arc::new(<std::string::String as mz_persist_types::Codec>::Schema::default());
        let val_schema =
            Arc::new(<std::string::String as mz_persist_types::Codec>::Schema::default());
        let mut read_handle = persist_client
            .open_leased_reader::<String, String, u64, u64>(
                shard_id,
                key_schema,
                val_schema,
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("invalid usage");

        read_handle.downgrade_since(&since).await;
    }
}