mz_persist_client/lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction presenting as a durable time-varying collection (aka shard)
11
12#![warn(missing_docs, missing_debug_implementations)]
13// #[track_caller] is currently a no-op on async functions, but that hopefully won't be the case
14// forever. So we already annotate those functions now and ignore the compiler warning until
15// https://github.com/rust-lang/rust/issues/87417 pans out.
16#![allow(ungated_async_fn_track_caller)]
17
18use std::fmt::Debug;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use itertools::Itertools;
25use mz_build_info::{BuildInfo, build_info};
26use mz_dyncfg::ConfigSet;
27use mz_ore::{instrument, soft_assert_or_log};
28use mz_persist::location::{Blob, Consensus, ExternalError};
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64, Opaque};
31use mz_proto::{IntoRustIfSome, ProtoType};
32use semver::Version;
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
38use crate::cache::{PersistClientCache, StateCache};
39use crate::cfg::PersistConfig;
40use crate::critical::{CriticalReaderId, SinceHandle};
41use crate::error::InvalidUsage;
42use crate::fetch::{BatchFetcher, BatchFetcherConfig};
43use crate::internal::compact::{CompactConfig, Compactor};
44use crate::internal::encoding::parse_id;
45use crate::internal::gc::GarbageCollector;
46use crate::internal::machine::{Machine, retry_external};
47use crate::internal::state_versions::StateVersions;
48use crate::metrics::Metrics;
49use crate::read::{
50    Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
51};
52use crate::rpc::PubSubSender;
53use crate::schema::CaESchema;
54use crate::write::{WriteHandle, WriterId};
55
56pub mod async_runtime;
57pub mod batch;
58pub mod cache;
59pub mod cfg;
60pub mod cli {
61    //! Persist command-line utilities
62    pub mod admin;
63    pub mod args;
64    pub mod bench;
65    pub mod inspect;
66}
67pub mod critical;
68pub mod error;
69pub mod fetch;
70pub mod internals_bench;
71pub mod iter;
72pub mod metrics {
73    //! Utilities related to metrics.
74    pub use crate::internal::metrics::{
75        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
76    };
77}
78pub mod operators {
79    //! [timely] operators for reading and writing persist Shards.
80
81    use mz_dyncfg::Config;
82
83    pub mod shard_source;
84
85    // TODO(cfg): Move this next to the use.
86    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
87        "storage_source_decode_fuel",
88        100_000,
89        "\
90        The maximum amount of work to do in the persist_source mfp_and_decode \
91        operator before yielding.",
92    );
93}
94pub mod read;
95pub mod rpc;
96pub mod schema;
97pub mod stats;
98pub mod usage;
99pub mod write;
100
101/// An implementation of the public crate interface.
102mod internal {
103    pub mod apply;
104    pub mod cache;
105    pub mod compact;
106    pub mod encoding;
107    pub mod gc;
108    pub mod machine;
109    pub mod maintenance;
110    pub mod merge;
111    pub mod metrics;
112    pub mod paths;
113    pub mod restore;
114    pub mod service;
115    pub mod state;
116    pub mod state_diff;
117    pub mod state_versions;
118    pub mod trace;
119    pub mod watch;
120
121    #[cfg(test)]
122    pub mod datadriven;
123}
124
125/// Persist build information.
126pub const BUILD_INFO: BuildInfo = build_info!();
127
128// Re-export for convenience.
129pub use mz_persist_types::{PersistLocation, ShardId};
130
131pub use crate::internal::encoding::Schemas;
132
133/// Additional diagnostic information used within Persist,
134/// e.g. for logging, metric labels, etc.
135#[derive(Clone, Debug)]
136pub struct Diagnostics {
137    /// A user-friendly name for the shard.
138    pub shard_name: String,
139    /// A purpose for the handle.
140    pub handle_purpose: String,
141}
142
143impl Diagnostics {
144    /// Create a new `Diagnostics` from `handle_purpose`.
145    pub fn from_purpose(handle_purpose: &str) -> Self {
146        Self {
147            shard_name: "unknown".to_string(),
148            handle_purpose: handle_purpose.to_string(),
149        }
150    }
151
152    /// Create a new `Diagnostics` for testing.
153    pub fn for_tests() -> Self {
154        Self {
155            shard_name: "test-shard-name".to_string(),
156            handle_purpose: "test-purpose".to_string(),
157        }
158    }
159}
160
161/// A handle for interacting with the set of persist shards made durable at a
162/// single [PersistLocation].
163///
164/// All async methods on PersistClient retry for as long as they are able, but
165/// the returned [std::future::Future]s implement "cancel on drop" semantics.
166/// This means that callers can add a timeout using [tokio::time::timeout] or
167/// [tokio::time::timeout_at].
168///
169/// ```rust,no_run
170/// # use std::sync::Arc;
171/// # use mz_persist_types::codec_impls::StringSchema;
172/// # let client: mz_persist_client::PersistClient = unimplemented!();
173/// # let timeout: std::time::Duration = unimplemented!();
174/// # let id = mz_persist_client::ShardId::new();
175/// # let diagnostics = mz_persist_client::Diagnostics { shard_name: "".into(), handle_purpose: "".into() };
176/// # async {
177/// tokio::time::timeout(timeout, client.open::<String, String, u64, i64>(id,
178///     Arc::new(StringSchema),Arc::new(StringSchema),diagnostics, true)).await
179/// # };
180/// ```
181#[derive(Debug, Clone)]
182pub struct PersistClient {
183    cfg: PersistConfig,
184    blob: Arc<dyn Blob>,
185    consensus: Arc<dyn Consensus>,
186    metrics: Arc<Metrics>,
187    isolated_runtime: Arc<IsolatedRuntime>,
188    shared_states: Arc<StateCache>,
189    pubsub_sender: Arc<dyn PubSubSender>,
190}
191
192impl PersistClient {
193    /// Returns a new client for interfacing with persist shards made durable to
194    /// the given [Blob] and [Consensus].
195    ///
196    /// This is exposed mostly for testing. Persist users likely want
197    /// [crate::cache::PersistClientCache::open].
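    ///
    /// A minimal sketch of the recommended path, assuming a configured
    /// [crate::cache::PersistClientCache] named `cache` is already in hand (the
    /// in-mem location shown here is what this crate's tests use):
    ///
    /// ```rust,ignore
    /// let client = cache
    ///     .open(PersistLocation::new_in_mem())
    ///     .await
    ///     .expect("location should be valid");
    /// ```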
198    pub fn new(
199        cfg: PersistConfig,
200        blob: Arc<dyn Blob>,
201        consensus: Arc<dyn Consensus>,
202        metrics: Arc<Metrics>,
203        isolated_runtime: Arc<IsolatedRuntime>,
204        shared_states: Arc<StateCache>,
205        pubsub_sender: Arc<dyn PubSubSender>,
206    ) -> Result<Self, ExternalError> {
207        // TODO: Verify somehow that blob matches consensus to prevent
208        // accidental misuse.
209        Ok(PersistClient {
210            cfg,
211            blob,
212            consensus,
213            metrics,
214            isolated_runtime,
215            shared_states,
216            pubsub_sender,
217        })
218    }
219
220    /// Returns a new in-mem [PersistClient] for tests and examples.
221    pub async fn new_for_tests() -> Self {
222        let cache = PersistClientCache::new_no_metrics();
223        cache
224            .open(PersistLocation::new_in_mem())
225            .await
226            .expect("in-mem location is valid")
227    }
228
229    /// Returns persist's [ConfigSet].
230    pub fn dyncfgs(&self) -> &ConfigSet {
231        &self.cfg.configs
232    }
233
234    async fn make_machine<K, V, T, D>(
235        &self,
236        shard_id: ShardId,
237        diagnostics: Diagnostics,
238    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
239    where
240        K: Debug + Codec,
241        V: Debug + Codec,
242        T: Timestamp + Lattice + Codec64 + Sync,
243        D: Semigroup + Codec64 + Send + Sync,
244    {
245        let state_versions = StateVersions::new(
246            self.cfg.clone(),
247            Arc::clone(&self.consensus),
248            Arc::clone(&self.blob),
249            Arc::clone(&self.metrics),
250        );
251        let machine = Machine::<K, V, T, D>::new(
252            self.cfg.clone(),
253            shard_id,
254            Arc::clone(&self.metrics),
255            Arc::new(state_versions),
256            Arc::clone(&self.shared_states),
257            Arc::clone(&self.pubsub_sender),
258            Arc::clone(&self.isolated_runtime),
259            diagnostics.clone(),
260        )
261        .await?;
262        Ok(machine)
263    }
264
265    /// Provides capabilities for the durable TVC identified by `shard_id` at
266    /// its current since and upper frontiers.
267    ///
268    /// This method is a best-effort attempt to regain control of the frontiers
269    /// of a shard. Its most common uses are to recover capabilities that have
270    /// expired (leases) or to attempt to read a TVC that one did not create (or
271    /// otherwise receive capabilities for). If the frontiers have been fully
272    /// released by all other parties, this call may result in capabilities with
273    /// empty frontiers (which are useless).
274    ///
275    /// If `shard_id` has never been used before, initializes a new shard and
276    /// returns handles with `since` and `upper` frontiers set to initial values
277    /// of `Antichain::from_elem(T::minimum())`.
278    ///
279    /// The `key_schema` and `val_schema` parameters should be objects that
280    /// represent the schema of the data in the shard; they are used when
281    /// encoding and decoding the shard's data.
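    ///
    /// A minimal usage sketch (not compiled as a doctest), assuming a `client`
    /// and `shard_id` are in hand; the write and read calls shown here live on
    /// the returned [WriteHandle] and [ReadHandle]:
    ///
    /// ```rust,ignore
    /// let (mut write, mut read) = client
    ///     .open::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example"),
    ///         false,
    ///     )
    ///     .await
    ///     .expect("codecs match");
    /// // Write a batch covering times [0, 3).
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 2u64, 1i64)];
    /// write
    ///     .compare_and_append(&updates, Antichain::from_elem(0), Antichain::from_elem(3))
    ///     .await
    ///     .expect("valid usage")
    ///     .expect("no concurrent writer");
    /// // Read everything that is complete as of time 2.
    /// let contents = read
    ///     .snapshot_and_fetch(Antichain::from_elem(2))
    ///     .await
    ///     .expect("cannot serve requested as_of");
    /// ```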
282    #[instrument(level = "debug", fields(shard = %shard_id))]
283    pub async fn open<K, V, T, D>(
284        &self,
285        shard_id: ShardId,
286        key_schema: Arc<K::Schema>,
287        val_schema: Arc<V::Schema>,
288        diagnostics: Diagnostics,
289        use_critical_since: bool,
290    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
291    where
292        K: Debug + Codec,
293        V: Debug + Codec,
294        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
295        D: Semigroup + Ord + Codec64 + Send + Sync,
296    {
297        Ok((
298            self.open_writer(
299                shard_id,
300                Arc::clone(&key_schema),
301                Arc::clone(&val_schema),
302                diagnostics.clone(),
303            )
304            .await?,
305            self.open_leased_reader(
306                shard_id,
307                key_schema,
308                val_schema,
309                diagnostics,
310                use_critical_since,
311            )
312            .await?,
313        ))
314    }
315
316    /// [Self::open], but returning only a [ReadHandle].
317    ///
318    /// Use this to save latency and a bit of persist traffic if you're just
319    /// going to immediately drop or expire the [WriteHandle].
320    ///
321    /// The `key_schema` and `val_schema` parameters should be objects that
322    /// represent the schema of the data in the shard; they are used when
323    /// decoding the data being read.
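    ///
    /// A minimal reader-only sketch (not compiled as a doctest), assuming a
    /// `client` and `shard_id` are in hand:
    ///
    /// ```rust,ignore
    /// let mut read = client
    ///     .open_leased_reader::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example reader"),
    ///         false,
    ///     )
    ///     .await
    ///     .expect("codecs match");
    /// let contents = read
    ///     .snapshot_and_fetch(Antichain::from_elem(2))
    ///     .await
    ///     .expect("cannot serve requested as_of");
    /// // Allow persist to compact away times before 3.
    /// read.downgrade_since(&Antichain::from_elem(3)).await;
    /// ```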
324    #[instrument(level = "debug", fields(shard = %shard_id))]
325    pub async fn open_leased_reader<K, V, T, D>(
326        &self,
327        shard_id: ShardId,
328        key_schema: Arc<K::Schema>,
329        val_schema: Arc<V::Schema>,
330        diagnostics: Diagnostics,
331        use_critical_since: bool,
332    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
333    where
334        K: Debug + Codec,
335        V: Debug + Codec,
336        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
337        D: Semigroup + Codec64 + Send + Sync,
338    {
339        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
340        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
341
342        let reader_id = LeasedReaderId::new();
343        let heartbeat_ts = (self.cfg.now)();
344        let (reader_state, maintenance) = machine
345            .register_leased_reader(
346                &reader_id,
347                &diagnostics.handle_purpose,
348                READER_LEASE_DURATION.get(&self.cfg),
349                heartbeat_ts,
350                use_critical_since,
351            )
352            .await;
353        maintenance.start_performing(&machine, &gc);
354        let schemas = Schemas {
355            id: None,
356            key: key_schema,
357            val: val_schema,
358        };
359        let reader = ReadHandle::new(
360            self.cfg.clone(),
361            Arc::clone(&self.metrics),
362            machine,
363            gc,
364            Arc::clone(&self.blob),
365            reader_id,
366            schemas,
367            reader_state.since,
368            heartbeat_ts,
369        )
370        .await;
371
372        Ok(reader)
373    }
374
375    /// Creates and returns a [BatchFetcher] for the given shard id.
376    #[instrument(level = "debug", fields(shard = %shard_id))]
377    pub async fn create_batch_fetcher<K, V, T, D>(
378        &self,
379        shard_id: ShardId,
380        key_schema: Arc<K::Schema>,
381        val_schema: Arc<V::Schema>,
382        is_transient: bool,
383        diagnostics: Diagnostics,
384    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
385    where
386        K: Debug + Codec,
387        V: Debug + Codec,
388        T: Timestamp + Lattice + Codec64 + Sync,
389        D: Semigroup + Codec64 + Send + Sync,
390    {
391        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
392        let read_schemas = Schemas {
393            id: None,
394            key: key_schema,
395            val: val_schema,
396        };
397        let schema_cache = machine.applier.schema_cache();
398        let fetcher = BatchFetcher {
399            cfg: BatchFetcherConfig::new(&self.cfg),
400            blob: Arc::clone(&self.blob),
401            metrics: Arc::clone(&self.metrics),
402            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
403            shard_id,
404            read_schemas,
405            schema_cache,
406            is_transient,
407            _phantom: PhantomData,
408        };
409
410        Ok(fetcher)
411    }
412
413    /// A convenience [CriticalReaderId] for Materialize controllers.
414    ///
415    /// For most (soon to be all?) shards in Materialize, a centralized
416    /// "controller" is the authority for when a user no longer needs to read at
417    /// a given frontier. (Other uses are temporary holds where correctness of
418    /// the overall system can be maintained through a lease timeout.) To make
419    /// [SinceHandle] easier to work with, we offer this convenience id for
420    /// Materialize controllers, so they don't have to durably record it.
421    ///
422    /// TODO: We're still shaking out whether the controller should be the only
423    /// critical since hold or if there are other places we want them. If the
424    /// former, we should remove [CriticalReaderId] and bake in the singular
425    /// nature of the controller critical handle.
426    ///
427    /// ```rust
428    /// // This prints as something that is not 0 but is visually recognizable.
429    /// assert_eq!(
430    ///     mz_persist_client::PersistClient::CONTROLLER_CRITICAL_SINCE.to_string(),
431    ///     "c00000000-1111-2222-3333-444444444444",
432    /// )
433    /// ```
434    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
435        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
436
437    /// Provides a capability for the durable TVC identified by `shard_id` at
438    /// its current since frontier.
439    ///
440    /// In contrast to the time-leased [ReadHandle] returned by [Self::open] and
441    /// [Self::open_leased_reader], this handle and its associated capability
442    /// are not leased. A [SinceHandle] never implicitly releases its since
443    /// capability; to stop holding back compaction, explicitly downgrade the
444    /// since to the empty antichain. Also unlike `ReadHandle`, the handle is
445    /// not expired on drop. This is less ergonomic, but useful for "critical"
446    /// since holds which must survive even lease timeouts.
447    ///
448    /// **IMPORTANT**: The above means that if a SinceHandle is registered and
449    /// then lost, the shard's since will be permanently "stuck", forever
450    /// preventing logical compaction. Users are advised to durably record
451    /// (preferably in code) the intended [CriticalReaderId] _before_ registering
452    /// a SinceHandle (in case the process crashes at the wrong time).
453    ///
454    /// If `shard_id` has never been used before, initializes a new shard and
455    /// returns a handle with its `since` frontier set to the initial value of
456    /// `Antichain::from_elem(T::minimum())`.
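    ///
    /// A minimal sketch (not compiled as a doctest); using `u64` as the opaque
    /// token type `O` is an assumption of this example:
    ///
    /// ```rust,ignore
    /// let since_handle = client
    ///     .open_critical_since::<String, String, u64, i64, u64>(
    ///         shard_id,
    ///         PersistClient::CONTROLLER_CRITICAL_SINCE,
    ///         Diagnostics::from_purpose("example controller"),
    ///     )
    ///     .await
    ///     .expect("codecs match");
    /// // The handle exposes the current critical since; advance (or release)
    /// // the hold via its compare-and-downgrade method.
    /// let current_since = since_handle.since().clone();
    /// ```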
457    #[instrument(level = "debug", fields(shard = %shard_id))]
458    pub async fn open_critical_since<K, V, T, D, O>(
459        &self,
460        shard_id: ShardId,
461        reader_id: CriticalReaderId,
462        diagnostics: Diagnostics,
463    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
464    where
465        K: Debug + Codec,
466        V: Debug + Codec,
467        T: Timestamp + Lattice + Codec64 + Sync,
468        D: Semigroup + Codec64 + Send + Sync,
469        O: Opaque + Codec64,
470    {
471        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
472        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
473
474        let (state, maintenance) = machine
475            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
476            .await;
477        maintenance.start_performing(&machine, &gc);
478        let handle = SinceHandle::new(
479            machine,
480            gc,
481            reader_id,
482            state.since,
483            Codec64::decode(state.opaque.0),
484        );
485
486        Ok(handle)
487    }
488
489    /// [Self::open], but returning only a [WriteHandle].
490    ///
491    /// Use this to save latency and a bit of persist traffic if you're just
492    /// going to immediately drop or expire the [ReadHandle].
493    ///
494    /// The `key_schema` and `val_schema` parameters should be objects that
495    /// represent the schema of the data in the shard; they are registered with
496    /// the shard and used when encoding the data being written.
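    ///
    /// A minimal writer-only sketch (not compiled as a doctest), assuming a
    /// `client` and `shard_id` are in hand:
    ///
    /// ```rust,ignore
    /// let mut write = client
    ///     .open_writer::<String, String, u64, i64>(
    ///         shard_id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         Diagnostics::from_purpose("example writer"),
    ///     )
    ///     .await
    ///     .expect("codecs match");
    /// // The expected upper must match the shard's current upper for the
    /// // append to succeed.
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 5u64, 1i64)];
    /// write
    ///     .compare_and_append(&updates, Antichain::from_elem(5), Antichain::from_elem(7))
    ///     .await
    ///     .expect("valid usage")
    ///     .expect("no concurrent writer");
    /// ```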
497    #[instrument(level = "debug", fields(shard = %shard_id))]
498    pub async fn open_writer<K, V, T, D>(
499        &self,
500        shard_id: ShardId,
501        key_schema: Arc<K::Schema>,
502        val_schema: Arc<V::Schema>,
503        diagnostics: Diagnostics,
504    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
505    where
506        K: Debug + Codec,
507        V: Debug + Codec,
508        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
509        D: Semigroup + Ord + Codec64 + Send + Sync,
510    {
511        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
512        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
513
514        // TODO: Because schemas are ordered, as part of the persist schema
515        // changes work, we probably want to build some way to allow persist
516        // users to control the order. For example, maybe a
517        // `PersistClient::compare_and_append_schema(current_schema_id,
518        // next_schema)`. Presumably this would then be passed in to open_writer
519        // instead of us implicitly registering it here.
520        // NB: The overwhelming common case is that this schema is already
521        // registered. In this case, the cmd breaks early and nothing is
522        // written to (or read from) CRDB.
523        let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
524        maintenance.start_performing(&machine, &gc);
525        soft_assert_or_log!(
526            schema_id.is_some(),
527            "unable to register schemas {:?} {:?}",
528            key_schema,
529            val_schema,
530        );
531
532        let writer_id = WriterId::new();
533        let schemas = Schemas {
534            id: schema_id,
535            key: key_schema,
536            val: val_schema,
537        };
538        let writer = WriteHandle::new(
539            self.cfg.clone(),
540            Arc::clone(&self.metrics),
541            machine,
542            gc,
543            Arc::clone(&self.blob),
544            writer_id,
545            &diagnostics.handle_purpose,
546            schemas,
547        );
548        Ok(writer)
549    }
550
551    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
552    /// blob storage which can then be appended to the given shard using
553    /// [WriteHandle::compare_and_append_batch] or [WriteHandle::append_batch],
554    /// or which can be read using [PersistClient::read_batches_consolidated].
555    ///
556    /// The builder uses a bounded amount of memory, even when the number of
557    /// updates is very large. Individual records, however, should be small
558    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
559    /// O(MB) come talk to us.
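    ///
    /// A minimal sketch (not compiled as a doctest; the `add`/`finish` calls are
    /// abbreviated from the [BatchBuilder] API):
    ///
    /// ```rust,ignore
    /// let schemas = Schemas {
    ///     id: None,
    ///     key: Arc::new(StringSchema),
    ///     val: Arc::new(StringSchema),
    /// };
    /// let mut builder = client
    ///     .batch_builder::<String, String, u64, i64>(
    ///         shard_id,
    ///         schemas,
    ///         Antichain::from_elem(0),
    ///         None,
    ///     )
    ///     .await;
    /// builder
    ///     .add(&"k".to_owned(), &"v".to_owned(), &1u64, &1i64)
    ///     .await
    ///     .expect("valid usage");
    /// // `finish` seals the batch with its upper; the result can be appended
    /// // via [WriteHandle::compare_and_append_batch] or read back via
    /// // [PersistClient::read_batches_consolidated].
    /// let batch = builder.finish(Antichain::from_elem(2)).await.expect("valid bounds");
    /// ```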
560    #[instrument(level = "debug", fields(shard = %shard_id))]
561    pub async fn batch_builder<K, V, T, D>(
562        &self,
563        shard_id: ShardId,
564        write_schemas: Schemas<K, V>,
565        lower: Antichain<T>,
566        max_runs: Option<usize>,
567    ) -> BatchBuilder<K, V, T, D>
568    where
569        K: Debug + Codec,
570        V: Debug + Codec,
571        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
572        D: Semigroup + Ord + Codec64 + Send + Sync,
573    {
574        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
575        compact_cfg.batch.max_runs = max_runs;
576        WriteHandle::builder_inner(
577            &self.cfg,
578            compact_cfg,
579            Arc::clone(&self.metrics),
580            self.metrics.shards.shard(&shard_id, "peek_stash"),
581            &self.metrics.user,
582            Arc::clone(&self.isolated_runtime),
583            Arc::clone(&self.blob),
584            shard_id,
585            write_schemas,
586            lower,
587        )
588    }
589
590    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
591    /// to append it to the given shard or to read it via
592    /// [PersistClient::read_batches_consolidated]
593    ///
594    /// CAUTION: This API allows turning a [ProtoBatch] into a [Batch] multiple
595    /// times, but if a batch is deleted the backing data goes away, so at that
596    /// point all in-memory copies of a batch become invalid and cannot be read
597    /// anymore.
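    ///
    /// A minimal round-trip sketch (not compiled as a doctest); it assumes the
    /// `into_transmittable_batch` helper on [Batch] for the encoding direction:
    ///
    /// ```rust,ignore
    /// // On the producing side: encode the batch for transmission.
    /// let proto: ProtoBatch = batch.into_transmittable_batch();
    /// // On the consuming side: rehydrate it against the same shard.
    /// let batch = client
    ///     .batch_from_transmittable_batch::<String, String, u64, i64>(&shard_id, proto);
    /// assert_eq!(batch.shard_id(), shard_id);
    /// ```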
598    pub fn batch_from_transmittable_batch<K, V, T, D>(
599        &self,
600        shard_id: &ShardId,
601        batch: ProtoBatch,
602    ) -> Batch<K, V, T, D>
603    where
604        K: Debug + Codec,
605        V: Debug + Codec,
606        T: Timestamp + Lattice + Codec64 + Sync,
607        D: Semigroup + Ord + Codec64 + Send + Sync,
608    {
609        let batch_shard_id: ShardId = batch
610            .shard_id
611            .into_rust()
612            .expect("valid transmittable batch");
613        assert_eq!(&batch_shard_id, shard_id);
614
615        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");
616
617        let ret = Batch {
618            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
619            metrics: Arc::clone(&self.metrics),
620            shard_metrics,
621            version: Version::parse(&batch.version).expect("valid transmittable batch"),
622            batch: batch
623                .batch
624                .into_rust_if_some("ProtoBatch::batch")
625                .expect("valid transmittable batch"),
626            blob: Arc::clone(&self.blob),
627            _phantom: std::marker::PhantomData,
628        };
629
630        assert_eq!(&ret.shard_id(), shard_id);
631        ret
632    }
633
634    /// Returns a [Cursor] for reading the given batches. Yielded updates are
635    /// consolidated if the given batches contain sorted runs, which is true
636    /// when they have been written using a [BatchBuilder].
637    ///
638    /// To keep memory usage down when reading a snapshot that consolidates
639    /// well, this consolidates as it goes. However, note that only the
640    /// serialized data is consolidated: the deserialized data will only be
641    /// consolidated if your K/V codecs are one-to-one.
642    ///
643    /// CAUTION: The caller needs to make sure that the given batches are
644    /// readable and that they remain readable for the lifetime of the returned
645    /// [Cursor]. The caller is also responsible for the lifecycle of the
646    /// batches: once the cursor and the batches are no longer needed, you must
647    /// call [Cursor::into_lease] to get back the batches and delete them.
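    ///
    /// A minimal sketch (not compiled as a doctest), assuming `schemas` and
    /// `batches` built as described above; the memory budget value is just an
    /// example:
    ///
    /// ```rust,ignore
    /// let mut cursor = client
    ///     .read_batches_consolidated::<String, String, u64, i64>(
    ///         shard_id,
    ///         Antichain::from_elem(2), // as_of
    ///         schemas,
    ///         batches,
    ///         |_stats| true, // fetch every part
    ///         1024 * 1024,   // memory budget in bytes
    ///     )
    ///     .await
    ///     .expect("cannot serve requested as_of");
    /// // ... consume the cursor ...
    /// // When done, recover the batches so they can be deleted.
    /// let batches = cursor.into_lease();
    /// ```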
648    #[allow(clippy::unused_async)]
649    pub async fn read_batches_consolidated<K, V, T, D>(
650        &mut self,
651        shard_id: ShardId,
652        as_of: Antichain<T>,
653        read_schemas: Schemas<K, V>,
654        batches: Vec<Batch<K, V, T, D>>,
655        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
656        memory_budget_bytes: usize,
657    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
658    where
659        K: Debug + Codec + Ord,
660        V: Debug + Codec + Ord,
661        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
662        D: Semigroup + Ord + Codec64 + Send + Sync,
663    {
664        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
665
666        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();
667
668        ReadHandle::read_batches_consolidated(
669            &self.cfg,
670            Arc::clone(&self.metrics),
671            shard_metrics,
672            self.metrics.read.snapshot.clone(),
673            Arc::clone(&self.blob),
674            shard_id,
675            as_of,
676            read_schemas,
677            &hollow_batches,
678            batches,
679            should_fetch_part,
680            memory_budget_bytes,
681        )
682    }
683
684    /// Returns the requested schema, if known at the current state.
685    pub async fn get_schema<K, V, T, D>(
686        &self,
687        shard_id: ShardId,
688        schema_id: SchemaId,
689        diagnostics: Diagnostics,
690    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
691    where
692        K: Debug + Codec,
693        V: Debug + Codec,
694        T: Timestamp + Lattice + Codec64 + Sync,
695        D: Semigroup + Codec64 + Send + Sync,
696    {
697        let machine = self
698            .make_machine::<K, V, T, D>(shard_id, diagnostics)
699            .await?;
700        Ok(machine.get_schema(schema_id))
701    }
702
703    /// Returns the latest schema registered at the current state.
704    pub async fn latest_schema<K, V, T, D>(
705        &self,
706        shard_id: ShardId,
707        diagnostics: Diagnostics,
708    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
709    where
710        K: Debug + Codec,
711        V: Debug + Codec,
712        T: Timestamp + Lattice + Codec64 + Sync,
713        D: Semigroup + Codec64 + Send + Sync,
714    {
715        let machine = self
716            .make_machine::<K, V, T, D>(shard_id, diagnostics)
717            .await?;
718        Ok(machine.latest_schema())
719    }
720
721    /// Registers a new latest schema for the given shard.
722    ///
723    /// This new schema must be [backward_compatible] with all previous schemas
724    /// for this shard. If it's not, [CaESchema::Incompatible] is returned.
725    ///
726    /// [backward_compatible]: mz_persist_types::schema::backward_compatible
727    ///
728    /// To prevent races, the caller must declare what it believes to be the
729    /// latest schema id. If this doesn't match reality,
730    /// [CaESchema::ExpectedMismatch] is returned.
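    ///
    /// A minimal sketch (not compiled as a doctest); the success variant name on
    /// [CaESchema] is assumed here:
    ///
    /// ```rust,ignore
    /// let (expected, _key, _val) = client
    ///     .latest_schema::<String, String, u64, i64>(shard_id, diagnostics.clone())
    ///     .await?
    ///     .expect("shard has at least one registered schema");
    /// match client
    ///     .compare_and_evolve_schema::<String, String, u64, i64>(
    ///         shard_id,
    ///         expected,
    ///         &new_key_schema,
    ///         &new_val_schema,
    ///         diagnostics,
    ///     )
    ///     .await?
    /// {
    ///     CaESchema::Ok(id) => println!("registered schema {id:?}"),
    ///     CaESchema::Incompatible => println!("not backward compatible"),
    ///     CaESchema::ExpectedMismatch { .. } => println!("lost a race; retry"),
    /// }
    /// ```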
731    pub async fn compare_and_evolve_schema<K, V, T, D>(
732        &self,
733        shard_id: ShardId,
734        expected: SchemaId,
735        key_schema: &K::Schema,
736        val_schema: &V::Schema,
737        diagnostics: Diagnostics,
738    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
739    where
740        K: Debug + Codec,
741        V: Debug + Codec,
742        T: Timestamp + Lattice + Codec64 + Sync,
743        D: Semigroup + Codec64 + Send + Sync,
744    {
745        let machine = self
746            .make_machine::<K, V, T, D>(shard_id, diagnostics)
747            .await?;
748        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
749        let (res, maintenance) = machine
750            .compare_and_evolve_schema(expected, key_schema, val_schema)
751            .await;
752        maintenance.start_performing(&machine, &gc);
753        Ok(res)
754    }
755
756    /// Check if the given shard is in a finalized state; i.e., it can no longer
757    /// be read, any data that was written to it is no longer accessible, and
758    /// we've discarded references to that data from state.
759    pub async fn is_finalized<K, V, T, D>(
760        &self,
761        shard_id: ShardId,
762        diagnostics: Diagnostics,
763    ) -> Result<bool, InvalidUsage<T>>
764    where
765        K: Debug + Codec,
766        V: Debug + Codec,
767        T: Timestamp + Lattice + Codec64 + Sync,
768        D: Semigroup + Codec64 + Send + Sync,
769    {
770        let machine = self
771            .make_machine::<K, V, T, D>(shard_id, diagnostics)
772            .await?;
773        Ok(machine.is_finalized())
774    }
775
776    /// If a shard is guaranteed to never be used again, finalize it to delete
777    /// the associated data and release any associated resources. (Except for a
778    /// little state in consensus we use to represent the tombstone.)
779    ///
780    /// The caller should ensure that both the `since` and `upper` of the shard
781    /// have been advanced to `[]`: i.e., the shard is no longer writable or readable.
782    /// Otherwise an error is returned.
783    ///
784    /// Once `finalize_shard` has been called, the results of future operations on
785    /// the shard are undefined. They may return errors or succeed as a no-op.
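    ///
    /// A minimal sketch of the expected call order (not compiled as a doctest),
    /// assuming `write` and `read` handles for the shard are in hand:
    ///
    /// ```rust,ignore
    /// // First, advance the shard's frontiers to the empty antichain: append an
    /// // empty batch whose upper is `Antichain::new()` and downgrade any since
    /// // capabilities to `Antichain::new()`.
    /// let no_updates: Vec<((String, String), u64, i64)> = vec![];
    /// let current_upper = write.fetch_recent_upper().await.clone();
    /// write
    ///     .compare_and_append(&no_updates, current_upper, Antichain::new())
    ///     .await
    ///     .expect("valid usage")
    ///     .expect("no concurrent writer");
    /// read.downgrade_since(&Antichain::new()).await;
    /// // Then the shard can be finalized.
    /// client
    ///     .finalize_shard::<String, String, u64, i64>(shard_id, diagnostics)
    ///     .await
    ///     .expect("both frontiers were advanced to the empty antichain");
    /// ```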
786    #[instrument(level = "debug", fields(shard = %shard_id))]
787    pub async fn finalize_shard<K, V, T, D>(
788        &self,
789        shard_id: ShardId,
790        diagnostics: Diagnostics,
791    ) -> Result<(), InvalidUsage<T>>
792    where
793        K: Debug + Codec,
794        V: Debug + Codec,
795        T: Timestamp + Lattice + Codec64 + Sync,
796        D: Semigroup + Codec64 + Send + Sync,
797    {
798        let machine = self
799            .make_machine::<K, V, T, D>(shard_id, diagnostics)
800            .await?;
801
802        let maintenance = machine.become_tombstone().await?;
803        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
804
805        let () = maintenance.perform(&machine, &gc).await;
806
807        Ok(())
808    }
809
810    /// Returns the internal state of the shard for debugging and QA.
811    ///
812    /// We'll be thoughtful about making unnecessary changes, but the **output
813    /// of this method needs to be gated from users**, so that it's not subject
814    /// to our backward compatibility guarantees.
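    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```rust,ignore
    /// let state = client.inspect_shard::<u64>(&shard_id).await?;
    /// // The returned value is only promised to be serializable; render it for
    /// // humans rather than parsing it programmatically.
    /// println!("{}", serde_json::to_string_pretty(&state)?);
    /// ```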
815    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
816        &self,
817        shard_id: &ShardId,
818    ) -> Result<impl serde::Serialize, anyhow::Error> {
819        let state_versions = StateVersions::new(
820            self.cfg.clone(),
821            Arc::clone(&self.consensus),
822            Arc::clone(&self.blob),
823            Arc::clone(&self.metrics),
824        );
825        // TODO: Don't fetch all live diffs. Feels like we should pull out a new
826        // method in StateVersions for fetching the latest version of State of a
827        // shard that might or might not exist.
828        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
829        if versions.0.is_empty() {
830            return Err(anyhow::anyhow!("{} does not exist", shard_id));
831        }
832        let state = state_versions
833            .fetch_current_state::<T>(shard_id, versions.0)
834            .await;
835        let state = state.check_ts_codec(shard_id)?;
836        Ok(state)
837    }
838
839    /// Test helper for a [Self::open] call that is expected to succeed.
840    #[cfg(test)]
841    #[track_caller]
842    pub async fn expect_open<K, V, T, D>(
843        &self,
844        shard_id: ShardId,
845    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
846    where
847        K: Debug + Codec,
848        V: Debug + Codec,
849        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
850        D: Semigroup + Ord + Codec64 + Send + Sync,
851        K::Schema: Default,
852        V::Schema: Default,
853    {
854        self.open(
855            shard_id,
856            Arc::new(K::Schema::default()),
857            Arc::new(V::Schema::default()),
858            Diagnostics::for_tests(),
859            true,
860        )
861        .await
862        .expect("codec mismatch")
863    }
864
865    /// Return the metrics being used by this client.
866    ///
867    /// Only exposed for tests, persistcli, and benchmarks.
868    pub fn metrics(&self) -> &Arc<Metrics> {
869        &self.metrics
870    }
871}
872
873#[cfg(test)]
874mod tests {
875    use std::future::Future;
876    use std::mem;
877    use std::pin::Pin;
878    use std::task::Context;
879    use std::time::Duration;
880
881    use differential_dataflow::consolidation::consolidate_updates;
882    use differential_dataflow::lattice::Lattice;
883    use futures_task::noop_waker;
884    use mz_dyncfg::ConfigUpdates;
885    use mz_ore::assert_ok;
886    use mz_persist::indexed::encoding::BlobTraceBatchPart;
887    use mz_persist::workload::DataGenerator;
888    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
889    use mz_proto::protobuf_roundtrip;
890    use proptest::prelude::*;
891    use timely::order::PartialOrder;
892    use timely::progress::Antichain;
893
894    use crate::batch::BLOB_TARGET_SIZE;
895    use crate::cache::PersistClientCache;
896    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
897    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
898    use crate::internal::paths::BlobKey;
899    use crate::read::ListenEvent;
900
901    use super::*;
902
903    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
904        // Configure an aggressively small blob_target_size so we get some
905        // amount of coverage of that in tests. Similarly, for max_outstanding.
906        let mut cache = PersistClientCache::new_no_metrics();
907        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
908        cache
909            .cfg
910            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
911        dyncfgs.apply(cache.cfg());
912
913        // Enable compaction in tests to ensure we get coverage.
914        cache.cfg.compaction_enabled = true;
915        cache
916    }
917
918    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
919        let cache = new_test_client_cache(dyncfgs);
920        cache
921            .open(PersistLocation::new_in_mem())
922            .await
923            .expect("client construction failed")
924    }
925
926    pub fn all_ok<'a, K, V, T, D, I>(
927        iter: I,
928        as_of: T,
929    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
930    where
931        K: Ord + Clone + 'a,
932        V: Ord + Clone + 'a,
933        T: Timestamp + Lattice + Clone + 'a,
934        D: Semigroup + Clone + 'a,
935        I: IntoIterator<Item = &'a ((K, V), T, D)>,
936    {
937        let as_of = Antichain::from_elem(as_of);
938        let mut ret = iter
939            .into_iter()
940            .map(|((k, v), t, d)| {
941                let mut t = t.clone();
942                t.advance_by(as_of.borrow());
943                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
944            })
945            .collect();
946        consolidate_updates(&mut ret);
947        ret
948    }
949
950    pub async fn expect_fetch_part<K, V, T, D>(
951        blob: &dyn Blob,
952        key: &BlobKey,
953        metrics: &Metrics,
954        read_schemas: &Schemas<K, V>,
955    ) -> (
956        BlobTraceBatchPart<T>,
957        Vec<((Result<K, String>, Result<V, String>), T, D)>,
958    )
959    where
960        K: Codec,
961        V: Codec,
962        T: Timestamp + Codec64,
963        D: Codec64,
964    {
965        let value = blob
966            .get(key)
967            .await
968            .expect("failed to fetch part")
969            .expect("missing part");
970        let mut part =
971            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
972        // Ensure codec data is present even if it was not generated at write time.
973        let _ = part
974            .updates
975            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
976        let mut updates = Vec::new();
977        // TODO(bkirwi): switch to structured data in tests
978        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
979            updates.push((
980                (
981                    K::decode(k, &read_schemas.key),
982                    V::decode(v, &read_schemas.val),
983                ),
984                T::decode(t),
985                D::decode(d),
986            ));
987        }
988        (part, updates)
989    }
990
991    #[mz_persist_proc::test(tokio::test)]
992    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
993    async fn sanity_check(dyncfgs: ConfigUpdates) {
994        let data = [
995            (("1".to_owned(), "one".to_owned()), 1, 1),
996            (("2".to_owned(), "two".to_owned()), 2, 1),
997            (("3".to_owned(), "three".to_owned()), 3, 1),
998        ];
999
1000        let (mut write, mut read) = new_test_client(&dyncfgs)
1001            .await
1002            .expect_open::<String, String, u64, i64>(ShardId::new())
1003            .await;
1004        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
1005        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
1006
1007        // Write a [0,3) batch.
1008        write
1009            .expect_append(&data[..2], write.upper().clone(), vec![3])
1010            .await;
1011        assert_eq!(write.upper(), &Antichain::from_elem(3));
1012
1013        // Grab a snapshot and listener as_of 1. Snapshot should only have part of what we wrote.
1014        assert_eq!(
1015            read.expect_snapshot_and_fetch(1).await,
1016            all_ok(&data[..1], 1)
1017        );
1018
1019        let mut listen = read.clone("").await.expect_listen(1).await;
1020
1021        // Write a [3,4) batch.
1022        write
1023            .expect_append(&data[2..], write.upper().clone(), vec![4])
1024            .await;
1025        assert_eq!(write.upper(), &Antichain::from_elem(4));
1026
1027        // Listen should have part of the initial write plus the new one.
1028        assert_eq!(
1029            listen.read_until(&4).await,
1030            (all_ok(&data[1..], 1), Antichain::from_elem(4))
1031        );
1032
1033        // Downgrading the since is tracked locally (but otherwise is a no-op).
1034        read.downgrade_since(&Antichain::from_elem(2)).await;
1035        assert_eq!(read.since(), &Antichain::from_elem(2));
1036    }
1037
1038    // Sanity check that the open_leased_reader and open_writer calls work.
1039    #[mz_persist_proc::test(tokio::test)]
1040    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1041    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
1042        let data = vec![
1043            (("1".to_owned(), "one".to_owned()), 1, 1),
1044            (("2".to_owned(), "two".to_owned()), 2, 1),
1045            (("3".to_owned(), "three".to_owned()), 3, 1),
1046        ];
1047
1048        let shard_id = ShardId::new();
1049        let client = new_test_client(&dyncfgs).await;
1050        let mut write1 = client
1051            .open_writer::<String, String, u64, i64>(
1052                shard_id,
1053                Arc::new(StringSchema),
1054                Arc::new(StringSchema),
1055                Diagnostics::for_tests(),
1056            )
1057            .await
1058            .expect("codec mismatch");
1059        let mut read1 = client
1060            .open_leased_reader::<String, String, u64, i64>(
1061                shard_id,
1062                Arc::new(StringSchema),
1063                Arc::new(StringSchema),
1064                Diagnostics::for_tests(),
1065                true,
1066            )
1067            .await
1068            .expect("codec mismatch");
1069        let mut read2 = client
1070            .open_leased_reader::<String, String, u64, i64>(
1071                shard_id,
1072                Arc::new(StringSchema),
1073                Arc::new(StringSchema),
1074                Diagnostics::for_tests(),
1075                true,
1076            )
1077            .await
1078            .expect("codec mismatch");
1079        let mut write2 = client
1080            .open_writer::<String, String, u64, i64>(
1081                shard_id,
1082                Arc::new(StringSchema),
1083                Arc::new(StringSchema),
1084                Diagnostics::for_tests(),
1085            )
1086            .await
1087            .expect("codec mismatch");
1088
1089        write2.expect_compare_and_append(&data[..1], 0, 2).await;
1090        assert_eq!(
1091            read2.expect_snapshot_and_fetch(1).await,
1092            all_ok(&data[..1], 1)
1093        );
1094        write1.expect_compare_and_append(&data[1..], 2, 4).await;
1095        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
1096    }
1097
1098    #[mz_persist_proc::test(tokio::test)]
1099    #[cfg_attr(miri, ignore)] // too slow
1100    async fn invalid_usage(dyncfgs: ConfigUpdates) {
1101        let data = vec![
1102            (("1".to_owned(), "one".to_owned()), 1, 1),
1103            (("2".to_owned(), "two".to_owned()), 2, 1),
1104            (("3".to_owned(), "three".to_owned()), 3, 1),
1105        ];
1106
1107        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
1108            .parse::<ShardId>()
1109            .expect("invalid shard id");
1110        let mut client = new_test_client(&dyncfgs).await;
1111
1112        let (mut write0, mut read0) = client
1113            .expect_open::<String, String, u64, i64>(shard_id0)
1114            .await;
1115
1116        write0.expect_compare_and_append(&data, 0, 4).await;
1117
1118        // InvalidUsage from PersistClient methods.
1119        {
1120            fn codecs(
1121                k: &str,
1122                v: &str,
1123                t: &str,
1124                d: &str,
1125            ) -> (String, String, String, String, Option<CodecConcreteType>) {
1126                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
1127            }
1128
1129            client.shared_states = Arc::new(StateCache::new_no_metrics());
1130            assert_eq!(
1131                client
1132                    .open::<Vec<u8>, String, u64, i64>(
1133                        shard_id0,
1134                        Arc::new(VecU8Schema),
1135                        Arc::new(StringSchema),
1136                        Diagnostics::for_tests(),
1137                        true,
1138                    )
1139                    .await
1140                    .unwrap_err(),
1141                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1142                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1143                    actual: codecs("String", "String", "u64", "i64"),
1144                }))
1145            );
1146            assert_eq!(
1147                client
1148                    .open::<String, Vec<u8>, u64, i64>(
1149                        shard_id0,
1150                        Arc::new(StringSchema),
1151                        Arc::new(VecU8Schema),
1152                        Diagnostics::for_tests(),
1153                        true,
1154                    )
1155                    .await
1156                    .unwrap_err(),
1157                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1158                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
1159                    actual: codecs("String", "String", "u64", "i64"),
1160                }))
1161            );
1162            assert_eq!(
1163                client
1164                    .open::<String, String, i64, i64>(
1165                        shard_id0,
1166                        Arc::new(StringSchema),
1167                        Arc::new(StringSchema),
1168                        Diagnostics::for_tests(),
1169                        true,
1170                    )
1171                    .await
1172                    .unwrap_err(),
1173                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1174                    requested: codecs("String", "String", "i64", "i64"),
1175                    actual: codecs("String", "String", "u64", "i64"),
1176                }))
1177            );
1178            assert_eq!(
1179                client
1180                    .open::<String, String, u64, u64>(
1181                        shard_id0,
1182                        Arc::new(StringSchema),
1183                        Arc::new(StringSchema),
1184                        Diagnostics::for_tests(),
1185                        true,
1186                    )
1187                    .await
1188                    .unwrap_err(),
1189                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1190                    requested: codecs("String", "String", "u64", "u64"),
1191                    actual: codecs("String", "String", "u64", "i64"),
1192                }))
1193            );
1194
1195            // open_reader and open_writer end up using the same checks, so just
1196            // verify one type each to verify the plumbing instead of the full
1197            // set.
1198            assert_eq!(
1199                client
1200                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
1201                        shard_id0,
1202                        Arc::new(VecU8Schema),
1203                        Arc::new(StringSchema),
1204                        Diagnostics::for_tests(),
1205                        true,
1206                    )
1207                    .await
1208                    .unwrap_err(),
1209                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1210                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1211                    actual: codecs("String", "String", "u64", "i64"),
1212                }))
1213            );
1214            assert_eq!(
1215                client
1216                    .open_writer::<Vec<u8>, String, u64, i64>(
1217                        shard_id0,
1218                        Arc::new(VecU8Schema),
1219                        Arc::new(StringSchema),
1220                        Diagnostics::for_tests(),
1221                    )
1222                    .await
1223                    .unwrap_err(),
1224                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1225                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1226                    actual: codecs("String", "String", "u64", "i64"),
1227                }))
1228            );
1229        }
1230
1231        // InvalidUsage from ReadHandle methods.
1232        {
1233            let snap = read0
1234                .snapshot(Antichain::from_elem(3))
1235                .await
1236                .expect("cannot serve requested as_of");
1237
1238            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
1239                .parse::<ShardId>()
1240                .expect("invalid shard id");
1241            let mut fetcher1 = client
1242                .create_batch_fetcher::<String, String, u64, i64>(
1243                    shard_id1,
1244                    Default::default(),
1245                    Default::default(),
1246                    false,
1247                    Diagnostics::for_tests(),
1248                )
1249                .await
1250                .unwrap();
1251            for part in snap {
1252                let (part, _lease) = part.into_exchangeable_part();
1253                let res = fetcher1.fetch_leased_part(part).await;
1254                assert_eq!(
1255                    res.unwrap_err(),
1256                    InvalidUsage::BatchNotFromThisShard {
1257                        batch_shard: shard_id0,
1258                        handle_shard: shard_id1,
1259                    }
1260                );
1261            }
1262        }
1263
1264        // InvalidUsage from WriteHandle methods.
1265        {
1266            let ts3 = &data[2];
1267            assert_eq!(ts3.1, 3);
1268            let ts3 = vec![ts3.clone()];
1269
1270            // WriteHandle::append also covers append_batch,
1271            // compare_and_append_batch, compare_and_append.
1272            assert_eq!(
1273                write0
1274                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
1275                    .await
1276                    .unwrap_err(),
1277                InvalidUsage::UpdateNotBeyondLower {
1278                    ts: 3,
1279                    lower: Antichain::from_elem(4),
1280                },
1281            );
1282            assert_eq!(
1283                write0
1284                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
1285                    .await
1286                    .unwrap_err(),
1287                InvalidUsage::UpdateBeyondUpper {
1288                    ts: 3,
1289                    expected_upper: Antichain::from_elem(3),
1290                },
1291            );
1292            // NB unlike the previous tests, this one has empty updates.
1293            assert_eq!(
1294                write0
1295                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
1296                    .await
1297                    .unwrap_err(),
1298                InvalidUsage::InvalidBounds {
1299                    lower: Antichain::from_elem(3),
1300                    upper: Antichain::from_elem(2),
1301                },
1302            );
1303
1304            // Tests for the BatchBuilder.
1305            assert_eq!(
1306                write0
1307                    .builder(Antichain::from_elem(3))
1308                    .finish(Antichain::from_elem(2))
1309                    .await
1310                    .unwrap_err(),
1311                InvalidUsage::InvalidBounds {
1312                    lower: Antichain::from_elem(3),
1313                    upper: Antichain::from_elem(2)
1314                },
1315            );
1316            let batch = write0
1317                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1318                .await
1319                .expect("invalid usage");
1320            assert_eq!(
1321                write0
1322                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
1323                    .await
1324                    .unwrap_err(),
1325                InvalidUsage::InvalidBatchBounds {
1326                    batch_lower: Antichain::from_elem(3),
1327                    batch_upper: Antichain::from_elem(4),
1328                    append_lower: Antichain::from_elem(4),
1329                    append_upper: Antichain::from_elem(5),
1330                },
1331            );
1332            let batch = write0
1333                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1334                .await
1335                .expect("invalid usage");
1336            assert_eq!(
1337                write0
1338                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
1339                    .await
1340                    .unwrap_err(),
1341                InvalidUsage::InvalidBatchBounds {
1342                    batch_lower: Antichain::from_elem(3),
1343                    batch_upper: Antichain::from_elem(4),
1344                    append_lower: Antichain::from_elem(2),
1345                    append_upper: Antichain::from_elem(3),
1346                },
1347            );
1348            let batch = write0
1349                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1350                .await
1351                .expect("invalid usage");
1352            // NB unlike the others, this one uses matches! because it's
1353            // non-deterministic (the key)
1354            assert!(matches!(
1355                write0
1356                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
1357                    .await
1358                    .unwrap_err(),
1359                InvalidUsage::InvalidEmptyTimeInterval { .. }
1360            ));
1361        }
1362    }
1363
1364    #[mz_persist_proc::test(tokio::test)]
1365    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1366    async fn multiple_shards(dyncfgs: ConfigUpdates) {
1367        let data1 = [
1368            (("1".to_owned(), "one".to_owned()), 1, 1),
1369            (("2".to_owned(), "two".to_owned()), 2, 1),
1370        ];
1371
1372        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];
1373
1374        let client = new_test_client(&dyncfgs).await;
1375
1376        let (mut write1, mut read1) = client
1377            .expect_open::<String, String, u64, i64>(ShardId::new())
1378            .await;
1379
1380        // Different types, so that checks would fail in case we were not separating these
1381        // collections internally.
1382        let (mut write2, mut read2) = client
1383            .expect_open::<String, (), u64, i64>(ShardId::new())
1384            .await;
1385
1386        write1
1387            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
1388            .await;
1389
1390        write2
1391            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
1392            .await;
1393
1394        assert_eq!(
1395            read1.expect_snapshot_and_fetch(2).await,
1396            all_ok(&data1[..], 2)
1397        );
1398
1399        assert_eq!(
1400            read2.expect_snapshot_and_fetch(2).await,
1401            all_ok(&data2[..], 2)
1402        );
1403    }
1404
1405    #[mz_persist_proc::test(tokio::test)]
1406    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1407    async fn fetch_upper(dyncfgs: ConfigUpdates) {
1408        let data = [
1409            (("1".to_owned(), "one".to_owned()), 1, 1),
1410            (("2".to_owned(), "two".to_owned()), 2, 1),
1411        ];
1412
1413        let client = new_test_client(&dyncfgs).await;
1414
1415        let shard_id = ShardId::new();
1416
1417        let (mut write1, _read1) = client
1418            .expect_open::<String, String, u64, i64>(shard_id)
1419            .await;
1420
1421        let (mut write2, _read2) = client
1422            .expect_open::<String, String, u64, i64>(shard_id)
1423            .await;
1424
1425        write1
1426            .expect_append(&data[..], write1.upper().clone(), vec![3])
1427            .await;
1428
1429        // The shard-global upper does advance, even if this writer didn't advance its local upper.
1430        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));
1431
1432        // The writer-local upper should advance, even if it was another writer
1433        // that advanced the frontier.
1434        assert_eq!(write2.upper(), &Antichain::from_elem(3));
1435    }
1436
1437    #[mz_persist_proc::test(tokio::test)]
1438    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1439    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
1440        let data = [
1441            (("1".to_owned(), "one".to_owned()), 1, 1),
1442            (("2".to_owned(), "two".to_owned()), 2, 1),
1443        ];
1444
1445        let client = new_test_client(&dyncfgs).await;
1446
1447        let shard_id = ShardId::new();
1448
1449        let (mut write, _read) = client
1450            .expect_open::<String, String, u64, i64>(shard_id)
1451            .await;
1452
1453        write
1454            .expect_append(&data[..], write.upper().clone(), vec![3])
1455            .await;
1456
1457        let data = [
1458            (("5".to_owned(), "fünf".to_owned()), 5, 1),
1459            (("6".to_owned(), "sechs".to_owned()), 6, 1),
1460        ];
1461        let res = write
1462            .append(
1463                data.iter(),
1464                Antichain::from_elem(5),
1465                Antichain::from_elem(7),
1466            )
1467            .await;
1468        assert_eq!(
1469            res,
1470            Ok(Err(UpperMismatch {
1471                expected: Antichain::from_elem(5),
1472                current: Antichain::from_elem(3)
1473            }))
1474        );
1475
1476        // A failed append with an outdated expected upper updates the write handle's cached upper to the current shard upper.
1477        assert_eq!(write.upper(), &Antichain::from_elem(3));
1478    }
1479
1480    // Make sure that the API structs are Sync + Send, so that they can be used in async tasks.
1481    // NOTE: This is a compile-time-only test; if it compiles, we're good.
1482    #[allow(unused)]
1483    async fn sync_send(dyncfgs: ConfigUpdates) {
1484        mz_ore::test::init_logging();
1485
1486        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
1487            true
1488        }
1489
1490        let client = new_test_client(&dyncfgs).await;
1491
1492        let (write, read) = client
1493            .expect_open::<String, String, u64, i64>(ShardId::new())
1494            .await;
1495
1496        assert!(is_send_sync(client));
1497        assert!(is_send_sync(write));
1498        assert!(is_send_sync(read));
1499    }
1500
1501    #[mz_persist_proc::test(tokio::test)]
1502    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1503    async fn compare_and_append(dyncfgs: ConfigUpdates) {
1504        let data = vec![
1505            (("1".to_owned(), "one".to_owned()), 1, 1),
1506            (("2".to_owned(), "two".to_owned()), 2, 1),
1507            (("3".to_owned(), "three".to_owned()), 3, 1),
1508        ];
1509
1510        let id = ShardId::new();
1511        let client = new_test_client(&dyncfgs).await;
1512        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1513
1514        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1515
1516        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
1517        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
1518        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
1519
1520        // Write a [0,3) batch.
1521        write1
1522            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
1523            .await;
1524        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1525
1526        assert_eq!(
1527            read.expect_snapshot_and_fetch(2).await,
1528            all_ok(&data[..2], 2)
1529        );
1530
1531        // Try and write with a wrong expected upper.
1532        let res = write2
1533            .compare_and_append(
1534                &data[..2],
1535                Antichain::from_elem(u64::minimum()),
1536                Antichain::from_elem(3),
1537            )
1538            .await;
1539        assert_eq!(
1540            res,
1541            Ok(Err(UpperMismatch {
1542                expected: Antichain::from_elem(u64::minimum()),
1543                current: Antichain::from_elem(3)
1544            }))
1545        );
1546
1547        // A failed write updates our local cache of the shard upper.
1548        assert_eq!(write2.upper(), &Antichain::from_elem(3));
1549
1550        // Try again with a good expected upper.
1551        write2.expect_compare_and_append(&data[2..], 3, 4).await;
1552
1553        assert_eq!(write2.upper(), &Antichain::from_elem(4));
1554
1555        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
1556    }
1557
1558    #[mz_persist_proc::test(tokio::test)]
1559    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1560    async fn overlapping_append(dyncfgs: ConfigUpdates) {
1561        mz_ore::test::init_logging_default("info");
1562
1563        let data = vec![
1564            (("1".to_owned(), "one".to_owned()), 1, 1),
1565            (("2".to_owned(), "two".to_owned()), 2, 1),
1566            (("3".to_owned(), "three".to_owned()), 3, 1),
1567            (("4".to_owned(), "vier".to_owned()), 4, 1),
1568            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1569        ];
1570
1571        let id = ShardId::new();
1572        let client = new_test_client(&dyncfgs).await;
1573
1574        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1575
1576        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1577
1578        // Grab a listener before we do any writing
1579        // Grab a listener before we do any writing.
1580
1581        // Write a [0,3) batch.
1582        write1
1583            .expect_append(&data[..2], write1.upper().clone(), vec![3])
1584            .await;
1585        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1586
1587        // Write a [0,5) batch with the second writer.
1588        write2
1589            .expect_append(&data[..4], write2.upper().clone(), vec![5])
1590            .await;
1591        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1592
1593        // Write a [3,6) batch with the first writer.
1594        write1
1595            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
1596            .await;
1597        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1598
1599        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1600
1601        assert_eq!(
1602            listen.read_until(&6).await,
1603            (all_ok(&data[..], 1), Antichain::from_elem(6))
1604        );
1605    }
1606
1607    // Appends need to be contiguous for a shard, meaning the lower of an appended batch must not
1608    // be in advance of the current shard upper.
1609    #[mz_persist_proc::test(tokio::test)]
1610    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1611    async fn contiguous_append(dyncfgs: ConfigUpdates) {
1612        let data = vec![
1613            (("1".to_owned(), "one".to_owned()), 1, 1),
1614            (("2".to_owned(), "two".to_owned()), 2, 1),
1615            (("3".to_owned(), "three".to_owned()), 3, 1),
1616            (("4".to_owned(), "vier".to_owned()), 4, 1),
1617            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1618        ];
1619
1620        let id = ShardId::new();
1621        let client = new_test_client(&dyncfgs).await;
1622
1623        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1624
1625        // Write a [0,3) batch.
1626        write
1627            .expect_append(&data[..2], write.upper().clone(), vec![3])
1628            .await;
1629        assert_eq!(write.upper(), &Antichain::from_elem(3));
1630
1631        // Appending a non-contiguous batch should fail.
1632        // Write a [5,6) batch, skipping over [3,5).
1633        let result = write
1634            .append(
1635                &data[4..5],
1636                Antichain::from_elem(5),
1637                Antichain::from_elem(6),
1638            )
1639            .await;
1640        assert_eq!(
1641            result,
1642            Ok(Err(UpperMismatch {
1643                expected: Antichain::from_elem(5),
1644                current: Antichain::from_elem(3)
1645            }))
1646        );
1647
1648        // Fixing the lower to make the write contiguous should make the append succeed.
1649        write.expect_append(&data[2..5], vec![3], vec![6]).await;
1650        assert_eq!(write.upper(), &Antichain::from_elem(6));
1651
1652        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1653    }
1654
1655    // Per-writer appends can be non-contiguous, as long as appends to the shard from all writers
1656    // combined are contiguous.
1657    #[mz_persist_proc::test(tokio::test)]
1658    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1659    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
1660        let data = vec![
1661            (("1".to_owned(), "one".to_owned()), 1, 1),
1662            (("2".to_owned(), "two".to_owned()), 2, 1),
1663            (("3".to_owned(), "three".to_owned()), 3, 1),
1664            (("4".to_owned(), "vier".to_owned()), 4, 1),
1665            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1666        ];
1667
1668        let id = ShardId::new();
1669        let client = new_test_client(&dyncfgs).await;
1670
1671        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1672
1673        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1674
1675        // Write a [0,3) batch with writer 1.
1676        write1
1677            .expect_append(&data[..2], write1.upper().clone(), vec![3])
1678            .await;
1679        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1680
1681        // Write a [3,5) batch with writer 2.
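        // Manually advance this handle's cached upper to the current shard upper: the
        // preceding [0,3) append was made by writer 1, so writer 2's local upper still
        // sits at the minimum. The same pattern repeats for writer 1 below.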
1682        write2.upper = Antichain::from_elem(3);
1683        write2
1684            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
1685            .await;
1686        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1687
1688        // Write a [5,6) batch with writer 1.
1689        write1.upper = Antichain::from_elem(5);
1690        write1
1691            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
1692            .await;
1693        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1694
1695        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1696    }
1697
1698    // Compare_and_appends need to be contiguous for a shard, meaning the lower of an appended
1699    // batch needs to match the current shard upper.
1700    #[mz_persist_proc::test(tokio::test)]
1701    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1702    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
1703        let data = vec![
1704            (("1".to_owned(), "one".to_owned()), 1, 1),
1705            (("2".to_owned(), "two".to_owned()), 2, 1),
1706            (("3".to_owned(), "three".to_owned()), 3, 1),
1707            (("4".to_owned(), "vier".to_owned()), 4, 1),
1708            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1709        ];
1710
1711        let id = ShardId::new();
1712        let client = new_test_client(&dyncfgs).await;
1713
1714        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1715
1716        // Write a [0,3) batch.
1717        write.expect_compare_and_append(&data[..2], 0, 3).await;
1718        assert_eq!(write.upper(), &Antichain::from_elem(3));
1719
1720        // Appending a non-contiguous batch should fail.
1721        // Write a [5,6) batch, skipping over [3,5).
1722        let result = write
1723            .compare_and_append(
1724                &data[4..5],
1725                Antichain::from_elem(5),
1726                Antichain::from_elem(6),
1727            )
1728            .await;
1729        assert_eq!(
1730            result,
1731            Ok(Err(UpperMismatch {
1732                expected: Antichain::from_elem(5),
1733                current: Antichain::from_elem(3)
1734            }))
1735        );
1736
1737        // Writing with the correct expected upper to make the write contiguous should make the
1738        // append succeed.
1739        write.expect_compare_and_append(&data[2..5], 3, 6).await;
1740        assert_eq!(write.upper(), &Antichain::from_elem(6));
1741
1742        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1743    }
1744
1745    // Per-writer compare_and_appends can be non-contiguous, as long as appends to the shard from
1746    // all writers combined are contiguous.
1747    #[mz_persist_proc::test(tokio::test)]
1748    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1749    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
1750        let data = vec![
1751            (("1".to_owned(), "one".to_owned()), 1, 1),
1752            (("2".to_owned(), "two".to_owned()), 2, 1),
1753            (("3".to_owned(), "three".to_owned()), 3, 1),
1754            (("4".to_owned(), "vier".to_owned()), 4, 1),
1755            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1756        ];
1757
1758        let id = ShardId::new();
1759        let client = new_test_client(&dyncfgs).await;
1760
1761        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1762
1763        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1764
1765        // Write a [0,3) batch with writer 1.
1766        write1.expect_compare_and_append(&data[..2], 0, 3).await;
1767        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1768
1769        // Write a [3,5) batch with writer 2.
1770        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
1771        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1772
1773        // Write a [5,6) batch with writer 1.
1774        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
1775        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1776
1777        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1778    }
1779
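    // Check the Display and Debug renderings of LeasedReaderId.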
1780    #[mz_ore::test]
1781    fn fmt_ids() {
1782        assert_eq!(
1783            format!("{}", LeasedReaderId([0u8; 16])),
1784            "r00000000-0000-0000-0000-000000000000"
1785        );
1786        assert_eq!(
1787            format!("{:?}", LeasedReaderId([0u8; 16])),
1788            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
1789        );
1790    }
1791
1792    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
1793    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
1794    async fn concurrency(dyncfgs: ConfigUpdates) {
1795        let data = DataGenerator::small();
1796
1797        const NUM_WRITERS: usize = 2;
1798        let id = ShardId::new();
1799        let client = new_test_client(&dyncfgs).await;
1800        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
1801        for idx in 0..NUM_WRITERS {
1802            let (data, client) = (data.clone(), client.clone());
1803
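            // A bounded channel hands finished batches from the writer task below to its
            // paired appender task.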
1804            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);
1805
1806            let client1 = client.clone();
1807            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
1808                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1809                let mut current_upper = 0;
1810                for batch in data.batches() {
1811                    let new_upper = match batch.get(batch.len() - 1) {
1812                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
1813                        None => continue,
1814                    };
1815                    // Because we (intentionally) call open inside the task,
1816                    // some other writer may have raced ahead and already
1817                    // appended some data before this one was registered. As a
1818                    // result, this writer may not be starting with an upper of
1819                    // the initial empty antichain. This is nice because it
1820                    // mimics how a real HA source would work, but it means we
1821                    // have to skip any batches that have already been committed
1822                    // (otherwise our new_upper would be before our upper).
1823                    //
1824                    // Note however, that unlike a real source, our
1825                    // DataGenerator-derived batches are guaranteed to be
1826                    // chunked along the same boundaries. This means we don't
1827                    // have to consider partial batches when generating the
1828                    // updates below.
1829                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
1830                        continue;
1831                    }
1832
1833                    let current_upper_chain = Antichain::from_elem(current_upper);
1834                    current_upper = new_upper;
1835                    let new_upper_chain = Antichain::from_elem(new_upper);
1836                    let mut builder = write.builder(current_upper_chain);
1837
1838                    for ((k, v), t, d) in batch.iter() {
1839                        builder
1840                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
1841                            .await
1842                            .expect("invalid usage");
1843                    }
1844
1845                    let batch = builder
1846                        .finish(new_upper_chain)
1847                        .await
1848                        .expect("invalid usage");
1849
1850                    match batch_tx.send(batch).await {
1851                        Ok(_) => (),
1852                        Err(e) => panic!("send error: {}", e),
1853                    }
1854                }
1855            });
1856            handles.push(handle);
1857
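            // The paired appender task drains batches off the channel and appends them
            // with its own write handle, so batches are built and appended by different
            // handles concurrently.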
1858            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
1859                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1860
1861                while let Some(batch) = batch_rx.recv().await {
1862                    let lower = batch.lower().clone();
1863                    let upper = batch.upper().clone();
1864                    write
1865                        .append_batch(batch, lower, upper)
1866                        .await
1867                        .expect("invalid usage")
1868                        .expect("unexpected upper");
1869                }
1870            });
1871            handles.push(handle);
1872        }
1873
1874        for handle in handles {
1875            let () = handle.await.expect("task failed");
1876        }
1877
1878        let expected = data.records().collect::<Vec<_>>();
1879        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
1880        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1881        assert_eq!(
1882            read.expect_snapshot_and_fetch(max_ts).await,
1883            all_ok(expected.iter(), max_ts)
1884        );
1885    }
1886
1887    // Regression test for database-issues#3523. Snapshot with as_of >= upper would
1888    // immediately return the data currently available instead of waiting for
1889    // upper to advance past as_of.
1890    #[mz_persist_proc::test(tokio::test)]
1891    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1892    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
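        // A no-op waker lets us poll futures by hand below and assert that they are
        // still pending, without a runtime driving them to completion.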
1893        let waker = noop_waker();
1894        let mut cx = Context::from_waker(&waker);
1895
1896        let data = [
1897            (("1".to_owned(), "one".to_owned()), 1, 1),
1898            (("2".to_owned(), "two".to_owned()), 2, 1),
1899            (("3".to_owned(), "three".to_owned()), 3, 1),
1900        ];
1901
1902        let id = ShardId::new();
1903        let client = new_test_client(&dyncfgs).await;
1904        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1905
1906        // Grab a listener as_of (aka gt) 1, which is not yet closed out.
1907        let mut listen = read.clone("").await.expect_listen(1).await;
1908        let mut listen_next = Box::pin(listen.fetch_next());
1909        // Intentionally don't await the listen_next, but instead manually poke
1910        // it for a while and assert that it doesn't resolve yet. See below for
1911        // discussion of some alternative ways of writing this unit test.
1912        for _ in 0..100 {
1913            assert!(
1914                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
1915                "listen::next unexpectedly ready"
1916            );
1917        }
1918
1919        // Write a [0,3) batch.
1920        write
1921            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
1922            .await;
1923
1924        // The initial listen_next call should now be able to return data at 2.
1925        // It doesn't get 1 because the as_of was 1 and listen is strictly gt.
1926        assert_eq!(
1927            listen_next.await,
1928            vec![
1929                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
1930                ListenEvent::Progress(Antichain::from_elem(3)),
1931            ]
1932        );
1933
1934        // Grab a snapshot as_of 3, which is not yet closed out. Intentionally
1935        // don't await the snap, but instead manually poke it for a while and
1936        // assert that it doesn't resolve yet.
1937        //
1938        // An alternative to this would be to run it in a task and poll the task
1939        // with some timeout, but this would introduce a fixed test execution
1940        // latency of the timeout in the happy case. Plus, it would be
1941        // non-deterministic.
1942        //
1943        // Another alternative (that's potentially quite interesting!) would be
1944        // to separate creating a snapshot immediately (which would fail if
1945        // as_of was >= upper) from a bit of logic that retries until that case
1946        // is ready.
1947        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
1948        for _ in 0..100 {
1949            assert!(
1950                Pin::new(&mut snap).poll(&mut cx).is_pending(),
1951                "snapshot unexpectedly ready"
1952            );
1953        }
1954
1955        // Now add the data at 3 and also unblock the snapshot.
1956        write.expect_compare_and_append(&data[2..], 3, 4).await;
1957
1958        // Read the snapshot and check that it got all the appropriate data.
1959        assert_eq!(snap.await, all_ok(&data[..], 3));
1960    }
1961
1962    #[mz_persist_proc::test(tokio::test)]
1963    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1964    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
1965        // Verify that the ReadHandle and WriteHandle background heartbeat tasks
1966        // shut down cleanly after the handle is expired.
1967        let mut cache = new_test_client_cache(&dyncfgs);
1968        cache
1969            .cfg
1970            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
1971        cache.cfg.writer_lease_duration = Duration::from_millis(1);
1972        let (_write, mut read) = cache
1973            .open(PersistLocation::new_in_mem())
1974            .await
1975            .expect("client construction failed")
1976            .expect_open::<(), (), u64, i64>(ShardId::new())
1977            .await;
1978        let mut read_unexpired_state = read
1979            .unexpired_state
1980            .take()
1981            .expect("handle should have unexpired state");
1982        read.expire().await;
1983        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
1984            let () = read_heartbeat_task
1985                .await
1986                .expect("task should shutdown cleanly");
1987        }
1988    }
1989
1990    /// Verify that shard finalization works with empty shards, i.e. shards whose
1991    /// only write is an empty batch that advances the upper to the empty Antichain.
1992    #[mz_persist_proc::test(tokio::test)]
1993    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1994    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
1995        const EMPTY: &[(((), ()), u64, i64)] = &[];
1996        let persist_client = new_test_client(&dyncfgs).await;
1997
1998        let shard_id = ShardId::new();
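        // An arbitrary, fixed reader id for the critical since handle used below.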
1999        pub const CRITICAL_SINCE: CriticalReaderId =
2000            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
2001
2002        let (mut write, mut read) = persist_client
2003            .expect_open::<(), (), u64, i64>(shard_id)
2004            .await;
2005
2006        // Advance since and upper to empty, which is a prerequisite for
2007        // finalization/tombstoning.
2008        let () = read.downgrade_since(&Antichain::new()).await;
2009        let () = write
2010            .compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
2011            .await
2012            .expect("usage should be valid")
2013            .expect("upper should match");
2014
2015        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
2016            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
2017            .await
2018            .expect("invalid persist usage");
2019
2020        let epoch = since_handle.opaque().clone();
2021        let new_since = Antichain::new();
2022        let downgrade = since_handle
2023            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
2024            .await;
2025
2026        assert!(
2027            downgrade.is_ok(),
2028            "downgrade of critical handle must succeed"
2029        );
2030
2031        let finalize = persist_client
2032            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2033            .await;
2034
2035        assert_ok!(finalize, "finalization must succeed");
2036
2037        let is_finalized = persist_client
2038            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2039            .await
2040            .expect("invalid persist usage");
2041        assert!(is_finalized, "shard must still be finalized");
2042    }
2043
2044    /// Verify that shard finalization works with shards that had some data
2045    /// written to them, followed by an empty batch that brings their upper to the
2046    /// empty Antichain.
2047    #[mz_persist_proc::test(tokio::test)]
2048    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
2049    async fn finalize_shard(dyncfgs: ConfigUpdates) {
2050        const EMPTY: &[(((), ()), u64, i64)] = &[];
2051        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
2052        let persist_client = new_test_client(&dyncfgs).await;
2053
2054        let shard_id = ShardId::new();
2055        pub const CRITICAL_SINCE: CriticalReaderId =
2056            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
2057
2058        let (mut write, mut read) = persist_client
2059            .expect_open::<(), (), u64, i64>(shard_id)
2060            .await;
2061
2062        // Write some data.
2063        let () = write
2064            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
2065            .await
2066            .expect("usage should be valid")
2067            .expect("upper should match");
2068
2069        // Advance since and upper to empty, which is a prerequisite for
2070        // finalization/tombstoning.
2071        let () = read.downgrade_since(&Antichain::new()).await;
2072        let () = write
2073            .compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
2074            .await
2075            .expect("usage should be valid")
2076            .expect("upper should match");
2077
2078        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
2079            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
2080            .await
2081            .expect("invalid persist usage");
2082
2083        let epoch = since_handle.opaque().clone();
2084        let new_since = Antichain::new();
2085        let downgrade = since_handle
2086            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
2087            .await;
2088
2089        assert!(
2090            downgrade.is_ok(),
2091            "downgrade of critical handle must succeed"
2092        );
2093
2094        let finalize = persist_client
2095            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2096            .await;
2097
2098        assert_ok!(finalize, "finalization must succeed");
2099
2100        let is_finalized = persist_client
2101            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2102            .await
2103            .expect("invalid persist usage");
2104        assert!(is_finalized, "shard must still be finalized");
2105    }
2106
2107    proptest! {
2108        #![proptest_config(ProptestConfig::with_cases(4096))]
2109
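        // ShardId should survive a round-trip through its protobuf (string) encoding
        // for arbitrary inputs.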
2110        #[mz_ore::test]
2111        #[cfg_attr(miri, ignore)] // too slow
2112        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>() ) {
2113            let actual = protobuf_roundtrip::<_, String>(&expect);
2114            assert_ok!(actual);
2115            assert_eq!(actual.unwrap(), expect);
2116        }
2117    }
2118}