mz_persist_client/write.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Write capabilities and handles
11
12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::Description;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::Config;
22use mz_ore::task::RuntimeExt;
23use mz_ore::{instrument, soft_panic_or_log};
24use mz_persist::location::Blob;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::{Codec, Codec64};
27use mz_proto::{IntoRustIfSome, ProtoType};
28use proptest_derive::Arbitrary;
29use semver::Version;
30use serde::{Deserialize, Serialize};
31use timely::PartialOrder;
32use timely::order::TotalOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::runtime::Handle;
35use tracing::{Instrument, debug_span, error, info, warn};
36use uuid::Uuid;
37
38use crate::batch::{
39    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
40    BatchParts, ProtoBatch, validate_truncate_batch,
41};
42use crate::error::{InvalidUsage, UpperMismatch};
43use crate::fetch::{
44    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
45};
46use crate::internal::compact::{CompactConfig, Compactor};
47use crate::internal::encoding::{Schemas, assert_code_can_read_data};
48use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
49use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
51use crate::read::ReadHandle;
52use crate::schema::PartMigration;
53use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};
54
55pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
56    "persist_write_combine_inline_writes",
57    true,
58    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
59);
60
61pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
62    "persist_validate_part_bounds_on_write",
63    false,
64    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
65    for the batch being appended.",
66);
67
68/// An opaque identifier for a writer of a persist durable TVC (aka shard).
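///
/// The string form round-trips through [std::str::FromStr] and
/// [std::fmt::Display]; a small sketch (the UUID below is arbitrary):
///
/// ```rust
/// use mz_persist_client::write::WriterId;
///
/// let id: WriterId = "w00000000-1234-5678-0000-000000000000".parse().expect("valid writer id");
/// assert_eq!(id.to_string(), "w00000000-1234-5678-0000-000000000000");
/// ```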
69#[derive(
70    Arbitrary,
71    Clone,
72    PartialEq,
73    Eq,
74    PartialOrd,
75    Ord,
76    Hash,
77    Serialize,
78    Deserialize
79)]
80#[serde(try_from = "String", into = "String")]
81pub struct WriterId(pub(crate) [u8; 16]);
82
83impl std::fmt::Display for WriterId {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        write!(f, "w{}", Uuid::from_bytes(self.0))
86    }
87}
88
89impl std::fmt::Debug for WriterId {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
92    }
93}
94
95impl std::str::FromStr for WriterId {
96    type Err = String;
97
98    fn from_str(s: &str) -> Result<Self, Self::Err> {
99        parse_id("w", "WriterId", s).map(WriterId)
100    }
101}
102
103impl From<WriterId> for String {
104    fn from(writer_id: WriterId) -> Self {
105        writer_id.to_string()
106    }
107}
108
109impl TryFrom<String> for WriterId {
110    type Error = String;
111
112    fn try_from(s: String) -> Result<Self, Self::Error> {
113        s.parse()
114    }
115}
116
117impl WriterId {
118    pub(crate) fn new() -> Self {
119        WriterId(*Uuid::new_v4().as_bytes())
120    }
121}
122
123/// A "capability" granting the ability to apply updates to some shard at times
124/// greater or equal to `self.upper()`.
125///
126/// All async methods on WriteHandle retry for as long as they are able, but the
127/// returned [std::future::Future]s implement "cancel on drop" semantics. This
128/// means that callers can add a timeout using [tokio::time::timeout] or
129/// [tokio::time::timeout_at].
130///
131/// ```rust,no_run
132/// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
133/// # let timeout: std::time::Duration = unimplemented!();
134/// # async {
135/// tokio::time::timeout(timeout, write.fetch_recent_upper()).await
136/// # };
137/// ```
138#[derive(Debug)]
139pub struct WriteHandle<K: Codec, V: Codec, T, D> {
140    pub(crate) cfg: PersistConfig,
141    pub(crate) metrics: Arc<Metrics>,
142    pub(crate) machine: Machine<K, V, T, D>,
143    pub(crate) gc: GarbageCollector<K, V, T, D>,
144    pub(crate) compact: Option<Compactor<K, V, T, D>>,
145    pub(crate) blob: Arc<dyn Blob>,
146    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
147    pub(crate) writer_id: WriterId,
148    pub(crate) debug_state: HandleDebugState,
149    pub(crate) write_schemas: Schemas<K, V>,
150
151    pub(crate) upper: Antichain<T>,
152    expire_fn: Option<ExpireFn>,
153}
154
155impl<K, V, T, D> WriteHandle<K, V, T, D>
156where
157    K: Debug + Codec,
158    V: Debug + Codec,
159    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
160    D: Monoid + Ord + Codec64 + Send + Sync,
161{
162    pub(crate) fn new(
163        cfg: PersistConfig,
164        metrics: Arc<Metrics>,
165        machine: Machine<K, V, T, D>,
166        gc: GarbageCollector<K, V, T, D>,
167        blob: Arc<dyn Blob>,
168        writer_id: WriterId,
169        purpose: &str,
170        write_schemas: Schemas<K, V>,
171    ) -> Self {
172        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
173        let compact = cfg
174            .compaction_enabled
175            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
176        let debug_state = HandleDebugState {
177            hostname: cfg.hostname.to_owned(),
178            purpose: purpose.to_owned(),
179        };
180        let upper = machine.applier.clone_upper();
181        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
182        WriteHandle {
183            cfg,
184            metrics,
185            machine,
186            gc,
187            compact,
188            blob,
189            isolated_runtime,
190            writer_id,
191            debug_state,
192            write_schemas,
193            upper,
194            expire_fn: Some(expire_fn),
195        }
196    }
197
198    /// Creates a [WriteHandle] for the same shard from an existing
199    /// [ReadHandle].
200    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
201        Self::new(
202            read.cfg.clone(),
203            Arc::clone(&read.metrics),
204            read.machine.clone(),
205            read.gc.clone(),
206            Arc::clone(&read.blob),
207            WriterId::new(),
208            purpose,
209            read.read_schemas.clone(),
210        )
211    }
212
213    /// True iff this WriteHandle validates part bounds against the bounds of the
214    /// batch being appended.
215    pub fn validate_part_bounds_on_write(&self) -> bool {
216        // Note that we require validation when the read checks are enabled, even if the write-time
217        // checks would otherwise be disabled, to avoid batches that would fail at read time.
218        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) || VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
219    }
220
221    /// This handle's shard id.
222    pub fn shard_id(&self) -> ShardId {
223        self.machine.shard_id()
224    }
225
226    /// Returns the ID of this writer's schema, if one is registered.
227    pub fn schema_id(&self) -> Option<SchemaId> {
228        self.write_schemas.id
229    }
230
231    /// Registers the write schema, if it isn't already registered.
232    ///
233    /// This method expects that either the shard doesn't yet have any schema registered, or one of
234    /// the registered schemas is the same as the write schema. If all registered schemas are
235    /// different from the write schema, or the shard is a tombstone, it returns `None`.
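    ///
    /// A minimal sketch of the intended call pattern (the concrete `K`/`V`/`T`/`D`
    /// types are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Ensure the write schema is registered before appending batches.
    /// let _schema_id = write.try_register_schema().await;
    /// # };
    /// ```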
236    pub async fn try_register_schema(&mut self) -> Option<SchemaId> {
237        let Schemas { id, key, val } = &self.write_schemas;
238
239        if let Some(id) = id {
240            return Some(*id);
241        }
242
243        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
244        maintenance.start_performing(&self.machine, &self.gc);
245
246        self.write_schemas.id = schema_id;
247        schema_id
248    }
249
250    /// A cached version of the shard-global `upper` frontier.
251    ///
252    /// This is the most recent upper discovered by this handle. It is
253    /// potentially more stale than [Self::shared_upper] but is lock-free and
254    /// allocation-free. This will always be less or equal to the shard-global
255    /// `upper`.
256    pub fn upper(&self) -> &Antichain<T> {
257        &self.upper
258    }
259
260    /// A less-stale cached version of the shard-global `upper` frontier.
261    ///
262    /// This is the most recently known upper for this shard process-wide, but
263    /// unlike [Self::upper] it requires a mutex and a clone. This will always be
264    /// less or equal to the shard-global `upper`.
265    pub fn shared_upper(&self) -> Antichain<T> {
266        self.machine.applier.clone_upper()
267    }
268
269    /// Fetches and returns a recent shard-global `upper`. Importantly, this operation is
270    /// linearized with write operations.
271    ///
272    /// This requires fetching the latest state from consensus and is therefore a potentially
273    /// expensive operation.
274    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
275    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
276        // TODO: Do we even need to track self.upper on WriteHandle or could
277        // WriteHandle::upper just get the one out of machine?
278        self.machine
279            .applier
280            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
281            .await;
282        &self.upper
283    }
284
285    /// Advance the shard's upper by the given frontier.
286    ///
287    /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
288    ///
289    /// In contrast to the various compare-and-append methods, this method does not require the
290    /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
291    /// schema when creating a writer just to advance a shard upper.
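    ///
    /// A minimal sketch (the `u64` timestamps and the target frontier are arbitrary):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Appends empty batches as needed until the shard upper is at least 10.
    /// write.advance_upper(&timely::progress::Antichain::from_elem(10u64)).await;
    /// # };
    /// ```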
292    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
293        // We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
294        // already beyond the target.
295        let mut lower = self.shared_upper().clone();
296
297        while !PartialOrder::less_equal(target, &lower) {
298            let since = Antichain::from_elem(T::minimum());
299            let desc = Description::new(lower.clone(), target.clone(), since);
300            let batch = HollowBatch::empty(desc);
301
302            let heartbeat_timestamp = (self.cfg.now)();
303            let res = self
304                .machine
305                .compare_and_append(
306                    &batch,
307                    &self.writer_id,
308                    &self.debug_state,
309                    heartbeat_timestamp,
310                )
311                .await;
312
313            use CompareAndAppendRes::*;
314            let new_upper = match res {
315                Success(_seq_no, maintenance) => {
316                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
317                    batch.desc.upper().clone()
318                }
319                UpperMismatch(_seq_no, actual_upper) => actual_upper,
320                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
321                InlineBackpressure => unreachable!("batch was empty"),
322            };
323
324            self.upper.clone_from(&new_upper);
325            lower = new_upper;
326        }
327    }
328
329    /// Applies `updates` to this shard and downgrades this handle's upper to
330    /// `upper`.
331    ///
332    /// The innermost `Result` is `Ok` if the updates were successfully written.
333    /// If not, an `Upper` err containing the current writer upper is returned.
334    /// If that happens, we also update our local `upper` to match the current
335    /// upper. This is useful in cases where a timeout happens in between a
336    /// successful write and returning that to the client.
337    ///
338    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s may
339    /// be used concurrently to write to the same shard, but in this case, the
340    /// data being written must be identical (in the sense of "definite"-ness).
341    /// It's intended for replicated use by source ingestion, sinks, etc.
342    ///
343    /// All times in `updates` must be greater or equal to `lower` and not
344    /// greater or equal to `upper`. An `upper` of the empty antichain "finishes"
345    /// this shard, promising that no more data is ever incoming.
346    ///
347    /// `updates` may be empty, which allows for downgrading `upper` to
348    /// communicate progress. It is possible to call this with `upper` equal to
349    /// `self.upper()` and an empty `updates` (making the call a no-op).
350    ///
351    /// This uses a bounded amount of memory, even when `updates` is very large.
352    /// Individual records, however, should be small enough that we can
353    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
354    /// us.
355    ///
356    /// The clunky multi-level Result is to enable more obvious error handling
357    /// in the caller. See <http://sled.rs/errors.html> for details.
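    ///
    /// A minimal sketch (the `String`/`u64`/`i64` types and the frontiers are
    /// arbitrary, for illustration only):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// write
    ///     .append(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(3u64),
    ///         timely::progress::Antichain::from_elem(4u64),
    ///     )
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```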
358    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
359    pub async fn append<SB, KB, VB, TB, DB, I>(
360        &mut self,
361        updates: I,
362        lower: Antichain<T>,
363        upper: Antichain<T>,
364    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
365    where
366        SB: Borrow<((KB, VB), TB, DB)>,
367        KB: Borrow<K>,
368        VB: Borrow<V>,
369        TB: Borrow<T>,
370        DB: Borrow<D>,
371        I: IntoIterator<Item = SB>,
372        D: Send + Sync,
373    {
374        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
375        self.append_batch(batch, lower, upper).await
376    }
377
378    /// Applies `updates` to this shard and downgrades this handle's upper to
379    /// `new_upper` iff the current global upper of this shard is
380    /// `expected_upper`.
381    ///
382    /// The innermost `Result` is `Ok` if the updates were successfully written.
383    /// If not, an `Upper` err containing the current global upper is returned.
384    ///
385    /// In contrast to [Self::append], this linearizes mutations from all
386    /// writers. It's intended for use as an atomic primitive for timestamp
387    /// bindings, SQL tables, etc.
388    ///
389    /// All times in `updates` must be greater or equal to `expected_upper` and
390    /// not greater or equal to `new_upper`. A `new_upper` of the empty
391    /// antichain "finishes" this shard, promising that no more data is ever
392    /// incoming.
393    ///
394    /// `updates` may be empty, which allows for downgrading `upper` to
395    /// communicate progress. It is possible to heartbeat a writer lease by
396    /// calling this with `new_upper` equal to `self.upper()` and an empty
397    /// `updates` (making the call a no-op).
398    ///
399    /// This uses a bounded amount of memory, even when `updates` is very large.
400    /// Individual records, however, should be small enough that we can
401    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
402    /// us.
403    ///
404    /// The clunky multi-level Result is to enable more obvious error handling
405    /// in the caller. See <http://sled.rs/errors.html> for details.
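    ///
    /// A minimal sketch (concrete types and frontiers are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 5u64, 1i64)];
    /// let res = write
    ///     .compare_and_append(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(5u64),
    ///         timely::progress::Antichain::from_elem(6u64),
    ///     )
    ///     .await
    ///     .expect("invalid usage");
    /// // On `Err(mismatch)`, another writer won the race; retry from `mismatch.current`.
    /// # let _ = res;
    /// # };
    /// ```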
406    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
407    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
408        &mut self,
409        updates: I,
410        expected_upper: Antichain<T>,
411        new_upper: Antichain<T>,
412    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
413    where
414        SB: Borrow<((KB, VB), TB, DB)>,
415        KB: Borrow<K>,
416        VB: Borrow<V>,
417        TB: Borrow<T>,
418        DB: Borrow<D>,
419        I: IntoIterator<Item = SB>,
420        D: Send + Sync,
421    {
422        let mut batch = self
423            .batch(updates, expected_upper.clone(), new_upper.clone())
424            .await?;
425        match self
426            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
427            .await
428        {
429            ok @ Ok(Ok(())) => ok,
430            err => {
431                // We cannot delete the batch in compare_and_append_batch()
432                // because the caller owns the batch and might want to retry
433                // with a different `expected_upper`. In this function, we
434                // control the batch, so we have to delete it.
435                batch.delete().await;
436                err
437            }
438        }
439    }
440
441    /// Appends the batch of updates to the shard and downgrades this handle's
442    /// upper to `upper`.
443    ///
444    /// The innermost `Result` is `Ok` if the updates were successfully written.
445    /// If not, an `Upper` err containing the current writer upper is returned.
446    /// If that happens, we also update our local `upper` to match the current
447    /// upper. This is useful in cases where a timeout happens in between a
448    /// successful write and returning that to the client.
449    ///
450    /// In contrast to [Self::compare_and_append_batch], multiple [WriteHandle]s
451    /// may be used concurrently to write to the same shard, but in this case,
452    /// the data being written must be identical (in the sense of
453    /// "definite"-ness). It's intended for replicated use by source ingestion,
454    /// sinks, etc.
455    ///
456    /// An `upper` of the empty antichain "finishes" this shard, promising that
457    /// no more data is ever incoming.
458    ///
459    /// The batch may be empty, which allows for downgrading `upper` to
460    /// communicate progress. It is possible to heartbeat a writer lease by
461    /// calling this with `upper` equal to `self.upper()` and an empty `updates`
462    /// (making the call a no-op).
463    ///
464    /// The clunky multi-level Result is to enable more obvious error handling
465    /// in the caller. See <http://sled.rs/errors.html> for details.
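    ///
    /// A minimal sketch, building the batch with [Self::batch] first (types and
    /// frontiers are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// let lower = timely::progress::Antichain::from_elem(0u64);
    /// let upper = timely::progress::Antichain::from_elem(1u64);
    /// let batch = write
    ///     .batch(updates.iter(), lower.clone(), upper.clone())
    ///     .await
    ///     .expect("invalid usage");
    /// write
    ///     .append_batch(batch, lower, upper)
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```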
466    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
467    pub async fn append_batch(
468        &mut self,
469        mut batch: Batch<K, V, T, D>,
470        mut lower: Antichain<T>,
471        upper: Antichain<T>,
472    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
473    where
474        D: Send + Sync,
475    {
476        loop {
477            let res = self
478                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
479                .await?;
480            match res {
481                Ok(()) => {
482                    self.upper = upper;
483                    return Ok(Ok(()));
484                }
485                Err(mismatch) => {
486                    // We tried to do a non-contiguous append; that won't work.
487                    if PartialOrder::less_than(&mismatch.current, &lower) {
488                        self.upper.clone_from(&mismatch.current);
489
490                        batch.delete().await;
491
492                        return Ok(Err(mismatch));
493                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
494                        // Cut down the Description by advancing its lower to the current shard
495                        // upper and try again. IMPORTANT: We can only advance the lower, meaning
496                        // we cut updates away, we must not "extend" the batch by changing to a
497                        // lower that is not beyond the current lower. This invariant is checked by
498                        // the first if branch: if `!(current_upper < lower)` then it holds that
499                        // `lower <= current_upper`.
500                        lower = mismatch.current;
501                    } else {
502                        // We already have updates past this batch's upper, the append is a no-op.
503                        self.upper = mismatch.current;
504
505                        // Because we return a success result, the caller will
506                        // think that the batch was consumed or otherwise used,
507                        // so we have to delete it here.
508                        batch.delete().await;
509
510                        return Ok(Ok(()));
511                    }
512                }
513            }
514        }
515    }
516
517    /// Appends the batch of updates to the shard and downgrades this handle's
518    /// upper to `new_upper` iff the current global upper of this shard is
519    /// `expected_upper`.
520    ///
521    /// The innermost `Result` is `Ok` if the batch was successfully written. If
522    /// not, an `Upper` err containing the current global upper is returned.
523    ///
524    /// In contrast to [Self::append_batch], this linearizes mutations from all
525    /// writers. It's intended for use as an atomic primitive for timestamp
526    /// bindings, SQL tables, etc.
527    ///
528    /// A `new_upper` of the empty antichain "finishes" this shard, promising
529    /// that no more data is ever incoming.
530    ///
531    /// The batch may be empty, which allows for downgrading `upper` to
532    /// communicate progress. It is possible to heartbeat a writer lease by
533    /// calling this with `new_upper` equal to `self.upper()` and an empty
534    /// `updates` (making the call a no-op).
535    ///
536    /// IMPORTANT: In case of an erroneous result the caller is responsible for
537    /// the lifecycle of the `batch`. It can be deleted or it can be used to
538    /// retry with adjusted frontiers.
539    ///
540    /// The clunky multi-level Result is to enable more obvious error handling
541    /// in the caller. See <http://sled.rs/errors.html> for details.
542    ///
543    /// If the `validate_part_bounds_on_write` flag is set to `false`:
544    /// We no longer validate that every batch covers the entire range between
545    /// the expected and new uppers, as we wish to allow combining batches that
546    /// cover different subsets of that range, including subsets of that range
547    /// that include no data at all. The caller is responsible for guaranteeing
548    /// that the set of batches provided collectively include all updates for
549    /// the entire range between the expected and new upper.
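    ///
    /// A minimal sketch with a single batch and part-bounds validation enabled
    /// (types and frontiers are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let expected_upper = timely::progress::Antichain::from_elem(0u64);
    /// let new_upper = timely::progress::Antichain::from_elem(1u64);
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// let mut batch = write
    ///     .batch(updates.iter(), expected_upper.clone(), new_upper.clone())
    ///     .await
    ///     .expect("invalid usage");
    /// write
    ///     .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```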
550    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
551    pub async fn compare_and_append_batch(
552        &mut self,
553        batches: &mut [&mut Batch<K, V, T, D>],
554        expected_upper: Antichain<T>,
555        new_upper: Antichain<T>,
556        validate_part_bounds_on_write: bool,
557    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
558    where
559        D: Send + Sync,
560    {
561        // Before we append any data, we require a registered write schema.
562        // We expect the caller to ensure our schema is already present... unless this shard is a
563        // tombstone, in which case this write is either a noop or will fail gracefully.
564        let schema_id = self.try_register_schema().await;
565
566        for batch in batches.iter() {
567            if self.machine.shard_id() != batch.shard_id() {
568                return Err(InvalidUsage::BatchNotFromThisShard {
569                    batch_shard: batch.shard_id(),
570                    handle_shard: self.machine.shard_id(),
571                });
572            }
573            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
574            if self.cfg.build_version > batch.version {
575                info!(
576                    shard_id =? self.machine.shard_id(),
577                    batch_version =? batch.version,
578                    writer_version =? self.cfg.build_version,
579                    "Appending batch from the past. This is fine but should be rare. \
580                    TODO: Error on very old versions once the leaked blob detector exists."
581                )
582            }
583            fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
584                if batch_schema.is_empty() {
585                    // Schema is either trivial or missing!
586                    return;
587                }
588                let batch_schema: A::Schema = A::decode_schema(batch_schema);
589                if *writer_schema != batch_schema {
590                    error!(
591                        ?writer_schema,
592                        ?batch_schema,
593                        "writer and batch schemas should be identical"
594                    );
595                    soft_panic_or_log!("writer and batch schemas should be identical");
596                }
597            }
598            assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
599            assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
600        }
601
602        let lower = expected_upper.clone();
603        let upper = new_upper;
604        let since = Antichain::from_elem(T::minimum());
605        let desc = Description::new(lower, upper, since);
606
607        let mut received_inline_backpressure = false;
608        // Every hollow part must belong to some batch, so we can clean it up when the batch is dropped...
609        // but if we need to merge all our inline parts to a single run in S3, it's not correct to
610        // associate that with any of our individual input batches.
611        // At first, we'll try to put all the inline parts we receive into state... but if we
612        // get backpressured, we retry with this builder set to `Some`, put all our inline data into
613        // it, and ensure it's flushed out to S3 before including it in the batch.
614        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
615        let maintenance = loop {
616            let any_batch_rewrite = batches
617                .iter()
618                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
619            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
620                (vec![], 0, vec![], vec![]);
621            let mut key_storage = None;
622            let mut val_storage = None;
623            for batch in batches.iter() {
624                let () = validate_truncate_batch(
625                    &batch.batch,
626                    &desc,
627                    any_batch_rewrite,
628                    validate_part_bounds_on_write,
629                )?;
630                for (run_meta, run) in batch.batch.runs() {
631                    let start_index = parts.len();
632                    for part in run {
633                        if let (
634                            RunPart::Single(
635                                batch_part @ BatchPart::Inline {
636                                    updates,
637                                    ts_rewrite,
638                                    schema_id: _,
639                                    deprecated_schema_id: _,
640                                },
641                            ),
642                            Some((schema_cache, builder)),
643                        ) = (part, &mut inline_batch_builder)
644                        {
645                            let schema_migration = PartMigration::new(
646                                batch_part,
647                                self.write_schemas.clone(),
648                                schema_cache,
649                            )
650                            .await
651                            .expect("schemas for inline user part");
652
653                            let encoded_part = EncodedPart::from_inline(
654                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
655                                &*self.metrics,
656                                self.metrics.read.compaction.clone(),
657                                desc.clone(),
658                                updates,
659                                ts_rewrite.as_ref(),
660                            );
661                            let mut fetched_part = FetchedPart::new(
662                                Arc::clone(&self.metrics),
663                                encoded_part,
664                                schema_migration,
665                                FetchBatchFilter::Compaction {
666                                    since: desc.since().clone(),
667                                },
668                                false,
669                                PartDecodeFormat::Arrow,
670                                None,
671                            );
672
673                            while let Some(((k, v), t, d)) =
674                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
675                            {
676                                builder
677                                    .add(&k, &v, &t, &d)
678                                    .await
679                                    .expect("re-encoding just-decoded data");
680                            }
681                        } else {
682                            parts.push(part.clone())
683                        }
684                    }
685
686                    let end_index = parts.len();
687
688                    if start_index == end_index {
689                        continue;
690                    }
691
692                    // Mark the boundary if this is not the first run in the batch.
693                    if start_index != 0 {
694                        run_splits.push(start_index);
695                    }
696                    run_metas.push(run_meta.clone());
697                }
698                num_updates += batch.batch.len;
699            }
700
701            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
702                let mut finished = builder
703                    .finish(desc.upper().clone())
704                    .await
705                    .expect("invalid usage");
706                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
707                finished
708                    .flush_to_blob(
709                        &cfg,
710                        &self.metrics.inline.backpressure,
711                        &self.isolated_runtime,
712                        &self.write_schemas,
713                    )
714                    .await;
715                Some(finished)
716            } else {
717                None
718            };
719
720            if let Some(batch) = &flushed_inline_batch {
721                for (run_meta, run) in batch.batch.runs() {
722                    assert!(run.len() > 0);
723                    let start_index = parts.len();
724                    if start_index != 0 {
725                        run_splits.push(start_index);
726                    }
727                    run_metas.push(run_meta.clone());
728                    parts.extend(run.iter().cloned())
729                }
730            }
731
732            let mut combined_batch =
733                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
734
735            // The batch may have been written by a writer without a registered schema.
736            // Ensure we have a schema ID in the batch metadata before we append, to avoid type
737            // confusion later.
738            match schema_id {
739                Some(schema_id) => {
740                    ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
741                }
742                None => {
743                    assert!(
744                        self.fetch_recent_upper().await.is_empty(),
745                        "fetching a schema id should only fail when the shard is tombstoned"
746                    )
747                }
748            }
749
750            let heartbeat_timestamp = (self.cfg.now)();
751            let res = self
752                .machine
753                .compare_and_append(
754                    &combined_batch,
755                    &self.writer_id,
756                    &self.debug_state,
757                    heartbeat_timestamp,
758                )
759                .await;
760
761            match res {
762                CompareAndAppendRes::Success(_seqno, maintenance) => {
763                    self.upper.clone_from(desc.upper());
764                    for batch in batches.iter_mut() {
765                        batch.mark_consumed();
766                    }
767                    if let Some(batch) = &mut flushed_inline_batch {
768                        batch.mark_consumed();
769                    }
770                    break maintenance;
771                }
772                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
773                    if let Some(batch) = flushed_inline_batch.take() {
774                        batch.delete().await;
775                    }
776                    return Err(invalid_usage);
777                }
778                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
779                    if let Some(batch) = flushed_inline_batch.take() {
780                        batch.delete().await;
781                    }
782                    // We tried to do a compare_and_append with the wrong expected upper, that
783                    // won't work. Update the cached upper to the current upper.
784                    self.upper.clone_from(&current_upper);
785                    return Ok(Err(UpperMismatch {
786                        current: current_upper,
787                        expected: expected_upper,
788                    }));
789                }
790                CompareAndAppendRes::InlineBackpressure => {
791                    // We tried to write an inline part, but there was already
792                    // too much in state. Flush it out to S3 and try again.
793                    assert_eq!(received_inline_backpressure, false);
794                    received_inline_backpressure = true;
795                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
796                        inline_batch_builder = Some((
797                            self.machine.applier.schema_cache(),
798                            self.builder(desc.lower().clone()),
799                        ));
800                        continue;
801                    }
802
803                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
804                    // We could have a large number of inline parts (imagine the
805                    // sharded persist_sink), do this flushing concurrently.
806                    let flush_batches = batches
807                        .iter_mut()
808                        .map(|batch| async {
809                            batch
810                                .flush_to_blob(
811                                    &cfg,
812                                    &self.metrics.inline.backpressure,
813                                    &self.isolated_runtime,
814                                    &self.write_schemas,
815                                )
816                                .await
817                        })
818                        .collect::<FuturesUnordered<_>>();
819                    let () = flush_batches.collect::<()>().await;
820
821                    for batch in batches.iter() {
822                        assert_eq!(batch.batch.inline_bytes(), 0);
823                    }
824
825                    continue;
826                }
827            }
828        };
829
830        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
831
832        Ok(Ok(()))
833    }
834
835    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
836    /// to append it to this shard.
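    ///
    /// A minimal round-trip sketch via [Batch::into_transmittable_batch] (types
    /// and frontiers are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// let batch = write
    ///     .batch(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(0u64),
    ///         timely::progress::Antichain::from_elem(1u64),
    ///     )
    ///     .await
    ///     .expect("invalid usage");
    /// // Serialize for transmission (e.g. to another worker), then rehydrate.
    /// let proto = batch.into_transmittable_batch();
    /// let _rehydrated = write.batch_from_transmittable_batch(proto);
    /// # };
    /// ```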
837    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
838        let shard_id: ShardId = batch
839            .shard_id
840            .into_rust()
841            .expect("valid transmittable batch");
842        assert_eq!(shard_id, self.machine.shard_id());
843
844        let ret = Batch {
845            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
846            metrics: Arc::clone(&self.metrics),
847            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
848            version: Version::parse(&batch.version).expect("valid transmittable batch"),
849            schemas: (batch.key_schema, batch.val_schema),
850            batch: batch
851                .batch
852                .into_rust_if_some("ProtoBatch::batch")
853                .expect("valid transmittable batch"),
854            blob: Arc::clone(&self.blob),
855            _phantom: std::marker::PhantomData,
856        };
857        assert_eq!(ret.shard_id(), self.machine.shard_id());
858        ret
859    }
860
861    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
862    /// blob storage which can then be appended to this shard using
863    /// [Self::compare_and_append_batch] or [Self::append_batch].
864    ///
865    /// It is correct to create an empty batch, which allows for downgrading
866    /// `upper` to communicate progress. (see [Self::compare_and_append_batch]
867    /// or [Self::append_batch])
868    ///
869    /// The builder uses a bounded amount of memory, even when the number of
870    /// updates is very large. Individual records, however, should be small
871    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
872    /// O(MB) come talk to us.
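    ///
    /// A minimal sketch of the builder flow (types, values, and frontiers are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let mut builder = write.builder(timely::progress::Antichain::from_elem(0u64));
    /// let _added = builder
    ///     .add(&"k".to_owned(), &"v".to_owned(), &0u64, &1i64)
    ///     .await
    ///     .expect("invalid usage");
    /// // Seal the batch at the desired upper; it can then be appended to the shard.
    /// let _batch = builder
    ///     .finish(timely::progress::Antichain::from_elem(1u64))
    ///     .await
    ///     .expect("invalid usage");
    /// # };
    /// ```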
873    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
874        Self::builder_inner(
875            &self.cfg,
876            CompactConfig::new(&self.cfg, self.shard_id()),
877            Arc::clone(&self.metrics),
878            Arc::clone(&self.machine.applier.shard_metrics),
879            &self.metrics.user,
880            Arc::clone(&self.isolated_runtime),
881            Arc::clone(&self.blob),
882            self.shard_id(),
883            self.write_schemas.clone(),
884            lower,
885        )
886    }
887
888    /// Implementation of [Self::builder], so that we can share the
889    /// implementation in `PersistClient`.
890    pub(crate) fn builder_inner(
891        persist_cfg: &PersistConfig,
892        compact_cfg: CompactConfig,
893        metrics: Arc<Metrics>,
894        shard_metrics: Arc<ShardMetrics>,
895        user_batch_metrics: &BatchWriteMetrics,
896        isolated_runtime: Arc<IsolatedRuntime>,
897        blob: Arc<dyn Blob>,
898        shard_id: ShardId,
899        schemas: Schemas<K, V>,
900        lower: Antichain<T>,
901    ) -> BatchBuilder<K, V, T, D> {
902        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
903            BatchParts::new_compacting::<K, V, D>(
904                compact_cfg,
905                Description::new(
906                    lower.clone(),
907                    Antichain::new(),
908                    Antichain::from_elem(T::minimum()),
909                ),
910                max_runs,
911                Arc::clone(&metrics),
912                shard_metrics,
913                shard_id,
914                Arc::clone(&blob),
915                isolated_runtime,
916                user_batch_metrics,
917                schemas.clone(),
918            )
919        } else {
920            BatchParts::new_ordered::<D>(
921                compact_cfg.batch,
922                RunOrder::Unordered,
923                Arc::clone(&metrics),
924                shard_metrics,
925                shard_id,
926                Arc::clone(&blob),
927                isolated_runtime,
928                user_batch_metrics,
929            )
930        };
931        let builder = BatchBuilderInternal::new(
932            BatchBuilderConfig::new(persist_cfg, shard_id),
933            parts,
934            metrics,
935            schemas,
936            blob,
937            shard_id,
938            persist_cfg.build_version.clone(),
939        );
940        BatchBuilder::new(
941            builder,
942            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
943        )
944    }
945
946    /// Uploads the given `updates` as one `Batch` to the blob store and returns
947    /// a handle to the batch.
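    ///
    /// A minimal sketch (types, updates, and frontiers are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// let _batch = write
    ///     .batch(
    ///         updates.iter(),
    ///         timely::progress::Antichain::from_elem(0u64),
    ///         timely::progress::Antichain::from_elem(1u64),
    ///     )
    ///     .await
    ///     .expect("invalid usage");
    /// # };
    /// ```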
948    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
949    pub async fn batch<SB, KB, VB, TB, DB, I>(
950        &mut self,
951        updates: I,
952        lower: Antichain<T>,
953        upper: Antichain<T>,
954    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
955    where
956        SB: Borrow<((KB, VB), TB, DB)>,
957        KB: Borrow<K>,
958        VB: Borrow<V>,
959        TB: Borrow<T>,
960        DB: Borrow<D>,
961        I: IntoIterator<Item = SB>,
962    {
963        let iter = updates.into_iter();
964
965        let mut builder = self.builder(lower.clone());
966
967        for update in iter {
968            let ((k, v), t, d) = update.borrow();
969            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
970            match builder.add(k, v, t, d).await {
971                Ok(Added::Record | Added::RecordAndParts) => (),
972                Err(invalid_usage) => return Err(invalid_usage),
973            }
974        }
975
976        builder.finish(upper.clone()).await
977    }
978
979    /// Blocks until the given `frontier` is less than the upper of the shard.
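    ///
    /// A minimal sketch (the frontier is illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Resolves once the shard upper is strictly beyond 5.
    /// write.wait_for_upper_past(&timely::progress::Antichain::from_elem(5u64)).await;
    /// # };
    /// ```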
980    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
981        let mut watch = self.machine.applier.watch();
982        let batch = self
983            .machine
984            .next_listen_batch(frontier, &mut watch, None, None)
985            .await;
986        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
987            self.upper.clone_from(batch.desc.upper());
988        }
989        assert!(PartialOrder::less_than(frontier, &self.upper));
990    }
991
992    /// Politely expires this writer, releasing any associated state.
993    ///
994    /// There is a best-effort impl in Drop to expire a writer that wasn't
995    /// explicitly expired with this method. When possible, explicit expiry is
996    /// still preferred because the Drop impl is best-effort and is dependent on
997    /// a tokio [Handle] being available in the TLC at the time of drop (which
998    /// is a bit subtle). Also, explicit expiry allows for control over when it
999    /// happens.
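    ///
    /// A minimal sketch:
    ///
    /// ```rust,no_run
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Consumes the handle; prefer this over relying on the best-effort expiry in Drop.
    /// write.expire().await;
    /// # };
    /// ```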
1000    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
1001    pub async fn expire(mut self) {
1002        let Some(expire_fn) = self.expire_fn.take() else {
1003            return;
1004        };
1005        expire_fn.0().await;
1006    }
1007
1008    fn expire_fn(
1009        machine: Machine<K, V, T, D>,
1010        gc: GarbageCollector<K, V, T, D>,
1011        writer_id: WriterId,
1012    ) -> ExpireFn {
1013        ExpireFn(Box::new(move || {
1014            Box::pin(async move {
1015                let (_, maintenance) = machine.expire_writer(&writer_id).await;
1016                maintenance.start_performing(&machine, &gc);
1017            })
1018        }))
1019    }
1020
1021    /// Test helper for an [Self::append] call that is expected to succeed.
1022    #[cfg(test)]
1023    #[track_caller]
1024    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
1025    where
1026        L: Into<Antichain<T>>,
1027        U: Into<Antichain<T>>,
1028        D: Send + Sync,
1029    {
1030        self.append(updates.iter(), lower.into(), new_upper.into())
1031            .await
1032            .expect("invalid usage")
1033            .expect("unexpected upper");
1034    }
1035
1036    /// Test helper for a [Self::compare_and_append] call that is expected to
1037    /// succeed.
1038    #[cfg(test)]
1039    #[track_caller]
1040    pub async fn expect_compare_and_append(
1041        &mut self,
1042        updates: &[((K, V), T, D)],
1043        expected_upper: T,
1044        new_upper: T,
1045    ) where
1046        D: Send + Sync,
1047    {
1048        self.compare_and_append(
1049            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
1050            Antichain::from_elem(expected_upper),
1051            Antichain::from_elem(new_upper),
1052        )
1053        .await
1054        .expect("invalid usage")
1055        .expect("unexpected upper")
1056    }
1057
1058    /// Test helper for a [Self::compare_and_append_batch] call that is expected
1059    /// to succeed.
1060    #[cfg(test)]
1061    #[track_caller]
1062    pub async fn expect_compare_and_append_batch(
1063        &mut self,
1064        batches: &mut [&mut Batch<K, V, T, D>],
1065        expected_upper: T,
1066        new_upper: T,
1067    ) {
1068        self.compare_and_append_batch(
1069            batches,
1070            Antichain::from_elem(expected_upper),
1071            Antichain::from_elem(new_upper),
1072            true,
1073        )
1074        .await
1075        .expect("invalid usage")
1076        .expect("unexpected upper")
1077    }
1078
1079    /// Test helper for an [Self::append] call that is expected to succeed.
1080    #[cfg(test)]
1081    #[track_caller]
1082    pub async fn expect_batch(
1083        &mut self,
1084        updates: &[((K, V), T, D)],
1085        lower: T,
1086        upper: T,
1087    ) -> Batch<K, V, T, D> {
1088        self.batch(
1089            updates.iter(),
1090            Antichain::from_elem(lower),
1091            Antichain::from_elem(upper),
1092        )
1093        .await
1094        .expect("invalid usage")
1095    }
1096}
1097
1098impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
1099    fn drop(&mut self) {
1100        let Some(expire_fn) = self.expire_fn.take() else {
1101            return;
1102        };
1103        let handle = match Handle::try_current() {
1104            Ok(x) => x,
1105            Err(_) => {
1106                warn!(
1107                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
1108                    self.writer_id
1109                );
1110                return;
1111            }
1112        };
1113        // Spawn a best-effort task to expire this write handle. It's fine if
1114        // this doesn't run to completion, we'd just have to wait out the lease
1115        // before the shard-global since is unblocked.
1116        //
1117        // Intentionally create the span outside the task to set the parent.
1118        let expire_span = debug_span!("drop::expire");
1119        handle.spawn_named(
1120            || format!("WriteHandle::expire ({})", self.writer_id),
1121            expire_fn.0().instrument(expire_span),
1122        );
1123    }
1124}
1125
1126/// Ensure the given batch uses the given schema ID.
1127///
1128/// If the batch has no schema set, initialize it to the given one.
1129/// If the batch has a schema set, assert that it matches the given one.
1130fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
1131where
1132    T: Timestamp + Lattice + Codec64,
1133{
1134    let ensure = |id: &mut Option<SchemaId>| match id {
1135        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
1136        None => *id = Some(schema_id),
1137    };
1138
1139    for run_meta in &mut batch.run_meta {
1140        ensure(&mut run_meta.schema);
1141    }
1142    for part in &mut batch.parts {
1143        match part {
1144            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
1145            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
1146            RunPart::Many(_hollow_run_ref) => {
1147                // TODO: Fetch the parts in this run and rewrite them too. Alternatively, make
1148                // `run_meta` the only place we keep schema IDs, so rewriting parts isn't
1149                // necessary.
1150            }
1151        }
1152    }
1153}
1154
1155#[cfg(test)]
1156mod tests {
1157    use std::str::FromStr;
1158    use std::sync::mpsc;
1159
1160    use differential_dataflow::consolidation::consolidate_updates;
1161    use futures_util::FutureExt;
1162    use mz_dyncfg::ConfigUpdates;
1163    use mz_ore::collections::CollectionExt;
1164    use mz_ore::task;
1165    use serde_json::json;
1166
1167    use crate::cache::PersistClientCache;
1168    use crate::tests::{all_ok, new_test_client};
1169    use crate::{PersistLocation, ShardId};
1170
1171    use super::*;
1172
1173    #[mz_persist_proc::test(tokio::test)]
1174    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1175    async fn empty_batches(dyncfgs: ConfigUpdates) {
1176        let data = [
1177            (("1".to_owned(), "one".to_owned()), 1, 1),
1178            (("2".to_owned(), "two".to_owned()), 2, 1),
1179            (("3".to_owned(), "three".to_owned()), 3, 1),
1180        ];
1181
1182        let (mut write, _) = new_test_client(&dyncfgs)
1183            .await
1184            .expect_open::<String, String, u64, i64>(ShardId::new())
1185            .await;
1186        let blob = Arc::clone(&write.blob);
1187
1188        // Write an initial batch.
1189        let mut upper = 3;
1190        write.expect_append(&data[..2], vec![0], vec![upper]).await;
1191
1192        // Write a bunch of empty batches. This shouldn't write blobs, so the count should stay the same.
1193        let mut count_before = 0;
1194        blob.list_keys_and_metadata("", &mut |_| {
1195            count_before += 1;
1196        })
1197        .await
1198        .expect("list_keys failed");
1199        for _ in 0..5 {
1200            let new_upper = upper + 1;
1201            write.expect_compare_and_append(&[], upper, new_upper).await;
1202            upper = new_upper;
1203        }
1204        let mut count_after = 0;
1205        blob.list_keys_and_metadata("", &mut |_| {
1206            count_after += 1;
1207        })
1208        .await
1209        .expect("list_keys failed");
1210        assert_eq!(count_after, count_before);
1211    }
1212
1213    #[mz_persist_proc::test(tokio::test)]
1214    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1215    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
1216        let data0 = vec![
1217            (("1".to_owned(), "one".to_owned()), 1, 1),
1218            (("2".to_owned(), "two".to_owned()), 2, 1),
1219            (("4".to_owned(), "four".to_owned()), 4, 1),
1220        ];
1221        let data1 = vec![
1222            (("1".to_owned(), "one".to_owned()), 1, 1),
1223            (("2".to_owned(), "two".to_owned()), 2, 1),
1224            (("3".to_owned(), "three".to_owned()), 3, 1),
1225        ];
1226
1227        let (mut write, mut read) = new_test_client(&dyncfgs)
1228            .await
1229            .expect_open::<String, String, u64, i64>(ShardId::new())
1230            .await;
1231
1232        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
1233        let mut batch1 = write.expect_batch(&data1, 0, 4).await;
1234
1235        write
1236            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
1237            .await;
1238
1239        let batch = write
1240            .machine
1241            .snapshot(&Antichain::from_elem(3))
1242            .await
1243            .expect("just wrote this")
1244            .into_element();
1245
1246        assert!(batch.runs().count() >= 2);
1247
1248        let expected = vec![
1249            (("1".to_owned(), "one".to_owned()), 1, 2),
1250            (("2".to_owned(), "two".to_owned()), 2, 2),
1251            (("3".to_owned(), "three".to_owned()), 3, 1),
1252        ];
1253        let mut actual = read.expect_snapshot_and_fetch(3).await;
1254        consolidate_updates(&mut actual);
1255        assert_eq!(actual, all_ok(&expected, 3));
1256    }
1257
1258    #[mz_ore::test]
1259    fn writer_id_human_readable_serde() {
1260        #[derive(Debug, Serialize, Deserialize)]
1261        struct Container {
1262            writer_id: WriterId,
1263        }
1264
1265        // roundtrip through json
1266        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
1267        assert_eq!(
1268            id,
1269            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1270                .expect("deserializable")
1271        );
1272
1273        // deserialize a serialized string directly
1274        assert_eq!(
1275            id,
1276            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
1277                .expect("deserializable")
1278        );
1279
1280        // roundtrip id through a container type
1281        let json = json!({ "writer_id": id });
1282        assert_eq!(
1283            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
1284            &json.to_string()
1285        );
1286        let container: Container = serde_json::from_value(json).expect("deserializable");
1287        assert_eq!(container.writer_id, id);
1288    }
1289
1290    #[mz_persist_proc::test(tokio::test)]
1291    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1292    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
1293        let data = vec![
1294            (("1".to_owned(), "one".to_owned()), 1, 1),
1295            (("2".to_owned(), "two".to_owned()), 2, 1),
1296            (("3".to_owned(), "three".to_owned()), 3, 1),
1297        ];
1298
1299        let (mut write, mut read) = new_test_client(&dyncfgs)
1300            .await
1301            .expect_open::<String, String, u64, i64>(ShardId::new())
1302            .await;
1303
1304        // This test is a bit more complex than it should be. It would be easier
1305        // if we could just compare the rehydrated batch to the original batch.
1306        // But a) turning a batch into a hollow batch consumes it, and b) Batch
1307        // doesn't have Eq/PartialEq.
1308        let batch = write.expect_batch(&data, 0, 4).await;
1309        let hollow_batch = batch.into_transmittable_batch();
1310        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);
1311
1312        write
1313            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
1314            .await;
1315
1316        let expected = vec![
1317            (("1".to_owned(), "one".to_owned()), 1, 1),
1318            (("2".to_owned(), "two".to_owned()), 2, 1),
1319            (("3".to_owned(), "three".to_owned()), 3, 1),
1320        ];
1321        let mut actual = read.expect_snapshot_and_fetch(3).await;
1322        consolidate_updates(&mut actual);
1323        assert_eq!(actual, all_ok(&expected, 3));
1324    }
1325
1326    #[mz_persist_proc::test(tokio::test)]
1327    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1328    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
1329        let client = new_test_client(&dyncfgs).await;
1330        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
1331        let five = Antichain::from_elem(5);
1332
1333        // Upper is not past 5.
1334        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1335
1336        // Upper is still not past 5.
1337        write
1338            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
1339            .await;
1340        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1341
1342        // Upper is past 5.
1343        write
1344            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
1345            .await;
1346        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
1347        assert_eq!(write.upper(), &Antichain::from_elem(7));
1348
1349        // Waiting for previous uppers does not regress the handle's cached
1350        // upper.
1351        assert_eq!(
1352            write
1353                .wait_for_upper_past(&Antichain::from_elem(2))
1354                .now_or_never(),
1355            Some(())
1356        );
1357        assert_eq!(write.upper(), &Antichain::from_elem(7));
1358    }
1359
1360    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1361    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1362    async fn fetch_recent_upper_linearized() {
1363        type Timestamp = u64;
1364        let max_upper = 1000;
1365
1366        let shard_id = ShardId::new();
1367        let mut clients = PersistClientCache::new_no_metrics();
1368        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1369        let (mut upper_writer, _) = upper_writer_client
1370            .expect_open::<(), (), Timestamp, i64>(shard_id)
1371            .await;
1372        // Clear the state cache between each client to maximally disconnect
1373        // them from each other.
1374        clients.clear_state_cache();
1375        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1376        let (mut upper_reader, _) = upper_reader_client
1377            .expect_open::<(), (), Timestamp, i64>(shard_id)
1378            .await;
1379        let (tx, rx) = mpsc::channel();
1380
1381        let task = task::spawn(|| "upper-reader", async move {
1382            let mut upper = Timestamp::MIN;
1383
1384            while upper < max_upper {
1385                while let Ok(new_upper) = rx.try_recv() {
1386                    upper = new_upper;
1387                }
1388
1389                let recent_upper = upper_reader
1390                    .fetch_recent_upper()
1391                    .await
1392                    .as_option()
1393                    .cloned()
1394                    .expect("u64 is totally ordered and the shard is not finalized");
1395                assert!(
1396                    recent_upper >= upper,
1397                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
1398                );
1399            }
1400        });
1401
1402        for upper in Timestamp::MIN..max_upper {
1403            let next_upper = upper + 1;
1404            upper_writer
1405                .expect_compare_and_append(&[], upper, next_upper)
1406                .await;
1407            tx.send(next_upper).expect("send failed");
1408        }
1409
1410        task.await;
1411    }
1412}