Skip to main content

mz_persist_client/
write.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Write capabilities and handles
11
12use std::borrow::Borrow;
13use std::fmt::Debug;
14use std::sync::Arc;
15
16use differential_dataflow::difference::Monoid;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::trace::Description;
19use futures::StreamExt;
20use futures::stream::FuturesUnordered;
21use mz_dyncfg::Config;
22use mz_ore::task::RuntimeExt;
23use mz_ore::{instrument, soft_panic_or_log};
24use mz_persist::location::Blob;
25use mz_persist_types::schema::SchemaId;
26use mz_persist_types::{Codec, Codec64};
27use mz_proto::{IntoRustIfSome, ProtoType};
28use proptest_derive::Arbitrary;
29use semver::Version;
30use serde::{Deserialize, Serialize};
31use timely::PartialOrder;
32use timely::order::TotalOrder;
33use timely::progress::{Antichain, Timestamp};
34use tokio::runtime::Handle;
35use tracing::{Instrument, debug_span, error, info, warn};
36use uuid::Uuid;
37
38use crate::batch::{
39    Added, BATCH_DELETE_ENABLED, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
40    BatchParts, ProtoBatch, validate_truncate_batch,
41};
42use crate::error::{InvalidUsage, UpperMismatch};
43use crate::fetch::{
44    EncodedPart, FetchBatchFilter, FetchedPart, PartDecodeFormat, VALIDATE_PART_BOUNDS_ON_READ,
45};
46use crate::internal::compact::{CompactConfig, Compactor};
47use crate::internal::encoding::{Schemas, assert_code_can_read_data};
48use crate::internal::machine::{
49    CompareAndAppendRes, ExpireFn, Machine, next_listen_batch_retry_params,
50};
51use crate::internal::metrics::{BatchWriteMetrics, Metrics, ShardMetrics};
52use crate::internal::state::{BatchPart, HandleDebugState, HollowBatch, RunOrder, RunPart};
53use crate::read::ReadHandle;
54use crate::schema::PartMigration;
55use crate::{GarbageCollector, IsolatedRuntime, PersistConfig, ShardId, parse_id};
56
/// Dyncfg: when enabled, inline writes that don't fit into the batch metadata
/// limits are combined and re-encoded (flushed out to blob storage) instead of
/// being kept inline — see the inline-backpressure handling in
/// `compare_and_append_batch`.
pub(crate) const COMBINE_INLINE_WRITES: Config<bool> = Config::new(
    "persist_write_combine_inline_writes",
    true,
    "If set, re-encode inline writes if they don't fit into the batch metadata limits.",
);
62
/// Dyncfg: when enabled, part bounds are validated against the bounds of the
/// batch being appended at write time. Note that write-time validation is also
/// forced on whenever the corresponding read-side check is enabled; see
/// [`WriteHandle::validate_part_bounds_on_write`].
pub(crate) const VALIDATE_PART_BOUNDS_ON_WRITE: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_write",
    false,
    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
    for the batch being appended.",
);
69
/// An opaque identifier for a writer of a persist durable TVC (aka shard).
///
/// The inner 16 bytes are a UUID (see [`WriterId::new`]); the id serializes
/// to/from its `Display` string form ("w<uuid>") via the `TryFrom<String>` /
/// `Into<String>` impls below.
#[derive(
    Arbitrary,
    Clone,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Hash,
    Serialize,
    Deserialize
)]
#[serde(try_from = "String", into = "String")]
pub struct WriterId(pub(crate) [u8; 16]);
84
85impl std::fmt::Display for WriterId {
86    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
87        write!(f, "w{}", Uuid::from_bytes(self.0))
88    }
89}
90
91impl std::fmt::Debug for WriterId {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(f, "WriterId({})", Uuid::from_bytes(self.0))
94    }
95}
96
97impl std::str::FromStr for WriterId {
98    type Err = String;
99
100    fn from_str(s: &str) -> Result<Self, Self::Err> {
101        parse_id("w", "WriterId", s).map(WriterId)
102    }
103}
104
105impl From<WriterId> for String {
106    fn from(writer_id: WriterId) -> Self {
107        writer_id.to_string()
108    }
109}
110
111impl TryFrom<String> for WriterId {
112    type Error = String;
113
114    fn try_from(s: String) -> Result<Self, Self::Error> {
115        s.parse()
116    }
117}
118
119impl WriterId {
120    pub(crate) fn new() -> Self {
121        WriterId(*Uuid::new_v4().as_bytes())
122    }
123}
124
/// A "capability" granting the ability to apply updates to some shard at times
/// greater or equal to `self.upper()`.
///
/// All async methods on ReadHandle retry for as long as they are able, but the
/// returned [std::future::Future]s implement "cancel on drop" semantics. This
/// means that callers can add a timeout using [tokio::time::timeout] or
/// [tokio::time::timeout_at].
///
/// ```rust,no_run
/// # let mut write: mz_persist_client::write::WriteHandle<String, String, u64, i64> = unimplemented!();
/// # let timeout: std::time::Duration = unimplemented!();
/// # async {
/// tokio::time::timeout(timeout, write.fetch_recent_upper()).await
/// # };
/// ```
#[derive(Debug)]
pub struct WriteHandle<K: Codec, V: Codec, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) machine: Machine<K, V, T, D>,
    pub(crate) gc: GarbageCollector<K, V, T, D>,
    /// `Some` iff compaction is enabled in `cfg` (see [`WriteHandle::new`]).
    pub(crate) compact: Option<Compactor<K, V, T, D>>,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
    pub(crate) writer_id: WriterId,
    /// Hostname/purpose breadcrumbs attached to state changes for debugging.
    pub(crate) debug_state: HandleDebugState,
    pub(crate) write_schemas: Schemas<K, V>,

    /// The most recent upper discovered by this handle; see [`Self::upper`].
    pub(crate) upper: Antichain<T>,
    // Set at construction; wrapped in `Option` so it can be taken by value —
    // presumably when the handle is expired (the take happens outside this
    // chunk; TODO confirm).
    expire_fn: Option<ExpireFn>,
}
156
157impl<K, V, T, D> WriteHandle<K, V, T, D>
158where
159    K: Debug + Codec,
160    V: Debug + Codec,
161    T: Timestamp + TotalOrder + Lattice + Codec64 + Sync,
162    D: Monoid + Ord + Codec64 + Send + Sync,
163{
164    pub(crate) fn new(
165        cfg: PersistConfig,
166        metrics: Arc<Metrics>,
167        machine: Machine<K, V, T, D>,
168        gc: GarbageCollector<K, V, T, D>,
169        blob: Arc<dyn Blob>,
170        writer_id: WriterId,
171        purpose: &str,
172        write_schemas: Schemas<K, V>,
173    ) -> Self {
174        let isolated_runtime = Arc::clone(&machine.isolated_runtime);
175        let compact = cfg
176            .compaction_enabled
177            .then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
178        let debug_state = HandleDebugState {
179            hostname: cfg.hostname.to_owned(),
180            purpose: purpose.to_owned(),
181        };
182        let upper = machine.applier.clone_upper();
183        let expire_fn = Self::expire_fn(machine.clone(), gc.clone(), writer_id.clone());
184        WriteHandle {
185            cfg,
186            metrics,
187            machine,
188            gc,
189            compact,
190            blob,
191            isolated_runtime,
192            writer_id,
193            debug_state,
194            write_schemas,
195            upper,
196            expire_fn: Some(expire_fn),
197        }
198    }
199
200    /// Creates a [WriteHandle] for the same shard from an existing
201    /// [ReadHandle].
202    pub fn from_read(read: &ReadHandle<K, V, T, D>, purpose: &str) -> Self {
203        Self::new(
204            read.cfg.clone(),
205            Arc::clone(&read.metrics),
206            read.machine.clone(),
207            read.gc.clone(),
208            Arc::clone(&read.blob),
209            WriterId::new(),
210            purpose,
211            read.read_schemas.clone(),
212        )
213    }
214
215    /// True iff this WriteHandle supports writing without enforcing batch
216    /// bounds checks.
217    pub fn validate_part_bounds_on_write(&self) -> bool {
218        // Note that we require validation when the read checks are enabled, even if the write-time
219        // checks would otherwise be disabled, to avoid batches that would fail at read time.
220        VALIDATE_PART_BOUNDS_ON_WRITE.get(&self.cfg) || VALIDATE_PART_BOUNDS_ON_READ.get(&self.cfg)
221    }
222
223    /// This handle's shard id.
224    pub fn shard_id(&self) -> ShardId {
225        self.machine.shard_id()
226    }
227
228    /// Returns the schema of this writer.
229    pub fn schema_id(&self) -> Option<SchemaId> {
230        self.write_schemas.id
231    }
232
    /// Registers the write schema, if it isn't already registered.
    ///
    /// This method expects that either the shard doesn't yet have any schema registered, or one of
    /// the registered schemas is the same as the write schema. If all registered schemas are
    /// different from the write schema, or the shard is a tombstone, it returns `None`.
    pub async fn try_register_schema(&mut self) -> Option<SchemaId> {
        let Schemas { id, key, val } = &self.write_schemas;

        // Fast path: this handle already knows its schema id.
        if let Some(id) = id {
            return Some(*id);
        }

        // Register (or look up) the schema via the machine. Any resulting
        // maintenance work is kicked off in the background.
        let (schema_id, maintenance) = self.machine.register_schema(key, val).await;
        maintenance.start_performing(&self.machine, &self.gc);

        // Cache the result so subsequent calls take the fast path above.
        self.write_schemas.id = schema_id;
        schema_id
    }
251
    /// A cached version of the shard-global `upper` frontier.
    ///
    /// This is the most recent upper discovered by this handle. It is
    /// potentially more stale than [Self::shared_upper] but is lock-free and
    /// allocation-free. This will always be less or equal to the shard-global
    /// `upper`.
    ///
    /// See [Self::fetch_recent_upper] for a linearized (but more expensive)
    /// alternative.
    pub fn upper(&self) -> &Antichain<T> {
        &self.upper
    }
261
262    /// A less-stale cached version of the shard-global `upper` frontier.
263    ///
264    /// This is the most recently known upper for this shard process-wide, but
265    /// unlike [Self::upper] it requires a mutex and a clone. This will always be
266    /// less or equal to the shard-global `upper`.
267    pub fn shared_upper(&self) -> Antichain<T> {
268        self.machine.applier.clone_upper()
269    }
270
    /// Fetches and returns a recent shard-global `upper`. Importantly, this operation is
    /// linearized with write operations.
    ///
    /// This requires fetching the latest state from consensus and is therefore a potentially
    /// expensive operation.
    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
    pub async fn fetch_recent_upper(&mut self) -> &Antichain<T> {
        // TODO: Do we even need to track self.upper on WriteHandle or could
        // WriteHandle::upper just get the one out of machine?
        //
        // Refresh the cached `self.upper` in the same closure that observes
        // the freshly fetched state, then hand out a borrow of the cache.
        self.machine
            .applier
            .fetch_upper(|current_upper| self.upper.clone_from(current_upper))
            .await;
        &self.upper
    }
286
287    /// Advance the shard's upper by the given frontier.
288    ///
289    /// If the provided `target` is less than or equal to the shard's upper, this is a no-op.
290    ///
291    /// In contrast to the various compare-and-append methods, this method does not require the
292    /// handle's write schema to be registered with the shard. That is, it is fine to use a dummy
293    /// schema when creating a writer just to advance a shard upper.
294    pub async fn advance_upper(&mut self, target: &Antichain<T>) {
295        // We avoid `fetch_recent_upper` here, to avoid a consensus roundtrip if the known upper is
296        // already beyond the target.
297        let mut lower = self.shared_upper().clone();
298
299        while !PartialOrder::less_equal(target, &lower) {
300            let since = Antichain::from_elem(T::minimum());
301            let desc = Description::new(lower.clone(), target.clone(), since);
302            let batch = HollowBatch::empty(desc);
303
304            let heartbeat_timestamp = (self.cfg.now)();
305            let res = self
306                .machine
307                .compare_and_append(
308                    &batch,
309                    &self.writer_id,
310                    &self.debug_state,
311                    heartbeat_timestamp,
312                )
313                .await;
314
315            use CompareAndAppendRes::*;
316            let new_upper = match res {
317                Success(_seq_no, maintenance) => {
318                    maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
319                    batch.desc.upper().clone()
320                }
321                UpperMismatch(_seq_no, actual_upper) => actual_upper,
322                InvalidUsage(_invalid_usage) => unreachable!("batch bounds checked above"),
323                InlineBackpressure => unreachable!("batch was empty"),
324            };
325
326            self.upper.clone_from(&new_upper);
327            lower = new_upper;
328        }
329    }
330
    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `upper`.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully written.
    /// If not, an `Upper` err containing the current writer upper is returned.
    /// If that happens, we also update our local `upper` to match the current
    /// upper. This is useful in cases where a timeout happens in between a
    /// successful write and returning that to the client.
    ///
    /// In contrast to [Self::compare_and_append], multiple [WriteHandle]s may
    /// be used concurrently to write to the same shard, but in this case, the
    /// data being written must be identical (in the sense of "definite"-ness).
    /// It's intended for replicated use by source ingestion, sinks, etc.
    ///
    /// All times in `updates` must be greater or equal to `lower` and not
    /// greater or equal to `upper`. A `upper` of the empty antichain "finishes"
    /// this shard, promising that no more data is ever incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to call this with `upper` equal to
    /// `self.upper()` and an empty `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky multi-level Result is to enable more obvious error handling
    /// in the caller. See <http://sled.rs/errors.html> for details.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        // Stage the updates into a batch, then defer to `append_batch`, which
        // handles upper mismatches by retrying with an advanced lower.
        let batch = self.batch(updates, lower.clone(), upper.clone()).await?;
        self.append_batch(batch, lower, upper).await
    }
379
    /// Applies `updates` to this shard and downgrades this handle's upper to
    /// `new_upper` iff the current global upper of this shard is
    /// `expected_upper`.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully written.
    /// If not, an `Upper` err containing the current global upper is returned.
    ///
    /// In contrast to [Self::append], this linearizes mutations from all
    /// writers. It's intended for use as an atomic primitive for timestamp
    /// bindings, SQL tables, etc.
    ///
    /// All times in `updates` must be greater or equal to `expected_upper` and
    /// not greater or equal to `new_upper`. A `new_upper` of the empty
    /// antichain "finishes" this shard, promising that no more data is ever
    /// incoming.
    ///
    /// `updates` may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `new_upper` equal to `self.upper()` and an empty
    /// `updates` (making the call a no-op).
    ///
    /// This uses a bounded amount of memory, even when `updates` is very large.
    /// Individual records, however, should be small enough that we can
    /// reasonably chunk them up: O(KB) is definitely fine, O(MB) come talk to
    /// us.
    ///
    /// The clunky multi-level Result is to enable more obvious error handling
    /// in the caller. See <http://sled.rs/errors.html> for details.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn compare_and_append<SB, KB, VB, TB, DB, I>(
        &mut self,
        updates: I,
        expected_upper: Antichain<T>,
        new_upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        SB: Borrow<((KB, VB), TB, DB)>,
        KB: Borrow<K>,
        VB: Borrow<V>,
        TB: Borrow<T>,
        DB: Borrow<D>,
        I: IntoIterator<Item = SB>,
        D: Send + Sync,
    {
        // Stage the updates into a batch covering [expected_upper, new_upper).
        let mut batch = self
            .batch(updates, expected_upper.clone(), new_upper.clone())
            .await?;
        match self
            .compare_and_append_batch(&mut [&mut batch], expected_upper, new_upper, true)
            .await
        {
            ok @ Ok(Ok(())) => ok,
            err => {
                // We cannot delete the batch in compare_and_append_batch()
                // because the caller owns the batch and might want to retry
                // with a different `expected_upper`. In this function, we
                // control the batch, so we have to delete it.
                batch.delete().await;
                err
            }
        }
    }
442
    /// Appends the batch of updates to the shard and downgrades this handle's
    /// upper to `upper`.
    ///
    /// The innermost `Result` is `Ok` if the updates were successfully written.
    /// If not, an `Upper` err containing the current writer upper is returned.
    /// If that happens, we also update our local `upper` to match the current
    /// upper. This is useful in cases where a timeout happens in between a
    /// successful write and returning that to the client.
    ///
    /// In contrast to [Self::compare_and_append_batch], multiple [WriteHandle]s
    /// may be used concurrently to write to the same shard, but in this case,
    /// the data being written must be identical (in the sense of
    /// "definite"-ness). It's intended for replicated use by source ingestion,
    /// sinks, etc.
    ///
    /// A `upper` of the empty antichain "finishes" this shard, promising that
    /// no more data is ever incoming.
    ///
    /// The batch may be empty, which allows for downgrading `upper` to
    /// communicate progress. It is possible to heartbeat a writer lease by
    /// calling this with `upper` equal to `self.upper()` and an empty `updates`
    /// (making the call a no-op).
    ///
    /// The clunky multi-level Result is to enable more obvious error handling
    /// in the caller. See <http://sled.rs/errors.html> for details.
    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
    pub async fn append_batch(
        &mut self,
        mut batch: Batch<K, V, T, D>,
        mut lower: Antichain<T>,
        upper: Antichain<T>,
    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
    where
        D: Send + Sync,
    {
        // CaS retry loop: on an upper mismatch we either fail (shard upper is
        // behind our lower), retry with a truncated batch, or treat the append
        // as an already-applied no-op.
        loop {
            let res = self
                .compare_and_append_batch(&mut [&mut batch], lower.clone(), upper.clone(), true)
                .await?;
            match res {
                Ok(()) => {
                    self.upper = upper;
                    return Ok(Ok(()));
                }
                Err(mismatch) => {
                    // We tried to do a non-contiguous append, that won't work.
                    if PartialOrder::less_than(&mismatch.current, &lower) {
                        self.upper.clone_from(&mismatch.current);

                        // The batch was not (and cannot be) applied; clean up
                        // its blobs before handing the mismatch to the caller.
                        batch.delete().await;

                        return Ok(Err(mismatch));
                    } else if PartialOrder::less_than(&mismatch.current, &upper) {
                        // Cut down the Description by advancing its lower to the current shard
                        // upper and try again. IMPORTANT: We can only advance the lower, meaning
                        // we cut updates away, we must not "extend" the batch by changing to a
                        // lower that is not beyond the current lower. This invariant is checked by
                        // the first if branch: if `!(current_upper < lower)` then it holds that
                        // `lower <= current_upper`.
                        lower = mismatch.current;
                    } else {
                        // We already have updates past this batch's upper, the append is a no-op.
                        self.upper = mismatch.current;

                        // Because we return a success result, the caller will
                        // think that the batch was consumed or otherwise used,
                        // so we have to delete it here.
                        batch.delete().await;

                        return Ok(Ok(()));
                    }
                }
            }
        }
    }
518
519    /// Appends the batch of updates to the shard and downgrades this handle's
520    /// upper to `new_upper` iff the current global upper of this shard is
521    /// `expected_upper`.
522    ///
523    /// The innermost `Result` is `Ok` if the batch was successfully written. If
524    /// not, an `Upper` err containing the current global upper is returned.
525    ///
526    /// In contrast to [Self::append_batch], this linearizes mutations from all
527    /// writers. It's intended for use as an atomic primitive for timestamp
528    /// bindings, SQL tables, etc.
529    ///
530    /// A `new_upper` of the empty antichain "finishes" this shard, promising
531    /// that no more data is ever incoming.
532    ///
533    /// The batch may be empty, which allows for downgrading `upper` to
534    /// communicate progress. It is possible to heartbeat a writer lease by
535    /// calling this with `new_upper` equal to `self.upper()` and an empty
536    /// `updates` (making the call a no-op).
537    ///
538    /// IMPORTANT: In case of an erroneous result the caller is responsible for
539    /// the lifecycle of the `batch`. It can be deleted or it can be used to
540    /// retry with adjusted frontiers.
541    ///
542    /// The clunky multi-level Result is to enable more obvious error handling
543    /// in the caller. See <http://sled.rs/errors.html> for details.
544    ///
545    /// If the `enforce_matching_batch_boundaries` flag is set to `false`:
546    /// We no longer validate that every batch covers the entire range between
547    /// the expected and new uppers, as we wish to allow combining batches that
548    /// cover different subsets of that range, including subsets of that range
549    /// that include no data at all. The caller is responsible for guaranteeing
550    /// that the set of batches provided collectively include all updates for
551    /// the entire range between the expected and new upper.
552    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
553    pub async fn compare_and_append_batch(
554        &mut self,
555        batches: &mut [&mut Batch<K, V, T, D>],
556        expected_upper: Antichain<T>,
557        new_upper: Antichain<T>,
558        validate_part_bounds_on_write: bool,
559    ) -> Result<Result<(), UpperMismatch<T>>, InvalidUsage<T>>
560    where
561        D: Send + Sync,
562    {
563        // Before we append any data, we require a registered write schema.
564        // We expect the caller to ensure our schema is already present... unless this shard is a
565        // tombstone, in which case this write is either a noop or will fail gracefully.
566        let schema_id = self.try_register_schema().await;
567
568        for batch in batches.iter() {
569            if self.machine.shard_id() != batch.shard_id() {
570                return Err(InvalidUsage::BatchNotFromThisShard {
571                    batch_shard: batch.shard_id(),
572                    handle_shard: self.machine.shard_id(),
573                });
574            }
575            assert_code_can_read_data(&self.cfg.build_version, &batch.version);
576            if self.cfg.build_version > batch.version {
577                info!(
578                    shard_id =? self.machine.shard_id(),
579                    batch_version =? batch.version,
580                    writer_version =? self.cfg.build_version,
581                    "Appending batch from the past. This is fine but should be rare. \
582                    TODO: Error on very old versions once the leaked blob detector exists."
583                )
584            }
585            fn assert_schema<A: Codec>(writer_schema: &A::Schema, batch_schema: &bytes::Bytes) {
586                if batch_schema.is_empty() {
587                    // Schema is either trivial or missing!
588                    return;
589                }
590                let batch_schema: A::Schema = A::decode_schema(batch_schema);
591                if *writer_schema != batch_schema {
592                    error!(
593                        ?writer_schema,
594                        ?batch_schema,
595                        "writer and batch schemas should be identical"
596                    );
597                    soft_panic_or_log!("writer and batch schemas should be identical");
598                }
599            }
600            assert_schema::<K>(&*self.write_schemas.key, &batch.schemas.0);
601            assert_schema::<V>(&*self.write_schemas.val, &batch.schemas.1);
602        }
603
604        let lower = expected_upper.clone();
605        let upper = new_upper;
606        let since = Antichain::from_elem(T::minimum());
607        let desc = Description::new(lower, upper, since);
608
609        let mut received_inline_backpressure = false;
610        // Every hollow part must belong to some batch, so we can clean it up when the batch is dropped...
611        // but if we need to merge all our inline parts to a single run in S3, it's not correct to
612        // associate that with any of our individual input batches.
613        // At first, we'll try and put all the inline parts we receive into state... but if we
614        // get backpressured, we retry with this builder set to `Some`, put all our inline data into
615        // it, and ensure it's flushed out to S3 before including it in the batch.
616        let mut inline_batch_builder: Option<(_, BatchBuilder<K, V, T, D>)> = None;
617        let maintenance = loop {
618            let any_batch_rewrite = batches
619                .iter()
620                .any(|x| x.batch.parts.iter().any(|x| x.ts_rewrite().is_some()));
621            let (mut parts, mut num_updates, mut run_splits, mut run_metas) =
622                (vec![], 0, vec![], vec![]);
623            let mut key_storage = None;
624            let mut val_storage = None;
625            for batch in batches.iter() {
626                let () = validate_truncate_batch(
627                    &batch.batch,
628                    &desc,
629                    any_batch_rewrite,
630                    validate_part_bounds_on_write,
631                )?;
632                for (run_meta, run) in batch.batch.runs() {
633                    let start_index = parts.len();
634                    for part in run {
635                        if let (
636                            RunPart::Single(
637                                batch_part @ BatchPart::Inline {
638                                    updates,
639                                    ts_rewrite,
640                                    schema_id: _,
641                                    deprecated_schema_id: _,
642                                },
643                            ),
644                            Some((schema_cache, builder)),
645                        ) = (part, &mut inline_batch_builder)
646                        {
647                            let schema_migration = PartMigration::new(
648                                batch_part,
649                                self.write_schemas.clone(),
650                                schema_cache,
651                            )
652                            .await
653                            .expect("schemas for inline user part");
654
655                            let encoded_part = EncodedPart::from_inline(
656                                &crate::fetch::FetchConfig::from_persist_config(&self.cfg),
657                                &*self.metrics,
658                                self.metrics.read.compaction.clone(),
659                                desc.clone(),
660                                updates,
661                                ts_rewrite.as_ref(),
662                            );
663                            let mut fetched_part = FetchedPart::new(
664                                Arc::clone(&self.metrics),
665                                encoded_part,
666                                schema_migration,
667                                FetchBatchFilter::Compaction {
668                                    since: desc.since().clone(),
669                                },
670                                false,
671                                PartDecodeFormat::Arrow,
672                                None,
673                            );
674
675                            while let Some(((k, v), t, d)) =
676                                fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
677                            {
678                                builder
679                                    .add(&k, &v, &t, &d)
680                                    .await
681                                    .expect("re-encoding just-decoded data");
682                            }
683                        } else {
684                            parts.push(part.clone())
685                        }
686                    }
687
688                    let end_index = parts.len();
689
690                    if start_index == end_index {
691                        continue;
692                    }
693
694                    // Mark the boundary if this is not the first run in the batch.
695                    if start_index != 0 {
696                        run_splits.push(start_index);
697                    }
698                    run_metas.push(run_meta.clone());
699                }
700                num_updates += batch.batch.len;
701            }
702
703            let mut flushed_inline_batch = if let Some((_, builder)) = inline_batch_builder.take() {
704                let mut finished = builder
705                    .finish(desc.upper().clone())
706                    .await
707                    .expect("invalid usage");
708                let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
709                finished
710                    .flush_to_blob(
711                        &cfg,
712                        &self.metrics.inline.backpressure,
713                        &self.isolated_runtime,
714                        &self.write_schemas,
715                    )
716                    .await;
717                Some(finished)
718            } else {
719                None
720            };
721
722            if let Some(batch) = &flushed_inline_batch {
723                for (run_meta, run) in batch.batch.runs() {
724                    assert!(run.len() > 0);
725                    let start_index = parts.len();
726                    if start_index != 0 {
727                        run_splits.push(start_index);
728                    }
729                    run_metas.push(run_meta.clone());
730                    parts.extend(run.iter().cloned())
731                }
732            }
733
734            let mut combined_batch =
735                HollowBatch::new(desc.clone(), parts, num_updates, run_metas, run_splits);
736
737            // The batch may have been written by a writer without a registered schema.
738            // Ensure we have a schema ID in the batch metadata before we append, to avoid type
739            // confusion later.
740            match schema_id {
741                Some(schema_id) => {
742                    ensure_batch_schema(&mut combined_batch, self.shard_id(), schema_id);
743                }
744                None => {
745                    assert!(
746                        self.fetch_recent_upper().await.is_empty(),
747                        "fetching a schema id should only fail when the shard is tombstoned"
748                    )
749                }
750            }
751
752            let heartbeat_timestamp = (self.cfg.now)();
753            let res = self
754                .machine
755                .compare_and_append(
756                    &combined_batch,
757                    &self.writer_id,
758                    &self.debug_state,
759                    heartbeat_timestamp,
760                )
761                .await;
762
763            match res {
764                CompareAndAppendRes::Success(_seqno, maintenance) => {
765                    self.upper.clone_from(desc.upper());
766                    for batch in batches.iter_mut() {
767                        batch.mark_consumed();
768                    }
769                    if let Some(batch) = &mut flushed_inline_batch {
770                        batch.mark_consumed();
771                    }
772                    break maintenance;
773                }
774                CompareAndAppendRes::InvalidUsage(invalid_usage) => {
775                    if let Some(batch) = flushed_inline_batch.take() {
776                        batch.delete().await;
777                    }
778                    return Err(invalid_usage);
779                }
780                CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => {
781                    if let Some(batch) = flushed_inline_batch.take() {
782                        batch.delete().await;
783                    }
784                    // We tried to to a compare_and_append with the wrong expected upper, that
785                    // won't work. Update the cached upper to the current upper.
786                    self.upper.clone_from(&current_upper);
787                    return Ok(Err(UpperMismatch {
788                        current: current_upper,
789                        expected: expected_upper,
790                    }));
791                }
792                CompareAndAppendRes::InlineBackpressure => {
793                    // We tried to write an inline part, but there was already
794                    // too much in state. Flush it out to s3 and try again.
795                    assert_eq!(received_inline_backpressure, false);
796                    received_inline_backpressure = true;
797                    if COMBINE_INLINE_WRITES.get(&self.cfg) {
798                        inline_batch_builder = Some((
799                            self.machine.applier.schema_cache(),
800                            self.builder(desc.lower().clone()),
801                        ));
802                        continue;
803                    }
804
805                    let cfg = BatchBuilderConfig::new(&self.cfg, self.shard_id());
806                    // We could have a large number of inline parts (imagine the
807                    // sharded persist_sink), do this flushing concurrently.
808                    let flush_batches = batches
809                        .iter_mut()
810                        .map(|batch| async {
811                            batch
812                                .flush_to_blob(
813                                    &cfg,
814                                    &self.metrics.inline.backpressure,
815                                    &self.isolated_runtime,
816                                    &self.write_schemas,
817                                )
818                                .await
819                        })
820                        .collect::<FuturesUnordered<_>>();
821                    let () = flush_batches.collect::<()>().await;
822
823                    for batch in batches.iter() {
824                        assert_eq!(batch.batch.inline_bytes(), 0);
825                    }
826
827                    continue;
828                }
829            }
830        };
831
832        maintenance.start_performing(&self.machine, &self.gc, self.compact.as_ref());
833
834        Ok(Ok(()))
835    }
836
837    /// Turns the given [`ProtoBatch`] back into a [`Batch`] which can be used
838    /// to append it to this shard.
839    pub fn batch_from_transmittable_batch(&self, batch: ProtoBatch) -> Batch<K, V, T, D> {
840        let shard_id: ShardId = batch
841            .shard_id
842            .into_rust()
843            .expect("valid transmittable batch");
844        assert_eq!(shard_id, self.machine.shard_id());
845
846        let ret = Batch {
847            batch_delete_enabled: BATCH_DELETE_ENABLED.get(&self.cfg),
848            metrics: Arc::clone(&self.metrics),
849            shard_metrics: Arc::clone(&self.machine.applier.shard_metrics),
850            version: Version::parse(&batch.version).expect("valid transmittable batch"),
851            schemas: (batch.key_schema, batch.val_schema),
852            batch: batch
853                .batch
854                .into_rust_if_some("ProtoBatch::batch")
855                .expect("valid transmittable batch"),
856            blob: Arc::clone(&self.blob),
857            _phantom: std::marker::PhantomData,
858        };
859        assert_eq!(ret.shard_id(), self.machine.shard_id());
860        ret
861    }
862
863    /// Returns a [BatchBuilder] that can be used to write a batch of updates to
864    /// blob storage which can then be appended to this shard using
865    /// [Self::compare_and_append_batch] or [Self::append_batch].
866    ///
867    /// It is correct to create an empty batch, which allows for downgrading
868    /// `upper` to communicate progress. (see [Self::compare_and_append_batch]
869    /// or [Self::append_batch])
870    ///
871    /// The builder uses a bounded amount of memory, even when the number of
872    /// updates is very large. Individual records, however, should be small
873    /// enough that we can reasonably chunk them up: O(KB) is definitely fine,
874    /// O(MB) come talk to us.
875    pub fn builder(&self, lower: Antichain<T>) -> BatchBuilder<K, V, T, D> {
876        Self::builder_inner(
877            &self.cfg,
878            CompactConfig::new(&self.cfg, self.shard_id()),
879            Arc::clone(&self.metrics),
880            Arc::clone(&self.machine.applier.shard_metrics),
881            &self.metrics.user,
882            Arc::clone(&self.isolated_runtime),
883            Arc::clone(&self.blob),
884            self.shard_id(),
885            self.write_schemas.clone(),
886            lower,
887        )
888    }
889
890    /// Implementation of [Self::builder], so that we can share the
891    /// implementation in `PersistClient`.
892    pub(crate) fn builder_inner(
893        persist_cfg: &PersistConfig,
894        compact_cfg: CompactConfig,
895        metrics: Arc<Metrics>,
896        shard_metrics: Arc<ShardMetrics>,
897        user_batch_metrics: &BatchWriteMetrics,
898        isolated_runtime: Arc<IsolatedRuntime>,
899        blob: Arc<dyn Blob>,
900        shard_id: ShardId,
901        schemas: Schemas<K, V>,
902        lower: Antichain<T>,
903    ) -> BatchBuilder<K, V, T, D> {
904        let parts = if let Some(max_runs) = compact_cfg.batch.max_runs {
905            BatchParts::new_compacting::<K, V, D>(
906                compact_cfg,
907                Description::new(
908                    lower.clone(),
909                    Antichain::new(),
910                    Antichain::from_elem(T::minimum()),
911                ),
912                max_runs,
913                Arc::clone(&metrics),
914                shard_metrics,
915                shard_id,
916                Arc::clone(&blob),
917                isolated_runtime,
918                user_batch_metrics,
919                schemas.clone(),
920            )
921        } else {
922            BatchParts::new_ordered::<D>(
923                compact_cfg.batch,
924                RunOrder::Unordered,
925                Arc::clone(&metrics),
926                shard_metrics,
927                shard_id,
928                Arc::clone(&blob),
929                isolated_runtime,
930                user_batch_metrics,
931            )
932        };
933        let builder = BatchBuilderInternal::new(
934            BatchBuilderConfig::new(persist_cfg, shard_id),
935            parts,
936            metrics,
937            schemas,
938            blob,
939            shard_id,
940            persist_cfg.build_version.clone(),
941        );
942        BatchBuilder::new(
943            builder,
944            Description::new(lower, Antichain::new(), Antichain::from_elem(T::minimum())),
945        )
946    }
947
948    /// Uploads the given `updates` as one `Batch` to the blob store and returns
949    /// a handle to the batch.
950    #[instrument(level = "trace", fields(shard = %self.machine.shard_id()))]
951    pub async fn batch<SB, KB, VB, TB, DB, I>(
952        &mut self,
953        updates: I,
954        lower: Antichain<T>,
955        upper: Antichain<T>,
956    ) -> Result<Batch<K, V, T, D>, InvalidUsage<T>>
957    where
958        SB: Borrow<((KB, VB), TB, DB)>,
959        KB: Borrow<K>,
960        VB: Borrow<V>,
961        TB: Borrow<T>,
962        DB: Borrow<D>,
963        I: IntoIterator<Item = SB>,
964    {
965        let iter = updates.into_iter();
966
967        let mut builder = self.builder(lower.clone());
968
969        for update in iter {
970            let ((k, v), t, d) = update.borrow();
971            let (k, v, t, d) = (k.borrow(), v.borrow(), t.borrow(), d.borrow());
972            match builder.add(k, v, t, d).await {
973                Ok(Added::Record | Added::RecordAndParts) => (),
974                Err(invalid_usage) => return Err(invalid_usage),
975            }
976        }
977
978        builder.finish(upper.clone()).await
979    }
980
981    /// Blocks until the given `frontier` is less than the upper of the shard.
982    pub async fn wait_for_upper_past(&mut self, frontier: &Antichain<T>) {
983        let mut watch = self.machine.applier.watch();
984        self.machine
985            .wait_for_upper_past(
986                frontier,
987                &mut watch,
988                None,
989                &self.metrics.retries.next_listen_batch, // TODO: new retry metrics for these?
990                next_listen_batch_retry_params(&self.cfg),
991            )
992            .await;
993        let upper = self.machine.applier.clone_upper();
994        if PartialOrder::less_than(&self.upper, &upper) {
995            self.upper.clone_from(&upper);
996        }
997        assert!(PartialOrder::less_than(frontier, &self.upper));
998    }
999
1000    /// Politely expires this writer, releasing any associated state.
1001    ///
1002    /// There is a best-effort impl in Drop to expire a writer that wasn't
1003    /// explictly expired with this method. When possible, explicit expiry is
1004    /// still preferred because the Drop one is best effort and is dependant on
1005    /// a tokio [Handle] being available in the TLC at the time of drop (which
1006    /// is a bit subtle). Also, explicit expiry allows for control over when it
1007    /// happens.
1008    #[instrument(level = "debug", fields(shard = %self.machine.shard_id()))]
1009    pub async fn expire(mut self) {
1010        let Some(expire_fn) = self.expire_fn.take() else {
1011            return;
1012        };
1013        expire_fn.0().await;
1014    }
1015
1016    fn expire_fn(
1017        machine: Machine<K, V, T, D>,
1018        gc: GarbageCollector<K, V, T, D>,
1019        writer_id: WriterId,
1020    ) -> ExpireFn {
1021        ExpireFn(Box::new(move || {
1022            Box::pin(async move {
1023                let (_, maintenance) = machine.expire_writer(&writer_id).await;
1024                maintenance.start_performing(&machine, &gc);
1025            })
1026        }))
1027    }
1028
1029    /// Test helper for an [Self::append] call that is expected to succeed.
1030    #[cfg(test)]
1031    #[track_caller]
1032    pub async fn expect_append<L, U>(&mut self, updates: &[((K, V), T, D)], lower: L, new_upper: U)
1033    where
1034        L: Into<Antichain<T>>,
1035        U: Into<Antichain<T>>,
1036        D: Send + Sync,
1037    {
1038        self.append(updates.iter(), lower.into(), new_upper.into())
1039            .await
1040            .expect("invalid usage")
1041            .expect("unexpected upper");
1042    }
1043
1044    /// Test helper for a [Self::compare_and_append] call that is expected to
1045    /// succeed.
1046    #[cfg(test)]
1047    #[track_caller]
1048    pub async fn expect_compare_and_append(
1049        &mut self,
1050        updates: &[((K, V), T, D)],
1051        expected_upper: T,
1052        new_upper: T,
1053    ) where
1054        D: Send + Sync,
1055    {
1056        self.compare_and_append(
1057            updates.iter().map(|((k, v), t, d)| ((k, v), t, d)),
1058            Antichain::from_elem(expected_upper),
1059            Antichain::from_elem(new_upper),
1060        )
1061        .await
1062        .expect("invalid usage")
1063        .expect("unexpected upper")
1064    }
1065
1066    /// Test helper for a [Self::compare_and_append_batch] call that is expected
1067    /// to succeed.
1068    #[cfg(test)]
1069    #[track_caller]
1070    pub async fn expect_compare_and_append_batch(
1071        &mut self,
1072        batches: &mut [&mut Batch<K, V, T, D>],
1073        expected_upper: T,
1074        new_upper: T,
1075    ) {
1076        self.compare_and_append_batch(
1077            batches,
1078            Antichain::from_elem(expected_upper),
1079            Antichain::from_elem(new_upper),
1080            true,
1081        )
1082        .await
1083        .expect("invalid usage")
1084        .expect("unexpected upper")
1085    }
1086
1087    /// Test helper for an [Self::append] call that is expected to succeed.
1088    #[cfg(test)]
1089    #[track_caller]
1090    pub async fn expect_batch(
1091        &mut self,
1092        updates: &[((K, V), T, D)],
1093        lower: T,
1094        upper: T,
1095    ) -> Batch<K, V, T, D> {
1096        self.batch(
1097            updates.iter(),
1098            Antichain::from_elem(lower),
1099            Antichain::from_elem(upper),
1100        )
1101        .await
1102        .expect("invalid usage")
1103    }
1104}
1105
1106impl<K: Codec, V: Codec, T, D> Drop for WriteHandle<K, V, T, D> {
1107    fn drop(&mut self) {
1108        let Some(expire_fn) = self.expire_fn.take() else {
1109            return;
1110        };
1111        let handle = match Handle::try_current() {
1112            Ok(x) => x,
1113            Err(_) => {
1114                warn!(
1115                    "WriteHandle {} dropped without being explicitly expired, falling back to lease timeout",
1116                    self.writer_id
1117                );
1118                return;
1119            }
1120        };
1121        // Spawn a best-effort task to expire this write handle. It's fine if
1122        // this doesn't run to completion, we'd just have to wait out the lease
1123        // before the shard-global since is unblocked.
1124        //
1125        // Intentionally create the span outside the task to set the parent.
1126        let expire_span = debug_span!("drop::expire");
1127        handle.spawn_named(
1128            || format!("WriteHandle::expire ({})", self.writer_id),
1129            expire_fn.0().instrument(expire_span),
1130        );
1131    }
1132}
1133
1134/// Ensure the given batch uses the given schema ID.
1135///
1136/// If the batch has no schema set, initialize it to the given one.
1137/// If the batch has a schema set, assert that it matches the given one.
1138fn ensure_batch_schema<T>(batch: &mut HollowBatch<T>, shard_id: ShardId, schema_id: SchemaId)
1139where
1140    T: Timestamp + Lattice + Codec64,
1141{
1142    let ensure = |id: &mut Option<SchemaId>| match id {
1143        Some(id) => assert_eq!(*id, schema_id, "schema ID mismatch; shard={shard_id}"),
1144        None => *id = Some(schema_id),
1145    };
1146
1147    for run_meta in &mut batch.run_meta {
1148        ensure(&mut run_meta.schema);
1149    }
1150    for part in &mut batch.parts {
1151        match part {
1152            RunPart::Single(BatchPart::Hollow(part)) => ensure(&mut part.schema_id),
1153            RunPart::Single(BatchPart::Inline { schema_id, .. }) => ensure(schema_id),
1154            RunPart::Many(_hollow_run_ref) => {
1155                // TODO: Fetch the parts in this run and rewrite them too. Alternatively, make
1156                // `run_meta` the only place we keep schema IDs, so rewriting parts isn't
1157                // necessary.
1158            }
1159        }
1160    }
1161}
1162
#[cfg(test)]
mod tests {
    use std::str::FromStr;
    use std::sync::mpsc;

    use differential_dataflow::consolidation::consolidate_updates;
    use futures_util::FutureExt;
    use mz_dyncfg::ConfigUpdates;
    use mz_ore::collections::CollectionExt;
    use mz_ore::task;
    use serde_json::json;

    use crate::cache::PersistClientCache;
    use crate::tests::{all_ok, new_test_client};
    use crate::{PersistLocation, ShardId};

    use super::*;

    // Appending a series of empty batches must not write any new blobs: the
    // blob key count before and after the empty appends is compared below.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn empty_batches(dyncfgs: ConfigUpdates) {
        let data = [
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, _) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;
        let blob = Arc::clone(&write.blob);

        // Write an initial batch.
        let mut upper = 3;
        write.expect_append(&data[..2], vec![0], vec![upper]).await;

        // Write a bunch of empty batches. This shouldn't write blobs, so the count should stay the same.
        let mut count_before = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_before += 1;
        })
        .await
        .expect("list_keys failed");
        for _ in 0..5 {
            let new_upper = upper + 1;
            write.expect_compare_and_append(&[], upper, new_upper).await;
            upper = new_upper;
        }
        let mut count_after = 0;
        blob.list_keys_and_metadata("", &mut |_| {
            count_after += 1;
        })
        .await
        .expect("list_keys failed");
        assert_eq!(count_after, count_before);
    }

    // Appending two batches in one `compare_and_append_batch` call: updates
    // from both batches are visible, they land as at least two runs, and (per
    // `expected` below) the t=4 update past the appended upper of 4 does not
    // appear in the snapshot.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn compare_and_append_batch_multi(dyncfgs: ConfigUpdates) {
        let data0 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("4".to_owned(), "four".to_owned()), 4, 1),
        ];
        let data1 = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        let mut batch0 = write.expect_batch(&data0, 0, 5).await;
        let mut batch1 = write.expect_batch(&data1, 0, 4).await;

        write
            .expect_compare_and_append_batch(&mut [&mut batch0, &mut batch1], 0, 4)
            .await;

        let batch = write
            .machine
            .unleased_snapshot(&Antichain::from_elem(3))
            .await
            .expect("just wrote this")
            .into_element();

        // The two appended batches should not have been merged into one run.
        assert!(batch.runs().count() >= 2);

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 2),
            (("2".to_owned(), "two".to_owned()), 2, 2),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    // WriterId must round-trip through serde human-readable formats (JSON) as
    // its `w…` string form, both standalone and nested in a struct.
    #[mz_ore::test]
    fn writer_id_human_readable_serde() {
        #[derive(Debug, Serialize, Deserialize)]
        struct Container {
            writer_id: WriterId,
        }

        // roundtrip through json
        let id = WriterId::from_str("w00000000-1234-5678-0000-000000000000").expect("valid id");
        assert_eq!(
            id,
            serde_json::from_value(serde_json::to_value(id.clone()).expect("serializable"))
                .expect("deserializable")
        );

        // deserialize a serialized string directly
        assert_eq!(
            id,
            serde_json::from_str("\"w00000000-1234-5678-0000-000000000000\"")
                .expect("deserializable")
        );

        // roundtrip id through a container type
        let json = json!({ "writer_id": id });
        assert_eq!(
            "{\"writer_id\":\"w00000000-1234-5678-0000-000000000000\"}",
            &json.to_string()
        );
        let container: Container = serde_json::from_value(json).expect("deserializable");
        assert_eq!(container.writer_id, id);
    }

    // A batch converted to its transmittable (proto) form and back can still
    // be appended, and the appended data reads back correctly.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn hollow_batch_roundtrip(dyncfgs: ConfigUpdates) {
        let data = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];

        let (mut write, mut read) = new_test_client(&dyncfgs)
            .await
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // This test is a bit more complex than it should be. It would be easier
        // if we could just compare the rehydrated batch to the original batch.
        // But a) turning a batch into a hollow batch consumes it, and b) Batch
        // doesn't have Eq/PartialEq.
        let batch = write.expect_batch(&data, 0, 4).await;
        let hollow_batch = batch.into_transmittable_batch();
        let mut rehydrated_batch = write.batch_from_transmittable_batch(hollow_batch);

        write
            .expect_compare_and_append_batch(&mut [&mut rehydrated_batch], 0, 4)
            .await;

        let expected = vec![
            (("1".to_owned(), "one".to_owned()), 1, 1),
            (("2".to_owned(), "two".to_owned()), 2, 1),
            (("3".to_owned(), "three".to_owned()), 3, 1),
        ];
        let mut actual = read.expect_snapshot_and_fetch(3).await;
        consolidate_updates(&mut actual);
        assert_eq!(actual, all_ok(&expected, 3));
    }

    // `wait_for_upper_past` resolves only once the shard upper is strictly
    // past the given frontier, and waiting on an older frontier afterwards
    // must not regress the handle's cached upper.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn wait_for_upper_past(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let (mut write, _) = client.expect_open::<(), (), u64, i64>(ShardId::new()).await;
        let five = Antichain::from_elem(5);

        // Upper is not past 5.
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // Upper is still not past 5.
        write
            .expect_compare_and_append(&[(((), ()), 1, 1)], 0, 5)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), None);

        // Upper is past 5.
        write
            .expect_compare_and_append(&[(((), ()), 5, 1)], 5, 7)
            .await;
        assert_eq!(write.wait_for_upper_past(&five).now_or_never(), Some(()));
        assert_eq!(write.upper(), &Antichain::from_elem(7));

        // Waiting for previous uppers does not regress the handle's cached
        // upper.
        assert_eq!(
            write
                .wait_for_upper_past(&Antichain::from_elem(2))
                .now_or_never(),
            Some(())
        );
        assert_eq!(write.upper(), &Antichain::from_elem(7));
    }

    // A reader's `fetch_recent_upper` must never lag an upper the writer has
    // already durably written and communicated out-of-band (via the channel),
    // i.e. the fetched upper is linearized with respect to writes.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
    async fn fetch_recent_upper_linearized() {
        type Timestamp = u64;
        let max_upper = 1000;

        let shard_id = ShardId::new();
        let mut clients = PersistClientCache::new_no_metrics();
        let upper_writer_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_writer, _) = upper_writer_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        // Clear the state cache between each client to maximally disconnect
        // them from each other.
        clients.clear_state_cache();
        let upper_reader_client = clients.open(PersistLocation::new_in_mem()).await.unwrap();
        let (mut upper_reader, _) = upper_reader_client
            .expect_open::<(), (), Timestamp, i64>(shard_id)
            .await;
        let (tx, rx) = mpsc::channel();

        let task = task::spawn(|| "upper-reader", async move {
            let mut upper = Timestamp::MIN;

            while upper < max_upper {
                while let Ok(new_upper) = rx.try_recv() {
                    upper = new_upper;
                }

                let recent_upper = upper_reader
                    .fetch_recent_upper()
                    .await
                    .as_option()
                    .cloned()
                    .expect("u64 is totally ordered and the shard is not finalized");
                assert!(
                    recent_upper >= upper,
                    "recent upper {recent_upper:?} is less than known upper {upper:?}"
                );
            }
        });

        for upper in Timestamp::MIN..max_upper {
            let next_upper = upper + 1;
            upper_writer
                .expect_compare_and_append(&[], upper, next_upper)
                .await;
            tx.send(next_upper).expect("send failed");
        }

        task.await;
    }
}