mz_persist_client/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction presenting as a durable time-varying collection (aka shard)
11
12#![warn(missing_docs, missing_debug_implementations)]
13// #[track_caller] is currently a no-op on async functions, but that hopefully won't be the case
14// forever. So we already annotate those functions now and ignore the compiler warning until
15// https://github.com/rust-lang/rust/issues/87417 pans out.
16#![allow(ungated_async_fn_track_caller)]
17
18use std::fmt::Debug;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use differential_dataflow::difference::Monoid;
23use differential_dataflow::lattice::Lattice;
24use itertools::Itertools;
25use mz_build_info::{BuildInfo, build_info};
26use mz_dyncfg::ConfigSet;
27use mz_ore::instrument;
28use mz_persist::location::{Blob, Consensus, ExternalError};
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64, Opaque};
31use mz_proto::{IntoRustIfSome, ProtoType};
32use semver::Version;
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
38use crate::cache::{PersistClientCache, StateCache};
39use crate::cfg::PersistConfig;
40use crate::critical::{CriticalReaderId, SinceHandle};
41use crate::error::InvalidUsage;
42use crate::fetch::{BatchFetcher, BatchFetcherConfig};
43use crate::internal::compact::{CompactConfig, Compactor};
44use crate::internal::encoding::parse_id;
45use crate::internal::gc::GarbageCollector;
46use crate::internal::machine::{Machine, retry_external};
47use crate::internal::state_versions::StateVersions;
48use crate::metrics::Metrics;
49use crate::read::{
50    Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
51};
52use crate::rpc::PubSubSender;
53use crate::schema::CaESchema;
54use crate::write::{WriteHandle, WriterId};
55
56pub mod async_runtime;
57pub mod batch;
58pub mod cache;
59pub mod cfg;
60pub mod cli {
61    //! Persist command-line utilities
62    pub mod admin;
63    pub mod args;
64    pub mod bench;
65    pub mod inspect;
66}
67pub mod critical;
68pub mod error;
69pub mod fetch;
70pub mod internals_bench;
71pub mod iter;
72pub mod metrics {
73    //! Utilities related to metrics.
74    pub use crate::internal::metrics::{
75        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
76    };
77}
78pub mod operators {
79    //! [timely] operators for reading and writing persist Shards.
80
81    use mz_dyncfg::Config;
82
83    pub mod shard_source;
84
85    // TODO(cfg): Move this next to the use.
86    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
87        "storage_source_decode_fuel",
88        100_000,
89        "\
90        The maximum amount of work to do in the persist_source mfp_and_decode \
91        operator before yielding.",
92    );
93}
94pub mod read;
95pub mod rpc;
96pub mod schema;
97pub mod stats;
98pub mod usage;
99pub mod write;
100
101/// An implementation of the public crate interface.
102mod internal {
103    pub mod apply;
104    pub mod cache;
105    pub mod compact;
106    pub mod encoding;
107    pub mod gc;
108    pub mod machine;
109    pub mod maintenance;
110    pub mod merge;
111    pub mod metrics;
112    pub mod paths;
113    pub mod restore;
114    pub mod service;
115    pub mod state;
116    pub mod state_diff;
117    pub mod state_versions;
118    pub mod trace;
119    pub mod watch;
120
121    #[cfg(test)]
122    pub mod datadriven;
123}
124
125/// Persist build information.
126pub const BUILD_INFO: BuildInfo = build_info!();
127
128// Re-export for convenience.
129pub use mz_persist_types::{PersistLocation, ShardId};
130
131pub use crate::internal::encoding::Schemas;
132
133/// Additional diagnostic information used within Persist,
134/// e.g. for logging, metric labels, etc.
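///
/// For example, [Diagnostics::from_purpose] fills in a placeholder shard name:
///
/// ```rust
/// let d = mz_persist_client::Diagnostics::from_purpose("docs example");
/// assert_eq!(d.shard_name, "unknown");
/// ```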
135#[derive(Clone, Debug)]
136pub struct Diagnostics {
137    /// A user-friendly name for the shard.
138    pub shard_name: String,
139    /// A purpose for the handle.
140    pub handle_purpose: String,
141}
142
143impl Diagnostics {
144    /// Create a new `Diagnostics` from `handle_purpose`.
145    pub fn from_purpose(handle_purpose: &str) -> Self {
146        Self {
147            shard_name: "unknown".to_string(),
148            handle_purpose: handle_purpose.to_string(),
149        }
150    }
151
152    /// Create a new `Diagnostics` for testing.
153    pub fn for_tests() -> Self {
154        Self {
155            shard_name: "test-shard-name".to_string(),
156            handle_purpose: "test-purpose".to_string(),
157        }
158    }
159}
160
161/// A handle for interacting with the set of persist shards made durable at a
162/// single [PersistLocation].
163///
164/// All async methods on PersistClient retry for as long as they are able, but
165/// the returned [std::future::Future]s implement "cancel on drop" semantics.
166/// This means that callers can add a timeout using [tokio::time::timeout] or
167/// [tokio::time::timeout_at].
168///
169/// ```rust,no_run
170/// # use std::sync::Arc;
171/// # use mz_persist_types::codec_impls::StringSchema;
172/// # let client: mz_persist_client::PersistClient = unimplemented!();
173/// # let timeout: std::time::Duration = unimplemented!();
174/// # let id = mz_persist_client::ShardId::new();
175/// # let diagnostics = mz_persist_client::Diagnostics { shard_name: "".into(), handle_purpose: "".into() };
176/// # async {
177/// tokio::time::timeout(timeout, client.open::<String, String, u64, i64>(id,
178///     Arc::new(StringSchema),Arc::new(StringSchema),diagnostics, true)).await
179/// # };
180/// ```
181#[derive(Debug, Clone)]
182pub struct PersistClient {
183    cfg: PersistConfig,
184    blob: Arc<dyn Blob>,
185    consensus: Arc<dyn Consensus>,
186    metrics: Arc<Metrics>,
187    isolated_runtime: Arc<IsolatedRuntime>,
188    shared_states: Arc<StateCache>,
189    pubsub_sender: Arc<dyn PubSubSender>,
190}
191
192impl PersistClient {
193    /// Returns a new client for interfacing with persist shards made durable to
194    /// the given [Blob] and [Consensus].
195    ///
196    /// This is exposed mostly for testing. Persist users likely want
197    /// [crate::cache::PersistClientCache::open].
198    pub fn new(
199        cfg: PersistConfig,
200        blob: Arc<dyn Blob>,
201        consensus: Arc<dyn Consensus>,
202        metrics: Arc<Metrics>,
203        isolated_runtime: Arc<IsolatedRuntime>,
204        shared_states: Arc<StateCache>,
205        pubsub_sender: Arc<dyn PubSubSender>,
206    ) -> Result<Self, ExternalError> {
207        // TODO: Verify somehow that blob matches consensus to prevent
208        // accidental misuse.
209        Ok(PersistClient {
210            cfg,
211            blob,
212            consensus,
213            metrics,
214            isolated_runtime,
215            shared_states,
216            pubsub_sender,
217        })
218    }
219
220    /// Returns a new in-mem [PersistClient] for tests and examples.
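    ///
    /// A minimal construction sketch (the returned client is backed by
    /// transient in-memory state):
    ///
    /// ```rust,no_run
    /// # async {
    /// let client = mz_persist_client::PersistClient::new_for_tests().await;
    /// # };
    /// ```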
221    pub async fn new_for_tests() -> Self {
222        let cache = PersistClientCache::new_no_metrics();
223        cache
224            .open(PersistLocation::new_in_mem())
225            .await
226            .expect("in-mem location is valid")
227    }
228
229    /// Returns persist's [ConfigSet].
230    pub fn dyncfgs(&self) -> &ConfigSet {
231        &self.cfg.configs
232    }
233
234    async fn make_machine<K, V, T, D>(
235        &self,
236        shard_id: ShardId,
237        diagnostics: Diagnostics,
238    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
239    where
240        K: Debug + Codec,
241        V: Debug + Codec,
242        T: Timestamp + Lattice + Codec64 + Sync,
243        D: Monoid + Codec64 + Send + Sync,
244    {
245        let state_versions = StateVersions::new(
246            self.cfg.clone(),
247            Arc::clone(&self.consensus),
248            Arc::clone(&self.blob),
249            Arc::clone(&self.metrics),
250        );
251        let machine = Machine::<K, V, T, D>::new(
252            self.cfg.clone(),
253            shard_id,
254            Arc::clone(&self.metrics),
255            Arc::new(state_versions),
256            Arc::clone(&self.shared_states),
257            Arc::clone(&self.pubsub_sender),
258            Arc::clone(&self.isolated_runtime),
259            diagnostics.clone(),
260        )
261        .await?;
262        Ok(machine)
263    }
264
265    /// Provides capabilities for the durable TVC identified by `shard_id` at
266    /// its current since and upper frontiers.
267    ///
268    /// This method is a best-effort attempt to regain control of the frontiers
269    /// of a shard. Its most common uses are to recover capabilities that have
270    /// expired (leases) or to attempt to read a TVC that one did not create (or
271    /// otherwise receive capabilities for). If the frontiers have been fully
272    /// released by all other parties, this call may result in capabilities with
273    /// empty frontiers (which are useless).
274    ///
275    /// If `shard_id` has never been used before, initializes a new shard and
276    /// returns handles with `since` and `upper` frontiers set to initial values
277    /// of `Antichain::from_elem(T::minimum())`.
278    ///
279    /// The `key_schema` and `val_schema` parameters describe the schema of the
280    /// data in the shard; they are forwarded to the write and read handles
281    /// returned by this call.
282    #[instrument(level = "debug", fields(shard = %shard_id))]
283    pub async fn open<K, V, T, D>(
284        &self,
285        shard_id: ShardId,
286        key_schema: Arc<K::Schema>,
287        val_schema: Arc<V::Schema>,
288        diagnostics: Diagnostics,
289        use_critical_since: bool,
290    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
291    where
292        K: Debug + Codec,
293        V: Debug + Codec,
294        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
295        D: Monoid + Ord + Codec64 + Send + Sync,
296    {
297        Ok((
298            self.open_writer(
299                shard_id,
300                Arc::clone(&key_schema),
301                Arc::clone(&val_schema),
302                diagnostics.clone(),
303            )
304            .await?,
305            self.open_leased_reader(
306                shard_id,
307                key_schema,
308                val_schema,
309                diagnostics,
310                use_critical_since,
311            )
312            .await?,
313        ))
314    }
315
316    /// [Self::open], but returning only a [ReadHandle].
317    ///
318    /// Use this to save latency and a bit of persist traffic if you're just
319    /// going to immediately drop or expire the [WriteHandle].
320    ///
321    /// The `key_schema` and `val_schema` parameters describe the schema of the
322    /// data in the shard; the returned [ReadHandle] uses them to decode fetched
323    /// data.
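    ///
    /// A minimal usage sketch (setup elided; the `String`/`u64`/`i64` types
    /// here are illustrative):
    ///
    /// ```rust,no_run
    /// # use std::sync::Arc;
    /// # use mz_persist_types::codec_impls::StringSchema;
    /// # let client: mz_persist_client::PersistClient = unimplemented!();
    /// # let id = mz_persist_client::ShardId::new();
    /// # async {
    /// let read = client
    ///     .open_leased_reader::<String, String, u64, i64>(
    ///         id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         mz_persist_client::Diagnostics::from_purpose("docs example"),
    ///         false,
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// # };
    /// ```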
324    #[instrument(level = "debug", fields(shard = %shard_id))]
325    pub async fn open_leased_reader<K, V, T, D>(
326        &self,
327        shard_id: ShardId,
328        key_schema: Arc<K::Schema>,
329        val_schema: Arc<V::Schema>,
330        diagnostics: Diagnostics,
331        use_critical_since: bool,
332    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
333    where
334        K: Debug + Codec,
335        V: Debug + Codec,
336        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
337        D: Monoid + Codec64 + Send + Sync,
338    {
339        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
340        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
341
342        let reader_id = LeasedReaderId::new();
343        let heartbeat_ts = (self.cfg.now)();
344        let (reader_state, maintenance) = machine
345            .register_leased_reader(
346                &reader_id,
347                &diagnostics.handle_purpose,
348                READER_LEASE_DURATION.get(&self.cfg),
349                heartbeat_ts,
350                use_critical_since,
351            )
352            .await;
353        maintenance.start_performing(&machine, &gc);
354        let schemas = Schemas {
355            id: None,
356            key: key_schema,
357            val: val_schema,
358        };
359        let reader = ReadHandle::new(
360            self.cfg.clone(),
361            Arc::clone(&self.metrics),
362            machine,
363            gc,
364            Arc::clone(&self.blob),
365            reader_id,
366            schemas,
367            reader_state.since,
368            heartbeat_ts,
369        )
370        .await;
371
372        Ok(reader)
373    }
374
375    /// Creates and returns a [BatchFetcher] for the given shard id.
376    #[instrument(level = "debug", fields(shard = %shard_id))]
377    pub async fn create_batch_fetcher<K, V, T, D>(
378        &self,
379        shard_id: ShardId,
380        key_schema: Arc<K::Schema>,
381        val_schema: Arc<V::Schema>,
382        is_transient: bool,
383        diagnostics: Diagnostics,
384    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
385    where
386        K: Debug + Codec,
387        V: Debug + Codec,
388        T: Timestamp + Lattice + Codec64 + Sync,
389        D: Monoid + Codec64 + Send + Sync,
390    {
391        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
392        let read_schemas = Schemas {
393            id: None,
394            key: key_schema,
395            val: val_schema,
396        };
397        let schema_cache = machine.applier.schema_cache();
398        let fetcher = BatchFetcher {
399            cfg: BatchFetcherConfig::new(&self.cfg),
400            blob: Arc::clone(&self.blob),
401            metrics: Arc::clone(&self.metrics),
402            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
403            shard_id,
404            read_schemas,
405            schema_cache,
406            is_transient,
407            _phantom: PhantomData,
408        };
409
410        Ok(fetcher)
411    }
412
413    /// A convenience [CriticalReaderId] for Materialize controllers.
414    ///
415    /// For most (soon to be all?) shards in Materialize, a centralized
416    /// "controller" is the authority for when a user no longer needs to read at
417    /// a given frontier. (Other uses are temporary holds where correctness of
418    /// the overall system can be maintained through a lease timeout.) To make
419    /// [SinceHandle] easier to work with, we offer this convenience id for
420    /// Materialize controllers, so they don't have to durably record it.
421    ///
422    /// TODO: We're still shaking out whether the controller should be the only
423    /// critical since hold or if there are other places we want them. If the
424    /// former, we should remove [CriticalReaderId] and bake in the singular
425    /// nature of the controller critical handle.
426    ///
427    /// ```rust
428    /// // This prints as something that is not 0 but is visually recognizable.
429    /// assert_eq!(
430    ///     mz_persist_client::PersistClient::CONTROLLER_CRITICAL_SINCE.to_string(),
431    ///     "c00000000-1111-2222-3333-444444444444",
432    /// )
433    /// ```
434    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
435        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
436
437    /// Provides a capability for the durable TVC identified by `shard_id` at
438    /// its current since frontier.
439    ///
440    /// In contrast to the time-leased [ReadHandle] returned by [Self::open] and
441    /// [Self::open_leased_reader], this handle and its associated capability
442    /// are not leased. A [SinceHandle] does not release its since capability;
443    /// to release its hold on the since, downgrade it to the empty antichain.
444    /// Also unlike `ReadHandle`, the handle is not expired on drop.
445    /// This is less ergonomic, but useful for "critical" since
446    /// holds which must survive even lease timeouts.
447    ///
448    /// **IMPORTANT**: The above means that if a SinceHandle is registered and
449    /// then lost, the shard's since will be permanently "stuck", forever
450    /// preventing logical compaction. Users are advised to durably record
451    /// (preferably in code) the intended [CriticalReaderId] _before_ registering
452    /// a SinceHandle (in case the process crashes at the wrong time).
453    ///
454    /// If `shard_id` has never been used before, initializes a new shard and
455    /// returns a handle with its `since` frontier set to the initial value of
456    /// `Antichain::from_elem(T::minimum())`.
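    ///
    /// A minimal sketch (not compiled as a doctest; it assumes `u64` is usable
    /// as the [Opaque] fencing token and elides client setup):
    ///
    /// ```rust,ignore
    /// let since = client
    ///     .open_critical_since::<String, String, u64, i64, u64>(
    ///         shard_id,
    ///         PersistClient::CONTROLLER_CRITICAL_SINCE,
    ///         Diagnostics::from_purpose("controller since hold"),
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// ```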
457    #[instrument(level = "debug", fields(shard = %shard_id))]
458    pub async fn open_critical_since<K, V, T, D, O>(
459        &self,
460        shard_id: ShardId,
461        reader_id: CriticalReaderId,
462        diagnostics: Diagnostics,
463    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
464    where
465        K: Debug + Codec,
466        V: Debug + Codec,
467        T: Timestamp + Lattice + Codec64 + Sync,
468        D: Monoid + Codec64 + Send + Sync,
469        O: Opaque + Codec64,
470    {
471        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
472        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
473
474        let (state, maintenance) = machine
475            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
476            .await;
477        maintenance.start_performing(&machine, &gc);
478        let handle = SinceHandle::new(
479            machine,
480            gc,
481            reader_id,
482            state.since,
483            Codec64::decode(state.opaque.0),
484        );
485
486        Ok(handle)
487    }
488
489    /// [Self::open], but returning only a [WriteHandle].
490    ///
491    /// Use this to save latency and a bit of persist traffic if you're just
492    /// going to immediately drop or expire the [ReadHandle].
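    ///
    /// A minimal usage sketch (setup elided; the `String`/`u64`/`i64` types
    /// here are illustrative):
    ///
    /// ```rust,no_run
    /// # use std::sync::Arc;
    /// # use mz_persist_types::codec_impls::StringSchema;
    /// # let client: mz_persist_client::PersistClient = unimplemented!();
    /// # let id = mz_persist_client::ShardId::new();
    /// # async {
    /// let write = client
    ///     .open_writer::<String, String, u64, i64>(
    ///         id,
    ///         Arc::new(StringSchema),
    ///         Arc::new(StringSchema),
    ///         mz_persist_client::Diagnostics::from_purpose("docs example"),
    ///     )
    ///     .await
    ///     .expect("codec mismatch");
    /// # };
    /// ```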
493    #[instrument(level = "debug", fields(shard = %shard_id))]
494    pub async fn open_writer<K, V, T, D>(
495        &self,
496        shard_id: ShardId,
497        key_schema: Arc<K::Schema>,
498        val_schema: Arc<V::Schema>,
499        diagnostics: Diagnostics,
500    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
501    where
502        K: Debug + Codec,
503        V: Debug + Codec,
504        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
505        D: Monoid + Ord + Codec64 + Send + Sync,
506    {
507        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
508        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
509
510        // We defer registering the schema until write time, to allow opening
511        // write handles in a "read-only" mode where they don't implicitly
512        // modify persist state. But it might already be registered, in which
513        // case we can fetch its ID.
514        let schema_id = machine.find_schema(&*key_schema, &*val_schema);
515
516        let writer_id = WriterId::new();
517        let schemas = Schemas {
518            id: schema_id,
519            key: key_schema,
520            val: val_schema,
521        };
522        let writer = WriteHandle::new(
523            self.cfg.clone(),
524            Arc::clone(&self.metrics),
525            machine,
526            gc,
527            Arc::clone(&self.blob),
528            writer_id,
529            &diagnostics.handle_purpose,
530            schemas,
531        );
532        Ok(writer)
533    }
534
535    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
536    /// blob storage which can then be appended to the given shard using
537    /// [WriteHandle::compare_and_append_batch] or [WriteHandle::append_batch],
538    /// or which can be read using [PersistClient::read_batches_consolidated].
539    ///
540    /// The builder uses a bounded amount of memory, even when the number of
541    /// updates is very large. Individual records, however, should be small
542    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
543    /// O(MB) come talk to us.
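    ///
    /// A minimal sketch (not compiled as a doctest; `write_schemas` is assumed
    /// to be a [Schemas] value matching the shard's key/value schemas):
    ///
    /// ```rust,ignore
    /// let builder = client
    ///     .batch_builder::<String, String, u64, i64>(
    ///         shard_id,
    ///         write_schemas,
    ///         Antichain::from_elem(0),
    ///         None,
    ///     )
    ///     .await;
    /// // Add updates to `builder`, then seal it with an upper via
    /// // `finish(upper)` to obtain a `Batch`.
    /// ```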
544    #[instrument(level = "debug", fields(shard = %shard_id))]
545    pub async fn batch_builder<K, V, T, D>(
546        &self,
547        shard_id: ShardId,
548        write_schemas: Schemas<K, V>,
549        lower: Antichain<T>,
550        max_runs: Option<usize>,
551    ) -> BatchBuilder<K, V, T, D>
552    where
553        K: Debug + Codec,
554        V: Debug + Codec,
555        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
556        D: Monoid + Ord + Codec64 + Send + Sync,
557    {
558        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
559        compact_cfg.batch.max_runs = max_runs;
560        WriteHandle::builder_inner(
561            &self.cfg,
562            compact_cfg,
563            Arc::clone(&self.metrics),
564            self.metrics.shards.shard(&shard_id, "peek_stash"),
565            &self.metrics.user,
566            Arc::clone(&self.isolated_runtime),
567            Arc::clone(&self.blob),
568            shard_id,
569            write_schemas,
570            lower,
571        )
572    }
573
574    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
575    /// to append it to the given shard or to read it via
576    /// [PersistClient::read_batches_consolidated].
577    ///
578    /// CAUTION: This API allows turning a [ProtoBatch] into a [Batch] multiple
579    /// times, but if a batch is deleted the backing data goes away, so at that
580    /// point all in-memory copies of a batch become invalid and cannot be read
581    /// anymore.
582    pub fn batch_from_transmittable_batch<K, V, T, D>(
583        &self,
584        shard_id: &ShardId,
585        batch: ProtoBatch,
586    ) -> Batch<K, V, T, D>
587    where
588        K: Debug + Codec,
589        V: Debug + Codec,
590        T: Timestamp + Lattice + Codec64 + Sync,
591        D: Monoid + Ord + Codec64 + Send + Sync,
592    {
593        let batch_shard_id: ShardId = batch
594            .shard_id
595            .into_rust()
596            .expect("valid transmittable batch");
597        assert_eq!(&batch_shard_id, shard_id);
598
599        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");
600
601        let ret = Batch {
602            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
603            metrics: Arc::clone(&self.metrics),
604            shard_metrics,
605            version: Version::parse(&batch.version).expect("valid transmittable batch"),
606            schemas: (batch.key_schema, batch.val_schema),
607            batch: batch
608                .batch
609                .into_rust_if_some("ProtoBatch::batch")
610                .expect("valid transmittable batch"),
611            blob: Arc::clone(&self.blob),
612            _phantom: std::marker::PhantomData,
613        };
614
615        assert_eq!(&ret.shard_id(), shard_id);
616        ret
617    }
618
619    /// Returns a [Cursor] for reading the given batches. Yielded updates are
620    /// consolidated if the given batches contain sorted runs, which is true
621    /// when they have been written using a [BatchBuilder].
622    ///
623    /// To keep memory usage down when reading a snapshot that consolidates
624    /// well, this consolidates as it goes. However, note that only the
625    /// serialized data is consolidated: the deserialized data will only be
626    /// consolidated if your K/V codecs are one-to-one.
627    ///
628    /// CAUTION: The caller needs to make sure that the given batches are
629    /// readable, and that they remain readable for the lifetime of the
630    /// returned [Cursor]. The caller is also responsible for the lifecycle of
631    /// the batches: once the cursor and the batches are no longer needed you
632    /// must call [Cursor::into_lease] to get back the batches and delete them.
633    #[allow(clippy::unused_async)]
634    pub async fn read_batches_consolidated<K, V, T, D>(
635        &mut self,
636        shard_id: ShardId,
637        as_of: Antichain<T>,
638        read_schemas: Schemas<K, V>,
639        batches: Vec<Batch<K, V, T, D>>,
640        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
641        memory_budget_bytes: usize,
642    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
643    where
644        K: Debug + Codec + Ord,
645        V: Debug + Codec + Ord,
646        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
647        D: Monoid + Ord + Codec64 + Send + Sync,
648    {
649        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");
650
651        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();
652
653        ReadHandle::read_batches_consolidated(
654            &self.cfg,
655            Arc::clone(&self.metrics),
656            shard_metrics,
657            self.metrics.read.snapshot.clone(),
658            Arc::clone(&self.blob),
659            shard_id,
660            as_of,
661            read_schemas,
662            &hollow_batches,
663            batches,
664            should_fetch_part,
665            memory_budget_bytes,
666        )
667    }
668
669    /// Returns the requested schema, if known at the current state.
670    pub async fn get_schema<K, V, T, D>(
671        &self,
672        shard_id: ShardId,
673        schema_id: SchemaId,
674        diagnostics: Diagnostics,
675    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
676    where
677        K: Debug + Codec,
678        V: Debug + Codec,
679        T: Timestamp + Lattice + Codec64 + Sync,
680        D: Monoid + Codec64 + Send + Sync,
681    {
682        let machine = self
683            .make_machine::<K, V, T, D>(shard_id, diagnostics)
684            .await?;
685        Ok(machine.get_schema(schema_id))
686    }
687
688    /// Returns the latest schema registered at the current state.
689    pub async fn latest_schema<K, V, T, D>(
690        &self,
691        shard_id: ShardId,
692        diagnostics: Diagnostics,
693    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
694    where
695        K: Debug + Codec,
696        V: Debug + Codec,
697        T: Timestamp + Lattice + Codec64 + Sync,
698        D: Monoid + Codec64 + Send + Sync,
699    {
700        let machine = self
701            .make_machine::<K, V, T, D>(shard_id, diagnostics)
702            .await?;
703        Ok(machine.latest_schema())
704    }
705
706    /// Registers a schema for the given shard.
707    ///
708    /// Returns the new schema ID if the registration succeeds, and `None`
709    /// otherwise. Schema registration succeeds in two cases:
710    ///  a) No schema is currently registered for the shard.
711    ///  b) The given schema is already registered for the shard.
712    ///
713    /// To evolve an existing schema instead, use
714    /// [PersistClient::compare_and_evolve_schema].
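    ///
    /// A minimal usage sketch (the `String`/`u64`/`i64` types here are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # use mz_persist_types::codec_impls::StringSchema;
    /// # let client: mz_persist_client::PersistClient = unimplemented!();
    /// # let id = mz_persist_client::ShardId::new();
    /// # async {
    /// let schema_id = client
    ///     .register_schema::<String, String, u64, i64>(
    ///         id,
    ///         &StringSchema,
    ///         &StringSchema,
    ///         mz_persist_client::Diagnostics::from_purpose("docs example"),
    ///     )
    ///     .await
    ///     .expect("valid usage");
    /// # };
    /// ```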
715    //
716    // TODO: unify with `compare_and_evolve_schema`
717    pub async fn register_schema<K, V, T, D>(
718        &self,
719        shard_id: ShardId,
720        key_schema: &K::Schema,
721        val_schema: &V::Schema,
722        diagnostics: Diagnostics,
723    ) -> Result<Option<SchemaId>, InvalidUsage<T>>
724    where
725        K: Debug + Codec,
726        V: Debug + Codec,
727        T: Timestamp + Lattice + Codec64 + Sync,
728        D: Monoid + Codec64 + Send + Sync,
729    {
730        let machine = self
731            .make_machine::<K, V, T, D>(shard_id, diagnostics)
732            .await?;
733        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
734
735        let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await;
736        maintenance.start_performing(&machine, &gc);
737
738        Ok(schema_id)
739    }
740
741    /// Registers a new latest schema for the given shard.
742    ///
743    /// This new schema must be [backward_compatible] with all previous schemas
744    /// for this shard. If it's not, [CaESchema::Incompatible] is returned.
745    ///
746    /// [backward_compatible]: mz_persist_types::schema::backward_compatible
747    ///
748    /// To prevent races, the caller must declare what it believes to be the
749    /// latest schema id. If this doesn't match reality,
750    /// [CaESchema::ExpectedMismatch] is returned.
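    ///
    /// A minimal sketch (not compiled as a doctest; `latest_id` and the
    /// evolved schemas are assumed to be supplied by the caller):
    ///
    /// ```rust,ignore
    /// let res = client
    ///     .compare_and_evolve_schema::<String, String, u64, i64>(
    ///         shard_id,
    ///         latest_id,
    ///         &new_key_schema,
    ///         &new_val_schema,
    ///         Diagnostics::from_purpose("evolve schema"),
    ///     )
    ///     .await?;
    /// // On failure `res` is `CaESchema::Incompatible` or
    /// // `CaESchema::ExpectedMismatch`; see the variant docs.
    /// ```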
751    pub async fn compare_and_evolve_schema<K, V, T, D>(
752        &self,
753        shard_id: ShardId,
754        expected: SchemaId,
755        key_schema: &K::Schema,
756        val_schema: &V::Schema,
757        diagnostics: Diagnostics,
758    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
759    where
760        K: Debug + Codec,
761        V: Debug + Codec,
762        T: Timestamp + Lattice + Codec64 + Sync,
763        D: Monoid + Codec64 + Send + Sync,
764    {
765        let machine = self
766            .make_machine::<K, V, T, D>(shard_id, diagnostics)
767            .await?;
768        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
769        let (res, maintenance) = machine
770            .compare_and_evolve_schema(expected, key_schema, val_schema)
771            .await;
772        maintenance.start_performing(&machine, &gc);
773        Ok(res)
774    }
775
776    /// Check if the given shard is in a finalized state; i.e. it can no longer be
777    /// read, any data that was written to it is no longer accessible, and we've
778    /// discarded references to that data from state.
779    pub async fn is_finalized<K, V, T, D>(
780        &self,
781        shard_id: ShardId,
782        diagnostics: Diagnostics,
783    ) -> Result<bool, InvalidUsage<T>>
784    where
785        K: Debug + Codec,
786        V: Debug + Codec,
787        T: Timestamp + Lattice + Codec64 + Sync,
788        D: Monoid + Codec64 + Send + Sync,
789    {
790        let machine = self
791            .make_machine::<K, V, T, D>(shard_id, diagnostics)
792            .await?;
793        Ok(machine.is_finalized())
794    }
795
796    /// If a shard is guaranteed to never be used again, finalize it to delete
797    /// the associated data and release any associated resources. (Except for a
798    /// little state in consensus we use to represent the tombstone.)
799    ///
800    /// The caller should ensure that both the `since` and `upper` of the shard
801    /// have been advanced to `[]`: i.e. the shard is no longer writable or readable.
802    /// Otherwise an error is returned.
803    ///
804    /// Once `finalize_shard` has been called, the results of future operations on
805    /// the shard are not defined. They may return errors or succeed as a no-op.
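    ///
    /// A minimal sketch (not compiled as a doctest; it assumes the caller has
    /// already advanced the shard's `since` and `upper` to the empty
    /// antichain):
    ///
    /// ```rust,ignore
    /// client
    ///     .finalize_shard::<String, String, u64, i64>(
    ///         shard_id,
    ///         Diagnostics::from_purpose("finalize unused shard"),
    ///     )
    ///     .await
    ///     .expect("finalize failed");
    /// ```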
806    #[instrument(level = "debug", fields(shard = %shard_id))]
807    pub async fn finalize_shard<K, V, T, D>(
808        &self,
809        shard_id: ShardId,
810        diagnostics: Diagnostics,
811    ) -> Result<(), InvalidUsage<T>>
812    where
813        K: Debug + Codec,
814        V: Debug + Codec,
815        T: Timestamp + Lattice + Codec64 + Sync,
816        D: Monoid + Codec64 + Send + Sync,
817    {
818        let machine = self
819            .make_machine::<K, V, T, D>(shard_id, diagnostics)
820            .await?;
821
822        let maintenance = machine.become_tombstone().await?;
823        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
824
825        let () = maintenance.perform(&machine, &gc).await;
826
827        Ok(())
828    }
829
830    /// Upgrade the state to the latest version. This should only be called once we no longer
831    /// need to interoperate with older versions, e.g. after a successful upgrade.
832    pub async fn upgrade_version<K, V, T, D>(
833        &self,
834        shard_id: ShardId,
835        diagnostics: Diagnostics,
836    ) -> Result<(), InvalidUsage<T>>
837    where
838        K: Debug + Codec,
839        V: Debug + Codec,
840        T: Timestamp + Lattice + Codec64 + Sync,
841        D: Monoid + Codec64 + Send + Sync,
842    {
843        let machine = self
844            .make_machine::<K, V, T, D>(shard_id, diagnostics)
845            .await?;
846
847        match machine.upgrade_version().await {
848            Ok(maintenance) => {
849                let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
850                let () = maintenance.perform(&machine, &gc).await;
851                Ok(())
852            }
853            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
854        }
855    }
856
857    /// Returns the internal state of the shard for debugging and QA.
858    ///
859    /// We'll be thoughtful about avoiding unnecessary changes, but the **output
860    /// of this method needs to be gated from users**, so that it's not subject
861    /// to our backward compatibility guarantees.
862    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
863        &self,
864        shard_id: &ShardId,
865    ) -> Result<impl serde::Serialize, anyhow::Error> {
866        let state_versions = StateVersions::new(
867            self.cfg.clone(),
868            Arc::clone(&self.consensus),
869            Arc::clone(&self.blob),
870            Arc::clone(&self.metrics),
871        );
872        // TODO: Don't fetch all live diffs. Feels like we should pull out a new
873        // method in StateVersions for fetching the latest version of State of a
874        // shard that might or might not exist.
875        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
876        if versions.0.is_empty() {
877            return Err(anyhow::anyhow!("{} does not exist", shard_id));
878        }
879        let state = state_versions
880            .fetch_current_state::<T>(shard_id, versions.0)
881            .await;
882        let state = state.check_ts_codec(shard_id)?;
883        Ok(state)
884    }
885
886    /// Test helper for a [Self::open] call that is expected to succeed.
887    #[cfg(test)]
888    #[track_caller]
889    pub async fn expect_open<K, V, T, D>(
890        &self,
891        shard_id: ShardId,
892    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
893    where
894        K: Debug + Codec,
895        V: Debug + Codec,
896        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
897        D: Monoid + Ord + Codec64 + Send + Sync,
898        K::Schema: Default,
899        V::Schema: Default,
900    {
901        self.open(
902            shard_id,
903            Arc::new(K::Schema::default()),
904            Arc::new(V::Schema::default()),
905            Diagnostics::for_tests(),
906            true,
907        )
908        .await
909        .expect("codec mismatch")
910    }
911
912    /// Return the metrics being used by this client.
913    ///
914    /// Only exposed for tests, persistcli, and benchmarks.
915    pub fn metrics(&self) -> &Arc<Metrics> {
916        &self.metrics
917    }
918}
919
920#[cfg(test)]
921mod tests {
922    use std::future::Future;
923    use std::mem;
924    use std::pin::Pin;
925    use std::task::Context;
926    use std::time::Duration;
927
928    use differential_dataflow::consolidation::consolidate_updates;
929    use differential_dataflow::lattice::Lattice;
930    use futures_task::noop_waker;
931    use mz_dyncfg::ConfigUpdates;
932    use mz_ore::assert_ok;
933    use mz_persist::indexed::encoding::BlobTraceBatchPart;
934    use mz_persist::workload::DataGenerator;
935    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
936    use mz_proto::protobuf_roundtrip;
937    use proptest::prelude::*;
938    use timely::order::PartialOrder;
939    use timely::progress::Antichain;
940
941    use crate::batch::BLOB_TARGET_SIZE;
942    use crate::cache::PersistClientCache;
943    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
944    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
945    use crate::internal::paths::BlobKey;
946    use crate::read::ListenEvent;
947
948    use super::*;
949
950    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
951        // Configure an aggressively small blob_target_size so we get some
952        // amount of coverage of that in tests. Similarly, for max_outstanding.
953        let mut cache = PersistClientCache::new_no_metrics();
954        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
955        cache
956            .cfg
957            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
958        dyncfgs.apply(cache.cfg());
959
960        // Enable compaction in tests to ensure we get coverage.
961        cache.cfg.compaction_enabled = true;
962        cache
963    }
964
965    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
966        let cache = new_test_client_cache(dyncfgs);
967        cache
968            .open(PersistLocation::new_in_mem())
969            .await
970            .expect("client construction failed")
971    }
972
973    pub fn all_ok<'a, K, V, T, D, I>(
974        iter: I,
975        as_of: T,
976    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
977    where
978        K: Ord + Clone + 'a,
979        V: Ord + Clone + 'a,
980        T: Timestamp + Lattice + Clone + 'a,
981        D: Monoid + Clone + 'a,
982        I: IntoIterator<Item = &'a ((K, V), T, D)>,
983    {
984        let as_of = Antichain::from_elem(as_of);
985        let mut ret = iter
986            .into_iter()
987            .map(|((k, v), t, d)| {
988                let mut t = t.clone();
989                t.advance_by(as_of.borrow());
990                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
991            })
992            .collect();
993        consolidate_updates(&mut ret);
994        ret
995    }
996
997    pub async fn expect_fetch_part<K, V, T, D>(
998        blob: &dyn Blob,
999        key: &BlobKey,
1000        metrics: &Metrics,
1001        read_schemas: &Schemas<K, V>,
1002    ) -> (
1003        BlobTraceBatchPart<T>,
1004        Vec<((Result<K, String>, Result<V, String>), T, D)>,
1005    )
1006    where
1007        K: Codec,
1008        V: Codec,
1009        T: Timestamp + Codec64,
1010        D: Codec64,
1011    {
1012        let value = blob
1013            .get(key)
1014            .await
1015            .expect("failed to fetch part")
1016            .expect("missing part");
1017        let mut part =
1018            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
1019        // Ensure codec data is present even if it was not generated at write time.
1020        let _ = part
1021            .updates
1022            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
1023        let mut updates = Vec::new();
1024        // TODO(bkirwi): switch to structured data in tests
1025        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
1026            updates.push((
1027                (
1028                    K::decode(k, &read_schemas.key),
1029                    V::decode(v, &read_schemas.val),
1030                ),
1031                T::decode(t),
1032                D::decode(d),
1033            ));
1034        }
1035        (part, updates)
1036    }
1037
1038    #[mz_persist_proc::test(tokio::test)]
1039    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1040    async fn sanity_check(dyncfgs: ConfigUpdates) {
1041        let data = [
1042            (("1".to_owned(), "one".to_owned()), 1, 1),
1043            (("2".to_owned(), "two".to_owned()), 2, 1),
1044            (("3".to_owned(), "three".to_owned()), 3, 1),
1045        ];
1046
1047        let (mut write, mut read) = new_test_client(&dyncfgs)
1048            .await
1049            .expect_open::<String, String, u64, i64>(ShardId::new())
1050            .await;
1051        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
1052        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
1053
1054        // Write a [0,3) batch.
1055        write
1056            .expect_append(&data[..2], write.upper().clone(), vec![3])
1057            .await;
1058        assert_eq!(write.upper(), &Antichain::from_elem(3));
1059
1060        // Grab a snapshot and listener as_of 1. Snapshot should only have part of what we wrote.
1061        assert_eq!(
1062            read.expect_snapshot_and_fetch(1).await,
1063            all_ok(&data[..1], 1)
1064        );
1065
1066        let mut listen = read.clone("").await.expect_listen(1).await;
1067
1068        // Write a [3,4) batch.
1069        write
1070            .expect_append(&data[2..], write.upper().clone(), vec![4])
1071            .await;
1072        assert_eq!(write.upper(), &Antichain::from_elem(4));
1073
1074        // Listen should have part of the initial write plus the new one.
1075        assert_eq!(
1076            listen.read_until(&4).await,
1077            (all_ok(&data[1..], 1), Antichain::from_elem(4))
1078        );
1079
1080        // Downgrading the since is tracked locally (but otherwise is a no-op).
1081        read.downgrade_since(&Antichain::from_elem(2)).await;
1082        assert_eq!(read.since(), &Antichain::from_elem(2));
1083    }
1084
1085    // Sanity check that the open_reader and open_writer calls work.
1086    #[mz_persist_proc::test(tokio::test)]
1087    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1088    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
1089        let data = vec![
1090            (("1".to_owned(), "one".to_owned()), 1, 1),
1091            (("2".to_owned(), "two".to_owned()), 2, 1),
1092            (("3".to_owned(), "three".to_owned()), 3, 1),
1093        ];
1094
1095        let shard_id = ShardId::new();
1096        let client = new_test_client(&dyncfgs).await;
1097        let mut write1 = client
1098            .open_writer::<String, String, u64, i64>(
1099                shard_id,
1100                Arc::new(StringSchema),
1101                Arc::new(StringSchema),
1102                Diagnostics::for_tests(),
1103            )
1104            .await
1105            .expect("codec mismatch");
1106        let mut read1 = client
1107            .open_leased_reader::<String, String, u64, i64>(
1108                shard_id,
1109                Arc::new(StringSchema),
1110                Arc::new(StringSchema),
1111                Diagnostics::for_tests(),
1112                true,
1113            )
1114            .await
1115            .expect("codec mismatch");
1116        let mut read2 = client
1117            .open_leased_reader::<String, String, u64, i64>(
1118                shard_id,
1119                Arc::new(StringSchema),
1120                Arc::new(StringSchema),
1121                Diagnostics::for_tests(),
1122                true,
1123            )
1124            .await
1125            .expect("codec mismatch");
1126        let mut write2 = client
1127            .open_writer::<String, String, u64, i64>(
1128                shard_id,
1129                Arc::new(StringSchema),
1130                Arc::new(StringSchema),
1131                Diagnostics::for_tests(),
1132            )
1133            .await
1134            .expect("codec mismatch");
1135
1136        write2.expect_compare_and_append(&data[..1], 0, 2).await;
1137        assert_eq!(
1138            read2.expect_snapshot_and_fetch(1).await,
1139            all_ok(&data[..1], 1)
1140        );
1141        write1.expect_compare_and_append(&data[1..], 2, 4).await;
1142        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
1143    }
1144
1145    #[mz_persist_proc::test(tokio::test)]
1146    #[cfg_attr(miri, ignore)] // too slow
1147    async fn invalid_usage(dyncfgs: ConfigUpdates) {
1148        let data = vec![
1149            (("1".to_owned(), "one".to_owned()), 1, 1),
1150            (("2".to_owned(), "two".to_owned()), 2, 1),
1151            (("3".to_owned(), "three".to_owned()), 3, 1),
1152        ];
1153
1154        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
1155            .parse::<ShardId>()
1156            .expect("invalid shard id");
1157        let mut client = new_test_client(&dyncfgs).await;
1158
1159        let (mut write0, mut read0) = client
1160            .expect_open::<String, String, u64, i64>(shard_id0)
1161            .await;
1162
1163        write0.expect_compare_and_append(&data, 0, 4).await;
1164
1165        // InvalidUsage from PersistClient methods.
1166        {
1167            fn codecs(
1168                k: &str,
1169                v: &str,
1170                t: &str,
1171                d: &str,
1172            ) -> (String, String, String, String, Option<CodecConcreteType>) {
1173                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
1174            }
1175
1176            client.shared_states = Arc::new(StateCache::new_no_metrics());
1177            assert_eq!(
1178                client
1179                    .open::<Vec<u8>, String, u64, i64>(
1180                        shard_id0,
1181                        Arc::new(VecU8Schema),
1182                        Arc::new(StringSchema),
1183                        Diagnostics::for_tests(),
1184                        true,
1185                    )
1186                    .await
1187                    .unwrap_err(),
1188                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1189                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1190                    actual: codecs("String", "String", "u64", "i64"),
1191                }))
1192            );
1193            assert_eq!(
1194                client
1195                    .open::<String, Vec<u8>, u64, i64>(
1196                        shard_id0,
1197                        Arc::new(StringSchema),
1198                        Arc::new(VecU8Schema),
1199                        Diagnostics::for_tests(),
1200                        true,
1201                    )
1202                    .await
1203                    .unwrap_err(),
1204                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1205                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
1206                    actual: codecs("String", "String", "u64", "i64"),
1207                }))
1208            );
1209            assert_eq!(
1210                client
1211                    .open::<String, String, i64, i64>(
1212                        shard_id0,
1213                        Arc::new(StringSchema),
1214                        Arc::new(StringSchema),
1215                        Diagnostics::for_tests(),
1216                        true,
1217                    )
1218                    .await
1219                    .unwrap_err(),
1220                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1221                    requested: codecs("String", "String", "i64", "i64"),
1222                    actual: codecs("String", "String", "u64", "i64"),
1223                }))
1224            );
1225            assert_eq!(
1226                client
1227                    .open::<String, String, u64, u64>(
1228                        shard_id0,
1229                        Arc::new(StringSchema),
1230                        Arc::new(StringSchema),
1231                        Diagnostics::for_tests(),
1232                        true,
1233                    )
1234                    .await
1235                    .unwrap_err(),
1236                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1237                    requested: codecs("String", "String", "u64", "u64"),
1238                    actual: codecs("String", "String", "u64", "i64"),
1239                }))
1240            );
1241
1242            // open_reader and open_writer end up using the same checks, so just
1243            // verify one type each to verify the plumbing instead of the full
1244            // set.
1245            assert_eq!(
1246                client
1247                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
1248                        shard_id0,
1249                        Arc::new(VecU8Schema),
1250                        Arc::new(StringSchema),
1251                        Diagnostics::for_tests(),
1252                        true,
1253                    )
1254                    .await
1255                    .unwrap_err(),
1256                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1257                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1258                    actual: codecs("String", "String", "u64", "i64"),
1259                }))
1260            );
1261            assert_eq!(
1262                client
1263                    .open_writer::<Vec<u8>, String, u64, i64>(
1264                        shard_id0,
1265                        Arc::new(VecU8Schema),
1266                        Arc::new(StringSchema),
1267                        Diagnostics::for_tests(),
1268                    )
1269                    .await
1270                    .unwrap_err(),
1271                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1272                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1273                    actual: codecs("String", "String", "u64", "i64"),
1274                }))
1275            );
1276        }
1277
1278        // InvalidUsage from ReadHandle methods.
1279        {
1280            let snap = read0
1281                .snapshot(Antichain::from_elem(3))
1282                .await
1283                .expect("cannot serve requested as_of");
1284
1285            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
1286                .parse::<ShardId>()
1287                .expect("invalid shard id");
1288            let mut fetcher1 = client
1289                .create_batch_fetcher::<String, String, u64, i64>(
1290                    shard_id1,
1291                    Default::default(),
1292                    Default::default(),
1293                    false,
1294                    Diagnostics::for_tests(),
1295                )
1296                .await
1297                .unwrap();
1298            for part in snap {
1299                let (part, _lease) = part.into_exchangeable_part();
1300                let res = fetcher1.fetch_leased_part(part).await;
1301                assert_eq!(
1302                    res.unwrap_err(),
1303                    InvalidUsage::BatchNotFromThisShard {
1304                        batch_shard: shard_id0,
1305                        handle_shard: shard_id1,
1306                    }
1307                );
1308            }
1309        }
1310
1311        // InvalidUsage from WriteHandle methods.
1312        {
1313            let ts3 = &data[2];
1314            assert_eq!(ts3.1, 3);
1315            let ts3 = vec![ts3.clone()];
1316
1317            // WriteHandle::append also covers append_batch,
1318            // compare_and_append_batch, compare_and_append.
1319            assert_eq!(
1320                write0
1321                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
1322                    .await
1323                    .unwrap_err(),
1324                InvalidUsage::UpdateNotBeyondLower {
1325                    ts: 3,
1326                    lower: Antichain::from_elem(4),
1327                },
1328            );
1329            assert_eq!(
1330                write0
1331                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
1332                    .await
1333                    .unwrap_err(),
1334                InvalidUsage::UpdateBeyondUpper {
1335                    ts: 3,
1336                    expected_upper: Antichain::from_elem(3),
1337                },
1338            );
1339            // NB unlike the previous tests, this one has empty updates.
1340            assert_eq!(
1341                write0
1342                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
1343                    .await
1344                    .unwrap_err(),
1345                InvalidUsage::InvalidBounds {
1346                    lower: Antichain::from_elem(3),
1347                    upper: Antichain::from_elem(2),
1348                },
1349            );
1350
1351            // Tests for the BatchBuilder.
1352            assert_eq!(
1353                write0
1354                    .builder(Antichain::from_elem(3))
1355                    .finish(Antichain::from_elem(2))
1356                    .await
1357                    .unwrap_err(),
1358                InvalidUsage::InvalidBounds {
1359                    lower: Antichain::from_elem(3),
1360                    upper: Antichain::from_elem(2)
1361                },
1362            );
1363            let batch = write0
1364                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1365                .await
1366                .expect("invalid usage");
1367            assert_eq!(
1368                write0
1369                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
1370                    .await
1371                    .unwrap_err(),
1372                InvalidUsage::InvalidBatchBounds {
1373                    batch_lower: Antichain::from_elem(3),
1374                    batch_upper: Antichain::from_elem(4),
1375                    append_lower: Antichain::from_elem(4),
1376                    append_upper: Antichain::from_elem(5),
1377                },
1378            );
1379            let batch = write0
1380                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1381                .await
1382                .expect("invalid usage");
1383            assert_eq!(
1384                write0
1385                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
1386                    .await
1387                    .unwrap_err(),
1388                InvalidUsage::InvalidBatchBounds {
1389                    batch_lower: Antichain::from_elem(3),
1390                    batch_upper: Antichain::from_elem(4),
1391                    append_lower: Antichain::from_elem(2),
1392                    append_upper: Antichain::from_elem(3),
1393                },
1394            );
1395            let batch = write0
1396                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1397                .await
1398                .expect("invalid usage");
1399            // NB unlike the others, this one uses matches! because it's
1400            // non-deterministic (the key)
1401            assert!(matches!(
1402                write0
1403                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
1404                    .await
1405                    .unwrap_err(),
1406                InvalidUsage::InvalidEmptyTimeInterval { .. }
1407            ));
1408        }
1409    }
1410
1411    #[mz_persist_proc::test(tokio::test)]
1412    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1413    async fn multiple_shards(dyncfgs: ConfigUpdates) {
1414        let data1 = [
1415            (("1".to_owned(), "one".to_owned()), 1, 1),
1416            (("2".to_owned(), "two".to_owned()), 2, 1),
1417        ];
1418
1419        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];
1420
1421        let client = new_test_client(&dyncfgs).await;
1422
1423        let (mut write1, mut read1) = client
1424            .expect_open::<String, String, u64, i64>(ShardId::new())
1425            .await;
1426
1427        // Different types, so that checks would fail in case we were not separating these
1428        // collections internally.
1429        let (mut write2, mut read2) = client
1430            .expect_open::<String, (), u64, i64>(ShardId::new())
1431            .await;
1432
1433        write1
1434            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
1435            .await;
1436
1437        write2
1438            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
1439            .await;
1440
1441        assert_eq!(
1442            read1.expect_snapshot_and_fetch(2).await,
1443            all_ok(&data1[..], 2)
1444        );
1445
1446        assert_eq!(
1447            read2.expect_snapshot_and_fetch(2).await,
1448            all_ok(&data2[..], 2)
1449        );
1450    }
1451
1452    #[mz_persist_proc::test(tokio::test)]
1453    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1454    async fn fetch_upper(dyncfgs: ConfigUpdates) {
1455        let data = [
1456            (("1".to_owned(), "one".to_owned()), 1, 1),
1457            (("2".to_owned(), "two".to_owned()), 2, 1),
1458        ];
1459
1460        let client = new_test_client(&dyncfgs).await;
1461
1462        let shard_id = ShardId::new();
1463
1464        let (mut write1, _read1) = client
1465            .expect_open::<String, String, u64, i64>(shard_id)
1466            .await;
1467
1468        let (mut write2, _read2) = client
1469            .expect_open::<String, String, u64, i64>(shard_id)
1470            .await;
1471
1472        write1
1473            .expect_append(&data[..], write1.upper().clone(), vec![3])
1474            .await;
1475
1476        // The shard-global upper does advance, even though this writer has not appended anything itself.
1477        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));
1478
1479        // The writer-local upper also advances (as a side effect of the fetch
1480        // above), even though it was another writer that advanced the frontier.
1481        assert_eq!(write2.upper(), &Antichain::from_elem(3));
1482    }
1483
1484    #[mz_persist_proc::test(tokio::test)]
1485    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1486    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
1487        let data = [
1488            (("1".to_owned(), "one".to_owned()), 1, 1),
1489            (("2".to_owned(), "two".to_owned()), 2, 1),
1490        ];
1491
1492        let client = new_test_client(&dyncfgs).await;
1493
1494        let shard_id = ShardId::new();
1495
1496        let (mut write, _read) = client
1497            .expect_open::<String, String, u64, i64>(shard_id)
1498            .await;
1499
1500        write
1501            .expect_append(&data[..], write.upper().clone(), vec![3])
1502            .await;
1503
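        // Try to append a batch whose lower (5) is ahead of the current shard
        // upper (3); this must be rejected with an UpperMismatch.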
1504        let data = [
1505            (("5".to_owned(), "fünf".to_owned()), 5, 1),
1506            (("6".to_owned(), "sechs".to_owned()), 6, 1),
1507        ];
1508        let res = write
1509            .append(
1510                data.iter(),
1511                Antichain::from_elem(5),
1512                Antichain::from_elem(7),
1513            )
1514            .await;
1515        assert_eq!(
1516            res,
1517            Ok(Err(UpperMismatch {
1518                expected: Antichain::from_elem(5),
1519                current: Antichain::from_elem(3)
1520            }))
1521        );
1522
1523        // A failed append updates the write handle's cached upper to the current shard upper.
1524        assert_eq!(write.upper(), &Antichain::from_elem(3));
1525    }
1526
1527    // Make sure that the API structs are Sync + Send, so that they can be used in async tasks.
1528    // NOTE: This is a compile-time only test. If it compiles, we're good.
1529    #[allow(unused)]
1530    async fn sync_send(dyncfgs: ConfigUpdates) {
1531        mz_ore::test::init_logging();
1532
1533        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
1534            true
1535        }
1536
1537        let client = new_test_client(&dyncfgs).await;
1538
1539        let (write, read) = client
1540            .expect_open::<String, String, u64, i64>(ShardId::new())
1541            .await;
1542
1543        assert!(is_send_sync(client));
1544        assert!(is_send_sync(write));
1545        assert!(is_send_sync(read));
1546    }
1547
1548    #[mz_persist_proc::test(tokio::test)]
1549    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1550    async fn compare_and_append(dyncfgs: ConfigUpdates) {
1551        let data = vec![
1552            (("1".to_owned(), "one".to_owned()), 1, 1),
1553            (("2".to_owned(), "two".to_owned()), 2, 1),
1554            (("3".to_owned(), "three".to_owned()), 3, 1),
1555        ];
1556
1557        let id = ShardId::new();
1558        let client = new_test_client(&dyncfgs).await;
1559        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1560
1561        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1562
1563        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
1564        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
1565        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
1566
1567        // Write a [0,3) batch.
1568        write1
1569            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
1570            .await;
1571        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1572
1573        assert_eq!(
1574            read.expect_snapshot_and_fetch(2).await,
1575            all_ok(&data[..2], 2)
1576        );
1577
1578        // Try to write with a wrong expected upper.
1579        let res = write2
1580            .compare_and_append(
1581                &data[..2],
1582                Antichain::from_elem(u64::minimum()),
1583                Antichain::from_elem(3),
1584            )
1585            .await;
1586        assert_eq!(
1587            res,
1588            Ok(Err(UpperMismatch {
1589                expected: Antichain::from_elem(u64::minimum()),
1590                current: Antichain::from_elem(3)
1591            }))
1592        );
1593
1594        // A failed write updates our local cache of the shard upper.
1595        assert_eq!(write2.upper(), &Antichain::from_elem(3));
1596
1597        // Try again with a good expected upper.
1598        write2.expect_compare_and_append(&data[2..], 3, 4).await;
1599
1600        assert_eq!(write2.upper(), &Antichain::from_elem(4));
1601
1602        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
1603    }
1604
1605    #[mz_persist_proc::test(tokio::test)]
1606    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1607    async fn overlapping_append(dyncfgs: ConfigUpdates) {
1608        mz_ore::test::init_logging_default("info");
1609
1610        let data = vec![
1611            (("1".to_owned(), "one".to_owned()), 1, 1),
1612            (("2".to_owned(), "two".to_owned()), 2, 1),
1613            (("3".to_owned(), "three".to_owned()), 3, 1),
1614            (("4".to_owned(), "vier".to_owned()), 4, 1),
1615            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1616        ];
1617
1618        let id = ShardId::new();
1619        let client = new_test_client(&dyncfgs).await;
1620
1621        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1622
1623        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1624
1625        // Grab a listener before we do any writing.
1626        let mut listen = read.clone("").await.expect_listen(0).await;
1627
1628        // Write a [0,3) batch.
1629        write1
1630            .expect_append(&data[..2], write1.upper().clone(), vec![3])
1631            .await;
1632        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1633
1634        // Write a [0,5) batch with the second writer.
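        // Writer 2 still has the initial upper cached, so this batch overlaps the
        // [0,3) batch appended above; the assertions below confirm the overlapping
        // updates are not double-counted.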
1635        write2
1636            .expect_append(&data[..4], write2.upper().clone(), vec![5])
1637            .await;
1638        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1639
1640        // Write a [3,6) batch with the first writer.
1641        write1
1642            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
1643            .await;
1644        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1645
1646        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1647
1648        assert_eq!(
1649            listen.read_until(&6).await,
1650            (all_ok(&data[..], 1), Antichain::from_elem(6))
1651        );
1652    }
1653
1654    // Appends need to be contiguous for a shard, meaning the lower of an appended batch must not
1655    // be in advance of the current shard upper.
1656    #[mz_persist_proc::test(tokio::test)]
1657    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1658    async fn contiguous_append(dyncfgs: ConfigUpdates) {
1659        let data = vec![
1660            (("1".to_owned(), "one".to_owned()), 1, 1),
1661            (("2".to_owned(), "two".to_owned()), 2, 1),
1662            (("3".to_owned(), "three".to_owned()), 3, 1),
1663            (("4".to_owned(), "vier".to_owned()), 4, 1),
1664            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1665        ];
1666
1667        let id = ShardId::new();
1668        let client = new_test_client(&dyncfgs).await;
1669
1670        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1671
1672        // Write a [0,3) batch.
1673        write
1674            .expect_append(&data[..2], write.upper().clone(), vec![3])
1675            .await;
1676        assert_eq!(write.upper(), &Antichain::from_elem(3));
1677
1678        // Appending a non-contiguous batch should fail: the lower of this
1679        // [5,6) batch is ahead of the current shard upper of [3].
1680        let result = write
1681            .append(
1682                &data[4..5],
1683                Antichain::from_elem(5),
1684                Antichain::from_elem(6),
1685            )
1686            .await;
1687        assert_eq!(
1688            result,
1689            Ok(Err(UpperMismatch {
1690                expected: Antichain::from_elem(5),
1691                current: Antichain::from_elem(3)
1692            }))
1693        );
1694
1695        // Fixing the lower to make the write contiguous should make the append succeed.
1696        write.expect_append(&data[2..5], vec![3], vec![6]).await;
1697        assert_eq!(write.upper(), &Antichain::from_elem(6));
1698
1699        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1700    }
1701
1702    // Per-writer appends can be non-contiguous, as long as appends to the shard from all writers
1703    // combined are contiguous.
1704    #[mz_persist_proc::test(tokio::test)]
1705    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1706    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
1707        let data = vec![
1708            (("1".to_owned(), "one".to_owned()), 1, 1),
1709            (("2".to_owned(), "two".to_owned()), 2, 1),
1710            (("3".to_owned(), "three".to_owned()), 3, 1),
1711            (("4".to_owned(), "vier".to_owned()), 4, 1),
1712            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1713        ];
1714
1715        let id = ShardId::new();
1716        let client = new_test_client(&dyncfgs).await;
1717
1718        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1719
1720        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1721
1722        // Write a [0,3) batch with writer 1.
1723        write1
1724            .expect_append(&data[..2], write1.upper().clone(), vec![3])
1725            .await;
1726        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1727
1728        // Write a [3,5) batch with writer 2.
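        // Manually advance writer 2's cached upper to the current shard upper;
        // it is still at the initial upper because this handle has not written yet.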
1729        write2.upper = Antichain::from_elem(3);
1730        write2
1731            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
1732            .await;
1733        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1734
1735        // Write a [5,6) batch with writer 1.
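        // Likewise, manually advance writer 1's cached upper past the write it
        // did not observe.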
1736        write1.upper = Antichain::from_elem(5);
1737        write1
1738            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
1739            .await;
1740        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1741
1742        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1743    }
1744
1745    // Compare_and_appends need to be contiguous for a shard, meaning the lower of an appended
1746    // batch needs to match the current shard upper.
1747    #[mz_persist_proc::test(tokio::test)]
1748    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1749    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
1750        let data = vec![
1751            (("1".to_owned(), "one".to_owned()), 1, 1),
1752            (("2".to_owned(), "two".to_owned()), 2, 1),
1753            (("3".to_owned(), "three".to_owned()), 3, 1),
1754            (("4".to_owned(), "vier".to_owned()), 4, 1),
1755            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1756        ];
1757
1758        let id = ShardId::new();
1759        let client = new_test_client(&dyncfgs).await;
1760
1761        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1762
1763        // Write a [0,3) batch.
1764        write.expect_compare_and_append(&data[..2], 0, 3).await;
1765        assert_eq!(write.upper(), &Antichain::from_elem(3));
1766
1767        // Appending a non-contiguous batch should fail: the lower of this
1768        // [5,6) batch is ahead of the current shard upper of [3].
1769        let result = write
1770            .compare_and_append(
1771                &data[4..5],
1772                Antichain::from_elem(5),
1773                Antichain::from_elem(6),
1774            )
1775            .await;
1776        assert_eq!(
1777            result,
1778            Ok(Err(UpperMismatch {
1779                expected: Antichain::from_elem(5),
1780                current: Antichain::from_elem(3)
1781            }))
1782        );
1783
1784        // Writing with the correct expected upper to make the write contiguous should make the
1785        // append succeed.
1786        write.expect_compare_and_append(&data[2..5], 3, 6).await;
1787        assert_eq!(write.upper(), &Antichain::from_elem(6));
1788
1789        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1790    }
1791
1792    // Per-writer compare_and_appends can be non-contiguous, as long as appends to the shard from
1793    // all writers combined are contiguous.
1794    #[mz_persist_proc::test(tokio::test)]
1795    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1796    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
1797        let data = vec![
1798            (("1".to_owned(), "one".to_owned()), 1, 1),
1799            (("2".to_owned(), "two".to_owned()), 2, 1),
1800            (("3".to_owned(), "three".to_owned()), 3, 1),
1801            (("4".to_owned(), "vier".to_owned()), 4, 1),
1802            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1803        ];
1804
1805        let id = ShardId::new();
1806        let client = new_test_client(&dyncfgs).await;
1807
1808        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1809
1810        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1811
1812        // Write a [0,3) batch with writer 1.
1813        write1.expect_compare_and_append(&data[..2], 0, 3).await;
1814        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1815
1816        // Write a [3,5) batch with writer 2.
1817        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
1818        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1819
1820        // Write a [5,6) batch with writer 1.
1821        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
1822        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1823
1824        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1825    }
1826
1827    #[mz_ore::test]
1828    fn fmt_ids() {
1829        assert_eq!(
1830            format!("{}", LeasedReaderId([0u8; 16])),
1831            "r00000000-0000-0000-0000-000000000000"
1832        );
1833        assert_eq!(
1834            format!("{:?}", LeasedReaderId([0u8; 16])),
1835            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
1836        );
1837    }
1838
1839    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
1840    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
1841    async fn concurrency(dyncfgs: ConfigUpdates) {
1842        let data = DataGenerator::small();
1843
1844        const NUM_WRITERS: usize = 2;
1845        let id = ShardId::new();
1846        let client = new_test_client(&dyncfgs).await;
1847        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
1848        for idx in 0..NUM_WRITERS {
1849            let (data, client) = (data.clone(), client.clone());
1850
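            // A bounded channel over which the batch-building task below hands
            // finished batches to the appending task.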
1851            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);
1852
1853            let client1 = client.clone();
1854            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
1855                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1856                let mut current_upper = 0;
1857                for batch in data.batches() {
1858                    let new_upper = match batch.len().checked_sub(1).and_then(|i| batch.get(i)) {
1859                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
1860                        None => continue,
1861                    };
1862                    // Because we (intentionally) call open inside the task,
1863                    // some other writer may have raced ahead and already
1864                    // appended some data before this one was registered. As a
1865                    // result, this writer may not be starting with an upper of
1866                    // the initial empty antichain. This is nice because it
1867                    // mimics how a real HA source would work, but it means we
1868                    // have to skip any batches that have already been committed
1869                    // (otherwise our new_upper would be before our upper).
1870                    //
1871                    // Note however, that unlike a real source, our
1872                    // DataGenerator-derived batches are guaranteed to be
1873                    // chunked along the same boundaries. This means we don't
1874                    // have to consider partial batches when generating the
1875                    // updates below.
1876                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
1877                        continue;
1878                    }
1879
1880                    let current_upper_chain = Antichain::from_elem(current_upper);
1881                    current_upper = new_upper;
1882                    let new_upper_chain = Antichain::from_elem(new_upper);
1883                    let mut builder = write.builder(current_upper_chain);
1884
1885                    for ((k, v), t, d) in batch.iter() {
1886                        builder
1887                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
1888                            .await
1889                            .expect("invalid usage");
1890                    }
1891
1892                    let batch = builder
1893                        .finish(new_upper_chain)
1894                        .await
1895                        .expect("invalid usage");
1896
1897                    match batch_tx.send(batch).await {
1898                        Ok(_) => (),
1899                        Err(e) => panic!("send error: {}", e),
1900                    }
1901                }
1902            });
1903            handles.push(handle);
1904
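            // The appender task opens its own write handle and appends whatever
            // batches it receives, so building and appending batches race across
            // tasks and across writer handles.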
1905            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
1906                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1907
1908                while let Some(batch) = batch_rx.recv().await {
1909                    let lower = batch.lower().clone();
1910                    let upper = batch.upper().clone();
1911                    write
1912                        .append_batch(batch, lower, upper)
1913                        .await
1914                        .expect("invalid usage")
1915                        .expect("unexpected upper");
1916                }
1917            });
1918            handles.push(handle);
1919        }
1920
1921        for handle in handles {
1922            let () = handle.await;
1923        }
1924
1925        let expected = data.records().collect::<Vec<_>>();
1926        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
1927        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1928        assert_eq!(
1929            read.expect_snapshot_and_fetch(max_ts).await,
1930            all_ok(expected.iter(), max_ts)
1931        );
1932    }
1933
1934    // Regression test for database-issues#3523. Snapshot with as_of >= upper would
1935    // immediately return the data currently available instead of waiting for
1936    // upper to advance past as_of.
1937    #[mz_persist_proc::test(tokio::test)]
1938    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1939    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
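        // A no-op waker lets us poll the futures below by hand and assert that
        // they stay pending, without anything ever waking them.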
1940        let waker = noop_waker();
1941        let mut cx = Context::from_waker(&waker);
1942
1943        let data = [
1944            (("1".to_owned(), "one".to_owned()), 1, 1),
1945            (("2".to_owned(), "two".to_owned()), 2, 1),
1946            (("3".to_owned(), "three".to_owned()), 3, 1),
1947        ];
1948
1949        let id = ShardId::new();
1950        let client = new_test_client(&dyncfgs).await;
1951        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1952
1953        // Grab a listener as_of 1 (listens are strictly gt the as_of); 1 is not yet closed out.
1954        let mut listen = read.clone("").await.expect_listen(1).await;
1955        let mut listen_next = Box::pin(listen.fetch_next());
1956        // Intentionally don't await the listen_next, but instead manually poke
1957        // it for a while and assert that it doesn't resolve yet. See below for
1958        // discussion of some alternative ways of writing this unit test.
1959        for _ in 0..100 {
1960            assert!(
1961                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
1962                "listen::next unexpectedly ready"
1963            );
1964        }
1965
1966        // Write a [0,3) batch.
1967        write
1968            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
1969            .await;
1970
1971        // The initial listen_next call should now be able to return data at 2.
1972        // It doesn't get 1 because the as_of was 1 and listen is strictly gt.
1973        assert_eq!(
1974            listen_next.await,
1975            vec![
1976                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
1977                ListenEvent::Progress(Antichain::from_elem(3)),
1978            ]
1979        );
1980
1981        // Grab a snapshot as_of 3, which is not yet closed out. Intentionally
1982        // don't await the snap, but instead manually poke it for a while and
1983        // assert that it doesn't resolve yet.
1984        //
1985        // An alternative to this would be to run it in a task and poll the task
1986        // with some timeout, but this would introduce a fixed test execution
1987        // latency of the timeout in the happy case. Plus, it would be
1988        // non-deterministic.
1989        //
1990        // Another alternative (that's potentially quite interesting!) would be
1991        // to separate creating a snapshot immediately (which would fail if
1992        // as_of was >= upper) from a bit of logic that retries until that case
1993        // is ready.
1994        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
1995        for _ in 0..100 {
1996            assert!(
1997                Pin::new(&mut snap).poll(&mut cx).is_pending(),
1998                "snapshot unexpectedly ready"
1999            );
2000        }
2001
2002        // Now add the data at 3 and also unblock the snapshot.
2003        write.expect_compare_and_append(&data[2..], 3, 4).await;
2004
2005        // Read the snapshot and check that it got all the appropriate data.
2006        assert_eq!(snap.await, all_ok(&data[..], 3));
2007    }
2008
2009    #[mz_persist_proc::test(tokio::test)]
2010    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
2011    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
2012        // Verify that the ReadHandle and WriteHandle background heartbeat tasks
2013        // shut down cleanly after the handle is expired.
2014        let mut cache = new_test_client_cache(&dyncfgs);
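        // Use very short lease durations so the background heartbeat tasks tick
        // (and notice expiration) quickly, keeping the test fast.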
2015        cache
2016            .cfg
2017            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
2018        cache.cfg.writer_lease_duration = Duration::from_millis(1);
2019        let (_write, mut read) = cache
2020            .open(PersistLocation::new_in_mem())
2021            .await
2022            .expect("client construction failed")
2023            .expect_open::<(), (), u64, i64>(ShardId::new())
2024            .await;
2025        let mut read_unexpired_state = read
2026            .unexpired_state
2027            .take()
2028            .expect("handle should have unexpired state");
2029        read.expire().await;
2030        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
2031            let () = read_heartbeat_task.await;
2032        }
2033    }
2034
2035    /// Verify that shard finalization works with empty shards, i.e. shards whose
2036    /// upper was advanced to the empty antichain without any data ever being written.
2037    #[mz_persist_proc::test(tokio::test)]
2038    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
2039    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
2040        let persist_client = new_test_client(&dyncfgs).await;
2041
2042        let shard_id = ShardId::new();
2043        pub const CRITICAL_SINCE: CriticalReaderId =
2044            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
2045
2046        let (mut write, mut read) = persist_client
2047            .expect_open::<(), (), u64, i64>(shard_id)
2048            .await;
2049
2050        // Advance since and upper to empty, which is a pre-requisite for
2051        // finalization/tombstoning.
2052        let () = read.downgrade_since(&Antichain::new()).await;
2053        let () = write.advance_upper(&Antichain::new()).await;
2054
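        // Register a critical since handle and downgrade its since to the empty
        // antichain as well, so that no since capability holds back finalization.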
2055        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
2056            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
2057            .await
2058            .expect("invalid persist usage");
2059
2060        let epoch = since_handle.opaque().clone();
2061        let new_since = Antichain::new();
2062        let downgrade = since_handle
2063            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
2064            .await;
2065
2066        assert!(
2067            downgrade.is_ok(),
2068            "downgrade of critical handle must succeed"
2069        );
2070
2071        let finalize = persist_client
2072            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2073            .await;
2074
2075        assert_ok!(finalize, "finalization must succeed");
2076
2077        let is_finalized = persist_client
2078            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2079            .await
2080            .expect("invalid persist usage");
2081        assert!(is_finalized, "shard must still be finalized");
2082    }
2083
2084    /// Verify that shard finalization works with shards that had some data
2085    /// written to them, followed by an empty batch that brings their upper to
2086    /// the empty antichain.
2087    #[mz_persist_proc::test(tokio::test)]
2088    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
2089    async fn finalize_shard(dyncfgs: ConfigUpdates) {
2090        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
2091        let persist_client = new_test_client(&dyncfgs).await;
2092
2093        let shard_id = ShardId::new();
2094        pub const CRITICAL_SINCE: CriticalReaderId =
2095            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
2096
2097        let (mut write, mut read) = persist_client
2098            .expect_open::<(), (), u64, i64>(shard_id)
2099            .await;
2100
2101        // Write some data.
2102        let () = write
2103            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
2104            .await
2105            .expect("usage should be valid")
2106            .expect("upper should match");
2107
2108        // Advance since and upper to empty, which is a pre-requisite for
2109        // finalization/tombstoning.
2110        let () = read.downgrade_since(&Antichain::new()).await;
2111        let () = write.advance_upper(&Antichain::new()).await;
2112
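        // As in the test above, register a critical since handle and downgrade its
        // since to the empty antichain so that no since capability holds back
        // finalization.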
2113        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
2114            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
2115            .await
2116            .expect("invalid persist usage");
2117
2118        let epoch = since_handle.opaque().clone();
2119        let new_since = Antichain::new();
2120        let downgrade = since_handle
2121            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
2122            .await;
2123
2124        assert!(
2125            downgrade.is_ok(),
2126            "downgrade of critical handle must succeed"
2127        );
2128
2129        let finalize = persist_client
2130            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2131            .await;
2132
2133        assert_ok!(finalize, "finalization must succeed");
2134
2135        let is_finalized = persist_client
2136            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
2137            .await
2138            .expect("invalid persist usage");
2139        assert!(is_finalized, "shard must still be finalized");
2140    }
2141
2142    proptest! {
2143        #![proptest_config(ProptestConfig::with_cases(4096))]
2144
2145        #[mz_ore::test]
2146        #[cfg_attr(miri, ignore)] // too slow
2147        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>() ) {
2148            let actual = protobuf_roundtrip::<_, String>(&expect);
2149            assert_ok!(actual);
2150            assert_eq!(actual.unwrap(), expect);
2151        }
2152    }
2153}