mz_persist_client/
write.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Write capabilities and handles

12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::Description;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::Config;
22use mz_ore::task::RuntimeExt;
23use mz_ore::{instrument, soft_panic_or_log};
24use mz_persist::location::Blob;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::{Codec, Codec64};
27use mz_proto::{IntoRustIfSome, ProtoType};
28use proptest_derive::Arbitrary;
29use semver::Version;
30use serde::{Deserialize, Serialize};
31use timely::PartialOrder;
32use timely::order::TotalOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::runtime::Handle;
35use tracing::{Instrument, debug_span, error, info, warn};
36use uuid::Uuid;
37
38use crate::batch::{
39    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
40    BatchParts, ProtoBatch, validate_truncate_batch,
41};
42use crate::error::{InvalidUsage, UpperMismatch};
43use crate::fetch::{
44    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
45};
46use crate::internal::compact::{CompactConfig, Compactor};
47use crate::internal::encoding::{Schemas, assert_code_can_read_data};
48use crate::internal::machine::{CompareAndAppendRes, ExpireFn, Machine};
49use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
50use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
51use crate::read::ReadHandle;
52use crate::schema::PartMigration;
53use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};
54
55pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
56    "persist_write_combine_inline_writes",
57    true,
58    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
59);
60
61pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
62    "persist_validate_part_bounds_on_write",
63    true,
64    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
65    for the batch being appended.",
66);
67
68/// An opaque identifier for a writer of a persist durable TVC (aka shard).
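///
/// A minimal sketch of the string round trip (the UUID below is illustrative):
///
/// ```rust
/// use std::str::FromStr;
/// # use mz_persist_client::write::WriterId;
/// let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
/// assert_eq!(id.to_string(), "w00000000-1234-5678-0000-000000000000");
/// ```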
69#[derive(Arbitrary, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
70#[serde(try_from = "String", into = "String")]
71pub struct WriterId(pub(crate) [u8; 16]);
72
73impl std::fmt::Display for WriterId {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        write!(f, "w{}", Uuid::from_bytes(self.0))
76    }
77}
78
79impl std::fmt::Debug for WriterId {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
82    }
83}
84
85impl std::str::FromStr for WriterId {
86    type Err = String;
87
88    fn from_str(s: &str) -> Result<Self, Self::Err> {
89        parse_id("w", "WriterId", s).map(WriterId)
90    }
91}
92
93impl From<WriterId> for String {
94    fn from(writer_id: WriterId) -> Self {
95        writer_id.to_string()
96    }
97}
98
99impl TryFrom<String> for WriterId {
100    type Error = String;
101
102    fn try_from(s: String) -> Result<Self, Self::Error> {
103        s.parse()
104    }
105}
106
107impl WriterId {
108    pub(crate) fn new() -> Self {
109        WriterId(*Uuid::new_v4().as_bytes())
110    }
111}
112
113/// A "capability" granting the ability to apply updates to some shard at times
114/// greater or equal to `self.upper()`.
115///
/// All async methods on WriteHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
120///
121/// ```rust,no_run
122/// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
123/// # let timeout: std::time::Duration = unimplemented!();
124/// # async {
125/// tokio::time::timeout(timeout, write.fetch_recent_upper()).await
126/// # };
127/// ```
128#[derive(Debug)]
129pub struct WriteHandle<K: Codec, V: Codec, T, D> {
130    pub(crate) cfg: PersistConfig,
131    pub(crate) metrics: Arc<Metrics>,
132    pub(crate) machine: Machine<K, V, T, D>,
133    pub(crate) gc: GarbageCollector<K, V, T, D>,
134    pub(crate) compact: Option<Compactor<K, V, T, D>>,
135    pub(crate) blob: Arc<dyn Blob>,
136    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
137    pub(crate) writer_id: WriterId,
138    pub(crate) debug_state: HandleDebugState,
139    pub(crate) write_schemas: Schemas<K, V>,
140
141    pub(crate) upper: Antichain<T>,
142    expire_fn: Option<ExpireFn>,
143}
144
145impl<K, V, T, D> WriteHandle<K, V, T, D>
146where
147    K: Debug + Codec,
148    V: Debug + Codec,
149    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
150    D: Monoid + Ord + Codec64 + Send + Sync,
151{
152    pub(crate) fn new(
153        cfg: PersistConfig,
154        metrics: Arc<Metrics>,
155        machine: Machine<K, V, T, D>,
156        gc: GarbageCollector<K, V, T, D>,
157        blob: Arc<dyn Blob>,
158        writer_id: WriterId,
159        purpose: &str,
160        write_schemas: Schemas<K, V>,
161    ) -> Self {
162        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
163        let compact = cfg
164            .compaction_enabled
165            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
166        let debug_state = HandleDebugState {
167            hostname: cfg.hostname.to_owned(),
168            purpose: purpose.to_owned(),
169        };
170        let upper = machine.applier.clone_upper();
171        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
172        WriteHandle {
173            cfg,
174            metrics,
175            machine,
176            gc,
177            compact,
178            blob,
179            isolated_runtime,
180            writer_id,
181            debug_state,
182            write_schemas,
183            upper,
184            expire_fn: Some(expire_fn),
185        }
186    }
187
188    /// Creates a [WriteHandle] for the same shard from an existing
189    /// [ReadHandle].
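    ///
    /// A minimal sketch (the read handle and purpose string are illustrative):
    ///
    /// ```rust,no_run
    /// # let read: mz_persist_client::read::ReadHandle<String, String, u64, i64> = unimplemented!();
    /// let _write = mz_persist_client::write::WriteHandle::from_read(&read, "example");
    /// ```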
190    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
191        Self::new(
192            read.cfg.clone(),
193            Arc::clone(&read.metrics),
194            read.machine.clone(),
195            read.gc.clone(),
196            Arc::clone(&read.blob),
197            WriterId::new(),
198            purpose,
199            read.read_schemas.clone(),
200        )
201    }
202
    /// True iff this WriteHandle enforces part bounds checks on the batches it
    /// writes.
205    pub fn validate_part_bounds_on_write(&self) -> bool {
206        // Note that we require validation when the read checks are enabled, even if the write-time
207        // checks would otherwise be disabled, to avoid batches that would fail at read time.
208        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) || VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
209    }
210
211    /// This handle's shard id.
212    pub fn shard_id(&self) -> ShardId {
213        self.machine.shard_id()
214    }
215
    /// Returns the ID of the schema used by this writer, if any.
217    pub fn schema_id(&self) -> Option<SchemaId> {
218        self.write_schemas.id
219    }
220
221    /// Registers the write schema, if it isn't already registered.
222    ///
223    /// This method expects that either the shard doesn't yet have any schema registered, or one of
224    /// the registered schemas is the same as the write schema. If all registered schemas are
225    /// different from the write schema, or the shard is a tombstone, it returns `None`.
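    ///
    /// A minimal sketch:
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// match write.try_register_schema().await {
    ///     Some(_schema_id) => { /* the write schema is (now) registered */ }
    ///     None => { /* incompatible registered schemas, or the shard is a tombstone */ }
    /// }
    /// # };
    /// ```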
226    pub async fn try_register_schema(&mut self) -> Option<SchemaId> {
227        let Schemas { id, key, val } = &self.write_schemas;
228
229        if let Some(id) = id {
230            return Some(*id);
231        }
232
233        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
234        maintenance.start_performing(&self.machine, &self.gc);
235
236        self.write_schemas.id = schema_id;
237        schema_id
238    }
239
240    /// A cached version of the shard-global `upper` frontier.
241    ///
    /// This is the most recent upper discovered by this handle. It is
    /// potentially more stale than [Self::shared_upper] but is lock-free and
    /// allocation-free. This will always be less than or equal to the
    /// shard-global `upper`.
246    pub fn upper(&self) -> &Antichain<T> {
247        &self.upper
248    }
249
250    /// A less-stale cached version of the shard-global `upper` frontier.
251    ///
    /// This is the most recently known upper for this shard process-wide, but
    /// unlike [Self::upper] it requires a mutex and a clone. This will always be
    /// less than or equal to the shard-global `upper`.
255    pub fn shared_upper(&self) -> Antichain<T> {
256        self.machine.applier.clone_upper()
257    }
258
259    /// Fetches and returns a recent shard-global `upper`. Importantly, this operation is
260    /// linearized with write operations.
261    ///
262    /// This requires fetching the latest state from consensus and is therefore a potentially
263    /// expensive operation.
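    ///
    /// A minimal sketch:
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Clone out of the returned reference to release the borrow on `write`.
    /// let _upper = write.fetch_recent_upper().await.clone();
    /// # };
    /// ```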
264    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
265    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
266        // TODO: Do we even need to track self.upper on WriteHandle or could
267        // WriteHandle::upper just get the one out of machine?
268        self.machine
269            .applier
270            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
271            .await;
272        &self.upper
273    }
274
275    /// Advance the shard's upper by the given frontier.
276    ///
277    /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
278    ///
279    /// In contrast to the various compare-and-append methods, this method does not require the
280    /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
281    /// schema when creating a writer just to advance a shard upper.
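    ///
    /// A minimal sketch (the target frontier is illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// // Ensure the shard's upper is at least 42; a no-op if it already is.
    /// write.advance_upper(&Antichain::from_elem(42u64)).await;
    /// # };
    /// ```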
282    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
283        // We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
284        // already beyond the target.
285        let mut lower = self.shared_upper().clone();
286
287        while !PartialOrder::less_equal(target, &lower) {
288            let since = Antichain::from_elem(T::minimum());
289            let desc = Description::new(lower.clone(), target.clone(), since);
290            let batch = HollowBatch::empty(desc);
291
292            let heartbeat_timestamp = (self.cfg.now)();
293            let res = self
294                .machine
295                .compare_and_append(
296                    &batch,
297                    &self.writer_id,
298                    &self.debug_state,
299                    heartbeat_timestamp,
300                )
301                .await;
302
303            use CompareAndAppendRes::*;
304            let new_upper = match res {
305                Success(_seq_no, maintenance) => {
306                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
307                    batch.desc.upper().clone()
308                }
309                UpperMismatch(_seq_no, actual_upper) => actual_upper,
310                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
311                InlineBackpressure => unreachable!("batch was empty"),
312            };
313
314            self.upper.clone_from(&new_upper);
315            lower = new_upper;
316        }
317    }
318
319    /// Applies `updates` to this shard and downgrades this handle's upper to
320    /// `upper`.
321    ///
322    /// The innermost `Result` is `Ok` if the updates were successfully written.
323    /// If not, an `Upper` err containing the current writer upper is returned.
324    /// If that happens, we also update our local `upper` to match the current
325    /// upper. This is useful in cases where a timeout happens in between a
326    /// successful write and returning that to the client.
327    ///
328    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s may
329    /// be used concurrently to write to the same shard, but in this case, the
330    /// data being written must be identical (in the sense of "definite"-ness).
331    /// It's intended for replicated use by source ingestion, sinks, etc.
332    ///
    /// All times in `updates` must be greater than or equal to `lower` and not
    /// greater than or equal to `upper`. An `upper` of the empty antichain
    /// "finishes" this shard, promising that no more data is ever incoming.
336    ///
337    /// `updates` may be empty, which allows for downgrading `upper` to
338    /// communicate progress. It is possible to call this with `upper` equal to
339    /// `self.upper()` and an empty `updates` (making the call a no-op).
340    ///
341    /// This uses a bounded amount of memory, even when `updates` is very large.
342    /// Individual records, however, should be small enough that we can
343    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
344    /// us.
345    ///
346    /// The clunky multi-level Result is to enable more obvious error handling
347    /// in the caller. See <http://sled.rs/errors.html> for details.
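    ///
    /// A hedged sketch of a single append call (values are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// // Write one update at time 3 and advance this handle's upper from 3 to 4.
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 3u64, 1i64)];
    /// write
    ///     .append(&updates, Antichain::from_elem(3u64), Antichain::from_elem(4u64))
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```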
348    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
349    pub async fn append<SB, KB, VB, TB, DB, I>(
350        &mut self,
351        updates: I,
352        lower: Antichain<T>,
353        upper: Antichain<T>,
354    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
355    where
356        SB: Borrow<((KB, VB), TB, DB)>,
357        KB: Borrow<K>,
358        VB: Borrow<V>,
359        TB: Borrow<T>,
360        DB: Borrow<D>,
361        I: IntoIterator<Item = SB>,
362        D: Send + Sync,
363    {
364        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
365        self.append_batch(batch, lower, upper).await
366    }
367
368    /// Applies `updates` to this shard and downgrades this handle's upper to
369    /// `new_upper` iff the current global upper of this shard is
370    /// `expected_upper`.
371    ///
372    /// The innermost `Result` is `Ok` if the updates were successfully written.
373    /// If not, an `Upper` err containing the current global upper is returned.
374    ///
375    /// In contrast to [Self::append], this linearizes mutations from all
376    /// writers. It's intended for use as an atomic primitive for timestamp
377    /// bindings, SQL tables, etc.
378    ///
    /// All times in `updates` must be greater than or equal to `expected_upper`
    /// and not greater than or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
383    ///
384    /// `updates` may be empty, which allows for downgrading `upper` to
385    /// communicate progress. It is possible to heartbeat a writer lease by
386    /// calling this with `new_upper` equal to `self.upper()` and an empty
387    /// `updates` (making the call a no-op).
388    ///
389    /// This uses a bounded amount of memory, even when `updates` is very large.
390    /// Individual records, however, should be small enough that we can
391    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
392    /// us.
393    ///
394    /// The clunky multi-level Result is to enable more obvious error handling
395    /// in the caller. See <http://sled.rs/errors.html> for details.
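    ///
    /// A hedged sketch (assumes the shard's upper is currently 0; values are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// match write
    ///     .compare_and_append(&updates, Antichain::from_elem(0u64), Antichain::from_elem(1u64))
    ///     .await
    ///     .expect("invalid usage")
    /// {
    ///     Ok(()) => { /* the shard's upper is now 1 */ }
    ///     Err(_mismatch) => { /* someone else wrote first; retry from `_mismatch.current` */ }
    /// }
    /// # };
    /// ```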
396    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
397    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
398        &mut self,
399        updates: I,
400        expected_upper: Antichain<T>,
401        new_upper: Antichain<T>,
402    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
403    where
404        SB: Borrow<((KB, VB), TB, DB)>,
405        KB: Borrow<K>,
406        VB: Borrow<V>,
407        TB: Borrow<T>,
408        DB: Borrow<D>,
409        I: IntoIterator<Item = SB>,
410        D: Send + Sync,
411    {
412        let mut batch = self
413            .batch(updates, expected_upper.clone(), new_upper.clone())
414            .await?;
415        match self
416            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
417            .await
418        {
419            ok @ Ok(Ok(())) => ok,
420            err => {
421                // We cannot delete the batch in compare_and_append_batch()
422                // because the caller owns the batch and might want to retry
423                // with a different `expected_upper`. In this function, we
424                // control the batch, so we have to delete it.
425                batch.delete().await;
426                err
427            }
428        }
429    }
430
431    /// Appends the batch of updates to the shard and downgrades this handle's
432    /// upper to `upper`.
433    ///
434    /// The innermost `Result` is `Ok` if the updates were successfully written.
435    /// If not, an `Upper` err containing the current writer upper is returned.
436    /// If that happens, we also update our local `upper` to match the current
437    /// upper. This is useful in cases where a timeout happens in between a
438    /// successful write and returning that to the client.
439    ///
440    /// In contrast to [Self::compare_and_append_batch], multiple [WriteHandle]s
441    /// may be used concurrently to write to the same shard, but in this case,
442    /// the data being written must be identical (in the sense of
443    /// "definite"-ness). It's intended for replicated use by source ingestion,
444    /// sinks, etc.
445    ///
    /// An `upper` of the empty antichain "finishes" this shard, promising that
    /// no more data is ever incoming.
448    ///
449    /// The batch may be empty, which allows for downgrading `upper` to
450    /// communicate progress. It is possible to heartbeat a writer lease by
451    /// calling this with `upper` equal to `self.upper()` and an empty `updates`
452    /// (making the call a no-op).
453    ///
454    /// The clunky multi-level Result is to enable more obvious error handling
455    /// in the caller. See <http://sled.rs/errors.html> for details.
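    ///
    /// A hedged sketch, writing a freshly built [Batch] (values are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// let lower = Antichain::from_elem(0u64);
    /// let upper = Antichain::from_elem(10u64);
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 5u64, 1i64)];
    /// let batch = write
    ///     .batch(&updates, lower.clone(), upper.clone())
    ///     .await
    ///     .expect("invalid usage");
    /// write
    ///     .append_batch(batch, lower, upper)
    ///     .await
    ///     .expect("invalid usage")
    ///     .expect("unexpected upper");
    /// # };
    /// ```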
456    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
457    pub async fn append_batch(
458        &mut self,
459        mut batch: Batch<K, V, T, D>,
460        mut lower: Antichain<T>,
461        upper: Antichain<T>,
462    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
463    where
464        D: Send + Sync,
465    {
466        loop {
467            let res = self
468                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
469                .await?;
470            match res {
471                Ok(()) => {
472                    self.upper = upper;
473                    return Ok(Ok(()));
474                }
475                Err(mismatch) => {
                    // We tried to do a non-contiguous append; that won't work.
477                    if PartialOrder::less_than(&mismatch.current, &lower) {
478                        self.upper.clone_from(&mismatch.current);
479
480                        batch.delete().await;
481
482                        return Ok(Err(mismatch));
483                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
484                        // Cut down the Description by advancing its lower to the current shard
485                        // upper and try again. IMPORTANT: We can only advance the lower, meaning
486                        // we cut updates away, we must not "extend" the batch by changing to a
487                        // lower that is not beyond the current lower. This invariant is checked by
488                        // the first if branch: if `!(current_upper < lower)` then it holds that
489                        // `lower <= current_upper`.
490                        lower = mismatch.current;
491                    } else {
492                        // We already have updates past this batch's upper, the append is a no-op.
493                        self.upper = mismatch.current;
494
495                        // Because we return a success result, the caller will
496                        // think that the batch was consumed or otherwise used,
497                        // so we have to delete it here.
498                        batch.delete().await;
499
500                        return Ok(Ok(()));
501                    }
502                }
503            }
504        }
505    }
506
507    /// Appends the batch of updates to the shard and downgrades this handle's
508    /// upper to `new_upper` iff the current global upper of this shard is
509    /// `expected_upper`.
510    ///
511    /// The innermost `Result` is `Ok` if the batch was successfully written. If
512    /// not, an `Upper` err containing the current global upper is returned.
513    ///
514    /// In contrast to [Self::append_batch], this linearizes mutations from all
515    /// writers. It's intended for use as an atomic primitive for timestamp
516    /// bindings, SQL tables, etc.
517    ///
518    /// A `new_upper` of the empty antichain "finishes" this shard, promising
519    /// that no more data is ever incoming.
520    ///
521    /// The batch may be empty, which allows for downgrading `upper` to
522    /// communicate progress. It is possible to heartbeat a writer lease by
523    /// calling this with `new_upper` equal to `self.upper()` and an empty
524    /// `updates` (making the call a no-op).
525    ///
526    /// IMPORTANT: In case of an erroneous result the caller is responsible for
527    /// the lifecycle of the `batch`. It can be deleted or it can be used to
528    /// retry with adjusted frontiers.
529    ///
530    /// The clunky multi-level Result is to enable more obvious error handling
531    /// in the caller. See <http://sled.rs/errors.html> for details.
532    ///
    /// If the `validate_part_bounds_on_write` flag is set to `false`, we no
    /// longer validate that every batch covers the entire range between the
    /// expected and new uppers. This allows combining batches that cover
    /// different subsets of that range, including subsets that contain no data
    /// at all. The caller is responsible for guaranteeing that the set of
    /// batches provided collectively includes all updates for the entire range
    /// between the expected and new upper.
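    ///
    /// A hedged sketch of atomically appending two pre-built batches (the
    /// batches and frontiers are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # let mut batch0: mz_persist_client::batch::Batch<String, String, u64, i64> = unimplemented!();
    /// # let mut batch1: mz_persist_client::batch::Batch<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// let res = write
    ///     .compare_and_append_batch(
    ///         &mut [&mut batch0, &mut batch1],
    ///         Antichain::from_elem(0u64),
    ///         Antichain::from_elem(10u64),
    ///         true,
    ///     )
    ///     .await
    ///     .expect("invalid usage");
    /// if let Err(_mismatch) = res {
    ///     // On an upper mismatch the caller still owns the batches and may retry
    ///     // with adjusted frontiers or delete them.
    /// }
    /// # };
    /// ```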
540    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
541    pub async fn compare_and_append_batch(
542        &mut self,
543        batches: &mut [&mut Batch<K, V, T, D>],
544        expected_upper: Antichain<T>,
545        new_upper: Antichain<T>,
546        validate_part_bounds_on_write: bool,
547    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
548    where
549        D: Send + Sync,
550    {
551        // Before we append any data, we require a registered write schema.
552        // We expect the caller to ensure our schema is already present... unless this shard is a
553        // tombstone, in which case this write is either a noop or will fail gracefully.
554        let schema_id = self.try_register_schema().await;
555
556        for batch in batches.iter() {
557            if self.machine.shard_id() != batch.shard_id() {
558                return Err(InvalidUsage::BatchNotFromThisShard {
559                    batch_shard: batch.shard_id(),
560                    handle_shard: self.machine.shard_id(),
561                });
562            }
563            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
564            if self.cfg.build_version > batch.version {
565                info!(
566                    shard_id =? self.machine.shard_id(),
567                    batch_version =? batch.version,
568                    writer_version =? self.cfg.build_version,
569                    "Appending batch from the past. This is fine but should be rare. \
570                    TODO: Error on very old versions once the leaked blob detector exists."
571                )
572            }
573            fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
574                if batch_schema.is_empty() {
575                    // Schema is either trivial or missing!
576                    return;
577                }
578                let batch_schema: A::Schema = A::decode_schema(batch_schema);
579                if *writer_schema != batch_schema {
580                    error!(
581                        ?writer_schema,
582                        ?batch_schema,
583                        "writer and batch schemas should be identical"
584                    );
585                    soft_panic_or_log!("writer and batch schemas should be identical");
586                }
587            }
588            assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
589            assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
590        }
591
592        let lower = expected_upper.clone();
593        let upper = new_upper;
594        let since = Antichain::from_elem(T::minimum());
595        let desc = Description::new(lower, upper, since);
596
597        let mut received_inline_backpressure = false;
598        // Every hollow part must belong to some batch, so we can clean it up when the batch is dropped...
599        // but if we need to merge all our inline parts to a single run in S3, it's not correct to
600        // associate that with any of our individual input batches.
601        // At first, we'll try and put all the inline parts we receive into state... but if we
602        // get backpressured, we retry with this builder set to `Some`, put all our inline data into
603        // it, and ensure it's flushed out to S3 before including it in the batch.
604        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
605        let maintenance = loop {
606            let any_batch_rewrite = batches
607                .iter()
608                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
609            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
610                (vec![], 0, vec![], vec![]);
611            let mut key_storage = None;
612            let mut val_storage = None;
613            for batch in batches.iter() {
614                let () = validate_truncate_batch(
615                    &batch.batch,
616                    &desc,
617                    any_batch_rewrite,
618                    validate_part_bounds_on_write,
619                )?;
620                for (run_meta, run) in batch.batch.runs() {
621                    let start_index = parts.len();
622                    for part in run {
623                        if let (
624                            RunPart::Single(
625                                batch_part @ BatchPart::Inline {
626                                    updates,
627                                    ts_rewrite,
628                                    schema_id: _,
629                                    deprecated_schema_id: _,
630                                },
631                            ),
632                            Some((schema_cache, builder)),
633                        ) = (part, &mut inline_batch_builder)
634                        {
635                            let schema_migration = PartMigration::new(
636                                batch_part,
637                                self.write_schemas.clone(),
638                                schema_cache,
639                            )
640                            .await
641                            .expect("schemas for inline user part");
642
643                            let encoded_part = EncodedPart::from_inline(
644                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
645                                &*self.metrics,
646                                self.metrics.read.compaction.clone(),
647                                desc.clone(),
648                                updates,
649                                ts_rewrite.as_ref(),
650                            );
651                            let mut fetched_part = FetchedPart::new(
652                                Arc::clone(&self.metrics),
653                                encoded_part,
654                                schema_migration,
655                                FetchBatchFilter::Compaction {
656                                    since: desc.since().clone(),
657                                },
658                                false,
659                                PartDecodeFormat::Arrow,
660                                None,
661                            );
662
663                            while let Some(((k, v), t, d)) =
664                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
665                            {
666                                builder
667                                    .add(
668                                        &k.expect("decoded just-encoded key data"),
669                                        &v.expect("decoded just-encoded val data"),
670                                        &t,
671                                        &d,
672                                    )
673                                    .await
674                                    .expect("re-encoding just-decoded data");
675                            }
676                        } else {
677                            parts.push(part.clone())
678                        }
679                    }
680
681                    let end_index = parts.len();
682
683                    if start_index == end_index {
684                        continue;
685                    }
686
                    // Mark the run boundary if this is not the first run in the combined batch.
688                    if start_index != 0 {
689                        run_splits.push(start_index);
690                    }
691                    run_metas.push(run_meta.clone());
692                }
693                num_updates += batch.batch.len;
694            }
695
696            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
697                let mut finished = builder
698                    .finish(desc.upper().clone())
699                    .await
700                    .expect("invalid usage");
701                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
702                finished
703                    .flush_to_blob(
704                        &cfg,
705                        &self.metrics.inline.backpressure,
706                        &self.isolated_runtime,
707                        &self.write_schemas,
708                    )
709                    .await;
710                Some(finished)
711            } else {
712                None
713            };
714
715            if let Some(batch) = &flushed_inline_batch {
716                for (run_meta, run) in batch.batch.runs() {
717                    assert!(run.len() > 0);
718                    let start_index = parts.len();
719                    if start_index != 0 {
720                        run_splits.push(start_index);
721                    }
722                    run_metas.push(run_meta.clone());
723                    parts.extend(run.iter().cloned())
724                }
725            }
726
727            let mut combined_batch =
728                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
729
730            // The batch may have been written by a writer without a registered schema.
731            // Ensure we have a schema ID in the batch metadata before we append, to avoid type
732            // confusion later.
733            match schema_id {
734                Some(schema_id) => {
735                    ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
736                }
737                None => {
738                    assert!(
739                        self.fetch_recent_upper().await.is_empty(),
740                        "fetching a schema id should only fail when the shard is tombstoned"
741                    )
742                }
743            }
744
745            let heartbeat_timestamp = (self.cfg.now)();
746            let res = self
747                .machine
748                .compare_and_append(
749                    &combined_batch,
750                    &self.writer_id,
751                    &self.debug_state,
752                    heartbeat_timestamp,
753                )
754                .await;
755
756            match res {
757                CompareAndAppendRes::Success(_seqno, maintenance) => {
758                    self.upper.clone_from(desc.upper());
759                    for batch in batches.iter_mut() {
760                        batch.mark_consumed();
761                    }
762                    if let Some(batch) = &mut flushed_inline_batch {
763                        batch.mark_consumed();
764                    }
765                    break maintenance;
766                }
767                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
768                    if let Some(batch) = flushed_inline_batch.take() {
769                        batch.delete().await;
770                    }
771                    return Err(invalid_usage);
772                }
773                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
774                    if let Some(batch) = flushed_inline_batch.take() {
775                        batch.delete().await;
776                    }
                    // We tried to do a compare_and_append with the wrong expected upper; that
                    // won't work. Update the cached upper to the current upper.
779                    self.upper.clone_from(&current_upper);
780                    return Ok(Err(UpperMismatch {
781                        current: current_upper,
782                        expected: expected_upper,
783                    }));
784                }
785                CompareAndAppendRes::InlineBackpressure => {
786                    // We tried to write an inline part, but there was already
787                    // too much in state. Flush it out to s3 and try again.
788                    assert_eq!(received_inline_backpressure, false);
789                    received_inline_backpressure = true;
790                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
791                        inline_batch_builder = Some((
792                            self.machine.applier.schema_cache(),
793                            self.builder(desc.lower().clone()),
794                        ));
795                        continue;
796                    }
797
798                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
799                    // We could have a large number of inline parts (imagine the
800                    // sharded persist_sink), do this flushing concurrently.
801                    let flush_batches = batches
802                        .iter_mut()
803                        .map(|batch| async {
804                            batch
805                                .flush_to_blob(
806                                    &cfg,
807                                    &self.metrics.inline.backpressure,
808                                    &self.isolated_runtime,
809                                    &self.write_schemas,
810                                )
811                                .await
812                        })
813                        .collect::<FuturesUnordered<_>>();
814                    let () = flush_batches.collect::<()>().await;
815
816                    for batch in batches.iter() {
817                        assert_eq!(batch.batch.inline_bytes(), 0);
818                    }
819
820                    continue;
821                }
822            }
823        };
824
825        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
826
827        Ok(Ok(()))
828    }
829
830    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
831    /// to append it to this shard.
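    ///
    /// A hedged sketch of the transmit/rehydrate round trip (the batch itself is
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # let batch: mz_persist_client::batch::Batch<String, String, u64, i64> = unimplemented!();
    /// // Turn a batch into its wire representation, e.g. to send to another process...
    /// let proto = batch.into_transmittable_batch();
    /// // ...and back into a Batch that this handle can append.
    /// let _rehydrated = write.batch_from_transmittable_batch(proto);
    /// ```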
832    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
833        let shard_id: ShardId = batch
834            .shard_id
835            .into_rust()
836            .expect("valid transmittable batch");
837        assert_eq!(shard_id, self.machine.shard_id());
838
839        let ret = Batch {
840            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
841            metrics: Arc::clone(&self.metrics),
842            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
843            version: Version::parse(&batch.version).expect("valid transmittable batch"),
844            schemas: (batch.key_schema, batch.val_schema),
845            batch: batch
846                .batch
847                .into_rust_if_some("ProtoBatch::batch")
848                .expect("valid transmittable batch"),
849            blob: Arc::clone(&self.blob),
850            _phantom: std::marker::PhantomData,
851        };
852        assert_eq!(ret.shard_id(), self.machine.shard_id());
853        ret
854    }
855
856    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
857    /// blob storage which can then be appended to this shard using
858    /// [Self::compare_and_append_batch] or [Self::append_batch].
859    ///
860    /// It is correct to create an empty batch, which allows for downgrading
861    /// `upper` to communicate progress. (see [Self::compare_and_append_batch]
862    /// or [Self::append_batch])
863    ///
864    /// The builder uses a bounded amount of memory, even when the number of
865    /// updates is very large. Individual records, however, should be small
866    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
867    /// O(MB) come talk to us.
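    ///
    /// A minimal sketch of building and finishing a batch by hand (values are
    /// illustrative):
    ///
    /// ```rust,no_run
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// let mut builder = write.builder(Antichain::from_elem(0u64));
    /// builder
    ///     .add(&"k".to_owned(), &"v".to_owned(), &3u64, &1i64)
    ///     .await
    ///     .expect("invalid usage");
    /// let _batch = builder
    ///     .finish(Antichain::from_elem(10u64))
    ///     .await
    ///     .expect("invalid usage");
    /// # };
    /// ```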
868    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
869        Self::builder_inner(
870            &self.cfg,
871            CompactConfig::new(&self.cfg, self.shard_id()),
872            Arc::clone(&self.metrics),
873            Arc::clone(&self.machine.applier.shard_metrics),
874            &self.metrics.user,
875            Arc::clone(&self.isolated_runtime),
876            Arc::clone(&self.blob),
877            self.shard_id(),
878            self.write_schemas.clone(),
879            lower,
880        )
881    }
882
883    /// Implementation of [Self::builder], so that we can share the
884    /// implementation in `PersistClient`.
885    pub(crate) fn builder_inner(
886        persist_cfg: &PersistConfig,
887        compact_cfg: CompactConfig,
888        metrics: Arc<Metrics>,
889        shard_metrics: Arc<ShardMetrics>,
890        user_batch_metrics: &BatchWriteMetrics,
891        isolated_runtime: Arc<IsolatedRuntime>,
892        blob: Arc<dyn Blob>,
893        shard_id: ShardId,
894        schemas: Schemas<K, V>,
895        lower: Antichain<T>,
896    ) -> BatchBuilder<K, V, T, D> {
897        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
898            BatchParts::new_compacting::<K, V, D>(
899                compact_cfg,
900                Description::new(
901                    lower.clone(),
902                    Antichain::new(),
903                    Antichain::from_elem(T::minimum()),
904                ),
905                max_runs,
906                Arc::clone(&metrics),
907                shard_metrics,
908                shard_id,
909                Arc::clone(&blob),
910                isolated_runtime,
911                user_batch_metrics,
912                schemas.clone(),
913            )
914        } else {
915            BatchParts::new_ordered::<D>(
916                compact_cfg.batch,
917                RunOrder::Unordered,
918                Arc::clone(&metrics),
919                shard_metrics,
920                shard_id,
921                Arc::clone(&blob),
922                isolated_runtime,
923                user_batch_metrics,
924            )
925        };
926        let builder = BatchBuilderInternal::new(
927            BatchBuilderConfig::new(persist_cfg, shard_id),
928            parts,
929            metrics,
930            schemas,
931            blob,
932            shard_id,
933            persist_cfg.build_version.clone(),
934        );
935        BatchBuilder::new(
936            builder,
937            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
938        )
939    }
940
941    /// Uploads the given `updates` as one `Batch` to the blob store and returns
942    /// a handle to the batch.
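    ///
    /// A minimal sketch (values are illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// let updates = vec![(("k".to_owned(), "v".to_owned()), 0u64, 1i64)];
    /// let _batch = write
    ///     .batch(&updates, Antichain::from_elem(0u64), Antichain::from_elem(1u64))
    ///     .await
    ///     .expect("invalid usage");
    /// # };
    /// ```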
943    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
944    pub async fn batch<SB, KB, VB, TB, DB, I>(
945        &mut self,
946        updates: I,
947        lower: Antichain<T>,
948        upper: Antichain<T>,
949    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
950    where
951        SB: Borrow<((KB, VB), TB, DB)>,
952        KB: Borrow<K>,
953        VB: Borrow<V>,
954        TB: Borrow<T>,
955        DB: Borrow<D>,
956        I: IntoIterator<Item = SB>,
957    {
958        let iter = updates.into_iter();
959
960        let mut builder = self.builder(lower.clone());
961
962        for update in iter {
963            let ((k, v), t, d) = update.borrow();
964            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
965            match builder.add(k, v, t, d).await {
966                Ok(Added::Record | Added::RecordAndParts) => (),
967                Err(invalid_usage) => return Err(invalid_usage),
968            }
969        }
970
971        builder.finish(upper.clone()).await
972    }
973
974    /// Blocks until the given `frontier` is less than the upper of the shard.
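    ///
    /// A minimal sketch (the frontier is illustrative):
    ///
    /// ```rust,no_run
    /// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// use timely::progress::Antichain;
    /// // Returns once the shard's upper is beyond 5.
    /// write.wait_for_upper_past(&Antichain::from_elem(5u64)).await;
    /// # };
    /// ```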
975    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
976        let mut watch = self.machine.applier.watch();
977        let batch = self
978            .machine
979            .next_listen_batch(frontier, &mut watch, None, None)
980            .await;
981        if PartialOrder::less_than(&self.upper, batch.desc.upper()) {
982            self.upper.clone_from(batch.desc.upper());
983        }
984        assert!(PartialOrder::less_than(frontier, &self.upper));
985    }
986
987    /// Politely expires this writer, releasing any associated state.
988    ///
    /// There is a best-effort impl in Drop to expire a writer that wasn't
    /// explicitly expired with this method. When possible, explicit expiry is
    /// still preferred because the Drop one is best effort and is dependent on
    /// a tokio [Handle] being available in the TLC at the time of drop (which
    /// is a bit subtle). Also, explicit expiry allows for control over when it
    /// happens.
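    ///
    /// A minimal sketch:
    ///
    /// ```rust,no_run
    /// # let write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
    /// # async {
    /// // Consumes the handle; prefer this over relying on the best-effort Drop impl.
    /// write.expire().await;
    /// # };
    /// ```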
995    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
996    pub async fn expire(mut self) {
997        let Some(expire_fn) = self.expire_fn.take() else {
998            return;
999        };
1000        expire_fn.0().await;
1001    }
1002
1003    fn expire_fn(
1004        machine: Machine<K, V, T, D>,
1005        gc: GarbageCollector<K, V, T, D>,
1006        writer_id: WriterId,
1007    ) -> ExpireFn {
1008        ExpireFn(Box::new(move || {
1009            Box::pin(async move {
1010                let (_, maintenance) = machine.expire_writer(&writer_id).await;
1011                maintenance.start_performing(&machine, &gc);
1012            })
1013        }))
1014    }
1015
1016    /// Test helper for an [Self::append] call that is expected to succeed.
1017    #[cfg(test)]
1018    #[track_caller]
1019    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
1020    where
1021        L: Into<Antichain<T>>,
1022        U: Into<Antichain<T>>,
1023        D: Send + Sync,
1024    {
1025        self.append(updates.iter(), lower.into(), new_upper.into())
1026            .await
1027            .expect("invalid usage")
1028            .expect("unexpected upper");
1029    }
1030
1031    /// Test helper for a [Self::compare_and_append] call that is expected to
1032    /// succeed.
1033    #[cfg(test)]
1034    #[track_caller]
1035    pub async fn expect_compare_and_append(
1036        &mut self,
1037        updates: &[((K, V), T, D)],
1038        expected_upper: T,
1039        new_upper: T,
1040    ) where
1041        D: Send + Sync,
1042    {
1043        self.compare_and_append(
1044            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
1045            Antichain::from_elem(expected_upper),
1046            Antichain::from_elem(new_upper),
1047        )
1048        .await
1049        .expect("invalid usage")
1050        .expect("unexpected upper")
1051    }
1052
1053    /// Test helper for a [Self::compare_and_append_batch] call that is expected
1054    /// to succeed.
1055    #[cfg(test)]
1056    #[track_caller]
1057    pub async fn expect_compare_and_append_batch(
1058        &mut self,
1059        batches: &mut [&mut Batch<K, V, T, D>],
1060        expected_upper: T,
1061        new_upper: T,
1062    ) {
1063        self.compare_and_append_batch(
1064            batches,
1065            Antichain::from_elem(expected_upper),
1066            Antichain::from_elem(new_upper),
1067            true,
1068        )
1069        .await
1070        .expect("invalid usage")
1071        .expect("unexpected upper")
1072    }
1073
1074    /// Test helper for an [Self::append] call that is expected to succeed.
1075    #[cfg(test)]
1076    #[track_caller]
1077    pub async fn expect_batch(
1078        &mut self,
1079        updates: &[((K, V), T, D)],
1080        lower: T,
1081        upper: T,
1082    ) -> Batch<K, V, T, D> {
1083        self.batch(
1084            updates.iter(),
1085            Antichain::from_elem(lower),
1086            Antichain::from_elem(upper),
1087        )
1088        .await
1089        .expect("invalid usage")
1090    }
1091}
1092
1093impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
1094    fn drop(&mut self) {
1095        let Some(expire_fn) = self.expire_fn.take() else {
1096            return;
1097        };
1098        let handle = match Handle::try_current() {
1099            Ok(x) => x,
1100            Err(_) => {
1101                warn!(
1102                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
1103                    self.writer_id
1104                );
1105                return;
1106            }
1107        };
1108        // Spawn a best-effort task to expire this write handle. It's fine if
1109        // this doesn't run to completion, we'd just have to wait out the lease
1110        // before the shard-global since is unblocked.
1111        //
1112        // Intentionally create the span outside the task to set the parent.
1113        let expire_span = debug_span!("drop::expire");
1114        handle.spawn_named(
1115            || format!("WriteHandle::expire ({})", self.writer_id),
1116            expire_fn.0().instrument(expire_span),
1117        );
1118    }
1119}
1120
1121/// Ensure the given batch uses the given schema ID.
1122///
1123/// If the batch has no schema set, initialize it to the given one.
1124/// If the batch has a schema set, assert that it matches the given one.
1125fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
1126where
1127    T: Timestamp + Lattice + Codec64,
1128{
1129    let ensure = |id: &mut Option<SchemaId>| match id {
1130        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
1131        None => *id = Some(schema_id),
1132    };
1133
1134    for run_meta in &mut batch.run_meta {
1135        ensure(&mut run_meta.schema);
1136    }
1137    for part in &mut batch.parts {
1138        match part {
1139            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
1140            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
1141            RunPart::Many(_hollow_run_ref) => {
1142                // TODO: Fetch the parts in this run and rewrite them too. Alternatively, make
1143                // `run_meta` the only place we keep schema IDs, so rewriting parts isn't
1144                // necessary.
1145            }
1146        }
1147    }
1148}
1149
1150#[cfg(test)]
1151mod tests {
1152    use std::str::FromStr;
1153    use std::sync::mpsc;
1154
1155    use differential_dataflow::consolidation::consolidate_updates;
1156    use futures_util::FutureExt;
1157    use mz_dyncfg::ConfigUpdates;
1158    use mz_ore::collections::CollectionExt;
1159    use mz_ore::task;
1160    use serde_json::json;
1161
1162    use crate::cache::PersistClientCache;
1163    use crate::tests::{all_ok, new_test_client};
1164    use crate::{PersistLocation, ShardId};
1165
1166    use super::*;
1167
1168    #[mz_persist_proc::test(tokio::test)]
1169    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1170    async fn empty_batches(dyncfgs: ConfigUpdates) {
1171        let data = [
1172            (("1".to_owned(), "one".to_owned()), 1, 1),
1173            (("2".to_owned(), "two".to_owned()), 2, 1),
1174            (("3".to_owned(), "three".to_owned()), 3, 1),
1175        ];
1176
1177        let (mut write, _) = new_test_client(&dyncfgs)
1178            .await
1179            .expect_open::<String, String, u64, i64>(ShardId::new())
1180            .await;
1181        let blob = Arc::clone(&write.blob);
1182
1183        // Write an initial batch.
1184        let mut upper = 3;
1185        write.expect_append(&data[..2], vec![0], vec![upper]).await;
1186
1187        // Write a bunch of empty batches. This shouldn't write blobs, so the count should stay the same.
1188        let mut count_before = 0;
1189        blob.list_keys_and_metadata("", &mut |_| {
1190            count_before += 1;
1191        })
1192        .await
1193        .expect("list_keys failed");
1194        for _ in 0..5 {
1195            let new_upper = upper + 1;
1196            write.expect_compare_and_append(&[], upper, new_upper).await;
1197            upper = new_upper;
1198        }
1199        let mut count_after = 0;
1200        blob.list_keys_and_metadata("", &mut |_| {
1201            count_after += 1;
1202        })
1203        .await
1204        .expect("list_keys failed");
1205        assert_eq!(count_after, count_before);
1206    }
1207
1208    #[mz_persist_proc::test(tokio::test)]
1209    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1210    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
1211        let data0 = vec![
1212            (("1".to_owned(), "one".to_owned()), 1, 1),
1213            (("2".to_owned(), "two".to_owned()), 2, 1),
1214            (("4".to_owned(), "four".to_owned()), 4, 1),
1215        ];
1216        let data1 = vec![
1217            (("1".to_owned(), "one".to_owned()), 1, 1),
1218            (("2".to_owned(), "two".to_owned()), 2, 1),
1219            (("3".to_owned(), "three".to_owned()), 3, 1),
1220        ];
1221
1222        let (mut write, mut read) = new_test_client(&dyncfgs)
1223            .await
1224            .expect_open::<String, String, u64, i64>(ShardId::new())
1225            .await;
1226
1227        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
1228        let mut batch1 = write.expect_batch(&data1, 0, 4).await;
1229
1230        write
1231            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
1232            .await;
1233
1234        let batch = write
1235            .machine
1236            .snapshot(&Antichain::from_elem(3))
1237            .await
1238            .expect("just wrote this")
1239            .into_element();
1240
1241        assert!(batch.runs().count() >= 2);
1242
1243        let expected = vec![
1244            (("1".to_owned(), "one".to_owned()), 1, 2),
1245            (("2".to_owned(), "two".to_owned()), 2, 2),
1246            (("3".to_owned(), "three".to_owned()), 3, 1),
1247        ];
1248        let mut actual = read.expect_snapshot_and_fetch(3).await;
1249        consolidate_updates(&mut actual);
1250        assert_eq!(actual, all_ok(&expected, 3));
1251    }
1252
1253    #[mz_ore::test]
1254    fn writer_id_human_readable_serde() {
1255        #[derive(Debug, Serialize, Deserialize)]
1256        struct Container {
1257            writer_id: WriterId,
1258        }
1259
1260        // roundtrip through json
1261        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
1262        assert_eq!(
1263            id,
1264            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
1265                .expect("deserializable")
1266        );
1267
1268        // deserialize a serialized string directly
1269        assert_eq!(
1270            id,
1271            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
1272                .expect("deserializable")
1273        );
1274
1275        // roundtrip id through a container type
1276        let json = json!({ "writer_id": id });
1277        assert_eq!(
1278            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
1279            &json.to_string()
1280        );
1281        let container: Container = serde_json::from_value(json).expect("deserializable");
1282        assert_eq!(container.writer_id, id);
1283    }
1284
1285    #[mz_persist_proc::test(tokio::test)]
1286    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1287    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
1288        let data = vec![
1289            (("1".to_owned(), "one".to_owned()), 1, 1),
1290            (("2".to_owned(), "two".to_owned()), 2, 1),
1291            (("3".to_owned(), "three".to_owned()), 3, 1),
1292        ];
1293
1294        let (mut write, mut read) = new_test_client(&dyncfgs)
1295            .await
1296            .expect_open::<String, String, u64, i64>(ShardId::new())
1297            .await;
1298
1299        // This test is a bit more complex than it should be. It would be easier
1300        // if we could just compare the rehydrated batch to the original batch.
1301        // But a) turning a batch into a hollow batch consumes it, and b) Batch
1302        // doesn't have Eq/PartialEq.
1303        let batch = write.expect_batch(&data, 0, 4).await;
1304        let hollow_batch = batch.into_transmittable_batch();
1305        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);
1306
1307        write
1308            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
1309            .await;
1310
1311        let expected = vec![
1312            (("1".to_owned(), "one".to_owned()), 1, 1),
1313            (("2".to_owned(), "two".to_owned()), 2, 1),
1314            (("3".to_owned(), "three".to_owned()), 3, 1),
1315        ];
1316        let mut actual = read.expect_snapshot_and_fetch(3).await;
1317        consolidate_updates(&mut actual);
1318        assert_eq!(actual, all_ok(&expected, 3));
1319    }
1320
1321    #[mz_persist_proc::test(tokio::test)]
1322    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1323    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
1324        let client = new_test_client(&dyncfgs).await;
1325        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
1326        let five = Antichain::from_elem(5);
1327
1328        // Upper is not past 5.
1329        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1330
1331        // Upper is still not past 5.
1332        write
1333            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
1334            .await;
1335        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);
1336
1337        // Upper is past 5.
1338        write
1339            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
1340            .await;
1341        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
1342        assert_eq!(write.upper(), &Antichain::from_elem(7));
1343
1344        // Waiting for previous uppers does not regress the handle's cached
1345        // upper.
1346        assert_eq!(
1347            write
1348                .wait_for_upper_past(&Antichain::from_elem(2))
1349                .now_or_never(),
1350            Some(())
1351        );
1352        assert_eq!(write.upper(), &Antichain::from_elem(7));
1353    }
1354
1355    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
1356    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
1357    async fn fetch_recent_upper_linearized() {
1358        type Timestamp = u64;
1359        let max_upper = 1000;
1360
1361        let shard_id = ShardId::new();
1362        let mut clients = PersistClientCache::new_no_metrics();
1363        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1364        let (mut upper_writer, _) = upper_writer_client
1365            .expect_open::<(), (), Timestamp, i64>(shard_id)
1366            .await;
1367        // Clear the state cache between each client to maximally disconnect
1368        // them from each other.
1369        clients.clear_state_cache();
1370        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
1371        let (mut upper_reader, _) = upper_reader_client
1372            .expect_open::<(), (), Timestamp, i64>(shard_id)
1373            .await;
1374        let (tx, rx) = mpsc::channel();
1375
1376        let task = task::spawn(|| "upper-reader", async move {
1377            let mut upper = Timestamp::MIN;
1378
1379            while upper < max_upper {
1380                while let Ok(new_upper) = rx.try_recv() {
1381                    upper = new_upper;
1382                }
1383
1384                let recent_upper = upper_reader
1385                    .fetch_recent_upper()
1386                    .await
1387                    .as_option()
1388                    .cloned()
1389                    .expect("u64 is totally ordered and the shard is not finalized");
1390                assert!(
1391                    recent_upper >= upper,
1392                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
1393                );
1394            }
1395        });
1396
1397        for upper in Timestamp::MIN..max_upper {
1398            let next_upper = upper + 1;
1399            upper_writer
1400                .expect_compare_and_append(&[], upper, next_upper)
1401                .await;
1402            tx.send(next_upper).expect("send failed");
1403        }
1404
1405        task.await;
1406    }
1407}