1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Write capabilities and handles
11
12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::Description;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::Config;
22use mz_ore::instrument;
23use mz_ore::task::RuntimeExt;
24use mz_persist::location::Blob;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::{Codec, Codec64};
27use mz_proto::{IntoRustIfSome, ProtoType};
28use proptest_derive::Arbitrary;
29use semver::Version;
30use serde::{Deserialize, Serialize};
31use timely::PartialOrder;
32use timely::order::TotalOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::runtime::Handle;
35use tracing::{Instrument, debug_span, info, warn};
36use uuid::Uuid;
37
38use crate::batch::{
39    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
40    BatchParts, ProtoBatch, validate_truncate_batch,
41};
42use crate::error::{InvalidUsage, UpperMismatch};
43use crate::fetch::{
44    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
45};
46use crate::internal::compact::{CompactConfig, Compactor};
47use crate::internal::encoding::{Schemas, assert_code_can_read_data};
48use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
49use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
51use crate::read::ReadHandle;
52use crate::schema::PartMigration;
53use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};
54
55pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
56    "persist_write_combine_inline_writes",
57    true,
58    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
59);
60
61pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
62    "persist_validate_part_bounds_on_write",
63    true,
64    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
65    for the batch being appended.",
66);
67
68/// An opaque identifier for a writer of a persist durable TVC (aka shard).
69#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
70#[serde(try_from = "String", into = "String")]
71pub struct WriterId(pub(crate) [u8; 16]);
72
73impl std::fmt::Display for WriterId {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "w{}", Uuid::from_bytes(self.0))
76    }
77}
78
79impl std::fmt::Debug for WriterId {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
82    }
83}
84
85impl std::str::FromStr for WriterId {
86    type Err = String;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        parse_id("w", "WriterId", s).map(WriterId)
90    }
91}
92
93impl From<WriterId> for String {
94    fn from(writer_id: WriterId) -> Self {
95        writer_id.to_string()
96    }
97}
98
99impl TryFrom<String> for WriterId {
100    type Error = String;
101
102    fn try_from(s: String) -> Result<Self, Self::Error> {
103        s.parse()
104    }
105}
106
107impl WriterId {
108    pub(crate) fn new() -> Self {
109        WriterId(*Uuid::new_v4().as_bytes())
110    }
111}
112
/// A "capability" granting the ability to apply updates to some shard at times
/// greater than or equal to `self.upper()`.
///
/// All async methods on WriteHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # async {
/// tokio::time::timeout(timeout, write.fetch_recent_upper()).await
/// # };
/// ```
128#[derive(Debug)]
129pub struct WriteHandle<K: Codec, V: Codec, T, D> {
130    pub(crate) cfg: PersistConfig,
131    pub(crate) metrics: Arc<Metrics>,
132    pub(crate) machine: Machine<K, V, T, D>,
133    pub(crate) gc: GarbageCollector<K, V, T, D>,
134    pub(crate) compact: Option<Compactor<K, V, T, D>>,
135    pub(crate) blob: Arc<dyn Blob>,
136    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
137    pub(crate) writer_id: WriterId,
138    pub(crate) debug_state: HandleDebugState,
139    pub(crate) write_schemas: Schemas<K, V>,
140
141    pub(crate) upper: Antichain<T>,
142    expire_fn: Option<ExpireFn>,
143}
144
145impl<K, V, T, D> WriteHandle<K, V, T, D>
146where
147    K: Debug + Codec,
148    V: Debug + Codec,
149    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
150    D: Monoid + Ord + Codec64 + Send + Sync,
151{
152    pub(crate) fn new(
153        cfg: PersistConfig,
154        metrics: Arc<Metrics>,
155        machine: Machine<K, V, T, D>,
156        gc: GarbageCollector<K, V, T, D>,
157        blob: Arc<dyn Blob>,
158        writer_id: WriterId,
159        purpose: &str,
160        write_schemas: Schemas<K, V>,
161    ) -> Self {
162        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
163        let compact = cfg
164            .compaction_enabled
165            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
166        let debug_state = HandleDebugState {
167            hostname: cfg.hostname.to_owned(),
168            purpose: purpose.to_owned(),
169        };
170        let upper = machine.applier.clone_upper();
171        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
172        WriteHandle {
173            cfg,
174            metrics,
175            machine,
176            gc,
177            compact,
178            blob,
179            isolated_runtime,
180            writer_id,
181            debug_state,
182            write_schemas,
183            upper,
184            expire_fn: Some(expire_fn),
185        }
186    }
187
188    /// Creates a [WriteHandle] for the same shard from an existing
189    /// [ReadHandle].
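    ///
    /// A minimal sketch (the concrete handle types are illustrative):
    ///
    /// ```rust,no_run
    /// # let read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
    /// // The new writer gets its own WriterId but shares the shard, schemas, and config.
    /// let _write = mz_persist_client::write::WriteHandle::from_read(&read, "example purpose");
    /// ```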
190    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
191        Self::new(
192            read.cfg.clone(),
193            Arc::clone(&read.metrics),
194            read.machine.clone(),
195            read.gc.clone(),
196            Arc::clone(&read.blob),
197            WriterId::new(),
198            purpose,
199            read.read_schemas.clone(),
200        )
201    }
202
    /// Whether this WriteHandle enforces part bounds validation when appending
    /// batches.
205    pub fn validate_part_bounds_on_write(&self) -> bool {
206        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) && VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
207    }
208
209    /// This handle's shard id.
210    pub fn shard_id(&self) -> ShardId {
211        self.machine.shard_id()
212    }
213
    /// Returns the ID of this writer's schema, if one is registered.
215    pub fn schema_id(&self) -> Option<SchemaId> {
216        self.write_schemas.id
217    }
218
219    /// Registers the write schema, if it isn't already registered.
220    ///
221    /// # Panics
222    ///
223    /// This method expects that either the shard doesn't yet have any schema registered, or one of
224    /// the registered schemas is the same as the write schema. If all registered schemas are
225    /// different from the write schema, it panics.
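    ///
    /// A minimal usage sketch (the concrete handle type is illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Make sure the shard knows this writer's schema before appending data.
    /// let schema_id = write.ensure_schema_registered().await;
    /// assert_eq!(write.schema_id(), Some(schema_id));
    /// # };
    /// ```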
226    pub async fn ensure_schema_registered(&mut self) -> SchemaId {
227        let Schemas { id, key, val } = &self.write_schemas;
228
229        if let Some(id) = id {
230            return *id;
231        }
232
233        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
234        maintenance.start_performing(&self.machine, &self.gc);
235
236        let Some(schema_id) = schema_id else {
237            panic!("unable to register schemas: {key:?} {val:?}");
238        };
239
240        self.write_schemas.id = Some(schema_id);
241        schema_id
242    }
243
    /// A cached version of the shard-global `upper` frontier.
    ///
    /// This is the most recent upper discovered by this handle. It is
    /// potentially more stale than [Self::shared_upper] but is lock-free and
    /// allocation-free. This will always be less than or equal to the
    /// shard-global `upper`.
250    pub fn upper(&self) -> &Antichain<T> {
251        &self.upper
252    }
253
    /// A less-stale cached version of the shard-global `upper` frontier.
    ///
    /// This is the most recently known upper for this shard process-wide, but
    /// unlike [Self::upper] it requires a mutex and a clone. This will always be
    /// less than or equal to the shard-global `upper`.
259    pub fn shared_upper(&self) -> Antichain<T> {
260        self.machine.applier.clone_upper()
261    }
262
263    /// Fetches and returns a recent shard-global `upper`. Importantly, this operation is
264    /// linearized with write operations.
265    ///
266    /// This requires fetching the latest state from consensus and is therefore a potentially
267    /// expensive operation.
268    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
269    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
270        // TODO: Do we even need to track self.upper on WriteHandle or could
271        // WriteHandle::upper just get the one out of machine?
272        self.machine
273            .applier
274            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
275            .await;
276        &self.upper
277    }
278
279    /// Advance the shard's upper by the given frontier.
280    ///
281    /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
282    ///
283    /// In contrast to the various compare-and-append methods, this method does not require the
284    /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
285    /// schema when creating a writer just to advance a shard upper.
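    ///
    /// A minimal sketch, assuming a `u64`-timestamped shard (the other handle
    /// type parameters below are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Bump the shard's upper to at least 100 without writing any data.
    /// let target = timely::progress::Antichain::from_elem(100u64);
    /// write.advance_upper(&target).await;
    /// assert!(timely::PartialOrder::less_equal(&target, write.fetch_recent_upper().await));
    /// # };
    /// ```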
286    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
        // We skip `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
        // already beyond the target.
289        let mut lower = self.shared_upper().clone();
290
291        while !PartialOrder::less_equal(target, &lower) {
292            let since = Antichain::from_elem(T::minimum());
293            let desc = Description::new(lower.clone(), target.clone(), since);
294            let batch = HollowBatch::empty(desc);
295
296            let heartbeat_timestamp = (self.cfg.now)();
297            let res = self
298                .machine
299                .compare_and_append(
300                    &batch,
301                    &self.writer_id,
302                    &self.debug_state,
303                    heartbeat_timestamp,
304                )
305                .await;
306
307            use CompareAndAppendRes::*;
308            let new_upper = match res {
309                Success(_seq_no, maintenance) => {
310                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
311                    batch.desc.upper().clone()
312                }
313                UpperMismatch(_seq_no, actual_upper) => actual_upper,
314                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
315                InlineBackpressure => unreachable!("batch was empty"),
316            };
317
318            self.upper.clone_from(&new_upper);
319            lower = new_upper;
320        }
321    }
322
323    /// Applies `updates` to this shard and downgrades this handle's upper to
324    /// `upper`.
325    ///
326    /// The innermost `Result` is `Ok` if the updates were successfully written.
327    /// If not, an `Upper` err containing the current writer upper is returned.
328    /// If that happens, we also update our local `upper` to match the current
329    /// upper. This is useful in cases where a timeout happens in between a
330    /// successful write and returning that to the client.
331    ///
332    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s may
333    /// be used concurrently to write to the same shard, but in this case, the
334    /// data being written must be identical (in the sense of "definite"-ness).
335    /// It's intended for replicated use by source ingestion, sinks, etc.
336    ///
    /// All times in `updates` must be greater than or equal to `lower` and not
    /// greater than or equal to `upper`. An `upper` of the empty antichain
    /// "finishes" this shard, promising that no more data is ever incoming.
340    ///
341    /// `updates` may be empty, which allows for downgrading `upper` to
342    /// communicate progress. It is possible to call this with `upper` equal to
343    /// `self.upper()` and an empty `updates` (making the call a no-op).
344    ///
345    /// This uses a bounded amount of memory, even when `updates` is very large.
346    /// Individual records, however, should be small enough that we can
347    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
348    /// us.
349    ///
350    /// The clunky multi-level Result is to enable more obvious error handling
351    /// in the caller. See <http://sled.rs/errors.html> for details.
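    ///
    /// A minimal usage sketch (the concrete `String`/`String`/`u64`/`i64` handle
    /// below is illustrative, not required):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// // Write the updates and move this handle's upper from 3 to 4.
    /// write
    ///     .append(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(3),
    ///         timely::progress::Antichain::from_elem(4),
    ///     )
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```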
352    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
353    pub async fn append<SB, KB, VB, TB, DB, I>(
354        &mut self,
355        updates: I,
356        lower: Antichain<T>,
357        upper: Antichain<T>,
358    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
359    where
360        SB: Borrow<((KB, VB), TB, DB)>,
361        KB: Borrow<K>,
362        VB: Borrow<V>,
363        TB: Borrow<T>,
364        DB: Borrow<D>,
365        I: IntoIterator<Item = SB>,
366        D: Send + Sync,
367    {
368        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
369        self.append_batch(batch, lower, upper).await
370    }
371
372    /// Applies `updates` to this shard and downgrades this handle's upper to
373    /// `new_upper` iff the current global upper of this shard is
374    /// `expected_upper`.
375    ///
376    /// The innermost `Result` is `Ok` if the updates were successfully written.
377    /// If not, an `Upper` err containing the current global upper is returned.
378    ///
379    /// In contrast to [Self::append], this linearizes mutations from all
380    /// writers. It's intended for use as an atomic primitive for timestamp
381    /// bindings, SQL tables, etc.
382    ///
    /// All times in `updates` must be greater than or equal to `expected_upper`
    /// and not greater than or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
387    ///
388    /// `updates` may be empty, which allows for downgrading `upper` to
389    /// communicate progress. It is possible to heartbeat a writer lease by
390    /// calling this with `new_upper` equal to `self.upper()` and an empty
391    /// `updates` (making the call a no-op).
392    ///
393    /// This uses a bounded amount of memory, even when `updates` is very large.
394    /// Individual records, however, should be small enough that we can
395    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
396    /// us.
397    ///
398    /// The clunky multi-level Result is to enable more obvious error handling
399    /// in the caller. See <http://sled.rs/errors.html> for details.
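    ///
    /// A minimal sketch, assuming the shard's upper is currently `3` (the
    /// concrete `String`/`u64`/`i64` types are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// let res = write
    ///     .compare_and_append(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(3),
    ///         timely::progress::Antichain::from_elem(4),
    ///     )
    ///     .await
    ///     .expect("invalid usage");
    /// match res {
    ///     Ok(()) => { /* the shard's upper is now 4 */ }
    ///     Err(_mismatch) => { /* someone else wrote first; retry against the current upper */ }
    /// }
    /// # };
    /// ```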
400    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
401    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
402        &mut self,
403        updates: I,
404        expected_upper: Antichain<T>,
405        new_upper: Antichain<T>,
406    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
407    where
408        SB: Borrow<((KB, VB), TB, DB)>,
409        KB: Borrow<K>,
410        VB: Borrow<V>,
411        TB: Borrow<T>,
412        DB: Borrow<D>,
413        I: IntoIterator<Item = SB>,
414        D: Send + Sync,
415    {
416        let mut batch = self
417            .batch(updates, expected_upper.clone(), new_upper.clone())
418            .await?;
419        match self
420            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
421            .await
422        {
423            ok @ Ok(Ok(())) => ok,
424            err => {
425                // We cannot delete the batch in compare_and_append_batch()
426                // because the caller owns the batch and might want to retry
427                // with a different `expected_upper`. In this function, we
428                // control the batch, so we have to delete it.
429                batch.delete().await;
430                err
431            }
432        }
433    }
434
435    /// Appends the batch of updates to the shard and downgrades this handle's
436    /// upper to `upper`.
437    ///
438    /// The innermost `Result` is `Ok` if the updates were successfully written.
439    /// If not, an `Upper` err containing the current writer upper is returned.
440    /// If that happens, we also update our local `upper` to match the current
441    /// upper. This is useful in cases where a timeout happens in between a
442    /// successful write and returning that to the client.
443    ///
444    /// In contrast to [Self::compare_and_append_batch], multiple [WriteHandle]s
445    /// may be used concurrently to write to the same shard, but in this case,
446    /// the data being written must be identical (in the sense of
447    /// "definite"-ness). It's intended for replicated use by source ingestion,
448    /// sinks, etc.
449    ///
    /// An `upper` of the empty antichain "finishes" this shard, promising that
    /// no more data is ever incoming.
    ///
    /// The batch may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `upper` equal to `self.upper()` and an empty batch
    /// (making the call a no-op).
457    ///
458    /// The clunky multi-level Result is to enable more obvious error handling
459    /// in the caller. See <http://sled.rs/errors.html> for details.
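    ///
    /// A minimal sketch, building the batch with [Self::batch] first (the
    /// concrete types are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// let lower = timely::progress::Antichain::from_elem(3);
    /// let upper = timely::progress::Antichain::from_elem(4);
    /// let batch = write
    ///     .batch(updates.iter(), lower.clone(), upper.clone())
    ///     .await
    ///     .expect("invalid usage");
    /// write
    ///     .append_batch(batch, lower, upper)
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```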
460    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
461    pub async fn append_batch(
462        &mut self,
463        mut batch: Batch<K, V, T, D>,
464        mut lower: Antichain<T>,
465        upper: Antichain<T>,
466    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
467    where
468        D: Send + Sync,
469    {
470        loop {
471            let res = self
472                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
473                .await?;
474            match res {
475                Ok(()) => {
476                    self.upper = upper;
477                    return Ok(Ok(()));
478                }
479                Err(mismatch) => {
                    // We tried to do a non-contiguous append; that won't work.
481                    if PartialOrder::less_than(&mismatch.current, &lower) {
482                        self.upper.clone_from(&mismatch.current);
483
484                        batch.delete().await;
485
486                        return Ok(Err(mismatch));
487                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
488                        // Cut down the Description by advancing its lower to the current shard
489                        // upper and try again. IMPORTANT: We can only advance the lower, meaning
490                        // we cut updates away, we must not "extend" the batch by changing to a
491                        // lower that is not beyond the current lower. This invariant is checked by
492                        // the first if branch: if `!(current_upper < lower)` then it holds that
493                        // `lower <= current_upper`.
494                        lower = mismatch.current;
495                    } else {
                        // We already have updates past this batch's upper; the append is a no-op.
497                        self.upper = mismatch.current;
498
499                        // Because we return a success result, the caller will
500                        // think that the batch was consumed or otherwise used,
501                        // so we have to delete it here.
502                        batch.delete().await;
503
504                        return Ok(Ok(()));
505                    }
506                }
507            }
508        }
509    }
510
511    /// Appends the batch of updates to the shard and downgrades this handle's
512    /// upper to `new_upper` iff the current global upper of this shard is
513    /// `expected_upper`.
514    ///
515    /// The innermost `Result` is `Ok` if the batch was successfully written. If
516    /// not, an `Upper` err containing the current global upper is returned.
517    ///
518    /// In contrast to [Self::append_batch], this linearizes mutations from all
519    /// writers. It's intended for use as an atomic primitive for timestamp
520    /// bindings, SQL tables, etc.
521    ///
522    /// A `new_upper` of the empty antichain "finishes" this shard, promising
523    /// that no more data is ever incoming.
524    ///
525    /// The batch may be empty, which allows for downgrading `upper` to
526    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// batch (making the call a no-op).
529    ///
530    /// IMPORTANT: In case of an erroneous result the caller is responsible for
531    /// the lifecycle of the `batch`. It can be deleted or it can be used to
532    /// retry with adjusted frontiers.
533    ///
534    /// The clunky multi-level Result is to enable more obvious error handling
535    /// in the caller. See <http://sled.rs/errors.html> for details.
536    ///
    /// If the `validate_part_bounds_on_write` flag is set to `false`, we no
    /// longer validate that every batch covers the entire range between the
    /// expected and new uppers. This allows combining batches that cover
    /// different subsets of that range, including subsets that contain no data
    /// at all. The caller is responsible for guaranteeing that the provided
    /// batches collectively include all updates for the entire range between
    /// the expected and new upper.
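    ///
    /// A minimal sketch appending a single batch (the concrete types are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// let expected_upper = timely::progress::Antichain::from_elem(3);
    /// let new_upper = timely::progress::Antichain::from_elem(4);
    /// let mut batch = write
    ///     .batch(updates.iter(), expected_upper.clone(), new_upper.clone())
    ///     .await
    ///     .expect("invalid usage");
    /// let res = write
    ///     .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
    ///     .await
    ///     .expect("invalid usage");
    /// if let Err(_mismatch) = res {
    ///     // On an upper mismatch the caller still owns the batch: delete it
    ///     // or retry with adjusted frontiers.
    ///     batch.delete().await;
    /// }
    /// # };
    /// ```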
544    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
545    pub async fn compare_and_append_batch(
546        &mut self,
547        batches: &mut [&mut Batch<K, V, T, D>],
548        expected_upper: Antichain<T>,
549        new_upper: Antichain<T>,
550        validate_part_bounds_on_write: bool,
551    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
552    where
553        D: Send + Sync,
554    {
555        // Before we append any data, we require a registered write schema.
556        let schema_id = self.ensure_schema_registered().await;
557
558        for batch in batches.iter() {
559            if self.machine.shard_id() != batch.shard_id() {
560                return Err(InvalidUsage::BatchNotFromThisShard {
561                    batch_shard: batch.shard_id(),
562                    handle_shard: self.machine.shard_id(),
563                });
564            }
565            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
566            if self.cfg.build_version > batch.version {
567                info!(
568                    shard_id =? self.machine.shard_id(),
569                    batch_version =? batch.version,
570                    writer_version =? self.cfg.build_version,
571                    "Appending batch from the past. This is fine but should be rare. \
572                    TODO: Error on very old versions once the leaked blob detector exists."
573                )
574            }
575        }
576
577        let lower = expected_upper.clone();
578        let upper = new_upper;
579        let since = Antichain::from_elem(T::minimum());
580        let desc = Description::new(lower, upper, since);
581
582        let mut received_inline_backpressure = false;
583        // Every hollow part must belong to some batch, so we can clean it up when the batch is dropped...
584        // but if we need to merge all our inline parts to a single run in S3, it's not correct to
585        // associate that with any of our individual input batches.
        // At first, we'll try to put all the inline parts we receive into state... but if we
587        // get backpressured, we retry with this builder set to `Some`, put all our inline data into
588        // it, and ensure it's flushed out to S3 before including it in the batch.
589        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
590        let maintenance = loop {
591            let any_batch_rewrite = batches
592                .iter()
593                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
594            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
595                (vec![], 0, vec![], vec![]);
596            let mut key_storage = None;
597            let mut val_storage = None;
598            for batch in batches.iter() {
599                let () = validate_truncate_batch(
600                    &batch.batch,
601                    &desc,
602                    any_batch_rewrite,
603                    validate_part_bounds_on_write,
604                )?;
605                for (run_meta, run) in batch.batch.runs() {
606                    let start_index = parts.len();
607                    for part in run {
608                        if let (
609                            RunPart::Single(
610                                batch_part @ BatchPart::Inline {
611                                    updates,
612                                    ts_rewrite,
613                                    schema_id: _,
614                                    deprecated_schema_id: _,
615                                },
616                            ),
617                            Some((schema_cache, builder)),
618                        ) = (part, &mut inline_batch_builder)
619                        {
620                            let schema_migration = PartMigration::new(
621                                batch_part,
622                                self.write_schemas.clone(),
623                                schema_cache,
624                            )
625                            .await
626                            .expect("schemas for inline user part");
627
628                            let encoded_part = EncodedPart::from_inline(
629                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
630                                &*self.metrics,
631                                self.metrics.read.compaction.clone(),
632                                desc.clone(),
633                                updates,
634                                ts_rewrite.as_ref(),
635                            );
636                            let mut fetched_part = FetchedPart::new(
637                                Arc::clone(&self.metrics),
638                                encoded_part,
639                                schema_migration,
640                                FetchBatchFilter::Compaction {
641                                    since: desc.since().clone(),
642                                },
643                                false,
644                                PartDecodeFormat::Arrow,
645                                None,
646                            );
647
648                            while let Some(((k, v), t, d)) =
649                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
650                            {
651                                builder
652                                    .add(
653                                        &k.expect("decoded just-encoded key data"),
654                                        &v.expect("decoded just-encoded val data"),
655                                        &t,
656                                        &d,
657                                    )
658                                    .await
659                                    .expect("re-encoding just-decoded data");
660                            }
661                        } else {
662                            parts.push(part.clone())
663                        }
664                    }
665
666                    let end_index = parts.len();
667
668                    if start_index == end_index {
669                        continue;
670                    }
671
                    // Mark a run boundary unless this run starts at the beginning of the combined parts.
673                    if start_index != 0 {
674                        run_splits.push(start_index);
675                    }
676                    run_metas.push(run_meta.clone());
677                }
678                num_updates += batch.batch.len;
679            }
680
681            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
682                let mut finished = builder
683                    .finish(desc.upper().clone())
684                    .await
685                    .expect("invalid usage");
686                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
687                finished
688                    .flush_to_blob(
689                        &cfg,
690                        &self.metrics.inline.backpressure,
691                        &self.isolated_runtime,
692                        &self.write_schemas,
693                    )
694                    .await;
695                Some(finished)
696            } else {
697                None
698            };
699
700            if let Some(batch) = &flushed_inline_batch {
701                for (run_meta, run) in batch.batch.runs() {
702                    assert!(run.len() > 0);
703                    let start_index = parts.len();
704                    if start_index != 0 {
705                        run_splits.push(start_index);
706                    }
707                    run_metas.push(run_meta.clone());
708                    parts.extend(run.iter().cloned())
709                }
710            }
711
712            let mut combined_batch =
713                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
714            // The batch may have been written by a writer without a registered schema.
715            // Ensure we have a schema ID in the batch metadata before we append, to avoid type
716            // confusion later.
717            ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
718            let heartbeat_timestamp = (self.cfg.now)();
719            let res = self
720                .machine
721                .compare_and_append(
722                    &combined_batch,
723                    &self.writer_id,
724                    &self.debug_state,
725                    heartbeat_timestamp,
726                )
727                .await;
728
729            match res {
730                CompareAndAppendRes::Success(_seqno, maintenance) => {
731                    self.upper.clone_from(desc.upper());
732                    for batch in batches.iter_mut() {
733                        batch.mark_consumed();
734                    }
735                    if let Some(batch) = &mut flushed_inline_batch {
736                        batch.mark_consumed();
737                    }
738                    break maintenance;
739                }
740                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
741                    if let Some(batch) = flushed_inline_batch.take() {
742                        batch.delete().await;
743                    }
744                    return Err(invalid_usage);
745                }
746                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
747                    if let Some(batch) = flushed_inline_batch.take() {
748                        batch.delete().await;
749                    }
                    // We tried to do a compare_and_append with the wrong expected upper; that
                    // won't work. Update the cached upper to the current upper.
752                    self.upper.clone_from(&current_upper);
753                    return Ok(Err(UpperMismatch {
754                        current: current_upper,
755                        expected: expected_upper,
756                    }));
757                }
758                CompareAndAppendRes::InlineBackpressure => {
                    // We tried to write an inline part, but there was already
                    // too much in state. Flush it out to S3 and try again.
761                    assert_eq!(received_inline_backpressure, false);
762                    received_inline_backpressure = true;
763                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
764                        inline_batch_builder = Some((
765                            self.machine.applier.schema_cache(),
766                            self.builder(desc.lower().clone()),
767                        ));
768                        continue;
769                    }
770
771                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
772                    // We could have a large number of inline parts (imagine the
773                    // sharded persist_sink), do this flushing concurrently.
774                    let flush_batches = batches
775                        .iter_mut()
776                        .map(|batch| async {
777                            batch
778                                .flush_to_blob(
779                                    &cfg,
780                                    &self.metrics.inline.backpressure,
781                                    &self.isolated_runtime,
782                                    &self.write_schemas,
783                                )
784                                .await
785                        })
786                        .collect::<FuturesUnordered<_>>();
787                    let () = flush_batches.collect::<()>().await;
788
789                    for batch in batches.iter() {
790                        assert_eq!(batch.batch.inline_bytes(), 0);
791                    }
792
793                    continue;
794                }
795            }
796        };
797
798        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
799
800        Ok(Ok(()))
801    }
802
803    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
804    /// to append it to this shard.
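    ///
    /// A minimal sketch of round-tripping a batch through its transmittable
    /// form, e.g. across a process boundary (the concrete types are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// let lower = timely::progress::Antichain::from_elem(3);
    /// let upper = timely::progress::Antichain::from_elem(4);
    /// let batch = write
    ///     .batch(updates.iter(), lower.clone(), upper.clone())
    ///     .await
    ///     .expect("invalid usage");
    /// // Serialize the batch handle (e.g. to send it to another process)...
    /// let proto = batch.into_transmittable_batch();
    /// // ...and rehydrate it on a writer for the same shard before appending.
    /// let batch = write.batch_from_transmittable_batch(proto);
    /// write
    ///     .append_batch(batch, lower, upper)
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```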
805    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
806        let shard_id: ShardId = batch
807            .shard_id
808            .into_rust()
809            .expect("valid transmittable batch");
810        assert_eq!(shard_id, self.machine.shard_id());
811
812        let ret = Batch {
813            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
814            metrics: Arc::clone(&self.metrics),
815            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
816            version: Version::parse(&batch.version).expect("valid transmittable batch"),
817            batch: batch
818                .batch
819                .into_rust_if_some("ProtoBatch::batch")
820                .expect("valid transmittable batch"),
821            blob: Arc::clone(&self.blob),
822            _phantom: std::marker::PhantomData,
823        };
824        assert_eq!(ret.shard_id(), self.machine.shard_id());
825        ret
826    }
827
828    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
829    /// blob storage which can then be appended to this shard using
830    /// [Self::compare_and_append_batch] or [Self::append_batch].
831    ///
832    /// It is correct to create an empty batch, which allows for downgrading
833    /// `upper` to communicate progress. (see [Self::compare_and_append_batch]
834    /// or [Self::append_batch])
835    ///
836    /// The builder uses a bounded amount of memory, even when the number of
837    /// updates is very large. Individual records, however, should be small
838    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
839    /// O(MB) come talk to us.
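    ///
    /// A minimal sketch (the concrete types are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let lower = timely::progress::Antichain::from_elem(3u64);
    /// let upper = timely::progress::Antichain::from_elem(4u64);
    /// let mut builder = write.builder(lower.clone());
    /// builder
    ///     .add(&"k".to_owned(), &"v".to_owned(), &3, &1)
    ///     .await
    ///     .expect("invalid usage");
    /// let mut batch = builder.finish(upper.clone()).await.expect("invalid usage");
    /// write
    ///     .compare_and_append_batch(&mut [&mut batch], lower, upper, true)
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```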
840    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
841        Self::builder_inner(
842            &self.cfg,
843            CompactConfig::new(&self.cfg, self.shard_id()),
844            Arc::clone(&self.metrics),
845            Arc::clone(&self.machine.applier.shard_metrics),
846            &self.metrics.user,
847            Arc::clone(&self.isolated_runtime),
848            Arc::clone(&self.blob),
849            self.shard_id(),
850            self.write_schemas.clone(),
851            lower,
852        )
853    }
854
855    /// Implementation of [Self::builder], so that we can share the
856    /// implementation in `PersistClient`.
857    pub(crate) fn builder_inner(
858        persist_cfg: &PersistConfig,
859        compact_cfg: CompactConfig,
860        metrics: Arc<Metrics>,
861        shard_metrics: Arc<ShardMetrics>,
862        user_batch_metrics: &BatchWriteMetrics,
863        isolated_runtime: Arc<IsolatedRuntime>,
864        blob: Arc<dyn Blob>,
865        shard_id: ShardId,
866        schemas: Schemas<K, V>,
867        lower: Antichain<T>,
868    ) -> BatchBuilder<K, V, T, D> {
869        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
870            BatchParts::new_compacting::<K, V, D>(
871                compact_cfg,
872                Description::new(
873                    lower.clone(),
874                    Antichain::new(),
875                    Antichain::from_elem(T::minimum()),
876                ),
877                max_runs,
878                Arc::clone(&metrics),
879                shard_metrics,
880                shard_id,
881                Arc::clone(&blob),
882                isolated_runtime,
883                user_batch_metrics,
884                schemas.clone(),
885            )
886        } else {
887            BatchParts::new_ordered::<D>(
888                compact_cfg.batch,
889                RunOrder::Unordered,
890                Arc::clone(&metrics),
891                shard_metrics,
892                shard_id,
893                Arc::clone(&blob),
894                isolated_runtime,
895                user_batch_metrics,
896            )
897        };
898        let builder = BatchBuilderInternal::new(
899            BatchBuilderConfig::new(persist_cfg, shard_id),
900            parts,
901            metrics,
902            schemas,
903            blob,
904            shard_id,
905            persist_cfg.build_version.clone(),
906        );
907        BatchBuilder::new(
908            builder,
909            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
910        )
911    }
912
913    /// Uploads the given `updates` as one `Batch` to the blob store and returns
914    /// a handle to the batch.
915    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
916    pub async fn batch<SB, KB, VB, TB, DB, I>(
917        &mut self,
918        updates: I,
919        lower: Antichain<T>,
920        upper: Antichain<T>,
921    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
922    where
923        SB: Borrow<((KB, VB), TB, DB)>,
924        KB: Borrow<K>,
925        VB: Borrow<V>,
926        TB: Borrow<T>,
927        DB: Borrow<D>,
928        I: IntoIterator<Item = SB>,
929    {
930        let iter = updates.into_iter();
931
932        let mut builder = self.builder(lower.clone());
933
934        for update in iter {
935            let ((k, v), t, d) = update.borrow();
936            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
937            match builder.add(k, v, t, d).await {
938                Ok(Added::Record | Added::RecordAndParts) => (),
939                Err(invalid_usage) => return Err(invalid_usage),
940            }
941        }
942
943        builder.finish(upper.clone()).await
944    }
945
946    /// Blocks until the given `frontier` is less than the upper of the shard.
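    ///
    /// A minimal sketch (the concrete handle type is illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let five = timely::progress::Antichain::from_elem(5u64);
    /// // Resolves once some writer has advanced the shard's upper past 5.
    /// write.wait_for_upper_past(&five).await;
    /// assert!(timely::PartialOrder::less_than(&five, write.upper()));
    /// # };
    /// ```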
947    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
948        let mut watch = self.machine.applier.watch();
949        let batch = self
950            .machine
951            .next_listen_batch(frontier, &mut watch, None, None)
952            .await;
953        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
954            self.upper.clone_from(batch.desc.upper());
955        }
956        assert!(PartialOrder::less_than(frontier, &self.upper));
957    }
958
959    /// Politely expires this writer, releasing any associated state.
960    ///
    /// There is a best-effort impl in Drop to expire a writer that wasn't
    /// explicitly expired with this method. When possible, explicit expiry is
    /// still preferred because the Drop one is best effort and is dependent on
    /// a tokio [Handle] being available in the TLC at the time of drop (which
    /// is a bit subtle). Also, explicit expiry allows for control over when it
    /// happens.
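    ///
    /// A minimal sketch (the concrete handle type is illustrative):
    ///
    /// ```rust,no_run
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Prefer explicit expiry over relying on the best-effort Drop impl.
    /// write.expire().await;
    /// # };
    /// ```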
967    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
968    pub async fn expire(mut self) {
969        let Some(expire_fn) = self.expire_fn.take() else {
970            return;
971        };
972        expire_fn.0().await;
973    }
974
975    fn expire_fn(
976        machine: Machine<K, V, T, D>,
977        gc: GarbageCollector<K, V, T, D>,
978        writer_id: WriterId,
979    ) -> ExpireFn {
980        ExpireFn(Box::new(move || {
981            Box::pin(async move {
982                let (_, maintenance) = machine.expire_writer(&writer_id).await;
983                maintenance.start_performing(&machine, &gc);
984            })
985        }))
986    }
987
988    /// Test helper for an [Self::append] call that is expected to succeed.
989    #[cfg(test)]
990    #[track_caller]
991    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
992    where
993        L: Into<Antichain<T>>,
994        U: Into<Antichain<T>>,
995        D: Send + Sync,
996    {
997        self.append(updates.iter(), lower.into(), new_upper.into())
998            .await
999            .expect("invalid usage")
1000            .expect("unexpected upper");
1001    }
1002
1003    /// Test helper for a [Self::compare_and_append] call that is expected to
1004    /// succeed.
1005    #[cfg(test)]
1006    #[track_caller]
1007    pub async fn expect_compare_and_append(
1008        &mut self,
1009        updates: &[((K, V), T, D)],
1010        expected_upper: T,
1011        new_upper: T,
1012    ) where
1013        D: Send + Sync,
1014    {
1015        self.compare_and_append(
1016            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
1017            Antichain::from_elem(expected_upper),
1018            Antichain::from_elem(new_upper),
1019        )
1020        .await
1021        .expect("invalid usage")
1022        .expect("unexpected upper")
1023    }
1024
1025    /// Test helper for a [Self::compare_and_append_batch] call that is expected
1026    /// to succeed.
1027    #[cfg(test)]
1028    #[track_caller]
1029    pub async fn expect_compare_and_append_batch(
1030        &mut self,
1031        batches: &mut [&mut Batch<K, V, T, D>],
1032        expected_upper: T,
1033        new_upper: T,
1034    ) {
1035        self.compare_and_append_batch(
1036            batches,
1037            Antichain::from_elem(expected_upper),
1038            Antichain::from_elem(new_upper),
1039            true,
1040        )
1041        .await
1042        .expect("invalid usage")
1043        .expect("unexpected upper")
1044    }
1045
1046    /// Test helper for an [Self::append] call that is expected to succeed.
1047    #[cfg(test)]
1048    #[track_caller]
1049    pub async fn expect_batch(
1050        &mut self,
1051        updates: &[((K, V), T, D)],
1052        lower: T,
1053        upper: T,
1054    ) -> Batch<K, V, T, D> {
1055        self.batch(
1056            updates.iter(),
1057            Antichain::from_elem(lower),
1058            Antichain::from_elem(upper),
1059        )
1060        .await
1061        .expect("invalid usage")
1062    }
1063}
1064
1065impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
1066    fn drop(&mut self) {
1067        let Some(expire_fn) = self.expire_fn.take() else {
1068            return;
1069        };
1070        let handle = match Handle::try_current() {
1071            Ok(x) => x,
1072            Err(_) => {
1073                warn!(
1074                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
1075                    self.writer_id
1076                );
1077                return;
1078            }
1079        };
1080        // Spawn a best-effort task to expire this write handle. It's fine if
        // this doesn't run to completion; we'd just have to wait out the lease
1082        // before the shard-global since is unblocked.
1083        //
1084        // Intentionally create the span outside the task to set the parent.
1085        let expire_span = debug_span!("drop::expire");
1086        handle.spawn_named(
1087            || format!("WriteHandle::expire ({})", self.writer_id),
1088            expire_fn.0().instrument(expire_span),
1089        );
1090    }
1091}
1092
1093/// Ensure the given batch uses the given schema ID.
1094///
1095/// If the batch has no schema set, initialize it to the given one.
1096/// If the batch has a schema set, assert that it matches the given one.
1097fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
1098where
1099    T: Timestamp + Lattice + Codec64,
1100{
1101    let ensure = |id: &mut Option<SchemaId>| match id {
1102        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
1103        None => *id = Some(schema_id),
1104    };
1105
1106    for run_meta in &mut batch.run_meta {
1107        ensure(&mut run_meta.schema);
1108    }
1109    for part in &mut batch.parts {
1110        match part {
1111            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
1112            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
1113            RunPart::Many(_hollow_run_ref) => {
1114                // TODO: Fetch the parts in this run and rewrite them too. Alternatively, make
1115                // `run_meta` the only place we keep schema IDs, so rewriting parts isn't
1116                // necessary.
1117            }
1118        }
1119    }
1120}
1121
1122#[cfg(test)]
1123mod tests {
1124    use std::str::FromStr;
1125    use std::sync::mpsc;
1126
1127    use differential_dataflow::consolidation::consolidate_updates;
1128    use futures_util::FutureExt;
1129    use mz_dyncfg::ConfigUpdates;
1130    use mz_ore::collections::CollectionExt;
1131    use mz_ore::task;
1132    use serde_json::json;
1133
1134    use crate::cache::PersistClientCache;
1135    use crate::tests::{all_ok, new_test_client};
1136    use crate::{PersistLocation, ShardId};
1137
1138    use super::*;
1139
1140    #[mz_persist_proc::test(tokio::test)]
1141    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1142    async fn empty_batches(dyncfgs: ConfigUpdates) {
1143        let data = [
1144            (("1".to_owned(), "one".to_owned()), 1, 1),
1145            (("2".to_owned(), "two".to_owned()), 2, 1),
1146            (("3".to_owned(), "three".to_owned()), 3, 1),
1147        ];
1148
1149        let (mut write, _) = new_test_client(&dyncfgs)
1150            .await
1151            .expect_open::<String, String, u64, i64>(ShardId::new())
1152            .await;
1153        let blob = Arc::clone(&write.blob);
1154
1155        // Write an initial batch.
1156        let mut upper = 3;
1157        write.expect_append(&data[..2], vec![0], vec![upper]).await;
1158
1159        // Write a bunch of empty batches. This shouldn't write blobs, so the count should stay the same.
1160        let mut count_before = 0;
1161        blob.list_keys_and_metadata("", &mut |_| {
1162            count_before += 1;
1163        })
1164        .await
1165        .expect("list_keys failed");
1166        for _ in 0..5 {
1167            let new_upper = upper + 1;
1168            write.expect_compare_and_append(&[], upper, new_upper).await;
1169            upper = new_upper;
1170        }
1171        let mut count_after = 0;
1172        blob.list_keys_and_metadata("", &mut |_| {
1173            count_after += 1;
1174        })
1175        .await
1176        .expect("list_keys failed");
1177        assert_eq!(count_after, count_before);
1178    }
1179
1180    #[mz_persist_proc::test(tokio::test)]
1181    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1182    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
1183        let data0 = vec![
1184            (("1".to_owned(), "one".to_owned()), 1, 1),
1185            (("2".to_owned(), "two".to_owned()), 2, 1),
1186            (("4".to_owned(), "four".to_owned()), 4, 1),
1187        ];
1188        let data1 = vec![
1189            (("1".to_owned(), "one".to_owned()), 1, 1),
1190            (("2".to_owned(), "two".to_owned()), 2, 1),
1191            (("3".to_owned(), "three".to_owned()), 3, 1),
1192        ];
1193
1194        let (mut write, mut read) = new_test_client(&dyncfgs)
1195            .await
1196            .expect_open::<String, String, u64, i64>(ShardId::new())
1197            .await;
1198
1199        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
1200        let mut batch1 = write.expect_batch(&data1, 0, 4).await;
1201
1202        write
1203            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
1204            .await;
1205
1206        let batch = write
1207            .machine
1208            .snapshot(&Antichain::from_elem(3))
1209            .await
1210            .expect("just wrote this")
1211            .into_element();
1212
1213        assert!(batch.runs().count() >= 2);
1214
1215        let expected = vec![
1216            (("1".to_owned(), "one".to_owned()), 1, 2),
1217            (("2".to_owned(), "two".to_owned()), 2, 2),
1218            (("3".to_owned(), "three".to_owned()), 3, 1),
1219        ];
1220        let mut actual = read.expect_snapshot_and_fetch(3).await;
1221        consolidate_updates(&mut actual);
1222        assert_eq!(actual, all_ok(&expected, 3));
1223    }
1224
1225    #[mz_ore::test]
1226    fn writer_id_human_readable_serde() {
1227        #[derive(Debug, Serialize, Deserialize)]
1228        struct Container {
1229            writer_id: WriterId,
1230        }
1231
1232        // roundtrip through json
1233        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
1234        assert_eq!(
1235            id,
1236            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1237                .expect("deserializable")
1238        );
1239
1240        // deserialize a serialized string directly
1241        assert_eq!(
1242            id,
1243            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
1244                .expect("deserializable")
1245        );
1246
1247        // roundtrip id through a container type
1248        let json = json!({ "writer_id": id });
1249        assert_eq!(
1250            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
1251            &json.to_string()
1252        );
1253        let container: Container = serde_json::from_value(json).expect("deserializable");
1254        assert_eq!(container.writer_id, id);
1255    }
1256
1257    #[mz_persist_proc::test(tokio::test)]
1258    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1259    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
1260        let data = vec![
1261            (("1".to_owned(), "one".to_owned()), 1, 1),
1262            (("2".to_owned(), "two".to_owned()), 2, 1),
1263            (("3".to_owned(), "three".to_owned()), 3, 1),
1264        ];
1265
1266        let (mut write, mut read) = new_test_client(&dyncfgs)
1267            .await
1268            .expect_open::<String, String, u64, i64>(ShardId::new())
1269            .await;
1270
1271        // This test is a bit more complex than it should be. It would be easier
1272        // if we could just compare the rehydrated batch to the original batch.
1273        // But a) turning a batch into a hollow batch consumes it, and b) Batch
1274        // doesn't have Eq/PartialEq.
1275        let batch = write.expect_batch(&data, 0, 4).await;
1276        let hollow_batch = batch.into_transmittable_batch();
1277        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);
1278
1279        write
1280            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
1281            .await;
1282
1283        let expected = vec![
1284            (("1".to_owned(), "one".to_owned()), 1, 1),
1285            (("2".to_owned(), "two".to_owned()), 2, 1),
1286            (("3".to_owned(), "three".to_owned()), 3, 1),
1287        ];
1288        let mut actual = read.expect_snapshot_and_fetch(3).await;
1289        consolidate_updates(&mut actual);
1290        assert_eq!(actual, all_ok(&expected, 3));
1291    }
1292
1293    #[mz_persist_proc::test(tokio::test)]
1294    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1295    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
1296        let client = new_test_client(&dyncfgs).await;
1297        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
1298        let five = Antichain::from_elem(5);
1299
1300        // Upper is not past 5.
1301        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1302
1303        // Upper is still not past 5.
1304        write
1305            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
1306            .await;
1307        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1308
1309        // Upper is past 5.
1310        write
1311            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
1312            .await;
1313        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
1314        assert_eq!(write.upper(), &Antichain::from_elem(7));
1315
1316        // Waiting for previous uppers does not regress the handle's cached
1317        // upper.
1318        assert_eq!(
1319            write
1320                .wait_for_upper_past(&Antichain::from_elem(2))
1321                .now_or_never(),
1322            Some(())
1323        );
1324        assert_eq!(write.upper(), &Antichain::from_elem(7));
1325    }
1326
1327    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1328    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1329    async fn fetch_recent_upper_linearized() {
1330        type Timestamp = u64;
1331        let max_upper = 1000;
1332
1333        let shard_id = ShardId::new();
1334        let mut clients = PersistClientCache::new_no_metrics();
1335        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1336        let (mut upper_writer, _) = upper_writer_client
1337            .expect_open::<(), (), Timestamp, i64>(shard_id)
1338            .await;
1339        // Clear the state cache between each client to maximally disconnect
1340        // them from each other.
1341        clients.clear_state_cache();
1342        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1343        let (mut upper_reader, _) = upper_reader_client
1344            .expect_open::<(), (), Timestamp, i64>(shard_id)
1345            .await;
1346        let (tx, rx) = mpsc::channel();
1347
1348        let task = task::spawn(|| "upper-reader", async move {
1349            let mut upper = Timestamp::MIN;
1350
1351            while upper < max_upper {
1352                while let Ok(new_upper) = rx.try_recv() {
1353                    upper = new_upper;
1354                }
1355
1356                let recent_upper = upper_reader
1357                    .fetch_recent_upper()
1358                    .await
1359                    .as_option()
1360                    .cloned()
1361                    .expect("u64 is totally ordered and the shard is not finalized");
1362                assert!(
1363                    recent_upper >= upper,
1364                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
1365                );
1366            }
1367        });
1368
1369        for upper in Timestamp::MIN..max_upper {
1370            let next_upper = upper + 1;
1371            upper_writer
1372                .expect_compare_and_append(&[], upper, next_upper)
1373                .await;
1374            tx.send(next_upper).expect("send failed");
1375        }
1376
1377        task.await.expect("await failed");
1378    }
1379}