mz_compute/sink/materialized_view.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A dataflow sink that writes input records to a persist shard.
//!
//! This implementation is both parallel and self-correcting.
//!
//!  * parallel: Multiple workers can participate in writing updates for the same times, letting
//!    sink throughput scale with the number of workers allocated to the replica.
//!  * self-correcting: The sink continually compares the contents of the persist shard with the
//!    contents of the input collection and writes down the difference. If the persist shard ends
//!    up with undesired contents for any reason, this is corrected the next time the sink manages
//!    to append to the shard.
//!
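//! As a simplified illustration of the self-correcting principle, the sketch below computes the
//! updates the sink would need to append, using plain `HashMap`s of row counts in place of the
//! timely collections and the `Correction` buffer used by the real implementation (the
//! `correction` helper is purely illustrative):
//!
//! ```
//! use std::collections::HashMap;
//!
//! /// Compute the updates that must be written so that `persist` comes to match `desired`.
//! fn correction(
//!     desired: &HashMap<&'static str, i64>,
//!     persist: &HashMap<&'static str, i64>,
//! ) -> HashMap<&'static str, i64> {
//!     let mut diff = HashMap::new();
//!     for (row, count) in desired {
//!         *diff.entry(*row).or_insert(0) += count;
//!     }
//!     for (row, count) in persist {
//!         *diff.entry(*row).or_insert(0) -= count;
//!     }
//!     diff.retain(|_, count| *count != 0);
//!     diff
//! }
//!
//! let desired = HashMap::from([("a", 1), ("b", 2)]);
//! let persist = HashMap::from([("a", 1), ("c", 3)]);
//! // Appending this difference corrects the persist contents to match `desired`, regardless of
//! // how the undesired contents came to be.
//! assert_eq!(correction(&desired, &persist), HashMap::from([("b", 2), ("c", -3)]));
//! ```
//!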
//! ### Operators
//!
//! The persist sink consists of a graph of operators.
//!
//!    desired                    persist <---------------.
//!       |                          |                    |
//!       |                          |                    |
//!       |---------------------.    |                    |
//!       |                     |    |                    |
//!       |                     |    |                    |
//!       v                     v    v                    |
//!   +--------+              +--------+              +--------+
//!   |  mint  | --descs-.--> | write  | --batches--> | append |
//!   +--------+          \   +--------+          .-> +--------+
//!                        \_____________________/
//!
//!  * `mint` mints batch descriptions, i.e., `(lower, upper)` bounds of batches that should be
//!    written. The persist API requires that all workers write batches with the same bounds, so
//!    they can be appended as a single logical batch. To ensure this, the `mint` operator only
//!    runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are
//!    picked based on the frontiers of the `desired` stream and the output persist shard.
//!  * `write` stages batch data in persist, based on the batch descriptions received from the
//!    `mint` operator, but without appending it to the persist shard. This is a multi-worker
//!    operator, with each worker writing batches of the data that arrives at its local inputs. To
//!    do so it reads from the `desired` and `persist` streams and produces the difference between
//!    them to write back out, ensuring that the final contents of the persist shard match
//!    `desired`.
//!  * `append` appends the batches written by `write` to the persist shard, following the batch
//!    descriptions minted by `mint`. This is a multi-worker operator, where workers are
//!    responsible for different subsets of batch descriptions. If a worker is responsible for a
//!    given batch description, it waits for all workers to stage their batches for that
//!    description, then appends all the batches together as a single logical batch.
//!
//! Note that while the above graph suggests that `mint` and `write` both receive copies of the
//! `desired` stream, the actual implementation passes that stream through `mint` and lets `write`
//! read the passed-through stream, to avoid cloning data.
//!
//! Also note that the `append` operator's implementation would perhaps be more natural as a
//! single-worker implementation. The purpose of sharing the work between all workers is to avoid a
//! work imbalance where one worker is overloaded (doing both appends and the consequent persist
//! maintenance work) while others are comparatively idle.
//!
//! The persist sink is written to be robust to the presence of other conflicting instances (e.g.
//! from other replicas) writing to the same persist shard. Each of the three operators needs to be
//! able to handle conflicting writes that unexpectedly change the contents of the output persist
//! shard.
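//!
//! For illustration, the conflict handling in `append` boils down to a compare-and-append attempt
//! like the sketch below (simplified from the operator code later in this file; batch bookkeeping
//! and the initial upper bump are omitted):
//!
//! ```ignore
//! match writer
//!     .compare_and_append_batch(&mut batches, lower.clone(), upper.clone())
//!     .await
//!     .expect("valid usage")
//! {
//!     // We won the race; the shard now reflects `desired` up to `upper`.
//!     Ok(()) => {}
//!     // A conflicting instance appended first. Nothing special needs to happen: the
//!     // self-correction feedback loop observes the new shard contents and produces fresh
//!     // corrections that a later batch description picks up.
//!     Err(_mismatch) => {}
//! }
//! ```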
//!
//! ### Frontiers
//!
//! The `desired` frontier tracks the progress of the upstream dataflow, but may be rounded up to
//! the next refresh time for dataflows that follow a refresh schedule other than "on commit".
//!
//! The `persist` frontier tracks the `upper` frontier of the target persist shard, with one
//! exception: When the `persist_source` that reads back the shard is rendered, it will start
//! reading at its `since` frontier. So if the shard's `since` is initially greater than its
//! `upper`, the `persist` frontier too will be in advance of the shard `upper`, until the `upper`
//! has caught up. To avoid getting confused by this edge case, the `mint` operator does not use
//! the `persist` stream to observe the shard frontier but keeps its own `WriteHandle` instead.
//!
//! The `descs` frontier communicates which `lower` bounds may still be emitted in batch
//! descriptions. All future batch descriptions will have a `lower` that is greater than or equal
//! to the current `descs` frontier.
//!
//! The `batches` frontier communicates for which `lower` bounds batches may still be written. All
//! batches for descriptions with `lower`s less than the current `batches` frontier have already
//! been written.
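//!
//! As a simplified sketch of how `mint` derives batch bounds from these frontiers (scalar
//! timestamps stand in for `Antichain`s; the real logic lives in `mint::State` below and
//! additionally respects read-only mode and append-worker selection):
//!
//! ```
//! /// Decide whether to mint a new batch description, given the `desired` and `persist`
//! /// frontiers and the `lower` of the last minted description.
//! fn maybe_mint(desired: u64, persist: u64, last_lower: Option<u64>) -> Option<(u64, u64)> {
//!     let desired_ahead = persist < desired;
//!     let persist_advanced = last_lower.map_or(true, |lower| lower < persist);
//!     (desired_ahead && persist_advanced).then_some((persist, desired))
//! }
//!
//! // `desired` is ahead of the shard, so the batch bounds (5, 8) get minted ...
//! assert_eq!(maybe_mint(8, 5, None), Some((5, 8)));
//! // ... but no further description is minted until the shard's frontier has advanced.
//! assert_eq!(maybe_mint(9, 5, Some(5)), None);
//! assert_eq!(maybe_mint(9, 8, Some(5)), Some((8, 9)));
//! ```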
//!
//! ### Invariants
//!
//! The implementation upholds several invariants that operators can rely upon to simplify their
//! logic:
//!
//!  1. `lower`s in minted batch descriptions are unique and strictly increasing. That is, the
//!     `mint` operator will never mint the same `lower` twice and a minted `lower` is always
//!     greater than any previously minted ones.
//!  2. `upper`s in minted batch descriptions are monotonically increasing.
//!  3. From (1) it follows that there is always at most one "valid" batch description in flight in
//!     the operator graph. "Valid" here means that the described batch can be appended to the
//!     persist shard.
//!
//! The main simplification these invariants allow is that operators only need to keep track of the
//! most recent batch description and/or `lower`. Previous batch descriptions are not valid
//! anymore, so there is no reason to hold any state or perform any work in support of them.
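//!
//! For example, an operator relying on invariant (1) can track the current batch description in a
//! single `Option` and drop anything with a smaller `lower`, as in this sketch (scalar timestamps
//! stand in for frontiers; the `absorb` helper is purely illustrative):
//!
//! ```
//! /// Keep only the batch description with the greatest `lower`; everything else is outdated.
//! fn absorb(current: &mut Option<(u64, u64)>, incoming: (u64, u64)) {
//!     match current {
//!         Some((lower, _)) if *lower >= incoming.0 => {} // outdated, ignore
//!         _ => *current = Some(incoming),
//!     }
//! }
//!
//! let mut current = None;
//! absorb(&mut current, (5, 8));
//! absorb(&mut current, (3, 5)); // arrives late and is already invalid
//! assert_eq!(current, Some((5, 8)));
//! ```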
//!
//! ### Read-only Mode
//!
//! The persist sink can optionally be initialized in read-only mode. In this mode it is passive
//! and avoids any writes to persist. When the `read_only_rx` switches to `false`, the sink
//! transitions into write mode and commences normal operation.
//!
//! Read-only mode is implemented by the `mint` operator. To disable writes, the `mint` operator
//! simply avoids minting any batch descriptions. Since both the `write` and the `append` operator
//! require batch descriptions to write/append batches, this suppresses any persist writes. At the
//! same time, the `write` operator still observes changes to the `desired` and `persist`
//! collections, allowing it to keep its correction buffer up-to-date.
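//!
//! The transition out of read-only mode is driven by a `tokio::sync::watch` channel, roughly as in
//! the standalone sketch below (the real `mint` operator folds the same wait into its `select!`
//! loop, next to the frontier events):
//!
//! ```ignore
//! use tokio::sync::watch;
//!
//! // `true` means read-only; the controller flips the channel to `false` to allow writes.
//! let (tx, mut rx) = watch::channel(true);
//!
//! tokio::spawn(async move {
//!     let mut read_only = *rx.borrow_and_update();
//!     while read_only {
//!         // Mint no batch descriptions; only wait for the mode to change.
//!         if rx.changed().await.is_err() {
//!             return; // sender dropped, shut down
//!         }
//!         read_only = *rx.borrow_and_update();
//!     }
//!     // From here on, batch descriptions are minted as usual.
//! });
//!
//! // Elsewhere: leave read-only mode.
//! let _ = tx.send(false);
//! ```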
117use std::any::Any;
118use std::cell::RefCell;
119use std::pin::pin;
120use std::rc::Rc;
121use std::sync::Arc;
122
123use differential_dataflow::{AsCollection, Collection, Hashable};
124use futures::StreamExt;
125use mz_compute_types::dyncfgs::ENABLE_MV_APPEND_SMEARING;
126use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
127use mz_dyncfg::ConfigSet;
128use mz_ore::cast::CastFrom;
129use mz_persist_client::batch::{Batch, ProtoBatch};
130use mz_persist_client::cache::PersistClientCache;
131use mz_persist_client::metrics::SinkMetrics;
132use mz_persist_client::operators::shard_source::SnapshotMode;
133use mz_persist_client::write::WriteHandle;
134use mz_persist_client::{Diagnostics, PersistClient};
135use mz_persist_types::codec_impls::UnitSchema;
136use mz_repr::{Diff, GlobalId, Row, Timestamp};
137use mz_storage_types::StorageDiff;
138use mz_storage_types::controller::CollectionMetadata;
139use mz_storage_types::errors::DataflowError;
140use mz_storage_types::sources::SourceData;
141use mz_timely_util::builder_async::PressOnDropButton;
142use mz_timely_util::builder_async::{Event, OperatorBuilder};
143use mz_timely_util::probe::{Handle, ProbeNotify};
144use serde::{Deserialize, Serialize};
145use timely::PartialOrder;
146use timely::container::CapacityContainerBuilder;
147use timely::dataflow::channels::pact::{Exchange, Pipeline};
148use timely::dataflow::operators::{Broadcast, Capability, CapabilitySet, probe};
149use timely::dataflow::{Scope, Stream};
150use timely::progress::Antichain;
151use tokio::sync::watch;
152use tracing::trace;
153
154use crate::compute_state::ComputeState;
155use crate::render::StartSignal;
156use crate::render::sinks::SinkRender;
157use crate::sink::correction::Correction;
158use crate::sink::refresh::apply_refresh;
159
160impl<G> SinkRender<G> for MaterializedViewSinkConnection<CollectionMetadata>
161where
162    G: Scope<Timestamp = Timestamp>,
163{
164    fn render_sink(
165        &self,
166        compute_state: &mut ComputeState,
167        sink: &ComputeSinkDesc<CollectionMetadata>,
168        sink_id: GlobalId,
169        as_of: Antichain<Timestamp>,
170        start_signal: StartSignal,
171        mut ok_collection: Collection<G, Row, Diff>,
172        mut err_collection: Collection<G, DataflowError, Diff>,
173        _ct_times: Option<Collection<G, (), Diff>>,
174        output_probe: &Handle<Timestamp>,
175    ) -> Option<Rc<dyn Any>> {
176        // Attach probes reporting the compute frontier.
177        // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
178        // track the progress of the computation, so we need to attach probes before it.
179        let probe = probe::Handle::default();
180        ok_collection = ok_collection
181            .probe_with(&probe)
182            .inner
183            .probe_notify_with(vec![output_probe.clone()])
184            .as_collection();
185        let collection_state = compute_state.expect_collection_mut(sink_id);
186        collection_state.compute_probe = Some(probe);
187
188        // If a `RefreshSchedule` was specified, round up timestamps.
189        if let Some(refresh_schedule) = &sink.refresh_schedule {
190            ok_collection = apply_refresh(ok_collection, refresh_schedule.clone());
191            err_collection = apply_refresh(err_collection, refresh_schedule.clone());
192        }
193
194        if sink.up_to != Antichain::default() {
195            unimplemented!(
196                "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning"
197            )
198        }
199
200        let token = persist_sink(
201            sink_id,
202            &self.storage_metadata,
203            ok_collection,
204            err_collection,
205            as_of,
206            compute_state,
207            start_signal,
208        );
209        Some(token)
210    }
211}
212
213/// Type of the `desired` stream, split into `Ok` and `Err` streams.
214type DesiredStreams<S> =
215    OkErr<Stream<S, (Row, Timestamp, Diff)>, Stream<S, (DataflowError, Timestamp, Diff)>>;
216
217/// Type of the `persist` stream, split into `Ok` and `Err` streams.
218type PersistStreams<S> =
219    OkErr<Stream<S, (Row, Timestamp, Diff)>, Stream<S, (DataflowError, Timestamp, Diff)>>;
220
221/// Type of the `descs` stream.
222type DescsStream<S> = Stream<S, BatchDescription>;
223
224/// Type of the `batches` stream.
225type BatchesStream<S> = Stream<S, (BatchDescription, ProtoBatch)>;
226
227/// Type of the shared sink write frontier.
228type SharedSinkFrontier = Rc<RefCell<Antichain<Timestamp>>>;
229
230/// Renders an MV sink writing the given desired collection into the `target` persist collection.
231pub(super) fn persist_sink<S>(
232    sink_id: GlobalId,
233    target: &CollectionMetadata,
234    ok_collection: Collection<S, Row, Diff>,
235    err_collection: Collection<S, DataflowError, Diff>,
236    as_of: Antichain<Timestamp>,
237    compute_state: &mut ComputeState,
238    start_signal: StartSignal,
239) -> Rc<dyn Any>
240where
241    S: Scope<Timestamp = Timestamp>,
242{
243    let mut scope = ok_collection.scope();
244    let desired = OkErr::new(ok_collection.inner, err_collection.inner);
245
246    // Read back the persist shard.
247    let (persist, persist_token) = persist_source(
248        &mut scope,
249        sink_id,
250        target.clone(),
251        compute_state,
252        start_signal,
253    );
254
255    let persist_api = PersistApi {
256        persist_clients: Arc::clone(&compute_state.persist_clients),
257        collection: target.clone(),
258        shard_name: sink_id.to_string(),
259        purpose: format!("MV sink {sink_id}"),
260    };
261
262    let (desired, descs, sink_frontier, mint_token) = mint::render(
263        sink_id,
264        persist_api.clone(),
265        as_of.clone(),
266        compute_state.read_only_rx.clone(),
267        &desired,
268        Rc::clone(&compute_state.worker_config),
269    );
270
271    let (batches, write_token) = write::render(
272        sink_id,
273        persist_api.clone(),
274        as_of,
275        &desired,
276        &persist,
277        &descs,
278        Rc::clone(&compute_state.worker_config),
279    );
280
281    let append_token = append::render(sink_id, persist_api, &descs, &batches);
282
283    // Report sink frontier updates to the `ComputeState`.
284    let collection = compute_state.expect_collection_mut(sink_id);
285    collection.sink_write_frontier = Some(sink_frontier);
286
287    Rc::new((persist_token, mint_token, write_token, append_token))
288}
289
290/// Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with
291/// such pairs.
292struct OkErr<O, E> {
293    ok: O,
294    err: E,
295}
296
297impl<O, E> OkErr<O, E> {
298    fn new(ok: O, err: E) -> Self {
299        Self { ok, err }
300    }
301}
302
303impl OkErr<Antichain<Timestamp>, Antichain<Timestamp>> {
304    fn new_frontiers() -> Self {
305        Self {
306            ok: Antichain::from_elem(Timestamp::MIN),
307            err: Antichain::from_elem(Timestamp::MIN),
308        }
309    }
310
311    /// Return the overall frontier, i.e., the minimum of `ok` and `err`.
312    fn frontier(&self) -> &Antichain<Timestamp> {
313        if PartialOrder::less_equal(&self.ok, &self.err) {
314            &self.ok
315        } else {
316            &self.err
317        }
318    }
319}
320
321/// Advance the given `frontier` to `new`, if the latter one is greater.
322///
323/// Returns whether `frontier` was advanced.
324fn advance(frontier: &mut Antichain<Timestamp>, new: Antichain<Timestamp>) -> bool {
325    if PartialOrder::less_than(frontier, &new) {
326        *frontier = new;
327        true
328    } else {
329        false
330    }
331}
332
333/// A persist API specialized to a single collection.
334#[derive(Clone)]
335struct PersistApi {
336    persist_clients: Arc<PersistClientCache>,
337    collection: CollectionMetadata,
338    shard_name: String,
339    purpose: String,
340}
341
342impl PersistApi {
343    async fn open_client(&self) -> PersistClient {
344        self.persist_clients
345            .open(self.collection.persist_location.clone())
346            .await
347            .unwrap_or_else(|error| panic!("error opening persist client: {error}"))
348    }
349
350    async fn open_writer(&self) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
351        self.open_client()
352            .await
353            .open_writer(
354                self.collection.data_shard,
355                Arc::new(self.collection.relation_desc.clone()),
356                Arc::new(UnitSchema),
357                Diagnostics {
358                    shard_name: self.shard_name.clone(),
359                    handle_purpose: self.purpose.clone(),
360                },
361            )
362            .await
363            .unwrap_or_else(|error| panic!("error opening persist writer: {error}"))
364    }
365
366    async fn open_metrics(&self) -> SinkMetrics {
367        let client = self.open_client().await;
368        client.metrics().sink.clone()
369    }
370}
371
372/// Instantiate a persist source reading back the `target` collection.
373fn persist_source<S>(
374    scope: &mut S,
375    sink_id: GlobalId,
376    target: CollectionMetadata,
377    compute_state: &ComputeState,
378    start_signal: StartSignal,
379) -> (PersistStreams<S>, Vec<PressOnDropButton>)
380where
381    S: Scope<Timestamp = Timestamp>,
382{
383    // There is no guarantee that the sink as-of is beyond the persist shard's since. If it isn't,
384    // instantiating a `persist_source` with it would panic. So instead we leave it to
385    // `persist_source` to select an appropriate as-of. We only care about times beyond the current
386    // shard upper anyway.
387    //
388    // TODO(teskje): Ideally we would select the as-of as `join(sink_as_of, since, upper)`, to
389    // allow `persist_source` to omit as much historical detail as possible. However, we don't know
390    // the shard frontiers and we cannot get them here as that requires an `async` context. We
391    // should consider extending the `persist_source` API to allow as-of selection based on the
392    // shard's current frontiers.
393    let as_of = None;
394
395    let until = Antichain::new();
396    let map_filter_project = None;
397
398    let (ok_stream, err_stream, token) = mz_storage_operators::persist_source::persist_source(
399        scope,
400        sink_id,
401        Arc::clone(&compute_state.persist_clients),
402        &compute_state.txns_ctx,
403        &compute_state.worker_config,
404        target,
405        None,
406        as_of,
407        SnapshotMode::Include,
408        until,
409        map_filter_project,
410        compute_state.dataflow_max_inflight_bytes(),
411        start_signal,
412        |error| panic!("compute_persist_sink: {error}"),
413    );
414
415    let streams = OkErr::new(ok_stream, err_stream);
416    (streams, token)
417}
418
419/// A description for a batch of updates to be written.
420///
421/// Batch descriptions are produced by the `mint` operator and consumed by the `write` and `append`
422/// operators, where they inform which batches should be written or appended, respectively.
423///
424/// Each batch description also contains the index of its "append worker", i.e. the worker that is
425/// responsible for appending the written batches to the output shard.
426#[derive(Clone, Serialize, Deserialize)]
427struct BatchDescription {
428    lower: Antichain<Timestamp>,
429    upper: Antichain<Timestamp>,
430    append_worker: usize,
431}
432
433impl BatchDescription {
434    fn new(lower: Antichain<Timestamp>, upper: Antichain<Timestamp>, append_worker: usize) -> Self {
435        assert!(PartialOrder::less_than(&lower, &upper));
436        Self {
437            lower,
438            upper,
439            append_worker,
440        }
441    }
442}
443
444impl std::fmt::Debug for BatchDescription {
445    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446        write!(
447            f,
448            "({:?}, {:?})@{}",
449            self.lower.elements(),
450            self.upper.elements(),
451            self.append_worker,
452        )
453    }
454}
455
456/// Construct a name for the given sub-operator.
457fn operator_name(sink_id: GlobalId, sub_operator: &str) -> String {
458    format!("mv_sink({sink_id})::{sub_operator}")
459}
460
461/// Implementation of the `mint` operator.
462mod mint {
463    use super::*;
464
465    /// Render the `mint` operator.
466    ///
467    /// The parameters passed in are:
468    ///  * `sink_id`: The `GlobalId` of the sink export.
469    ///  * `persist_api`: An object providing access to the output persist shard.
470    ///  * `as_of`: The first time for which the sink may produce output.
471    ///  * `read_only_rx`: A receiver that reports whether the sink is in read-only mode.
472    ///  * `desired`: The ok/err streams that should be sinked to persist.
473    pub fn render<S>(
474        sink_id: GlobalId,
475        persist_api: PersistApi,
476        as_of: Antichain<Timestamp>,
477        mut read_only_rx: watch::Receiver<bool>,
478        desired: &DesiredStreams<S>,
479        worker_config: Rc<ConfigSet>,
480    ) -> (
481        DesiredStreams<S>,
482        DescsStream<S>,
483        SharedSinkFrontier,
484        PressOnDropButton,
485    )
486    where
487        S: Scope<Timestamp = Timestamp>,
488    {
489        let scope = desired.ok.scope();
490        let worker_id = scope.index();
491        let worker_count = scope.peers();
492
493        // Determine the active worker for the mint operator.
494        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();
495
496        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
497        let shared_frontier = Rc::clone(&sink_frontier);
498
499        let name = operator_name(sink_id, "mint");
500        let mut op = OperatorBuilder::new(name, scope);
501
502        let (ok_output, ok_stream) = op.new_output::<CapacityContainerBuilder<_>>();
503        let (err_output, err_stream) = op.new_output::<CapacityContainerBuilder<_>>();
504        let desired_outputs = OkErr::new(ok_output, err_output);
505        let desired_output_streams = OkErr::new(ok_stream, err_stream);
506
507        let (desc_output, desc_output_stream) = op.new_output();
508
509        let mut desired_inputs = OkErr {
510            ok: op.new_input_for(&desired.ok, Pipeline, &desired_outputs.ok),
511            err: op.new_input_for(&desired.err, Pipeline, &desired_outputs.err),
512        };
513
514        let button = op.build(move |capabilities| async move {
515            // Passing through the `desired` streams only requires data capabilities, so we can
516            // immediately drop their initial capabilities here.
517            let [_, _, desc_cap]: [_; 3] =
518                capabilities.try_into().expect("one capability per output");
519
520            // Non-active workers just pass the `desired` data through.
521            if worker_id != active_worker_id {
522                drop(desc_cap);
523                shared_frontier.borrow_mut().clear();
524
525                loop {
526                    tokio::select! {
527                        Some(event) = desired_inputs.ok.next() => {
528                            if let Event::Data(cap, mut data) = event {
529                                desired_outputs.ok.give_container(&cap, &mut data);
530                            }
531                        }
532                        Some(event) = desired_inputs.err.next() => {
533                            if let Event::Data(cap, mut data) = event {
534                                desired_outputs.err.give_container(&cap, &mut data);
535                            }
536                        }
537                        // All inputs are exhausted, so we can shut down.
538                        else => return,
539                    }
540                }
541            }
542
543            let mut cap_set = CapabilitySet::from_elem(desc_cap);
544
545            let read_only = *read_only_rx.borrow_and_update();
546            let mut state = State::new(sink_id, worker_count, as_of, read_only, worker_config);
547
548            // Create a stream that reports advancements of the target shard's frontier and updates
549            // the shared sink frontier.
550            //
551            // We collect the persist frontier from a write handle directly, rather than inspecting
552            // the `persist` stream, because the latter has two annoying glitches:
553            //  (a) It starts at the shard's read frontier, not its write frontier.
554            //  (b) It can lag behind if there are spikes in ingested data.
555            let mut persist_frontiers = pin!(async_stream::stream! {
556                let mut writer = persist_api.open_writer().await;
557                let mut frontier = Antichain::from_elem(Timestamp::MIN);
558                while !frontier.is_empty() {
559                    writer.wait_for_upper_past(&frontier).await;
560                    frontier = writer.upper().clone();
561                    shared_frontier.borrow_mut().clone_from(&frontier);
562                    yield frontier.clone();
563                }
564            });
565
566            loop {
567                // Read from the inputs, pass through all data to the respective outputs, and keep
568                // track of the input frontiers. When a frontier advances we might have to mint a
569                // new batch description.
570                let maybe_desc = tokio::select! {
571                    Some(event) = desired_inputs.ok.next() => {
572                        match event {
573                            Event::Data(cap, mut data) => {
574                                desired_outputs.ok.give_container(&cap, &mut data);
575                                None
576                            }
577                            Event::Progress(frontier) => {
578                                state.advance_desired_ok_frontier(frontier);
579                                state.maybe_mint_batch_description()
580                            }
581                        }
582                    }
583                    Some(event) = desired_inputs.err.next() => {
584                        match event {
585                            Event::Data(cap, mut data) => {
586                                desired_outputs.err.give_container(&cap, &mut data);
587                                None
588                            }
589                            Event::Progress(frontier) => {
590                                state.advance_desired_err_frontier(frontier);
591                                state.maybe_mint_batch_description()
592                            }
593                        }
594                    }
595                    Some(frontier) = persist_frontiers.next() => {
596                        state.advance_persist_frontier(frontier);
597                        state.maybe_mint_batch_description()
598                    }
599                    Ok(()) = read_only_rx.changed(), if read_only => {
600                        state.allow_writes();
601                        state.maybe_mint_batch_description()
602                    }
603                    // All inputs are exhausted, so we can shut down.
604                    else => return,
605                };
606
607                if let Some(desc) = maybe_desc {
608                    let lower_ts = *desc.lower.as_option().expect("not empty");
609                    let cap = cap_set.delayed(&lower_ts);
610                    desc_output.give(&cap, desc);
611
612                    // We only emit strictly increasing `lower`s, so we can let our output frontier
613                    // advance beyond the current `lower`.
614                    cap_set.downgrade([lower_ts.step_forward()]);
615                } else {
616                    // The next emitted `lower` will be at least the `persist` frontier, so we can
617                    // advance our output frontier as far.
618                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
619                }
620            }
621        });
622
623        (
624            desired_output_streams,
625            desc_output_stream,
626            sink_frontier,
627            button.press_on_drop(),
628        )
629    }
630
631    /// State maintained by the `mint` operator.
632    struct State {
633        sink_id: GlobalId,
634        /// The number of workers in the Timely cluster.
635        worker_count: usize,
636        /// The frontiers of the `desired` inputs.
637        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
638        /// The frontier of the target persist shard.
639        persist_frontier: Antichain<Timestamp>,
640        /// The append worker for the next batch description, chosen in round-robin fashion.
641        next_append_worker: usize,
642        /// The last `lower` we have emitted in a batch description, if any. Whenever the
643        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
644        last_lower: Option<Antichain<Timestamp>>,
645        /// Whether we are operating in read-only mode.
646        ///
647        /// In read-only mode, minting of batch descriptions is disabled.
648        read_only: bool,
649        /// Dynamic configuration.
650        worker_config: Rc<ConfigSet>,
651    }
652
653    impl State {
654        fn new(
655            sink_id: GlobalId,
656            worker_count: usize,
657            as_of: Antichain<Timestamp>,
658            read_only: bool,
659            worker_config: Rc<ConfigSet>,
660        ) -> Self {
661            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
662            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
663            // work needlessly writing batches at previous times.
664            let persist_frontier = as_of;
665
666            Self {
667                sink_id,
668                worker_count,
669                desired_frontiers: OkErr::new_frontiers(),
670                persist_frontier,
671                next_append_worker: 0,
672                last_lower: None,
673                read_only,
674                worker_config,
675            }
676        }
677
678        fn trace<S: AsRef<str>>(&self, message: S) {
679            let message = message.as_ref();
680            trace!(
681                sink_id = %self.sink_id,
682                desired_frontier = ?self.desired_frontiers.frontier().elements(),
683                persist_frontier = ?self.persist_frontier.elements(),
684                last_lower = ?self.last_lower.as_ref().map(|f| f.elements()),
685                message,
686            );
687        }
688
689        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
690            if advance(&mut self.desired_frontiers.ok, frontier) {
691                self.trace("advanced `desired` ok frontier");
692            }
693        }
694
695        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
696            if advance(&mut self.desired_frontiers.err, frontier) {
697                self.trace("advanced `desired` err frontier");
698            }
699        }
700
701        fn advance_persist_frontier(&mut self, frontier: Antichain<Timestamp>) {
702            if advance(&mut self.persist_frontier, frontier) {
703                self.trace("advanced `persist` frontier");
704            }
705        }
706
707        fn allow_writes(&mut self) {
708            if self.read_only {
709                self.read_only = false;
710                self.trace("disabled read-only mode");
711            }
712        }
713
714        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
715            let desired_frontier = self.desired_frontiers.frontier();
716            let persist_frontier = &self.persist_frontier;
717
718            // We only mint new batch descriptions when:
719            //  1. We are _not_ in read-only mode.
720            //  2. The `desired` frontier is ahead of the `persist` frontier.
721            //  3. The `persist` frontier advanced since we last emitted a batch description.
722            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
723            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
724                PartialOrder::less_than(lower, persist_frontier)
725            });
726
727            if self.read_only || !desired_ahead || !persist_advanced {
728                return None;
729            }
730
731            let lower = persist_frontier.clone();
732            let upper = desired_frontier.clone();
733            let append_worker = self.next_append_worker;
734            let desc = BatchDescription::new(lower, upper, append_worker);
735
736            if ENABLE_MV_APPEND_SMEARING.get(&self.worker_config) {
737                self.next_append_worker = (self.next_append_worker + 1) % self.worker_count;
738            }
739
740            self.last_lower = Some(desc.lower.clone());
741
742            self.trace(format!("minted batch description: {desc:?}"));
743            Some(desc)
744        }
745    }
746}
747
748/// Implementation of the `write` operator.
749mod write {
750    use super::*;
751
752    /// Render the `write` operator.
753    ///
754    /// The parameters passed in are:
755    ///  * `sink_id`: The `GlobalId` of the sink export.
756    ///  * `persist_api`: An object providing access to the output persist shard.
757    ///  * `as_of`: The first time for which the sink may produce output.
758    ///  * `desired`: The ok/err streams that should be sinked to persist.
759    ///  * `persist`: The ok/err streams read back from the output persist shard.
760    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
761    pub fn render<S>(
762        sink_id: GlobalId,
763        persist_api: PersistApi,
764        as_of: Antichain<Timestamp>,
765        desired: &DesiredStreams<S>,
766        persist: &PersistStreams<S>,
767        descs: &Stream<S, BatchDescription>,
768        worker_config: Rc<ConfigSet>,
769    ) -> (BatchesStream<S>, PressOnDropButton)
770    where
771        S: Scope<Timestamp = Timestamp>,
772    {
773        let scope = desired.ok.scope();
774        let worker_id = scope.index();
775
776        let name = operator_name(sink_id, "write");
777        let mut op = OperatorBuilder::new(name, scope);
778
779        let (batches_output, batches_output_stream) = op.new_output();
780
781        // It is important that we exchange the `desired` and `persist` data the same way, so
782        // updates that cancel each other out end up on the same worker.
783        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
784        let exchange_err = |(d, _, _): &(DataflowError, Timestamp, Diff)| d.hashed();
785
786        let mut desired_inputs = OkErr::new(
787            op.new_disconnected_input(&desired.ok, Exchange::new(exchange_ok)),
788            op.new_disconnected_input(&desired.err, Exchange::new(exchange_err)),
789        );
790        let mut persist_inputs = OkErr::new(
791            op.new_disconnected_input(&persist.ok, Exchange::new(exchange_ok)),
792            op.new_disconnected_input(&persist.err, Exchange::new(exchange_err)),
793        );
794        let mut descs_input = op.new_input_for(&descs.broadcast(), Pipeline, &batches_output);
795
796        let button = op.build(move |capabilities| async move {
797            // We will use the data capabilities from the `descs` input to produce output, so no
798            // need to hold onto the initial capabilities.
799            drop(capabilities);
800
801            let writer = persist_api.open_writer().await;
802            let sink_metrics = persist_api.open_metrics().await;
803            let mut state = State::new(
804                sink_id,
805                worker_id,
806                writer,
807                sink_metrics,
808                as_of,
809                &worker_config,
810            );
811
812            loop {
813                // Read from the inputs, extract `desired` updates as positive contributions to
814                // `correction` and `persist` updates as negative contributions. If either the
815                // `desired` or `persist` frontier advances, or if we receive a new batch description,
816                // we might have to write a new batch.
817                let maybe_batch = tokio::select! {
818                    Some(event) = desired_inputs.ok.next() => {
819                        match event {
820                            Event::Data(_cap, mut data) => {
821                                state.corrections.ok.insert(&mut data);
822                                None
823                            }
824                            Event::Progress(frontier) => {
825                                state.advance_desired_ok_frontier(frontier);
826                                state.maybe_write_batch().await
827                            }
828                        }
829                    }
830                    Some(event) = desired_inputs.err.next() => {
831                        match event {
832                            Event::Data(_cap, mut data) => {
833                                state.corrections.err.insert(&mut data);
834                                None
835                            }
836                            Event::Progress(frontier) => {
837                                state.advance_desired_err_frontier(frontier);
838                                state.maybe_write_batch().await
839                            }
840                        }
841                    }
842                    Some(event) = persist_inputs.ok.next() => {
843                        match event {
844                            Event::Data(_cap, mut data) => {
845                                state.corrections.ok.insert_negated(&mut data);
846                                None
847                            }
848                            Event::Progress(frontier) => {
849                                state.advance_persist_ok_frontier(frontier);
850                                state.maybe_write_batch().await
851                            }
852                        }
853                    }
854                    Some(event) = persist_inputs.err.next() => {
855                        match event {
856                            Event::Data(_cap, mut data) => {
857                                state.corrections.err.insert_negated(&mut data);
858                                None
859                            }
860                            Event::Progress(frontier) => {
861                                state.advance_persist_err_frontier(frontier);
862                                state.maybe_write_batch().await
863                            }
864                        }
865                    }
866                    Some(event) = descs_input.next() => {
867                        match event {
868                            Event::Data(cap, data) => {
869                                for desc in data {
870                                    state.absorb_batch_description(desc, cap.clone());
871                                }
872                                state.maybe_write_batch().await
873                            }
874                            Event::Progress(_frontier) => None,
875                        }
876                    }
877                    // All inputs are exhausted, so we can shut down.
878                    else => return,
879                };
880
881                if let Some((index, batch, cap)) = maybe_batch {
882                    batches_output.give(&cap, (index, batch));
883                }
884            }
885        });
886
887        (batches_output_stream, button.press_on_drop())
888    }
889
890    /// State maintained by the `write` operator.
891    struct State {
892        sink_id: GlobalId,
893        worker_id: usize,
894        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
895        /// Contains `desired - persist`, reflecting the updates we would like to commit to
896        /// `persist` in order to "correct" it to track `desired`. This collection is only modified
897        /// by updates received from either the `desired` or `persist` inputs.
898        corrections: OkErr<Correction<Row>, Correction<DataflowError>>,
899        /// The frontiers of the `desired` inputs.
900        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
901        /// The frontiers of the `persist` inputs.
902        ///
903        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
904        /// usually is, but during snapshot processing, these frontiers will start at the shard's
905        /// read frontier, so they can be beyond its write frontier. This is important as it means
906        /// we must not discard batch descriptions based on these persist frontiers: A batch
907        /// description might still be valid even if its `lower` is before the persist frontiers we
908        /// observe.
909        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
910        /// The current valid batch description and associated output capability, if any.
911        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
912        /// A request to force a consolidation of `corrections` once both `desired_frontiers` and
913        /// `persist_frontiers` become greater than the given frontier.
914        ///
915        /// Normally we force a consolidation whenever we write a batch, but there are periods
916        /// (like read-only mode) when that doesn't happen, and we need to manually force
917        /// consolidation instead. Currently this is only used to ensure we quickly get rid of the
918        /// snapshot updates.
919        force_consolidation_after: Option<Antichain<Timestamp>>,
920    }
921
922    impl State {
923        fn new(
924            sink_id: GlobalId,
925            worker_id: usize,
926            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
927            metrics: SinkMetrics,
928            as_of: Antichain<Timestamp>,
929            worker_config: &ConfigSet,
930        ) -> Self {
931            let worker_metrics = metrics.for_worker(worker_id);
932
933            // Force a consolidation of `corrections` after the snapshot updates have been fully
934            // processed, to ensure we get rid of those as quickly as possible.
935            let force_consolidation_after = Some(as_of);
936
937            Self {
938                sink_id,
939                worker_id,
940                persist_writer,
941                corrections: OkErr::new(
942                    Correction::new(metrics.clone(), worker_metrics.clone(), worker_config),
943                    Correction::new(metrics, worker_metrics, worker_config),
944                ),
945                desired_frontiers: OkErr::new_frontiers(),
946                persist_frontiers: OkErr::new_frontiers(),
947                batch_description: None,
948                force_consolidation_after,
949            }
950        }
951
952        fn trace<S: AsRef<str>>(&self, message: S) {
953            let message = message.as_ref();
954            trace!(
955                sink_id = %self.sink_id,
956                worker = %self.worker_id,
957                desired_frontier = ?self.desired_frontiers.frontier().elements(),
958                persist_frontier = ?self.persist_frontiers.frontier().elements(),
959                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
960                message,
961            );
962        }
963
964        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
965            if advance(&mut self.desired_frontiers.ok, frontier) {
966                self.apply_desired_frontier_advancement();
967                self.trace("advanced `desired` ok frontier");
968            }
969        }
970
971        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
972            if advance(&mut self.desired_frontiers.err, frontier) {
973                self.apply_desired_frontier_advancement();
974                self.trace("advanced `desired` err frontier");
975            }
976        }
977
978        fn advance_persist_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
979            if advance(&mut self.persist_frontiers.ok, frontier) {
980                self.apply_persist_frontier_advancement();
981                self.trace("advanced `persist` ok frontier");
982            }
983        }
984
985        fn advance_persist_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
986            if advance(&mut self.persist_frontiers.err, frontier) {
987                self.apply_persist_frontier_advancement();
988                self.trace("advanced `persist` err frontier");
989            }
990        }
991
992        /// Apply the effects of a previous `desired` frontier advancement.
993        fn apply_desired_frontier_advancement(&mut self) {
994            self.maybe_force_consolidation();
995        }
996
997        /// Apply the effects of a previous `persist` frontier advancement.
998        fn apply_persist_frontier_advancement(&mut self) {
999            let frontier = self.persist_frontiers.frontier();
1000
1001            // We will only emit times at or after the `persist` frontier, so now is a good time to
1002            // advance the times of stashed updates.
1003            self.corrections.ok.advance_since(frontier.clone());
1004            self.corrections.err.advance_since(frontier.clone());
1005
1006            self.maybe_force_consolidation();
1007        }
1008
1009        /// If the current consolidation request has become applicable, apply it.
1010        fn maybe_force_consolidation(&mut self) {
1011            let Some(request) = &self.force_consolidation_after else {
1012                return;
1013            };
1014
1015            let desired_frontier = self.desired_frontiers.frontier();
1016            let persist_frontier = self.persist_frontiers.frontier();
1017            if PartialOrder::less_than(request, desired_frontier)
1018                && PartialOrder::less_than(request, persist_frontier)
1019            {
1020                self.trace("forcing correction consolidation");
1021                self.corrections.ok.consolidate_at_since();
1022                self.corrections.err.consolidate_at_since();
1023
1024                // Remove the consolidation request, now that we have fulfilled it.
1025                self.force_consolidation_after = None;
1026            }
1027        }
1028
1029        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
1030            // The incoming batch description is outdated if we already have a batch description
1031            // with a greater `lower`.
1032            //
1033            // Note that we cannot assume a description is outdated based on the comparison of its
1034            // `lower` with the `persist_frontier`. The persist frontier observed by the `write`
1035            // operator is initialized with the shard's read frontier, so it can be greater than
1036            // the shard's write frontier.
1037            if let Some((prev, _)) = &self.batch_description {
1038                if PartialOrder::less_than(&desc.lower, &prev.lower) {
1039                    self.trace(format!("skipping outdated batch description: {desc:?}"));
1040                    return;
1041                }
1042            }
1043
1044            self.batch_description = Some((desc, cap));
1045            self.trace("set batch description");
1046        }
1047
1048        async fn maybe_write_batch(
1049            &mut self,
1050        ) -> Option<(BatchDescription, ProtoBatch, Capability<Timestamp>)> {
1051            let (desc, _cap) = self.batch_description.as_ref()?;
1052
1053            // We can write a new batch if we have seen all `persist` updates before `lower` and
1054            // all `desired` updates up to `upper`.
1055            let persist_complete =
1056                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
1057            let desired_complete =
1058                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
1059            if !persist_complete || !desired_complete {
1060                return None;
1061            }
1062
1063            let (desc, cap) = self.batch_description.take()?;
1064
1065            let ok_updates = self.corrections.ok.updates_before(&desc.upper);
1066            let err_updates = self.corrections.err.updates_before(&desc.upper);
1067
1068            let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
1069            let errs = err_updates.map(|(d, t, r)| ((SourceData(Err(d)), ()), t, r.into_inner()));
1070            let mut updates = oks.chain(errs).peekable();
1071
1072            // Don't write empty batches.
1073            if updates.peek().is_none() {
1074                drop(updates);
1075                self.trace("skipping empty batch");
1076                return None;
1077            }
1078
1079            let batch = self
1080                .persist_writer
1081                .batch(updates, desc.lower.clone(), desc.upper.clone())
1082                .await
1083                .expect("valid usage")
1084                .into_transmittable_batch();
1085
1086            self.trace("wrote a batch");
1087            Some((desc, batch, cap))
1088        }
1089    }
1090}
1091
1092/// Implementation of the `append` operator.
1093mod append {
1094    use super::*;
1095
1096    /// Render the `append` operator.
1097    ///
1098    /// The parameters passed in are:
1099    ///  * `sink_id`: The `GlobalId` of the sink export.
1100    ///  * `persist_api`: An object providing access to the output persist shard.
1101    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
1102    ///  * `batches`: The stream of written batches produced by the `write` operator.
1103    pub fn render<S>(
1104        sink_id: GlobalId,
1105        persist_api: PersistApi,
1106        descs: &DescsStream<S>,
1107        batches: &BatchesStream<S>,
1108    ) -> PressOnDropButton
1109    where
1110        S: Scope<Timestamp = Timestamp>,
1111    {
1112        let scope = descs.scope();
1113        let worker_id = scope.index();
1114
1115        let name = operator_name(sink_id, "append");
1116        let mut op = OperatorBuilder::new(name, scope);
1117
1118        // Broadcast batch descriptions to all workers, regardless of whether or not they are
1119        // responsible for the append, to give them a chance to clean up any outdated state they
1120        // might still hold.
1121        let mut descs_input = op.new_disconnected_input(&descs.broadcast(), Pipeline);
1122        let mut batches_input = op.new_disconnected_input(
1123            batches,
1124            Exchange::new(move |(desc, _): &(BatchDescription, _)| {
1125                u64::cast_from(desc.append_worker)
1126            }),
1127        );
1128
1129        let button = op.build(move |_capabilities| async move {
1130            let writer = persist_api.open_writer().await;
1131            let mut state = State::new(sink_id, worker_id, writer);
1132
1133            loop {
1134                // Read from the inputs, absorb batch descriptions and batches. If the `batches`
1135                // frontier advances, or if we receive a new batch description, we might have to
1136                // append a new batch.
1137                tokio::select! {
1138                    Some(event) = descs_input.next() => {
1139                        if let Event::Data(_cap, data) = event {
1140                            for desc in data {
1141                                state.absorb_batch_description(desc).await;
1142                                state.maybe_append_batches().await;
1143                            }
1144                        }
1145                    }
1146                    Some(event) = batches_input.next() => {
1147                        match event {
1148                            Event::Data(_cap, data) => {
1149                                // The batch description is only used for routing and we ignore it
1150                                // here since we already get one from `descs_input`.
1151                                for (_desc, batch) in data {
1152                                    state.absorb_batch(batch).await;
1153                                }
1154                            }
1155                            Event::Progress(frontier) => {
1156                                state.advance_batches_frontier(frontier);
1157                                state.maybe_append_batches().await;
1158                            }
1159                        }
1160                    }
1161                    // All inputs are exhausted, so we can shut down.
1162                    else => return,
1163                }
1164            }
1165        });
1166
1167        button.press_on_drop()
1168    }
1169
1170    /// State maintained by the `append` operator.
1171    struct State {
1172        sink_id: GlobalId,
1173        worker_id: usize,
1174        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1175        /// The current input frontier of `batches`.
1176        batches_frontier: Antichain<Timestamp>,
1177        /// The greatest observed `lower` from both `descs` and `batches`.
1178        lower: Antichain<Timestamp>,
1179        /// The batch description for `lower`, if any.
1180        batch_description: Option<BatchDescription>,
1181        /// Batches received for `lower`.
1182        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
1183    }
1184
1185    impl State {
1186        fn new(
1187            sink_id: GlobalId,
1188            worker_id: usize,
1189            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1190        ) -> Self {
1191            Self {
1192                sink_id,
1193                worker_id,
1194                persist_writer,
1195                batches_frontier: Antichain::from_elem(Timestamp::MIN),
1196                lower: Antichain::from_elem(Timestamp::MIN),
1197                batch_description: None,
1198                batches: Default::default(),
1199            }
1200        }
1201
1202        fn trace<S: AsRef<str>>(&self, message: S) {
1203            let message = message.as_ref();
1204            trace!(
1205                sink_id = %self.sink_id,
1206                worker = %self.worker_id,
1207                batches_frontier = ?self.batches_frontier.elements(),
1208                lower = ?self.lower.elements(),
1209                batch_description = ?self.batch_description,
1210                message,
1211            );
1212        }
1213
1214        fn advance_batches_frontier(&mut self, frontier: Antichain<Timestamp>) {
1215            if advance(&mut self.batches_frontier, frontier) {
1216                self.trace("advanced `batches` frontier");
1217            }
1218        }
1219
1220        /// Advance the current `lower`.
1221        ///
1222        /// Discards all currently stashed batches and batch descriptions, assuming that they are
1223        /// now invalid.
1224        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
1225            assert!(PartialOrder::less_than(&self.lower, &frontier));
1226
1227            self.lower = frontier;
1228            self.batch_description = None;
1229
1230            // Remove stashed batches, cleaning up those we didn't append.
1231            for batch in self.batches.drain(..) {
1232                batch.delete().await;
1233            }
1234
1235            self.trace("advanced `lower`");
1236        }
1237
1238        /// Absorb the given batch description into the state, provided it is not outdated.
1239        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
1240            if PartialOrder::less_than(&self.lower, &desc.lower) {
1241                self.advance_lower(desc.lower.clone()).await;
1242            } else if &self.lower != &desc.lower {
1243                self.trace(format!("skipping outdated batch description: {desc:?}"));
1244                return;
1245            }
1246
1247            if desc.append_worker == self.worker_id {
1248                self.batch_description = Some(desc);
1249                self.trace("set batch description");
1250            }
1251        }
1252
1253        /// Absorb the given batch into the state, provided it is not outdated.
1254        async fn absorb_batch(&mut self, batch: ProtoBatch) {
1255            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
1256            if PartialOrder::less_than(&self.lower, batch.lower()) {
1257                self.advance_lower(batch.lower().clone()).await;
1258            } else if &self.lower != batch.lower() {
1259                self.trace(format!(
1260                    "skipping outdated batch: ({:?}, {:?})",
1261                    batch.lower().elements(),
1262                    batch.upper().elements(),
1263                ));
1264
1265                // Ensure the batch's data gets properly cleaned up before dropping it.
1266                batch.delete().await;
1267                return;
1268            }
1269
1270            self.batches.push(batch);
1271            self.trace("absorbed a batch");
1272        }
1273
1274        async fn maybe_append_batches(&mut self) {
1275            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
1276            if !batches_complete {
1277                return;
1278            }
1279
1280            let Some(desc) = self.batch_description.take() else {
1281                return;
1282            };
1283
1284            let new_lower = match self.append_batches(desc).await {
1285                Ok(shard_upper) => {
1286                    self.trace("appended a batch");
1287                    shard_upper
1288                }
1289                Err(shard_upper) => {
1290                    // Failing the append is expected in the presence of concurrent replicas. There
1291                    // is nothing special to do here: The self-correcting feedback mechanism
1292                    // ensures that we observe the concurrent changes, compute their consequences,
1293                    // and append them at a future time.
1294                    self.trace(format!(
1295                        "append failed due to `lower` mismatch: {:?}",
1296                        shard_upper.elements(),
1297                    ));
1298                    shard_upper
1299                }
1300            };
1301
1302            self.advance_lower(new_lower).await;
1303        }
1304
1305        /// Append the current `batches` to the output shard.
1306        ///
1307        /// Returns whether the append was successful or not, and the current shard upper in either
1308        /// case.
1309        ///
1310        /// This method advances the shard upper to the batch `lower` if necessary. This is the
1311        /// mechanism that brings the shard upper to the sink as-of when appending the initial
1312        /// batch.
1313        ///
1314        /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
1315        /// a single append at operator startup. The reason we are doing it here instead is that it
1316        /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
1317        /// persist writes, including the initial upper bump. Having only a single place that
1318        /// performs writes makes it easy to ensure we are doing that correctly.
1319        async fn append_batches(
1320            &mut self,
1321            desc: BatchDescription,
1322        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
1323            let (lower, upper) = (desc.lower, desc.upper);
1324            let mut to_append: Vec<_> = self.batches.iter_mut().collect();
1325
1326            loop {
1327                let result = self
1328                    .persist_writer
1329                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone())
1330                    .await
1331                    .expect("valid usage");
1332
1333                match result {
1334                    Ok(()) => return Ok(upper),
1335                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
1336                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
1337                    }
1338                    Err(mismatch) => return Err(mismatch.current),
1339                }
1340            }
1341        }
1342    }
1343
1344    /// Advance the frontier of the given writer's shard to at least the given `upper`.
1345    async fn advance_shard_upper(
1346        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1347        upper: Antichain<Timestamp>,
1348    ) {
1349        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1350        let lower = Antichain::from_elem(Timestamp::MIN);
1351        persist_writer
1352            .append(empty_updates, lower, upper)
1353            .await
1354            .expect("valid usage")
1355            .expect("should always succeed");
1356    }
1357}