mz_persist_client/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! An abstraction presenting as a durable time-varying collection (aka shard)
11
12#![warn(missing_docs, missing_debug_implementations)]
13// #[track_caller] is currently a no-op on async functions, but that hopefully won't be the case
14// forever. So we already annotate those functions now and ignore the compiler warning until
15// https://github.com/rust-lang/rust/issues/87417 pans out.
16#![allow(ungated_async_fn_track_caller)]
17
18use std::fmt::Debug;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use differential_dataflow::difference::Semigroup;
23use differential_dataflow::lattice::Lattice;
24use mz_build_info::{BuildInfo, build_info};
25use mz_dyncfg::ConfigSet;
26use mz_ore::{instrument, soft_assert_or_log};
27use mz_persist::location::{Blob, Consensus, ExternalError};
28use mz_persist_types::schema::SchemaId;
29use mz_persist_types::{Codec, Codec64, Opaque};
30use timely::progress::Timestamp;
31
32use crate::async_runtime::IsolatedRuntime;
33use crate::cache::{PersistClientCache, StateCache};
34use crate::cfg::PersistConfig;
35use crate::critical::{CriticalReaderId, SinceHandle};
36use crate::error::InvalidUsage;
37use crate::fetch::{BatchFetcher, BatchFetcherConfig};
38use crate::internal::compact::Compactor;
39use crate::internal::encoding::{Schemas, parse_id};
40use crate::internal::gc::GarbageCollector;
41use crate::internal::machine::{Machine, retry_external};
42use crate::internal::state_versions::StateVersions;
43use crate::metrics::Metrics;
44use crate::read::{LeasedReaderId, READER_LEASE_DURATION, ReadHandle};
45use crate::rpc::PubSubSender;
46use crate::schema::CaESchema;
47use crate::write::{WriteHandle, WriterId};
48
49pub mod async_runtime;
50pub mod batch;
51pub mod cache;
52pub mod cfg;
53pub mod cli {
54    //! Persist command-line utilities
55    pub mod admin;
56    pub mod args;
57    pub mod bench;
58    pub mod inspect;
59}
60pub mod critical;
61pub mod error;
62pub mod fetch;
63pub mod internals_bench;
64pub mod iter;
65pub mod metrics {
66    //! Utilities related to metrics.
67    pub use crate::internal::metrics::{
68        Metrics, SinkMetrics, SinkWorkerMetrics, UpdateDelta, encode_ts_metric,
69    };
70}
71pub mod operators {
72    //! [timely] operators for reading and writing persist Shards.
73
74    use mz_dyncfg::Config;
75
76    pub mod shard_source;
77
78    // TODO(cfg): Move this next to the use.
79    pub(crate) const STORAGE_SOURCE_DECODE_FUEL: Config<usize> = Config::new(
80        "storage_source_decode_fuel",
81        100_000,
82        "\
83        The maximum amount of work to do in the persist_source mfp_and_decode \
84        operator before yielding.",
85    );
86}
87pub mod read;
88pub mod rpc;
89pub mod schema;
90pub mod stats;
91pub mod usage;
92pub mod write;
93
94/// An implementation of the public crate interface.
95mod internal {
96    pub mod apply;
97    pub mod cache;
98    pub mod compact;
99    pub mod encoding;
100    pub mod gc;
101    pub mod machine;
102    pub mod maintenance;
103    pub mod merge;
104    pub mod metrics;
105    pub mod paths;
106    pub mod restore;
107    pub mod service;
108    pub mod state;
109    pub mod state_diff;
110    pub mod state_versions;
111    pub mod trace;
112    pub mod watch;
113
114    #[cfg(test)]
115    pub mod datadriven;
116}
117
118/// Persist build information.
119pub const BUILD_INFO: BuildInfo = build_info!();
120
121// Re-export for convenience.
122pub use mz_persist_types::{PersistLocation, ShardId};
123
124/// Additional diagnostic information used within Persist
125/// e.g. for logging, metric labels, etc.
126#[derive(Clone, Debug)]
127pub struct Diagnostics {
128    /// A user-friendly name for the shard.
129    pub shard_name: String,
130    /// A purpose for the handle.
131    pub handle_purpose: String,
132}
133
134impl Diagnostics {
135    /// Create a new `Diagnostics` from `handle_purpose`.
136    pub fn from_purpose(handle_purpose: &str) -> Self {
137        Self {
138            shard_name: "unknown".to_string(),
139            handle_purpose: handle_purpose.to_string(),
140        }
141    }
142
143    /// Create a new `Diagnostics` for testing.
144    pub fn for_tests() -> Self {
145        Self {
146            shard_name: "test-shard-name".to_string(),
147            handle_purpose: "test-purpose".to_string(),
148        }
149    }
150}
151
152/// A handle for interacting with the set of persist shard made durable at a
153/// single [PersistLocation].
154///
155/// All async methods on PersistClient retry for as long as they are able, but
156/// the returned [std::future::Future]s implement "cancel on drop" semantics.
157/// This means that callers can add a timeout using [tokio::time::timeout] or
158/// [tokio::time::timeout_at].
159///
160/// ```rust,no_run
161/// # use std::sync::Arc;
162/// # use mz_persist_types::codec_impls::StringSchema;
163/// # let client: mz_persist_client::PersistClient = unimplemented!();
164/// # let timeout: std::time::Duration = unimplemented!();
165/// # let id = mz_persist_client::ShardId::new();
166/// # let diagnostics = mz_persist_client::Diagnostics { shard_name: "".into(), handle_purpose: "".into() };
167/// # async {
168/// tokio::time::timeout(timeout, client.open::<String, String, u64, i64>(id,
169///     Arc::new(StringSchema),Arc::new(StringSchema),diagnostics, true)).await
170/// # };
171/// ```
172#[derive(Debug, Clone)]
173pub struct PersistClient {
174    cfg: PersistConfig,
175    blob: Arc<dyn Blob>,
176    consensus: Arc<dyn Consensus>,
177    metrics: Arc<Metrics>,
178    isolated_runtime: Arc<IsolatedRuntime>,
179    shared_states: Arc<StateCache>,
180    pubsub_sender: Arc<dyn PubSubSender>,
181}
182
183impl PersistClient {
184    /// Returns a new client for interfacing with persist shards made durable to
185    /// the given [Blob] and [Consensus].
186    ///
187    /// This is exposed mostly for testing. Persist users likely want
188    /// [crate::cache::PersistClientCache::open].
189    pub fn new(
190        cfg: PersistConfig,
191        blob: Arc<dyn Blob>,
192        consensus: Arc<dyn Consensus>,
193        metrics: Arc<Metrics>,
194        isolated_runtime: Arc<IsolatedRuntime>,
195        shared_states: Arc<StateCache>,
196        pubsub_sender: Arc<dyn PubSubSender>,
197    ) -> Result<Self, ExternalError> {
198        // TODO: Verify somehow that blob matches consensus to prevent
199        // accidental misuse.
200        Ok(PersistClient {
201            cfg,
202            blob,
203            consensus,
204            metrics,
205            isolated_runtime,
206            shared_states,
207            pubsub_sender,
208        })
209    }
210
211    /// Returns a new in-mem [PersistClient] for tests and examples.
212    pub async fn new_for_tests() -> Self {
213        let cache = PersistClientCache::new_no_metrics();
214        cache
215            .open(PersistLocation::new_in_mem())
216            .await
217            .expect("in-mem location is valid")
218    }
219
220    /// Returns persist's [ConfigSet].
221    pub fn dyncfgs(&self) -> &ConfigSet {
222        &self.cfg.configs
223    }
224
225    async fn make_machine<K, V, T, D>(
226        &self,
227        shard_id: ShardId,
228        diagnostics: Diagnostics,
229    ) -> Result<Machine<K, V, T, D>, InvalidUsage<T>>
230    where
231        K: Debug + Codec,
232        V: Debug + Codec,
233        T: Timestamp + Lattice + Codec64 + Sync,
234        D: Semigroup + Codec64 + Send + Sync,
235    {
236        let state_versions = StateVersions::new(
237            self.cfg.clone(),
238            Arc::clone(&self.consensus),
239            Arc::clone(&self.blob),
240            Arc::clone(&self.metrics),
241        );
242        let machine = Machine::<K, V, T, D>::new(
243            self.cfg.clone(),
244            shard_id,
245            Arc::clone(&self.metrics),
246            Arc::new(state_versions),
247            Arc::clone(&self.shared_states),
248            Arc::clone(&self.pubsub_sender),
249            Arc::clone(&self.isolated_runtime),
250            diagnostics.clone(),
251        )
252        .await?;
253        Ok(machine)
254    }
255
256    /// Provides capabilities for the durable TVC identified by `shard_id` at
257    /// its current since and upper frontiers.
258    ///
259    /// This method is a best-effort attempt to regain control of the frontiers
260    /// of a shard. Its most common uses are to recover capabilities that have
261    /// expired (leases) or to attempt to read a TVC that one did not create (or
262    /// otherwise receive capabilities for). If the frontiers have been fully
263    /// released by all other parties, this call may result in capabilities with
264    /// empty frontiers (which are useless).
265    ///
266    /// If `shard_id` has never been used before, initializes a new shard and
267    /// returns handles with `since` and `upper` frontiers set to initial values
268    /// of `Antichain::from_elem(T::minimum())`.
269    ///
270    /// The `schema` parameter is currently unused, but should be an object
271    /// that represents the schema of the data in the shard. This will be required
272    /// in the future.
273    #[instrument(level = "debug", fields(shard = %shard_id))]
274    pub async fn open<K, V, T, D>(
275        &self,
276        shard_id: ShardId,
277        key_schema: Arc<K::Schema>,
278        val_schema: Arc<V::Schema>,
279        diagnostics: Diagnostics,
280        use_critical_since: bool,
281    ) -> Result<(WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>), InvalidUsage<T>>
282    where
283        K: Debug + Codec,
284        V: Debug + Codec,
285        T: Timestamp + Lattice + Codec64 + Sync,
286        D: Semigroup + Ord + Codec64 + Send + Sync,
287    {
288        Ok((
289            self.open_writer(
290                shard_id,
291                Arc::clone(&key_schema),
292                Arc::clone(&val_schema),
293                diagnostics.clone(),
294            )
295            .await?,
296            self.open_leased_reader(
297                shard_id,
298                key_schema,
299                val_schema,
300                diagnostics,
301                use_critical_since,
302            )
303            .await?,
304        ))
305    }
306
307    /// [Self::open], but returning only a [ReadHandle].
308    ///
309    /// Use this to save latency and a bit of persist traffic if you're just
310    /// going to immediately drop or expire the [WriteHandle].
311    ///
312    /// The `_schema` parameter is currently unused, but should be an object
313    /// that represents the schema of the data in the shard. This will be required
314    /// in the future.
315    #[instrument(level = "debug", fields(shard = %shard_id))]
316    pub async fn open_leased_reader<K, V, T, D>(
317        &self,
318        shard_id: ShardId,
319        key_schema: Arc<K::Schema>,
320        val_schema: Arc<V::Schema>,
321        diagnostics: Diagnostics,
322        use_critical_since: bool,
323    ) -> Result<ReadHandle<K, V, T, D>, InvalidUsage<T>>
324    where
325        K: Debug + Codec,
326        V: Debug + Codec,
327        T: Timestamp + Lattice + Codec64 + Sync,
328        D: Semigroup + Codec64 + Send + Sync,
329    {
330        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
331        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
332
333        let reader_id = LeasedReaderId::new();
334        let heartbeat_ts = (self.cfg.now)();
335        let (reader_state, maintenance) = machine
336            .register_leased_reader(
337                &reader_id,
338                &diagnostics.handle_purpose,
339                READER_LEASE_DURATION.get(&self.cfg),
340                heartbeat_ts,
341                use_critical_since,
342            )
343            .await;
344        maintenance.start_performing(&machine, &gc);
345        let schemas = Schemas {
346            id: None,
347            key: key_schema,
348            val: val_schema,
349        };
350        let reader = ReadHandle::new(
351            self.cfg.clone(),
352            Arc::clone(&self.metrics),
353            machine,
354            gc,
355            Arc::clone(&self.blob),
356            reader_id,
357            schemas,
358            reader_state.since,
359            heartbeat_ts,
360        )
361        .await;
362
363        Ok(reader)
364    }
365
366    /// Creates and returns a [BatchFetcher] for the given shard id.
367    #[instrument(level = "debug", fields(shard = %shard_id))]
368    pub async fn create_batch_fetcher<K, V, T, D>(
369        &self,
370        shard_id: ShardId,
371        key_schema: Arc<K::Schema>,
372        val_schema: Arc<V::Schema>,
373        is_transient: bool,
374        diagnostics: Diagnostics,
375    ) -> Result<BatchFetcher<K, V, T, D>, InvalidUsage<T>>
376    where
377        K: Debug + Codec,
378        V: Debug + Codec,
379        T: Timestamp + Lattice + Codec64 + Sync,
380        D: Semigroup + Codec64 + Send + Sync,
381    {
382        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
383        let read_schemas = Schemas {
384            id: None,
385            key: key_schema,
386            val: val_schema,
387        };
388        let schema_cache = machine.applier.schema_cache();
389        let fetcher = BatchFetcher {
390            cfg: BatchFetcherConfig::new(&self.cfg),
391            blob: Arc::clone(&self.blob),
392            metrics: Arc::clone(&self.metrics),
393            shard_metrics: Arc::clone(&machine.applier.shard_metrics),
394            shard_id,
395            read_schemas,
396            schema_cache,
397            is_transient,
398            _phantom: PhantomData,
399        };
400
401        Ok(fetcher)
402    }
403
404    /// A convenience [CriticalReaderId] for Materialize controllers.
405    ///
406    /// For most (soon to be all?) shards in Materialize, a centralized
407    /// "controller" is the authority for when a user no longer needs to read at
408    /// a given frontier. (Other uses are temporary holds where correctness of
409    /// the overall system can be maintained through a lease timeout.) To make
410    /// [SinceHandle] easier to work with, we offer this convenience id for
411    /// Materialize controllers, so they don't have to durably record it.
412    ///
413    /// TODO: We're still shaking out whether the controller should be the only
414    /// critical since hold or if there are other places we want them. If the
415    /// former, we should remove [CriticalReaderId] and bake in the singular
416    /// nature of the controller critical handle.
417    ///
418    /// ```rust
419    /// // This prints as something that is not 0 but is visually recognizable.
420    /// assert_eq!(
421    ///     mz_persist_client::PersistClient::CONTROLLER_CRITICAL_SINCE.to_string(),
422    ///     "c00000000-1111-2222-3333-444444444444",
423    /// )
424    /// ```
425    pub const CONTROLLER_CRITICAL_SINCE: CriticalReaderId =
426        CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
427
428    /// Provides a capability for the durable TVC identified by `shard_id` at
429    /// its current since frontier.
430    ///
431    /// In contrast to the time-leased [ReadHandle] returned by [Self::open] and
432    /// [Self::open_leased_reader], this handle and its associated capability
433    /// are not leased. A [SinceHandle] does not release its since capability;
434    /// downgrade to the empty antichain to hold back the since.
435    /// Also unlike `ReadHandle`, the handle is not expired on drop.
436    /// This is less ergonomic, but useful for "critical" since
437    /// holds which must survive even lease timeouts.
438    ///
439    /// **IMPORTANT**: The above means that if a SinceHandle is registered and
440    /// then lost, the shard's since will be permanently "stuck", forever
441    /// preventing logical compaction. Users are advised to durably record
442    /// (preferably in code) the intended [CriticalReaderId] _before_ registering
443    /// a SinceHandle (in case the process crashes at the wrong time).
444    ///
445    /// If `shard_id` has never been used before, initializes a new shard and
446    /// return a handle with its `since` frontier set to the initial value of
447    /// `Antichain::from_elem(T::minimum())`.
448    #[instrument(level = "debug", fields(shard = %shard_id))]
449    pub async fn open_critical_since<K, V, T, D, O>(
450        &self,
451        shard_id: ShardId,
452        reader_id: CriticalReaderId,
453        diagnostics: Diagnostics,
454    ) -> Result<SinceHandle<K, V, T, D, O>, InvalidUsage<T>>
455    where
456        K: Debug + Codec,
457        V: Debug + Codec,
458        T: Timestamp + Lattice + Codec64 + Sync,
459        D: Semigroup + Codec64 + Send + Sync,
460        O: Opaque + Codec64,
461    {
462        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
463        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
464
465        let (state, maintenance) = machine
466            .register_critical_reader::<O>(&reader_id, &diagnostics.handle_purpose)
467            .await;
468        maintenance.start_performing(&machine, &gc);
469        let handle = SinceHandle::new(
470            machine,
471            gc,
472            reader_id,
473            state.since,
474            Codec64::decode(state.opaque.0),
475        );
476
477        Ok(handle)
478    }
479
480    /// [Self::open], but returning only a [WriteHandle].
481    ///
482    /// Use this to save latency and a bit of persist traffic if you're just
483    /// going to immediately drop or expire the [ReadHandle].
484    ///
485    /// The `_schema` parameter is currently unused, but should be an object
486    /// that represents the schema of the data in the shard. This will be required
487    /// in the future.
488    #[instrument(level = "debug", fields(shard = %shard_id))]
489    pub async fn open_writer<K, V, T, D>(
490        &self,
491        shard_id: ShardId,
492        key_schema: Arc<K::Schema>,
493        val_schema: Arc<V::Schema>,
494        diagnostics: Diagnostics,
495    ) -> Result<WriteHandle<K, V, T, D>, InvalidUsage<T>>
496    where
497        K: Debug + Codec,
498        V: Debug + Codec,
499        T: Timestamp + Lattice + Codec64 + Sync,
500        D: Semigroup + Ord + Codec64 + Send + Sync,
501    {
502        let machine = self.make_machine(shard_id, diagnostics.clone()).await?;
503        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
504
505        // TODO: Because schemas are ordered, as part of the persist schema
506        // changes work, we probably want to build some way to allow persist
507        // users to control the order. For example, maybe a
508        // `PersistClient::compare_and_append_schema(current_schema_id,
509        // next_schema)`. Presumably this would then be passed in to open_writer
510        // instead of us implicitly registering it here.
511        // NB: The overwhelming common case is that this schema is already
512        // registered. In this case, the cmd breaks early and nothing is
513        // written to (or read from) CRDB.
514        let (schema_id, maintenance) = machine.register_schema(&*key_schema, &*val_schema).await;
515        maintenance.start_performing(&machine, &gc);
516        soft_assert_or_log!(
517            schema_id.is_some(),
518            "unable to register schemas {:?} {:?}",
519            key_schema,
520            val_schema,
521        );
522
523        let writer_id = WriterId::new();
524        let schemas = Schemas {
525            id: schema_id,
526            key: key_schema,
527            val: val_schema,
528        };
529        let writer = WriteHandle::new(
530            self.cfg.clone(),
531            Arc::clone(&self.metrics),
532            machine,
533            gc,
534            Arc::clone(&self.blob),
535            writer_id,
536            &diagnostics.handle_purpose,
537            schemas,
538        );
539        Ok(writer)
540    }
541
542    /// Returns the requested schema, if known at the current state.
543    pub async fn get_schema<K, V, T, D>(
544        &self,
545        shard_id: ShardId,
546        schema_id: SchemaId,
547        diagnostics: Diagnostics,
548    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
549    where
550        K: Debug + Codec,
551        V: Debug + Codec,
552        T: Timestamp + Lattice + Codec64 + Sync,
553        D: Semigroup + Codec64 + Send + Sync,
554    {
555        let machine = self
556            .make_machine::<K, V, T, D>(shard_id, diagnostics)
557            .await?;
558        Ok(machine.get_schema(schema_id))
559    }
560
561    /// Returns the latest schema registered at the current state.
562    pub async fn latest_schema<K, V, T, D>(
563        &self,
564        shard_id: ShardId,
565        diagnostics: Diagnostics,
566    ) -> Result<Option<(SchemaId, K::Schema, V::Schema)>, InvalidUsage<T>>
567    where
568        K: Debug + Codec,
569        V: Debug + Codec,
570        T: Timestamp + Lattice + Codec64 + Sync,
571        D: Semigroup + Codec64 + Send + Sync,
572    {
573        let machine = self
574            .make_machine::<K, V, T, D>(shard_id, diagnostics)
575            .await?;
576        Ok(machine.latest_schema())
577    }
578
579    /// Registers a new latest schema for the given shard.
580    ///
581    /// This new schema must be [backward_compatible] with all previous schemas
582    /// for this shard. If it's not, [CaESchema::Incompatible] is returned.
583    ///
584    /// [backward_compatible]: mz_persist_types::schema::backward_compatible
585    ///
586    /// To prevent races, the caller must declare what it believes to be the
587    /// latest schema id. If this doesn't match reality,
588    /// [CaESchema::ExpectedMismatch] is returned.
589    pub async fn compare_and_evolve_schema<K, V, T, D>(
590        &self,
591        shard_id: ShardId,
592        expected: SchemaId,
593        key_schema: &K::Schema,
594        val_schema: &V::Schema,
595        diagnostics: Diagnostics,
596    ) -> Result<CaESchema<K, V>, InvalidUsage<T>>
597    where
598        K: Debug + Codec,
599        V: Debug + Codec,
600        T: Timestamp + Lattice + Codec64 + Sync,
601        D: Semigroup + Codec64 + Send + Sync,
602    {
603        let machine = self
604            .make_machine::<K, V, T, D>(shard_id, diagnostics)
605            .await?;
606        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
607        let (res, maintenance) = machine
608            .compare_and_evolve_schema(expected, key_schema, val_schema)
609            .await;
610        maintenance.start_performing(&machine, &gc);
611        Ok(res)
612    }
613
614    /// Check if the given shard is in a finalized state; ie. it can no longer be
615    /// read, any data that was written to it is no longer accessible, and we've
616    /// discarded references to that data from state.
617    pub async fn is_finalized<K, V, T, D>(
618        &self,
619        shard_id: ShardId,
620        diagnostics: Diagnostics,
621    ) -> Result<bool, InvalidUsage<T>>
622    where
623        K: Debug + Codec,
624        V: Debug + Codec,
625        T: Timestamp + Lattice + Codec64 + Sync,
626        D: Semigroup + Codec64 + Send + Sync,
627    {
628        let machine = self
629            .make_machine::<K, V, T, D>(shard_id, diagnostics)
630            .await?;
631        Ok(machine.is_finalized())
632    }
633
634    /// If a shard is guaranteed to never be used again, finalize it to delete
635    /// the associated data and release any associated resources. (Except for a
636    /// little state in consensus we use to represent the tombstone.)
637    ///
638    /// The caller should ensure that both the `since` and `upper` of the shard
639    /// have been advanced to `[]`: ie. the shard is no longer writable or readable.
640    /// Otherwise an error is returned.
641    ///
642    /// Once `finalize_shard` has been called, the result of future operations on
643    /// the shard are not defined. They may return errors or succeed as a noop.
644    #[instrument(level = "debug", fields(shard = %shard_id))]
645    pub async fn finalize_shard<K, V, T, D>(
646        &self,
647        shard_id: ShardId,
648        diagnostics: Diagnostics,
649    ) -> Result<(), InvalidUsage<T>>
650    where
651        K: Debug + Codec,
652        V: Debug + Codec,
653        T: Timestamp + Lattice + Codec64 + Sync,
654        D: Semigroup + Codec64 + Send + Sync,
655    {
656        let machine = self
657            .make_machine::<K, V, T, D>(shard_id, diagnostics)
658            .await?;
659
660        let maintenance = machine.become_tombstone().await?;
661        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&self.isolated_runtime));
662
663        let () = maintenance.perform(&machine, &gc).await;
664
665        Ok(())
666    }
667
668    /// Returns the internal state of the shard for debugging and QA.
669    ///
670    /// We'll be thoughtful about making unnecessary changes, but the **output
671    /// of this method needs to be gated from users**, so that it's not subject
672    /// to our backward compatibility guarantees.
673    pub async fn inspect_shard<T: Timestamp + Lattice + Codec64>(
674        &self,
675        shard_id: &ShardId,
676    ) -> Result<impl serde::Serialize, anyhow::Error> {
677        let state_versions = StateVersions::new(
678            self.cfg.clone(),
679            Arc::clone(&self.consensus),
680            Arc::clone(&self.blob),
681            Arc::clone(&self.metrics),
682        );
683        // TODO: Don't fetch all live diffs. Feels like we should pull out a new
684        // method in StateVersions for fetching the latest version of State of a
685        // shard that might or might not exist.
686        let versions = state_versions.fetch_all_live_diffs(shard_id).await;
687        if versions.0.is_empty() {
688            return Err(anyhow::anyhow!("{} does not exist", shard_id));
689        }
690        let state = state_versions
691            .fetch_current_state::<T>(shard_id, versions.0)
692            .await;
693        let state = state.check_ts_codec(shard_id)?;
694        Ok(state)
695    }
696
697    /// Test helper for a [Self::open] call that is expected to succeed.
698    #[cfg(test)]
699    #[track_caller]
700    pub async fn expect_open<K, V, T, D>(
701        &self,
702        shard_id: ShardId,
703    ) -> (WriteHandle<K, V, T, D>, ReadHandle<K, V, T, D>)
704    where
705        K: Debug + Codec,
706        V: Debug + Codec,
707        T: Timestamp + Lattice + Codec64 + Sync,
708        D: Semigroup + Ord + Codec64 + Send + Sync,
709        K::Schema: Default,
710        V::Schema: Default,
711    {
712        self.open(
713            shard_id,
714            Arc::new(K::Schema::default()),
715            Arc::new(V::Schema::default()),
716            Diagnostics::for_tests(),
717            true,
718        )
719        .await
720        .expect("codec mismatch")
721    }
722
723    /// Return the metrics being used by this client.
724    ///
725    /// Only exposed for tests, persistcli, and benchmarks.
726    pub fn metrics(&self) -> &Arc<Metrics> {
727        &self.metrics
728    }
729}
730
731#[cfg(test)]
732mod tests {
733    use std::future::Future;
734    use std::mem;
735    use std::pin::Pin;
736    use std::task::Context;
737    use std::time::Duration;
738
739    use differential_dataflow::consolidation::consolidate_updates;
740    use differential_dataflow::lattice::Lattice;
741    use futures_task::noop_waker;
742    use mz_dyncfg::ConfigUpdates;
743    use mz_ore::assert_ok;
744    use mz_persist::indexed::encoding::BlobTraceBatchPart;
745    use mz_persist::workload::DataGenerator;
746    use mz_persist_types::codec_impls::{StringSchema, VecU8Schema};
747    use mz_proto::protobuf_roundtrip;
748    use proptest::prelude::*;
749    use timely::order::PartialOrder;
750    use timely::progress::Antichain;
751
752    use crate::batch::BLOB_TARGET_SIZE;
753    use crate::cache::PersistClientCache;
754    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
755    use crate::error::{CodecConcreteType, CodecMismatch, UpperMismatch};
756    use crate::internal::paths::BlobKey;
757    use crate::read::ListenEvent;
758
759    use super::*;
760
761    pub fn new_test_client_cache(dyncfgs: &ConfigUpdates) -> PersistClientCache {
762        // Configure an aggressively small blob_target_size so we get some
763        // amount of coverage of that in tests. Similarly, for max_outstanding.
764        let mut cache = PersistClientCache::new_no_metrics();
765        cache.cfg.set_config(&BLOB_TARGET_SIZE, 10);
766        cache
767            .cfg
768            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 1);
769        dyncfgs.apply(cache.cfg());
770
771        // Enable compaction in tests to ensure we get coverage.
772        cache.cfg.compaction_enabled = true;
773        cache
774    }
775
776    pub async fn new_test_client(dyncfgs: &ConfigUpdates) -> PersistClient {
777        let cache = new_test_client_cache(dyncfgs);
778        cache
779            .open(PersistLocation::new_in_mem())
780            .await
781            .expect("client construction failed")
782    }
783
784    pub fn all_ok<'a, K, V, T, D, I>(
785        iter: I,
786        as_of: T,
787    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
788    where
789        K: Ord + Clone + 'a,
790        V: Ord + Clone + 'a,
791        T: Timestamp + Lattice + Clone + 'a,
792        D: Semigroup + Clone + 'a,
793        I: IntoIterator<Item = &'a ((K, V), T, D)>,
794    {
795        let as_of = Antichain::from_elem(as_of);
796        let mut ret = iter
797            .into_iter()
798            .map(|((k, v), t, d)| {
799                let mut t = t.clone();
800                t.advance_by(as_of.borrow());
801                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
802            })
803            .collect();
804        consolidate_updates(&mut ret);
805        ret
806    }
807
808    pub async fn expect_fetch_part<K, V, T, D>(
809        blob: &dyn Blob,
810        key: &BlobKey,
811        metrics: &Metrics,
812        read_schemas: &Schemas<K, V>,
813    ) -> (
814        BlobTraceBatchPart<T>,
815        Vec<((Result<K, String>, Result<V, String>), T, D)>,
816    )
817    where
818        K: Codec,
819        V: Codec,
820        T: Timestamp + Codec64,
821        D: Codec64,
822    {
823        let value = blob
824            .get(key)
825            .await
826            .expect("failed to fetch part")
827            .expect("missing part");
828        let mut part =
829            BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
830        // Ensure codec data is present even if it was not generated at write time.
831        let _ = part
832            .updates
833            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
834        let mut updates = Vec::new();
835        // TODO(bkirwi): switch to structured data in tests
836        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
837            updates.push((
838                (
839                    K::decode(k, &read_schemas.key),
840                    V::decode(v, &read_schemas.val),
841                ),
842                T::decode(t),
843                D::decode(d),
844            ));
845        }
846        (part, updates)
847    }
848
849    #[mz_persist_proc::test(tokio::test)]
850    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
851    async fn sanity_check(dyncfgs: ConfigUpdates) {
852        let data = [
853            (("1".to_owned(), "one".to_owned()), 1, 1),
854            (("2".to_owned(), "two".to_owned()), 2, 1),
855            (("3".to_owned(), "three".to_owned()), 3, 1),
856        ];
857
858        let (mut write, mut read) = new_test_client(&dyncfgs)
859            .await
860            .expect_open::<String, String, u64, i64>(ShardId::new())
861            .await;
862        assert_eq!(write.upper(), &Antichain::from_elem(u64::minimum()));
863        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
864
865        // Write a [0,3) batch.
866        write
867            .expect_append(&data[..2], write.upper().clone(), vec![3])
868            .await;
869        assert_eq!(write.upper(), &Antichain::from_elem(3));
870
871        // Grab a snapshot and listener as_of 1. Snapshot should only have part of what we wrote.
872        assert_eq!(
873            read.expect_snapshot_and_fetch(1).await,
874            all_ok(&data[..1], 1)
875        );
876
877        let mut listen = read.clone("").await.expect_listen(1).await;
878
879        // Write a [3,4) batch.
880        write
881            .expect_append(&data[2..], write.upper().clone(), vec![4])
882            .await;
883        assert_eq!(write.upper(), &Antichain::from_elem(4));
884
885        // Listen should have part of the initial write plus the new one.
886        assert_eq!(
887            listen.read_until(&4).await,
888            (all_ok(&data[1..], 1), Antichain::from_elem(4))
889        );
890
891        // Downgrading the since is tracked locally (but otherwise is a no-op).
892        read.downgrade_since(&Antichain::from_elem(2)).await;
893        assert_eq!(read.since(), &Antichain::from_elem(2));
894    }
895
896    // Sanity check that the open_reader and open_writer calls work.
897    #[mz_persist_proc::test(tokio::test)]
898    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
899    async fn open_reader_writer(dyncfgs: ConfigUpdates) {
900        let data = vec![
901            (("1".to_owned(), "one".to_owned()), 1, 1),
902            (("2".to_owned(), "two".to_owned()), 2, 1),
903            (("3".to_owned(), "three".to_owned()), 3, 1),
904        ];
905
906        let shard_id = ShardId::new();
907        let client = new_test_client(&dyncfgs).await;
908        let mut write1 = client
909            .open_writer::<String, String, u64, i64>(
910                shard_id,
911                Arc::new(StringSchema),
912                Arc::new(StringSchema),
913                Diagnostics::for_tests(),
914            )
915            .await
916            .expect("codec mismatch");
917        let mut read1 = client
918            .open_leased_reader::<String, String, u64, i64>(
919                shard_id,
920                Arc::new(StringSchema),
921                Arc::new(StringSchema),
922                Diagnostics::for_tests(),
923                true,
924            )
925            .await
926            .expect("codec mismatch");
927        let mut read2 = client
928            .open_leased_reader::<String, String, u64, i64>(
929                shard_id,
930                Arc::new(StringSchema),
931                Arc::new(StringSchema),
932                Diagnostics::for_tests(),
933                true,
934            )
935            .await
936            .expect("codec mismatch");
937        let mut write2 = client
938            .open_writer::<String, String, u64, i64>(
939                shard_id,
940                Arc::new(StringSchema),
941                Arc::new(StringSchema),
942                Diagnostics::for_tests(),
943            )
944            .await
945            .expect("codec mismatch");
946
947        write2.expect_compare_and_append(&data[..1], 0, 2).await;
948        assert_eq!(
949            read2.expect_snapshot_and_fetch(1).await,
950            all_ok(&data[..1], 1)
951        );
952        write1.expect_compare_and_append(&data[1..], 2, 4).await;
953        assert_eq!(read1.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
954    }
955
956    #[mz_persist_proc::test(tokio::test)]
957    #[cfg_attr(miri, ignore)] // too slow
958    async fn invalid_usage(dyncfgs: ConfigUpdates) {
959        let data = vec![
960            (("1".to_owned(), "one".to_owned()), 1, 1),
961            (("2".to_owned(), "two".to_owned()), 2, 1),
962            (("3".to_owned(), "three".to_owned()), 3, 1),
963        ];
964
965        let shard_id0 = "s00000000-0000-0000-0000-000000000000"
966            .parse::<ShardId>()
967            .expect("invalid shard id");
968        let mut client = new_test_client(&dyncfgs).await;
969
970        let (mut write0, mut read0) = client
971            .expect_open::<String, String, u64, i64>(shard_id0)
972            .await;
973
974        write0.expect_compare_and_append(&data, 0, 4).await;
975
976        // InvalidUsage from PersistClient methods.
977        {
978            fn codecs(
979                k: &str,
980                v: &str,
981                t: &str,
982                d: &str,
983            ) -> (String, String, String, String, Option<CodecConcreteType>) {
984                (k.to_owned(), v.to_owned(), t.to_owned(), d.to_owned(), None)
985            }
986
987            client.shared_states = Arc::new(StateCache::new_no_metrics());
988            assert_eq!(
989                client
990                    .open::<Vec<u8>, String, u64, i64>(
991                        shard_id0,
992                        Arc::new(VecU8Schema),
993                        Arc::new(StringSchema),
994                        Diagnostics::for_tests(),
995                        true,
996                    )
997                    .await
998                    .unwrap_err(),
999                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1000                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1001                    actual: codecs("String", "String", "u64", "i64"),
1002                }))
1003            );
1004            assert_eq!(
1005                client
1006                    .open::<String, Vec<u8>, u64, i64>(
1007                        shard_id0,
1008                        Arc::new(StringSchema),
1009                        Arc::new(VecU8Schema),
1010                        Diagnostics::for_tests(),
1011                        true,
1012                    )
1013                    .await
1014                    .unwrap_err(),
1015                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1016                    requested: codecs("String", "Vec<u8>", "u64", "i64"),
1017                    actual: codecs("String", "String", "u64", "i64"),
1018                }))
1019            );
1020            assert_eq!(
1021                client
1022                    .open::<String, String, i64, i64>(
1023                        shard_id0,
1024                        Arc::new(StringSchema),
1025                        Arc::new(StringSchema),
1026                        Diagnostics::for_tests(),
1027                        true,
1028                    )
1029                    .await
1030                    .unwrap_err(),
1031                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1032                    requested: codecs("String", "String", "i64", "i64"),
1033                    actual: codecs("String", "String", "u64", "i64"),
1034                }))
1035            );
1036            assert_eq!(
1037                client
1038                    .open::<String, String, u64, u64>(
1039                        shard_id0,
1040                        Arc::new(StringSchema),
1041                        Arc::new(StringSchema),
1042                        Diagnostics::for_tests(),
1043                        true,
1044                    )
1045                    .await
1046                    .unwrap_err(),
1047                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1048                    requested: codecs("String", "String", "u64", "u64"),
1049                    actual: codecs("String", "String", "u64", "i64"),
1050                }))
1051            );
1052
1053            // open_reader and open_writer end up using the same checks, so just
1054            // verify one type each to verify the plumbing instead of the full
1055            // set.
1056            assert_eq!(
1057                client
1058                    .open_leased_reader::<Vec<u8>, String, u64, i64>(
1059                        shard_id0,
1060                        Arc::new(VecU8Schema),
1061                        Arc::new(StringSchema),
1062                        Diagnostics::for_tests(),
1063                        true,
1064                    )
1065                    .await
1066                    .unwrap_err(),
1067                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1068                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1069                    actual: codecs("String", "String", "u64", "i64"),
1070                }))
1071            );
1072            assert_eq!(
1073                client
1074                    .open_writer::<Vec<u8>, String, u64, i64>(
1075                        shard_id0,
1076                        Arc::new(VecU8Schema),
1077                        Arc::new(StringSchema),
1078                        Diagnostics::for_tests(),
1079                    )
1080                    .await
1081                    .unwrap_err(),
1082                InvalidUsage::CodecMismatch(Box::new(CodecMismatch {
1083                    requested: codecs("Vec<u8>", "String", "u64", "i64"),
1084                    actual: codecs("String", "String", "u64", "i64"),
1085                }))
1086            );
1087        }
1088
1089        // InvalidUsage from ReadHandle methods.
1090        {
1091            let snap = read0
1092                .snapshot(Antichain::from_elem(3))
1093                .await
1094                .expect("cannot serve requested as_of");
1095
1096            let shard_id1 = "s11111111-1111-1111-1111-111111111111"
1097                .parse::<ShardId>()
1098                .expect("invalid shard id");
1099            let mut fetcher1 = client
1100                .create_batch_fetcher::<String, String, u64, i64>(
1101                    shard_id1,
1102                    Default::default(),
1103                    Default::default(),
1104                    false,
1105                    Diagnostics::for_tests(),
1106                )
1107                .await
1108                .unwrap();
1109            for batch in snap {
1110                let res = fetcher1.fetch_leased_part(&batch).await;
1111                assert_eq!(
1112                    res.unwrap_err(),
1113                    InvalidUsage::BatchNotFromThisShard {
1114                        batch_shard: shard_id0,
1115                        handle_shard: shard_id1,
1116                    }
1117                );
1118            }
1119        }
1120
1121        // InvalidUsage from WriteHandle methods.
1122        {
1123            let ts3 = &data[2];
1124            assert_eq!(ts3.1, 3);
1125            let ts3 = vec![ts3.clone()];
1126
1127            // WriteHandle::append also covers append_batch,
1128            // compare_and_append_batch, compare_and_append.
1129            assert_eq!(
1130                write0
1131                    .append(&ts3, Antichain::from_elem(4), Antichain::from_elem(5))
1132                    .await
1133                    .unwrap_err(),
1134                InvalidUsage::UpdateNotBeyondLower {
1135                    ts: 3,
1136                    lower: Antichain::from_elem(4),
1137                },
1138            );
1139            assert_eq!(
1140                write0
1141                    .append(&ts3, Antichain::from_elem(2), Antichain::from_elem(3))
1142                    .await
1143                    .unwrap_err(),
1144                InvalidUsage::UpdateBeyondUpper {
1145                    ts: 3,
1146                    expected_upper: Antichain::from_elem(3),
1147                },
1148            );
1149            // NB unlike the previous tests, this one has empty updates.
1150            assert_eq!(
1151                write0
1152                    .append(&data[..0], Antichain::from_elem(3), Antichain::from_elem(2))
1153                    .await
1154                    .unwrap_err(),
1155                InvalidUsage::InvalidBounds {
1156                    lower: Antichain::from_elem(3),
1157                    upper: Antichain::from_elem(2),
1158                },
1159            );
1160
1161            // Tests for the BatchBuilder.
1162            assert_eq!(
1163                write0
1164                    .builder(Antichain::from_elem(3))
1165                    .finish(Antichain::from_elem(2))
1166                    .await
1167                    .unwrap_err(),
1168                InvalidUsage::InvalidBounds {
1169                    lower: Antichain::from_elem(3),
1170                    upper: Antichain::from_elem(2)
1171                },
1172            );
1173            let batch = write0
1174                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1175                .await
1176                .expect("invalid usage");
1177            assert_eq!(
1178                write0
1179                    .append_batch(batch, Antichain::from_elem(4), Antichain::from_elem(5))
1180                    .await
1181                    .unwrap_err(),
1182                InvalidUsage::InvalidBatchBounds {
1183                    batch_lower: Antichain::from_elem(3),
1184                    batch_upper: Antichain::from_elem(4),
1185                    append_lower: Antichain::from_elem(4),
1186                    append_upper: Antichain::from_elem(5),
1187                },
1188            );
1189            let batch = write0
1190                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1191                .await
1192                .expect("invalid usage");
1193            assert_eq!(
1194                write0
1195                    .append_batch(batch, Antichain::from_elem(2), Antichain::from_elem(3))
1196                    .await
1197                    .unwrap_err(),
1198                InvalidUsage::InvalidBatchBounds {
1199                    batch_lower: Antichain::from_elem(3),
1200                    batch_upper: Antichain::from_elem(4),
1201                    append_lower: Antichain::from_elem(2),
1202                    append_upper: Antichain::from_elem(3),
1203                },
1204            );
1205            let batch = write0
1206                .batch(&ts3, Antichain::from_elem(3), Antichain::from_elem(4))
1207                .await
1208                .expect("invalid usage");
1209            // NB unlike the others, this one uses matches! because it's
1210            // non-deterministic (the key)
1211            assert!(matches!(
1212                write0
1213                    .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(3))
1214                    .await
1215                    .unwrap_err(),
1216                InvalidUsage::InvalidEmptyTimeInterval { .. }
1217            ));
1218        }
1219    }
1220
1221    #[mz_persist_proc::test(tokio::test)]
1222    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1223    async fn multiple_shards(dyncfgs: ConfigUpdates) {
1224        let data1 = [
1225            (("1".to_owned(), "one".to_owned()), 1, 1),
1226            (("2".to_owned(), "two".to_owned()), 2, 1),
1227        ];
1228
1229        let data2 = [(("1".to_owned(), ()), 1, 1), (("2".to_owned(), ()), 2, 1)];
1230
1231        let client = new_test_client(&dyncfgs).await;
1232
1233        let (mut write1, mut read1) = client
1234            .expect_open::<String, String, u64, i64>(ShardId::new())
1235            .await;
1236
1237        // Different types, so that checks would fail in case we were not separating these
1238        // collections internally.
1239        let (mut write2, mut read2) = client
1240            .expect_open::<String, (), u64, i64>(ShardId::new())
1241            .await;
1242
1243        write1
1244            .expect_compare_and_append(&data1[..], u64::minimum(), 3)
1245            .await;
1246
1247        write2
1248            .expect_compare_and_append(&data2[..], u64::minimum(), 3)
1249            .await;
1250
1251        assert_eq!(
1252            read1.expect_snapshot_and_fetch(2).await,
1253            all_ok(&data1[..], 2)
1254        );
1255
1256        assert_eq!(
1257            read2.expect_snapshot_and_fetch(2).await,
1258            all_ok(&data2[..], 2)
1259        );
1260    }
1261
1262    #[mz_persist_proc::test(tokio::test)]
1263    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1264    async fn fetch_upper(dyncfgs: ConfigUpdates) {
1265        let data = [
1266            (("1".to_owned(), "one".to_owned()), 1, 1),
1267            (("2".to_owned(), "two".to_owned()), 2, 1),
1268        ];
1269
1270        let client = new_test_client(&dyncfgs).await;
1271
1272        let shard_id = ShardId::new();
1273
1274        let (mut write1, _read1) = client
1275            .expect_open::<String, String, u64, i64>(shard_id)
1276            .await;
1277
1278        let (mut write2, _read2) = client
1279            .expect_open::<String, String, u64, i64>(shard_id)
1280            .await;
1281
1282        write1
1283            .expect_append(&data[..], write1.upper().clone(), vec![3])
1284            .await;
1285
1286        // The shard-global upper does advance, even if this writer didn't advance its local upper.
1287        assert_eq!(write2.fetch_recent_upper().await, &Antichain::from_elem(3));
1288
1289        // The writer-local upper should advance, even if it was another writer
1290        // that advanced the frontier.
1291        assert_eq!(write2.upper(), &Antichain::from_elem(3));
1292    }
1293
1294    #[mz_persist_proc::test(tokio::test)]
1295    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1296    async fn append_with_invalid_upper(dyncfgs: ConfigUpdates) {
1297        let data = [
1298            (("1".to_owned(), "one".to_owned()), 1, 1),
1299            (("2".to_owned(), "two".to_owned()), 2, 1),
1300        ];
1301
1302        let client = new_test_client(&dyncfgs).await;
1303
1304        let shard_id = ShardId::new();
1305
1306        let (mut write, _read) = client
1307            .expect_open::<String, String, u64, i64>(shard_id)
1308            .await;
1309
1310        write
1311            .expect_append(&data[..], write.upper().clone(), vec![3])
1312            .await;
1313
1314        let data = [
1315            (("5".to_owned(), "fünf".to_owned()), 5, 1),
1316            (("6".to_owned(), "sechs".to_owned()), 6, 1),
1317        ];
1318        let res = write
1319            .append(
1320                data.iter(),
1321                Antichain::from_elem(5),
1322                Antichain::from_elem(7),
1323            )
1324            .await;
1325        assert_eq!(
1326            res,
1327            Ok(Err(UpperMismatch {
1328                expected: Antichain::from_elem(5),
1329                current: Antichain::from_elem(3)
1330            }))
1331        );
1332
1333        // Writing with an outdated upper updates the write handle's upper to the correct upper.
1334        assert_eq!(write.upper(), &Antichain::from_elem(3));
1335    }
1336
1337    // Make sure that the API structs are Sync + Send, so that they can be used in async tasks.
1338    // NOTE: This is a compile-time only test. If it compiles, we're good.
1339    #[allow(unused)]
1340    async fn sync_send(dyncfgs: ConfigUpdates) {
1341        mz_ore::test::init_logging();
1342
1343        fn is_send_sync<T: Send + Sync>(_x: T) -> bool {
1344            true
1345        }
1346
1347        let client = new_test_client(&dyncfgs).await;
1348
1349        let (write, read) = client
1350            .expect_open::<String, String, u64, i64>(ShardId::new())
1351            .await;
1352
1353        assert!(is_send_sync(client));
1354        assert!(is_send_sync(write));
1355        assert!(is_send_sync(read));
1356    }
1357
1358    #[mz_persist_proc::test(tokio::test)]
1359    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1360    async fn compare_and_append(dyncfgs: ConfigUpdates) {
1361        let data = vec![
1362            (("1".to_owned(), "one".to_owned()), 1, 1),
1363            (("2".to_owned(), "two".to_owned()), 2, 1),
1364            (("3".to_owned(), "three".to_owned()), 3, 1),
1365        ];
1366
1367        let id = ShardId::new();
1368        let client = new_test_client(&dyncfgs).await;
1369        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1370
1371        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1372
1373        assert_eq!(write1.upper(), &Antichain::from_elem(u64::minimum()));
1374        assert_eq!(write2.upper(), &Antichain::from_elem(u64::minimum()));
1375        assert_eq!(read.since(), &Antichain::from_elem(u64::minimum()));
1376
1377        // Write a [0,3) batch.
1378        write1
1379            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
1380            .await;
1381        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1382
1383        assert_eq!(
1384            read.expect_snapshot_and_fetch(2).await,
1385            all_ok(&data[..2], 2)
1386        );
1387
1388        // Try and write with a wrong expected upper.
1389        let res = write2
1390            .compare_and_append(
1391                &data[..2],
1392                Antichain::from_elem(u64::minimum()),
1393                Antichain::from_elem(3),
1394            )
1395            .await;
1396        assert_eq!(
1397            res,
1398            Ok(Err(UpperMismatch {
1399                expected: Antichain::from_elem(u64::minimum()),
1400                current: Antichain::from_elem(3)
1401            }))
1402        );
1403
1404        // A failed write updates our local cache of the shard upper.
1405        assert_eq!(write2.upper(), &Antichain::from_elem(3));
1406
1407        // Try again with a good expected upper.
1408        write2.expect_compare_and_append(&data[2..], 3, 4).await;
1409
1410        assert_eq!(write2.upper(), &Antichain::from_elem(4));
1411
1412        assert_eq!(read.expect_snapshot_and_fetch(3).await, all_ok(&data, 3));
1413    }
1414
1415    #[mz_persist_proc::test(tokio::test)]
1416    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1417    async fn overlapping_append(dyncfgs: ConfigUpdates) {
1418        mz_ore::test::init_logging_default("info");
1419
1420        let data = vec![
1421            (("1".to_owned(), "one".to_owned()), 1, 1),
1422            (("2".to_owned(), "two".to_owned()), 2, 1),
1423            (("3".to_owned(), "three".to_owned()), 3, 1),
1424            (("4".to_owned(), "vier".to_owned()), 4, 1),
1425            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1426        ];
1427
1428        let id = ShardId::new();
1429        let client = new_test_client(&dyncfgs).await;
1430
1431        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1432
1433        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1434
1435        // Grab a listener before we do any writing
1436        let mut listen = read.clone("").await.expect_listen(0).await;
1437
1438        // Write a [0,3) batch.
1439        write1
1440            .expect_append(&data[..2], write1.upper().clone(), vec![3])
1441            .await;
1442        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1443
1444        // Write a [0,5) batch with the second writer.
1445        write2
1446            .expect_append(&data[..4], write2.upper().clone(), vec![5])
1447            .await;
1448        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1449
1450        // Write a [3,6) batch with the first writer.
1451        write1
1452            .expect_append(&data[2..5], write1.upper().clone(), vec![6])
1453            .await;
1454        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1455
1456        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1457
1458        assert_eq!(
1459            listen.read_until(&6).await,
1460            (all_ok(&data[..], 1), Antichain::from_elem(6))
1461        );
1462    }
1463
1464    // Appends need to be contiguous for a shard, meaning the lower of an appended batch must not
1465    // be in advance of the current shard upper.
1466    #[mz_persist_proc::test(tokio::test)]
1467    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1468    async fn contiguous_append(dyncfgs: ConfigUpdates) {
1469        let data = vec![
1470            (("1".to_owned(), "one".to_owned()), 1, 1),
1471            (("2".to_owned(), "two".to_owned()), 2, 1),
1472            (("3".to_owned(), "three".to_owned()), 3, 1),
1473            (("4".to_owned(), "vier".to_owned()), 4, 1),
1474            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1475        ];
1476
1477        let id = ShardId::new();
1478        let client = new_test_client(&dyncfgs).await;
1479
1480        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1481
1482        // Write a [0,3) batch.
1483        write
1484            .expect_append(&data[..2], write.upper().clone(), vec![3])
1485            .await;
1486        assert_eq!(write.upper(), &Antichain::from_elem(3));
1487
1488        // Appending a non-contiguous batch should fail.
1489        // Write a [5,6) batch, skipping over [3,5).
1490        let result = write
1491            .append(
1492                &data[4..5],
1493                Antichain::from_elem(5),
1494                Antichain::from_elem(6),
1495            )
1496            .await;
1497        assert_eq!(
1498            result,
1499            Ok(Err(UpperMismatch {
1500                expected: Antichain::from_elem(5),
1501                current: Antichain::from_elem(3)
1502            }))
1503        );
1504
1505        // Fixing the lower to make the write contiguous should make the append succeed.
1506        write.expect_append(&data[2..5], vec![3], vec![6]).await;
1507        assert_eq!(write.upper(), &Antichain::from_elem(6));
1508
1509        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1510    }
1511
1512    // Per-writer appends can be non-contiguous, as long as appends to the shard from all writers
1513    // combined are contiguous.
1514    #[mz_persist_proc::test(tokio::test)]
1515    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1516    async fn noncontiguous_append_per_writer(dyncfgs: ConfigUpdates) {
1517        let data = vec![
1518            (("1".to_owned(), "one".to_owned()), 1, 1),
1519            (("2".to_owned(), "two".to_owned()), 2, 1),
1520            (("3".to_owned(), "three".to_owned()), 3, 1),
1521            (("4".to_owned(), "vier".to_owned()), 4, 1),
1522            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1523        ];
1524
1525        let id = ShardId::new();
1526        let client = new_test_client(&dyncfgs).await;
1527
1528        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1529
1530        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1531
1532        // Write a [0,3) batch with writer 1.
1533        write1
1534            .expect_append(&data[..2], write1.upper().clone(), vec![3])
1535            .await;
1536        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1537
1538        // Write a [3,5) batch with writer 2.
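            // Writer 2 never wrote [0,3), so its locally cached upper is still the initial
            // antichain; bump it to the shard's actual upper so the append below starts at 3.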
1539        write2.upper = Antichain::from_elem(3);
1540        write2
1541            .expect_append(&data[2..4], write2.upper().clone(), vec![5])
1542            .await;
1543        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1544
1545        // Write a [5,6) batch with writer 1.
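            // Similarly, writer 1's cache still says 3; advance it to the shard upper of 5
            // that writer 2 just established.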
1546        write1.upper = Antichain::from_elem(5);
1547        write1
1548            .expect_append(&data[4..5], write1.upper().clone(), vec![6])
1549            .await;
1550        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1551
1552        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1553    }
1554
1555    // Calls to compare_and_append need to be contiguous for a shard, meaning the lower of an
1556    // appended batch needs to match the current shard upper.
1557    #[mz_persist_proc::test(tokio::test)]
1558    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1559    async fn contiguous_compare_and_append(dyncfgs: ConfigUpdates) {
1560        let data = vec![
1561            (("1".to_owned(), "one".to_owned()), 1, 1),
1562            (("2".to_owned(), "two".to_owned()), 2, 1),
1563            (("3".to_owned(), "three".to_owned()), 3, 1),
1564            (("4".to_owned(), "vier".to_owned()), 4, 1),
1565            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1566        ];
1567
1568        let id = ShardId::new();
1569        let client = new_test_client(&dyncfgs).await;
1570
1571        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1572
1573        // Write a [0,3) batch.
1574        write.expect_compare_and_append(&data[..2], 0, 3).await;
1575        assert_eq!(write.upper(), &Antichain::from_elem(3));
1576
1577        // Appending a non-contiguous batch should fail.
1578        // Write a [5,6) batch, skipping over [3,5).
1579        let result = write
1580            .compare_and_append(
1581                &data[4..5],
1582                Antichain::from_elem(5),
1583                Antichain::from_elem(6),
1584            )
1585            .await;
1586        assert_eq!(
1587            result,
1588            Ok(Err(UpperMismatch {
1589                expected: Antichain::from_elem(5),
1590                current: Antichain::from_elem(3)
1591            }))
1592        );
1593
1594        // Using the correct expected upper makes the write contiguous, so the append
1595        // should succeed.
1596        write.expect_compare_and_append(&data[2..5], 3, 6).await;
1597        assert_eq!(write.upper(), &Antichain::from_elem(6));
1598
1599        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1600    }
1601
1602    // Per-writer compare_and_append calls can be non-contiguous, as long as appends to the shard
1603    // from all writers combined are contiguous.
1604    #[mz_persist_proc::test(tokio::test)]
1605    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1606    async fn noncontiguous_compare_and_append_per_writer(dyncfgs: ConfigUpdates) {
1607        let data = vec![
1608            (("1".to_owned(), "one".to_owned()), 1, 1),
1609            (("2".to_owned(), "two".to_owned()), 2, 1),
1610            (("3".to_owned(), "three".to_owned()), 3, 1),
1611            (("4".to_owned(), "vier".to_owned()), 4, 1),
1612            (("5".to_owned(), "cinque".to_owned()), 5, 1),
1613        ];
1614
1615        let id = ShardId::new();
1616        let client = new_test_client(&dyncfgs).await;
1617
1618        let (mut write1, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1619
1620        let (mut write2, _read) = client.expect_open::<String, String, u64, i64>(id).await;
1621
1622        // Write a [0,3) batch with writer 1.
1623        write1.expect_compare_and_append(&data[..2], 0, 3).await;
1624        assert_eq!(write1.upper(), &Antichain::from_elem(3));
1625
1626        // Write a [3,5) batch with writer 2.
1627        write2.expect_compare_and_append(&data[2..4], 3, 5).await;
1628        assert_eq!(write2.upper(), &Antichain::from_elem(5));
1629
1630        // Write a [5,6) batch with writer 1.
1631        write1.expect_compare_and_append(&data[4..5], 5, 6).await;
1632        assert_eq!(write1.upper(), &Antichain::from_elem(6));
1633
1634        assert_eq!(read.expect_snapshot_and_fetch(5).await, all_ok(&data, 5));
1635    }
1636
1637    #[mz_ore::test]
1638    fn fmt_ids() {
1639        assert_eq!(
1640            format!("{}", LeasedReaderId([0u8; 16])),
1641            "r00000000-0000-0000-0000-000000000000"
1642        );
1643        assert_eq!(
1644            format!("{:?}", LeasedReaderId([0u8; 16])),
1645            "LeasedReaderId(00000000-0000-0000-0000-000000000000)"
1646        );
1647    }
1648
1649    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
1650    #[cfg_attr(miri, ignore)] // error: unsupported operation: integer-to-pointer casts and `ptr::from_exposed_addr` are not supported with `-Zmiri-strict-provenance`
1651    async fn concurrency(dyncfgs: ConfigUpdates) {
1652        let data = DataGenerator::small();
1653
1654        const NUM_WRITERS: usize = 2;
1655        let id = ShardId::new();
1656        let client = new_test_client(&dyncfgs).await;
1657        let mut handles = Vec::<mz_ore::task::JoinHandle<()>>::new();
1658        for idx in 0..NUM_WRITERS {
1659            let (data, client) = (data.clone(), client.clone());
1660
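                // A bounded channel hands finished batches from this writer task to the
                // appender task spawned below.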
1661            let (batch_tx, mut batch_rx) = tokio::sync::mpsc::channel(1);
1662
1663            let client1 = client.clone();
1664            let handle = mz_ore::task::spawn(|| format!("writer-{}", idx), async move {
1665                let (write, _) = client1.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1666                let mut current_upper = 0;
1667                for batch in data.batches() {
1668                    let new_upper = match batch.get(batch.len() - 1) {
1669                        Some((_, max_ts, _)) => u64::decode(max_ts) + 1,
1670                        None => continue,
1671                    };
1672                    // Because we (intentionally) call open inside the task,
1673                    // some other writer may have raced ahead and already
1674                    // appended some data before this one was registered. As a
1675                    // result, this writer may not be starting with an upper of
1676                    // the initial empty antichain. This is nice because it
1677                    // mimics how a real HA source would work, but it means we
1678                    // have to skip any batches that have already been committed
1679                    // (otherwise our new_upper would be before our upper).
1680                    //
1681                    // Note however, that unlike a real source, our
1682                    // DataGenerator-derived batches are guaranteed to be
1683                    // chunked along the same boundaries. This means we don't
1684                    // have to consider partial batches when generating the
1685                    // updates below.
1686                    if PartialOrder::less_equal(&Antichain::from_elem(new_upper), write.upper()) {
1687                        continue;
1688                    }
1689
1690                    let current_upper_chain = Antichain::from_elem(current_upper);
1691                    current_upper = new_upper;
1692                    let new_upper_chain = Antichain::from_elem(new_upper);
1693                    let mut builder = write.builder(current_upper_chain);
1694
1695                    for ((k, v), t, d) in batch.iter() {
1696                        builder
1697                            .add(&k.to_vec(), &v.to_vec(), &u64::decode(t), &i64::decode(d))
1698                            .await
1699                            .expect("invalid usage");
1700                    }
1701
1702                    let batch = builder
1703                        .finish(new_upper_chain)
1704                        .await
1705                        .expect("invalid usage");
1706
1707                    match batch_tx.send(batch).await {
1708                        Ok(_) => (),
1709                        Err(e) => panic!("send error: {}", e),
1710                    }
1711                }
1712            });
1713            handles.push(handle);
1714
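                // The appender task opens its own write handle and appends each batch
                // produced by the writer task above.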
1715            let handle = mz_ore::task::spawn(|| format!("appender-{}", idx), async move {
1716                let (mut write, _) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1717
1718                while let Some(batch) = batch_rx.recv().await {
1719                    let lower = batch.lower().clone();
1720                    let upper = batch.upper().clone();
1721                    write
1722                        .append_batch(batch, lower, upper)
1723                        .await
1724                        .expect("invalid usage")
1725                        .expect("unexpected upper");
1726                }
1727            });
1728            handles.push(handle);
1729        }
1730
1731        for handle in handles {
1732            let () = handle.await.expect("task failed");
1733        }
1734
1735        let expected = data.records().collect::<Vec<_>>();
1736        let max_ts = expected.last().map(|(_, t, _)| *t).unwrap_or_default();
1737        let (_, mut read) = client.expect_open::<Vec<u8>, Vec<u8>, u64, i64>(id).await;
1738        assert_eq!(
1739            read.expect_snapshot_and_fetch(max_ts).await,
1740            all_ok(expected.iter(), max_ts)
1741        );
1742    }
1743
1744    // Regression test for database-issues#3523. Snapshot with as_of >= upper would
1745    // immediately return the data currently available instead of waiting for
1746    // upper to advance past as_of.
1747    #[mz_persist_proc::test(tokio::test)]
1748    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1749    async fn regression_blocking_reads(dyncfgs: ConfigUpdates) {
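            // A no-op waker lets the test poll futures by hand and assert that they remain
            // pending, without a runtime driving them to completion.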
1750        let waker = noop_waker();
1751        let mut cx = Context::from_waker(&waker);
1752
1753        let data = [
1754            (("1".to_owned(), "one".to_owned()), 1, 1),
1755            (("2".to_owned(), "two".to_owned()), 2, 1),
1756            (("3".to_owned(), "three".to_owned()), 3, 1),
1757        ];
1758
1759        let id = ShardId::new();
1760        let client = new_test_client(&dyncfgs).await;
1761        let (mut write, mut read) = client.expect_open::<String, String, u64, i64>(id).await;
1762
1763        // Grab a listener as_of (aka gt) 1, which is not yet closed out.
1764        let mut listen = read.clone("").await.expect_listen(1).await;
1765        let mut listen_next = Box::pin(listen.fetch_next());
1766        // Intentionally don't await the listen_next, but instead manually poke
1767        // it for a while and assert that it doesn't resolve yet. See below for
1768        // discussion of some alternative ways of writing this unit test.
1769        for _ in 0..100 {
1770            assert!(
1771                Pin::new(&mut listen_next).poll(&mut cx).is_pending(),
1772                "listen::next unexpectedly ready"
1773            );
1774        }
1775
1776        // Write a [0,3) batch.
1777        write
1778            .expect_compare_and_append(&data[..2], u64::minimum(), 3)
1779            .await;
1780
1781        // The initial listen_next call should now be able to return data at 2.
1782        // It doesn't get 1 because the as_of was 1 and listen is strictly gt.
1783        assert_eq!(
1784            listen_next.await,
1785            vec![
1786                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
1787                ListenEvent::Progress(Antichain::from_elem(3)),
1788            ]
1789        );
1790
1791        // Grab a snapshot as_of 3, which is not yet closed out. Intentionally
1792        // don't await the snap, but instead manually poke it for a while and
1793        // assert that it doesn't resolve yet.
1794        //
1795        // An alternative to this would be to run it in a task and poll the task
1796        // with some timeout, but this would introduce a fixed test execution
1797        // latency of the timeout in the happy case. Plus, it would be
1798        // non-deterministic.
1799        //
1800        // Another alternative (that's potentially quite interesting!) would be
1801        // to separate creating a snapshot immediately (which would fail if
1802        // as_of was >= upper) from a bit of logic that retries until that case
1803        // is ready.
1804        let mut snap = Box::pin(read.expect_snapshot_and_fetch(3));
1805        for _ in 0..100 {
1806            assert!(
1807                Pin::new(&mut snap).poll(&mut cx).is_pending(),
1808                "snapshot unexpectedly ready"
1809            );
1810        }
1811
1812        // Now add the data at 3 and also unblock the snapshot.
1813        write.expect_compare_and_append(&data[2..], 3, 4).await;
1814
1815        // Read the snapshot and check that it got all the appropriate data.
1816        assert_eq!(snap.await, all_ok(&data[..], 3));
1817    }
1818
1819    #[mz_persist_proc::test(tokio::test)]
1820    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1821    async fn heartbeat_task_shutdown(dyncfgs: ConfigUpdates) {
1822        // Verify that the ReadHandle and WriteHandle background heartbeat tasks
1823        // shut down cleanly after the handle is expired.
1824        let mut cache = new_test_client_cache(&dyncfgs);
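            // Use very short lease durations so the heartbeat machinery runs on a tight cycle
            // and the test finishes quickly.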
1825        cache
1826            .cfg
1827            .set_config(&READER_LEASE_DURATION, Duration::from_millis(1));
1828        cache.cfg.writer_lease_duration = Duration::from_millis(1);
1829        let (_write, mut read) = cache
1830            .open(PersistLocation::new_in_mem())
1831            .await
1832            .expect("client construction failed")
1833            .expect_open::<(), (), u64, i64>(ShardId::new())
1834            .await;
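            // Take the handle's unexpired state, which owns the heartbeat task handles, so we
            // can await those tasks after expiring the handle.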
1835        let mut read_unexpired_state = read
1836            .unexpired_state
1837            .take()
1838            .expect("handle should have unexpired state");
1839        read.expire().await;
1840        for read_heartbeat_task in mem::take(&mut read_unexpired_state._heartbeat_tasks) {
1841            let () = read_heartbeat_task
1842                .await
1843                .expect("task should shutdown cleanly");
1844        }
1845    }
1846
1847    /// Verify that shard finalization works with empty shards, i.e. shards whose only
1848    /// write is an empty batch that advances the upper to the empty antichain.
1849    #[mz_persist_proc::test(tokio::test)]
1850    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1851    async fn finalize_empty_shard(dyncfgs: ConfigUpdates) {
1852        const EMPTY: &[(((), ()), u64, i64)] = &[];
1853        let persist_client = new_test_client(&dyncfgs).await;
1854
1855        let shard_id = ShardId::new();
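            // An arbitrary, fixed reader id under which to open the critical since handle below.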
1856        pub const CRITICAL_SINCE: CriticalReaderId =
1857            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
1858
1859        let (mut write, mut read) = persist_client
1860            .expect_open::<(), (), u64, i64>(shard_id)
1861            .await;
1862
1863        // Advance since and upper to the empty antichain, which is a prerequisite for
1864        // finalization/tombstoning.
1865        let () = read.downgrade_since(&Antichain::new()).await;
1866        let () = write
1867            .compare_and_append(EMPTY, Antichain::from_elem(0), Antichain::new())
1868            .await
1869            .expect("usage should be valid")
1870            .expect("upper should match");
1871
1872        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
1873            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
1874            .await
1875            .expect("invalid persist usage");
1876
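            // Downgrade the critical since all the way to the empty antichain, reusing the
            // handle's current opaque token as the expected epoch.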
1877        let epoch = since_handle.opaque().clone();
1878        let new_since = Antichain::new();
1879        let downgrade = since_handle
1880            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
1881            .await;
1882
1883        assert!(
1884            downgrade.is_ok(),
1885            "downgrade of critical handle must succeed"
1886        );
1887
1888        let finalize = persist_client
1889            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
1890            .await;
1891
1892        assert_ok!(finalize, "finalization must succeed");
1893
1894        let is_finalized = persist_client
1895            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
1896            .await
1897            .expect("invalid persist usage");
1898        assert!(is_finalized, "shard must be finalized");
1899    }
1900
1901    /// Verify that shard finalization works with shards that had some data
1902    /// written to them, followed by an empty batch that brings their upper to the
1903    /// empty antichain.
1904    #[mz_persist_proc::test(tokio::test)]
1905    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1906    async fn finalize_shard(dyncfgs: ConfigUpdates) {
1907        const EMPTY: &[(((), ()), u64, i64)] = &[];
1908        const DATA: &[(((), ()), u64, i64)] = &[(((), ()), 0, 1)];
1909        let persist_client = new_test_client(&dyncfgs).await;
1910
1911        let shard_id = ShardId::new();
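            // An arbitrary, fixed reader id under which to open the critical since handle below.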
1912        pub const CRITICAL_SINCE: CriticalReaderId =
1913            CriticalReaderId([0, 0, 0, 0, 17, 17, 34, 34, 51, 51, 68, 68, 68, 68, 68, 68]);
1914
1915        let (mut write, mut read) = persist_client
1916            .expect_open::<(), (), u64, i64>(shard_id)
1917            .await;
1918
1919        // Write some data.
1920        let () = write
1921            .compare_and_append(DATA, Antichain::from_elem(0), Antichain::from_elem(1))
1922            .await
1923            .expect("usage should be valid")
1924            .expect("upper should match");
1925
1926        // Advance since and upper to the empty antichain, which is a prerequisite for
1927        // finalization/tombstoning.
1928        let () = read.downgrade_since(&Antichain::new()).await;
1929        let () = write
1930            .compare_and_append(EMPTY, Antichain::from_elem(1), Antichain::new())
1931            .await
1932            .expect("usage should be valid")
1933            .expect("upper should match");
1934
1935        let mut since_handle: SinceHandle<(), (), u64, i64, u64> = persist_client
1936            .open_critical_since(shard_id, CRITICAL_SINCE, Diagnostics::for_tests())
1937            .await
1938            .expect("invalid persist usage");
1939
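            // Downgrade the critical since all the way to the empty antichain, reusing the
            // handle's current opaque token as the expected epoch.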
1940        let epoch = since_handle.opaque().clone();
1941        let new_since = Antichain::new();
1942        let downgrade = since_handle
1943            .compare_and_downgrade_since(&epoch, (&epoch, &new_since))
1944            .await;
1945
1946        assert!(
1947            downgrade.is_ok(),
1948            "downgrade of critical handle must succeed"
1949        );
1950
1951        let finalize = persist_client
1952            .finalize_shard::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
1953            .await;
1954
1955        assert_ok!(finalize, "finalization must succeed");
1956
1957        let is_finalized = persist_client
1958            .is_finalized::<(), (), u64, i64>(shard_id, Diagnostics::for_tests())
1959            .await
1960            .expect("invalid persist usage");
1961        assert!(is_finalized, "shard must be finalized");
1962    }
1963
1964    proptest! {
1965        #![proptest_config(ProptestConfig::with_cases(4096))]
1966
1967        #[mz_ore::test]
1968        #[cfg_attr(miri, ignore)] // too slow
1969        fn shard_id_protobuf_roundtrip(expect in any::<ShardId>()) {
1970            let actual = protobuf_roundtrip::<_, String>(&expect);
1971            assert_ok!(actual);
1972            assert_eq!(actual.unwrap(), expect);
1973        }
1974    }
1975}