mz_compute/sink/materialized_view.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A dataflow sink that writes input records to a persist shard.
//!
//! This implementation is both parallel and self-correcting.
//!
//!  * parallel: Multiple workers can participate in writing updates for the same times, letting
//!    sink throughput scale with the number of workers allocated to the replica.
//!  * self-correcting: The sink continually compares the contents of the persist shard with the
//!    contents of the input collection and writes down the difference. If the persist shard ends
//!    up with undesired contents for any reason, this is corrected the next time the sink manages
//!    to append to the shard.
//!
//! ### Operators
//!
//! The persist sink consists of a graph of operators.
//!
//!    desired                    persist <---------------.
//!       |                          |                    |
//!       |                          |                    |
//!       |---------------------.    |                    |
//!       |                     |    |                    |
//!       |                     |    |                    |
//!       v                     v    v                    |
//!   +--------+              +--------+              +--------+
//!   |  mint  | --descs-.--> | write  | --batches--> | append |
//!   +--------+          \   +--------+          .-> +--------+
//!                        \_____________________/
//!
//!  * `mint` mints batch descriptions, i.e., `(lower, upper)` bounds of batches that should be
//!    written. The persist API requires that all workers write batches with the same bounds, so
//!    they can be appended as a single logical batch. To ensure this, the `mint` operator only
//!    runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are
//!    picked based on the frontiers of the `desired` stream and the output persist shard.
//!  * `write` stages batch data in persist, based on the batch descriptions received from the
//!    `mint` operator, but without appending it to the persist shard. This is a multi-worker
//!    operator, with each worker writing batches of the data that arrives at its local inputs. To
//!    do so it reads from the `desired` and `persist` streams and produces the difference between
//!    them to write back out, ensuring that the final contents of the persist shard match
//!    `desired`.
//!  * `append` appends the batches described by `mint` and written by `write` to the persist shard.
//!    This is a multi-worker operator, where workers are responsible for different subsets of
//!    batch descriptions. If a worker is responsible for a given batch description, it waits for
//!    all workers to stage their batches for that batch description, then appends all the batches
//!    together as a single logical batch.
//!
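//! As an illustration, here is one possible round of the protocol, with made-up example times:
//!
//! ```text
//! shard upper = [5], desired frontier = [9]
//!   mint   (active worker): mints the description ([5], [9]) and broadcasts it
//!   write  (all workers):   each stages a batch holding its local share of
//!                           `desired - persist` at times in [5, 9)
//!   append (append worker): waits until all staged batches for ([5], [9]) have
//!                           arrived, then appends them; the shard upper becomes [9]
//! ```
//!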
//! Note that while the above graph suggests that `mint` and `write` both receive copies of the
//! `desired` stream, the actual implementation passes that stream through `mint` and lets `write`
//! read the passed-through stream, to avoid cloning data.
//!
//! Also note that the `append` operator's implementation would perhaps be more natural as a
//! single-worker implementation. The purpose of sharing the work between all workers is to avoid a
//! work imbalance where one worker is overloaded (doing both appends and the consequent persist
//! maintenance work) while others are comparatively idle.
//!
//! The persist sink is written to be robust to the presence of other conflicting instances (e.g.
//! from other replicas) writing to the same persist shard. Each of the three operators needs to be
//! able to handle conflicting writes that unexpectedly change the contents of the output persist
//! shard.
//!
//! ### Frontiers
//!
//! The `desired` frontier tracks the progress of the upstream dataflow, but may be rounded up to
//! the next refresh time for dataflows that follow a refresh schedule other than "on commit".
//!
//! The `persist` frontier tracks the `upper` frontier of the target persist shard, with one
//! exception: When the `persist_source` that reads back the shard is rendered, it will start
//! reading at its `since` frontier. So if the shard's `since` is initially greater than its
//! `upper`, the `persist` frontier too will be in advance of the shard `upper`, until the `upper`
//! has caught up. To avoid getting confused by this edge case, the `mint` operator does not use
//! the `persist` stream to observe the shard frontier but keeps its own `WriteHandle` instead.
//!
//! The `descs` frontier communicates which `lower` bounds may still be emitted in batch
//! descriptions. All future batch descriptions will have a `lower` that is greater than or equal
//! to the current `descs` frontier.
//!
//! The `batches` frontier communicates for which `lower` bounds batches may still be written. All
//! batches for descriptions with `lower`s less than the current `batches` frontier have already
//! been written.
//!
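//! For example (times illustrative): a `descs` frontier of `[7]` guarantees that every batch
//! description emitted from now on has a `lower` of at least `[7]`, and a `batches` frontier of
//! `[7]` guarantees that all batches for descriptions with a `lower` below `[7]` have already been
//! written. The latter is what lets the `append` operator conclude that all batches for a given
//! description have arrived.
//!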
//! ### Invariants
//!
//! The implementation upholds several invariants that can be relied upon to simplify the
//! implementation:
//!
//!  1. `lower`s in minted batch descriptions are unique and strictly increasing. That is, the
//!     `mint` operator will never mint the same `lower` twice and a minted `lower` is always
//!     greater than any previously minted ones.
//!  2. `upper`s in minted batch descriptions are monotonically increasing.
//!  3. From (1) it follows that there is always at most one "valid" batch description in flight in
//!     the operator graph. "Valid" here means that the described batch can be appended to the
//!     persist shard.
//!
//! The main simplification these invariants allow is that operators only need to keep track of the
//! most recent batch description and/or `lower`. Previous batch descriptions are not valid
//! anymore, so there is no reason to hold any state or perform any work in support of them.
//!
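//! For example (times illustrative), `([3], [5])`, `([5], [9])`, `([9], [12])` is a valid sequence
//! of minted descriptions: the `lower`s 3 < 5 < 9 are unique and strictly increasing (1), and the
//! `upper`s 5 <= 9 <= 12 are monotonically increasing (2). Once `([5], [9])` has been minted, the
//! shard's `upper` is already at least `[5]`, so `([3], [5])` can no longer be appended (3).
//!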
//! ### Read-only Mode
//!
//! The persist sink can optionally be initialized in read-only mode. In this mode it is passive
//! and avoids any writes to persist. Once `read_only_rx` signals that writes are allowed, the sink
//! transitions into write mode, where it commences normal operation.
//!
//! Read-only mode is implemented by the `mint` operator. To disable writes, the `mint` operator
//! simply avoids minting any batch descriptions. Since both the `write` and the `append` operator
//! require batch descriptions to write/append batches, this suppresses any persist writes. At the
//! same time, the `write` operator still observes changes to the `desired` and `persist`
//! collections, allowing it to keep its correction buffer up-to-date.

use std::any::Any;
use std::cell::RefCell;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Collection, Hashable};
use futures::StreamExt;
use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::metrics::SinkMetrics;
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::builder_async::{Event, OperatorBuilder};
use mz_timely_util::probe::{Handle, ProbeNotify};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Broadcast, Capability, CapabilitySet, probe};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tokio::sync::watch;
use tracing::trace;

use crate::compute_state::ComputeState;
use crate::render::StartSignal;
use crate::render::sinks::SinkRender;
use crate::sink::correction::Correction;
use crate::sink::refresh::apply_refresh;

impl<G> SinkRender<G> for MaterializedViewSinkConnection<CollectionMetadata>
where
    G: Scope<Timestamp = Timestamp>,
{
    fn render_sink(
        &self,
        compute_state: &mut ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        start_signal: StartSignal,
        mut ok_collection: Collection<G, Row, Diff>,
        mut err_collection: Collection<G, DataflowError, Diff>,
        _ct_times: Option<Collection<G, (), Diff>>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        // Attach probes reporting the compute frontier.
        // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
        // track the progress of the computation, so we need to attach probes before it.
        let probe = probe::Handle::default();
        ok_collection = ok_collection
            .probe_with(&probe)
            .inner
            .probe_notify_with(vec![output_probe.clone()])
            .as_collection();
        let collection_state = compute_state.expect_collection_mut(sink_id);
        collection_state.compute_probe = Some(probe);

        // If a `RefreshSchedule` was specified, round up timestamps.
        if let Some(refresh_schedule) = &sink.refresh_schedule {
            ok_collection = apply_refresh(ok_collection, refresh_schedule.clone());
            err_collection = apply_refresh(err_collection, refresh_schedule.clone());
        }

        if sink.up_to != Antichain::default() {
            unimplemented!(
                "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning"
            )
        }

        let token = persist_sink(
            sink_id,
            &self.storage_metadata,
            ok_collection,
            err_collection,
            as_of,
            compute_state,
            start_signal,
        );
        Some(token)
    }
}

/// Type of the `desired` stream, split into `Ok` and `Err` streams.
type DesiredStreams<S> =
    OkErr<Stream<S, (Row, Timestamp, Diff)>, Stream<S, (DataflowError, Timestamp, Diff)>>;

/// Type of the `persist` stream, split into `Ok` and `Err` streams.
type PersistStreams<S> =
    OkErr<Stream<S, (Row, Timestamp, Diff)>, Stream<S, (DataflowError, Timestamp, Diff)>>;

/// Type of the `descs` stream.
type DescsStream<S> = Stream<S, BatchDescription>;

/// Type of the `batches` stream.
type BatchesStream<S> = Stream<S, (BatchDescription, ProtoBatch)>;

/// Type of the shared sink write frontier.
type SharedSinkFrontier = Rc<RefCell<Antichain<Timestamp>>>;

/// Renders an MV sink writing the given desired collection into the `target` persist collection.
pub(super) fn persist_sink<S>(
    sink_id: GlobalId,
    target: &CollectionMetadata,
    ok_collection: Collection<S, Row, Diff>,
    err_collection: Collection<S, DataflowError, Diff>,
    as_of: Antichain<Timestamp>,
    compute_state: &mut ComputeState,
    start_signal: StartSignal,
) -> Rc<dyn Any>
where
    S: Scope<Timestamp = Timestamp>,
{
    let mut scope = ok_collection.scope();
    let desired = OkErr::new(ok_collection.inner, err_collection.inner);

    // Read back the persist shard.
    let (persist, persist_token) = persist_source(
        &mut scope,
        sink_id,
        target.clone(),
        compute_state,
        start_signal,
    );

    let persist_api = PersistApi {
        persist_clients: Arc::clone(&compute_state.persist_clients),
        collection: target.clone(),
        shard_name: sink_id.to_string(),
        purpose: format!("MV sink {sink_id}"),
    };

    let (desired, descs, sink_frontier, mint_token) = mint::render(
        sink_id,
        persist_api.clone(),
        as_of.clone(),
        compute_state.read_only_rx.clone(),
        &desired,
    );

    let (batches, write_token) = write::render(
        sink_id,
        persist_api.clone(),
        as_of,
        &desired,
        &persist,
        &descs,
        Rc::clone(&compute_state.worker_config),
    );

    let append_token = append::render(sink_id, persist_api, &descs, &batches);

    // Report sink frontier updates to the `ComputeState`.
    let collection = compute_state.expect_collection_mut(sink_id);
    collection.sink_write_frontier = Some(sink_frontier);

    Rc::new((persist_token, mint_token, write_token, append_token))
}

/// Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with
/// such pairs.
struct OkErr<O, E> {
    ok: O,
    err: E,
}

impl<O, E> OkErr<O, E> {
    fn new(ok: O, err: E) -> Self {
        Self { ok, err }
    }
}

impl OkErr<Antichain<Timestamp>, Antichain<Timestamp>> {
    fn new_frontiers() -> Self {
        Self {
            ok: Antichain::from_elem(Timestamp::MIN),
            err: Antichain::from_elem(Timestamp::MIN),
        }
    }

    /// Return the overall frontier, i.e., the minimum of `ok` and `err`.
    fn frontier(&self) -> &Antichain<Timestamp> {
        if PartialOrder::less_equal(&self.ok, &self.err) {
            &self.ok
        } else {
            &self.err
        }
    }
}

/// Advance the given `frontier` to `new`, if the latter one is greater.
///
/// Returns whether `frontier` was advanced.
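///
/// A minimal illustration of the intended behavior (not a doctest, since this is a private
/// helper):
///
/// ```ignore
/// let mut frontier = Antichain::from_elem(Timestamp::from(5u64));
/// assert!(advance(&mut frontier, Antichain::from_elem(Timestamp::from(7u64)))); // advanced
/// assert!(!advance(&mut frontier, Antichain::from_elem(Timestamp::from(6u64)))); // not greater
/// ```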
fn advance(frontier: &mut Antichain<Timestamp>, new: Antichain<Timestamp>) -> bool {
    if PartialOrder::less_than(frontier, &new) {
        *frontier = new;
        true
    } else {
        false
    }
}

/// A persist API specialized to a single collection.
#[derive(Clone)]
struct PersistApi {
    persist_clients: Arc<PersistClientCache>,
    collection: CollectionMetadata,
    shard_name: String,
    purpose: String,
}

impl PersistApi {
    async fn open_client(&self) -> PersistClient {
        self.persist_clients
            .open(self.collection.persist_location.clone())
            .await
            .unwrap_or_else(|error| panic!("error opening persist client: {error}"))
    }

    async fn open_writer(&self) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
        self.open_client()
            .await
            .open_writer(
                self.collection.data_shard,
                Arc::new(self.collection.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: self.shard_name.clone(),
                    handle_purpose: self.purpose.clone(),
                },
            )
            .await
            .unwrap_or_else(|error| panic!("error opening persist writer: {error}"))
    }

    async fn open_metrics(&self) -> SinkMetrics {
        let client = self.open_client().await;
        client.metrics().sink.clone()
    }
}

/// Instantiate a persist source reading back the `target` collection.
fn persist_source<S>(
    scope: &mut S,
    sink_id: GlobalId,
    target: CollectionMetadata,
    compute_state: &ComputeState,
    start_signal: StartSignal,
) -> (PersistStreams<S>, Vec<PressOnDropButton>)
where
    S: Scope<Timestamp = Timestamp>,
{
    // There is no guarantee that the sink as-of is beyond the persist shard's since. If it isn't,
    // instantiating a `persist_source` with it would panic. So instead we leave it to
    // `persist_source` to select an appropriate as-of. We only care about times beyond the current
    // shard upper anyway.
    //
    // TODO(teskje): Ideally we would select the as-of as `join(sink_as_of, since, upper)`, to
    // allow `persist_source` to omit as much historical detail as possible. However, we don't know
    // the shard frontiers and we cannot get them here as that requires an `async` context. We
    // should consider extending the `persist_source` API to allow as-of selection based on the
    // shard's current frontiers.
    let as_of = None;

    let until = Antichain::new();
    let map_filter_project = None;

    let (ok_stream, err_stream, token) = mz_storage_operators::persist_source::persist_source(
        scope,
        sink_id,
        Arc::clone(&compute_state.persist_clients),
        &compute_state.txns_ctx,
        &compute_state.worker_config,
        target,
        None,
        as_of,
        SnapshotMode::Include,
        until,
        map_filter_project,
        compute_state.dataflow_max_inflight_bytes(),
        start_signal,
        ErrorHandler::Halt("compute persist sink"),
    );

    let streams = OkErr::new(ok_stream, err_stream);
    (streams, token)
}

/// A description for a batch of updates to be written.
///
/// Batch descriptions are produced by the `mint` operator and consumed by the `write` and `append`
/// operators, where they inform which batches should be written or appended, respectively.
///
/// Each batch description also contains the index of its "append worker", i.e. the worker that is
/// responsible for appending the written batches to the output shard.
#[derive(Clone, Serialize, Deserialize)]
struct BatchDescription {
    lower: Antichain<Timestamp>,
    upper: Antichain<Timestamp>,
    append_worker: usize,
}

impl BatchDescription {
    fn new(lower: Antichain<Timestamp>, upper: Antichain<Timestamp>, append_worker: usize) -> Self {
        assert!(PartialOrder::less_than(&lower, &upper));
        Self {
            lower,
            upper,
            append_worker,
        }
    }
}

impl std::fmt::Debug for BatchDescription {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "({:?}, {:?})@{}",
            self.lower.elements(),
            self.upper.elements(),
            self.append_worker,
        )
    }
}

/// Construct a name for the given sub-operator.
fn operator_name(sink_id: GlobalId, sub_operator: &str) -> String {
    format!("mv_sink({sink_id})::{sub_operator}")
}

/// Implementation of the `mint` operator.
mod mint {
    use super::*;

    /// Render the `mint` operator.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `as_of`: The first time for which the sink may produce output.
    ///  * `read_only_rx`: A receiver that reports the sink is in read-only mode.
    ///  * `desired`: The ok/err streams that should be sinked to persist.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        mut read_only_rx: watch::Receiver<bool>,
        desired: &DesiredStreams<S>,
    ) -> (
        DesiredStreams<S>,
        DescsStream<S>,
        SharedSinkFrontier,
        PressOnDropButton,
    )
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = desired.ok.scope();
        let worker_id = scope.index();
        let worker_count = scope.peers();

        // Determine the active worker for the mint operator.
        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();

        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
        let shared_frontier = Rc::clone(&sink_frontier);

        let name = operator_name(sink_id, "mint");
        let mut op = OperatorBuilder::new(name, scope);

        let (ok_output, ok_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let (err_output, err_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let desired_outputs = OkErr::new(ok_output, err_output);
        let desired_output_streams = OkErr::new(ok_stream, err_stream);

        let (desc_output, desc_output_stream) = op.new_output();

        let mut desired_inputs = OkErr {
            ok: op.new_input_for(&desired.ok, Pipeline, &desired_outputs.ok),
            err: op.new_input_for(&desired.err, Pipeline, &desired_outputs.err),
        };

        let button = op.build(move |capabilities| async move {
            // Passing through the `desired` streams only requires data capabilities, so we can
            // immediately drop their initial capabilities here.
            let [_, _, desc_cap]: [_; 3] =
                capabilities.try_into().expect("one capability per output");

            // Non-active workers just pass the `desired` and `persist` data through.
            if worker_id != active_worker_id {
                drop(desc_cap);
                shared_frontier.borrow_mut().clear();

                loop {
                    tokio::select! {
                        Some(event) = desired_inputs.ok.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.ok.give_container(&cap, &mut data);
                            }
                        }
                        Some(event) = desired_inputs.err.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.err.give_container(&cap, &mut data);
                            }
                        }
                        // All inputs are exhausted, so we can shut down.
                        else => return,
                    }
                }
            }

            let mut cap_set = CapabilitySet::from_elem(desc_cap);

            let read_only = *read_only_rx.borrow_and_update();
            let mut state = State::new(sink_id, worker_count, as_of, read_only);

            // Create a stream that reports advancements of the target shard's frontier and updates
            // the shared sink frontier.
            //
            // We collect the persist frontier from a write handle directly, rather than inspecting
            // the `persist` stream, because the latter has two annoying glitches:
            //  (a) It starts at the shard's read frontier, not its write frontier.
            //  (b) It can lag behind if there are spikes in ingested data.
            let mut persist_frontiers = pin!(async_stream::stream! {
                let mut writer = persist_api.open_writer().await;
                let mut frontier = Antichain::from_elem(Timestamp::MIN);
                while !frontier.is_empty() {
                    writer.wait_for_upper_past(&frontier).await;
                    frontier = writer.upper().clone();
                    shared_frontier.borrow_mut().clone_from(&frontier);
                    yield frontier.clone();
                }
            });

            loop {
                // Read from the inputs, pass through all data to the respective outputs, and keep
                // track of the input frontiers. When a frontier advances we might have to mint a
                // new batch description.
                let maybe_desc = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.ok.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.err.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(frontier) = persist_frontiers.next() => {
                        state.advance_persist_frontier(frontier);
                        state.maybe_mint_batch_description()
                    }
                    Ok(()) = read_only_rx.changed(), if read_only => {
                        state.allow_writes();
                        state.maybe_mint_batch_description()
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some(desc) = maybe_desc {
                    let lower_ts = *desc.lower.as_option().expect("not empty");
                    let cap = cap_set.delayed(&lower_ts);
                    desc_output.give(&cap, desc);

                    // We only emit strictly increasing `lower`s, so we can let our output frontier
                    // advance beyond the current `lower`.
                    cap_set.downgrade([lower_ts.step_forward()]);
                } else {
                    // The next emitted `lower` will be at least the `persist` frontier, so we can
                    // advance our output frontier as far.
                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
                }
            }
        });

        (
            desired_output_streams,
            desc_output_stream,
            sink_frontier,
            button.press_on_drop(),
        )
    }

    /// State maintained by the `mint` operator.
    struct State {
        sink_id: GlobalId,
        /// The number of workers in the Timely cluster.
        worker_count: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontier of the target persist shard.
        persist_frontier: Antichain<Timestamp>,
        /// The append worker for the next batch description, chosen in round-robin fashion.
        next_append_worker: usize,
        /// The last `lower` we have emitted in a batch description, if any. Whenever the
        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
        last_lower: Option<Antichain<Timestamp>>,
        /// Whether we are operating in read-only mode.
        ///
        /// In read-only mode, minting of batch descriptions is disabled.
        read_only: bool,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_count: usize,
            as_of: Antichain<Timestamp>,
            read_only: bool,
        ) -> Self {
            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
            // work needlessly writing batches at previous times.
            let persist_frontier = as_of;

            Self {
                sink_id,
                worker_count,
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontier,
                next_append_worker: 0,
                last_lower: None,
                read_only,
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontier.elements(),
                last_lower = ?self.last_lower.as_ref().map(|f| f.elements()),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontier, frontier) {
                self.trace("advanced `persist` frontier");
            }
        }

        fn allow_writes(&mut self) {
            if self.read_only {
                self.read_only = false;
                self.trace("disabled read-only mode");
            }
        }

        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = &self.persist_frontier;

            // We only mint new batch descriptions when:
            //  1. We are _not_ in read-only mode.
            //  2. The `desired` frontier is ahead of the `persist` frontier.
            //  3. The `persist` frontier advanced since we last emitted a batch description.
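            //
            // For example (illustrative times): with `persist_frontier = [5]`, `desired_frontier = [9]`,
            // and `last_lower = Some([3])`, we mint `([5], [9])`. If `desired_frontier` then advances
            // to `[12]` while the persist frontier is still `[5]`, we do not mint again until the
            // persist frontier has moved past `[5]` (i.e., until the previous batch has been appended
            // or a concurrent writer has advanced the shard).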
            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
                PartialOrder::less_than(lower, persist_frontier)
            });

            if self.read_only || !desired_ahead || !persist_advanced {
                return None;
            }

            let lower = persist_frontier.clone();
            let upper = desired_frontier.clone();
            let append_worker = self.next_append_worker;
            let desc = BatchDescription::new(lower, upper, append_worker);

            self.next_append_worker = (append_worker + 1) % self.worker_count;
            self.last_lower = Some(desc.lower.clone());

            self.trace(format!("minted batch description: {desc:?}"));
            Some(desc)
        }
    }
}

/// Implementation of the `write` operator.
mod write {
    use super::*;

    /// Render the `write` operator.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `as_of`: The first time for which the sink may produce output.
    ///  * `desired`: The ok/err streams that should be sinked to persist.
    ///  * `persist`: The ok/err streams read back from the output persist shard.
    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        desired: &DesiredStreams<S>,
        persist: &PersistStreams<S>,
        descs: &Stream<S, BatchDescription>,
        worker_config: Rc<ConfigSet>,
    ) -> (BatchesStream<S>, PressOnDropButton)
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = desired.ok.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "write");
        let mut op = OperatorBuilder::new(name, scope);

        let (batches_output, batches_output_stream) = op.new_output();

        // It is important that we exchange the `desired` and `persist` data the same way, so
        // updates that cancel each other out end up on the same worker.
        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
        let exchange_err = |(d, _, _): &(DataflowError, Timestamp, Diff)| d.hashed();

        let mut desired_inputs = OkErr::new(
            op.new_disconnected_input(&desired.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(&desired.err, Exchange::new(exchange_err)),
        );
        let mut persist_inputs = OkErr::new(
            op.new_disconnected_input(&persist.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(&persist.err, Exchange::new(exchange_err)),
        );
        let mut descs_input = op.new_input_for(&descs.broadcast(), Pipeline, &batches_output);

        let button = op.build(move |capabilities| async move {
            // We will use the data capabilities from the `descs` input to produce output, so no
            // need to hold onto the initial capabilities.
            drop(capabilities);

            let writer = persist_api.open_writer().await;
            let sink_metrics = persist_api.open_metrics().await;
            let mut state = State::new(
                sink_id,
                worker_id,
                writer,
                sink_metrics,
                as_of,
                &worker_config,
            );

            loop {
                // Read from the inputs, extract `desired` updates as positive contributions to
                // `correction` and `persist` updates as negative contributions. If either the
                // `desired` or `persist` frontier advances, or if we receive a new batch description,
                // we might have to write a new batch.
                let maybe_batch = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = descs_input.next() => {
                        match event {
                            Event::Data(cap, data) => {
                                for desc in data {
                                    state.absorb_batch_description(desc, cap.clone());
                                }
                                state.maybe_write_batch().await
                            }
                            Event::Progress(_frontier) => None,
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some((desc, batch, cap)) = maybe_batch {
                    batches_output.give(&cap, (desc, batch));
                }
            }
        });

        (batches_output_stream, button.press_on_drop())
    }

    /// State maintained by the `write` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// Contains `desired - persist`, reflecting the updates we would like to commit to
        /// `persist` in order to "correct" it to track `desired`. This collection is only modified
        /// by updates received from either the `desired` or `persist` inputs.
        corrections: OkErr<Correction<Row>, Correction<DataflowError>>,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontiers of the `persist` inputs.
        ///
        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
        /// usually is, but during snapshot processing, these frontiers will start at the shard's
        /// read frontier, so they can be beyond its write frontier. This is important as it means
        /// we must not discard batch descriptions based on these persist frontiers: A batch
        /// description might still be valid even if its `lower` is before the persist frontiers we
        /// observe.
        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The current valid batch description and associated output capability, if any.
        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
        /// A request to force a consolidation of `corrections` once both `desired_frontiers` and
        /// `persist_frontiers` become greater than the given frontier.
        ///
        /// Normally we force a consolidation whenever we write a batch, but there are periods
        /// (like read-only mode) when that doesn't happen, and we need to manually force
        /// consolidation instead. Currently this is only used to ensure we quickly get rid of the
        /// snapshot updates.
        force_consolidation_after: Option<Antichain<Timestamp>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
            metrics: SinkMetrics,
            as_of: Antichain<Timestamp>,
            worker_config: &ConfigSet,
        ) -> Self {
            let worker_metrics = metrics.for_worker(worker_id);

            // Force a consolidation of `corrections` after the snapshot updates have been fully
            // processed, to ensure we get rid of those as quickly as possible.
            let force_consolidation_after = Some(as_of);

            Self {
                sink_id,
                worker_id,
                persist_writer,
                corrections: OkErr::new(
                    Correction::new(metrics.clone(), worker_metrics.clone(), worker_config),
                    Correction::new(metrics, worker_metrics, worker_config),
                ),
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontiers: OkErr::new_frontiers(),
                batch_description: None,
                force_consolidation_after,
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontiers.frontier().elements(),
                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.ok, frontier) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` ok frontier");
            }
        }

        fn advance_persist_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.err, frontier) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` err frontier");
            }
        }

        /// Apply the effects of a previous `desired` frontier advancement.
        fn apply_desired_frontier_advancement(&mut self) {
            self.maybe_force_consolidation();
        }

        /// Apply the effects of a previous `persist` frontier advancement.
        fn apply_persist_frontier_advancement(&mut self) {
            let frontier = self.persist_frontiers.frontier();

            // We will only emit times at or after the `persist` frontier, so now is a good time to
            // advance the times of stashed updates.
            self.corrections.ok.advance_since(frontier.clone());
            self.corrections.err.advance_since(frontier.clone());

            self.maybe_force_consolidation();
        }

        /// If the current consolidation request has become applicable, apply it.
        fn maybe_force_consolidation(&mut self) {
            let Some(request) = &self.force_consolidation_after else {
                return;
            };

            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = self.persist_frontiers.frontier();
            if PartialOrder::less_than(request, desired_frontier)
                && PartialOrder::less_than(request, persist_frontier)
            {
                self.trace("forcing correction consolidation");
                self.corrections.ok.consolidate_at_since();
                self.corrections.err.consolidate_at_since();

                // Remove the consolidation request, now that we have fulfilled it.
                self.force_consolidation_after = None;
            }
        }

        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
            // The incoming batch description is outdated if we already have a batch description
            // with a greater `lower`.
            //
            // Note that we cannot assume a description is outdated based on the comparison of its
            // `lower` with the `persist_frontier`. The persist frontier observed by the `write`
            // operator is initialized with the shard's read frontier, so it can be greater than
            // the shard's write frontier.
            if let Some((prev, _)) = &self.batch_description {
                if PartialOrder::less_than(&desc.lower, &prev.lower) {
                    self.trace(format!("skipping outdated batch description: {desc:?}"));
                    return;
                }
            }

            self.batch_description = Some((desc, cap));
            self.trace("set batch description");
        }

        async fn maybe_write_batch(
            &mut self,
        ) -> Option<(BatchDescription, ProtoBatch, Capability<Timestamp>)> {
            let (desc, _cap) = self.batch_description.as_ref()?;

            // We can write a new batch if we have seen all `persist` updates before `lower` and
            // all `desired` updates up to `upper`.
            let persist_complete =
                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
            let desired_complete =
                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
            if !persist_complete || !desired_complete {
                return None;
            }

            let (desc, cap) = self.batch_description.take()?;

            let ok_updates = self.corrections.ok.updates_before(&desc.upper);
            let err_updates = self.corrections.err.updates_before(&desc.upper);

            let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
            let errs = err_updates.map(|(d, t, r)| ((SourceData(Err(d)), ()), t, r.into_inner()));
            let mut updates = oks.chain(errs).peekable();

            // Don't write empty batches.
            if updates.peek().is_none() {
                drop(updates);
                self.trace("skipping empty batch");
                return None;
            }

            let batch = self
                .persist_writer
                .batch(updates, desc.lower.clone(), desc.upper.clone())
                .await
                .expect("valid usage")
                .into_transmittable_batch();

            self.trace("wrote a batch");
            Some((desc, batch, cap))
        }
    }
}

/// Implementation of the `append` operator.
mod append {
    use super::*;

    /// Render the `append` operator.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
    ///  * `batches`: The stream of written batches produced by the `write` operator.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        descs: &DescsStream<S>,
        batches: &BatchesStream<S>,
    ) -> PressOnDropButton
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = descs.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "append");
        let mut op = OperatorBuilder::new(name, scope);

        // Broadcast batch descriptions to all workers, regardless of whether or not they are
        // responsible for the append, to give them a chance to clean up any outdated state they
        // might still hold.
        let mut descs_input = op.new_disconnected_input(&descs.broadcast(), Pipeline);
        let mut batches_input = op.new_disconnected_input(
            batches,
            Exchange::new(move |(desc, _): &(BatchDescription, _)| {
                u64::cast_from(desc.append_worker)
            }),
        );

        let button = op.build(move |_capabilities| async move {
            let writer = persist_api.open_writer().await;
            let mut state = State::new(sink_id, worker_id, writer);

            loop {
                // Read from the inputs, absorb batch descriptions and batches. If the `batches`
                // frontier advances, or if we receive a new batch description, we might have to
                // append a new batch.
                tokio::select! {
                    Some(event) = descs_input.next() => {
                        if let Event::Data(_cap, data) = event {
                            for desc in data {
                                state.absorb_batch_description(desc).await;
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    Some(event) = batches_input.next() => {
                        match event {
                            Event::Data(_cap, data) => {
                                // The batch description is only used for routing and we ignore it
                                // here since we already get one from `descs_input`.
                                for (_desc, batch) in data {
                                    state.absorb_batch(batch).await;
                                }
                            }
                            Event::Progress(frontier) => {
                                state.advance_batches_frontier(frontier);
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                }
            }
        });

        button.press_on_drop()
    }

    /// State maintained by the `append` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// The current input frontier of `batches`.
        batches_frontier: Antichain<Timestamp>,
        /// The greatest observed `lower` from both `descs` and `batches`.
        lower: Antichain<Timestamp>,
        /// The batch description for `lower`, if any.
        batch_description: Option<BatchDescription>,
        /// Batches received for `lower`.
        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        ) -> Self {
            Self {
                sink_id,
                worker_id,
                persist_writer,
                batches_frontier: Antichain::from_elem(Timestamp::MIN),
                lower: Antichain::from_elem(Timestamp::MIN),
                batch_description: None,
                batches: Default::default(),
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                batches_frontier = ?self.batches_frontier.elements(),
                lower = ?self.lower.elements(),
                batch_description = ?self.batch_description,
                message,
            );
        }

        fn advance_batches_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.batches_frontier, frontier) {
                self.trace("advanced `batches` frontier");
            }
        }

        /// Advance the current `lower`.
        ///
        /// Discards all currently stashed batches and batch descriptions, assuming that they are
        /// now invalid.
        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
            assert!(PartialOrder::less_than(&self.lower, &frontier));

            self.lower = frontier;
            self.batch_description = None;

            // Remove stashed batches, cleaning up those we didn't append.
            for batch in self.batches.drain(..) {
                batch.delete().await;
            }

            self.trace("advanced `lower`");
        }

        /// Absorb the given batch description into the state, provided it is not outdated.
        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
            if PartialOrder::less_than(&self.lower, &desc.lower) {
                self.advance_lower(desc.lower.clone()).await;
            } else if self.lower != desc.lower {
                self.trace(format!("skipping outdated batch description: {desc:?}"));
                return;
            }

            if desc.append_worker == self.worker_id {
                self.batch_description = Some(desc);
                self.trace("set batch description");
            }
        }

        /// Absorb the given batch into the state, provided it is not outdated.
        async fn absorb_batch(&mut self, batch: ProtoBatch) {
            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
            if PartialOrder::less_than(&self.lower, batch.lower()) {
                self.advance_lower(batch.lower().clone()).await;
            } else if &self.lower != batch.lower() {
                self.trace(format!(
                    "skipping outdated batch: ({:?}, {:?})",
                    batch.lower().elements(),
                    batch.upper().elements(),
                ));

                // Ensure the batch's data gets properly cleaned up before dropping it.
                batch.delete().await;
                return;
            }

            self.batches.push(batch);
            self.trace("absorbed a batch");
        }

        async fn maybe_append_batches(&mut self) {
            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
            if !batches_complete {
                return;
            }

            let Some(desc) = self.batch_description.take() else {
                return;
            };

            let new_lower = match self.append_batches(desc).await {
                Ok(shard_upper) => {
                    self.trace("appended a batch");
                    shard_upper
                }
                Err(shard_upper) => {
                    // Failing the append is expected in the presence of concurrent replicas. There
                    // is nothing special to do here: The self-correcting feedback mechanism
                    // ensures that we observe the concurrent changes, compute their consequences,
                    // and append them at a future time.
                    self.trace(format!(
                        "append failed due to `lower` mismatch: {:?}",
                        shard_upper.elements(),
                    ));
                    shard_upper
                }
            };

            self.advance_lower(new_lower).await;
        }

        /// Append the current `batches` to the output shard.
        ///
        /// Returns whether the append was successful or not, and the current shard upper in either
        /// case.
        ///
        /// This method advances the shard upper to the batch `lower` if necessary. This is the
        /// mechanism that brings the shard upper to the sink as-of when appending the initial
        /// batch.
        ///
        /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
        /// a single append at operator startup. The reason we are doing it here instead is that it
        /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
        /// persist writes, including the initial upper bump. Having only a single place that
        /// performs writes makes it easy to ensure we are doing that correctly.
        async fn append_batches(
            &mut self,
            desc: BatchDescription,
        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
            let (lower, upper) = (desc.lower, desc.upper);
            let mut to_append: Vec<_> = self.batches.iter_mut().collect();

            loop {
                let result = self
                    .persist_writer
                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
                    .await
                    .expect("valid usage");

                match result {
                    Ok(()) => return Ok(upper),
                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
                    }
                    Err(mismatch) => return Err(mismatch.current),
                }
            }
        }
    }

    /// Advance the frontier of the given writer's shard to at least the given `upper`.
    async fn advance_shard_upper(
        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        upper: Antichain<Timestamp>,
    ) {
        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
        let lower = Antichain::from_elem(Timestamp::MIN);
        persist_writer
            .append(empty_updates, lower, upper)
            .await
            .expect("valid usage")
            .expect("should always succeed");
    }
}