mz_persist_client/write.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Write capabilities and handles
11
12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::Description;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::Config;
22use mz_ore::instrument;
23use mz_ore::task::RuntimeExt;
24use mz_persist::location::Blob;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::{Codec, Codec64};
27use mz_proto::{IntoRustIfSome, ProtoType};
28use proptest_derive::Arbitrary;
29use semver::Version;
30use serde::{Deserialize, Serialize};
31use timely::PartialOrder;
32use timely::order::TotalOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::runtime::Handle;
35use tracing::{Instrument, debug_span, info, warn};
36use uuid::Uuid;
37
38use crate::batch::{
39    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
40    BatchParts, ProtoBatch, validate_truncate_batch,
41};
42use crate::error::{InvalidUsage, UpperMismatch};
43use crate::fetch::{
44    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
45};
46use crate::internal::compact::{CompactConfig, Compactor};
47use crate::internal::encoding::{Schemas, check_data_version};
48use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
49use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
51use crate::read::ReadHandle;
52use crate::schema::PartMigration;
53use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};
54
55pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
56    "persist_write_combine_inline_writes",
57    true,
58    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
59);
60
61pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
62    "persist_validate_part_bounds_on_write",
63    true,
64    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
65    for the batch being appended.",
66);
67
68/// An opaque identifier for a writer of a persist durable TVC (aka shard).
69#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
70#[serde(try_from = "String", into = "String")]
71pub struct WriterId(pub(crate) [u8; 16]);
72
73impl std::fmt::Display for WriterId {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "w{}", Uuid::from_bytes(self.0))
76    }
77}
78
79impl std::fmt::Debug for WriterId {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
82    }
83}
84
85impl std::str::FromStr for WriterId {
86    type Err = String;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        parse_id("w", "WriterId", s).map(WriterId)
90    }
91}
92
93impl From<WriterId> for String {
94    fn from(writer_id: WriterId) -> Self {
95        writer_id.to_string()
96    }
97}
98
99impl TryFrom<String> for WriterId {
100    type Error = String;
101
102    fn try_from(s: String) -> Result<Self, Self::Error> {
103        s.parse()
104    }
105}
106
107impl WriterId {
108    pub(crate) fn new() -> Self {
109        WriterId(*Uuid::new_v4().as_bytes())
110    }
111}
112
113/// A "capability" granting the ability to apply updates to some shard at times
114/// greater or equal to `self.upper()`.
115///
116/// All async methods on ReadHandle retry for as long as they are able, but the
117/// returned [std::future::Future]s implement "cancel on drop" semantics. This
118/// means that callers can add a timeout using [tokio::time::timeout] or
119/// [tokio::time::timeout_at].
120///
121/// ```rust,no_run
122/// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
123/// # let timeout: std::time::Duration = unimplemented!();
124/// # async {
125/// tokio::time::timeout(timeout, write.fetch_recent_upper()).await
126/// # };
127/// ```
128#[derive(Debug)]
129pub struct WriteHandle<K: Codec, V: Codec, T, D> {
130    pub(crate) cfg: PersistConfig,
131    pub(crate) metrics: Arc<Metrics>,
132    pub(crate) machine: Machine<K, V, T, D>,
133    pub(crate) gc: GarbageCollector<K, V, T, D>,
134    pub(crate) compact: Option<Compactor<K, V, T, D>>,
135    pub(crate) blob: Arc<dyn Blob>,
136    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
137    pub(crate) writer_id: WriterId,
138    pub(crate) debug_state: HandleDebugState,
139    pub(crate) write_schemas: Schemas<K, V>,
140
141    pub(crate) upper: Antichain<T>,
142    expire_fn: Option<ExpireFn>,
143}
144
145impl<K, V, T, D> WriteHandle<K, V, T, D>
146where
147    K: Debug + Codec,
148    V: Debug + Codec,
149    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
150    D: Monoid + Ord + Codec64 + Send + Sync,
151{
152    pub(crate) fn new(
153        cfg: PersistConfig,
154        metrics: Arc<Metrics>,
155        machine: Machine<K, V, T, D>,
156        gc: GarbageCollector<K, V, T, D>,
157        blob: Arc<dyn Blob>,
158        writer_id: WriterId,
159        purpose: &str,
160        write_schemas: Schemas<K, V>,
161    ) -> Self {
162        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
163        let compact = cfg.compaction_enabled.then(|| {
164            Compactor::new(
165                cfg.clone(),
166                Arc::clone(&metrics),
167                write_schemas.clone(),
168                gc.clone(),
169            )
170        });
171        let debug_state = HandleDebugState {
172            hostname: cfg.hostname.to_owned(),
173            purpose: purpose.to_owned(),
174        };
175        let upper = machine.applier.clone_upper();
176        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
177        WriteHandle {
178            cfg,
179            metrics,
180            machine,
181            gc,
182            compact,
183            blob,
184            isolated_runtime,
185            writer_id,
186            debug_state,
187            write_schemas,
188            upper,
189            expire_fn: Some(expire_fn),
190        }
191    }
192
193    /// Creates a [WriteHandle] for the same shard from an existing
194    /// [ReadHandle].
195    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
196        Self::new(
197            read.cfg.clone(),
198            Arc::clone(&read.metrics),
199            read.machine.clone(),
200            read.gc.clone(),
201            Arc::clone(&read.blob),
202            WriterId::new(),
203            purpose,
204            read.read_schemas.clone(),
205        )
206    }
207
    /// Whether this WriteHandle enforces part bounds checks when appending
    /// batches (requires both the write- and read-side validation configs).
210    pub fn validate_part_bounds_on_write(&self) -> bool {
211        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) && VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
212    }
213
214    /// This handle's shard id.
215    pub fn shard_id(&self) -> ShardId {
216        self.machine.shard_id()
217    }
218
    /// Returns the ID of this writer's schema, if known.
220    pub fn schema_id(&self) -> Option<SchemaId> {
221        self.write_schemas.id
222    }
223
    /// Registers the write schema, if it isn't already registered.
    ///
    /// # Panics
    ///
    /// This method expects that either the shard doesn't yet have any schema registered, or one of
    /// the registered schemas is the same as the write schema. If all registered schemas are
    /// different from the write schema, it panics.
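    ///
    /// A minimal sketch of registering the schema up front (the handle here is a
    /// placeholder):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let _schema_id = write.ensure_schema_registered().await;
    /// # };
    /// ```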
231    pub async fn ensure_schema_registered(&mut self) -> SchemaId {
232        let Schemas { id, key, val } = &self.write_schemas;
233
234        if let Some(id) = id {
235            return *id;
236        }
237
238        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
239        maintenance.start_performing(&self.machine, &self.gc);
240
241        let Some(schema_id) = schema_id else {
242            panic!("unable to register schemas: {key:?} {val:?}");
243        };
244
245        self.write_schemas.id = Some(schema_id);
246        schema_id
247    }
248
249    /// A cached version of the shard-global `upper` frontier.
250    ///
251    /// This is the most recent upper discovered by this handle. It is
252    /// potentially more stale than [Self::shared_upper] but is lock-free and
253    /// allocation-free. This will always be less or equal to the shard-global
254    /// `upper`.
255    pub fn upper(&self) -> &Antichain<T> {
256        &self.upper
257    }
258
259    /// A less-stale cached version of the shard-global `upper` frontier.
260    ///
261    /// This is the most recently known upper for this shard process-wide, but
262    /// unlike [Self::upper] it requires a mutex and a clone. This will always be
263    /// less or equal to the shard-global `upper`.
264    pub fn shared_upper(&self) -> Antichain<T> {
265        self.machine.applier.clone_upper()
266    }
267
268    /// Fetches and returns a recent shard-global `upper`. Importantly, this operation is
269    /// linearized with write operations.
270    ///
271    /// This requires fetching the latest state from consensus and is therefore a potentially
272    /// expensive operation.
273    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
274    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
275        // TODO: Do we even need to track self.upper on WriteHandle or could
276        // WriteHandle::upper just get the one out of machine?
277        self.machine
278            .applier
279            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
280            .await;
281        &self.upper
282    }
283
    /// Advance the shard's upper to at least the given frontier.
    ///
    /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
    ///
    /// In contrast to the various compare-and-append methods, this method does not require the
    /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
    /// schema when creating a writer just to advance a shard upper.
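    ///
    /// A minimal sketch, assuming a `u64`-timestamped shard (the handle is a
    /// placeholder):
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Ensure the shard's upper is at least 5; a no-op if it already is.
    /// write.advance_upper(&Antichain::from_elem(5)).await;
    /// # };
    /// ```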
291    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
292        // We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
293        // already beyond the target.
294        let mut lower = self.shared_upper().clone();
295
296        while !PartialOrder::less_equal(target, &lower) {
297            let since = Antichain::from_elem(T::minimum());
298            let desc = Description::new(lower.clone(), target.clone(), since);
299            let batch = HollowBatch::empty(desc);
300
301            let heartbeat_timestamp = (self.cfg.now)();
302            let res = self
303                .machine
304                .compare_and_append(
305                    &batch,
306                    &self.writer_id,
307                    &self.debug_state,
308                    heartbeat_timestamp,
309                )
310                .await;
311
312            use CompareAndAppendRes::*;
313            let new_upper = match res {
314                Success(_seq_no, maintenance) => {
315                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
316                    batch.desc.upper().clone()
317                }
318                UpperMismatch(_seq_no, actual_upper) => actual_upper,
319                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
320                InlineBackpressure => unreachable!("batch was empty"),
321            };
322
323            self.upper.clone_from(&new_upper);
324            lower = new_upper;
325        }
326    }
327
    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `upper`.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully written.
    /// If not, an `Upper` err containing the current writer upper is returned.
    /// If that happens, we also update our local `upper` to match the current
    /// upper. This is useful in cases where a timeout happens in between a
    /// successful write and returning that to the client.
    ///
    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s may
    /// be used concurrently to write to the same shard, but in this case, the
    /// data being written must be identical (in the sense of "definite"-ness).
    /// It's intended for replicated use by source ingestion, sinks, etc.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`. An `upper` of the empty antichain "finishes"
    /// this shard, promising that no more data is ever incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to call this with `upper` equal to
    /// `self.upper()` and an empty `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky multi-level Result is to enable more obvious error handling
    /// in the caller. See <http://sled.rs/errors.html> for details.
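    ///
    /// A minimal sketch of an append (the handle and data are placeholders):
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// // Write `updates` for times in [3, 4), advancing the upper from 3 to 4.
    /// write
    ///     .append(updates.iter(), Antichain::from_elem(3), Antichain::from_elem(4))
    ///     .await
    /// # };
    /// ```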
357    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
358    pub async fn append<SB, KB, VB, TB, DB, I>(
359        &mut self,
360        updates: I,
361        lower: Antichain<T>,
362        upper: Antichain<T>,
363    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
364    where
365        SB: Borrow<((KB, VB), TB, DB)>,
366        KB: Borrow<K>,
367        VB: Borrow<V>,
368        TB: Borrow<T>,
369        DB: Borrow<D>,
370        I: IntoIterator<Item = SB>,
371        D: Send + Sync,
372    {
373        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
374        self.append_batch(batch, lower, upper).await
375    }
376
    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current global upper of this shard is
    /// `expected_upper`.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully written.
    /// If not, an `Upper` err containing the current global upper is returned.
    ///
    /// In contrast to [Self::append], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky multi-level Result is to enable more obvious error handling
    /// in the caller. See <http://sled.rs/errors.html> for details.
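    ///
    /// A minimal sketch of a compare-and-append (the handle and data are
    /// placeholders):
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// match write
    ///     .compare_and_append(updates.iter(), Antichain::from_elem(3), Antichain::from_elem(4))
    ///     .await
    /// {
    ///     Ok(Ok(())) => {} // The shard's upper is now 4.
    ///     Ok(Err(_mismatch)) => { /* someone else wrote first; inspect and retry */ }
    ///     Err(invalid_usage) => panic!("{:?}", invalid_usage),
    /// }
    /// # };
    /// ```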
405    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
406    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
407        &mut self,
408        updates: I,
409        expected_upper: Antichain<T>,
410        new_upper: Antichain<T>,
411    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
412    where
413        SB: Borrow<((KB, VB), TB, DB)>,
414        KB: Borrow<K>,
415        VB: Borrow<V>,
416        TB: Borrow<T>,
417        DB: Borrow<D>,
418        I: IntoIterator<Item = SB>,
419        D: Send + Sync,
420    {
421        let mut batch = self
422            .batch(updates, expected_upper.clone(), new_upper.clone())
423            .await?;
424        match self
425            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
426            .await
427        {
428            ok @ Ok(Ok(())) => ok,
429            err => {
430                // We cannot delete the batch in compare_and_append_batch()
431                // because the caller owns the batch and might want to retry
432                // with a different `expected_upper`. In this function, we
433                // control the batch, so we have to delete it.
434                batch.delete().await;
435                err
436            }
437        }
438    }
439
    /// Appends the batch of updates to the shard and downgrades this handle's
    /// upper to `upper`.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully written.
    /// If not, an `Upper` err containing the current writer upper is returned.
    /// If that happens, we also update our local `upper` to match the current
    /// upper. This is useful in cases where a timeout happens in between a
    /// successful write and returning that to the client.
    ///
    /// In contrast to [Self::compare_and_append_batch], multiple [WriteHandle]s
    /// may be used concurrently to write to the same shard, but in this case,
    /// the data being written must be identical (in the sense of
    /// "definite"-ness). It's intended for replicated use by source ingestion,
    /// sinks, etc.
    ///
    /// An `upper` of the empty antichain "finishes" this shard, promising that
    /// no more data is ever incoming.
    ///
    /// The batch may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `upper` equal to `self.upper()` and an empty `updates`
    /// (making the call a no-op).
    ///
    /// The clunky multi-level Result is to enable more obvious error handling
    /// in the caller. See <http://sled.rs/errors.html> for details.
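    ///
    /// A minimal sketch: stage a [Batch] and then append it (placeholders
    /// throughout):
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// let batch = write
    ///     .batch(updates.iter(), Antichain::from_elem(3), Antichain::from_elem(4))
    ///     .await
    ///     .expect("invalid usage");
    /// write
    ///     .append_batch(batch, Antichain::from_elem(3), Antichain::from_elem(4))
    ///     .await
    /// # };
    /// ```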
465    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
466    pub async fn append_batch(
467        &mut self,
468        mut batch: Batch<K, V, T, D>,
469        mut lower: Antichain<T>,
470        upper: Antichain<T>,
471    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
472    where
473        D: Send + Sync,
474    {
475        loop {
476            let res = self
477                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
478                .await?;
479            match res {
480                Ok(()) => {
481                    self.upper = upper;
482                    return Ok(Ok(()));
483                }
484                Err(mismatch) => {
                    // We tried to do a non-contiguous append; that won't work.
486                    if PartialOrder::less_than(&mismatch.current, &lower) {
487                        self.upper.clone_from(&mismatch.current);
488
489                        batch.delete().await;
490
491                        return Ok(Err(mismatch));
492                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
493                        // Cut down the Description by advancing its lower to the current shard
494                        // upper and try again. IMPORTANT: We can only advance the lower, meaning
495                        // we cut updates away, we must not "extend" the batch by changing to a
496                        // lower that is not beyond the current lower. This invariant is checked by
497                        // the first if branch: if `!(current_upper < lower)` then it holds that
498                        // `lower <= current_upper`.
499                        lower = mismatch.current;
500                    } else {
501                        // We already have updates past this batch's upper, the append is a no-op.
502                        self.upper = mismatch.current;
503
504                        // Because we return a success result, the caller will
505                        // think that the batch was consumed or otherwise used,
506                        // so we have to delete it here.
507                        batch.delete().await;
508
509                        return Ok(Ok(()));
510                    }
511                }
512            }
513        }
514    }
515
    /// Appends the batch of updates to the shard and downgrades this handle's
    /// upper to `new_upper` iff the current global upper of this shard is
    /// `expected_upper`.
    ///
    /// The innermost `Result` is `Ok` if the batch was successfully written. If
    /// not, an `Upper` err containing the current global upper is returned.
    ///
    /// In contrast to [Self::append_batch], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// A `new_upper` of the empty antichain "finishes" this shard, promising
    /// that no more data is ever incoming.
    ///
    /// The batch may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// IMPORTANT: In case of an erroneous result the caller is responsible for
    /// the lifecycle of the `batch`. It can be deleted or it can be used to
    /// retry with adjusted frontiers.
    ///
    /// The clunky multi-level Result is to enable more obvious error handling
    /// in the caller. See <http://sled.rs/errors.html> for details.
    ///
    /// If `validate_part_bounds_on_write` is set to `false`, we no longer
    /// validate that every batch covers the entire range between the expected
    /// and new uppers, as we wish to allow combining batches that cover
    /// different subsets of that range, including subsets that contain no data
    /// at all. The caller is responsible for guaranteeing that the set of
    /// batches provided collectively includes all updates for the entire range
    /// between the expected and new upper.
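    ///
    /// A minimal sketch (placeholders throughout); note that on error the caller
    /// still owns the batch and must delete or retry it:
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// let mut batch = write
    ///     .batch(updates.iter(), Antichain::from_elem(3), Antichain::from_elem(4))
    ///     .await
    ///     .expect("invalid usage");
    /// let res = write
    ///     .compare_and_append_batch(
    ///         &mut [&mut batch],
    ///         Antichain::from_elem(3),
    ///         Antichain::from_elem(4),
    ///         true,
    ///     )
    ///     .await;
    /// if !matches!(res, Ok(Ok(()))) {
    ///     // The batch was not consumed; clean it up (or retry with adjusted frontiers).
    ///     batch.delete().await;
    /// }
    /// # };
    /// ```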
549    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
550    pub async fn compare_and_append_batch(
551        &mut self,
552        batches: &mut [&mut Batch<K, V, T, D>],
553        expected_upper: Antichain<T>,
554        new_upper: Antichain<T>,
555        validate_part_bounds_on_write: bool,
556    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
557    where
558        D: Send + Sync,
559    {
560        // Before we append any data, we require a registered write schema.
561        let schema_id = self.ensure_schema_registered().await;
562
563        for batch in batches.iter() {
564            if self.machine.shard_id() != batch.shard_id() {
565                return Err(InvalidUsage::BatchNotFromThisShard {
566                    batch_shard: batch.shard_id(),
567                    handle_shard: self.machine.shard_id(),
568                });
569            }
570            check_data_version(&self.cfg.build_version, &batch.version);
571            if self.cfg.build_version > batch.version {
572                info!(
573                    shard_id =? self.machine.shard_id(),
574                    batch_version =? batch.version,
575                    writer_version =? self.cfg.build_version,
576                    "Appending batch from the past. This is fine but should be rare. \
577                    TODO: Error on very old versions once the leaked blob detector exists."
578                )
579            }
580        }
581
582        let lower = expected_upper.clone();
583        let upper = new_upper;
584        let since = Antichain::from_elem(T::minimum());
585        let desc = Description::new(lower, upper, since);
586
587        let mut received_inline_backpressure = false;
        // Every hollow part must belong to some batch, so we can clean it up when the batch is dropped...
        // but if we need to merge all our inline parts to a single run in S3, it's not correct to
        // associate that with any of our individual input batches.
        // At first, we'll try to put all the inline parts we receive into state... but if we
        // get backpressured, we retry with this builder set to `Some`, put all our inline data into
        // it, and ensure it's flushed out to S3 before including it in the batch.
594        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
595        let maintenance = loop {
596            let any_batch_rewrite = batches
597                .iter()
598                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
599            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
600                (vec![], 0, vec![], vec![]);
601            let mut key_storage = None;
602            let mut val_storage = None;
603            for batch in batches.iter() {
604                let () = validate_truncate_batch(
605                    &batch.batch,
606                    &desc,
607                    any_batch_rewrite,
608                    validate_part_bounds_on_write,
609                )?;
610                for (run_meta, run) in batch.batch.runs() {
611                    let start_index = parts.len();
612                    for part in run {
613                        if let (
614                            RunPart::Single(
615                                batch_part @ BatchPart::Inline {
616                                    updates,
617                                    ts_rewrite,
618                                    schema_id: _,
619                                    deprecated_schema_id: _,
620                                },
621                            ),
622                            Some((schema_cache, builder)),
623                        ) = (part, &mut inline_batch_builder)
624                        {
625                            let schema_migration = PartMigration::new(
626                                batch_part,
627                                self.write_schemas.clone(),
628                                schema_cache,
629                            )
630                            .await
631                            .expect("schemas for inline user part");
632
633                            let encoded_part = EncodedPart::from_inline(
634                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
635                                &*self.metrics,
636                                self.metrics.read.compaction.clone(),
637                                desc.clone(),
638                                updates,
639                                ts_rewrite.as_ref(),
640                            );
641                            let mut fetched_part = FetchedPart::new(
642                                Arc::clone(&self.metrics),
643                                encoded_part,
644                                schema_migration,
645                                FetchBatchFilter::Compaction {
646                                    since: desc.since().clone(),
647                                },
648                                false,
649                                PartDecodeFormat::Arrow,
650                                None,
651                            );
652
653                            while let Some(((k, v), t, d)) =
654                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
655                            {
656                                builder
657                                    .add(
658                                        &k.expect("decoded just-encoded key data"),
659                                        &v.expect("decoded just-encoded val data"),
660                                        &t,
661                                        &d,
662                                    )
663                                    .await
664                                    .expect("re-encoding just-decoded data");
665                            }
666                        } else {
667                            parts.push(part.clone())
668                        }
669                    }
670
671                    let end_index = parts.len();
672
673                    if start_index == end_index {
674                        continue;
675                    }
676
677                    // Mark the boundary if this is not the first run in the batch.
678                    if start_index != 0 {
679                        run_splits.push(start_index);
680                    }
681                    run_metas.push(run_meta.clone());
682                }
683                num_updates += batch.batch.len;
684            }
685
686            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
687                let mut finished = builder
688                    .finish(desc.upper().clone())
689                    .await
690                    .expect("invalid usage");
691                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
692                finished
693                    .flush_to_blob(
694                        &cfg,
695                        &self.metrics.inline.backpressure,
696                        &self.isolated_runtime,
697                        &self.write_schemas,
698                    )
699                    .await;
700                Some(finished)
701            } else {
702                None
703            };
704
705            if let Some(batch) = &flushed_inline_batch {
706                for (run_meta, run) in batch.batch.runs() {
707                    assert!(run.len() > 0);
708                    let start_index = parts.len();
709                    if start_index != 0 {
710                        run_splits.push(start_index);
711                    }
712                    run_metas.push(run_meta.clone());
713                    parts.extend(run.iter().cloned())
714                }
715            }
716
717            let mut combined_batch =
718                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
719            // The batch may have been written by a writer without a registered schema.
720            // Ensure we have a schema ID in the batch metadata before we append, to avoid type
721            // confusion later.
722            ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
723            let heartbeat_timestamp = (self.cfg.now)();
724            let res = self
725                .machine
726                .compare_and_append(
727                    &combined_batch,
728                    &self.writer_id,
729                    &self.debug_state,
730                    heartbeat_timestamp,
731                )
732                .await;
733
734            match res {
735                CompareAndAppendRes::Success(_seqno, maintenance) => {
736                    self.upper.clone_from(desc.upper());
737                    for batch in batches.iter_mut() {
738                        batch.mark_consumed();
739                    }
740                    if let Some(batch) = &mut flushed_inline_batch {
741                        batch.mark_consumed();
742                    }
743                    break maintenance;
744                }
745                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
746                    if let Some(batch) = flushed_inline_batch.take() {
747                        batch.delete().await;
748                    }
749                    return Err(invalid_usage);
750                }
751                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
752                    if let Some(batch) = flushed_inline_batch.take() {
753                        batch.delete().await;
754                    }
                    // We tried to do a compare_and_append with the wrong expected upper; that
                    // won't work. Update the cached upper to the current upper.
757                    self.upper.clone_from(&current_upper);
758                    return Ok(Err(UpperMismatch {
759                        current: current_upper,
760                        expected: expected_upper,
761                    }));
762                }
763                CompareAndAppendRes::InlineBackpressure => {
764                    // We tried to write an inline part, but there was already
765                    // too much in state. Flush it out to s3 and try again.
766                    assert_eq!(received_inline_backpressure, false);
767                    received_inline_backpressure = true;
768                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
769                        inline_batch_builder = Some((
770                            self.machine.applier.schema_cache(),
771                            self.builder(desc.lower().clone()),
772                        ));
773                        continue;
774                    }
775
776                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
777                    // We could have a large number of inline parts (imagine the
778                    // sharded persist_sink), do this flushing concurrently.
779                    let flush_batches = batches
780                        .iter_mut()
781                        .map(|batch| async {
782                            batch
783                                .flush_to_blob(
784                                    &cfg,
785                                    &self.metrics.inline.backpressure,
786                                    &self.isolated_runtime,
787                                    &self.write_schemas,
788                                )
789                                .await
790                        })
791                        .collect::<FuturesUnordered<_>>();
792                    let () = flush_batches.collect::<()>().await;
793
794                    for batch in batches.iter() {
795                        assert_eq!(batch.batch.inline_bytes(), 0);
796                    }
797
798                    continue;
799                }
800            }
801        };
802
803        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
804
805        Ok(Ok(()))
806    }
807
    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
    /// to append it to this shard.
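    ///
    /// A minimal sketch of round-tripping a batch through its transmittable form
    /// (placeholders throughout):
    ///
    /// ```rust,no_run
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # let batch: mz_persist_client::batch::Batch<String, String, u64, i64> = unimplemented!();
    /// // Serialize the batch on the side that wrote it...
    /// let proto = batch.into_transmittable_batch();
    /// // ...and rehydrate it (e.g. in another process) before appending it here.
    /// let _rehydrated = write.batch_from_transmittable_batch(proto);
    /// ```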
810    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
811        let shard_id: ShardId = batch
812            .shard_id
813            .into_rust()
814            .expect("valid transmittable batch");
815        assert_eq!(shard_id, self.machine.shard_id());
816
817        let ret = Batch {
818            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
819            metrics: Arc::clone(&self.metrics),
820            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
821            version: Version::parse(&batch.version).expect("valid transmittable batch"),
822            batch: batch
823                .batch
824                .into_rust_if_some("ProtoBatch::batch")
825                .expect("valid transmittable batch"),
826            blob: Arc::clone(&self.blob),
827            _phantom: std::marker::PhantomData,
828        };
829        assert_eq!(ret.shard_id(), self.machine.shard_id());
830        ret
831    }
832
    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
    /// blob storage which can then be appended to this shard using
    /// [Self::compare_and_append_batch] or [Self::append_batch].
    ///
    /// It is correct to create an empty batch, which allows for downgrading
    /// `upper` to communicate progress. (see [Self::compare_and_append_batch]
    /// or [Self::append_batch])
    ///
    /// The builder uses a bounded amount of memory, even when the number of
    /// updates is very large. Individual records, however, should be small
    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
    /// O(MB) come talk to us.
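    ///
    /// A minimal sketch of building and finishing a batch (placeholders
    /// throughout):
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let mut builder = write.builder(Antichain::from_elem(3));
    /// builder
    ///     .add(&"k".to_owned(), &"v".to_owned(), &3, &1)
    ///     .await
    ///     .expect("invalid usage");
    /// let _batch = builder
    ///     .finish(Antichain::from_elem(4))
    ///     .await
    ///     .expect("invalid usage");
    /// # };
    /// ```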
845    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
846        Self::builder_inner(
847            &self.cfg,
848            CompactConfig::new(&self.cfg, self.shard_id()),
849            Arc::clone(&self.metrics),
850            Arc::clone(&self.machine.applier.shard_metrics),
851            &self.metrics.user,
852            Arc::clone(&self.isolated_runtime),
853            Arc::clone(&self.blob),
854            self.shard_id(),
855            self.write_schemas.clone(),
856            lower,
857        )
858    }
859
860    /// Implementation of [Self::builder], so that we can share the
861    /// implementation in `PersistClient`.
862    pub(crate) fn builder_inner(
863        persist_cfg: &PersistConfig,
864        compact_cfg: CompactConfig,
865        metrics: Arc<Metrics>,
866        shard_metrics: Arc<ShardMetrics>,
867        user_batch_metrics: &BatchWriteMetrics,
868        isolated_runtime: Arc<IsolatedRuntime>,
869        blob: Arc<dyn Blob>,
870        shard_id: ShardId,
871        schemas: Schemas<K, V>,
872        lower: Antichain<T>,
873    ) -> BatchBuilder<K, V, T, D> {
874        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
875            BatchParts::new_compacting::<K, V, D>(
876                compact_cfg,
877                Description::new(
878                    lower.clone(),
879                    Antichain::new(),
880                    Antichain::from_elem(T::minimum()),
881                ),
882                max_runs,
883                Arc::clone(&metrics),
884                shard_metrics,
885                shard_id,
886                Arc::clone(&blob),
887                isolated_runtime,
888                user_batch_metrics,
889                schemas.clone(),
890            )
891        } else {
892            BatchParts::new_ordered::<D>(
893                compact_cfg.batch,
894                RunOrder::Unordered,
895                Arc::clone(&metrics),
896                shard_metrics,
897                shard_id,
898                Arc::clone(&blob),
899                isolated_runtime,
900                user_batch_metrics,
901            )
902        };
903        let builder = BatchBuilderInternal::new(
904            BatchBuilderConfig::new(persist_cfg, shard_id),
905            parts,
906            metrics,
907            schemas,
908            blob,
909            shard_id,
910            persist_cfg.build_version.clone(),
911        );
912        BatchBuilder::new(
913            builder,
914            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
915        )
916    }
917
    /// Uploads the given `updates` as one `Batch` to the blob store and returns
    /// a handle to the batch.
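    ///
    /// A minimal sketch (placeholders throughout); the returned [Batch] can later
    /// be appended with [Self::compare_and_append_batch] or [Self::append_batch]:
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// let _batch = write
    ///     .batch(updates.iter(), Antichain::from_elem(3), Antichain::from_elem(4))
    ///     .await
    ///     .expect("invalid usage");
    /// # };
    /// ```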
920    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
921    pub async fn batch<SB, KB, VB, TB, DB, I>(
922        &mut self,
923        updates: I,
924        lower: Antichain<T>,
925        upper: Antichain<T>,
926    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
927    where
928        SB: Borrow<((KB, VB), TB, DB)>,
929        KB: Borrow<K>,
930        VB: Borrow<V>,
931        TB: Borrow<T>,
932        DB: Borrow<D>,
933        I: IntoIterator<Item = SB>,
934    {
935        let iter = updates.into_iter();
936
937        let mut builder = self.builder(lower.clone());
938
939        for update in iter {
940            let ((k, v), t, d) = update.borrow();
941            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
942            match builder.add(k, v, t, d).await {
943                Ok(Added::Record | Added::RecordAndParts) => (),
944                Err(invalid_usage) => return Err(invalid_usage),
945            }
946        }
947
948        builder.finish(upper.clone()).await
949    }
950
    /// Blocks until the given `frontier` is less than the upper of the shard.
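    ///
    /// A minimal sketch (the handle is a placeholder):
    ///
    /// ```rust,no_run
    /// # use timely::progress::Antichain;
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Returns once some writer has advanced the shard's upper beyond 5;
    /// // afterwards `write.upper()` is also past 5.
    /// write.wait_for_upper_past(&Antichain::from_elem(5)).await;
    /// # };
    /// ```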
952    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
953        let mut watch = self.machine.applier.watch();
954        let batch = self
955            .machine
956            .next_listen_batch(frontier, &mut watch, None, None)
957            .await;
958        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
959            self.upper.clone_from(batch.desc.upper());
960        }
961        assert!(PartialOrder::less_than(frontier, &self.upper));
962    }
963
    /// Politely expires this writer, releasing any associated state.
    ///
    /// There is a best-effort impl in Drop to expire a writer that wasn't
    /// explicitly expired with this method. When possible, explicit expiry is
    /// still preferred because the Drop one is best effort and is dependent on
    /// a tokio [Handle] being available in the TLC at the time of drop (which
    /// is a bit subtle). Also, explicit expiry allows for control over when it
    /// happens.
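    ///
    /// A minimal sketch (the handle is a placeholder):
    ///
    /// ```rust,no_run
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // When done writing, expire the handle explicitly rather than relying on Drop.
    /// write.expire().await;
    /// # };
    /// ```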
972    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
973    pub async fn expire(mut self) {
974        let Some(expire_fn) = self.expire_fn.take() else {
975            return;
976        };
977        expire_fn.0().await;
978    }
979
980    fn expire_fn(
981        machine: Machine<K, V, T, D>,
982        gc: GarbageCollector<K, V, T, D>,
983        writer_id: WriterId,
984    ) -> ExpireFn {
985        ExpireFn(Box::new(move || {
986            Box::pin(async move {
987                let (_, maintenance) = machine.expire_writer(&writer_id).await;
988                maintenance.start_performing(&machine, &gc);
989            })
990        }))
991    }
992
993    /// Test helper for an [Self::append] call that is expected to succeed.
994    #[cfg(test)]
995    #[track_caller]
996    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
997    where
998        L: Into<Antichain<T>>,
999        U: Into<Antichain<T>>,
1000        D: Send + Sync,
1001    {
1002        self.append(updates.iter(), lower.into(), new_upper.into())
1003            .await
1004            .expect("invalid usage")
1005            .expect("unexpected upper");
1006    }
1007
1008    /// Test helper for a [Self::compare_and_append] call that is expected to
1009    /// succeed.
1010    #[cfg(test)]
1011    #[track_caller]
1012    pub async fn expect_compare_and_append(
1013        &mut self,
1014        updates: &[((K, V), T, D)],
1015        expected_upper: T,
1016        new_upper: T,
1017    ) where
1018        D: Send + Sync,
1019    {
1020        self.compare_and_append(
1021            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
1022            Antichain::from_elem(expected_upper),
1023            Antichain::from_elem(new_upper),
1024        )
1025        .await
1026        .expect("invalid usage")
1027        .expect("unexpected upper")
1028    }
1029
1030    /// Test helper for a [Self::compare_and_append_batch] call that is expected
1031    /// to succeed.
1032    #[cfg(test)]
1033    #[track_caller]
1034    pub async fn expect_compare_and_append_batch(
1035        &mut self,
1036        batches: &mut [&mut Batch<K, V, T, D>],
1037        expected_upper: T,
1038        new_upper: T,
1039    ) {
1040        self.compare_and_append_batch(
1041            batches,
1042            Antichain::from_elem(expected_upper),
1043            Antichain::from_elem(new_upper),
1044            true,
1045        )
1046        .await
1047        .expect("invalid usage")
1048        .expect("unexpected upper")
1049    }
1050
1051    /// Test helper for an [Self::append] call that is expected to succeed.
1052    #[cfg(test)]
1053    #[track_caller]
1054    pub async fn expect_batch(
1055        &mut self,
1056        updates: &[((K, V), T, D)],
1057        lower: T,
1058        upper: T,
1059    ) -> Batch<K, V, T, D> {
1060        self.batch(
1061            updates.iter(),
1062            Antichain::from_elem(lower),
1063            Antichain::from_elem(upper),
1064        )
1065        .await
1066        .expect("invalid usage")
1067    }
1068}
1069
1070impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
1071    fn drop(&mut self) {
1072        let Some(expire_fn) = self.expire_fn.take() else {
1073            return;
1074        };
1075        let handle = match Handle::try_current() {
1076            Ok(x) => x,
1077            Err(_) => {
1078                warn!(
1079                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
1080                    self.writer_id
1081                );
1082                return;
1083            }
1084        };
1085        // Spawn a best-effort task to expire this write handle. It's fine if
1086        // this doesn't run to completion, we'd just have to wait out the lease
1087        // before the shard-global since is unblocked.
1088        //
1089        // Intentionally create the span outside the task to set the parent.
1090        let expire_span = debug_span!("drop::expire");
1091        handle.spawn_named(
1092            || format!("WriteHandle::expire ({})", self.writer_id),
1093            expire_fn.0().instrument(expire_span),
1094        );
1095    }
1096}
1097
1098/// Ensure the given batch uses the given schema ID.
1099///
1100/// If the batch has no schema set, initialize it to the given one.
1101/// If the batch has a schema set, assert that it matches the given one.
1102fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
1103where
1104    T: Timestamp + Lattice + Codec64,
1105{
1106    let ensure = |id: &mut Option<SchemaId>| match id {
1107        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
1108        None => *id = Some(schema_id),
1109    };
1110
1111    for run_meta in &mut batch.run_meta {
1112        ensure(&mut run_meta.schema);
1113    }
1114    for part in &mut batch.parts {
1115        match part {
1116            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
1117            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
1118            RunPart::Many(_hollow_run_ref) => {
1119                // TODO: Fetch the parts in this run and rewrite them too. Alternatively, make
1120                // `run_meta` the only place we keep schema IDs, so rewriting parts isn't
1121                // necessary.
1122            }
1123        }
1124    }
1125}
1126
1127#[cfg(test)]
1128mod tests {
1129    use std::str::FromStr;
1130    use std::sync::mpsc;
1131
1132    use differential_dataflow::consolidation::consolidate_updates;
1133    use futures_util::FutureExt;
1134    use mz_dyncfg::ConfigUpdates;
1135    use mz_ore::collections::CollectionExt;
1136    use mz_ore::task;
1137    use serde_json::json;
1138
1139    use crate::cache::PersistClientCache;
1140    use crate::tests::{all_ok, new_test_client};
1141    use crate::{PersistLocation, ShardId};
1142
1143    use super::*;
1144
1145    #[mz_persist_proc::test(tokio::test)]
1146    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1147    async fn empty_batches(dyncfgs: ConfigUpdates) {
1148        let data = [
1149            (("1".to_owned(), "one".to_owned()), 1, 1),
1150            (("2".to_owned(), "two".to_owned()), 2, 1),
1151            (("3".to_owned(), "three".to_owned()), 3, 1),
1152        ];
1153
1154        let (mut write, _) = new_test_client(&dyncfgs)
1155            .await
1156            .expect_open::<String, String, u64, i64>(ShardId::new())
1157            .await;
1158        let blob = Arc::clone(&write.blob);
1159
1160        // Write an initial batch.
1161        let mut upper = 3;
1162        write.expect_append(&data[..2], vec![0], vec![upper]).await;
1163
1164        // Write a bunch of empty batches. This shouldn't write blobs, so the count should stay the same.
1165        let mut count_before = 0;
1166        blob.list_keys_and_metadata("", &mut |_| {
1167            count_before += 1;
1168        })
1169        .await
1170        .expect("list_keys failed");
1171        for _ in 0..5 {
1172            let new_upper = upper + 1;
1173            write.expect_compare_and_append(&[], upper, new_upper).await;
1174            upper = new_upper;
1175        }
1176        let mut count_after = 0;
1177        blob.list_keys_and_metadata("", &mut |_| {
1178            count_after += 1;
1179        })
1180        .await
1181        .expect("list_keys failed");
1182        assert_eq!(count_after, count_before);
1183    }
1184
1185    #[mz_persist_proc::test(tokio::test)]
1186    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1187    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
1188        let data0 = vec![
1189            (("1".to_owned(), "one".to_owned()), 1, 1),
1190            (("2".to_owned(), "two".to_owned()), 2, 1),
1191            (("4".to_owned(), "four".to_owned()), 4, 1),
1192        ];
1193        let data1 = vec![
1194            (("1".to_owned(), "one".to_owned()), 1, 1),
1195            (("2".to_owned(), "two".to_owned()), 2, 1),
1196            (("3".to_owned(), "three".to_owned()), 3, 1),
1197        ];
1198
1199        let (mut write, mut read) = new_test_client(&dyncfgs)
1200            .await
1201            .expect_open::<String, String, u64, i64>(ShardId::new())
1202            .await;
1203
1204        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
1205        let mut batch1 = write.expect_batch(&data1, 0, 4).await;
1206
1207        write
1208            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
1209            .await;
1210
1211        let batch = write
1212            .machine
1213            .snapshot(&Antichain::from_elem(3))
1214            .await
1215            .expect("just wrote this")
1216            .into_element();
1217
1218        assert!(batch.runs().count() >= 2);
1219
1220        let expected = vec![
1221            (("1".to_owned(), "one".to_owned()), 1, 2),
1222            (("2".to_owned(), "two".to_owned()), 2, 2),
1223            (("3".to_owned(), "three".to_owned()), 3, 1),
1224        ];
1225        let mut actual = read.expect_snapshot_and_fetch(3).await;
1226        consolidate_updates(&mut actual);
1227        assert_eq!(actual, all_ok(&expected, 3));
1228    }
1229
1230    #[mz_ore::test]
1231    fn writer_id_human_readable_serde() {
1232        #[derive(Debug, Serialize, Deserialize)]
1233        struct Container {
1234            writer_id: WriterId,
1235        }
1236
1237        // roundtrip through json
1238        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
1239        assert_eq!(
1240            id,
1241            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1242                .expect("deserializable")
1243        );
1244
1245        // deserialize a serialized string directly
1246        assert_eq!(
1247            id,
1248            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
1249                .expect("deserializable")
1250        );
1251
1252        // roundtrip id through a container type
1253        let json = json!({ "writer_id": id });
1254        assert_eq!(
1255            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
1256            &json.to_string()
1257        );
1258        let container: Container = serde_json::from_value(json).expect("deserializable");
1259        assert_eq!(container.writer_id, id);
1260    }
1261
1262    #[mz_persist_proc::test(tokio::test)]
1263    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1264    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
1265        let data = vec![
1266            (("1".to_owned(), "one".to_owned()), 1, 1),
1267            (("2".to_owned(), "two".to_owned()), 2, 1),
1268            (("3".to_owned(), "three".to_owned()), 3, 1),
1269        ];
1270
1271        let (mut write, mut read) = new_test_client(&dyncfgs)
1272            .await
1273            .expect_open::<String, String, u64, i64>(ShardId::new())
1274            .await;
1275
1276        // This test is a bit more complex than it should be. It would be easier
1277        // if we could just compare the rehydrated batch to the original batch.
1278        // But a) turning a batch into a hollow batch consumes it, and b) Batch
1279        // doesn't have Eq/PartialEq.
1280        let batch = write.expect_batch(&data, 0, 4).await;
1281        let hollow_batch = batch.into_transmittable_batch();
1282        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);
1283
1284        write
1285            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
1286            .await;
1287
1288        let expected = vec![
1289            (("1".to_owned(), "one".to_owned()), 1, 1),
1290            (("2".to_owned(), "two".to_owned()), 2, 1),
1291            (("3".to_owned(), "three".to_owned()), 3, 1),
1292        ];
1293        let mut actual = read.expect_snapshot_and_fetch(3).await;
1294        consolidate_updates(&mut actual);
1295        assert_eq!(actual, all_ok(&expected, 3));
1296    }
1297
1298    #[mz_persist_proc::test(tokio::test)]
1299    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1300    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
1301        let client = new_test_client(&dyncfgs).await;
1302        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
1303        let five = Antichain::from_elem(5);
1304
1305        // Upper is not past 5.
1306        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1307
1308        // Upper is still not past 5.
1309        write
1310            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
1311            .await;
1312        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1313
1314        // Upper is past 5.
1315        write
1316            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
1317            .await;
1318        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
1319        assert_eq!(write.upper(), &Antichain::from_elem(7));
1320
1321        // Waiting for previous uppers does not regress the handle's cached
1322        // upper.
1323        assert_eq!(
1324            write
1325                .wait_for_upper_past(&Antichain::from_elem(2))
1326                .now_or_never(),
1327            Some(())
1328        );
1329        assert_eq!(write.upper(), &Antichain::from_elem(7));
1330    }
1331
1332    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1333    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1334    async fn fetch_recent_upper_linearized() {
1335        type Timestamp = u64;
1336        let max_upper = 1000;
1337
1338        let shard_id = ShardId::new();
1339        let mut clients = PersistClientCache::new_no_metrics();
1340        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1341        let (mut upper_writer, _) = upper_writer_client
1342            .expect_open::<(), (), Timestamp, i64>(shard_id)
1343            .await;
1344        // Clear the state cache between each client to maximally disconnect
1345        // them from each other.
1346        clients.clear_state_cache();
1347        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1348        let (mut upper_reader, _) = upper_reader_client
1349            .expect_open::<(), (), Timestamp, i64>(shard_id)
1350            .await;
1351        let (tx, rx) = mpsc::channel();
1352
1353        let task = task::spawn(|| "upper-reader", async move {
1354            let mut upper = Timestamp::MIN;
1355
1356            while upper < max_upper {
1357                while let Ok(new_upper) = rx.try_recv() {
1358                    upper = new_upper;
1359                }
1360
1361                let recent_upper = upper_reader
1362                    .fetch_recent_upper()
1363                    .await
1364                    .as_option()
1365                    .cloned()
1366                    .expect("u64 is totally ordered and the shard is not finalized");
1367                assert!(
1368                    recent_upper >= upper,
1369                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
1370                );
1371            }
1372        });
1373
1374        for upper in Timestamp::MIN..max_upper {
1375            let next_upper = upper + 1;
1376            upper_writer
1377                .expect_compare_and_append(&[], upper, next_upper)
1378                .await;
1379            tx.send(next_upper).expect("send failed");
1380        }
1381
1382        task.await.expect("await failed");
1383    }
1384}