Skip to main content

mz_compute/sink/
materialized_view_v2.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sync Timely operator implementation of the MV sink.
11//!
12//! This module provides an alternative implementation of `persist_sink` that uses sync Timely
13//! operators communicating with Tokio tasks via channels, instead of async Timely operators.
14//! Gated behind the `ENABLE_SYNC_MV_SINK` dyncfg.
15//!
16//! See the [main module](super::materialized_view) for the operator graph and design docs.
17//!
18//! ### Channel ordering requirements
19//!
20//! Each operator splits state across a Timely thread (which observes inputs and frontiers) and a
21//! Tokio task (which owns persist I/O state). They communicate via `mpsc` command channels, which
22//! preserve send order on a single sender. Each operator instance constructs its own
23//! `(tx, rx)` pair inside `render` and never clones the sender, so there is exactly one producer
24//! per channel — sends are totally ordered. Different worker instances of the same operator
25//! never share a channel, so cross-worker ordering is not a concern. The correctness of the
26//! operators relies on a few ordering invariants between the messages sent within a single Timely
27//! activation:
28//!
29//!  * **`mint`**: the `persist_watch` Tokio task is the sole producer of persist-frontier updates
30//!    and emits them in monotonically increasing order, terminated by the empty frontier. The
31//!    Timely closure drains the receiver each activation; processing in receive order is therefore
32//!    sufficient. No cross-channel ordering is needed because `mint` only has the one channel.
33//!
34//!  * **`write`**: per-activation, the Timely closure first appends all observed input data into
35//!    a single `WriteCommand::Batch` and only then sends a `WriteCommand::WriteBatch` (issued from
36//!    `maybe_start_batch` after frontier checks). The Tokio task processes commands FIFO, so a
37//!    `WriteBatch` is guaranteed to see every `Batch` from the same activation already applied to
38//!    the corrections buffer. Reversing this order would let the task write a batch that is
39//!    missing updates the Timely closure already observed.
40//!
41//!  * **`append`**: per-activation, the Timely closure forwards messages in the order
42//!    `Description` → `Batch` → `BatchesFrontier`. The first two carry the data the task needs
43//!    to absorb; `BatchesFrontier` is the trigger that allows `maybe_append_batches` to fire.
44//!    Sending `BatchesFrontier` *after* its corresponding `Batch` messages ensures the task does
45//!    not append a batch description before all batches contributing to it have been absorbed.
46//!    If the order were reversed, `maybe_append_batches` could fire on an incomplete `batches`
47//!    set and miss writes.
48
49use std::any::Any;
50use std::cell::RefCell;
51use std::rc::Rc;
52use std::sync::Arc;
53
54use differential_dataflow::{Hashable, VecCollection};
55use mz_compute_types::dyncfgs::MV_SINK_ADVANCE_PERSIST_FRONTIERS;
56use mz_dyncfg::ConfigSet;
57use mz_ore::cast::CastFrom;
58use mz_persist_client::batch::{Batch, ProtoBatch};
59use mz_persist_client::write::WriteHandle;
60use mz_repr::{Diff, GlobalId, Row, Timestamp};
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use timely::PartialOrder;
64use timely::dataflow::channels::pact::{Exchange, Pipeline};
65use timely::dataflow::operators::generic::OutputBuilder;
66use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
67use timely::dataflow::operators::vec::Broadcast;
68use timely::dataflow::operators::{Capability, CapabilitySet};
69use timely::progress::Antichain;
70use timely::progress::frontier::AntichainRef;
71use tokio::sync::{mpsc, watch};
72use tracing::trace;
73
74use crate::compute_state::ComputeState;
75use crate::render::StartSignal;
76use crate::render::errors::DataflowErrorSer;
77use crate::sink::correction::{ChannelLogging, Correction, CorrectionLogger};
78use crate::sink::materialized_view::{
79    BatchDescription, BatchesStream, DescsStream, DesiredStreams, OkErr, PersistApi,
80    PersistStreams, SharedSinkFrontier, advance, operator_name, persist_source,
81};
82
83/// Renders an MV sink writing the given desired collection into the `target` persist collection.
84///
85/// This is the sync Timely operator implementation, using Tokio tasks for I/O.
86pub(super) fn persist_sink<'s>(
87    sink_id: GlobalId,
88    target: &mz_storage_types::controller::CollectionMetadata,
89    ok_collection: VecCollection<'s, Timestamp, Row, Diff>,
90    err_collection: VecCollection<'s, Timestamp, DataflowErrorSer, Diff>,
91    as_of: Antichain<Timestamp>,
92    compute_state: &mut ComputeState,
93    start_signal: StartSignal,
94    read_only_rx: watch::Receiver<bool>,
95) -> Rc<dyn Any> {
96    let scope = ok_collection.scope();
97    let desired = OkErr::new(ok_collection.inner, err_collection.inner);
98
99    // Read back the persist shard.
100    let (persist, persist_token) =
101        persist_source(scope, sink_id, target.clone(), compute_state, start_signal);
102
103    let persist_api = PersistApi {
104        persist_clients: Arc::clone(&compute_state.persist_clients),
105        collection: target.clone(),
106        shard_name: sink_id.to_string(),
107        purpose: format!("MV sink {sink_id}"),
108    };
109
110    let (desired, descs, sink_frontier) = mint::render(
111        sink_id,
112        persist_api.clone(),
113        as_of.clone(),
114        read_only_rx,
115        desired,
116    );
117
118    // Broadcast batch descriptions to all workers, regardless of whether or not they are
119    // responsible for the append, to give them a chance to clean up any outdated state they
120    // might still hold.
121    let descs = descs.broadcast();
122
123    let batches = write::render(
124        sink_id,
125        persist_api.clone(),
126        as_of,
127        desired,
128        persist,
129        descs.clone(),
130        Rc::clone(&compute_state.worker_config),
131    );
132
133    append::render(sink_id, persist_api, descs, batches);
134
135    // Report sink frontier updates to the `ComputeState`.
136    let collection = compute_state.expect_collection_mut(sink_id);
137    collection.sink_write_frontier = Some(sink_frontier);
138
139    Rc::new(persist_token)
140}
141
142/// Implementation of the `mint` operator.
143mod mint {
144    use super::*;
145    use timely::progress::frontier::AntichainRef;
146
    /// Render the `mint` operator.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `as_of`: The first time for which the sink may produce output.
    ///  * `read_only_rx`: A receiver that reports the sink is in read-only mode.
    ///  * `desired`: The ok/err streams that should be sinked to persist.
    ///
    /// Returns the passed-through `desired` streams, the stream of minted batch descriptions,
    /// and a shared frontier handle through which the sink's frontier is reported.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        mut read_only_rx: watch::Receiver<bool>,
        desired: DesiredStreams<'s>,
    ) -> (DesiredStreams<'s>, DescsStream<'s>, SharedSinkFrontier) {
        let scope = desired.ok.scope();
        let worker_id = scope.index();
        let worker_count = scope.peers();

        // Determine the active worker for the mint operator. Only this worker spawns the
        // background tasks and mints descriptions; all other workers pass data through.
        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();

        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
        let shared_frontier = Rc::clone(&sink_frontier);

        let name = operator_name(sink_id, "mint");
        let mut builder = OperatorBuilderRc::new(name, scope.clone());
        let info = builder.operator_info();

        // Create outputs (before inputs, so no input connections yet).
        let (ok_output, ok_stream) = builder.new_output();
        let (err_output, err_stream) = builder.new_output();
        let (desc_output, desc_stream) = builder.new_output();

        let mut ok_output = OutputBuilder::from(ok_output);
        let mut err_output = OutputBuilder::from(err_output);
        let mut desc_output = OutputBuilder::from(desc_output);

        // desired_ok -> output 0 (ok passthrough)
        let mut desired_ok_input = builder.new_input_connection(
            desired.ok,
            Pipeline,
            [(0, Antichain::from_elem(Default::default()))],
        );
        // desired_err -> output 1 (err passthrough)
        let mut desired_err_input = builder.new_input_connection(
            desired.err,
            Pipeline,
            [(1, Antichain::from_elem(Default::default()))],
        );

        // Set up background tasks and state for the active worker only.
        let mut task_handles = Vec::new();
        let read_only = *read_only_rx.borrow_and_update();
        let mut state = None;
        if worker_id == active_worker_id {
            // Spawn a Tokio task to watch the persist shard's upper frontier.
            //
            // We collect the persist frontier from a write handle directly, rather than
            // inspecting the `persist` stream, because the latter has two annoying glitches:
            //  (a) It starts at the shard's read frontier, not its write frontier.
            //  (b) It can lag behind if there are spikes in ingested data.
            //
            // The decoupling from the `persist` stream is load-bearing: that stream can fall
            // arbitrarily behind the shard upper during snapshot replay or write spikes. Using
            // it would delay both (1) the controller-visible sink frontier (`shared_frontier`),
            // which previously caused a CrossJoin feature-bench regression where the controller
            // held a finished MV dataflow open waiting for the empty frontier, and (2) the
            // `state.persist_frontier` that gates batch-description minting, stalling mint until
            // the read-back stream catches up. The goal isn't tick-granular descriptions per
            // se — it's avoiding the stream-induced stall. See 5eab5ff896 for the original
            // regression and rationale.
            //
            // The task sends the empty frontier as its final message before exiting. The
            // operator drops `persist_rx` once it receives the empty frontier.
            let (persist_tx, persist_rx) = mpsc::unbounded_channel();
            let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
            let handle = mz_ore::task::spawn(
                || operator_name(sink_id, "mint::persist_watch"),
                async move {
                    let mut writer = persist_api.open_writer().await;
                    let mut frontier = Antichain::from_elem(Timestamp::MIN);
                    loop {
                        writer.wait_for_upper_past(&frontier).await;
                        frontier = writer.upper().clone();
                        if persist_tx.send(frontier.clone()).is_err() {
                            // Receiver dropped: the operator no longer wants updates.
                            return;
                        }
                        if sync_activator.activate().is_err() {
                            // Worker gone; nothing left to notify.
                            return;
                        }
                        if frontier.is_empty() {
                            // Shard is closed; the empty frontier was our final message.
                            return;
                        }
                    }
                },
            );
            task_handles.push(handle.abort_on_drop());

            // Spawn a Tokio task to wake the operator when read-only mode changes.
            if read_only {
                let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
                let mut rx = read_only_rx.clone();
                let handle = mz_ore::task::spawn(
                    || format!("mv_sink({sink_id})::mint::read_only_watch"),
                    async move {
                        let _ = rx.changed().await;
                        let _ = sync_activator.activate();
                    },
                );
                task_handles.push(handle.abort_on_drop());
            }

            state = Some(State::new(
                sink_id,
                worker_count,
                as_of,
                read_only,
                persist_rx,
            ));
        }

        builder.build(move |capabilities| {
            // Passing through the `desired` streams only requires data capabilities, so we can
            // immediately drop their initial capabilities here.
            let [_, _, desc_cap]: [_; 3] =
                capabilities.try_into().expect("one capability per output");

            let mut cap_set = if state.is_some() {
                Some(CapabilitySet::from_elem(desc_cap))
            } else {
                // Non-active workers never emit descriptions: drop the capability and report
                // the empty frontier through the shared handle.
                drop(desc_cap);
                shared_frontier.borrow_mut().clear();
                None
            };

            move |frontiers| {
                // Keep task handles alive so they are aborted when the operator is dropped.
                let _ = &task_handles;

                // Pass through desired data.
                let mut ok_out = ok_output.activate();
                desired_ok_input.for_each(|cap, data| {
                    ok_out.session(&cap).give_container(data);
                });
                let mut err_out = err_output.activate();
                desired_err_input.for_each(|cap, data| {
                    err_out.session(&cap).give_container(data);
                });

                let Some(state) = &mut state else {
                    // Non-active worker: just pass through data.
                    return;
                };
                let cap_set = cap_set.as_mut().unwrap();

                // Track desired frontiers. Inputs 0 and 1 are `desired.ok` and `desired.err`,
                // in creation order.
                state.advance_desired_ok_frontier(frontiers[0].frontier());
                state.advance_desired_err_frontier(frontiers[1].frontier());

                state.drain_persist_rx(&shared_frontier);

                // Check read-only mode. Once `allow_writes` fires, `state.read_only` is false
                // and this check is skipped for good.
                if state.read_only && read_only_rx.has_changed().unwrap_or(false) {
                    if !*read_only_rx.borrow_and_update() {
                        state.allow_writes();
                    }
                }

                // Try to mint a batch description.
                let mut desc_out = desc_output.activate();
                if let Some(desc) = state.maybe_mint_batch_description() {
                    let lower_ts = *desc.lower.as_option().expect("not empty");
                    let cap = cap_set.delayed(&lower_ts);
                    desc_out.session(&cap).give(desc);

                    // We only emit strictly increasing `lower`s, so we can let our output frontier
                    // advance beyond the current `lower`.
                    cap_set.downgrade([lower_ts.step_forward()]);
                } else {
                    // The next emitted `lower` will be at least the `persist` frontier, so we can
                    // advance our output frontier as far.
                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
                }
            }
        });

        let desired_output_streams = OkErr::new(ok_stream, err_stream);
        (desired_output_streams, desc_stream, sink_frontier)
    }
337
    /// State maintained by the `mint` operator.
    ///
    /// Only the single active worker constructs a `State`; all other workers pass data through
    /// without minting.
    struct State {
        /// The ID of the sink export, included in trace logging.
        sink_id: GlobalId,
        /// The number of workers in the Timely cluster.
        worker_count: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontier of the target persist shard.
        persist_frontier: Antichain<Timestamp>,
        /// Receiver for persist frontier updates from the Tokio persist_watch task.
        ///
        /// Dropped once the empty frontier is received (the task's shutdown signal).
        persist_rx: Option<mpsc::UnboundedReceiver<Antichain<Timestamp>>>,
        /// The append worker for the next batch description, chosen in round-robin fashion.
        next_append_worker: usize,
        /// The last `lower` we have emitted in a batch description, if any. Whenever the
        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
        last_lower: Option<Antichain<Timestamp>>,
        /// Whether we are operating in read-only mode.
        ///
        /// In read-only mode, minting of batch descriptions is disabled.
        read_only: bool,
    }
361
362    impl State {
363        fn new(
364            sink_id: GlobalId,
365            worker_count: usize,
366            as_of: Antichain<Timestamp>,
367            read_only: bool,
368            persist_rx: mpsc::UnboundedReceiver<Antichain<Timestamp>>,
369        ) -> Self {
370            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
371            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
372            // effort writing out snapshots of data that is already in the shard.
373            let persist_frontier = as_of;
374
375            Self {
376                sink_id,
377                worker_count,
378                desired_frontiers: OkErr::new_frontiers(),
379                persist_frontier,
380                persist_rx: Some(persist_rx),
381                next_append_worker: 0,
382                last_lower: None,
383                read_only,
384            }
385        }
386
387        fn trace<S: AsRef<str>>(&self, message: S) {
388            let message = message.as_ref();
389            trace!(
390                sink_id = %self.sink_id,
391                desired_frontier = ?self.desired_frontiers.frontier().elements(),
392                persist_frontier = ?self.persist_frontier.elements(),
393                last_lower = ?self.last_lower,
394                message,
395            );
396        }
397
398        fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
399            if advance(&mut self.desired_frontiers.ok, frontier) {
400                self.trace("advanced `desired` ok frontier");
401            }
402        }
403
404        fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
405            if advance(&mut self.desired_frontiers.err, frontier) {
406                self.trace("advanced `desired` err frontier");
407            }
408        }
409
410        fn advance_persist_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
411            if advance(&mut self.persist_frontier, frontier) {
412                self.trace("advanced `persist` frontier");
413            }
414        }
415
416        /// Drain persist frontier updates from the Tokio task.
417        ///
418        /// Frontiers from the `persist_watch` task are monotonically increasing, so only the
419        /// most recent one matters. We drain all queued messages and apply just the latest,
420        /// avoiding redundant `advance_persist_frontier`/`trace!` calls when several updates
421        /// arrived between activations.
422        ///
423        /// The task sends the empty frontier as its final message before exiting. Once
424        /// received, we drop the receiver.
425        fn drain_persist_rx(&mut self, shared_frontier: &RefCell<Antichain<Timestamp>>) {
426            let Some(mut rx) = self.persist_rx.take() else {
427                return;
428            };
429            let mut latest: Option<Antichain<Timestamp>> = None;
430            let mut closed = false;
431            loop {
432                match rx.try_recv() {
433                    Ok(frontier) => {
434                        let done = frontier.is_empty();
435                        latest = Some(frontier);
436                        if done {
437                            closed = true;
438                            break;
439                        }
440                    }
441                    Err(mpsc::error::TryRecvError::Empty) => break,
442                    Err(mpsc::error::TryRecvError::Disconnected) => {
443                        panic!("mint persist_watch task unexpectedly gone");
444                    }
445                }
446            }
447            if let Some(frontier) = latest {
448                shared_frontier.borrow_mut().clone_from(&frontier);
449                self.advance_persist_frontier(frontier.borrow());
450            }
451            if !closed {
452                self.persist_rx = Some(rx);
453            }
454        }
455
456        fn allow_writes(&mut self) {
457            if self.read_only {
458                self.read_only = false;
459                self.trace("switched to write mode");
460            }
461        }
462
463        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
464            let desired_frontier = self.desired_frontiers.frontier();
465            let persist_frontier = &self.persist_frontier;
466
467            // We only mint new batch descriptions when:
468            //  1. We are _not_ in read-only mode.
469            //  2. The `desired` frontier is ahead of the `persist` frontier.
470            //  3. The `persist` frontier advanced since we last emitted a batch description.
471            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
472            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
473                PartialOrder::less_than(lower, persist_frontier)
474            });
475
476            if self.read_only || !desired_ahead || !persist_advanced {
477                return None;
478            }
479
480            let lower = persist_frontier.clone();
481            let upper = desired_frontier.clone();
482            let append_worker = self.next_append_worker;
483            let desc = BatchDescription::new(lower, upper, append_worker);
484
485            self.next_append_worker = (append_worker + 1) % self.worker_count;
486            self.last_lower = Some(desc.lower.clone());
487
488            self.trace(format!("minted batch description: {desc:?}"));
489            Some(desc)
490        }
491    }
492}
493
494/// Implementation of the `write` operator.
495mod write {
496    use super::*;
497
498    use mz_timely_util::activator::ArcActivator;
499
    /// Commands sent from the Timely operator to the Tokio write task.
    ///
    /// The task processes commands in send (FIFO) order; see the module docs for the
    /// per-activation ordering invariant between `Batch` and `WriteBatch`.
    enum WriteCommand {
        /// A coalesced batch of work gathered during a single operator activation.
        ///
        /// The Timely closure accumulates all updates observed across the four data inputs plus
        /// any frontier advancement and forced-consolidation flag, and sends a single
        /// `WriteCommand::Batch` per activation. This keeps the channel overhead independent of
        /// the number of Timely chunks processed per activation.
        Batch(BatchUpdates),
        /// Write a batch with the given description. The task drains corrections and writes
        /// them to persist.
        WriteBatch(BatchDescription),
    }
513
    /// The payload of a coalesced [`WriteCommand::Batch`].
    ///
    /// One instance is filled per operator activation; the Tokio write task applies it to the
    /// corrections buffer (see the module docs on channel ordering).
    struct BatchUpdates {
        /// Positive contributions from the `desired` ok input.
        desired_ok: Vec<(Row, Timestamp, Diff)>,
        /// Positive contributions from the `desired` err input.
        desired_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
        /// Negative contributions from the `persist` ok input.
        persist_ok: Vec<(Row, Timestamp, Diff)>,
        /// Negative contributions from the `persist` err input.
        persist_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
        /// The new persist frontier, if it advanced this activation.
        persist_frontier: Option<Antichain<Timestamp>>,
        /// Whether a consolidation of the corrections buffer should be forced.
        force_consolidation: bool,
    }
529
530    impl BatchUpdates {
531        fn new() -> Self {
532            Self {
533                desired_ok: Vec::new(),
534                desired_err: Vec::new(),
535                persist_ok: Vec::new(),
536                persist_err: Vec::new(),
537                persist_frontier: None,
538                force_consolidation: false,
539            }
540        }
541
542        /// Returns true if there is no work in this batch.
543        fn is_empty(&self) -> bool {
544            self.desired_ok.is_empty()
545                && self.desired_err.is_empty()
546                && self.persist_ok.is_empty()
547                && self.persist_err.is_empty()
548                && self.persist_frontier.is_none()
549                && !self.force_consolidation
550        }
551    }
552
    /// A response from the Tokio write task back to the Timely operator.
    ///
    /// Carries the result of a batch write; see [`WriteCommand::WriteBatch`].
    struct WriteResponse {
        /// The written batch, or `None` if the corrections buffer had no updates.
        batch: Option<ProtoBatch>,
    }
558
559    /// Render the `write` operator.
560    ///
561    /// The parameters passed in are:
562    ///  * `sink_id`: The `GlobalId` of the sink export.
563    ///  * `persist_api`: An object providing access to the output persist shard.
564    ///  * `as_of`: The first time for which the sink may produce output.
565    ///  * `desired`: The ok/err streams that should be sinked to persist.
566    ///  * `persist`: The ok/err streams read back from the output persist shard.
567    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
568    pub fn render<'s>(
569        sink_id: GlobalId,
570        persist_api: PersistApi,
571        as_of: Antichain<Timestamp>,
572        desired: DesiredStreams<'s>,
573        persist: PersistStreams<'s>,
574        descs: DescsStream<'s>,
575        worker_config: Rc<ConfigSet>,
576    ) -> BatchesStream<'s> {
577        let scope = desired.ok.scope();
578        let worker_id = scope.index();
579
580        let name = operator_name(sink_id, "write");
581        let mut builder = OperatorBuilderRc::new(name, scope.clone());
582        let info = builder.operator_info();
583
584        // Set up correction buffer logging. CorrectionLogger is not Send (uses timely
585        // loggers), so the Tokio task uses ChannelLogging to send events back to the
586        // Timely thread for application by the CorrectionLogger.
587        let mut channel_logging = None;
588        let mut correction_logger = None;
589        if let (Some(compute_logger), Some(differential_logger)) = (
590            scope.worker().logger_for("materialize/compute"),
591            scope.worker().logger_for("differential/arrange"),
592        ) {
593            let operator_info = builder.operator_info();
594            let (tx, rx) = mpsc::unbounded_channel();
595            channel_logging = Some(ChannelLogging::new(tx));
596            correction_logger = Some(CorrectionLogger::new(
597                compute_logger,
598                differential_logger.into(),
599                operator_info.global_id,
600                operator_info.address.to_vec(),
601                rx,
602            ));
603        }
604
605        // It is important that we exchange the `desired` and `persist` data the same way, so
606        // updates that cancel each other out end up on the same worker.
607        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
608        let exchange_err = |(d, _, _): &(DataflowErrorSer, Timestamp, Diff)| d.hashed();
609
610        // Data inputs are created before the output, so they are not connected to it.
611        let mut desired_ok_input = builder.new_input(desired.ok, Exchange::new(exchange_ok));
612        let mut desired_err_input = builder.new_input(desired.err, Exchange::new(exchange_err));
613        let mut persist_ok_input = builder.new_input(persist.ok, Exchange::new(exchange_ok));
614        let mut persist_err_input = builder.new_input(persist.err, Exchange::new(exchange_err));
615        let mut descs_input = builder.new_input(descs, Pipeline);
616
617        // Only descs (input 4) is connected to the batches output.
618        let (batches_output, batches_output_stream) =
619            builder.new_output_connection([(4, Antichain::from_elem(Default::default()))]);
620        let mut batches_output = OutputBuilder::from(batches_output);
621
622        // Obtain SinkMetrics synchronously from the persist client cache, rather than through
623        // a WriteHandle, to avoid async I/O on the Timely thread.
624        let sink_metrics = persist_api.persist_clients.metrics().sink.clone();
625
626        // Construct corrections on the Timely thread (reads ConfigSet), then move to the
627        // Tokio task. The ChannelLogging sends events back to the Timely thread.
628        let worker_metrics = sink_metrics.for_worker(worker_id);
629        let mut corrections: OkErr<Correction<Row>, Correction<DataflowErrorSer>> = OkErr::new(
630            Correction::new(
631                sink_metrics.clone(),
632                worker_metrics.clone(),
633                channel_logging.clone(),
634                &worker_config,
635            ),
636            Correction::new(
637                sink_metrics.clone(),
638                worker_metrics,
639                channel_logging,
640                &worker_config,
641            ),
642        );
643
644        // Read `MV_SINK_ADVANCE_PERSIST_FRONTIERS` exactly once and reuse the captured value for
645        // both the Tokio-side `corrections.since` initialization below and the Timely-side
646        // `persist_frontiers` initialization in `State::new`. Re-reading the dyncfg per init site
647        // would let the value flip between reads and produce the very inconsistency this fix
648        // addresses: `persist_frontiers = as_of` (gate open) with `corrections.since = MIN`
649        // (snapshot updates not advanced) reproduces the original `UpdateNotBeyondLower` panic.
650        let advance_persist_frontiers_at_startup =
651            MV_SINK_ADVANCE_PERSIST_FRONTIERS.get(&worker_config);
652
653        // Mirror the persist-frontier initialization performed by `State::new` below. With the
654        // flag enabled, `State` advances its Timely-side `persist_frontiers` to `as_of`, opening
655        // the `maybe_start_batch` write gate (`desc.lower <= persist_frontiers.frontier()`)
656        // immediately for the first description minted with `lower = as_of`. The corrections
657        // buffer lives on the Tokio task and only learns of frontier advancements through
658        // `WriteCommand::Batch { persist_frontier, .. }`, which the Timely closure populates only
659        // when an input frontier actually moves. On startup, the input frontiers begin at
660        // `Timestamp::MIN`, so no `Batch` carries `persist_frontier` until the persist input
661        // catches up — yet a `WriteBatch(desc)` with `desc.lower = as_of` can already be sent.
662        // Snapshot-replay updates inserted in the meantime stay at their original timestamps
663        // (`Correction::insert` rounds to `max(t, since)` and `since == MIN`), and slip into the
664        // batch, tripping persist's `UpdateNotBeyondLower` invariant. Advancing
665        // `corrections.since` here keeps the Tokio side in lockstep with the Timely side, the
666        // same invariant `materialized_view::write::State::new` upholds via
667        // `apply_persist_frontier_advancement`.
668        if advance_persist_frontiers_at_startup {
669            corrections.ok.advance_since(as_of.clone());
670            corrections.err.advance_since(as_of.clone());
671        }
672
673        // Channels for commands and responses.
674        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<WriteCommand>();
675        let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<WriteResponse>();
676
677        // Spawn Tokio task that owns the WriteHandle and corrections buffer.
678        let (activator, activation_ack) = ArcActivator::new(scope, &info);
679        let write_task_handle = {
680            mz_ore::task::spawn(
681                || operator_name(sink_id, "write::batch_writer"),
682                async move {
683                    let mut writer = persist_api.open_writer().await;
684
685                    while let Some(cmd) = cmd_rx.recv().await {
686                        apply_command(&mut corrections, &mut writer, cmd, &resp_tx).await;
687                        // Activate the operator to drain logging events and process batch responses.
688                        // ArcActivator suppresses redundant activations, so this is cheap.
689                        activator.activate();
690                    }
691                },
692            )
693            .abort_on_drop()
694        };
695
696        builder.build(move |capabilities| {
697            // We will use the data capabilities from the `descs` input to produce output, so no
698            // need to hold onto the initial capabilities.
699            drop(capabilities);
700
701            let mut state = State::new(
702                sink_id,
703                worker_id,
704                as_of,
705                advance_persist_frontiers_at_startup,
706            );
707
708            // Whether a batch write is currently in flight in the Tokio task.
709            let mut batch_in_flight: Option<(BatchDescription, Capability<Timestamp>)> = None;
710
711            // CorrectionLogger lives on the Timely thread and drains events from
712            // the channel each activation. On drop, it drains remaining events and
713            // retracts all logged state.
714            let mut correction_logger = correction_logger;
715
716            move |frontiers| {
717                // Keep task handle alive so it is aborted when the operator is dropped.
718                let _ = &write_task_handle;
719
720                // Acknowledge activation so the Tokio task can activate us again.
721                activation_ack.ack();
722
723                // Drain logging events from the Tokio task's ChannelLogging.
724                if let Some(logger) = &mut correction_logger {
725                    logger.apply_events();
726                }
727                // Coalesce all work from this activation into a single command. This keeps
728                // per-chunk channel overhead low, which matters for hydration when many small
729                // Timely chunks arrive in a single activation sweep.
730                let mut batch = BatchUpdates::new();
731                desired_ok_input.for_each(|_cap, data| {
732                    batch.desired_ok.append(data);
733                });
734                desired_err_input.for_each(|_cap, data| {
735                    batch.desired_err.append(data);
736                });
737                persist_ok_input.for_each(|_cap, data| {
738                    batch.persist_ok.append(data);
739                });
740                persist_err_input.for_each(|_cap, data| {
741                    batch.persist_err.append(data);
742                });
743
744                // Accept batch descriptions.
745                descs_input.for_each(|cap, data| {
746                    let cap = cap.retain(0);
747                    for desc in data.drain(..) {
748                        state.absorb_batch_description(desc, cap.clone());
749                    }
750                });
751
752                // Track frontiers. Include the new persist frontier in the coalesced batch for
753                // `advance_since`.
754                state.advance_desired_ok_frontier(frontiers[0].frontier());
755                state.advance_desired_err_frontier(frontiers[1].frontier());
756                if state.advance_persist_ok_frontier(frontiers[2].frontier())
757                    | state.advance_persist_err_frontier(frontiers[3].frontier())
758                {
759                    batch.persist_frontier = Some(state.persist_frontiers.frontier().to_owned());
760                }
761                if state.should_force_consolidation() {
762                    batch.force_consolidation = true;
763                }
764
765                if !batch.is_empty() {
766                    cmd_tx
767                        .send(WriteCommand::Batch(batch))
768                        .expect("write task unexpectedly gone");
769                }
770
771                // Try to receive batch results from the Tokio task.
772                loop {
773                    match resp_rx.try_recv() {
774                        Ok(resp) => {
775                            if let Some((desc, cap)) = batch_in_flight.take() {
776                                if let Some(batch) = resp.batch {
777                                    let mut out = batches_output.activate();
778                                    out.session(&cap).give((desc, batch));
779                                    state.trace("wrote a batch");
780                                } else {
781                                    state.trace("skipping empty batch");
782                                }
783                            }
784                        }
785                        Err(mpsc::error::TryRecvError::Empty) => break,
786                        Err(mpsc::error::TryRecvError::Disconnected) => {
787                            panic!("write task unexpectedly gone");
788                        }
789                    }
790                }
791
792                // If no batch in flight, try to write a new batch.
793                if batch_in_flight.is_none() {
794                    if let Some((desc, cap)) = state.maybe_start_batch(&cmd_tx) {
795                        batch_in_flight = Some((desc, cap));
796                    }
797                }
798            }
799        });
800
801        batches_output_stream
802    }
803
    /// Apply a single command to the task state.
    ///
    /// `desired` updates enter `corrections` as positive contributions and `persist` updates as
    /// negative contributions, so the buffer contains `desired - persist`, i.e. the updates that
    /// need to be written to bring the shard in line with `desired`.
    ///
    /// The statement order inside the `Batch` arm mirrors the order in which the former
    /// per-chunk commands were applied: inserts first, then `advance_since`, then forced
    /// consolidation. `Correction::insert` rounds inserted times up to the current `since`, so
    /// this relative order is load-bearing.
    ///
    /// Every `WriteBatch` command results in exactly one `WriteResponse` send attempt on
    /// `resp_tx`; the Timely closure relies on this to clear its `batch_in_flight` slot.
    async fn apply_command(
        corrections: &mut OkErr<Correction<Row>, Correction<DataflowErrorSer>>,
        writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        cmd: WriteCommand,
        resp_tx: &mpsc::UnboundedSender<WriteResponse>,
    ) {
        match cmd {
            WriteCommand::Batch(mut batch) => {
                // Apply the same logical sequence of operations that the per-chunk commands
                // used to: positive desired inserts, negated persist inserts, then optional
                // frontier advancement and forced consolidation.
                if !batch.desired_ok.is_empty() {
                    corrections.ok.insert(&mut batch.desired_ok);
                }
                if !batch.desired_err.is_empty() {
                    corrections.err.insert(&mut batch.desired_err);
                }
                if !batch.persist_ok.is_empty() {
                    corrections.ok.insert_negated(&mut batch.persist_ok);
                }
                if !batch.persist_err.is_empty() {
                    corrections.err.insert_negated(&mut batch.persist_err);
                }
                if let Some(frontier) = batch.persist_frontier {
                    // We will only emit times at or after the `persist` frontier, so now is a
                    // good time to advance the times of stashed updates.
                    corrections.ok.advance_since(frontier.clone());
                    corrections.err.advance_since(frontier);
                }
                if batch.force_consolidation {
                    // Requested by the Timely side (e.g. after snapshot processing) so
                    // cancelled-out updates are shed even when no batch write triggers a
                    // consolidation.
                    corrections.ok.consolidate_at_since();
                    corrections.err.consolidate_at_since();
                }
            }
            WriteCommand::WriteBatch(desc) => {
                // Chain ok and err correction iterators directly, avoiding an
                // intermediate Vec allocation.
                let oks = corrections
                    .ok
                    .updates_before(&desc.upper)
                    .map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
                let errs = corrections
                    .err
                    .updates_before(&desc.upper)
                    .map(|(d, t, r)| ((SourceData(Err(d.deserialize())), ()), t, r.into_inner()));
                let mut updates = oks.chain(errs).peekable();

                if updates.peek().is_none() {
                    // No corrections to write. Still respond so the Timely side clears its
                    // in-flight slot; ignore send errors, as the receiver may already be gone.
                    let _ = resp_tx.send(WriteResponse { batch: None });
                    return;
                }

                let batch = writer
                    .batch(updates, desc.lower, desc.upper)
                    .await
                    .expect("valid usage");
                let proto_batch = batch.into_transmittable_batch();
                if let Err(err) = resp_tx.send(WriteResponse {
                    batch: Some(proto_batch),
                }) {
                    // The Timely operator is gone, so nobody will ever link this batch into the
                    // shard. Reclaim the blobs we just wrote instead of leaking them.
                    let batch =
                        writer.batch_from_transmittable_batch(err.0.batch.expect("just sent"));
                    batch.delete().await;
                }
            }
        }
    }
877
    /// State maintained by the `write` operator on the Timely thread.
    ///
    /// The Tokio-side counterpart (the corrections buffer and the persist write handle) lives in
    /// the task spawned by `render`; this struct only tracks frontiers and the pending batch
    /// description.
    struct State {
        /// The ID of the sink export this operator writes for (used in trace logging).
        sink_id: GlobalId,
        /// The index of the Timely worker running this operator (used in trace logging).
        worker_id: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontiers of the `persist` inputs.
        ///
        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
        /// usually is, but during snapshot processing, these frontiers will start at the shard's
        /// read frontier, so they can be beyond its write frontier. This is important as it means
        /// we must not discard batch descriptions based on these persist frontiers: A batch
        /// description might still be valid even if its `lower` is before the persist frontiers we
        /// observe.
        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The current valid batch description and associated output capability, if any.
        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
        /// A request to force a consolidation of corrections once both `desired_frontiers` and
        /// `persist_frontiers` become greater than the given frontier.
        ///
        /// Normally we force a consolidation whenever we write a batch, but there are periods
        /// (like read-only mode) when that doesn't happen, and we need to manually force
        /// consolidation instead. Currently this is only used to ensure we quickly get rid of the
        /// snapshot updates.
        force_consolidation_after: Option<Antichain<Timestamp>>,
    }
904
905    impl State {
906        fn new(
907            sink_id: GlobalId,
908            worker_id: usize,
909            as_of: Antichain<Timestamp>,
910            advance_persist_frontiers_at_startup: bool,
911        ) -> Self {
912            // Force a consolidation of corrections after the snapshot updates have been fully
913            // processed, to ensure we get rid of those as quickly as possible.
914            let force_consolidation_after = Some(as_of.clone());
915
916            let mut state = Self {
917                sink_id,
918                worker_id,
919                desired_frontiers: OkErr::new_frontiers(),
920                persist_frontiers: OkErr::new_frontiers(),
921                batch_description: None,
922                force_consolidation_after,
923            };
924
925            // Immediately advance the persist frontier tracking to the `as_of`.
926            // This is important to ensure the persist sink doesn't get stuck if the output shard's
927            // initial frontier is less than the `as_of`. The `mint` operator first emits a batch
928            // description with `lower = as_of`, and the `write` operator only emits a batch when
929            // its observed persist frontier is >= the batch description's `lower`, which (assuming
930            // no other writers) would be never if we didn't advance the observed persist frontier
931            // to the `as_of`.
932            //
933            // The `must_use` bool returned by the advance helpers signals that a corresponding
934            // `corrections.advance_since` must be queued to the Tokio task. We drop it here
935            // because the Tokio-side `corrections.since` is initialized to the same `as_of` in
936            // `write::render` before the task is spawned (using the same captured flag value),
937            // keeping both sides in lockstep without an additional channel send.
938            if advance_persist_frontiers_at_startup {
939                let _ = state.advance_persist_ok_frontier(as_of.borrow());
940                let _ = state.advance_persist_err_frontier(as_of.borrow());
941            }
942
943            state
944        }
945
946        fn trace<S: AsRef<str>>(&self, message: S) {
947            let message = message.as_ref();
948            trace!(
949                sink_id = %self.sink_id,
950                worker = %self.worker_id,
951                desired_frontier = ?self.desired_frontiers.frontier().elements(),
952                persist_frontier = ?self.persist_frontiers.frontier().elements(),
953                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
954                message,
955            );
956        }
957
958        fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
959            if advance(&mut self.desired_frontiers.ok, frontier) {
960                self.trace("advanced `desired` ok frontier");
961            }
962        }
963
964        fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
965            if advance(&mut self.desired_frontiers.err, frontier) {
966                self.trace("advanced `desired` err frontier");
967            }
968        }
969
970        /// Returns true if the persist frontier advanced.
971        ///
972        /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
973        /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
974        /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
975        /// which can let updates with timestamps below `desc.lower` slip into a written batch.
976        #[must_use = "advance_persist_ok_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
977        fn advance_persist_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
978            if advance(&mut self.persist_frontiers.ok, frontier) {
979                self.trace("advanced `persist` ok frontier");
980                true
981            } else {
982                false
983            }
984        }
985
986        /// Returns true if the persist frontier advanced.
987        ///
988        /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
989        /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
990        /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
991        /// which can let updates with timestamps below `desc.lower` slip into a written batch.
992        #[must_use = "advance_persist_err_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
993        fn advance_persist_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
994            if advance(&mut self.persist_frontiers.err, frontier) {
995                self.trace("advanced `persist` err frontier");
996                true
997            } else {
998                false
999            }
1000        }
1001
1002        /// Check if a forced consolidation should be triggered.
1003        fn should_force_consolidation(&mut self) -> bool {
1004            let Some(request) = &self.force_consolidation_after else {
1005                return false;
1006            };
1007
1008            let desired_frontier = self.desired_frontiers.frontier();
1009            let persist_frontier = self.persist_frontiers.frontier();
1010            if PartialOrder::less_than(request, desired_frontier)
1011                && PartialOrder::less_than(request, persist_frontier)
1012            {
1013                self.trace("requesting correction consolidation");
1014                self.force_consolidation_after = None;
1015                true
1016            } else {
1017                false
1018            }
1019        }
1020
1021        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
1022            // Enforce monotonicity: drop descriptions whose `lower` regresses below the one we
1023            // already hold. The `mint` operator only emits strictly increasing `lower`s
1024            // (invariant 1), so a regression means this description is outdated. We cannot use
1025            // `persist_frontiers` for the same check, because during snapshot processing those
1026            // frontiers can be ahead of the shard's write frontier and a still-valid description
1027            // may have a `lower` below them.
1028            if let Some((prev, _)) = &self.batch_description {
1029                if PartialOrder::less_than(&desc.lower, &prev.lower) {
1030                    self.trace(format!("skipping outdated batch description: {desc:?}"));
1031                    return;
1032                }
1033            }
1034
1035            self.batch_description = Some((desc, cap));
1036            self.trace("set batch description");
1037        }
1038
1039        /// Check if a batch can be written and send a write command to the Tokio task if so.
1040        fn maybe_start_batch(
1041            &mut self,
1042            cmd_tx: &mpsc::UnboundedSender<WriteCommand>,
1043        ) -> Option<(BatchDescription, Capability<Timestamp>)> {
1044            let (desc, _cap) = self.batch_description.as_ref()?;
1045
1046            // We can write a new batch if we have seen all `persist` updates before `lower` and
1047            // all `desired` updates before `upper`.
1048            let persist_ready =
1049                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
1050            let desired_ready =
1051                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
1052            if !persist_ready || !desired_ready {
1053                return None;
1054            }
1055
1056            self.trace("write batch description");
1057            let (desc, cap) = self.batch_description.take()?;
1058            cmd_tx
1059                .send(WriteCommand::WriteBatch(desc.clone()))
1060                .expect("write task unexpectedly gone");
1061            Some((desc, cap))
1062        }
1063    }
1064}
1065
1066/// Implementation of the `append` operator.
1067mod append {
1068    use super::*;
1069
    /// Commands sent from the Timely operator to the Tokio append task.
    ///
    /// Within a single Timely activation, `Description` and `Batch` commands are sent before any
    /// `BatchesFrontier` command (see the ordering comment in `render`), so the task never
    /// observes a frontier claiming completeness for batches it has not yet received.
    enum AppendCommand {
        /// A new batch description has been received.
        Description(BatchDescription),
        /// A written batch has been received.
        Batch(ProtoBatch),
        /// The batches frontier has advanced.
        BatchesFrontier(Antichain<Timestamp>),
    }
1079
1080    /// Render the `append` operator.
1081    ///
1082    /// The parameters passed in are:
1083    ///  * `sink_id`: The `GlobalId` of the sink export.
1084    ///  * `persist_api`: An object providing access to the output persist shard.
1085    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
1086    ///  * `batches`: The stream of written batches produced by the `write` operator.
1087    pub fn render<'s>(
1088        sink_id: GlobalId,
1089        persist_api: PersistApi,
1090        descs: DescsStream<'s>,
1091        batches: BatchesStream<'s>,
1092    ) {
1093        let scope = descs.scope();
1094        let worker_id = scope.index();
1095
1096        let name = operator_name(sink_id, "append");
1097        let mut builder = OperatorBuilderRc::new(name, scope.clone());
1098        let mut descs_input = builder.new_input(descs, Pipeline);
1099        let batch_exchange =
1100            Exchange::new(|(desc, _): &(BatchDescription, _)| u64::cast_from(desc.append_worker));
1101        let mut batches_input = builder.new_input(batches, batch_exchange);
1102
1103        // Channel for commands to the Tokio append task.
1104        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<AppendCommand>();
1105
1106        // Spawn Tokio task that owns the append state machine.
1107        let append_task_handle =
1108            mz_ore::task::spawn(|| operator_name(sink_id, "append"), async move {
1109                let writer = persist_api.open_writer().await;
1110                let mut state = State::new(sink_id, worker_id, writer);
1111
1112                while let Some(cmd) = cmd_rx.recv().await {
1113                    match cmd {
1114                        AppendCommand::Description(desc) => {
1115                            state.absorb_batch_description(desc).await;
1116                            state.maybe_append_batches().await;
1117                        }
1118                        AppendCommand::Batch(batch) => {
1119                            state.absorb_batch(batch).await;
1120                        }
1121                        AppendCommand::BatchesFrontier(frontier) => {
1122                            state.advance_batches_frontier(frontier.borrow());
1123                            state.maybe_append_batches().await;
1124                        }
1125                    }
1126                }
1127            })
1128            .abort_on_drop();
1129
1130        builder.build(move |_capabilities| {
1131            let mut prev_batches_frontier = Antichain::from_elem(Timestamp::MIN);
1132
1133            move |frontiers| {
1134                // Keep task handle alive so it is aborted when the operator is dropped.
1135                let _ = &append_task_handle;
1136
1137                // Forward batch descriptions to the Tokio task.
1138                descs_input.for_each(|_cap, data| {
1139                    for desc in data.drain(..) {
1140                        cmd_tx
1141                            .send(AppendCommand::Description(desc))
1142                            .expect("append task unexpectedly gone");
1143                    }
1144                });
1145
1146                // Forward batches to the Tokio task.
1147                batches_input.for_each(|_cap, data| {
1148                    for (_desc, batch) in data.drain(..) {
1149                        // The batch description is only used for routing and we ignore it
1150                        // here since we already get one from `descs_input`.
1151                        cmd_tx
1152                            .send(AppendCommand::Batch(batch))
1153                            .expect("append task unexpectedly gone");
1154                    }
1155                });
1156
1157                // Forward batches frontier advancements *after* the per-activation
1158                // `Description`/`Batch` sends above. The Tokio task drains commands FIFO and only
1159                // calls `maybe_append_batches` on `Description`/`BatchesFrontier`; if a frontier
1160                // advance arrived before its batches, the task could append an incomplete set.
1161                // See module-level docs for the full ordering invariant.
1162                let new_batches_frontier = frontiers[1].frontier();
1163                if PartialOrder::less_than(&prev_batches_frontier.borrow(), &new_batches_frontier) {
1164                    prev_batches_frontier.clear();
1165                    prev_batches_frontier.extend(new_batches_frontier.iter().cloned());
1166                    cmd_tx
1167                        .send(AppendCommand::BatchesFrontier(
1168                            new_batches_frontier.to_owned(),
1169                        ))
1170                        .expect("append task unexpectedly gone");
1171                }
1172            }
1173        });
1174    }
1175
    /// State maintained by the `append` Tokio task.
    struct State {
        /// The ID of the sink export this operator appends for (used in trace logging).
        sink_id: GlobalId,
        /// The index of the Timely worker that spawned this task; compared against
        /// `BatchDescription::append_worker` to decide whether this worker keeps a description.
        worker_id: usize,
        /// Write handle for the output persist shard; used to reconstruct transmitted batches
        /// and to perform the appends.
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// The current input frontier of `batches`.
        batches_frontier: Antichain<Timestamp>,
        /// The greatest observed `lower` from both `descs` and `batches`.
        lower: Antichain<Timestamp>,
        /// The batch description for `lower`, if any.
        batch_description: Option<BatchDescription>,
        /// Batches received for `lower`.
        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
    }
1190
1191    impl State {
1192        fn new(
1193            sink_id: GlobalId,
1194            worker_id: usize,
1195            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1196        ) -> Self {
1197            Self {
1198                sink_id,
1199                worker_id,
1200                persist_writer,
1201                batches_frontier: Antichain::from_elem(Timestamp::MIN),
1202                lower: Antichain::from_elem(Timestamp::MIN),
1203                batch_description: None,
1204                batches: Default::default(),
1205            }
1206        }
1207
1208        fn trace<S: AsRef<str>>(&self, message: S) {
1209            let message = message.as_ref();
1210            trace!(
1211                sink_id = %self.sink_id,
1212                worker = %self.worker_id,
1213                batches_frontier = ?self.batches_frontier.elements(),
1214                lower = ?self.lower.elements(),
1215                batch_description = ?self.batch_description,
1216                message,
1217            );
1218        }
1219
1220        fn advance_batches_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
1221            if advance(&mut self.batches_frontier, frontier) {
1222                self.trace("advanced `batches` frontier");
1223            }
1224        }
1225
1226        /// Advance the current `lower`.
1227        ///
1228        /// Discards all currently stashed batches and batch descriptions, assuming that they are
1229        /// now invalid.
1230        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
1231            assert!(PartialOrder::less_than(&self.lower, &frontier));
1232
1233            self.lower = frontier;
1234            self.batch_description = None;
1235
1236            // Remove stashed batches, cleaning up those we didn't append.
1237            for batch in self.batches.drain(..) {
1238                batch.delete().await;
1239            }
1240
1241            self.trace("advanced `lower`");
1242        }
1243
1244        /// Absorb the given batch description into the state, provided it is not outdated.
1245        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
1246            if PartialOrder::less_than(&self.lower, &desc.lower) {
1247                self.advance_lower(desc.lower.clone()).await;
1248            } else if &self.lower != &desc.lower {
1249                self.trace(format!("skipping outdated batch description: {desc:?}"));
1250                return;
1251            }
1252
1253            if desc.append_worker == self.worker_id {
1254                self.batch_description = Some(desc);
1255                self.trace("set batch description");
1256            }
1257        }
1258
1259        /// Absorb the given batch into the state, provided it is not outdated.
1260        async fn absorb_batch(&mut self, batch: ProtoBatch) {
1261            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
1262            if PartialOrder::less_than(&self.lower, batch.lower()) {
1263                self.advance_lower(batch.lower().clone()).await;
1264            } else if &self.lower != batch.lower() {
1265                self.trace(format!(
1266                    "skipping outdated batch: ({:?}, {:?})",
1267                    batch.lower().elements(),
1268                    batch.upper().elements(),
1269                ));
1270
1271                // Ensure the batch's data gets properly cleaned up before dropping it.
1272                batch.delete().await;
1273                return;
1274            }
1275
1276            self.batches.push(batch);
1277            self.trace("absorbed a batch");
1278        }
1279
1280        async fn maybe_append_batches(&mut self) {
1281            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
1282            if !batches_complete {
1283                return;
1284            }
1285
1286            let Some(desc) = self.batch_description.take() else {
1287                return;
1288            };
1289
1290            let new_lower = match self.append_batches(desc).await {
1291                Ok(shard_upper) => {
1292                    self.trace("appended a batch");
1293                    shard_upper
1294                }
1295                Err(shard_upper) => {
1296                    // Failing the append is expected in the presence of concurrent replicas. There
1297                    // is nothing special to do here: The self-correcting feedback mechanism
1298                    // ensures that we observe the concurrent changes, compute their consequences,
1299                    // and append them at a future time.
1300                    self.trace(format!(
1301                        "append failed due to `lower` mismatch: {:?}",
1302                        shard_upper.elements(),
1303                    ));
1304                    shard_upper
1305                }
1306            };
1307
1308            self.advance_lower(new_lower).await;
1309        }
1310
        /// Append the current `batches` to the output shard.
        ///
        /// Returns whether the append was successful or not, and the current shard upper in either
        /// case.
        ///
        /// This method advances the shard upper to the batch `lower` if necessary. This is the
        /// mechanism that brings the shard upper to the sink as-of when appending the initial
        /// batch.
        ///
        /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
        /// a single append at operator startup. The reason we are doing it here instead is that it
        /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
        /// persist writes, including the initial upper bump. Having only a single place that
        /// performs writes makes it easy to ensure we are doing that correctly.
        async fn append_batches(
            &mut self,
            desc: BatchDescription,
        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
            let (lower, upper) = (desc.lower, desc.upper);
            // Mutable references are required by `compare_and_append_batch`; the same
            // references are reused on retry iterations of the loop below.
            let mut to_append: Vec<_> = self.batches.iter_mut().collect();

            // Retry until the append either succeeds or fails with a shard upper that is not
            // behind our `lower` (i.e. a genuine concurrent write won the race).
            loop {
                let result = self
                    .persist_writer
                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
                    .await
                    .expect("valid usage");

                match result {
                    Ok(()) => return Ok(upper),
                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
                        // The shard upper is still behind our `lower`: bump it up to `lower`
                        // (see the doc comment above), then retry the append.
                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;

                        // At this point the shard's since and upper are likely the same, a state
                        // that is likely to hit edge-cases in logic reasoning about frontiers.
                        fail::fail_point!("mv_advanced_upper");
                    }
                    // Any other mismatch means a concurrent writer got there first; report the
                    // shard upper it left behind so the caller can catch up.
                    Err(mismatch) => return Err(mismatch.current),
                }
            }
        }
1352    }
1353
1354    /// Advance the frontier of the given writer's shard to at least the given `upper`.
1355    async fn advance_shard_upper(
1356        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1357        upper: Antichain<Timestamp>,
1358    ) {
1359        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1360        let lower = Antichain::from_elem(Timestamp::MIN);
1361        persist_writer
1362            .append(empty_updates, lower, upper)
1363            .await
1364            .expect("valid usage")
1365            .expect("should always succeed");
1366    }
1367}