// mz_persist_client/lib.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction presenting as a durable time-varying collection (aka shard)
11
12#![warn(missing_docs, missing_debug_implementations)]
13// #[track_caller] is currently a no-op on async functions, but that hopefully won't be the case
14// forever. So we already annotate those functions now and ignore the compiler warning until
15// https://github.com/rust-lang/rust/issues/87417 pans out.
16#![allow(ungated_async_fn_track_caller)]
17
18use std::fmt::Debug;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use differential_dataflow::difference::Monoid;
23use differential_dataflow::lattice::Lattice;
24use itertools::Itertools;
25use mz_build_info::{BuildInfo, build_info};
26use mz_dyncfg::ConfigSet;
27use mz_ore::instrument;
28use mz_persist::location::{Blob, Consensus, ExternalError};
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64};
31use mz_proto::{IntoRustIfSome, ProtoType};
32use semver::Version;
33use timely::order::TotalOrder;
34use timely::progress::{Antichain, Timestamp};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::{BATCH_DELETE_ENABLED, Batch, BatchBuilder, ProtoBatch};
38use crate::cache::{PersistClientCache, StateCache};
39use crate::cfg::PersistConfig;
40use crate::critical::{CriticalReaderId, Opaque, SinceHandle};
41use crate::error::InvalidUsage;
42use crate::fetch::{BatchFetcher, BatchFetcherConfig};
43use crate::internal::compact::{CompactConfig, Compactor};
44use crate::internal::encoding::parse_id;
45use crate::internal::gc::GarbageCollector;
46use crate::internal::machine::{Machine, retry_external};
47use crate::internal::state_versions::StateVersions;
48use crate::metrics::Metrics;
49use crate::read::{
50    Cursor, LazyPartStats, LeasedReaderId, READER_LEASE_DURATION, ReadHandle, Since,
51};
52use crate::rpc::PubSubSender;
53use crate::schema::CaESchema;
54use crate::write::{WriteHandle, WriterId};
55
// Public API surface of the crate, one module per major concept. For the
// plain file modules, module-level docs live in the corresponding source file.
pub mod async_runtime;
pub mod batch;
pub mod cache;
pub mod cfg;
pub mod cli {
    //! Persist command-line utilities
    pub mod admin;
    pub mod args;
    pub mod bench;
    pub mod inspect;
}
pub mod critical;
pub mod error;
pub mod fetch;
pub mod internals_bench;
pub mod iter;
pub mod metrics {
    //! Utilities related to metrics.
    // Re-export a curated subset of the private `internal::metrics` module
    // rather than exposing all of it.
    pub use crate::internal::metrics::{
        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
    };
}
pub mod operators {
    //! [timely] operators for reading and writing persist Shards.

    use mz_dyncfg::Config;

    pub mod shard_source;

    // TODO(cfg): Move this next to the use.
    // Dynamically-configurable work budget (default 100_000) consulted by the
    // persist_source mfp_and_decode operator to decide when to yield.
    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
        "storage_source_decode_fuel",
        100_000,
        "\
        The maximum amount of work to do in the persist_source mfp_and_decode \
        operator before yielding.",
    );
}
pub mod read;
pub mod rpc;
pub mod schema;
pub mod stats;
pub mod usage;
pub mod write;
100
/// An implementation of the public crate interface.
///
/// This module is private: anything in here that is meant for external
/// consumption is re-exported explicitly elsewhere in the crate.
mod internal {
    pub mod apply;
    pub mod cache;
    pub mod compact;
    pub mod encoding;
    pub mod gc;
    pub mod machine;
    pub mod maintenance;
    pub mod merge;
    pub mod metrics;
    pub mod paths;
    pub mod restore;
    pub mod service;
    pub mod state;
    pub mod state_diff;
    pub mod state_versions;
    pub mod trace;
    pub mod watch;

    // Datadriven-style tests; only compiled for test builds.
    #[cfg(test)]
    pub mod datadriven;
}
124
/// Persist build information.
pub const BUILD_INFO: BuildInfo = build_info!();

// Re-export for convenience.
pub use mz_persist_types::{PersistLocation, ShardId};

// `Schemas` is defined in the private `internal` module but is part of the
// public API of this crate.
pub use crate::internal::encoding::Schemas;
132
/// Additional diagnostic information used within Persist
/// e.g. for logging, metric labels, etc.
#[derive(Clone, Debug)]
pub struct Diagnostics {
    /// A user-friendly name for the shard.
    pub shard_name: String,
    /// A purpose for the handle.
    pub handle_purpose: String,
}

impl Diagnostics {
    /// Returns a `Diagnostics` with the given `handle_purpose` and an
    /// "unknown" shard name.
    pub fn from_purpose(handle_purpose: &str) -> Self {
        Diagnostics {
            handle_purpose: String::from(handle_purpose),
            shard_name: String::from("unknown"),
        }
    }

    /// Returns a `Diagnostics` with placeholder values, for use in tests.
    pub fn for_tests() -> Self {
        Diagnostics {
            handle_purpose: String::from("test-purpose"),
            shard_name: String::from("test-shard-name"),
        }
    }
}
160
/// A handle for interacting with the set of persist shard made durable at a
/// single [PersistLocation].
///
/// All async methods on PersistClient retry for as long as they are able, but
/// the returned [std::future::Future]s implement "cancel on drop" semantics.
/// This means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use mz_persist_types::codec_impls::StringSchema;
/// # let client: mz_persist_client::PersistClient = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # let id = mz_persist_client::ShardId::new();
/// # let diagnostics = mz_persist_client::Diagnostics { shard_name: "".into(), handle_purpose: "".into() };
/// # async {
/// tokio::time::timeout(timeout, client.open::<String, String, u64, i64>(id,
///     Arc::new(StringSchema),Arc::new(StringSchema),diagnostics, true)).await
/// # };
/// ```
#[derive(Debug, Clone)]
pub struct PersistClient {
    // Configuration (including the dyncfg ConfigSet) shared by everything
    // constructed from this client.
    cfg: PersistConfig,
    // Durable blob (bulk data) storage.
    blob: Arc<dyn Blob>,
    // Durable consensus storage.
    consensus: Arc<dyn Consensus>,
    // Crate-wide metrics registry.
    metrics: Arc<Metrics>,
    // Runtime used for persist's internal background work; see
    // [crate::async_runtime::IsolatedRuntime].
    isolated_runtime: Arc<IsolatedRuntime>,
    // Per-process cache of shard state shared between handles.
    shared_states: Arc<StateCache>,
    // Sender side of persist's pubsub mechanism (see [crate::rpc]).
    pubsub_sender: Arc<dyn PubSubSender>,
}
191
192impl PersistClient {
193    /// Returns a new client for interfacing with persist shards made durable to
194    /// the given [Blob] and [Consensus].
195    ///
196    /// This is exposed mostly for testing. Persist users likely want
197    /// [crate::cache::PersistClientCache::open].
198    pub fn new(
199        cfg: PersistConfig,
200        blob: Arc<dyn Blob>,
201        consensus: Arc<dyn Consensus>,
202        metrics: Arc<Metrics>,
203        isolated_runtime: Arc<IsolatedRuntime>,
204        shared_states: Arc<StateCache>,
205        pubsub_sender: Arc<dyn PubSubSender>,
206    ) -> Result<Self, ExternalError> {
207        // TODO: Verify somehow that blob matches consensus to prevent
208        // accidental misuse.
209        Ok(PersistClient {
210            cfg,
211            blob,
212            consensus,
213            metrics,
214            isolated_runtime,
215            shared_states,
216            pubsub_sender,
217        })
218    }
219
220    /// Returns a new in-mem [PersistClient] for tests and examples.
221    pub async fn new_for_tests() -> Self {
222        let cache = PersistClientCache::new_no_metrics();
223        cache
224            .open(PersistLocation::new_in_mem())
225            .await
226            .expect("in-mem location is valid")
227    }
228
    /// Returns persist's [ConfigSet].
    ///
    /// This is the set of dynamically-updatable configuration values carried
    /// by this client's [PersistConfig].
    pub fn dyncfgs(&self) -> &ConfigSet {
        &self.cfg.configs
    }
233
234    async fn make_machine<K, V, T, D>(
235        &self,
236        shard_id: ShardId,
237        diagnostics: Diagnostics,
238    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
239    where
240        K: Debug + Codec,
241        V: Debug + Codec,
242        T: Timestamp + Lattice + Codec64 + Sync,
243        D: Monoid + Codec64 + Send + Sync,
244    {
245        let state_versions = StateVersions::new(
246            self.cfg.clone(),
247            Arc::clone(&self.consensus),
248            Arc::clone(&self.blob),
249            Arc::clone(&self.metrics),
250        );
251        let machine = Machine::<K, V, T, D>::new(
252            self.cfg.clone(),
253            shard_id,
254            Arc::clone(&self.metrics),
255            Arc::new(state_versions),
256            Arc::clone(&self.shared_states),
257            Arc::clone(&self.pubsub_sender),
258            Arc::clone(&self.isolated_runtime),
259            diagnostics.clone(),
260        )
261        .await?;
262        Ok(machine)
263    }
264
265    /// Provides capabilities for the durable TVC identified by `shard_id` at
266    /// its current since and upper frontiers.
267    ///
268    /// This method is a best-effort attempt to regain control of the frontiers
269    /// of a shard. Its most common uses are to recover capabilities that have
270    /// expired (leases) or to attempt to read a TVC that one did not create (or
271    /// otherwise receive capabilities for). If the frontiers have been fully
272    /// released by all other parties, this call may result in capabilities with
273    /// empty frontiers (which are useless).
274    ///
275    /// If `shard_id` has never been used before, initializes a new shard and
276    /// returns handles with `since` and `upper` frontiers set to initial values
277    /// of `Antichain::from_elem(T::minimum())`.
278    ///
279    /// The `schema` parameter is currently unused, but should be an object
280    /// that represents the schema of the data in the shard. This will be required
281    /// in the future.
282    #[instrument(level = "debug", fields(shard = %shard_id))]
283    pub async fn open<K, V, T, D>(
284        &self,
285        shard_id: ShardId,
286        key_schema: Arc<K::Schema>,
287        val_schema: Arc<V::Schema>,
288        diagnostics: Diagnostics,
289        use_critical_since: bool,
290    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
291    where
292        K: Debug + Codec,
293        V: Debug + Codec,
294        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
295        D: Monoid + Ord + Codec64 + Send + Sync,
296    {
297        Ok((
298            self.open_writer(
299                shard_id,
300                Arc::clone(&key_schema),
301                Arc::clone(&val_schema),
302                diagnostics.clone(),
303            )
304            .await?,
305            self.open_leased_reader(
306                shard_id,
307                key_schema,
308                val_schema,
309                diagnostics,
310                use_critical_since,
311            )
312            .await?,
313        ))
314    }
315
316    /// [Self::open], but returning only a [ReadHandle].
317    ///
318    /// Use this to save latency and a bit of persist traffic if you're just
319    /// going to immediately drop or expire the [WriteHandle].
320    ///
321    /// The `_schema` parameter is currently unused, but should be an object
322    /// that represents the schema of the data in the shard. This will be required
323    /// in the future.
324    #[instrument(level = "debug", fields(shard = %shard_id))]
325    pub async fn open_leased_reader<K, V, T, D>(
326        &self,
327        shard_id: ShardId,
328        key_schema: Arc<K::Schema>,
329        val_schema: Arc<V::Schema>,
330        diagnostics: Diagnostics,
331        use_critical_since: bool,
332    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
333    where
334        K: Debug + Codec,
335        V: Debug + Codec,
336        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
337        D: Monoid + Codec64 + Send + Sync,
338    {
339        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
340        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
341
342        let reader_id = LeasedReaderId::new();
343        let heartbeat_ts = (self.cfg.now)();
344        let (reader_state, maintenance) = machine
345            .register_leased_reader(
346                &reader_id,
347                &diagnostics.handle_purpose,
348                READER_LEASE_DURATION.get(&self.cfg),
349                heartbeat_ts,
350                use_critical_since,
351            )
352            .await;
353        maintenance.start_performing(&machine, &gc);
354        let schemas = Schemas {
355            id: None,
356            key: key_schema,
357            val: val_schema,
358        };
359        let reader = ReadHandle::new(
360            self.cfg.clone(),
361            Arc::clone(&self.metrics),
362            machine,
363            gc,
364            Arc::clone(&self.blob),
365            reader_id,
366            schemas,
367            reader_state,
368        )
369        .await;
370
371        Ok(reader)
372    }
373
374    /// Creates and returns a [BatchFetcher] for the given shard id.
375    #[instrument(level = "debug", fields(shard = %shard_id))]
376    pub async fn create_batch_fetcher<K, V, T, D>(
377        &self,
378        shard_id: ShardId,
379        key_schema: Arc<K::Schema>,
380        val_schema: Arc<V::Schema>,
381        is_transient: bool,
382        diagnostics: Diagnostics,
383    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
384    where
385        K: Debug + Codec,
386        V: Debug + Codec,
387        T: Timestamp + Lattice + Codec64 + Sync,
388        D: Monoid + Codec64 + Send + Sync,
389    {
390        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
391        let read_schemas = Schemas {
392            id: None,
393            key: key_schema,
394            val: val_schema,
395        };
396        let schema_cache = machine.applier.schema_cache();
397        let fetcher = BatchFetcher {
398            cfg: BatchFetcherConfig::new(&self.cfg),
399            blob: Arc::clone(&self.blob),
400            metrics: Arc::clone(&self.metrics),
401            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
402            shard_id,
403            read_schemas,
404            schema_cache,
405            is_transient,
406            _phantom: PhantomData,
407        };
408
409        Ok(fetcher)
410    }
411
    /// A convenience [CriticalReaderId] for Materialize controllers.
    ///
    /// For most (soon to be all?) shards in Materialize, a centralized
    /// "controller" is the authority for when a user no longer needs to read at
    /// a given frontier. (Other uses are temporary holds where correctness of
    /// the overall system can be maintained through a lease timeout.) To make
    /// [SinceHandle] easier to work with, we offer this convenience id for
    /// Materialize controllers, so they don't have to durably record it.
    ///
    /// TODO: We're still shaking out whether the controller should be the only
    /// critical since hold or if there are other places we want them. If the
    /// former, we should remove [CriticalReaderId] and bake in the singular
    /// nature of the controller critical handle.
    ///
    /// ```rust
    /// // This prints as something that is not 0 but is visually recognizable.
    /// assert_eq!(
    ///     mz_persist_client::PersistClient::CONTROLLER_CRITICAL_SINCE.to_string(),
    ///     "c00000000-1111-2222-3333-444444444444",
    /// )
    /// ```
    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
        // These bytes render as the id shown in the doctest above:
        // 17 == 0x11, 34 == 0x22, 51 == 0x33, 68 == 0x44.
        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
435
436    /// Provides a capability for the durable TVC identified by `shard_id` at
437    /// its current since frontier.
438    ///
439    /// In contrast to the time-leased [ReadHandle] returned by [Self::open] and
440    /// [Self::open_leased_reader], this handle and its associated capability
441    /// are not leased. A [SinceHandle] does not release its since capability;
442    /// downgrade to the empty antichain to hold back the since.
443    /// Also unlike `ReadHandle`, the handle is not expired on drop.
444    /// This is less ergonomic, but useful for "critical" since
445    /// holds which must survive even lease timeouts.
446    ///
447    /// **IMPORTANT**: The above means that if a SinceHandle is registered and
448    /// then lost, the shard's since will be permanently "stuck", forever
449    /// preventing logical compaction. Users are advised to durably record
450    /// (preferably in code) the intended [CriticalReaderId] _before_ registering
451    /// a SinceHandle (in case the process crashes at the wrong time).
452    ///
453    /// If `shard_id` has never been used before, initializes a new shard and
454    /// return a handle with its `since` frontier set to the initial value of
455    /// `Antichain::from_elem(T::minimum())`.
456    #[instrument(level = "debug", fields(shard = %shard_id))]
457    pub async fn open_critical_since<K, V, T, D>(
458        &self,
459        shard_id: ShardId,
460        reader_id: CriticalReaderId,
461        default_opaque: Opaque,
462        diagnostics: Diagnostics,
463    ) -> Result<SinceHandle<K, V, T, D>, InvalidUsage<T>>
464    where
465        K: Debug + Codec,
466        V: Debug + Codec,
467        T: Timestamp + Lattice + Codec64 + Sync,
468        D: Monoid + Codec64 + Send + Sync,
469    {
470        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
471        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
472
473        let (state, maintenance) = machine
474            .register_critical_reader(&reader_id, default_opaque, &diagnostics.handle_purpose)
475            .await;
476        maintenance.start_performing(&machine, &gc);
477        let handle = SinceHandle::new(machine, gc, reader_id, state.since, state.opaque);
478
479        Ok(handle)
480    }
481
482    /// [Self::open], but returning only a [WriteHandle].
483    ///
484    /// Use this to save latency and a bit of persist traffic if you're just
485    /// going to immediately drop or expire the [ReadHandle].
486    #[instrument(level = "debug", fields(shard = %shard_id))]
487    pub async fn open_writer<K, V, T, D>(
488        &self,
489        shard_id: ShardId,
490        key_schema: Arc<K::Schema>,
491        val_schema: Arc<V::Schema>,
492        diagnostics: Diagnostics,
493    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
494    where
495        K: Debug + Codec,
496        V: Debug + Codec,
497        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
498        D: Monoid + Ord + Codec64 + Send + Sync,
499    {
500        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
501        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
502
503        // We defer registering the schema until write time, to allow opening
504        // write handles in a "read-only" mode where they don't implicitly
505        // modify persist state. But it might already be registered, in which
506        // case we can fetch its ID.
507        let schema_id = machine.find_schema(&*key_schema, &*val_schema);
508
509        let writer_id = WriterId::new();
510        let schemas = Schemas {
511            id: schema_id,
512            key: key_schema,
513            val: val_schema,
514        };
515        let writer = WriteHandle::new(
516            self.cfg.clone(),
517            Arc::clone(&self.metrics),
518            machine,
519            gc,
520            Arc::clone(&self.blob),
521            writer_id,
522            &diagnostics.handle_purpose,
523            schemas,
524        );
525        Ok(writer)
526    }
527
    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
    /// blob storage which can then be appended to the given shard using
    /// [WriteHandle::compare_and_append_batch] or [WriteHandle::append_batch],
    /// or which can be read using [PersistClient::read_batches_consolidated].
    ///
    /// The builder uses a bounded amount of memory, even when the number of
    /// updates is very large. Individual records, however, should be small
    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
    /// O(MB) come talk to us.
    #[instrument(level = "debug", fields(shard = %shard_id))]
    pub async fn batch_builder<K, V, T, D>(
        &self,
        shard_id: ShardId,
        write_schemas: Schemas<K, V>,
        lower: Antichain<T>,
        max_runs: Option<usize>,
    ) -> BatchBuilder<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        // Start from the shard's default compaction config, overriding only
        // the maximum number of runs allowed in the batch.
        let mut compact_cfg = CompactConfig::new(&self.cfg, shard_id);
        compact_cfg.batch.max_runs = max_runs;
        WriteHandle::builder_inner(
            &self.cfg,
            compact_cfg,
            Arc::clone(&self.metrics),
            // NOTE(review): shard metrics are labeled "peek_stash" here, same
            // as in batch_from_transmittable_batch and
            // read_batches_consolidated -- presumably this API family exists
            // for the peek stash; confirm before reusing elsewhere.
            self.metrics.shards.shard(&shard_id, "peek_stash"),
            &self.metrics.user,
            Arc::clone(&self.isolated_runtime),
            Arc::clone(&self.blob),
            shard_id,
            write_schemas,
            lower,
        )
    }
566
    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
    /// to append it to the given shard or to read it via
    /// [PersistClient::read_batches_consolidated]
    ///
    /// CAUTION: This API allows turning a [ProtoBatch] into a [Batch] multiple
    /// times, but if a batch is deleted the backing data goes away, so at that
    /// point all in-memory copies of a batch become invalid and cannot be read
    /// anymore.
    ///
    /// # Panics
    ///
    /// Panics if `batch` is not a valid transmittable batch or if it belongs
    /// to a shard other than `shard_id`.
    pub fn batch_from_transmittable_batch<K, V, T, D>(
        &self,
        shard_id: &ShardId,
        batch: ProtoBatch,
    ) -> Batch<K, V, T, D>
    where
        K: Debug + Codec,
        V: Debug + Codec,
        T: Timestamp + Lattice + Codec64 + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        // Defensively check the proto's shard id against the caller's claim
        // before doing anything else.
        let batch_shard_id: ShardId = batch
            .shard_id
            .into_rust()
            .expect("valid transmittable batch");
        assert_eq!(&batch_shard_id, shard_id);

        // Shard metrics are labeled "peek_stash", matching the other
        // standalone-batch APIs on this client.
        let shard_metrics = self.metrics.shards.shard(shard_id, "peek_stash");

        let ret = Batch {
            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
            metrics: Arc::clone(&self.metrics),
            shard_metrics,
            version: Version::parse(&batch.version).expect("valid transmittable batch"),
            schemas: (batch.key_schema, batch.val_schema),
            batch: batch
                .batch
                .into_rust_if_some("ProtoBatch::batch")
                .expect("valid transmittable batch"),
            blob: Arc::clone(&self.blob),
            _phantom: std::marker::PhantomData,
        };

        // Double-check that the reconstructed batch agrees on the shard id.
        assert_eq!(&ret.shard_id(), shard_id);
        ret
    }
611
    /// Returns a [Cursor] for reading the given batches. Yielded updates are
    /// consolidated if the given batches contain sorted runs, which is true
    /// when they have been written using a [BatchBuilder].
    ///
    /// To keep memory usage down when reading a snapshot that consolidates
    /// well, this consolidates as it goes. However, note that only the
    /// serialized data is consolidated: the deserialized data will only be
    /// consolidated if your K/V codecs are one-to-one.
    ///
    /// CAUTION: The caller needs to make sure that the given batches are
    /// readable and they have to remain readable for the lifetime of the
    /// returned [Cursor]. The caller is also responsible for the lifecycle of
    /// the batches: once the cursor and the batches are no longer needed you
    /// must call [Cursor::into_lease] to get back the batches and delete them.
    // NOTE(review): this takes `&mut self` but does not appear to mutate any
    // client state; presumably the exclusive borrow is intentional -- confirm
    // before relaxing to `&self`.
    #[allow(clippy::unused_async)]
    pub async fn read_batches_consolidated<K, V, T, D>(
        &mut self,
        shard_id: ShardId,
        as_of: Antichain<T>,
        read_schemas: Schemas<K, V>,
        batches: Vec<Batch<K, V, T, D>>,
        should_fetch_part: impl for<'a> Fn(Option<&'a LazyPartStats>) -> bool,
        memory_budget_bytes: usize,
    ) -> Result<Cursor<K, V, T, D, Vec<Batch<K, V, T, D>>>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
        T: Timestamp + Lattice + Codec64 + TotalOrder + Sync,
        D: Monoid + Ord + Codec64 + Send + Sync,
    {
        // Shard metrics are labeled "peek_stash", matching the other
        // standalone-batch APIs on this client.
        let shard_metrics = self.metrics.shards.shard(&shard_id, "peek_stash");

        // Extract each batch's inner (hollow) batch for the read, while the
        // owned batches are passed along as the lease so the caller can get
        // them back via [Cursor::into_lease].
        let hollow_batches = batches.iter().map(|b| b.batch.clone()).collect_vec();

        ReadHandle::read_batches_consolidated(
            &self.cfg,
            Arc::clone(&self.metrics),
            shard_metrics,
            self.metrics.read.snapshot.clone(),
            Arc::clone(&self.blob),
            shard_id,
            as_of,
            read_schemas,
            &hollow_batches,
            batches,
            should_fetch_part,
            memory_budget_bytes,
        )
    }
661
662    /// Returns the requested schema, if known at the current state.
663    pub async fn get_schema<K, V, T, D>(
664        &self,
665        shard_id: ShardId,
666        schema_id: SchemaId,
667        diagnostics: Diagnostics,
668    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
669    where
670        K: Debug + Codec,
671        V: Debug + Codec,
672        T: Timestamp + Lattice + Codec64 + Sync,
673        D: Monoid + Codec64 + Send + Sync,
674    {
675        let machine = self
676            .make_machine::<K, V, T, D>(shard_id, diagnostics)
677            .await?;
678        Ok(machine.get_schema(schema_id))
679    }
680
681    /// Returns the latest schema registered at the current state.
682    pub async fn latest_schema<K, V, T, D>(
683        &self,
684        shard_id: ShardId,
685        diagnostics: Diagnostics,
686    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
687    where
688        K: Debug + Codec,
689        V: Debug + Codec,
690        T: Timestamp + Lattice + Codec64 + Sync,
691        D: Monoid + Codec64 + Send + Sync,
692    {
693        let machine = self
694            .make_machine::<K, V, T, D>(shard_id, diagnostics)
695            .await?;
696        Ok(machine.latest_schema())
697    }
698
699    /// Registers a schema for the given shard.
700    ///
701    /// Returns the new schema ID if the registration succeeds, and `None`
702    /// otherwise. Schema registration succeeds in two cases:
703    ///  a) No schema was currently registered for the shard.
704    ///  b) The given schema is already registered for the shard.
705    ///
706    /// To evolve an existing schema instead, use
707    /// [PersistClient::compare_and_evolve_schema].
708    //
709    // TODO: unify with `compare_and_evolve_schema`
710    pub async fn register_schema<K, V, T, D>(
711        &self,
712        shard_id: ShardId,
713        key_schema: &K::Schema,
714        val_schema: &V::Schema,
715        diagnostics: Diagnostics,
716    ) -> Result<Option<SchemaId>, InvalidUsage<T>>
717    where
718        K: Debug + Codec,
719        V: Debug + Codec,
720        T: Timestamp + Lattice + Codec64 + Sync,
721        D: Monoid + Codec64 + Send + Sync,
722    {
723        let machine = self
724            .make_machine::<K, V, T, D>(shard_id, diagnostics)
725            .await?;
726        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
727
728        let (schema_id, maintenance) = machine.register_schema(key_schema, val_schema).await;
729        maintenance.start_performing(&machine, &gc);
730
731        Ok(schema_id)
732    }
733
734    /// Registers a new latest schema for the given shard.
735    ///
736    /// This new schema must be [backward_compatible] with all previous schemas
737    /// for this shard. If it's not, [CaESchema::Incompatible] is returned.
738    ///
739    /// [backward_compatible]: mz_persist_types::schema::backward_compatible
740    ///
741    /// To prevent races, the caller must declare what it believes to be the
742    /// latest schema id. If this doesn't match reality,
743    /// [CaESchema::ExpectedMismatch] is returned.
744    pub async fn compare_and_evolve_schema<K, V, T, D>(
745        &self,
746        shard_id: ShardId,
747        expected: SchemaId,
748        key_schema: &K::Schema,
749        val_schema: &V::Schema,
750        diagnostics: Diagnostics,
751    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
752    where
753        K: Debug + Codec,
754        V: Debug + Codec,
755        T: Timestamp + Lattice + Codec64 + Sync,
756        D: Monoid + Codec64 + Send + Sync,
757    {
758        let machine = self
759            .make_machine::<K, V, T, D>(shard_id, diagnostics)
760            .await?;
761        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
762        let (res, maintenance) = machine
763            .compare_and_evolve_schema(expected, key_schema, val_schema)
764            .await;
765        maintenance.start_performing(&machine, &gc);
766        Ok(res)
767    }
768
769    /// Check if the given shard is in a finalized state; ie. it can no longer be
770    /// read, any data that was written to it is no longer accessible, and we've
771    /// discarded references to that data from state.
772    pub async fn is_finalized<K, V, T, D>(
773        &self,
774        shard_id: ShardId,
775        diagnostics: Diagnostics,
776    ) -> Result<bool, InvalidUsage<T>>
777    where
778        K: Debug + Codec,
779        V: Debug + Codec,
780        T: Timestamp + Lattice + Codec64 + Sync,
781        D: Monoid + Codec64 + Send + Sync,
782    {
783        let machine = self
784            .make_machine::<K, V, T, D>(shard_id, diagnostics)
785            .await?;
786        Ok(machine.is_finalized())
787    }
788
789    /// If a shard is guaranteed to never be used again, finalize it to delete
790    /// the associated data and release any associated resources. (Except for a
791    /// little state in consensus we use to represent the tombstone.)
792    ///
793    /// The caller should ensure that both the `since` and `upper` of the shard
794    /// have been advanced to `[]`: ie. the shard is no longer writable or readable.
795    /// Otherwise an error is returned.
796    ///
797    /// Once `finalize_shard` has been called, the result of future operations on
798    /// the shard are not defined. They may return errors or succeed as a noop.
799    #[instrument(level = "debug", fields(shard = %shard_id))]
800    pub async fn finalize_shard<K, V, T, D>(
801        &self,
802        shard_id: ShardId,
803        diagnostics: Diagnostics,
804    ) -> Result<(), InvalidUsage<T>>
805    where
806        K: Debug + Codec,
807        V: Debug + Codec,
808        T: Timestamp + Lattice + Codec64 + Sync,
809        D: Monoid + Codec64 + Send + Sync,
810    {
811        let machine = self
812            .make_machine::<K, V, T, D>(shard_id, diagnostics)
813            .await?;
814
815        let maintenance = machine.become_tombstone().await?;
816        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
817
818        let () = maintenance.perform(&machine, &gc).await;
819
820        Ok(())
821    }
822
823    /// Upgrade the state to the latest version. This should only be called once we will no longer
824    /// need to interoperate with older versions, like after a successful upgrade.
825    pub async fn upgrade_version<K, V, T, D>(
826        &self,
827        shard_id: ShardId,
828        diagnostics: Diagnostics,
829    ) -> Result<(), InvalidUsage<T>>
830    where
831        K: Debug + Codec,
832        V: Debug + Codec,
833        T: Timestamp + Lattice + Codec64 + Sync,
834        D: Monoid + Codec64 + Send + Sync,
835    {
836        let machine = self
837            .make_machine::<K, V, T, D>(shard_id, diagnostics)
838            .await?;
839
840        match machine.upgrade_version().await {
841            Ok(maintenance) => {
842                let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
843                let () = maintenance.perform(&machine, &gc).await;
844                Ok(())
845            }
846            Err(version) => Err(InvalidUsage::IncompatibleVersion { version }),
847        }
848    }
849
850    /// Returns the internal state of the shard for debugging and QA.
851    ///
852    /// We'll be thoughtful about making unnecessary changes, but the **output
853    /// of this method needs to be gated from users**, so that it's not subject
854    /// to our backward compatibility guarantees.
855    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
856        &self,
857        shard_id: &ShardId,
858    ) -> Result<impl serde::Serialize, anyhow::Error> {
859        let state_versions = StateVersions::new(
860            self.cfg.clone(),
861            Arc::clone(&self.consensus),
862            Arc::clone(&self.blob),
863            Arc::clone(&self.metrics),
864        );
865        // TODO: Don't fetch all live diffs. Feels like we should pull out a new
866        // method in StateVersions for fetching the latest version of State of a
867        // shard that might or might not exist.
868        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
869        if versions.is_empty() {
870            return Err(anyhow::anyhow!("{} does not exist", shard_id));
871        }
872        let state = state_versions
873            .fetch_current_state::<T>(shard_id, versions)
874            .await;
875        let state = state.check_ts_codec(shard_id)?;
876        Ok(state)
877    }
878
879    /// Test helper for a [Self::open] call that is expected to succeed.
880    #[cfg(test)]
881    #[track_caller]
882    pub async fn expect_open<K, V, T, D>(
883        &self,
884        shard_id: ShardId,
885    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
886    where
887        K: Debug + Codec,
888        V: Debug + Codec,
889        T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
890        D: Monoid + Ord + Codec64 + Send + Sync,
891        K::Schema: Default,
892        V::Schema: Default,
893    {
894        self.open(
895            shard_id,
896            Arc::new(K::Schema::default()),
897            Arc::new(V::Schema::default()),
898            Diagnostics::for_tests(),
899            true,
900        )
901        .await
902        .expect("codec mismatch")
903    }
904
    /// Return the metrics being used by this client.
    ///
    /// Only exposed for tests, persistcli, and benchmarks.
    pub fn metrics(&self) -> &Arc<Metrics> {
        // Borrow of the shared handle; callers clone the Arc if needed.
        &self.metrics
    }
911}
912
913#[cfg(test)]
914mod tests {
915    use std::future::Future;
916    use std::pin::Pin;
917    use std::task::Context;
918    use std::time::Duration;
919
920    use differential_dataflow::consolidation::consolidate_updates;
921    use differential_dataflow::lattice::Lattice;
922    use futures_task::noop_waker;
923    use mz_dyncfg::ConfigUpdates;
924    use mz_ore::assert_ok;
925    use mz_persist::indexed::encoding::BlobTraceBatchPart;
926    use mz_persist::workload::DataGenerator;
927    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
928    use mz_proto::protobuf_roundtrip;
929    use proptest::prelude::*;
930    use timely::order::PartialOrder;
931    use timely::progress::Antichain;
932
933    use crate::batch::BLOB_TARGET_SIZE;
934    use crate::cache::PersistClientCache;
935    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
936    use crate::critical::Opaque;
937    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
938    use crate::internal::paths::BlobKey;
939    use crate::read::ListenEvent;
940
941    use super::*;
942
943    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
944        // Configure an aggressively small blob_target_size so we get some
945        // amount of coverage of that in tests. Similarly, for max_outstanding.
946        let mut cache = PersistClientCache::new_no_metrics();
947        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
948        cache
949            .cfg
950            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
951        dyncfgs.apply(cache.cfg());
952
953        // Enable compaction in tests to ensure we get coverage.
954        cache.cfg.compaction_enabled = true;
955        cache
956    }
957
958    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
959        let cache = new_test_client_cache(dyncfgs);
960        cache
961            .open(PersistLocation::new_in_mem())
962            .await
963            .expect("client construction failed")
964    }
965
966    pub fn all_ok<'a, K, V, T, D, I>(iter: I, as_of: T) -> Vec<((K, V), T, D)>
967    where
968        K: Ord + Clone + 'a,
969        V: Ord + Clone + 'a,
970        T: Timestamp + Lattice + Clone + 'a,
971        D: Monoid + Clone + 'a,
972        I: IntoIterator<Item = &'a ((K, V), T, D)>,
973    {
974        let as_of = Antichain::from_elem(as_of);
975        let mut ret = iter
976            .into_iter()
977            .map(|((k, v), t, d)| {
978                let mut t = t.clone();
979                t.advance_by(as_of.borrow());
980                ((k.clone(), v.clone()), t, d.clone())
981            })
982            .collect();
983        consolidate_updates(&mut ret);
984        ret
985    }
986
    /// Test helper: fetches a single batch part from `blob` and returns both
    /// the decoded columnar part and its fully-decoded `((K, V), T, D)` updates.
    ///
    /// Panics if the blob get fails, the key is missing, or decoding fails.
    pub async fn expect_fetch_part<K, V, T, D>(
        blob: &dyn Blob,
        key: &BlobKey,
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
    ) -> (BlobTraceBatchPart<T>, Vec<((K, V), T, D)>)
    where
        K: Codec + Clone,
        V: Codec + Clone,
        T: Timestamp + Codec64,
        D: Codec64,
    {
        // Two expects: the outer for the fetch itself, the inner for a
        // present-but-empty key.
        let value = blob
            .get(key)
            .await
            .expect("failed to fetch part")
            .expect("missing part");
        let mut part =
            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
        // Re-encode the raw updates as a structured part using the caller's
        // schemas, then decode into owned tuples for easy assertions.
        let structured = part
            .updates
            .into_part::<K, V>(&*read_schemas.key, &*read_schemas.val);
        let updates = structured
            .decode_iter::<K, V, T, D>(&*read_schemas.key, &*read_schemas.val)
            .expect("structured data")
            .collect();
        (part, updates)
    }
1015
    // End-to-end smoke test: open a shard, append two batches, and verify that
    // both snapshots and listens observe the expected data and frontiers.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn sanity_check(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        // A fresh shard starts with both frontiers at the minimum timestamp.
        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        // Write a [0,3) batch.
        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Grab a snapshot and listener as_of 1. Snapshot should only have part of what we wrote.
        assert_eq!(
            read.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );

        let mut listen = read.clone("").await.expect_listen(1).await;

        // Write a [3,4) batch.
        write
            .expect_append(&data[2..], write.upper().clone(), vec![4])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(4));

        // Listen should have part of the initial write plus the new one.
        assert_eq!(
            listen.read_until(&4).await,
            (all_ok(&data[1..], 1), Antichain::from_elem(4))
        );

        // Downgrading the since is tracked locally (but otherwise is a no-op).
        read.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(read.since(), &Antichain::from_elem(2));
    }
1062
    // Sanity check that the open_reader and open_writer calls work.
    //
    // Opens two writers and two readers against the same shard and confirms
    // that a write via one handle is visible to the other handles.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut write1 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");
        let mut read1 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut read2 = client
            .open_leased_reader::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("codec mismatch");
        let mut write2 = client
            .open_writer::<String, String, u64, i64>(
                shard_id,
                Arc::new(StringSchema),
                Arc::new(StringSchema),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codec mismatch");

        // Cross the handles: write with one writer, read with the other reader.
        write2.expect_compare_and_append(&data[..1], 0, 2).await;
        assert_eq!(
            read2.expect_snapshot_and_fetch(1).await,
            all_ok(&data[..1], 1)
        );
        write1.expect_compare_and_append(&data[1..], 2, 4).await;
        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }
1122
    // Exercises the InvalidUsage error paths across PersistClient (codec
    // mismatches), ReadHandle/BatchFetcher (wrong-shard parts), and
    // WriteHandle/BatchBuilder (bad bounds).
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // too slow
    async fn invalid_usage(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
            .parse::<ShardId>()
            .expect("invalid shard id");
        let mut client = new_test_client(&dyncfgs).await;

        let (mut write0, mut read0) = client
            .expect_open::<String, String, u64, i64>(shard_id0)
            .await;

        write0.expect_compare_and_append(&data, 0, 4).await;

        // InvalidUsage from PersistClient methods.
        {
            // Helper for building the (K, V, T, D, concrete type) tuples that
            // CodecMismatch reports.
            fn codecs(
                k: &str,
                v: &str,
                t: &str,
                d: &str,
            ) -> (String, String, String, String, Option<CodecConcreteType>) {
                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
            }

            // Reset the state cache so the opens below re-run the codec checks
            // instead of hitting a cached state.
            client.shared_states = Arc::new(StateCache::new_no_metrics());
            assert_eq!(
                client
                    .open::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, Vec<u8>, u64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(VecU8Schema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, i64, i64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "i64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open::<String, String, u64, u64>(
                        shard_id0,
                        Arc::new(StringSchema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("String", "String", "u64", "u64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );

            // open_reader and open_writer end up using the same checks, so just
            // verify one type each to verify the plumbing instead of the full
            // set.
            assert_eq!(
                client
                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                        true,
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
            assert_eq!(
                client
                    .open_writer::<Vec<u8>, String, u64, i64>(
                        shard_id0,
                        Arc::new(VecU8Schema),
                        Arc::new(StringSchema),
                        Diagnostics::for_tests(),
                    )
                    .await
                    .unwrap_err(),
                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
                    actual: codecs("String", "String", "u64", "i64"),
                }))
            );
        }

        // InvalidUsage from ReadHandle methods.
        {
            let snap = read0
                .snapshot(Antichain::from_elem(3))
                .await
                .expect("cannot serve requested as_of");

            // A fetcher for a *different* shard must reject parts from shard 0.
            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
                .parse::<ShardId>()
                .expect("invalid shard id");
            let mut fetcher1 = client
                .create_batch_fetcher::<String, String, u64, i64>(
                    shard_id1,
                    Default::default(),
                    Default::default(),
                    false,
                    Diagnostics::for_tests(),
                )
                .await
                .unwrap();
            for part in snap {
                let (part, _lease) = part.into_exchangeable_part();
                let res = fetcher1.fetch_leased_part(part).await;
                assert_eq!(
                    res.unwrap_err(),
                    InvalidUsage::BatchNotFromThisShard {
                        batch_shard: shard_id0,
                        handle_shard: shard_id1,
                    }
                );
            }
        }

        // InvalidUsage from WriteHandle methods.
        {
            let ts3 = &data[2];
            assert_eq!(ts3.1, 3);
            let ts3 = vec![ts3.clone()];

            // WriteHandle::append also covers append_batch,
            // compare_and_append_batch, compare_and_append.
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateNotBeyondLower {
                    ts: 3,
                    lower: Antichain::from_elem(4),
                },
            );
            assert_eq!(
                write0
                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::UpdateBeyondUpper {
                    ts: 3,
                    expected_upper: Antichain::from_elem(3),
                },
            );
            // NB unlike the previous tests, this one has empty updates.
            assert_eq!(
                write0
                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2),
                },
            );

            // Tests for the BatchBuilder.
            assert_eq!(
                write0
                    .builder(Antichain::from_elem(3))
                    .finish(Antichain::from_elem(2))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBounds {
                    lower: Antichain::from_elem(3),
                    upper: Antichain::from_elem(2)
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(4),
                    append_upper: Antichain::from_elem(5),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            assert_eq!(
                write0
                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidBatchBounds {
                    batch_lower: Antichain::from_elem(3),
                    batch_upper: Antichain::from_elem(4),
                    append_lower: Antichain::from_elem(2),
                    append_upper: Antichain::from_elem(3),
                },
            );
            let batch = write0
                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
                .await
                .expect("invalid usage");
            // NB unlike the others, this one uses matches! because it's
            // non-deterministic (the key)
            assert!(matches!(
                write0
                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
                    .await
                    .unwrap_err(),
                InvalidUsage::InvalidEmptyTimeInterval { .. }
            ));
        }
    }
1388
    // Shards are isolated from each other: writes to one shard must not be
    // visible via handles to another, even within the same client.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn multiple_shards(dyncfgs: ConfigUpdates) {
        let data1 = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];

        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read1) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Different types, so that checks would fail in case we were not separating these
        // collections internally.
        let (mut write2, mut read2) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        write1
            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
            .await;

        write2
            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
            .await;

        // Each reader sees exactly its own shard's data.
        assert_eq!(
            read1.expect_snapshot_and_fetch(2).await,
            all_ok(&data1[..], 2)
        );

        assert_eq!(
            read2.expect_snapshot_and_fetch(2).await,
            all_ok(&data2[..], 2)
        );
    }
1429
    // fetch_recent_upper sees the shard-global upper, and also advances the
    // local upper of a writer that didn't perform the append itself.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn fetch_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write1, _read1) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        let (mut write2, _read2) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write1
            .expect_append(&data[..], write1.upper().clone(), vec![3])
            .await;

        // The shard-global upper does advance, even if this writer didn't advance its local upper.
        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));

        // The writer-local upper should advance, even if it was another writer
        // that advanced the frontier.
        assert_eq!(write2.upper(), &Antichain::from_elem(3));
    }
1461
    // Appending with an expected upper that doesn't match the shard's actual
    // upper fails with UpperMismatch and refreshes the writer's local upper.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
        ];

        let client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();

        let (mut write, _read) = client
            .expect_open::<String, String, u64, i64>(shard_id)
            .await;

        write
            .expect_append(&data[..], write.upper().clone(), vec![3])
            .await;

        // Attempt an append whose lower (5) is ahead of the actual upper (3).
        let data = [
            (("5".to_owned(), "fünf".to_owned()), 5, 1),
            (("6".to_owned(), "sechs".to_owned()), 6, 1),
        ];
        let res = write
            .append(
                data.iter(),
                Antichain::from_elem(5),
                Antichain::from_elem(7),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // Writing with an outdated upper updates the write handle's upper to the correct upper.
        assert_eq!(write.upper(), &Antichain::from_elem(3));
    }
1504
1505    // Make sure that the API structs are Sync + Send, so that they can be used in async tasks.
1506    // NOTE: This is a compile-time only test. If it compiles, we're good.
1507    #[allow(unused)]
1508    async fn sync_send(dyncfgs: ConfigUpdates) {
1509        mz_ore::test::init_logging();
1510
1511        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
1512            true
1513        }
1514
1515        let client = new_test_client(&dyncfgs).await;
1516
1517        let (write, read) = client
1518            .expect_open::<String, String, u64, i64>(ShardId::new())
1519            .await;
1520
1521        assert!(is_send_sync(client));
1522        assert!(is_send_sync(write));
1523        assert!(is_send_sync(read));
1524    }
1525
    // compare_and_append succeeds only when the expected upper matches; a
    // failed attempt refreshes the writer's cached upper so a retry can work.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn compare_and_append(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));

        // Write a [0,3) batch.
        write1
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        assert_eq!(
            read.expect_snapshot_and_fetch(2).await,
            all_ok(&data[..2], 2)
        );

        // Try and write with a wrong expected upper.
        let res = write2
            .compare_and_append(
                &data[..2],
                Antichain::from_elem(u64::minimum()),
                Antichain::from_elem(3),
            )
            .await;
        assert_eq!(
            res,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(u64::minimum()),
                current: Antichain::from_elem(3)
            }))
        );

        // A failed write updates our local cache of the shard upper.
        assert_eq!(write2.upper(), &Antichain::from_elem(3));

        // Try again with a good expected upper.
        write2.expect_compare_and_append(&data[2..], 3, 4).await;

        assert_eq!(write2.upper(), &Antichain::from_elem(4));

        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
    }
1582
    // Two writers may append overlapping batches; the shard merges them and
    // both snapshot and listen see the consolidated result.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn overlapping_append(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging_default("info");

        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Grab a listener before we do any writing
        let mut listen = read.clone("").await.expect_listen(0).await;

        // Write a [0,3) batch.
        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Write a [0,5) batch with the second writer.
        write2
            .expect_append(&data[..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Write a [3,6) batch with the first writer.
        write1
            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));

        assert_eq!(
            listen.read_until(&6).await,
            (all_ok(&data[..], 1), Antichain::from_elem(6))
        );
    }
1631
    // Appends need to be contiguous for a shard, meaning the lower of an appended batch must not
    // be in advance of the current shard upper.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn contiguous_append(dyncfgs: ConfigUpdates) {
        // Five updates at times 1..=5, appended in slices below.
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch.
        write
            .expect_append(&data[..2], write.upper().clone(), vec![3])
            .await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Appending a non-contiguous batch should fail.
        // Try to write a [5,6) batch, which would leave a gap at [3,5).
        let result = write
            .append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        // The outer Ok means the usage was valid; the inner Err reports the
        // mismatch between the batch's lower and the actual shard upper.
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // Fixing the lower to make the write contiguous should make the append succeed.
        write.expect_append(&data[2..5], vec![3], vec![6]).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }
1679
    // Per-writer appends can be non-contiguous, as long as appends to the shard from all writers
    // combined are contiguous.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
        // Five updates at times 1..=5, split across the two writers below.
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch with writer 1.
        write1
            .expect_append(&data[..2], write1.upper().clone(), vec![3])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Write a [3,5) batch with writer 2. Writer 2 hasn't appended anything
        // yet, so first bump its locally cached upper to the current shard
        // upper so the batch's lower lines up.
        write2.upper = Antichain::from_elem(3);
        write2
            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
            .await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Write a [5,6) batch with writer 1. Again sync the cached upper
        // first: from writer 1's own perspective there is a gap at [3,5), but
        // writer 2 filled it, so the shard-level append is contiguous.
        write1.upper = Antichain::from_elem(5);
        write1
            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
            .await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }
1722
    // Compare_and_appends need to be contiguous for a shard, meaning the lower of an appended
    // batch needs to match the current shard upper.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
        // Five updates at times 1..=5, appended in slices below.
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch.
        write.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write.upper(), &Antichain::from_elem(3));

        // Appending a non-contiguous batch should fail.
        // Try to write a [5,6) batch, which would leave a gap at [3,5).
        let result = write
            .compare_and_append(
                &data[4..5],
                Antichain::from_elem(5),
                Antichain::from_elem(6),
            )
            .await;
        // The outer Ok means the usage was valid; the inner Err reports the
        // mismatch between the expected upper and the actual shard upper.
        assert_eq!(
            result,
            Ok(Err(UpperMismatch {
                expected: Antichain::from_elem(5),
                current: Antichain::from_elem(3)
            }))
        );

        // Writing with the correct expected upper to make the write contiguous should make the
        // append succeed.
        write.expect_compare_and_append(&data[2..5], 3, 6).await;
        assert_eq!(write.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }
1769
    // Per-writer compare_and_appends can be non-contiguous, as long as appends to the shard from
    // all writers combined are contiguous.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
        // Five updates at times 1..=5, split across the two writers below.
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
            (("4".to_owned(), "vier".to_owned()), 4, 1),
            (("5".to_owned(), "cinque".to_owned()), 5, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;

        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Write a [0,3) batch with writer 1.
        write1.expect_compare_and_append(&data[..2], 0, 3).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(3));

        // Write a [3,5) batch with writer 2. Unlike plain append, no local
        // upper bookkeeping is needed here: the expected upper (3) is passed
        // explicitly.
        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
        assert_eq!(write2.upper(), &Antichain::from_elem(5));

        // Write a [5,6) batch with writer 1, skipping over the [3,5) interval
        // that writer 2 wrote; the shard frontier still advances contiguously.
        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
        assert_eq!(write1.upper(), &Antichain::from_elem(6));

        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
    }
1804
1805    #[mz_ore::test]
1806    fn fmt_ids() {
1807        assert_eq!(
1808            format!("{}", LeasedReaderId([0u8; 16])),
1809            "r00000000-0000-0000-0000-000000000000"
1810        );
1811        assert_eq!(
1812            format!("{:?}", LeasedReaderId([0u8; 16])),
1813            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
1814        );
1815    }
1816
    // Run NUM_WRITERS concurrent producer/appender task pairs against a single
    // shard and verify that the merged result contains every generated record
    // exactly once.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
    async fn concurrency(dyncfgs: ConfigUpdates) {
        let data = DataGenerator::small();

        const NUM_WRITERS: usize = 2;
        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
        for idx in 0..NUM_WRITERS {
            let (data, client) = (data.clone(), client.clone());

            // Connects the batch-building "writer" task below to its paired
            // "appender" task; capacity 1 provides backpressure between them.
            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);

            let client1 = client.clone();
            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
                let mut current_upper = 0;
                for batch in data.batches() {
                    // The new upper is one past the max timestamp in the
                    // batch; empty batches are skipped entirely.
                    let new_upper = match batch.get(batch.len() - 1) {
                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
                        None => continue,
                    };
                    // Because we (intentionally) call open inside the task,
                    // some other writer may have raced ahead and already
                    // appended some data before this one was registered. As a
                    // result, this writer may not be starting with an upper of
                    // the initial empty antichain. This is nice because it
                    // mimics how a real HA source would work, but it means we
                    // have to skip any batches that have already been committed
                    // (otherwise our new_upper would be before our upper).
                    //
                    // Note however, that unlike a real source, our
                    // DataGenerator-derived batches are guaranteed to be
                    // chunked along the same boundaries. This means we don't
                    // have to consider partial batches when generating the
                    // updates below.
                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
                        continue;
                    }

                    // Build a batch covering [current_upper, new_upper).
                    let current_upper_chain = Antichain::from_elem(current_upper);
                    current_upper = new_upper;
                    let new_upper_chain = Antichain::from_elem(new_upper);
                    let mut builder = write.builder(current_upper_chain);

                    for ((k, v), t, d) in batch.iter() {
                        builder
                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
                            .await
                            .expect("invalid usage");
                    }

                    let batch = builder
                        .finish(new_upper_chain)
                        .await
                        .expect("invalid usage");

                    // Hand the finished batch off to the paired appender task.
                    match batch_tx.send(batch).await {
                        Ok(_) => (),
                        Err(e) => panic!("send error: {}", e),
                    }
                }
            });
            handles.push(handle);

            // The appender task: receives finished batches and appends each
            // one to the shard through a separate write handle.
            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;

                while let Some(batch) = batch_rx.recv().await {
                    let lower = batch.lower().clone();
                    let upper = batch.upper().clone();
                    write
                        .append_batch(batch, lower, upper)
                        .await
                        .expect("invalid usage")
                        .expect("unexpected upper");
                }
            });
            handles.push(handle);
        }

        // Wait for every writer and appender task to finish.
        for handle in handles {
            let () = handle.await;
        }

        // Read everything back with a fresh handle and compare against the
        // generator's full record set.
        let expected = data.records().collect::<Vec<_>>();
        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
        assert_eq!(
            read.expect_snapshot_and_fetch(max_ts).await,
            all_ok(expected.iter(), max_ts)
        );
    }
1911
    // Regression test for database-issues#3523. Snapshot with as_of >= upper would
    // immediately return the data currently available instead of waiting for
    // upper to advance past as_of.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
        // A no-op waker lets us hand-poll the futures below; we only ever
        // assert that they are still pending, so no real wakeups are needed.
        let waker = noop_waker();
        let mut cx = Context::from_waker(&waker);

        // Three updates at times 1..=3; [0,3) is written first and time 3 only
        // at the very end, to keep the reads below blocked in between.
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let id = ShardId::new();
        let client = new_test_client(&dyncfgs).await;
        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;

        // Grab a listener as_of (aka gt) 1, which is not yet closed out.
        let mut listen = read.clone("").await.expect_listen(1).await;
        let mut listen_next = Box::pin(listen.fetch_next());
        // Intentionally don't await the listen_next, but instead manually poke
        // it for a while and assert that it doesn't resolve yet. See below for
        // discussion of some alternative ways of writing this unit test.
        for _ in 0..100 {
            assert!(
                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
                "listen::next unexpectedly ready"
            );
        }

        // Write a [0,3) batch.
        write
            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
            .await;

        // The initial listen_next call should now be able to return data at 2.
        // It doesn't get 1 because the as_of was 1 and listen is strictly gt.
        assert_eq!(
            listen_next.await,
            vec![
                ListenEvent::Updates(vec![(("2".to_owned(), "two".to_owned()), 2, 1)]),
                ListenEvent::Progress(Antichain::from_elem(3)),
            ]
        );

        // Grab a snapshot as_of 3, which is not yet closed out. Intentionally
        // don't await the snap, but instead manually poke it for a while and
        // assert that it doesn't resolve yet.
        //
        // An alternative to this would be to run it in a task and poll the task
        // with some timeout, but this would introduce a fixed test execution
        // latency of the timeout in the happy case. Plus, it would be
        // non-deterministic.
        //
        // Another alternative (that's potentially quite interesting!) would be
        // to separate creating a snapshot immediately (which would fail if
        // as_of was >= upper) from a bit of logic that retries until that case
        // is ready.
        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
        for _ in 0..100 {
            assert!(
                Pin::new(&mut snap).poll(&mut cx).is_pending(),
                "snapshot unexpectedly ready"
            );
        }

        // Now add the data at 3 and also unblock the snapshot.
        write.expect_compare_and_append(&data[2..], 3, 4).await;

        // Read the snapshot and check that it got all the appropriate data.
        assert_eq!(snap.await, all_ok(&data[..], 3));
    }
1986
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
        // Verify that the ReadHandle and WriteHandle background heartbeat tasks
        // shut down cleanly after the handle is expired.
        let mut cache = new_test_client_cache(&dyncfgs);
        // Shrink the lease durations to 1ms so the heartbeat machinery is
        // exercised quickly within the test.
        cache
            .cfg
            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
        cache.cfg.writer_lease_duration = Duration::from_millis(1);
        let (_write, mut read) = cache
            .open(PersistLocation::new_in_mem())
            .await
            .expect("client construction failed")
            .expect_open::<(), (), u64, i64>(ShardId::new())
            .await;
        // Steal the unexpired state (which owns the heartbeat task handle)
        // out of the read handle before expiring it.
        let read_unexpired_state = read
            .unexpired_state
            .take()
            .expect("handle should have unexpired state");
        read.expire().await;
        // Awaiting the task handle only completes once the heartbeat task has
        // actually terminated, i.e. shut down after expiry.
        read_unexpired_state.heartbeat_task.await
    }
2010
    /// Verify that shard finalization works with empty shards: shards that
    /// never had data written to them and whose upper is advanced directly to
    /// the empty Antichain.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        // An arbitrary fixed reader id for the critical since handle below.
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Advance since and upper to empty, which is a pre-requisite for
        // finalization/tombstoning.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64> = persist_client
            .open_critical_since(
                shard_id,
                CRITICAL_SINCE,
                Opaque::encode(&0u64),
                Diagnostics::for_tests(),
            )
            .await
            .expect("invalid persist usage");

        // Downgrade the critical since to the empty antichain as well, using
        // the handle's current opaque token as the expectation.
        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        // Finalization must be observable after the fact.
        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }
2064
    /// Verify that shard finalization works with shards that had some data
    /// written to them, plus then an empty batch to bring their upper to the
    /// empty Antichain.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn finalize_shard(dyncfgs: ConfigUpdates) {
        // A single (unit-typed) update at time 0.
        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
        let persist_client = new_test_client(&dyncfgs).await;

        let shard_id = ShardId::new();
        // An arbitrary fixed reader id for the critical since handle below.
        pub const CRITICAL_SINCE: CriticalReaderId =
            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);

        let (mut write, mut read) = persist_client
            .expect_open::<(), (), u64, i64>(shard_id)
            .await;

        // Write some data.
        let () = write
            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
            .await
            .expect("usage should be valid")
            .expect("upper should match");

        // Advance since and upper to empty, which is a pre-requisite for
        // finalization/tombstoning.
        let () = read.downgrade_since(&Antichain::new()).await;
        let () = write.advance_upper(&Antichain::new()).await;

        let mut since_handle: SinceHandle<(), (), u64, i64> = persist_client
            .open_critical_since(
                shard_id,
                CRITICAL_SINCE,
                Opaque::encode(&0u64),
                Diagnostics::for_tests(),
            )
            .await
            .expect("invalid persist usage");

        // Downgrade the critical since to the empty antichain as well, using
        // the handle's current opaque token as the expectation.
        let epoch = since_handle.opaque().clone();
        let new_since = Antichain::new();
        let downgrade = since_handle
            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
            .await;

        assert!(
            downgrade.is_ok(),
            "downgrade of critical handle must succeed"
        );

        let finalize = persist_client
            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await;

        assert_ok!(finalize, "finalization must succeed");

        // Finalization must be observable after the fact.
        let is_finalized = persist_client
            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .expect("invalid persist usage");
        assert!(is_finalized, "shard must still be finalized");
    }
2127
    proptest! {
        #![proptest_config(ProptestConfig::with_cases(4096))]

        // Property test: any `ShardId` survives a roundtrip through its
        // protobuf (string) encoding.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // too slow
        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>() ) {
            let actual = protobuf_roundtrip::<_, String>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
2139}