Skip to main content

mz_compute/sink/
materialized_view.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A dataflow sink that writes input records to a persist shard.
11//!
12//! This implementation is both parallel and self-correcting.
13//!
14//!  * parallel: Multiple workers can participate in writing updates for the same times, letting
15//!    sink throughput scale with the number of workers allocated to the replica.
16//!  * self-correcting: The sink continually compares the contents of the persist shard with the
17//!    contents of the input collection and writes down the difference. If the persist shard ends
18//!    up with undesired contents for any reason, this is corrected the next time the sink manages
19//!    to append to the shard.
20//!
21//! ### Operators
22//!
23//! The persist sink consists of a graph of operators.
24//!
25//!    desired                    persist <---------------.
26//!       |                          |                    |
27//!       |                          |                    |
28//!       |---------------------.    |                    |
29//!       |                     |    |                    |
30//!       |                     |    |                    |
31//!       v                     v    v                    |
32//!   +--------+              +--------+              +--------+
33//!   |  mint  | --descs-.--> | write  | --batches--> | append |
34//!   +--------+          \   +--------+          .-> +--------+
35//!                        \_____________________/
36//!
37//!  * `mint` mints batch descriptions, i.e., `(lower, upper)` bounds of batches that should be
38//!    written. The persist API requires that all workers write batches with the same bounds, so
39//!    they can be appended as a single logical batch. To ensure this, the `mint` operator only
40//!    runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are
41//!    picked based on the frontiers of the `desired` stream and the output persist shard.
42//!  * `write` stages batch data in persist, based on the batch descriptions received from the
43//!    `mint` operator, but without appending it to the persist shard. This is a multi-worker
44//!    operator, with each worker writing batches of the data that arrives at its local inputs. To
45//!    do so it reads from the `desired` and `persist` streams and produces the difference between
46//!    them to write back out, ensuring that the final contents of the persist shard match
47//!    `desired`.
48//!  * `append` appends the batches minted by `mint` and written by `write` to the persist shard.
49//!    This is a multi-worker operator, where workers are responsible for different subsets of
50//!    batch descriptions. If a worker is responsible for a given batch description, it waits for
51//!    all workers to stage their batches for that batch description, then appends all the batches
52//!    together as a single logical batch.
53//!
54//! Note that while the above graph suggests that `mint` and `write` both receive copies of the
55//! `desired` stream, the actual implementation passes that stream through `mint` and lets `write`
56//! read the passed-through stream, to avoid cloning data.
57//!
58//! Also note that the `append` operator's implementation would perhaps be more natural as a
59//! single-worker implementation. The purpose of sharing the work between all workers is to avoid a
60//! work imbalance where one worker is overloaded (doing both appends and the consequent persist
61//! maintenance work) while others are comparatively idle.
62//!
63//! The persist sink is written to be robust to the presence of other conflicting instances (e.g.
64//! from other replicas) writing to the same persist shard. Each of the three operators needs to be
65//! able to handle conflicting writes that unexpectedly change the contents of the output persist
66//! shard.
67//!
68//! ### Frontiers
69//!
70//! The `desired` frontier tracks the progress of the upstream dataflow, but may be rounded up to
71//! the next refresh time for dataflows that follow a refresh schedule other than "on commit".
72//!
73//! The `persist` frontier tracks the `upper` frontier of the target persist shard, with one
74//! exception: When the `persist_source` that reads back the shard is rendered, it will start
75//! reading at its `since` frontier. So if the shard's `since` is initially greater than its
76//! `upper`, the `persist` frontier too will be in advance of the shard `upper`, until the `upper`
77//! has caught up. To avoid getting confused by this edge case, the `mint` operator does not use
78//! the `persist` stream to observe the shard frontier but keeps its own `WriteHandle` instead.
79//!
80//! The `descs` frontier communicates which `lower` bounds may still be emitted in batch
81//! descriptions. All future batch descriptions will have a `lower` that is greater or equal to the
82//! current `descs` frontier.
83//!
84//! The `batches` frontier communicates for which `lower` bounds batches may still be written. All
85//! batches for descriptions with `lower`s less than the current `batches` frontier have already
86//! been written.
87//!
88//! ### Invariants
89//!
90//! The implementation upholds several invariants that can be relied upon to simplify the
91//! implementation:
92//!
93//!  1. `lower`s in minted batch descriptions are unique and strictly increasing. That is, the
94//!     `mint` operator will never mint the same `lower` twice and a minted `lower` is always
95//!     greater than any previously minted ones.
96//!  2. `upper`s in minted batch descriptions are monotonically increasing.
97//!  3. From (1) follows that there is always at most one "valid" batch description in flight in
98//!     the operator graph. "Valid" here means that the described batch can be appended to the
99//!     persist shard.
100//!
101//! The main simplification these invariants allow is that operators only need to keep track of the
102//! most recent batch description and/or `lower`. Previous batch descriptions are not valid
103//! anymore, so there is no reason to hold any state or perform any work in support of them.
104//!
105//! ### Read-only Mode
106//!
107//! The persist sink can optionally be initialized in read-only mode. In this mode it is passive
108//! and avoids any writes to persist. Activating the `read_only_rx` transitions the sink into write
109//! mode, where it commences normal operation.
110//!
111//! Read-only mode is implemented by the `mint` operator. To disable writes, the `mint` operator
112//! simply avoids minting any batch descriptions. Since both the `write` and the `append` operator
113//! require batch descriptions to write/append batches, this suppresses any persist communication.
114//! At the same time, the `write` operator still observes changes to the `desired` and `persist`
115//! collections, allowing it to keep its correction buffer up-to-date.
116
117use std::any::Any;
118use std::cell::RefCell;
119use std::pin::pin;
120use std::rc::Rc;
121use std::sync::Arc;
122
123use differential_dataflow::{AsCollection, Hashable, VecCollection};
124use futures::StreamExt;
125use mz_compute_types::dyncfgs::MV_SINK_ADVANCE_PERSIST_FRONTIERS;
126use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
127use mz_dyncfg::ConfigSet;
128use mz_ore::cast::CastFrom;
129use mz_persist_client::batch::{Batch, ProtoBatch};
130use mz_persist_client::cache::PersistClientCache;
131use mz_persist_client::metrics::SinkMetrics;
132use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
133use mz_persist_client::write::WriteHandle;
134use mz_persist_client::{Diagnostics, PersistClient};
135use mz_persist_types::codec_impls::UnitSchema;
136use mz_repr::{Diff, GlobalId, Row, Timestamp};
137use mz_storage_types::StorageDiff;
138use mz_storage_types::controller::CollectionMetadata;
139use mz_storage_types::sources::SourceData;
140use mz_timely_util::builder_async::PressOnDropButton;
141use mz_timely_util::builder_async::{Event, OperatorBuilder};
142use mz_timely_util::probe::{Handle, ProbeNotify};
143use serde::{Deserialize, Serialize};
144use timely::PartialOrder;
145use timely::container::CapacityContainerBuilder;
146use timely::dataflow::channels::pact::{Exchange, Pipeline};
147use timely::dataflow::operators::vec::Broadcast;
148use timely::dataflow::operators::{Capability, CapabilitySet, probe};
149use timely::dataflow::{Scope, StreamVec};
150use timely::progress::Antichain;
151use tokio::sync::watch;
152use tracing::trace;
153
154use crate::compute_state::ComputeState;
155use crate::render::StartSignal;
156use crate::render::errors::DataflowErrorSer;
157use crate::render::sinks::SinkRender;
158use crate::sink::correction::{ChannelLogging, Correction, CorrectionLogger};
159use crate::sink::materialized_view_v2;
160use crate::sink::refresh::apply_refresh;
161
162impl<'scope> SinkRender<'scope> for MaterializedViewSinkConnection<CollectionMetadata> {
163    fn render_sink(
164        &self,
165        compute_state: &mut ComputeState,
166        sink: &ComputeSinkDesc<CollectionMetadata>,
167        sink_id: GlobalId,
168        as_of: Antichain<Timestamp>,
169        start_signal: StartSignal,
170        mut ok_collection: VecCollection<'scope, Timestamp, Row, Diff>,
171        mut err_collection: VecCollection<'scope, Timestamp, DataflowErrorSer, Diff>,
172        output_probe: &Handle<Timestamp>,
173    ) -> Option<Rc<dyn Any>> {
174        // Attach probes reporting the compute frontier.
175        // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
176        // track the progress of the computation, so we need to attach probes before it.
177        let probe = probe::Handle::default();
178        ok_collection = ok_collection
179            .probe_with(&probe)
180            .inner
181            .probe_notify_with(vec![output_probe.clone()])
182            .as_collection();
183        let collection_state = compute_state.expect_collection_mut(sink_id);
184        collection_state.compute_probe = Some(probe);
185
186        // If a `RefreshSchedule` was specified, round up timestamps.
187        if let Some(refresh_schedule) = &sink.refresh_schedule {
188            ok_collection = apply_refresh(ok_collection, refresh_schedule.clone());
189            err_collection = apply_refresh(err_collection, refresh_schedule.clone());
190        }
191
192        if sink.up_to != Antichain::default() {
193            unimplemented!(
194                "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning"
195            )
196        }
197
198        let read_only_rx = collection_state.read_only_rx.clone();
199
200        let token = persist_sink(
201            sink_id,
202            &self.storage_metadata,
203            ok_collection,
204            err_collection,
205            as_of,
206            compute_state,
207            start_signal,
208            read_only_rx,
209        );
210        Some(token)
211    }
212}
213
214/// Type of the `desired` stream, split into `Ok` and `Err` streams.
215pub(super) type DesiredStreams<'s> = OkErr<
216    StreamVec<'s, Timestamp, (Row, Timestamp, Diff)>,
217    StreamVec<'s, Timestamp, (DataflowErrorSer, Timestamp, Diff)>,
218>;
219
220/// Type of the `persist` stream, split into `Ok` and `Err` streams.
221pub(super) type PersistStreams<'s> = OkErr<
222    StreamVec<'s, Timestamp, (Row, Timestamp, Diff)>,
223    StreamVec<'s, Timestamp, (DataflowErrorSer, Timestamp, Diff)>,
224>;
225
226/// Type of the `descs` stream.
227pub(super) type DescsStream<'s> = StreamVec<'s, Timestamp, BatchDescription>;
228
229/// Type of the `batches` stream.
230pub(super) type BatchesStream<'s> = StreamVec<'s, Timestamp, (BatchDescription, ProtoBatch)>;
231
232/// Type of the shared sink write frontier.
233pub(super) type SharedSinkFrontier = Rc<RefCell<Antichain<Timestamp>>>;
234
235/// Renders an MV sink writing the given desired collection into the `target` persist collection.
236pub(super) fn persist_sink<'s>(
237    sink_id: GlobalId,
238    target: &CollectionMetadata,
239    ok_collection: VecCollection<'s, Timestamp, Row, Diff>,
240    err_collection: VecCollection<'s, Timestamp, DataflowErrorSer, Diff>,
241    as_of: Antichain<Timestamp>,
242    compute_state: &mut ComputeState,
243    start_signal: StartSignal,
244    read_only_rx: watch::Receiver<bool>,
245) -> Rc<dyn Any>
246where
247{
248    if mz_compute_types::dyncfgs::ENABLE_SYNC_MV_SINK.get(&compute_state.worker_config) {
249        return materialized_view_v2::persist_sink(
250            sink_id,
251            target,
252            ok_collection,
253            err_collection,
254            as_of,
255            compute_state,
256            start_signal,
257            read_only_rx,
258        );
259    }
260
261    let scope = ok_collection.scope();
262    let desired = OkErr::new(ok_collection.inner, err_collection.inner);
263
264    // Read back the persist shard.
265    let (persist, persist_token) =
266        persist_source(scope, sink_id, target.clone(), compute_state, start_signal);
267
268    let persist_api = PersistApi {
269        persist_clients: Arc::clone(&compute_state.persist_clients),
270        collection: target.clone(),
271        shard_name: sink_id.to_string(),
272        purpose: format!("MV sink {sink_id}"),
273    };
274
275    let (desired, descs, sink_frontier, mint_token) = mint::render(
276        sink_id,
277        persist_api.clone(),
278        as_of.clone(),
279        read_only_rx.clone(),
280        desired,
281    );
282
283    let (batches, write_token) = write::render(
284        sink_id,
285        persist_api.clone(),
286        as_of,
287        desired,
288        persist,
289        descs.clone(),
290        read_only_rx,
291        Rc::clone(&compute_state.worker_config),
292    );
293
294    let append_token = append::render(sink_id, persist_api, descs, batches);
295
296    // Report sink frontier updates to the `ComputeState`.
297    let collection = compute_state.expect_collection_mut(sink_id);
298    collection.sink_write_frontier = Some(sink_frontier);
299
300    Rc::new((persist_token, mint_token, write_token, append_token))
301}
302
303/// Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with
304/// such pairs.
305pub(super) struct OkErr<O, E> {
306    pub(super) ok: O,
307    pub(super) err: E,
308}
309
310impl<O, E> OkErr<O, E> {
311    pub(super) fn new(ok: O, err: E) -> Self {
312        Self { ok, err }
313    }
314}
315
316impl OkErr<Antichain<Timestamp>, Antichain<Timestamp>> {
317    pub(super) fn new_frontiers() -> Self {
318        Self {
319            ok: Antichain::from_elem(Timestamp::MIN),
320            err: Antichain::from_elem(Timestamp::MIN),
321        }
322    }
323
324    /// Return the overall frontier, i.e., the minimum of `ok` and `err`.
325    pub(super) fn frontier(&self) -> &Antichain<Timestamp> {
326        if PartialOrder::less_equal(&self.ok, &self.err) {
327            &self.ok
328        } else {
329            &self.err
330        }
331    }
332}
333
334/// Advance the given `frontier` to `new`, if the latter one is greater.
335///
336/// Returns whether `frontier` was advanced.
337pub(super) fn advance(
338    frontier: &mut Antichain<Timestamp>,
339    new: timely::progress::frontier::AntichainRef<'_, Timestamp>,
340) -> bool {
341    if PartialOrder::less_than(&frontier.borrow(), &new) {
342        frontier.clear();
343        frontier.extend(new.iter().cloned());
344        true
345    } else {
346        false
347    }
348}
349
350/// A persist API specialized to a single collection.
351#[derive(Clone)]
352pub(super) struct PersistApi {
353    pub(super) persist_clients: Arc<PersistClientCache>,
354    pub(super) collection: CollectionMetadata,
355    pub(super) shard_name: String,
356    pub(super) purpose: String,
357}
358
359impl PersistApi {
360    pub(super) async fn open_client(&self) -> PersistClient {
361        self.persist_clients
362            .open(self.collection.persist_location.clone())
363            .await
364            .unwrap_or_else(|error| panic!("error opening persist client: {error}"))
365    }
366
367    pub(super) async fn open_writer(&self) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
368        self.open_client()
369            .await
370            .open_writer(
371                self.collection.data_shard,
372                Arc::new(self.collection.relation_desc.clone()),
373                Arc::new(UnitSchema),
374                Diagnostics {
375                    shard_name: self.shard_name.clone(),
376                    handle_purpose: self.purpose.clone(),
377                },
378            )
379            .await
380            .unwrap_or_else(|error| panic!("error opening persist writer: {error}"))
381    }
382
383    async fn open_metrics(&self) -> SinkMetrics {
384        let client = self.open_client().await;
385        client.metrics().sink.clone()
386    }
387}
388
389/// Instantiate a persist source reading back the `target` collection.
390pub(super) fn persist_source<'s>(
391    scope: Scope<'s, Timestamp>,
392    sink_id: GlobalId,
393    target: CollectionMetadata,
394    compute_state: &ComputeState,
395    start_signal: StartSignal,
396) -> (PersistStreams<'s>, Vec<PressOnDropButton>) {
397    // There is no guarantee that the sink as-of is beyond the persist shard's since. If it isn't,
398    // instantiating a `persist_source` with it would panic. So instead we leave it to
399    // `persist_source` to select an appropriate as-of. We only care about times beyond the current
400    // shard upper anyway.
401    //
402    // TODO(teskje): Ideally we would select the as-of as `join(sink_as_of, since, upper)`, to
403    // allow `persist_source` to omit as much historical detail as possible. However, we don't know
404    // the shard frontiers and we cannot get them here as that requires an `async` context. We
405    // should consider extending the `persist_source` API to allow as-of selection based on the
406    // shard's current frontiers.
407    let as_of = None;
408
409    let until = Antichain::new();
410    let map_filter_project = None;
411
412    let (ok_stream, err_stream, token) =
413        mz_storage_operators::persist_source::persist_source::<DataflowErrorSer>(
414            scope,
415            sink_id,
416            Arc::clone(&compute_state.persist_clients),
417            &compute_state.txns_ctx,
418            target,
419            None,
420            as_of,
421            SnapshotMode::Include,
422            until,
423            map_filter_project,
424            compute_state.dataflow_max_inflight_bytes(),
425            start_signal.into_send_future(),
426            ErrorHandler::Halt("compute persist sink"),
427        );
428
429    let streams = OkErr::new(ok_stream, err_stream);
430    (streams, token)
431}
432
433/// A description for a batch of updates to be written.
434///
435/// Batch descriptions are produced by the `mint` operator and consumed by the `write` and `append`
436/// operators, where they inform which batches should be written or appended, respectively.
437///
438/// Each batch description also contains the index of its "append worker", i.e. the worker that is
439/// responsible for appending the written batches to the output shard.
440#[derive(Clone, Serialize, Deserialize)]
441pub(super) struct BatchDescription {
442    pub(super) lower: Antichain<Timestamp>,
443    pub(super) upper: Antichain<Timestamp>,
444    pub(super) append_worker: usize,
445}
446
447impl BatchDescription {
448    pub(super) fn new(
449        lower: Antichain<Timestamp>,
450        upper: Antichain<Timestamp>,
451        append_worker: usize,
452    ) -> Self {
453        assert!(PartialOrder::less_than(&lower, &upper));
454        Self {
455            lower,
456            upper,
457            append_worker,
458        }
459    }
460}
461
462impl std::fmt::Debug for BatchDescription {
463    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464        write!(
465            f,
466            "({:?}, {:?})@{}",
467            self.lower.elements(),
468            self.upper.elements(),
469            self.append_worker,
470        )
471    }
472}
473
474/// Construct a name for the given sub-operator.
475pub(super) fn operator_name(sink_id: GlobalId, sub_operator: &str) -> String {
476    format!("mv_sink({sink_id})::{sub_operator}")
477}
478
479/// Implementation of the `mint` operator.
480mod mint {
481    use super::*;
482
483    /// Render the `mint` operator.
484    ///
485    /// The parameters passed in are:
486    ///  * `sink_id`: The `GlobalId` of the sink export.
487    ///  * `persist_api`: An object providing access to the output persist shard.
488    ///  * `as_of`: The first time for which the sink may produce output.
489    ///  * `read_only_tx`: A receiver that reports the sink is in read-only mode.
490    ///  * `desired`: The ok/err streams that should be sinked to persist.
491    pub fn render<'s>(
492        sink_id: GlobalId,
493        persist_api: PersistApi,
494        as_of: Antichain<Timestamp>,
495        mut read_only_rx: watch::Receiver<bool>,
496        desired: DesiredStreams<'s>,
497    ) -> (
498        DesiredStreams<'s>,
499        DescsStream<'s>,
500        SharedSinkFrontier,
501        PressOnDropButton,
502    ) {
503        let scope = desired.ok.scope();
504        let worker_id = scope.index();
505        let worker_count = scope.peers();
506
507        // Determine the active worker for the mint operator.
508        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();
509
510        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
511        let shared_frontier = Rc::clone(&sink_frontier);
512
513        let name = operator_name(sink_id, "mint");
514        let mut op = OperatorBuilder::new(name, scope);
515
516        let (ok_output, ok_stream) = op.new_output::<CapacityContainerBuilder<_>>();
517        let (err_output, err_stream) = op.new_output::<CapacityContainerBuilder<_>>();
518        let desired_outputs = OkErr::new(ok_output, err_output);
519        let desired_output_streams = OkErr::new(ok_stream, err_stream);
520
521        let (desc_output, desc_output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
522
523        let mut desired_inputs = OkErr {
524            ok: op.new_input_for(desired.ok, Pipeline, &desired_outputs.ok),
525            err: op.new_input_for(desired.err, Pipeline, &desired_outputs.err),
526        };
527
528        let button = op.build(move |capabilities| async move {
529            // Passing through the `desired` streams only requires data capabilities, so we can
530            // immediately drop their initial capabilities here.
531            let [_, _, desc_cap]: [_; 3] =
532                capabilities.try_into().expect("one capability per output");
533
534            // Non-active workers just pass the `desired` and `persist` data through.
535            if worker_id != active_worker_id {
536                drop(desc_cap);
537                shared_frontier.borrow_mut().clear();
538
539                loop {
540                    tokio::select! {
541                        Some(event) = desired_inputs.ok.next() => {
542                            if let Event::Data(cap, mut data) = event {
543                                desired_outputs.ok.give_container(&cap, &mut data);
544                            }
545                        }
546                        Some(event) = desired_inputs.err.next() => {
547                            if let Event::Data(cap, mut data) = event {
548                                desired_outputs.err.give_container(&cap, &mut data);
549                            }
550                        }
551                        // All inputs are exhausted, so we can shut down.
552                        else => return,
553                    }
554                }
555            }
556
557            let mut cap_set = CapabilitySet::from_elem(desc_cap);
558
559            let read_only = *read_only_rx.borrow_and_update();
560            let mut state = State::new(sink_id, worker_count, as_of, read_only);
561
562            // Create a stream that reports advancements of the target shard's frontier and updates
563            // the shared sink frontier.
564            //
565            // We collect the persist frontier from a write handle directly, rather than inspecting
566            // the `persist` stream, because the latter has two annoying glitches:
567            //  (a) It starts at the shard's read frontier, not its write frontier.
568            //  (b) It can lag behind if there are spikes in ingested data.
569            //
570            // The decoupling from the `persist` stream is load-bearing: that stream can fall
571            // arbitrarily behind the shard upper during snapshot replay or write spikes. Using it
572            // would delay both (1) the controller-visible sink frontier (`shared_frontier`),
573            // which previously caused a CrossJoin feature-bench regression where the controller
574            // held a finished MV dataflow open waiting for the empty frontier, and (2) the
575            // `state.persist_frontier` that gates batch-description minting, stalling mint until
576            // the read-back stream catches up. The goal isn't tick-granular descriptions per
577            // se — it's avoiding the stream-induced stall. See 5eab5ff896 for the original
578            // regression and rationale.
579            let mut persist_frontiers = pin!(async_stream::stream! {
580                let mut writer = persist_api.open_writer().await;
581                let mut frontier = Antichain::from_elem(Timestamp::MIN);
582                while !frontier.is_empty() {
583                    writer.wait_for_upper_past(&frontier).await;
584                    frontier = writer.upper().clone();
585                    shared_frontier.borrow_mut().clone_from(&frontier);
586                    yield frontier.clone();
587                }
588            });
589
590            loop {
591                // Read from the inputs, pass through all data to the respective outputs, and keep
592                // track of the input frontiers. When a frontier advances we might have to mint a
593                // new batch description.
594                let maybe_desc = tokio::select! {
595                    Some(event) = desired_inputs.ok.next() => {
596                        match event {
597                            Event::Data(cap, mut data) => {
598                                desired_outputs.ok.give_container(&cap, &mut data);
599                                None
600                            }
601                            Event::Progress(frontier) => {
602                                state.advance_desired_ok_frontier(frontier);
603                                state.maybe_mint_batch_description()
604                            }
605                        }
606                    }
607                    Some(event) = desired_inputs.err.next() => {
608                        match event {
609                            Event::Data(cap, mut data) => {
610                                desired_outputs.err.give_container(&cap, &mut data);
611                                None
612                            }
613                            Event::Progress(frontier) => {
614                                state.advance_desired_err_frontier(frontier);
615                                state.maybe_mint_batch_description()
616                            }
617                        }
618                    }
619                    Some(frontier) = persist_frontiers.next() => {
620                        state.advance_persist_frontier(frontier);
621                        state.maybe_mint_batch_description()
622                    }
623                    Ok(()) = read_only_rx.changed(), if read_only => {
624                        state.allow_writes();
625                        state.maybe_mint_batch_description()
626                    }
627                    // All inputs are exhausted, so we can shut down.
628                    else => return,
629                };
630
631                if let Some(desc) = maybe_desc {
632                    let lower_ts = *desc.lower.as_option().expect("not empty");
633                    let cap = cap_set.delayed(&lower_ts);
634                    desc_output.give(&cap, desc);
635
636                    // We only emit strictly increasing `lower`s, so we can let our output frontier
637                    // advance beyond the current `lower`.
638                    cap_set.downgrade([lower_ts.step_forward()]);
639                } else {
640                    // The next emitted `lower` will be at least the `persist` frontier, so we can
641                    // advance our output frontier as far.
642                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
643                }
644            }
645        });
646
647        (
648            desired_output_streams,
649            desc_output_stream,
650            sink_frontier,
651            button.press_on_drop(),
652        )
653    }
654
655    /// State maintained by the `mint` operator.
656    struct State {
657        sink_id: GlobalId,
658        /// The number of workers in the Timely cluster.
659        worker_count: usize,
660        /// The frontiers of the `desired` inputs.
661        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
662        /// The frontier of the target persist shard.
663        persist_frontier: Antichain<Timestamp>,
664        /// The append worker for the next batch description, chosen in round-robin fashion.
665        next_append_worker: usize,
666        /// The last `lower` we have emitted in a batch description, if any. Whenever the
667        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
668        last_lower: Option<Antichain<Timestamp>>,
669        /// Whether we are operating in read-only mode.
670        ///
671        /// In read-only mode, minting of batch descriptions is disabled.
672        read_only: bool,
673    }
674
675    impl State {
676        fn new(
677            sink_id: GlobalId,
678            worker_count: usize,
679            as_of: Antichain<Timestamp>,
680            read_only: bool,
681        ) -> Self {
682            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
683            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
684            // work needlessly writing batches at previous times.
685            let persist_frontier = as_of;
686
687            Self {
688                sink_id,
689                worker_count,
690                desired_frontiers: OkErr::new_frontiers(),
691                persist_frontier,
692                next_append_worker: 0,
693                last_lower: None,
694                read_only,
695            }
696        }
697
698        fn trace<S: AsRef<str>>(&self, message: S) {
699            let message = message.as_ref();
700            trace!(
701                sink_id = %self.sink_id,
702                desired_frontier = ?self.desired_frontiers.frontier().elements(),
703                persist_frontier = ?self.persist_frontier.elements(),
704                last_lower = ?self.last_lower.as_ref().map(|f| f.elements()),
705                message,
706            );
707        }
708
709        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
710            if advance(&mut self.desired_frontiers.ok, frontier.borrow()) {
711                self.trace("advanced `desired` ok frontier");
712            }
713        }
714
715        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
716            if advance(&mut self.desired_frontiers.err, frontier.borrow()) {
717                self.trace("advanced `desired` err frontier");
718            }
719        }
720
721        fn advance_persist_frontier(&mut self, frontier: Antichain<Timestamp>) {
722            if advance(&mut self.persist_frontier, frontier.borrow()) {
723                self.trace("advanced `persist` frontier");
724            }
725        }
726
727        fn allow_writes(&mut self) {
728            if self.read_only {
729                self.read_only = false;
730                self.trace("disabled read-only mode");
731            }
732        }
733
734        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
735            let desired_frontier = self.desired_frontiers.frontier();
736            let persist_frontier = &self.persist_frontier;
737
738            // We only mint new batch descriptions when:
739            //  1. We are _not_ in read-only mode.
740            //  2. The `desired` frontier is ahead of the `persist` frontier.
741            //  3. The `persist` frontier advanced since we last emitted a batch description.
742            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
743            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
744                PartialOrder::less_than(lower, persist_frontier)
745            });
746
747            if self.read_only || !desired_ahead || !persist_advanced {
748                return None;
749            }
750
751            let lower = persist_frontier.clone();
752            let upper = desired_frontier.clone();
753            let append_worker = self.next_append_worker;
754            let desc = BatchDescription::new(lower, upper, append_worker);
755
756            self.next_append_worker = (append_worker + 1) % self.worker_count;
757            self.last_lower = Some(desc.lower.clone());
758
759            self.trace(format!("minted batch description: {desc:?}"));
760            Some(desc)
761        }
762    }
763}
764
765/// Implementation of the `write` operator.
766mod write {
767    use super::*;
768
769    /// Render the `write` operator.
770    ///
771    /// The parameters passed in are:
772    ///  * `sink_id`: The `GlobalId` of the sink export.
773    ///  * `persist_api`: An object providing access to the output persist shard.
774    ///  * `as_of`: The first time for which the sink may produce output.
775    ///  * `desired`: The ok/err streams that should be sinked to persist.
776    ///  * `persist`: The ok/err streams read back from the output persist shard.
777    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
778    pub fn render<'s>(
779        sink_id: GlobalId,
780        persist_api: PersistApi,
781        as_of: Antichain<Timestamp>,
782        desired: DesiredStreams<'s>,
783        persist: PersistStreams<'s>,
784        descs: DescsStream<'s>,
785        mut read_only_rx: watch::Receiver<bool>,
786        worker_config: Rc<ConfigSet>,
787    ) -> (BatchesStream<'s>, PressOnDropButton) {
788        let scope = desired.ok.scope();
789        let worker_id = scope.index();
790
791        let name = operator_name(sink_id, "write");
792        let mut op = OperatorBuilder::new(name, scope.clone());
793
794        let mut channel_logging = None;
795        let mut correction_logger = None;
796        if let (Some(compute_logger), Some(differential_logger)) = (
797            scope.worker().logger_for("materialize/compute"),
798            scope.worker().logger_for("differential/arrange"),
799        ) {
800            let operator_info = op.operator_info();
801            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
802            channel_logging = Some(ChannelLogging::new(tx));
803            correction_logger = Some(CorrectionLogger::new(
804                compute_logger,
805                differential_logger.into(),
806                operator_info.global_id,
807                operator_info.address.to_vec(),
808                rx,
809            ));
810        }
811
812        let (batches_output, batches_output_stream) =
813            op.new_output::<CapacityContainerBuilder<_>>();
814
815        // It is important that we exchange the `desired` and `persist` data the same way, so
816        // updates that cancel each other out end up on the same worker.
817        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
818        let exchange_err = |(d, _, _): &(DataflowErrorSer, Timestamp, Diff)| d.hashed();
819
820        let mut desired_inputs = OkErr::new(
821            op.new_disconnected_input(desired.ok, Exchange::new(exchange_ok)),
822            op.new_disconnected_input(desired.err, Exchange::new(exchange_err)),
823        );
824        let mut persist_inputs = OkErr::new(
825            op.new_disconnected_input(persist.ok, Exchange::new(exchange_ok)),
826            op.new_disconnected_input(persist.err, Exchange::new(exchange_err)),
827        );
828        let mut descs_input = op.new_input_for(descs.broadcast(), Pipeline, &batches_output);
829
830        let button = op.build(move |capabilities| async move {
831            // We will use the data capabilities from the `descs` input to produce output, so no
832            // need to hold onto the initial capabilities.
833            drop(capabilities);
834
835            let writer = persist_api.open_writer().await;
836            let sink_metrics = persist_api.open_metrics().await;
837            let read_only = *read_only_rx.borrow_and_update();
838            let mut state = State::new(
839                sink_id,
840                worker_id,
841                writer,
842                sink_metrics,
843                channel_logging,
844                as_of,
845                read_only,
846                &worker_config,
847            );
848            let mut correction_logger = correction_logger;
849
850            loop {
851                // Drain correction logging events from the channel.
852                if let Some(logger) = &mut correction_logger {
853                    logger.apply_events();
854                }
855
856                // Read from the inputs, extract `desired` updates as positive contributions to
857                // `correction` and `persist` updates as negative contributions. If either the
858                // `desired` or `persist` frontier advances, or if we receive a new batch description,
859                // we might have to write a new batch.
860                let maybe_batch = tokio::select! {
861                    Some(event) = desired_inputs.ok.next() => {
862                        match event {
863                            Event::Data(_cap, mut data) => {
864                                state.corrections.ok.insert(&mut data);
865                                None
866                            }
867                            Event::Progress(frontier) => {
868                                state.advance_desired_ok_frontier(frontier);
869                                state.maybe_write_batch().await
870                            }
871                        }
872                    }
873                    Some(event) = desired_inputs.err.next() => {
874                        match event {
875                            Event::Data(_cap, mut data) => {
876                                state.corrections.err.insert(&mut data);
877                                None
878                            }
879                            Event::Progress(frontier) => {
880                                state.advance_desired_err_frontier(frontier);
881                                state.maybe_write_batch().await
882                            }
883                        }
884                    }
885                    Some(event) = persist_inputs.ok.next() => {
886                        match event {
887                            Event::Data(_cap, mut data) => {
888                                state.corrections.ok.insert_negated(&mut data);
889                                None
890                            }
891                            Event::Progress(frontier) => {
892                                state.advance_persist_ok_frontier(frontier);
893                                state.maybe_write_batch().await
894                            }
895                        }
896                    }
897                    Some(event) = persist_inputs.err.next() => {
898                        match event {
899                            Event::Data(_cap, mut data) => {
900                                state.corrections.err.insert_negated(&mut data);
901                                None
902                            }
903                            Event::Progress(frontier) => {
904                                state.advance_persist_err_frontier(frontier);
905                                state.maybe_write_batch().await
906                            }
907                        }
908                    }
909                    Some(event) = descs_input.next() => {
910                        match event {
911                            Event::Data(cap, data) => {
912                                for desc in data {
913                                    state.absorb_batch_description(desc, cap.clone());
914                                }
915                                state.maybe_write_batch().await
916                            }
917                            Event::Progress(_frontier) => None,
918                        }
919                    }
920                    // Track read-only mode so the forced consolidation stops re-arming once
921                    // writes are allowed and the batch-write path takes over.
922                    Ok(()) = read_only_rx.changed(), if state.read_only => {
923                        if !*read_only_rx.borrow_and_update() {
924                            state.set_read_only(false);
925                        }
926                        None
927                    }
928                    // All inputs are exhausted, so we can shut down.
929                    else => return,
930                };
931
932                if let Some((index, batch, cap)) = maybe_batch {
933                    batches_output.give(&cap, (index, batch));
934                }
935            }
936        });
937
938        (batches_output_stream, button.press_on_drop())
939    }
940
941    /// State maintained by the `write` operator.
942    struct State {
943        sink_id: GlobalId,
944        worker_id: usize,
945        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
946        /// Contains `desired - persist`, reflecting the updates we would like to commit to
947        /// `persist` in order to "correct" it to track `desired`. This collection is only modified
948        /// by updates received from either the `desired` or `persist` inputs.
949        corrections: OkErr<Correction<Row>, Correction<DataflowErrorSer>>,
950        /// The frontiers of the `desired` inputs.
951        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
952        /// The frontiers of the `persist` inputs.
953        ///
954        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
955        /// usually is, but during snapshot processing, these frontiers will start at the shard's
956        /// read frontier, so they can be beyond its write frontier. This is important as it means
957        /// we must not discard batch descriptions based on these persist frontiers: A batch
958        /// description might still be valid even if its `lower` is before the persist frontiers we
959        /// observe.
960        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
961        /// The current valid batch description and associated output capability, if any.
962        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
963        /// A request to force a consolidation of `corrections` once both `desired_frontiers` and
964        /// `persist_frontiers` become greater than the given frontier.
965        ///
966        /// Normally we force a consolidation whenever we write a batch, but there are periods
967        /// (like read-only mode) when that doesn't happen, and we need to manually force
968        /// consolidation instead. In read-only mode this is re-armed after each firing so it
969        /// keeps sweeping forward as the frontiers advance (see `maybe_force_consolidation`).
970        force_consolidation_after: Option<Antichain<Timestamp>>,
971        /// Whether the sink is in read-only mode. While read-only the `write` operator mints no
972        /// batches, so the batch-write path never sweeps `consolidate_before(upper)` forward; the
973        /// forced consolidation stands in for it and is re-armed as long as this holds.
974        read_only: bool,
975    }
976
977    impl State {
978        fn new(
979            sink_id: GlobalId,
980            worker_id: usize,
981            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
982            metrics: SinkMetrics,
983            logging: Option<ChannelLogging>,
984            as_of: Antichain<Timestamp>,
985            read_only: bool,
986            worker_config: &ConfigSet,
987        ) -> Self {
988            let worker_metrics = metrics.for_worker(worker_id);
989
990            // Force a consolidation of `corrections` after the snapshot updates have been fully
991            // processed, to ensure we get rid of those as quickly as possible.
992            let force_consolidation_after = Some(as_of.clone());
993
994            let mut state = Self {
995                sink_id,
996                worker_id,
997                persist_writer,
998                corrections: OkErr::new(
999                    Correction::new(
1000                        metrics.clone(),
1001                        worker_metrics.clone(),
1002                        logging.clone(),
1003                        worker_config,
1004                    ),
1005                    Correction::new(metrics, worker_metrics, logging, worker_config),
1006                ),
1007                desired_frontiers: OkErr::new_frontiers(),
1008                persist_frontiers: OkErr::new_frontiers(),
1009                batch_description: None,
1010                force_consolidation_after,
1011                read_only,
1012            };
1013
1014            // Immediately advance the persist frontier tracking to the `as_of`.
1015            // This is important to ensure the persist sink doesn't get stuck if the output shard's
1016            // initial frontier is less than the `as_of`. The `mint` operator first emits a batch
1017            // description with `lower = as_of`, and the `write` operator only emits a batch when
1018            // its observed persist frontier is >= the batch description's `lower`, which (assuming
1019            // no other writers) would be never if we didn't advance the observed persist frontier
1020            // to the `as_of`.
1021            if MV_SINK_ADVANCE_PERSIST_FRONTIERS.get(worker_config) {
1022                state.advance_persist_ok_frontier(as_of.clone());
1023                state.advance_persist_err_frontier(as_of);
1024            }
1025
1026            state
1027        }
1028
1029        fn trace<S: AsRef<str>>(&self, message: S) {
1030            let message = message.as_ref();
1031            trace!(
1032                sink_id = %self.sink_id,
1033                worker = %self.worker_id,
1034                desired_frontier = ?self.desired_frontiers.frontier().elements(),
1035                persist_frontier = ?self.persist_frontiers.frontier().elements(),
1036                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
1037                message,
1038            );
1039        }
1040
1041        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
1042            if advance(&mut self.desired_frontiers.ok, frontier.borrow()) {
1043                self.apply_desired_frontier_advancement();
1044                self.trace("advanced `desired` ok frontier");
1045            }
1046        }
1047
1048        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
1049            if advance(&mut self.desired_frontiers.err, frontier.borrow()) {
1050                self.apply_desired_frontier_advancement();
1051                self.trace("advanced `desired` err frontier");
1052            }
1053        }
1054
1055        fn advance_persist_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
1056            if advance(&mut self.persist_frontiers.ok, frontier.borrow()) {
1057                self.apply_persist_frontier_advancement();
1058                self.trace("advanced `persist` ok frontier");
1059            }
1060        }
1061
1062        fn advance_persist_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
1063            if advance(&mut self.persist_frontiers.err, frontier.borrow()) {
1064                self.apply_persist_frontier_advancement();
1065                self.trace("advanced `persist` err frontier");
1066            }
1067        }
1068
1069        /// Apply the effects of a previous `desired` frontier advancement.
1070        fn apply_desired_frontier_advancement(&mut self) {
1071            self.maybe_force_consolidation();
1072        }
1073
1074        /// Apply the effects of a previous `persist` frontier advancement.
1075        fn apply_persist_frontier_advancement(&mut self) {
1076            let frontier = self.persist_frontiers.frontier();
1077
1078            // We will only emit times at or after the `persist` frontier, so now is a good time to
1079            // advance the times of stashed updates.
1080            self.corrections.ok.advance_since(frontier.clone());
1081            self.corrections.err.advance_since(frontier.clone());
1082
1083            self.maybe_force_consolidation();
1084        }
1085
1086        /// If the current consolidation request has become applicable, apply it.
1087        fn maybe_force_consolidation(&mut self) {
1088            let Some(request) = &self.force_consolidation_after else {
1089                return;
1090            };
1091
1092            let desired_frontier = self.desired_frontiers.frontier();
1093            let persist_frontier = self.persist_frontiers.frontier();
1094            if PartialOrder::less_than(request, desired_frontier)
1095                && PartialOrder::less_than(request, persist_frontier)
1096            {
1097                self.trace("forcing correction consolidation");
1098                self.corrections.ok.consolidate_at_since();
1099                self.corrections.err.consolidate_at_since();
1100
1101                // In read-only mode the sink mints no batches, so the batch-write path never
1102                // sweeps `consolidate_before(upper)` forward and a single forced consolidation
1103                // only covers the band below the `since` at the moment it fires, leaving the
1104                // forward region uncancelled for the whole read-only window (CLU-131). Re-arm at
1105                // the current persist frontier so the forced consolidation keeps sweeping forward
1106                // as `desired`/`persist` advance. Once writes are allowed, the batch-write path
1107                // takes over and we disarm.
1108                self.force_consolidation_after = if self.read_only {
1109                    Some(persist_frontier.to_owned())
1110                } else {
1111                    None
1112                };
1113            }
1114        }
1115
1116        /// Update read-only mode. While read-only the forced consolidation re-arms itself; once
1117        /// writes are allowed the still-armed request fires one final time and then disarms, after
1118        /// which the batch-write path drives consolidation.
1119        fn set_read_only(&mut self, read_only: bool) {
1120            self.read_only = read_only;
1121        }
1122
1123        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
1124            // The incoming batch description is outdated if we already have a batch description
1125            // with a greater `lower`.
1126            //
1127            // Note that we cannot assume a description is outdated based on the comparison of its
1128            // `lower` with the `persist_frontier`. The persist frontier observed by the `write`
1129            // operator is initialized with the shard's read frontier, so it can be greater than
1130            // the shard's write frontier.
1131            if let Some((prev, _)) = &self.batch_description {
1132                if PartialOrder::less_than(&desc.lower, &prev.lower) {
1133                    self.trace(format!("skipping outdated batch description: {desc:?}"));
1134                    return;
1135                }
1136            }
1137
1138            self.batch_description = Some((desc, cap));
1139            self.trace("set batch description");
1140        }
1141
1142        async fn maybe_write_batch(
1143            &mut self,
1144        ) -> Option<(BatchDescription, ProtoBatch, Capability<Timestamp>)> {
1145            let (desc, _cap) = self.batch_description.as_ref()?;
1146
1147            // We can write a new batch if we have seen all `persist` updates before `lower` and
1148            // all `desired` updates up to `upper`.
1149            let persist_complete =
1150                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
1151            let desired_complete =
1152                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
1153            if !persist_complete || !desired_complete {
1154                return None;
1155            }
1156
1157            let (desc, cap) = self.batch_description.take()?;
1158
1159            let ok_updates = self.corrections.ok.updates_before(&desc.upper);
1160            let err_updates = self.corrections.err.updates_before(&desc.upper);
1161
1162            let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
1163            let errs = err_updates
1164                .map(|(d, t, r)| ((SourceData(Err(d.deserialize())), ()), t, r.into_inner()));
1165            let mut updates = oks.chain(errs).peekable();
1166
1167            // Don't write empty batches.
1168            if updates.peek().is_none() {
1169                drop(updates);
1170                self.trace("skipping empty batch");
1171                return None;
1172            }
1173
1174            let batch = self
1175                .persist_writer
1176                .batch(updates, desc.lower.clone(), desc.upper.clone())
1177                .await
1178                .expect("valid usage")
1179                .into_transmittable_batch();
1180
1181            self.trace("wrote a batch");
1182            Some((desc, batch, cap))
1183        }
1184    }
1185}
1186
1187/// Implementation of the `append` operator.
1188mod append {
1189    use super::*;
1190
1191    /// Render the `append` operator.
1192    ///
1193    /// The parameters passed in are:
1194    ///  * `sink_id`: The `GlobalId` of the sink export.
1195    ///  * `persist_api`: An object providing access to the output persist shard.
1196    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
1197    ///  * `batches`: The stream of written batches produced by the `write` operator.
1198    pub fn render<'s>(
1199        sink_id: GlobalId,
1200        persist_api: PersistApi,
1201        descs: DescsStream<'s>,
1202        batches: BatchesStream<'s>,
1203    ) -> PressOnDropButton {
1204        let scope = descs.scope();
1205        let worker_id = scope.index();
1206
1207        let name = operator_name(sink_id, "append");
1208        let mut op = OperatorBuilder::new(name, scope);
1209
1210        // Broadcast batch descriptions to all workers, regardless of whether or not they are
1211        // responsible for the append, to give them a chance to clean up any outdated state they
1212        // might still hold.
1213        let mut descs_input = op.new_disconnected_input(descs.broadcast(), Pipeline);
1214        let mut batches_input = op.new_disconnected_input(
1215            batches,
1216            Exchange::new(move |(desc, _): &(BatchDescription, _)| {
1217                u64::cast_from(desc.append_worker)
1218            }),
1219        );
1220
1221        let button = op.build(move |_capabilities| async move {
1222            let writer = persist_api.open_writer().await;
1223            let mut state = State::new(sink_id, worker_id, writer);
1224
1225            loop {
1226                // Read from the inputs, absorb batch descriptions and batches. If the `batches`
1227                // frontier advances, or if we receive a new batch description, we might have to
1228                // append a new batch.
1229                tokio::select! {
1230                    Some(event) = descs_input.next() => {
1231                        if let Event::Data(_cap, data) = event {
1232                            for desc in data {
1233                                state.absorb_batch_description(desc).await;
1234                                state.maybe_append_batches().await;
1235                            }
1236                        }
1237                    }
1238                    Some(event) = batches_input.next() => {
1239                        match event {
1240                            Event::Data(_cap, data) => {
1241                                // The batch description is only used for routing and we ignore it
1242                                // here since we already get one from `descs_input`.
1243                                for (_desc, batch) in data {
1244                                    state.absorb_batch(batch).await;
1245                                }
1246                            }
1247                            Event::Progress(frontier) => {
1248                                state.advance_batches_frontier(frontier);
1249                                state.maybe_append_batches().await;
1250                            }
1251                        }
1252                    }
1253                    // All inputs are exhausted, so we can shut down.
1254                    else => return,
1255                }
1256            }
1257        });
1258
1259        button.press_on_drop()
1260    }
1261
1262    /// State maintained by the `append` operator.
1263    struct State {
1264        sink_id: GlobalId,
1265        worker_id: usize,
1266        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1267        /// The current input frontier of `batches`.
1268        batches_frontier: Antichain<Timestamp>,
1269        /// The greatest observed `lower` from both `descs` and `batches`.
1270        lower: Antichain<Timestamp>,
1271        /// The batch description for `lower`, if any.
1272        batch_description: Option<BatchDescription>,
1273        /// Batches received for `lower`.
1274        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
1275    }
1276
1277    impl State {
1278        fn new(
1279            sink_id: GlobalId,
1280            worker_id: usize,
1281            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1282        ) -> Self {
1283            Self {
1284                sink_id,
1285                worker_id,
1286                persist_writer,
1287                batches_frontier: Antichain::from_elem(Timestamp::MIN),
1288                lower: Antichain::from_elem(Timestamp::MIN),
1289                batch_description: None,
1290                batches: Default::default(),
1291            }
1292        }
1293
1294        fn trace<S: AsRef<str>>(&self, message: S) {
1295            let message = message.as_ref();
1296            trace!(
1297                sink_id = %self.sink_id,
1298                worker = %self.worker_id,
1299                batches_frontier = ?self.batches_frontier.elements(),
1300                lower = ?self.lower.elements(),
1301                batch_description = ?self.batch_description,
1302                message,
1303            );
1304        }
1305
1306        fn advance_batches_frontier(&mut self, frontier: Antichain<Timestamp>) {
1307            if advance(&mut self.batches_frontier, frontier.borrow()) {
1308                self.trace("advanced `batches` frontier");
1309            }
1310        }
1311
1312        /// Advance the current `lower`.
1313        ///
1314        /// Discards all currently stashed batches and batch descriptions, assuming that they are
1315        /// now invalid.
1316        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
1317            assert!(PartialOrder::less_than(&self.lower, &frontier));
1318
1319            self.lower = frontier;
1320            self.batch_description = None;
1321
1322            // Remove stashed batches, cleaning up those we didn't append.
1323            for batch in self.batches.drain(..) {
1324                batch.delete().await;
1325            }
1326
1327            self.trace("advanced `lower`");
1328        }
1329
1330        /// Absorb the given batch description into the state, provided it is not outdated.
1331        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
1332            if PartialOrder::less_than(&self.lower, &desc.lower) {
1333                self.advance_lower(desc.lower.clone()).await;
1334            } else if &self.lower != &desc.lower {
1335                self.trace(format!("skipping outdated batch description: {desc:?}"));
1336                return;
1337            }
1338
1339            if desc.append_worker == self.worker_id {
1340                self.batch_description = Some(desc);
1341                self.trace("set batch description");
1342            }
1343        }
1344
1345        /// Absorb the given batch into the state, provided it is not outdated.
1346        async fn absorb_batch(&mut self, batch: ProtoBatch) {
1347            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
1348            if PartialOrder::less_than(&self.lower, batch.lower()) {
1349                self.advance_lower(batch.lower().clone()).await;
1350            } else if &self.lower != batch.lower() {
1351                self.trace(format!(
1352                    "skipping outdated batch: ({:?}, {:?})",
1353                    batch.lower().elements(),
1354                    batch.upper().elements(),
1355                ));
1356
1357                // Ensure the batch's data gets properly cleaned up before dropping it.
1358                batch.delete().await;
1359                return;
1360            }
1361
1362            self.batches.push(batch);
1363            self.trace("absorbed a batch");
1364        }
1365
1366        async fn maybe_append_batches(&mut self) {
1367            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
1368            if !batches_complete {
1369                return;
1370            }
1371
1372            let Some(desc) = self.batch_description.take() else {
1373                return;
1374            };
1375
1376            let new_lower = match self.append_batches(desc).await {
1377                Ok(shard_upper) => {
1378                    self.trace("appended a batch");
1379                    shard_upper
1380                }
1381                Err(shard_upper) => {
1382                    // Failing the append is expected in the presence of concurrent replicas. There
1383                    // is nothing special to do here: The self-correcting feedback mechanism
1384                    // ensures that we observe the concurrent changes, compute their consequences,
1385                    // and append them at a future time.
1386                    self.trace(format!(
1387                        "append failed due to `lower` mismatch: {:?}",
1388                        shard_upper.elements(),
1389                    ));
1390                    shard_upper
1391                }
1392            };
1393
1394            self.advance_lower(new_lower).await;
1395        }
1396
1397        /// Append the current `batches` to the output shard.
1398        ///
1399        /// Returns whether the append was successful or not, and the current shard upper in either
1400        /// case.
1401        ///
1402        /// This method advances the shard upper to the batch `lower` if necessary. This is the
1403        /// mechanism that brings the shard upper to the sink as-of when appending the initial
1404        /// batch.
1405        ///
1406        /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
1407        /// a single append at operator startup. The reason we are doing it here instead is that it
1408        /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
1409        /// persist writes, including the initial upper bump. Having only a single place that
1410        /// performs writes makes it easy to ensure we are doing that correctly.
1411        async fn append_batches(
1412            &mut self,
1413            desc: BatchDescription,
1414        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
1415            let (lower, upper) = (desc.lower, desc.upper);
1416            let mut to_append: Vec<_> = self.batches.iter_mut().collect();
1417
1418            loop {
1419                let result = self
1420                    .persist_writer
1421                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
1422                    .await
1423                    .expect("valid usage");
1424
1425                match result {
1426                    Ok(()) => return Ok(upper),
1427                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
1428                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
1429
1430                        // At this point the shard's since and upper are likely the same, a state
1431                        // that is likely to hit edge-cases in logic reasoning about frontiers.
1432                        fail::fail_point!("mv_advanced_upper");
1433                    }
1434                    Err(mismatch) => return Err(mismatch.current),
1435                }
1436            }
1437        }
1438    }
1439
1440    /// Advance the frontier of the given writer's shard to at least the given `upper`.
1441    async fn advance_shard_upper(
1442        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1443        upper: Antichain<Timestamp>,
1444    ) {
1445        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1446        let lower = Antichain::from_elem(Timestamp::MIN);
1447        persist_writer
1448            .append(empty_updates, lower, upper)
1449            .await
1450            .expect("valid usage")
1451            .expect("should always succeed");
1452    }
1453}