Skip to main content

mz_compute/sink/
materialized_view_v2.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sync Timely operator implementation of the MV sink.
11//!
12//! This module provides an alternative implementation of `persist_sink` that uses sync Timely
13//! operators communicating with Tokio tasks via channels, instead of async Timely operators.
14//! Gated behind the `ENABLE_SYNC_MV_SINK` dyncfg.
15//!
16//! See the [main module](super::materialized_view) for the operator graph and design docs.
17//!
18//! ### Channel ordering requirements
19//!
20//! Each operator splits state across a Timely thread (which observes inputs and frontiers) and a
21//! Tokio task (which owns persist I/O state). They communicate via `mpsc` command channels, which
22//! preserve send order on a single sender. Each operator instance constructs its own
23//! `(tx, rx)` pair inside `render` and never clones the sender, so there is exactly one producer
24//! per channel — sends are totally ordered. Different worker instances of the same operator
25//! never share a channel, so cross-worker ordering is not a concern. The correctness of the
26//! operators relies on a few ordering invariants between the messages sent within a single Timely
27//! activation:
28//!
29//!  * **`mint`**: the `persist_watch` Tokio task is the sole producer of persist-frontier updates
30//!    and emits them in monotonically increasing order, terminated by the empty frontier. The
31//!    Timely closure drains the receiver each activation; processing in receive order is therefore
32//!    sufficient. No cross-channel ordering is needed because `mint` only has the one channel.
33//!
34//!  * **`write`**: per-activation, the Timely closure first appends all observed input data into
35//!    a single `WriteCommand::Batch` and only then sends a `WriteCommand::WriteBatch` (issued from
36//!    `maybe_start_batch` after frontier checks). The Tokio task processes commands FIFO, so a
37//!    `WriteBatch` is guaranteed to see every `Batch` from the same activation already applied to
38//!    the corrections buffer. Reversing this order would let the task write a batch that is
39//!    missing updates the Timely closure already observed.
40//!
41//!  * **`append`**: per-activation, the Timely closure forwards messages in the order
42//!    `Description` → `Batch` → `BatchesFrontier`. The first two carry the data the task needs
43//!    to absorb; `BatchesFrontier` is the trigger that allows `maybe_append_batches` to fire.
44//!    Sending `BatchesFrontier` *after* its corresponding `Batch` messages ensures the task does
45//!    not append a batch description before all batches contributing to it have been absorbed.
46//!    If the order were reversed, `maybe_append_batches` could fire on an incomplete `batches`
47//!    set and miss writes.
48
49use std::any::Any;
50use std::cell::RefCell;
51use std::rc::Rc;
52use std::sync::Arc;
53
54use differential_dataflow::{Hashable, VecCollection};
55use mz_compute_types::dyncfgs::MV_SINK_ADVANCE_PERSIST_FRONTIERS;
56use mz_dyncfg::ConfigSet;
57use mz_ore::cast::CastFrom;
58use mz_persist_client::batch::{Batch, ProtoBatch};
59use mz_persist_client::write::WriteHandle;
60use mz_repr::{Diff, GlobalId, Row, Timestamp};
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use timely::PartialOrder;
64use timely::dataflow::channels::pact::{Exchange, Pipeline};
65use timely::dataflow::operators::generic::OutputBuilder;
66use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
67use timely::dataflow::operators::vec::Broadcast;
68use timely::dataflow::operators::{Capability, CapabilitySet};
69use timely::progress::Antichain;
70use timely::progress::frontier::AntichainRef;
71use tokio::sync::{mpsc, watch};
72use tracing::trace;
73
74use crate::compute_state::ComputeState;
75use crate::render::StartSignal;
76use crate::render::errors::DataflowErrorSer;
77use crate::sink::correction::{ChannelLogging, Correction, CorrectionLogger};
78use crate::sink::materialized_view::{
79    BatchDescription, BatchesStream, DescsStream, DesiredStreams, OkErr, PersistApi,
80    PersistStreams, SharedSinkFrontier, advance, operator_name, persist_source,
81};
82
83/// Renders an MV sink writing the given desired collection into the `target` persist collection.
84///
85/// This is the sync Timely operator implementation, using Tokio tasks for I/O.
86pub(super) fn persist_sink<'s>(
87    sink_id: GlobalId,
88    target: &mz_storage_types::controller::CollectionMetadata,
89    ok_collection: VecCollection<'s, Timestamp, Row, Diff>,
90    err_collection: VecCollection<'s, Timestamp, DataflowErrorSer, Diff>,
91    as_of: Antichain<Timestamp>,
92    compute_state: &mut ComputeState,
93    start_signal: StartSignal,
94    read_only_rx: watch::Receiver<bool>,
95) -> Rc<dyn Any> {
96    let scope = ok_collection.scope();
97    let desired = OkErr::new(ok_collection.inner, err_collection.inner);
98
99    // Read back the persist shard.
100    let (persist, persist_token) =
101        persist_source(scope, sink_id, target.clone(), compute_state, start_signal);
102
103    let persist_api = PersistApi {
104        persist_clients: Arc::clone(&compute_state.persist_clients),
105        collection: target.clone(),
106        shard_name: sink_id.to_string(),
107        purpose: format!("MV sink {sink_id}"),
108    };
109
110    let (desired, descs, sink_frontier) = mint::render(
111        sink_id,
112        persist_api.clone(),
113        as_of.clone(),
114        read_only_rx.clone(),
115        desired,
116    );
117
118    // Broadcast batch descriptions to all workers, regardless of whether or not they are
119    // responsible for the append, to give them a chance to clean up any outdated state they
120    // might still hold.
121    let descs = descs.broadcast();
122
123    let batches = write::render(
124        sink_id,
125        persist_api.clone(),
126        as_of,
127        desired,
128        persist,
129        descs.clone(),
130        read_only_rx,
131        Rc::clone(&compute_state.worker_config),
132    );
133
134    append::render(sink_id, persist_api, descs, batches);
135
136    // Report sink frontier updates to the `ComputeState`.
137    let collection = compute_state.expect_collection_mut(sink_id);
138    collection.sink_write_frontier = Some(sink_frontier);
139
140    Rc::new(persist_token)
141}
142
143/// Implementation of the `mint` operator.
144mod mint {
145    use super::*;
146    use timely::progress::frontier::AntichainRef;
147
148    /// Render the `mint` operator.
149    ///
150    /// The parameters passed in are:
151    ///  * `sink_id`: The `GlobalId` of the sink export.
152    ///  * `persist_api`: An object providing access to the output persist shard.
153    ///  * `as_of`: The first time for which the sink may produce output.
154    ///  * `read_only_rx`: A receiver that reports the sink is in read-only mode.
155    ///  * `desired`: The ok/err streams that should be sinked to persist.
156    pub fn render<'s>(
157        sink_id: GlobalId,
158        persist_api: PersistApi,
159        as_of: Antichain<Timestamp>,
160        mut read_only_rx: watch::Receiver<bool>,
161        desired: DesiredStreams<'s>,
162    ) -> (DesiredStreams<'s>, DescsStream<'s>, SharedSinkFrontier) {
163        let scope = desired.ok.scope();
164        let worker_id = scope.index();
165        let worker_count = scope.peers();
166
167        // Determine the active worker for the mint operator.
168        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();
169
170        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
171        let shared_frontier = Rc::clone(&sink_frontier);
172
173        let name = operator_name(sink_id, "mint");
174        let mut builder = OperatorBuilderRc::new(name, scope.clone());
175        let info = builder.operator_info();
176
177        // Create outputs (before inputs, so no input connections yet).
178        let (ok_output, ok_stream) = builder.new_output();
179        let (err_output, err_stream) = builder.new_output();
180        let (desc_output, desc_stream) = builder.new_output();
181
182        let mut ok_output = OutputBuilder::from(ok_output);
183        let mut err_output = OutputBuilder::from(err_output);
184        let mut desc_output = OutputBuilder::from(desc_output);
185
186        // desired_ok -> output 0 (ok passthrough)
187        let mut desired_ok_input = builder.new_input_connection(
188            desired.ok,
189            Pipeline,
190            [(0, Antichain::from_elem(Default::default()))],
191        );
192        // desired_err -> output 1 (err passthrough)
193        let mut desired_err_input = builder.new_input_connection(
194            desired.err,
195            Pipeline,
196            [(1, Antichain::from_elem(Default::default()))],
197        );
198
199        // Set up background tasks and state for the active worker only.
200        let mut task_handles = Vec::new();
201        let read_only = *read_only_rx.borrow_and_update();
202        let mut state = None;
203        if worker_id == active_worker_id {
204            // Spawn a Tokio task to watch the persist shard's upper frontier.
205            //
206            // We collect the persist frontier from a write handle directly, rather than
207            // inspecting the `persist` stream, because the latter has two annoying glitches:
208            //  (a) It starts at the shard's read frontier, not its write frontier.
209            //  (b) It can lag behind if there are spikes in ingested data.
210            //
211            // The decoupling from the `persist` stream is load-bearing: that stream can fall
212            // arbitrarily behind the shard upper during snapshot replay or write spikes. Using
213            // it would delay both (1) the controller-visible sink frontier (`shared_frontier`),
214            // which previously caused a CrossJoin feature-bench regression where the controller
215            // held a finished MV dataflow open waiting for the empty frontier, and (2) the
216            // `state.persist_frontier` that gates batch-description minting, stalling mint until
217            // the read-back stream catches up. The goal isn't tick-granular descriptions per
218            // se — it's avoiding the stream-induced stall. See 5eab5ff896 for the original
219            // regression and rationale.
220            //
221            // The task sends the empty frontier as its final message before exiting. The
222            // operator drops `persist_rx` once it receives the empty frontier.
223            let (persist_tx, persist_rx) = mpsc::unbounded_channel();
224            let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
225            let handle = mz_ore::task::spawn(
226                || operator_name(sink_id, "mint::persist_watch"),
227                async move {
228                    let mut writer = persist_api.open_writer().await;
229                    let mut frontier = Antichain::from_elem(Timestamp::MIN);
230                    loop {
231                        writer.wait_for_upper_past(&frontier).await;
232                        frontier = writer.upper().clone();
233                        if persist_tx.send(frontier.clone()).is_err() {
234                            return;
235                        }
236                        if sync_activator.activate().is_err() {
237                            return;
238                        }
239                        if frontier.is_empty() {
240                            return;
241                        }
242                    }
243                },
244            );
245            task_handles.push(handle.abort_on_drop());
246
247            // Spawn a Tokio task to wake the operator when read-only mode changes.
248            if read_only {
249                let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
250                let mut rx = read_only_rx.clone();
251                let handle = mz_ore::task::spawn(
252                    || format!("mv_sink({sink_id})::mint::read_only_watch"),
253                    async move {
254                        let _ = rx.changed().await;
255                        let _ = sync_activator.activate();
256                    },
257                );
258                task_handles.push(handle.abort_on_drop());
259            }
260
261            state = Some(State::new(
262                sink_id,
263                worker_count,
264                as_of,
265                read_only,
266                persist_rx,
267            ));
268        }
269
270        builder.build(move |capabilities| {
271            // Passing through the `desired` streams only requires data capabilities, so we can
272            // immediately drop their initial capabilities here.
273            let [_, _, desc_cap]: [_; 3] =
274                capabilities.try_into().expect("one capability per output");
275
276            let mut cap_set = if state.is_some() {
277                Some(CapabilitySet::from_elem(desc_cap))
278            } else {
279                drop(desc_cap);
280                shared_frontier.borrow_mut().clear();
281                None
282            };
283
284            move |frontiers| {
285                // Keep task handles alive so they are aborted when the operator is dropped.
286                let _ = &task_handles;
287
288                // Pass through desired data.
289                let mut ok_out = ok_output.activate();
290                desired_ok_input.for_each(|cap, data| {
291                    ok_out.session(&cap).give_container(data);
292                });
293                let mut err_out = err_output.activate();
294                desired_err_input.for_each(|cap, data| {
295                    err_out.session(&cap).give_container(data);
296                });
297
298                let Some(state) = &mut state else {
299                    // Non-active worker: just pass through data.
300                    return;
301                };
302                let cap_set = cap_set.as_mut().unwrap();
303
304                // Track desired frontiers.
305                state.advance_desired_ok_frontier(frontiers[0].frontier());
306                state.advance_desired_err_frontier(frontiers[1].frontier());
307
308                state.drain_persist_rx(&shared_frontier);
309
310                // Check read-only mode.
311                if state.read_only && read_only_rx.has_changed().unwrap_or(false) {
312                    if !*read_only_rx.borrow_and_update() {
313                        state.allow_writes();
314                    }
315                }
316
317                // Try to mint a batch description.
318                let mut desc_out = desc_output.activate();
319                if let Some(desc) = state.maybe_mint_batch_description() {
320                    let lower_ts = *desc.lower.as_option().expect("not empty");
321                    let cap = cap_set.delayed(&lower_ts);
322                    desc_out.session(&cap).give(desc);
323
324                    // We only emit strictly increasing `lower`s, so we can let our output frontier
325                    // advance beyond the current `lower`.
326                    cap_set.downgrade([lower_ts.step_forward()]);
327                } else {
328                    // The next emitted `lower` will be at least the `persist` frontier, so we can
329                    // advance our output frontier as far.
330                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
331                }
332            }
333        });
334
335        let desired_output_streams = OkErr::new(ok_stream, err_stream);
336        (desired_output_streams, desc_stream, sink_frontier)
337    }
338
339    /// State maintained by the `mint` operator.
340    struct State {
341        sink_id: GlobalId,
342        /// The number of workers in the Timely cluster.
343        worker_count: usize,
344        /// The frontiers of the `desired` inputs.
345        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
346        /// The frontier of the target persist shard.
347        persist_frontier: Antichain<Timestamp>,
348        /// Receiver for persist frontier updates from the Tokio persist_watch task.
349        ///
350        /// Dropped once the empty frontier is received (the task's shutdown signal).
351        persist_rx: Option<mpsc::UnboundedReceiver<Antichain<Timestamp>>>,
352        /// The append worker for the next batch description, chosen in round-robin fashion.
353        next_append_worker: usize,
354        /// The last `lower` we have emitted in a batch description, if any. Whenever the
355        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
356        last_lower: Option<Antichain<Timestamp>>,
357        /// Whether we are operating in read-only mode.
358        ///
359        /// In read-only mode, minting of batch descriptions is disabled.
360        read_only: bool,
361    }
362
363    impl State {
364        fn new(
365            sink_id: GlobalId,
366            worker_count: usize,
367            as_of: Antichain<Timestamp>,
368            read_only: bool,
369            persist_rx: mpsc::UnboundedReceiver<Antichain<Timestamp>>,
370        ) -> Self {
371            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
372            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
373            // effort writing out snapshots of data that is already in the shard.
374            let persist_frontier = as_of;
375
376            Self {
377                sink_id,
378                worker_count,
379                desired_frontiers: OkErr::new_frontiers(),
380                persist_frontier,
381                persist_rx: Some(persist_rx),
382                next_append_worker: 0,
383                last_lower: None,
384                read_only,
385            }
386        }
387
388        fn trace<S: AsRef<str>>(&self, message: S) {
389            let message = message.as_ref();
390            trace!(
391                sink_id = %self.sink_id,
392                desired_frontier = ?self.desired_frontiers.frontier().elements(),
393                persist_frontier = ?self.persist_frontier.elements(),
394                last_lower = ?self.last_lower,
395                message,
396            );
397        }
398
399        fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
400            if advance(&mut self.desired_frontiers.ok, frontier) {
401                self.trace("advanced `desired` ok frontier");
402            }
403        }
404
405        fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
406            if advance(&mut self.desired_frontiers.err, frontier) {
407                self.trace("advanced `desired` err frontier");
408            }
409        }
410
411        fn advance_persist_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
412            if advance(&mut self.persist_frontier, frontier) {
413                self.trace("advanced `persist` frontier");
414            }
415        }
416
417        /// Drain persist frontier updates from the Tokio task.
418        ///
419        /// Frontiers from the `persist_watch` task are monotonically increasing, so only the
420        /// most recent one matters. We drain all queued messages and apply just the latest,
421        /// avoiding redundant `advance_persist_frontier`/`trace!` calls when several updates
422        /// arrived between activations.
423        ///
424        /// The task sends the empty frontier as its final message before exiting. Once
425        /// received, we drop the receiver.
426        fn drain_persist_rx(&mut self, shared_frontier: &RefCell<Antichain<Timestamp>>) {
427            let Some(mut rx) = self.persist_rx.take() else {
428                return;
429            };
430            let mut latest: Option<Antichain<Timestamp>> = None;
431            let mut closed = false;
432            loop {
433                match rx.try_recv() {
434                    Ok(frontier) => {
435                        let done = frontier.is_empty();
436                        latest = Some(frontier);
437                        if done {
438                            closed = true;
439                            break;
440                        }
441                    }
442                    Err(mpsc::error::TryRecvError::Empty) => break,
443                    Err(mpsc::error::TryRecvError::Disconnected) => {
444                        panic!("mint persist_watch task unexpectedly gone");
445                    }
446                }
447            }
448            if let Some(frontier) = latest {
449                shared_frontier.borrow_mut().clone_from(&frontier);
450                self.advance_persist_frontier(frontier.borrow());
451            }
452            if !closed {
453                self.persist_rx = Some(rx);
454            }
455        }
456
457        fn allow_writes(&mut self) {
458            if self.read_only {
459                self.read_only = false;
460                self.trace("switched to write mode");
461            }
462        }
463
464        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
465            let desired_frontier = self.desired_frontiers.frontier();
466            let persist_frontier = &self.persist_frontier;
467
468            // We only mint new batch descriptions when:
469            //  1. We are _not_ in read-only mode.
470            //  2. The `desired` frontier is ahead of the `persist` frontier.
471            //  3. The `persist` frontier advanced since we last emitted a batch description.
472            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
473            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
474                PartialOrder::less_than(lower, persist_frontier)
475            });
476
477            if self.read_only || !desired_ahead || !persist_advanced {
478                return None;
479            }
480
481            let lower = persist_frontier.clone();
482            let upper = desired_frontier.clone();
483            let append_worker = self.next_append_worker;
484            let desc = BatchDescription::new(lower, upper, append_worker);
485
486            self.next_append_worker = (append_worker + 1) % self.worker_count;
487            self.last_lower = Some(desc.lower.clone());
488
489            self.trace(format!("minted batch description: {desc:?}"));
490            Some(desc)
491        }
492    }
493}
494
495/// Implementation of the `write` operator.
496mod write {
497    use super::*;
498
499    use mz_timely_util::activator::ArcActivator;
500
501    /// Commands sent from the Timely operator to the Tokio write task.
502    enum WriteCommand {
503        /// A coalesced batch of work gathered during a single operator activation.
504        ///
505        /// The Timely closure accumulates all updates observed across the four data inputs plus
506        /// any frontier advancement and forced-consolidation flag, and sends a single
507        /// `WriteCommand::Batch` per activation. This keeps the channel overhead independent of
508        /// the number of Timely chunks processed per activation.
509        Batch(BatchUpdates),
510        /// Write a batch with the given description. The task drains corrections and writes
511        /// them to persist.
512        WriteBatch(BatchDescription),
513    }
514
515    /// The payload of a coalesced [`WriteCommand::Batch`].
516    struct BatchUpdates {
517        /// Positive contributions from the `desired` ok input.
518        desired_ok: Vec<(Row, Timestamp, Diff)>,
519        /// Positive contributions from the `desired` err input.
520        desired_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
521        /// Negative contributions from the `persist` ok input.
522        persist_ok: Vec<(Row, Timestamp, Diff)>,
523        /// Negative contributions from the `persist` err input.
524        persist_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
525        /// The new persist frontier, if it advanced this activation.
526        persist_frontier: Option<Antichain<Timestamp>>,
527        /// Whether a consolidation of the corrections buffer should be forced.
528        force_consolidation: bool,
529    }
530
531    impl BatchUpdates {
532        fn new() -> Self {
533            Self {
534                desired_ok: Vec::new(),
535                desired_err: Vec::new(),
536                persist_ok: Vec::new(),
537                persist_err: Vec::new(),
538                persist_frontier: None,
539                force_consolidation: false,
540            }
541        }
542
543        /// Returns true if there is no work in this batch.
544        fn is_empty(&self) -> bool {
545            self.desired_ok.is_empty()
546                && self.desired_err.is_empty()
547                && self.persist_ok.is_empty()
548                && self.persist_err.is_empty()
549                && self.persist_frontier.is_none()
550                && !self.force_consolidation
551        }
552    }
553
554    /// A response from the Tokio write task back to the Timely operator.
555    struct WriteResponse {
556        /// The written batch, or `None` if the corrections buffer had no updates.
557        batch: Option<ProtoBatch>,
558    }
559
560    /// Render the `write` operator.
561    ///
562    /// The parameters passed in are:
563    ///  * `sink_id`: The `GlobalId` of the sink export.
564    ///  * `persist_api`: An object providing access to the output persist shard.
565    ///  * `as_of`: The first time for which the sink may produce output.
566    ///  * `desired`: The ok/err streams that should be sinked to persist.
567    ///  * `persist`: The ok/err streams read back from the output persist shard.
568    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
569    pub fn render<'s>(
570        sink_id: GlobalId,
571        persist_api: PersistApi,
572        as_of: Antichain<Timestamp>,
573        desired: DesiredStreams<'s>,
574        persist: PersistStreams<'s>,
575        descs: DescsStream<'s>,
576        mut read_only_rx: watch::Receiver<bool>,
577        worker_config: Rc<ConfigSet>,
578    ) -> BatchesStream<'s> {
579        let scope = desired.ok.scope();
580        let worker_id = scope.index();
581
582        let name = operator_name(sink_id, "write");
583        let mut builder = OperatorBuilderRc::new(name, scope.clone());
584        let info = builder.operator_info();
585
586        // Set up correction buffer logging. CorrectionLogger is not Send (uses timely
587        // loggers), so the Tokio task uses ChannelLogging to send events back to the
588        // Timely thread for application by the CorrectionLogger.
589        let mut channel_logging = None;
590        let mut correction_logger = None;
591        if let (Some(compute_logger), Some(differential_logger)) = (
592            scope.worker().logger_for("materialize/compute"),
593            scope.worker().logger_for("differential/arrange"),
594        ) {
595            let operator_info = builder.operator_info();
596            let (tx, rx) = mpsc::unbounded_channel();
597            channel_logging = Some(ChannelLogging::new(tx));
598            correction_logger = Some(CorrectionLogger::new(
599                compute_logger,
600                differential_logger.into(),
601                operator_info.global_id,
602                operator_info.address.to_vec(),
603                rx,
604            ));
605        }
606
607        // It is important that we exchange the `desired` and `persist` data the same way, so
608        // updates that cancel each other out end up on the same worker.
609        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
610        let exchange_err = |(d, _, _): &(DataflowErrorSer, Timestamp, Diff)| d.hashed();
611
612        // Data inputs are created before the output, so they are not connected to it.
613        let mut desired_ok_input = builder.new_input(desired.ok, Exchange::new(exchange_ok));
614        let mut desired_err_input = builder.new_input(desired.err, Exchange::new(exchange_err));
615        let mut persist_ok_input = builder.new_input(persist.ok, Exchange::new(exchange_ok));
616        let mut persist_err_input = builder.new_input(persist.err, Exchange::new(exchange_err));
617        let mut descs_input = builder.new_input(descs, Pipeline);
618
619        // Only descs (input 4) is connected to the batches output.
620        let (batches_output, batches_output_stream) =
621            builder.new_output_connection([(4, Antichain::from_elem(Default::default()))]);
622        let mut batches_output = OutputBuilder::from(batches_output);
623
624        // Obtain SinkMetrics synchronously from the persist client cache, rather than through
625        // a WriteHandle, to avoid async I/O on the Timely thread.
626        let sink_metrics = persist_api.persist_clients.metrics().sink.clone();
627
628        // Construct corrections on the Timely thread (reads ConfigSet), then move to the
629        // Tokio task. The ChannelLogging sends events back to the Timely thread.
630        let worker_metrics = sink_metrics.for_worker(worker_id);
631        let mut corrections: OkErr<Correction<Row>, Correction<DataflowErrorSer>> = OkErr::new(
632            Correction::new(
633                sink_metrics.clone(),
634                worker_metrics.clone(),
635                channel_logging.clone(),
636                &worker_config,
637            ),
638            Correction::new(
639                sink_metrics.clone(),
640                worker_metrics,
641                channel_logging,
642                &worker_config,
643            ),
644        );
645
646        // Read `MV_SINK_ADVANCE_PERSIST_FRONTIERS` exactly once and reuse the captured value for
647        // both the Tokio-side `corrections.since` initialization below and the Timely-side
648        // `persist_frontiers` initialization in `State::new`. Re-reading the dyncfg per init site
649        // would let the value flip between reads and produce the very inconsistency this fix
650        // addresses: `persist_frontiers = as_of` (gate open) with `corrections.since = MIN`
651        // (snapshot updates not advanced) reproduces the original `UpdateNotBeyondLower` panic.
652        let advance_persist_frontiers_at_startup =
653            MV_SINK_ADVANCE_PERSIST_FRONTIERS.get(&worker_config);
654
655        // Mirror the persist-frontier initialization performed by `State::new` below. With the
656        // flag enabled, `State` advances its Timely-side `persist_frontiers` to `as_of`, opening
657        // the `maybe_start_batch` write gate (`desc.lower <= persist_frontiers.frontier()`)
658        // immediately for the first description minted with `lower = as_of`. The corrections
659        // buffer lives on the Tokio task and only learns of frontier advancements through
        // the `persist_frontier` field of the `BatchUpdates` carried by `WriteCommand::Batch`,
        // which the Timely closure populates only when an input frontier actually moves. On
        // startup, the input frontiers begin at `Timestamp::MIN`, so no `Batch` carries a
        // `persist_frontier` until the persist input catches up — yet a `WriteBatch(desc)` with
        // `desc.lower = as_of` can already be sent.
664        // Snapshot-replay updates inserted in the meantime stay at their original timestamps
665        // (`Correction::insert` rounds to `max(t, since)` and `since == MIN`), and slip into the
666        // batch, tripping persist's `UpdateNotBeyondLower` invariant. Advancing
667        // `corrections.since` here keeps the Tokio side in lockstep with the Timely side, the
668        // same invariant `materialized_view::write::State::new` upholds via
669        // `apply_persist_frontier_advancement`.
670        if advance_persist_frontiers_at_startup {
671            corrections.ok.advance_since(as_of.clone());
672            corrections.err.advance_since(as_of.clone());
673        }
674
675        // Channels for commands and responses.
676        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<WriteCommand>();
677        let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<WriteResponse>();
678
679        // Spawn Tokio task that owns the WriteHandle and corrections buffer.
680        let (activator, activation_ack) = ArcActivator::new(scope, &info);
681        let write_task_handle = {
682            mz_ore::task::spawn(
683                || operator_name(sink_id, "write::batch_writer"),
684                async move {
685                    let mut writer = persist_api.open_writer().await;
686
687                    while let Some(cmd) = cmd_rx.recv().await {
688                        apply_command(&mut corrections, &mut writer, cmd, &resp_tx).await;
689                        // Activate the operator to drain logging events and process batch responses.
690                        // ArcActivator suppresses redundant activations, so this is cheap.
691                        activator.activate();
692                    }
693                },
694            )
695            .abort_on_drop()
696        };
697
698        // In read-only mode, wake the operator when writes are allowed so the forced
699        // consolidation stops re-arming and the `WriteBatch` path takes over promptly. Without
700        // this the operator only learns of the transition on its next input activation, which an
701        // idle sink may not get for a while.
702        let read_only_watch_handle = (*read_only_rx.borrow()).then(|| {
703            let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
704            let mut rx = read_only_rx.clone();
705            mz_ore::task::spawn(
706                || operator_name(sink_id, "write::read_only_watch"),
707                async move {
708                    let _ = rx.changed().await;
709                    let _ = sync_activator.activate();
710                },
711            )
712            .abort_on_drop()
713        });
714
715        builder.build(move |capabilities| {
716            // We will use the data capabilities from the `descs` input to produce output, so no
717            // need to hold onto the initial capabilities.
718            drop(capabilities);
719
720            let read_only = *read_only_rx.borrow_and_update();
721            let mut state = State::new(
722                sink_id,
723                worker_id,
724                as_of,
725                advance_persist_frontiers_at_startup,
726                read_only,
727            );
728
729            // Whether a batch write is currently in flight in the Tokio task.
730            let mut batch_in_flight: Option<(BatchDescription, Capability<Timestamp>)> = None;
731
732            // CorrectionLogger lives on the Timely thread and drains events from
733            // the channel each activation. On drop, it drains remaining events and
734            // retracts all logged state.
735            let mut correction_logger = correction_logger;
736
737            move |frontiers| {
738                // Keep task handles alive so they are aborted when the operator is dropped.
739                let _ = &write_task_handle;
740                let _ = &read_only_watch_handle;
741
742                // Acknowledge activation so the Tokio task can activate us again.
743                activation_ack.ack();
744
745                // Drain logging events from the Tokio task's ChannelLogging.
746                if let Some(logger) = &mut correction_logger {
747                    logger.apply_events();
748                }
749                // Coalesce all work from this activation into a single command. This keeps
750                // per-chunk channel overhead low, which matters for hydration when many small
751                // Timely chunks arrive in a single activation sweep.
752                let mut batch = BatchUpdates::new();
753                desired_ok_input.for_each(|_cap, data| {
754                    batch.desired_ok.append(data);
755                });
756                desired_err_input.for_each(|_cap, data| {
757                    batch.desired_err.append(data);
758                });
759                persist_ok_input.for_each(|_cap, data| {
760                    batch.persist_ok.append(data);
761                });
762                persist_err_input.for_each(|_cap, data| {
763                    batch.persist_err.append(data);
764                });
765
766                // Accept batch descriptions.
767                descs_input.for_each(|cap, data| {
768                    let cap = cap.retain(0);
769                    for desc in data.drain(..) {
770                        state.absorb_batch_description(desc, cap.clone());
771                    }
772                });
773
774                // Track frontiers. Include the new persist frontier in the coalesced batch for
775                // `advance_since`.
776                state.advance_desired_ok_frontier(frontiers[0].frontier());
777                state.advance_desired_err_frontier(frontiers[1].frontier());
778                if state.advance_persist_ok_frontier(frontiers[2].frontier())
779                    | state.advance_persist_err_frontier(frontiers[3].frontier())
780                {
781                    batch.persist_frontier = Some(state.persist_frontiers.frontier().to_owned());
782                }
783                // Track read-only mode so the forced consolidation stops re-arming once writes
784                // are allowed and the `WriteBatch` path takes over driving consolidation.
785                if state.read_only && read_only_rx.has_changed().unwrap_or(false) {
786                    if !*read_only_rx.borrow_and_update() {
787                        state.set_read_only(false);
788                    }
789                }
790
791                if state.should_force_consolidation() {
792                    batch.force_consolidation = true;
793                }
794
795                if !batch.is_empty() {
796                    cmd_tx
797                        .send(WriteCommand::Batch(batch))
798                        .expect("write task unexpectedly gone");
799                }
800
801                // Try to receive batch results from the Tokio task.
802                loop {
803                    match resp_rx.try_recv() {
804                        Ok(resp) => {
805                            if let Some((desc, cap)) = batch_in_flight.take() {
806                                if let Some(batch) = resp.batch {
807                                    let mut out = batches_output.activate();
808                                    out.session(&cap).give((desc, batch));
809                                    state.trace("wrote a batch");
810                                } else {
811                                    state.trace("skipping empty batch");
812                                }
813                            }
814                        }
815                        Err(mpsc::error::TryRecvError::Empty) => break,
816                        Err(mpsc::error::TryRecvError::Disconnected) => {
817                            panic!("write task unexpectedly gone");
818                        }
819                    }
820                }
821
822                // If no batch in flight, try to write a new batch.
823                if batch_in_flight.is_none() {
824                    if let Some((desc, cap)) = state.maybe_start_batch(&cmd_tx) {
825                        batch_in_flight = Some((desc, cap));
826                    }
827                }
828            }
829        });
830
831        batches_output_stream
832    }
833
834    /// Apply a single command to the task state.
835    ///
836    /// `desired` updates enter `corrections` as positive contributions and `persist` updates as
837    /// negative contributions, so the buffer contains `desired - persist`, i.e. the updates that
838    /// need to be written to bring the shard in line with `desired`.
839    async fn apply_command(
840        corrections: &mut OkErr<Correction<Row>, Correction<DataflowErrorSer>>,
841        writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
842        cmd: WriteCommand,
843        resp_tx: &mpsc::UnboundedSender<WriteResponse>,
844    ) {
845        match cmd {
846            WriteCommand::Batch(mut batch) => {
847                // Apply the same logical sequence of operations that the per-chunk commands
848                // used to: positive desired inserts, negated persist inserts, then optional
849                // frontier advancement and forced consolidation.
850                if !batch.desired_ok.is_empty() {
851                    corrections.ok.insert(&mut batch.desired_ok);
852                }
853                if !batch.desired_err.is_empty() {
854                    corrections.err.insert(&mut batch.desired_err);
855                }
856                if !batch.persist_ok.is_empty() {
857                    corrections.ok.insert_negated(&mut batch.persist_ok);
858                }
859                if !batch.persist_err.is_empty() {
860                    corrections.err.insert_negated(&mut batch.persist_err);
861                }
862                if let Some(frontier) = batch.persist_frontier {
863                    // We will only emit times at or after the `persist` frontier, so now is a
864                    // good time to advance the times of stashed updates.
865                    corrections.ok.advance_since(frontier.clone());
866                    corrections.err.advance_since(frontier);
867                }
868                if batch.force_consolidation {
869                    corrections.ok.consolidate_at_since();
870                    corrections.err.consolidate_at_since();
871                }
872            }
873            WriteCommand::WriteBatch(desc) => {
874                // Chain ok and err correction iterators directly, avoiding an
875                // intermediate Vec allocation.
876                let oks = corrections
877                    .ok
878                    .updates_before(&desc.upper)
879                    .map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
880                let errs = corrections
881                    .err
882                    .updates_before(&desc.upper)
883                    .map(|(d, t, r)| ((SourceData(Err(d.deserialize())), ()), t, r.into_inner()));
884                let mut updates = oks.chain(errs).peekable();
885
886                if updates.peek().is_none() {
887                    // No corrections to write.
888                    let _ = resp_tx.send(WriteResponse { batch: None });
889                    return;
890                }
891
892                let batch = writer
893                    .batch(updates, desc.lower, desc.upper)
894                    .await
895                    .expect("valid usage");
896                let proto_batch = batch.into_transmittable_batch();
897                if let Err(err) = resp_tx.send(WriteResponse {
898                    batch: Some(proto_batch),
899                }) {
900                    let batch =
901                        writer.batch_from_transmittable_batch(err.0.batch.expect("just sent"));
902                    batch.delete().await;
903                }
904            }
905        }
906    }
907
    /// State maintained by the `write` operator on the Timely thread.
    ///
    /// This only tracks frontiers, the pending batch description, and consolidation requests.
    /// The corrections buffer itself is not part of this state.
    struct State {
        /// The ID of the sink, included in trace output.
        sink_id: GlobalId,
        /// The ID of the worker, included in trace output.
        worker_id: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontiers of the `persist` inputs.
        ///
        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
        /// usually is, but during snapshot processing, these frontiers will start at the shard's
        /// read frontier, so they can be beyond its write frontier. This is important as it means
        /// we must not discard batch descriptions based on these persist frontiers: A batch
        /// description might still be valid even if its `lower` is before the persist frontiers we
        /// observe.
        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The current valid batch description and associated output capability, if any.
        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
        /// A request to force a consolidation of corrections once both `desired_frontiers` and
        /// `persist_frontiers` become greater than the given frontier.
        ///
        /// Normally we force a consolidation whenever we write a batch, but there are periods
        /// (like read-only mode) when that doesn't happen, and we need to manually force
        /// consolidation instead. In read-only mode this is re-armed after each firing so it
        /// keeps sweeping forward as the frontiers advance (see `should_force_consolidation`).
        force_consolidation_after: Option<Antichain<Timestamp>>,
        /// Whether the sink is in read-only mode. While read-only the `write` operator mints no
        /// batches, so the `WriteBatch` path never sweeps `consolidate_before(upper)` forward;
        /// the forced consolidation stands in for it and is re-armed as long as this holds.
        read_only: bool,
    }
938
939    impl State {
940        fn new(
941            sink_id: GlobalId,
942            worker_id: usize,
943            as_of: Antichain<Timestamp>,
944            advance_persist_frontiers_at_startup: bool,
945            read_only: bool,
946        ) -> Self {
947            // Force a consolidation of corrections after the snapshot updates have been fully
948            // processed, to ensure we get rid of those as quickly as possible.
949            let force_consolidation_after = Some(as_of.clone());
950
951            let mut state = Self {
952                sink_id,
953                worker_id,
954                desired_frontiers: OkErr::new_frontiers(),
955                persist_frontiers: OkErr::new_frontiers(),
956                batch_description: None,
957                force_consolidation_after,
958                read_only,
959            };
960
961            // Immediately advance the persist frontier tracking to the `as_of`.
962            // This is important to ensure the persist sink doesn't get stuck if the output shard's
963            // initial frontier is less than the `as_of`. The `mint` operator first emits a batch
964            // description with `lower = as_of`, and the `write` operator only emits a batch when
965            // its observed persist frontier is >= the batch description's `lower`, which (assuming
966            // no other writers) would be never if we didn't advance the observed persist frontier
967            // to the `as_of`.
968            //
969            // The `must_use` bool returned by the advance helpers signals that a corresponding
970            // `corrections.advance_since` must be queued to the Tokio task. We drop it here
971            // because the Tokio-side `corrections.since` is initialized to the same `as_of` in
972            // `write::render` before the task is spawned (using the same captured flag value),
973            // keeping both sides in lockstep without an additional channel send.
974            if advance_persist_frontiers_at_startup {
975                let _ = state.advance_persist_ok_frontier(as_of.borrow());
976                let _ = state.advance_persist_err_frontier(as_of.borrow());
977            }
978
979            state
980        }
981
        /// Emit a `trace!` event for this operator instance.
        ///
        /// Besides `message`, the event carries the sink ID, the worker ID, the elements of the
        /// current `desired` and `persist` frontiers, and the held batch description (without
        /// its capability), if any.
        fn trace<S: AsRef<str>>(&self, message: S) {
            // Bind as `message` so the macro below records it under the field name `message`.
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontiers.frontier().elements(),
                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
                message,
            );
        }
993
994        fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
995            if advance(&mut self.desired_frontiers.ok, frontier) {
996                self.trace("advanced `desired` ok frontier");
997            }
998        }
999
1000        fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
1001            if advance(&mut self.desired_frontiers.err, frontier) {
1002                self.trace("advanced `desired` err frontier");
1003            }
1004        }
1005
1006        /// Returns true if the persist frontier advanced.
1007        ///
1008        /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
1009        /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
1010        /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
1011        /// which can let updates with timestamps below `desc.lower` slip into a written batch.
1012        #[must_use = "advance_persist_ok_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
1013        fn advance_persist_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
1014            if advance(&mut self.persist_frontiers.ok, frontier) {
1015                self.trace("advanced `persist` ok frontier");
1016                true
1017            } else {
1018                false
1019            }
1020        }
1021
1022        /// Returns true if the persist frontier advanced.
1023        ///
1024        /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
1025        /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
1026        /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
1027        /// which can let updates with timestamps below `desc.lower` slip into a written batch.
1028        #[must_use = "advance_persist_err_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
1029        fn advance_persist_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
1030            if advance(&mut self.persist_frontiers.err, frontier) {
1031                self.trace("advanced `persist` err frontier");
1032                true
1033            } else {
1034                false
1035            }
1036        }
1037
1038        /// Check if a forced consolidation should be triggered.
1039        fn should_force_consolidation(&mut self) -> bool {
1040            let Some(request) = &self.force_consolidation_after else {
1041                return false;
1042            };
1043
1044            let desired_frontier = self.desired_frontiers.frontier();
1045            let persist_frontier = self.persist_frontiers.frontier();
1046            if PartialOrder::less_than(request, desired_frontier)
1047                && PartialOrder::less_than(request, persist_frontier)
1048            {
1049                self.trace("requesting correction consolidation");
1050                // In read-only mode the sink mints no batches, so the `WriteBatch` path never
1051                // sweeps `consolidate_before(upper)` forward and a single forced consolidation
1052                // only covers the band below the `since` at the moment it fires, leaving the
1053                // forward region uncancelled for the whole read-only window (CLU-131). Re-arm at
1054                // the current persist frontier so the forced consolidation keeps sweeping forward
1055                // as `desired`/`persist` advance, mirroring the read-write path. Once writes are
1056                // allowed, the `WriteBatch` path takes over and we disarm.
1057                self.force_consolidation_after = if self.read_only {
1058                    Some(persist_frontier.to_owned())
1059                } else {
1060                    None
1061                };
1062                true
1063            } else {
1064                false
1065            }
1066        }
1067
        /// Update read-only mode.
        ///
        /// This only records the flag; it does not touch `force_consolidation_after`. While
        /// read-only, the forced consolidation re-arms itself each time it fires (see
        /// `should_force_consolidation`). Once writes are allowed, a still-armed request fires
        /// one final time and then disarms, after which the `WriteBatch` path drives
        /// consolidation.
        fn set_read_only(&mut self, read_only: bool) {
            self.read_only = read_only;
        }
1074
1075        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
1076            // Enforce monotonicity: drop descriptions whose `lower` regresses below the one we
1077            // already hold. The `mint` operator only emits strictly increasing `lower`s
1078            // (invariant 1), so a regression means this description is outdated. We cannot use
1079            // `persist_frontiers` for the same check, because during snapshot processing those
1080            // frontiers can be ahead of the shard's write frontier and a still-valid description
1081            // may have a `lower` below them.
1082            if let Some((prev, _)) = &self.batch_description {
1083                if PartialOrder::less_than(&desc.lower, &prev.lower) {
1084                    self.trace(format!("skipping outdated batch description: {desc:?}"));
1085                    return;
1086                }
1087            }
1088
1089            self.batch_description = Some((desc, cap));
1090            self.trace("set batch description");
1091        }
1092
1093        /// Check if a batch can be written and send a write command to the Tokio task if so.
1094        fn maybe_start_batch(
1095            &mut self,
1096            cmd_tx: &mpsc::UnboundedSender<WriteCommand>,
1097        ) -> Option<(BatchDescription, Capability<Timestamp>)> {
1098            let (desc, _cap) = self.batch_description.as_ref()?;
1099
1100            // We can write a new batch if we have seen all `persist` updates before `lower` and
1101            // all `desired` updates before `upper`.
1102            let persist_ready =
1103                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
1104            let desired_ready =
1105                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
1106            if !persist_ready || !desired_ready {
1107                return None;
1108            }
1109
1110            self.trace("write batch description");
1111            let (desc, cap) = self.batch_description.take()?;
1112            cmd_tx
1113                .send(WriteCommand::WriteBatch(desc.clone()))
1114                .expect("write task unexpectedly gone");
1115            Some((desc, cap))
1116        }
1117    }
1118}
1119
1120/// Implementation of the `append` operator.
1121mod append {
1122    use super::*;
1123
    /// Commands sent from the Timely operator to the Tokio append task.
    ///
    /// The task processes commands in the order it receives them from the channel.
    enum AppendCommand {
        /// A new batch description has been received.
        ///
        /// The task absorbs the description and then attempts an append.
        Description(BatchDescription),
        /// A written batch has been received.
        ///
        /// The task only stashes the batch; this command does not trigger an append attempt.
        Batch(ProtoBatch),
        /// The batches frontier has advanced.
        ///
        /// The task records the new frontier and then attempts an append.
        BatchesFrontier(Antichain<Timestamp>),
    }
1133
    /// Render the `append` operator.
    ///
    /// The Timely closure built here does no persist work itself. It forwards batch
    /// descriptions, written batches, and advancements of the `batches` input frontier as
    /// `AppendCommand`s over an unbounded channel to a Tokio task. That task opens the persist
    /// writer and owns the append `State`. Its handle is kept alive by the operator closure so
    /// the task is aborted when the operator is dropped.
    ///
    /// The parameters passed in are:
    ///  * `sink_id`: The `GlobalId` of the sink export.
    ///  * `persist_api`: An object providing access to the output persist shard.
    ///  * `descs`: The stream of batch descriptions produced by the `mint` operator.
    ///  * `batches`: The stream of written batches produced by the `write` operator.
    ///
    /// # Panics
    ///
    /// The operator closure panics if a command cannot be sent because the append task is gone.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        descs: DescsStream<'s>,
        batches: BatchesStream<'s>,
    ) {
        let scope = descs.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "append");
        let mut builder = OperatorBuilderRc::new(name, scope.clone());
        // Input 0: batch descriptions, consumed with the `Pipeline` pact.
        let mut descs_input = builder.new_input(descs, Pipeline);
        // Input 1: written batches, routed to the worker named by `desc.append_worker`.
        let batch_exchange =
            Exchange::new(|(desc, _): &(BatchDescription, _)| u64::cast_from(desc.append_worker));
        let mut batches_input = builder.new_input(batches, batch_exchange);

        // Channel for commands to the Tokio append task.
        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<AppendCommand>();

        // Spawn Tokio task that owns the append state machine.
        let append_task_handle =
            mz_ore::task::spawn(|| operator_name(sink_id, "append"), async move {
                let writer = persist_api.open_writer().await;
                let mut state = State::new(sink_id, worker_id, writer);

                // Handle commands one at a time, in receive order, until the sender is dropped.
                // Only `Description` and `BatchesFrontier` trigger an append attempt.
                while let Some(cmd) = cmd_rx.recv().await {
                    match cmd {
                        AppendCommand::Description(desc) => {
                            state.absorb_batch_description(desc).await;
                            state.maybe_append_batches().await;
                        }
                        AppendCommand::Batch(batch) => {
                            state.absorb_batch(batch).await;
                        }
                        AppendCommand::BatchesFrontier(frontier) => {
                            state.advance_batches_frontier(frontier.borrow());
                            state.maybe_append_batches().await;
                        }
                    }
                }
            })
            .abort_on_drop();

        // No outputs are registered on the builder, so the initial capabilities go unused.
        builder.build(move |_capabilities| {
            // The `batches` frontier most recently forwarded to the Tokio task. Used to send
            // `BatchesFrontier` only for actual advancements.
            let mut prev_batches_frontier = Antichain::from_elem(Timestamp::MIN);

            move |frontiers| {
                // Keep task handle alive so it is aborted when the operator is dropped.
                let _ = &append_task_handle;

                // Forward batch descriptions to the Tokio task.
                descs_input.for_each(|_cap, data| {
                    for desc in data.drain(..) {
                        cmd_tx
                            .send(AppendCommand::Description(desc))
                            .expect("append task unexpectedly gone");
                    }
                });

                // Forward batches to the Tokio task.
                batches_input.for_each(|_cap, data| {
                    for (_desc, batch) in data.drain(..) {
                        // The batch description is only used for routing and we ignore it
                        // here since we already get one from `descs_input`.
                        cmd_tx
                            .send(AppendCommand::Batch(batch))
                            .expect("append task unexpectedly gone");
                    }
                });

                // Forward batches frontier advancements *after* the per-activation
                // `Description`/`Batch` sends above. The Tokio task drains commands FIFO and only
                // calls `maybe_append_batches` on `Description`/`BatchesFrontier`; if a frontier
                // advance arrived before its batches, the task could append an incomplete set.
                // See module-level docs for the full ordering invariant.
                //
                // `frontiers[1]` belongs to `batches_input`, the second input registered above.
                let new_batches_frontier = frontiers[1].frontier();
                if PartialOrder::less_than(&prev_batches_frontier.borrow(), &new_batches_frontier) {
                    prev_batches_frontier.clear();
                    prev_batches_frontier.extend(new_batches_frontier.iter().cloned());
                    cmd_tx
                        .send(AppendCommand::BatchesFrontier(
                            new_batches_frontier.to_owned(),
                        ))
                        .expect("append task unexpectedly gone");
                }
            }
        });
    }
1229
    /// State maintained by the `append` Tokio task.
    struct State {
        /// The ID of the sink, included in trace output.
        sink_id: GlobalId,
        /// The ID of the worker, included in trace output. Only batch descriptions whose
        /// `append_worker` equals this ID are retained.
        worker_id: usize,
        /// Handle used to turn received `ProtoBatch`es back into `Batch`es.
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// The current input frontier of `batches`.
        batches_frontier: Antichain<Timestamp>,
        /// The greatest observed `lower` from both `descs` and `batches`.
        lower: Antichain<Timestamp>,
        /// The batch description for `lower`, if any.
        batch_description: Option<BatchDescription>,
        /// Batches received for `lower`.
        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
    }
1244
1245    impl State {
1246        fn new(
1247            sink_id: GlobalId,
1248            worker_id: usize,
1249            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1250        ) -> Self {
1251            Self {
1252                sink_id,
1253                worker_id,
1254                persist_writer,
1255                batches_frontier: Antichain::from_elem(Timestamp::MIN),
1256                lower: Antichain::from_elem(Timestamp::MIN),
1257                batch_description: None,
1258                batches: Default::default(),
1259            }
1260        }
1261
        /// Emit a `trace!` event for this append task.
        ///
        /// Besides `message`, the event carries the sink ID, the worker ID, the elements of the
        /// current `batches` frontier and of `lower`, and the held batch description, if any.
        fn trace<S: AsRef<str>>(&self, message: S) {
            // Bind as `message` so the macro below records it under the field name `message`.
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                batches_frontier = ?self.batches_frontier.elements(),
                lower = ?self.lower.elements(),
                batch_description = ?self.batch_description,
                message,
            );
        }
1273
1274        fn advance_batches_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
1275            if advance(&mut self.batches_frontier, frontier) {
1276                self.trace("advanced `batches` frontier");
1277            }
1278        }
1279
1280        /// Advance the current `lower`.
1281        ///
1282        /// Discards all currently stashed batches and batch descriptions, assuming that they are
1283        /// now invalid.
1284        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
1285            assert!(PartialOrder::less_than(&self.lower, &frontier));
1286
1287            self.lower = frontier;
1288            self.batch_description = None;
1289
1290            // Remove stashed batches, cleaning up those we didn't append.
1291            for batch in self.batches.drain(..) {
1292                batch.delete().await;
1293            }
1294
1295            self.trace("advanced `lower`");
1296        }
1297
1298        /// Absorb the given batch description into the state, provided it is not outdated.
1299        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
1300            if PartialOrder::less_than(&self.lower, &desc.lower) {
1301                self.advance_lower(desc.lower.clone()).await;
1302            } else if &self.lower != &desc.lower {
1303                self.trace(format!("skipping outdated batch description: {desc:?}"));
1304                return;
1305            }
1306
1307            if desc.append_worker == self.worker_id {
1308                self.batch_description = Some(desc);
1309                self.trace("set batch description");
1310            }
1311        }
1312
1313        /// Absorb the given batch into the state, provided it is not outdated.
1314        async fn absorb_batch(&mut self, batch: ProtoBatch) {
1315            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
1316            if PartialOrder::less_than(&self.lower, batch.lower()) {
1317                self.advance_lower(batch.lower().clone()).await;
1318            } else if &self.lower != batch.lower() {
1319                self.trace(format!(
1320                    "skipping outdated batch: ({:?}, {:?})",
1321                    batch.lower().elements(),
1322                    batch.upper().elements(),
1323                ));
1324
1325                // Ensure the batch's data gets properly cleaned up before dropping it.
1326                batch.delete().await;
1327                return;
1328            }
1329
1330            self.batches.push(batch);
1331            self.trace("absorbed a batch");
1332        }
1333
1334        async fn maybe_append_batches(&mut self) {
1335            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
1336            if !batches_complete {
1337                return;
1338            }
1339
1340            let Some(desc) = self.batch_description.take() else {
1341                return;
1342            };
1343
1344            let new_lower = match self.append_batches(desc).await {
1345                Ok(shard_upper) => {
1346                    self.trace("appended a batch");
1347                    shard_upper
1348                }
1349                Err(shard_upper) => {
1350                    // Failing the append is expected in the presence of concurrent replicas. There
1351                    // is nothing special to do here: The self-correcting feedback mechanism
1352                    // ensures that we observe the concurrent changes, compute their consequences,
1353                    // and append them at a future time.
1354                    self.trace(format!(
1355                        "append failed due to `lower` mismatch: {:?}",
1356                        shard_upper.elements(),
1357                    ));
1358                    shard_upper
1359                }
1360            };
1361
1362            self.advance_lower(new_lower).await;
1363        }
1364
1365        /// Append the current `batches` to the output shard.
1366        ///
1367        /// Returns whether the append was successful or not, and the current shard upper in either
1368        /// case.
1369        ///
1370        /// This method advances the shard upper to the batch `lower` if necessary. This is the
1371        /// mechanism that brings the shard upper to the sink as-of when appending the initial
1372        /// batch.
1373        ///
1374        /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
1375        /// a single append at operator startup. The reason we are doing it here instead is that it
1376        /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
1377        /// persist writes, including the initial upper bump. Having only a single place that
1378        /// performs writes makes it easy to ensure we are doing that correctly.
1379        async fn append_batches(
1380            &mut self,
1381            desc: BatchDescription,
1382        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
1383            let (lower, upper) = (desc.lower, desc.upper);
1384            let mut to_append: Vec<_> = self.batches.iter_mut().collect();
1385
1386            loop {
1387                let result = self
1388                    .persist_writer
1389                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
1390                    .await
1391                    .expect("valid usage");
1392
1393                match result {
1394                    Ok(()) => return Ok(upper),
1395                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
1396                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
1397
1398                        // At this point the shard's since and upper are likely the same, a state
1399                        // that is likely to hit edge-cases in logic reasoning about frontiers.
1400                        fail::fail_point!("mv_advanced_upper");
1401                    }
1402                    Err(mismatch) => return Err(mismatch.current),
1403                }
1404            }
1405        }
1406    }
1407
1408    /// Advance the frontier of the given writer's shard to at least the given `upper`.
1409    async fn advance_shard_upper(
1410        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1411        upper: Antichain<Timestamp>,
1412    ) {
1413        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1414        let lower = Antichain::from_elem(Timestamp::MIN);
1415        persist_writer
1416            .append(empty_updates, lower, upper)
1417            .await
1418            .expect("valid usage")
1419            .expect("should always succeed");
1420    }
1421}