mz_compute/sink/materialized_view.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A dataflow sink that writes input records to a persist shard.
//!
//! This implementation is both parallel and self-correcting.
//!
//!  * parallel: Multiple workers can participate in writing updates for the same times, letting
//!    sink throughput scale with the number of workers allocated to the replica.
//!  * self-correcting: The sink continually compares the contents of the persist shard with the
//!    contents of the input collection and writes down the difference. If the persist shard ends
//!    up with undesired contents for any reason, this is corrected the next time the sink manages
//!    to append to the shard.
//!
//! ### Operators
//!
//! The persist sink consists of a graph of operators.
//!
//!    desired                    persist <---------------.
//!       |                          |                    |
//!       |                          |                    |
//!       |---------------------.    |                    |
//!       |                     |    |                    |
//!       |                     |    |                    |
//!       v                     v    v                    |
//!   +--------+              +--------+              +--------+
//!   |  mint  | --descs-.--> | write  | --batches--> | append |
//!   +--------+          \   +--------+          .-> +--------+
//!                        \_____________________/
//!
//!  * `mint` mints batch descriptions, i.e., `(lower, upper)` bounds of batches that should be
//!    written. The persist API requires that all workers write batches with the same bounds, so
//!    they can be appended as a single logical batch. To ensure this, the `mint` operator only
//!    runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are
//!    picked based on the frontiers of the `desired` stream and the output persist shard.
//!  * `write` stages batch data in persist, based on the batch descriptions received from the
//!    `mint` operator, but without appending it to the persist shard. This is a multi-worker
//!    operator, with each worker writing batches of the data that arrives at its local inputs. To
//!    do so it reads from the `desired` and `persist` streams and produces the difference between
//!    them to write back out, ensuring that the final contents of the persist shard match
//!    `desired`.
//!  * `append` appends the batches minted by `mint` and written by `write` to the persist shard.
//!    This is a multi-worker operator, where workers are responsible for different subsets of
//!    batch descriptions. If a worker is responsible for a given batch description, it waits for
//!    all workers to stage their batches for that batch description, then appends all the batches
//!    together as a single logical batch. (An example round is sketched below.)
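//!
//! For example (all timestamps made up), assuming the `desired` frontier has advanced to `[10]`
//! while the shard's `upper` is still `[5]`:
//!
//! ```text
//! mint   : emits the description ([5], [10]) and broadcasts it to all workers
//! write  : every worker stages a batch containing its local share of the
//!          (desired - persist) updates at times within [5, 10)
//! append : the description's append worker waits for all staged batches, then
//!          compare-and-appends them; the shard upper advances to [10]
//! ```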
//!
//! Note that while the above graph suggests that `mint` and `write` both receive copies of the
//! `desired` stream, the actual implementation passes that stream through `mint` and lets `write`
//! read the passed-through stream, to avoid cloning data.
//!
//! Also note that the `append` operator's implementation would perhaps be more natural as a
//! single-worker implementation. The purpose of sharing the work between all workers is to avoid a
//! work imbalance where one worker is overloaded (doing both appends and the consequent persist
//! maintenance work) while others are comparatively idle.
//!
//! The persist sink is written to be robust to the presence of other conflicting instances (e.g.
//! from other replicas) writing to the same persist shard. Each of the three operators needs to be
//! able to handle conflicting writes that unexpectedly change the contents of the output persist
//! shard.
//!
//! ### Frontiers
//!
//! The `desired` frontier tracks the progress of the upstream dataflow, but may be rounded up to
//! the next refresh time for dataflows that follow a refresh schedule other than "on commit".
//!
//! The `persist` frontier tracks the `upper` frontier of the target persist shard, with one
//! exception: When the `persist_source` that reads back the shard is rendered, it will start
//! reading at its `since` frontier. So if the shard's `since` is initially greater than its
//! `upper`, the `persist` frontier too will be in advance of the shard `upper`, until the `upper`
//! has caught up. To avoid getting confused by this edge case, the `mint` operator does not use
//! the `persist` stream to observe the shard frontier but keeps its own `WriteHandle` instead.
//!
//! The `descs` frontier communicates which `lower` bounds may still be emitted in batch
//! descriptions. All future batch descriptions will have a `lower` that is greater than or equal
//! to the current `descs` frontier.
//!
//! The `batches` frontier communicates for which `lower` bounds batches may still be written. All
//! batches for descriptions with `lower`s less than the current `batches` frontier have already
//! been written.
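//!
//! As a concrete (illustrative) example: once `mint` has emitted a description with `lower = [5]`,
//! the `descs` frontier advances to `[6]` (invariant (1) below guarantees strictly increasing
//! `lower`s), and once all workers have staged their batches for that description, the `batches`
//! frontier advances past `[5]` as well.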
//!
//! ### Invariants
//!
//! The implementation upholds several invariants that can be relied upon to simplify the
//! implementation:
//!
//!  1. `lower`s in minted batch descriptions are unique and strictly increasing. That is, the
//!     `mint` operator will never mint the same `lower` twice and a minted `lower` is always
//!     greater than any previously minted ones.
//!  2. `upper`s in minted batch descriptions are monotonically increasing.
//!  3. From (1) it follows that there is always at most one "valid" batch description in flight
//!     in the operator graph. "Valid" here means that the described batch can be appended to the
//!     persist shard.
//!
//! The main simplification these invariants allow is that operators only need to keep track of the
//! most recent batch description and/or `lower`. Previous batch descriptions are not valid
//! anymore, so there is no reason to hold any state or perform any work in support of them.
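//!
//! For example, once `mint` has emitted a description `([8], [10])`, the shard `upper` has already
//! reached `[8]` (new descriptions are only minted once the persist frontier has advanced to the
//! new `lower`), so an earlier description `([5], [8])` can no longer be appended: its
//! `compare_and_append` would fail with an `upper` mismatch.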
//!
//! ### Read-only Mode
//!
//! The persist sink can optionally be initialized in read-only mode. In this mode it is passive
//! and avoids any writes to persist. Flipping the `read_only_rx` watch channel to `false`
//! transitions the sink into write mode, where it commences normal operation.
//!
//! Read-only mode is implemented by the `mint` operator. To disable writes, the `mint` operator
//! simply avoids minting any batch descriptions. Since both the `write` and the `append` operator
//! require batch descriptions to write/append batches, this suppresses any persist communication.
//! At the same time, the `write` operator still observes changes to the `desired` and `persist`
//! collections, allowing it to keep its correction buffer up-to-date.
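//!
//! A minimal sketch of driving this transition (hypothetical setup, for illustration only):
//!
//! ```ignore
//! let (read_only_tx, read_only_rx) = tokio::sync::watch::channel(true); // start read-only
//! // ... hand `read_only_rx` to the sink at rendering time ...
//! read_only_tx.send(false).expect("receiver alive"); // allow writes; `mint` starts minting
//! ```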

use std::any::Any;
use std::cell::RefCell;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use mz_compute_types::dyncfgs::MV_SINK_ADVANCE_PERSIST_FRONTIERS;
use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::metrics::SinkMetrics;
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::builder_async::{Event, OperatorBuilder};
use mz_timely_util::probe::{Handle, ProbeNotify};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::vec::Broadcast;
use timely::dataflow::operators::{Capability, CapabilitySet, probe};
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tokio::sync::watch;
use tracing::trace;

use crate::compute_state::ComputeState;
use crate::render::StartSignal;
use crate::render::sinks::SinkRender;
use crate::sink::correction::{Correction, Logging};
use crate::sink::refresh::apply_refresh;

impl<G> SinkRender<G> for MaterializedViewSinkConnection<CollectionMetadata>
where
    G: Scope<Timestamp = Timestamp>,
{
    fn render_sink(
        &self,
        compute_state: &mut ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        start_signal: StartSignal,
        mut ok_collection: VecCollection<G, Row, Diff>,
        mut err_collection: VecCollection<G, DataflowError, Diff>,
        _ct_times: Option<VecCollection<G, (), Diff>>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        // Attach probes reporting the compute frontier.
        // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
        // track the progress of the computation, so we need to attach probes before it.
        let probe = probe::Handle::default();
        ok_collection = ok_collection
            .probe_with(&probe)
            .inner
            .probe_notify_with(vec![output_probe.clone()])
            .as_collection();
        let collection_state = compute_state.expect_collection_mut(sink_id);
        collection_state.compute_probe = Some(probe);

        // If a `RefreshSchedule` was specified, round up timestamps.
        if let Some(refresh_schedule) = &sink.refresh_schedule {
            ok_collection = apply_refresh(ok_collection, refresh_schedule.clone());
            err_collection = apply_refresh(err_collection, refresh_schedule.clone());
        }

        if sink.up_to != Antichain::default() {
            unimplemented!(
                "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning"
            )
        }

        let read_only_rx = collection_state.read_only_rx.clone();

        let token = persist_sink(
            sink_id,
            &self.storage_metadata,
            ok_collection,
            err_collection,
            as_of,
            compute_state,
            start_signal,
            read_only_rx,
        );
        Some(token)
    }
}

/// Type of the `desired` stream, split into `Ok` and `Err` streams.
type DesiredStreams<S> =
    OkErr<StreamVec<S, (Row, Timestamp, Diff)>, StreamVec<S, (DataflowError, Timestamp, Diff)>>;

/// Type of the `persist` stream, split into `Ok` and `Err` streams.
type PersistStreams<S> =
    OkErr<StreamVec<S, (Row, Timestamp, Diff)>, StreamVec<S, (DataflowError, Timestamp, Diff)>>;

/// Type of the `descs` stream.
type DescsStream<S> = StreamVec<S, BatchDescription>;

/// Type of the `batches` stream.
type BatchesStream<S> = StreamVec<S, (BatchDescription, ProtoBatch)>;

/// Type of the shared sink write frontier.
type SharedSinkFrontier = Rc<RefCell<Antichain<Timestamp>>>;

/// Renders an MV sink writing the given desired collection into the `target` persist collection.
pub(super) fn persist_sink<S>(
    sink_id: GlobalId,
    target: &CollectionMetadata,
    ok_collection: VecCollection<S, Row, Diff>,
    err_collection: VecCollection<S, DataflowError, Diff>,
    as_of: Antichain<Timestamp>,
    compute_state: &mut ComputeState,
    start_signal: StartSignal,
    read_only_rx: watch::Receiver<bool>,
) -> Rc<dyn Any>
where
    S: Scope<Timestamp = Timestamp>,
{
    let mut scope = ok_collection.scope();
    let desired = OkErr::new(ok_collection.inner, err_collection.inner);

    // Read back the persist shard.
    let (persist, persist_token) = persist_source(
        &mut scope,
        sink_id,
        target.clone(),
        compute_state,
        start_signal,
    );

    let persist_api = PersistApi {
        persist_clients: Arc::clone(&compute_state.persist_clients),
        collection: target.clone(),
        shard_name: sink_id.to_string(),
        purpose: format!("MV sink {sink_id}"),
    };

    let (desired, descs, sink_frontier, mint_token) = mint::render(
        sink_id,
        persist_api.clone(),
        as_of.clone(),
        read_only_rx,
        desired,
    );

    let (batches, write_token) = write::render(
        sink_id,
        persist_api.clone(),
        as_of,
        desired,
        persist,
        descs.clone(),
        Rc::clone(&compute_state.worker_config),
    );

    let append_token = append::render(sink_id, persist_api, descs, batches);

    // Report sink frontier updates to the `ComputeState`.
    let collection = compute_state.expect_collection_mut(sink_id);
    collection.sink_write_frontier = Some(sink_frontier);

    Rc::new((persist_token, mint_token, write_token, append_token))
}

/// Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with
/// such pairs.
struct OkErr<O, E> {
    ok: O,
    err: E,
}

impl<O, E> OkErr<O, E> {
    fn new(ok: O, err: E) -> Self {
        Self { ok, err }
    }
}

impl OkErr<Antichain<Timestamp>, Antichain<Timestamp>> {
    fn new_frontiers() -> Self {
        Self {
            ok: Antichain::from_elem(Timestamp::MIN),
            err: Antichain::from_elem(Timestamp::MIN),
        }
    }

    /// Return the overall frontier, i.e., the minimum of `ok` and `err`.
    fn frontier(&self) -> &Antichain<Timestamp> {
        if PartialOrder::less_equal(&self.ok, &self.err) {
            &self.ok
        } else {
            &self.err
        }
    }
}

/// Advance the given `frontier` to `new`, if the latter one is greater.
///
/// Returns whether `frontier` was advanced.
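///
/// A usage sketch (illustrative, not a doc-test):
///
/// ```ignore
/// let mut f = Antichain::from_elem(Timestamp::from(5));
/// assert!(advance(&mut f, Antichain::from_elem(Timestamp::from(7)))); // advanced to [7]
/// assert!(!advance(&mut f, Antichain::from_elem(Timestamp::from(6)))); // [6] is not greater
/// ```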
fn advance(frontier: &mut Antichain<Timestamp>, new: Antichain<Timestamp>) -> bool {
    if PartialOrder::less_than(frontier, &new) {
        *frontier = new;
        true
    } else {
        false
    }
}

/// A persist API specialized to a single collection.
#[derive(Clone)]
struct PersistApi {
    persist_clients: Arc<PersistClientCache>,
    collection: CollectionMetadata,
    shard_name: String,
    purpose: String,
}

impl PersistApi {
    async fn open_client(&self) -> PersistClient {
        self.persist_clients
            .open(self.collection.persist_location.clone())
            .await
            .unwrap_or_else(|error| panic!("error opening persist client: {error}"))
    }

    async fn open_writer(&self) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
        self.open_client()
            .await
            .open_writer(
                self.collection.data_shard,
                Arc::new(self.collection.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: self.shard_name.clone(),
                    handle_purpose: self.purpose.clone(),
                },
            )
            .await
            .unwrap_or_else(|error| panic!("error opening persist writer: {error}"))
    }

    async fn open_metrics(&self) -> SinkMetrics {
        let client = self.open_client().await;
        client.metrics().sink.clone()
    }
}

/// Instantiate a persist source reading back the `target` collection.
fn persist_source<S>(
    scope: &mut S,
    sink_id: GlobalId,
    target: CollectionMetadata,
    compute_state: &ComputeState,
    start_signal: StartSignal,
) -> (PersistStreams<S>, Vec<PressOnDropButton>)
where
    S: Scope<Timestamp = Timestamp>,
{
    // There is no guarantee that the sink as-of is beyond the persist shard's since. If it isn't,
    // instantiating a `persist_source` with it would panic. So instead we leave it to
    // `persist_source` to select an appropriate as-of. We only care about times beyond the current
    // shard upper anyway.
    //
    // TODO(teskje): Ideally we would select the as-of as `join(sink_as_of, since, upper)`, to
    // allow `persist_source` to omit as much historical detail as possible. However, we don't know
    // the shard frontiers and we cannot get them here as that requires an `async` context. We
    // should consider extending the `persist_source` API to allow as-of selection based on the
    // shard's current frontiers.
    let as_of = None;

    let until = Antichain::new();
    let map_filter_project = None;

    let (ok_stream, err_stream, token) = mz_storage_operators::persist_source::persist_source(
        scope,
        sink_id,
        Arc::clone(&compute_state.persist_clients),
        &compute_state.txns_ctx,
        target,
        None,
        as_of,
        SnapshotMode::Include,
        until,
        map_filter_project,
        compute_state.dataflow_max_inflight_bytes(),
        start_signal,
        ErrorHandler::Halt("compute persist sink"),
    );

    let streams = OkErr::new(ok_stream, err_stream);
    (streams, token)
}

/// A description for a batch of updates to be written.
///
/// Batch descriptions are produced by the `mint` operator and consumed by the `write` and `append`
/// operators, where they inform which batches should be written or appended, respectively.
///
/// Each batch description also contains the index of its "append worker", i.e. the worker that is
/// responsible for appending the written batches to the output shard.
#[derive(Clone, Serialize, Deserialize)]
struct BatchDescription {
    lower: Antichain<Timestamp>,
    upper: Antichain<Timestamp>,
    append_worker: usize,
}

impl BatchDescription {
    fn new(lower: Antichain<Timestamp>, upper: Antichain<Timestamp>, append_worker: usize) -> Self {
        assert!(PartialOrder::less_than(&lower, &upper));
        Self {
            lower,
            upper,
            append_worker,
        }
    }
}

impl std::fmt::Debug for BatchDescription {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "({:?}, {:?})@{}",
            self.lower.elements(),
            self.upper.elements(),
            self.append_worker,
        )
    }
}
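
// With the `Debug` impl above, a description with `lower = [5]`, `upper = [10]`, and
// append worker 3 prints roughly as `([5], [10])@3`.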

/// Construct a name for the given sub-operator.
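///
/// For a hypothetical sink ID `u1`, `operator_name(sink_id, "mint")` yields
/// `"mv_sink(u1)::mint"`.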
fn operator_name(sink_id: GlobalId, sub_operator: &str) -> String {
    format!("mv_sink({sink_id})::{sub_operator}")
}

/// Implementation of the `mint` operator.
mod mint {
    use super::*;

    /// Render the `mint` operator.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `as_of`: The first time for which the sink may produce output.
    ///  * `read_only_rx`: A receiver that reports whether the sink is in read-only mode.
    ///  * `desired`: The ok/err streams that should be sinked to persist.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        mut read_only_rx: watch::Receiver<bool>,
        desired: DesiredStreams<S>,
    ) -> (
        DesiredStreams<S>,
        DescsStream<S>,
        SharedSinkFrontier,
        PressOnDropButton,
    )
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = desired.ok.scope();
        let worker_id = scope.index();
        let worker_count = scope.peers();

        // Determine the active worker for the mint operator.
        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();

        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
        let shared_frontier = Rc::clone(&sink_frontier);

        let name = operator_name(sink_id, "mint");
        let mut op = OperatorBuilder::new(name, scope);

        let (ok_output, ok_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let (err_output, err_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let desired_outputs = OkErr::new(ok_output, err_output);
        let desired_output_streams = OkErr::new(ok_stream, err_stream);

        let (desc_output, desc_output_stream) = op.new_output::<CapacityContainerBuilder<_>>();

        let mut desired_inputs = OkErr {
            ok: op.new_input_for(desired.ok, Pipeline, &desired_outputs.ok),
            err: op.new_input_for(desired.err, Pipeline, &desired_outputs.err),
        };

        let button = op.build(move |capabilities| async move {
            // Passing through the `desired` streams only requires data capabilities, so we can
            // immediately drop their initial capabilities here.
            let [_, _, desc_cap]: [_; 3] =
                capabilities.try_into().expect("one capability per output");

            // Non-active workers just pass the `desired` data through.
            if worker_id != active_worker_id {
                drop(desc_cap);
                shared_frontier.borrow_mut().clear();

                loop {
                    tokio::select! {
                        Some(event) = desired_inputs.ok.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.ok.give_container(&cap, &mut data);
                            }
                        }
                        Some(event) = desired_inputs.err.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.err.give_container(&cap, &mut data);
                            }
                        }
                        // All inputs are exhausted, so we can shut down.
                        else => return,
                    }
                }
            }

            let mut cap_set = CapabilitySet::from_elem(desc_cap);

            let read_only = *read_only_rx.borrow_and_update();
            let mut state = State::new(sink_id, worker_count, as_of, read_only);

            // Create a stream that reports advancements of the target shard's frontier and updates
            // the shared sink frontier.
            //
            // We collect the persist frontier from a write handle directly, rather than inspecting
            // the `persist` stream, because the latter has two annoying glitches:
            //  (a) It starts at the shard's read frontier, not its write frontier.
            //  (b) It can lag behind if there are spikes in ingested data.
            let mut persist_frontiers = pin!(async_stream::stream! {
                let mut writer = persist_api.open_writer().await;
                let mut frontier = Antichain::from_elem(Timestamp::MIN);
                while !frontier.is_empty() {
                    writer.wait_for_upper_past(&frontier).await;
                    frontier = writer.upper().clone();
                    shared_frontier.borrow_mut().clone_from(&frontier);
                    yield frontier.clone();
                }
            });

            loop {
                // Read from the inputs, pass through all data to the respective outputs, and keep
                // track of the input frontiers. When a frontier advances we might have to mint a
                // new batch description.
                let maybe_desc = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.ok.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.err.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(frontier) = persist_frontiers.next() => {
                        state.advance_persist_frontier(frontier);
                        state.maybe_mint_batch_description()
                    }
                    Ok(()) = read_only_rx.changed(), if read_only => {
                        state.allow_writes();
                        state.maybe_mint_batch_description()
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some(desc) = maybe_desc {
                    let lower_ts = *desc.lower.as_option().expect("not empty");
                    let cap = cap_set.delayed(&lower_ts);
                    desc_output.give(&cap, desc);

                    // We only emit strictly increasing `lower`s, so we can let our output frontier
                    // advance beyond the current `lower`.
                    cap_set.downgrade([lower_ts.step_forward()]);
                } else {
                    // The next emitted `lower` will be at least the `persist` frontier, so we can
                    // advance our output frontier that far.
                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
                }
            }
        });

        (
            desired_output_streams,
            desc_output_stream,
            sink_frontier,
            button.press_on_drop(),
        )
    }

    /// State maintained by the `mint` operator.
    struct State {
        sink_id: GlobalId,
        /// The number of workers in the Timely cluster.
        worker_count: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontier of the target persist shard.
        persist_frontier: Antichain<Timestamp>,
        /// The append worker for the next batch description, chosen in round-robin fashion.
        next_append_worker: usize,
        /// The last `lower` we have emitted in a batch description, if any. Whenever the
        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
        last_lower: Option<Antichain<Timestamp>>,
        /// Whether we are operating in read-only mode.
        ///
        /// In read-only mode, minting of batch descriptions is disabled.
        read_only: bool,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_count: usize,
            as_of: Antichain<Timestamp>,
            read_only: bool,
        ) -> Self {
            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
            // work needlessly writing batches at previous times.
            let persist_frontier = as_of;

            Self {
                sink_id,
                worker_count,
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontier,
                next_append_worker: 0,
                last_lower: None,
                read_only,
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontier.elements(),
                last_lower = ?self.last_lower.as_ref().map(|f| f.elements()),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontier, frontier) {
                self.trace("advanced `persist` frontier");
            }
        }

        fn allow_writes(&mut self) {
            if self.read_only {
                self.read_only = false;
                self.trace("disabled read-only mode");
            }
        }

        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = &self.persist_frontier;

            // We only mint new batch descriptions when:
            //  1. We are _not_ in read-only mode.
            //  2. The `desired` frontier is ahead of the `persist` frontier.
            //  3. The `persist` frontier advanced since we last emitted a batch description.
            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
                PartialOrder::less_than(lower, persist_frontier)
            });

            if self.read_only || !desired_ahead || !persist_advanced {
                return None;
            }

            let lower = persist_frontier.clone();
            let upper = desired_frontier.clone();
            let append_worker = self.next_append_worker;
            let desc = BatchDescription::new(lower, upper, append_worker);

            self.next_append_worker = (append_worker + 1) % self.worker_count;
            self.last_lower = Some(desc.lower.clone());

            self.trace(format!("minted batch description: {desc:?}"));
            Some(desc)
        }
    }
}

/// Implementation of the `write` operator.
mod write {
    use super::*;

    /// Render the `write` operator.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `as_of`: The first time for which the sink may produce output.
    ///  * `desired`: The ok/err streams that should be sinked to persist.
    ///  * `persist`: The ok/err streams read back from the output persist shard.
    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
    ///  * `worker_config`: Dynamic configuration for this worker.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        desired: DesiredStreams<S>,
        persist: PersistStreams<S>,
        descs: DescsStream<S>,
        worker_config: Rc<ConfigSet>,
    ) -> (BatchesStream<S>, PressOnDropButton)
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = desired.ok.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "write");
        let mut op = OperatorBuilder::new(name, scope.clone());

        let mut logging = None;
        if let (Some(compute_logger), Some(differential_logger)) = (
            scope.logger_for("materialize/compute"),
            scope.logger_for("differential/arrange"),
        ) {
            let operator_info = op.operator_info();
            logging = Some(Logging::new(
                compute_logger,
                differential_logger.into(),
                operator_info.global_id,
                operator_info.address.to_vec(),
            ));
        }

        let (batches_output, batches_output_stream) =
            op.new_output::<CapacityContainerBuilder<_>>();

        // It is important that we exchange the `desired` and `persist` data the same way, so
        // updates that cancel each other out end up on the same worker.
        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
        let exchange_err = |(d, _, _): &(DataflowError, Timestamp, Diff)| d.hashed();

        let mut desired_inputs = OkErr::new(
            op.new_disconnected_input(desired.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(desired.err, Exchange::new(exchange_err)),
        );
        let mut persist_inputs = OkErr::new(
            op.new_disconnected_input(persist.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(persist.err, Exchange::new(exchange_err)),
        );
        let mut descs_input = op.new_input_for(descs.broadcast(), Pipeline, &batches_output);

        let button = op.build(move |capabilities| async move {
            // We will use the data capabilities from the `descs` input to produce output, so no
            // need to hold onto the initial capabilities.
            drop(capabilities);

            let writer = persist_api.open_writer().await;
            let sink_metrics = persist_api.open_metrics().await;
            let mut state = State::new(
                sink_id,
                worker_id,
                writer,
                sink_metrics,
                logging,
                as_of,
                &worker_config,
            );

            loop {
                // Read from the inputs, extract `desired` updates as positive contributions to
                // `correction` and `persist` updates as negative contributions. If either the
                // `desired` or `persist` frontier advances, or if we receive a new batch description,
                // we might have to write a new batch.
                let maybe_batch = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = descs_input.next() => {
                        match event {
                            Event::Data(cap, data) => {
                                for desc in data {
                                    state.absorb_batch_description(desc, cap.clone());
                                }
                                state.maybe_write_batch().await
                            }
                            Event::Progress(_frontier) => None,
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some((desc, batch, cap)) = maybe_batch {
                    batches_output.give(&cap, (desc, batch));
                }
            }
        });

        (batches_output_stream, button.press_on_drop())
    }

    /// State maintained by the `write` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// Contains `desired - persist`, reflecting the updates we would like to commit to
        /// `persist` in order to "correct" it to track `desired`. This collection is only modified
        /// by updates received from either the `desired` or `persist` inputs.
        corrections: OkErr<Correction<Row>, Correction<DataflowError>>,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontiers of the `persist` inputs.
        ///
        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
        /// usually is, but during snapshot processing, these frontiers will start at the shard's
        /// read frontier, so they can be beyond its write frontier. This is important as it means
        /// we must not discard batch descriptions based on these persist frontiers: A batch
        /// description might still be valid even if its `lower` is before the persist frontiers we
        /// observe.
        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The current valid batch description and associated output capability, if any.
        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
        /// A request to force a consolidation of `corrections` once both `desired_frontiers` and
        /// `persist_frontiers` become greater than the given frontier.
        ///
        /// Normally we force a consolidation whenever we write a batch, but there are periods
        /// (like read-only mode) when that doesn't happen, and we need to manually force
        /// consolidation instead. Currently this is only used to ensure we quickly get rid of the
        /// snapshot updates.
        force_consolidation_after: Option<Antichain<Timestamp>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
            metrics: SinkMetrics,
            logging: Option<Logging>,
            as_of: Antichain<Timestamp>,
            worker_config: &ConfigSet,
        ) -> Self {
            let worker_metrics = metrics.for_worker(worker_id);

            // Force a consolidation of `corrections` after the snapshot updates have been fully
            // processed, to ensure we get rid of those as quickly as possible.
            let force_consolidation_after = Some(as_of.clone());

            let mut state = Self {
                sink_id,
                worker_id,
                persist_writer,
                corrections: OkErr::new(
                    Correction::new(
                        metrics.clone(),
                        worker_metrics.clone(),
                        logging.clone(),
                        worker_config,
                    ),
                    Correction::new(metrics, worker_metrics, logging, worker_config),
                ),
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontiers: OkErr::new_frontiers(),
                batch_description: None,
                force_consolidation_after,
            };

            // Immediately advance the persist frontier tracking to the `as_of`.
            // This is important to ensure the persist sink doesn't get stuck if the output shard's
            // initial frontier is less than the `as_of`. The `mint` operator first emits a batch
            // description with `lower = as_of`, and the `write` operator only emits a batch when
            // its observed persist frontier is >= the batch description's `lower`, which (assuming
            // no other writers) would be never if we didn't advance the observed persist frontier
            // to the `as_of`.
            if MV_SINK_ADVANCE_PERSIST_FRONTIERS.get(worker_config) {
                state.advance_persist_ok_frontier(as_of.clone());
                state.advance_persist_err_frontier(as_of);
            }

            state
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontiers.frontier().elements(),
                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.ok, frontier) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` ok frontier");
            }
        }

        fn advance_persist_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.err, frontier) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` err frontier");
            }
        }

        /// Apply the effects of a previous `desired` frontier advancement.
        fn apply_desired_frontier_advancement(&mut self) {
            self.maybe_force_consolidation();
        }

        /// Apply the effects of a previous `persist` frontier advancement.
        fn apply_persist_frontier_advancement(&mut self) {
            let frontier = self.persist_frontiers.frontier();

            // We will only emit times at or after the `persist` frontier, so now is a good time to
            // advance the times of stashed updates.
            self.corrections.ok.advance_since(frontier.clone());
            self.corrections.err.advance_since(frontier.clone());

            self.maybe_force_consolidation();
        }

        /// If the current consolidation request has become applicable, apply it.
        fn maybe_force_consolidation(&mut self) {
            let Some(request) = &self.force_consolidation_after else {
                return;
            };

            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = self.persist_frontiers.frontier();
            if PartialOrder::less_than(request, desired_frontier)
                && PartialOrder::less_than(request, persist_frontier)
            {
                self.trace("forcing correction consolidation");
                self.corrections.ok.consolidate_at_since();
                self.corrections.err.consolidate_at_since();

                // Remove the consolidation request, now that we have fulfilled it.
                self.force_consolidation_after = None;
            }
        }

        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
            // The incoming batch description is outdated if we already have a batch description
            // with a greater `lower`.
            //
            // Note that we cannot assume a description is outdated based on the comparison of its
            // `lower` with the `persist_frontier`. The persist frontier observed by the `write`
            // operator is initialized with the shard's read frontier, so it can be greater than
            // the shard's write frontier.
            if let Some((prev, _)) = &self.batch_description {
                if PartialOrder::less_than(&desc.lower, &prev.lower) {
                    self.trace(format!("skipping outdated batch description: {desc:?}"));
                    return;
                }
            }

            self.batch_description = Some((desc, cap));
            self.trace("set batch description");
        }

        async fn maybe_write_batch(
            &mut self,
        ) -> Option<(BatchDescription, ProtoBatch, Capability<Timestamp>)> {
            let (desc, _cap) = self.batch_description.as_ref()?;

            // We can write a new batch if we have seen all `persist` updates before `lower` and
            // all `desired` updates up to `upper`.
            let persist_complete =
                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
            let desired_complete =
                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
            if !persist_complete || !desired_complete {
                return None;
            }

            let (desc, cap) = self.batch_description.take()?;

            let ok_updates = self.corrections.ok.updates_before(&desc.upper);
            let err_updates = self.corrections.err.updates_before(&desc.upper);

            let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
            let errs = err_updates.map(|(d, t, r)| ((SourceData(Err(d)), ()), t, r.into_inner()));
            let mut updates = oks.chain(errs).peekable();

            // Don't write empty batches.
            if updates.peek().is_none() {
                drop(updates);
                self.trace("skipping empty batch");
                return None;
            }

            let batch = self
                .persist_writer
                .batch(updates, desc.lower.clone(), desc.upper.clone())
                .await
                .expect("valid usage")
                .into_transmittable_batch();

            self.trace("wrote a batch");
            Some((desc, batch, cap))
        }
    }
}

/// Implementation of the `append` operator.
mod append {
    use super::*;

    /// Render the `append` operator.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
    ///  * `batches`: The stream of written batches produced by the `write` operator.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        descs: DescsStream<S>,
        batches: BatchesStream<S>,
    ) -> PressOnDropButton
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = descs.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "append");
        let mut op = OperatorBuilder::new(name, scope);

        // Broadcast batch descriptions to all workers, regardless of whether or not they are
        // responsible for the append, to give them a chance to clean up any outdated state they
        // might still hold.
        let mut descs_input = op.new_disconnected_input(descs.broadcast(), Pipeline);
        let mut batches_input = op.new_disconnected_input(
            batches,
            Exchange::new(move |(desc, _): &(BatchDescription, _)| {
                u64::cast_from(desc.append_worker)
            }),
        );

        let button = op.build(move |_capabilities| async move {
            let writer = persist_api.open_writer().await;
            let mut state = State::new(sink_id, worker_id, writer);

            loop {
                // Read from the inputs, absorb batch descriptions and batches. If the `batches`
                // frontier advances, or if we receive a new batch description, we might have to
                // append a new batch.
                tokio::select! {
                    Some(event) = descs_input.next() => {
                        if let Event::Data(_cap, data) = event {
                            for desc in data {
                                state.absorb_batch_description(desc).await;
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    Some(event) = batches_input.next() => {
                        match event {
                            Event::Data(_cap, data) => {
                                // The batch description is only used for routing and we ignore it
                                // here since we already get one from `descs_input`.
                                for (_desc, batch) in data {
                                    state.absorb_batch(batch).await;
                                }
                            }
                            Event::Progress(frontier) => {
                                state.advance_batches_frontier(frontier);
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                }
            }
        });

        button.press_on_drop()
    }

    /// State maintained by the `append` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// The current input frontier of `batches`.
        batches_frontier: Antichain<Timestamp>,
        /// The greatest observed `lower` from both `descs` and `batches`.
        lower: Antichain<Timestamp>,
        /// The batch description for `lower`, if any.
        batch_description: Option<BatchDescription>,
        /// Batches received for `lower`.
        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        ) -> Self {
            Self {
                sink_id,
                worker_id,
                persist_writer,
                batches_frontier: Antichain::from_elem(Timestamp::MIN),
                lower: Antichain::from_elem(Timestamp::MIN),
                batch_description: None,
                batches: Default::default(),
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                batches_frontier = ?self.batches_frontier.elements(),
                lower = ?self.lower.elements(),
                batch_description = ?self.batch_description,
                message,
            );
        }

        fn advance_batches_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.batches_frontier, frontier) {
                self.trace("advanced `batches` frontier");
            }
        }

        /// Advance the current `lower`.
        ///
        /// Discards all currently stashed batches and batch descriptions, assuming that they are
        /// now invalid.
        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
            assert!(PartialOrder::less_than(&self.lower, &frontier));

            self.lower = frontier;
            self.batch_description = None;

            // Remove stashed batches, cleaning up those we didn't append.
            for batch in self.batches.drain(..) {
                batch.delete().await;
            }

            self.trace("advanced `lower`");
        }

        /// Absorb the given batch description into the state, provided it is not outdated.
        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
            if PartialOrder::less_than(&self.lower, &desc.lower) {
                self.advance_lower(desc.lower.clone()).await;
            } else if self.lower != desc.lower {
                self.trace(format!("skipping outdated batch description: {desc:?}"));
                return;
            }

            if desc.append_worker == self.worker_id {
                self.batch_description = Some(desc);
                self.trace("set batch description");
            }
        }

        /// Absorb the given batch into the state, provided it is not outdated.
        async fn absorb_batch(&mut self, batch: ProtoBatch) {
            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
            if PartialOrder::less_than(&self.lower, batch.lower()) {
                self.advance_lower(batch.lower().clone()).await;
            } else if &self.lower != batch.lower() {
                self.trace(format!(
                    "skipping outdated batch: ({:?}, {:?})",
                    batch.lower().elements(),
                    batch.upper().elements(),
                ));

                // Ensure the batch's data gets properly cleaned up before dropping it.
                batch.delete().await;
                return;
            }

            self.batches.push(batch);
            self.trace("absorbed a batch");
        }

        async fn maybe_append_batches(&mut self) {
            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
            if !batches_complete {
                return;
            }

            let Some(desc) = self.batch_description.take() else {
                return;
            };

            let new_lower = match self.append_batches(desc).await {
                Ok(shard_upper) => {
                    self.trace("appended a batch");
                    shard_upper
                }
                Err(shard_upper) => {
                    // Failing the append is expected in the presence of concurrent replicas. There
                    // is nothing special to do here: The self-correcting feedback mechanism
                    // ensures that we observe the concurrent changes, compute their consequences,
                    // and append them at a future time.
                    self.trace(format!(
                        "append failed due to `lower` mismatch: {:?}",
                        shard_upper.elements(),
                    ));
                    shard_upper
                }
            };

            self.advance_lower(new_lower).await;
        }

        /// Append the current `batches` to the output shard.
        ///
        /// Returns whether the append was successful or not, and the current shard upper in either
        /// case.
        ///
        /// This method advances the shard upper to the batch `lower` if necessary. This is the
        /// mechanism that brings the shard upper to the sink as-of when appending the initial
        /// batch.
        ///
        /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
        /// a single append at operator startup. The reason we are doing it here instead is that it
        /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
        /// persist writes, including the initial upper bump. Having only a single place that
        /// performs writes makes it easy to ensure we are doing that correctly.
        async fn append_batches(
            &mut self,
            desc: BatchDescription,
        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
            let (lower, upper) = (desc.lower, desc.upper);
            let mut to_append: Vec<_> = self.batches.iter_mut().collect();

            loop {
                let result = self
                    .persist_writer
                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
                    .await
                    .expect("valid usage");

                match result {
                    Ok(()) => return Ok(upper),
                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;

                        // At this point the shard's since and upper are likely the same, a state
                        // that is prone to hitting edge cases in logic that reasons about
                        // frontiers.
                        fail::fail_point!("mv_advanced_upper");
                    }
                    Err(mismatch) => return Err(mismatch.current),
                }
            }
        }
    }

    /// Advance the frontier of the given writer's shard to at least the given `upper`.
    async fn advance_shard_upper(
        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        upper: Antichain<Timestamp>,
    ) {
        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
        let lower = Antichain::from_elem(Timestamp::MIN);
        persist_writer
            .append(empty_updates, lower, upper)
            .await
            .expect("valid usage")
            .expect("should always succeed");
    }
}