// mz_compute/sink/materialized_view_v2.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sync Timely operator implementation of the MV sink.
11//!
12//! This module provides an alternative implementation of `persist_sink` that uses sync Timely
13//! operators communicating with Tokio tasks via channels, instead of async Timely operators.
14//! Gated behind the `ENABLE_SYNC_MV_SINK` dyncfg.
15//!
16//! See the [main module](super::materialized_view) for the operator graph and design docs.
17//!
18//! ### Channel ordering requirements
19//!
20//! Each operator splits state across a Timely thread (which observes inputs and frontiers) and a
21//! Tokio task (which owns persist I/O state). They communicate via `mpsc` command channels, which
22//! preserve send order on a single sender. Each operator instance constructs its own
23//! `(tx, rx)` pair inside `render` and never clones the sender, so there is exactly one producer
24//! per channel — sends are totally ordered. Different worker instances of the same operator
25//! never share a channel, so cross-worker ordering is not a concern. The correctness of the
26//! operators relies on a few ordering invariants between the messages sent within a single Timely
27//! activation:
28//!
29//! * **`mint`**: the `persist_watch` Tokio task is the sole producer of persist-frontier updates
30//! and emits them in monotonically increasing order, terminated by the empty frontier. The
31//! Timely closure drains the receiver each activation; processing in receive order is therefore
32//! sufficient. No cross-channel ordering is needed because `mint` only has the one channel.
33//!
34//! * **`write`**: per-activation, the Timely closure first appends all observed input data into
35//! a single `WriteCommand::Batch` and only then sends a `WriteCommand::WriteBatch` (issued from
36//! `maybe_start_batch` after frontier checks). The Tokio task processes commands FIFO, so a
37//! `WriteBatch` is guaranteed to see every `Batch` from the same activation already applied to
38//! the corrections buffer. Reversing this order would let the task write a batch that is
39//! missing updates the Timely closure already observed.
40//!
41//! * **`append`**: per-activation, the Timely closure forwards messages in the order
42//! `Description` → `Batch` → `BatchesFrontier`. The first two carry the data the task needs
43//! to absorb; `BatchesFrontier` is the trigger that allows `maybe_append_batches` to fire.
44//! Sending `BatchesFrontier` *after* its corresponding `Batch` messages ensures the task does
45//! not append a batch description before all batches contributing to it have been absorbed.
46//! If the order were reversed, `maybe_append_batches` could fire on an incomplete `batches`
47//! set and miss writes.
48
49use std::any::Any;
50use std::cell::RefCell;
51use std::rc::Rc;
52use std::sync::Arc;
53
54use differential_dataflow::{Hashable, VecCollection};
55use mz_compute_types::dyncfgs::MV_SINK_ADVANCE_PERSIST_FRONTIERS;
56use mz_dyncfg::ConfigSet;
57use mz_ore::cast::CastFrom;
58use mz_persist_client::batch::{Batch, ProtoBatch};
59use mz_persist_client::write::WriteHandle;
60use mz_repr::{Diff, GlobalId, Row, Timestamp};
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use timely::PartialOrder;
64use timely::dataflow::channels::pact::{Exchange, Pipeline};
65use timely::dataflow::operators::generic::OutputBuilder;
66use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
67use timely::dataflow::operators::vec::Broadcast;
68use timely::dataflow::operators::{Capability, CapabilitySet};
69use timely::progress::Antichain;
70use timely::progress::frontier::AntichainRef;
71use tokio::sync::{mpsc, watch};
72use tracing::trace;
73
74use crate::compute_state::ComputeState;
75use crate::render::StartSignal;
76use crate::render::errors::DataflowErrorSer;
77use crate::sink::correction::{ChannelLogging, Correction, CorrectionLogger};
78use crate::sink::materialized_view::{
79 BatchDescription, BatchesStream, DescsStream, DesiredStreams, OkErr, PersistApi,
80 PersistStreams, SharedSinkFrontier, advance, operator_name, persist_source,
81};
82
83/// Renders an MV sink writing the given desired collection into the `target` persist collection.
84///
85/// This is the sync Timely operator implementation, using Tokio tasks for I/O.
86pub(super) fn persist_sink<'s>(
87 sink_id: GlobalId,
88 target: &mz_storage_types::controller::CollectionMetadata,
89 ok_collection: VecCollection<'s, Timestamp, Row, Diff>,
90 err_collection: VecCollection<'s, Timestamp, DataflowErrorSer, Diff>,
91 as_of: Antichain<Timestamp>,
92 compute_state: &mut ComputeState,
93 start_signal: StartSignal,
94 read_only_rx: watch::Receiver<bool>,
95) -> Rc<dyn Any> {
96 let scope = ok_collection.scope();
97 let desired = OkErr::new(ok_collection.inner, err_collection.inner);
98
99 // Read back the persist shard.
100 let (persist, persist_token) =
101 persist_source(scope, sink_id, target.clone(), compute_state, start_signal);
102
103 let persist_api = PersistApi {
104 persist_clients: Arc::clone(&compute_state.persist_clients),
105 collection: target.clone(),
106 shard_name: sink_id.to_string(),
107 purpose: format!("MV sink {sink_id}"),
108 };
109
110 let (desired, descs, sink_frontier) = mint::render(
111 sink_id,
112 persist_api.clone(),
113 as_of.clone(),
114 read_only_rx,
115 desired,
116 );
117
118 // Broadcast batch descriptions to all workers, regardless of whether or not they are
119 // responsible for the append, to give them a chance to clean up any outdated state they
120 // might still hold.
121 let descs = descs.broadcast();
122
123 let batches = write::render(
124 sink_id,
125 persist_api.clone(),
126 as_of,
127 desired,
128 persist,
129 descs.clone(),
130 Rc::clone(&compute_state.worker_config),
131 );
132
133 append::render(sink_id, persist_api, descs, batches);
134
135 // Report sink frontier updates to the `ComputeState`.
136 let collection = compute_state.expect_collection_mut(sink_id);
137 collection.sink_write_frontier = Some(sink_frontier);
138
139 Rc::new(persist_token)
140}
141
142/// Implementation of the `mint` operator.
143mod mint {
144 use super::*;
145 use timely::progress::frontier::AntichainRef;
146
    /// Render the `mint` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `as_of`: The first time for which the sink may produce output.
    /// * `read_only_rx`: A receiver that reports the sink is in read-only mode.
    /// * `desired`: The ok/err streams that should be sinked to persist.
    ///
    /// Returns the passed-through `desired` streams, the stream of minted batch descriptions,
    /// and a shared frontier handle. The active worker keeps the shared frontier in sync with
    /// the persist shard's frontier; all other workers clear it to the empty frontier.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        mut read_only_rx: watch::Receiver<bool>,
        desired: DesiredStreams<'s>,
    ) -> (DesiredStreams<'s>, DescsStream<'s>, SharedSinkFrontier) {
        let scope = desired.ok.scope();
        let worker_id = scope.index();
        let worker_count = scope.peers();

        // Determine the active worker for the mint operator.
        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();

        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
        let shared_frontier = Rc::clone(&sink_frontier);

        let name = operator_name(sink_id, "mint");
        let mut builder = OperatorBuilderRc::new(name, scope.clone());
        let info = builder.operator_info();

        // Create outputs (before inputs, so no input connections yet).
        let (ok_output, ok_stream) = builder.new_output();
        let (err_output, err_stream) = builder.new_output();
        let (desc_output, desc_stream) = builder.new_output();

        let mut ok_output = OutputBuilder::from(ok_output);
        let mut err_output = OutputBuilder::from(err_output);
        let mut desc_output = OutputBuilder::from(desc_output);

        // desired_ok -> output 0 (ok passthrough)
        let mut desired_ok_input = builder.new_input_connection(
            desired.ok,
            Pipeline,
            [(0, Antichain::from_elem(Default::default()))],
        );
        // desired_err -> output 1 (err passthrough)
        let mut desired_err_input = builder.new_input_connection(
            desired.err,
            Pipeline,
            [(1, Antichain::from_elem(Default::default()))],
        );

        // Set up background tasks and state for the active worker only.
        let mut task_handles = Vec::new();
        // Snapshot the current read-only state; subsequent changes are observed via
        // `has_changed`/`borrow_and_update` in the operator closure below.
        let read_only = *read_only_rx.borrow_and_update();
        let mut state = None;
        if worker_id == active_worker_id {
            // Spawn a Tokio task to watch the persist shard's upper frontier.
            //
            // We collect the persist frontier from a write handle directly, rather than
            // inspecting the `persist` stream, because the latter has two annoying glitches:
            // (a) It starts at the shard's read frontier, not its write frontier.
            // (b) It can lag behind if there are spikes in ingested data.
            //
            // The decoupling from the `persist` stream is load-bearing: that stream can fall
            // arbitrarily behind the shard upper during snapshot replay or write spikes. Using
            // it would delay both (1) the controller-visible sink frontier (`shared_frontier`),
            // which previously caused a CrossJoin feature-bench regression where the controller
            // held a finished MV dataflow open waiting for the empty frontier, and (2) the
            // `state.persist_frontier` that gates batch-description minting, stalling mint until
            // the read-back stream catches up. The goal isn't tick-granular descriptions per
            // se — it's avoiding the stream-induced stall. See 5eab5ff896 for the original
            // regression and rationale.
            //
            // The task sends the empty frontier as its final message before exiting. The
            // operator drops `persist_rx` once it receives the empty frontier.
            let (persist_tx, persist_rx) = mpsc::unbounded_channel();
            let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
            let handle = mz_ore::task::spawn(
                || operator_name(sink_id, "mint::persist_watch"),
                async move {
                    let mut writer = persist_api.open_writer().await;
                    let mut frontier = Antichain::from_elem(Timestamp::MIN);
                    loop {
                        writer.wait_for_upper_past(&frontier).await;
                        frontier = writer.upper().clone();
                        // Send first, then activate, so the operator always finds the update
                        // queued when it wakes up.
                        if persist_tx.send(frontier.clone()).is_err() {
                            return;
                        }
                        if sync_activator.activate().is_err() {
                            return;
                        }
                        if frontier.is_empty() {
                            return;
                        }
                    }
                },
            );
            task_handles.push(handle.abort_on_drop());

            // Spawn a Tokio task to wake the operator when read-only mode changes.
            if read_only {
                let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
                let mut rx = read_only_rx.clone();
                let handle = mz_ore::task::spawn(
                    || format!("mv_sink({sink_id})::mint::read_only_watch"),
                    async move {
                        let _ = rx.changed().await;
                        let _ = sync_activator.activate();
                    },
                );
                task_handles.push(handle.abort_on_drop());
            }

            state = Some(State::new(
                sink_id,
                worker_count,
                as_of,
                read_only,
                persist_rx,
            ));
        }

        builder.build(move |capabilities| {
            // Passing through the `desired` streams only requires data capabilities, so we can
            // immediately drop their initial capabilities here.
            let [_, _, desc_cap]: [_; 3] =
                capabilities.try_into().expect("one capability per output");

            let mut cap_set = if state.is_some() {
                Some(CapabilitySet::from_elem(desc_cap))
            } else {
                drop(desc_cap);
                // Non-active workers never mint descriptions, so they report an empty (i.e.
                // closed) sink frontier.
                shared_frontier.borrow_mut().clear();
                None
            };

            move |frontiers| {
                // Keep task handles alive so they are aborted when the operator is dropped.
                let _ = &task_handles;

                // Pass through desired data.
                let mut ok_out = ok_output.activate();
                desired_ok_input.for_each(|cap, data| {
                    ok_out.session(&cap).give_container(data);
                });
                let mut err_out = err_output.activate();
                desired_err_input.for_each(|cap, data| {
                    err_out.session(&cap).give_container(data);
                });

                let Some(state) = &mut state else {
                    // Non-active worker: just pass through data.
                    return;
                };
                // `cap_set` is `Some` exactly when `state` is `Some`, so this cannot panic.
                let cap_set = cap_set.as_mut().unwrap();

                // Track desired frontiers. Inputs 0/1 are the desired ok/err streams.
                state.advance_desired_ok_frontier(frontiers[0].frontier());
                state.advance_desired_err_frontier(frontiers[1].frontier());

                state.drain_persist_rx(&shared_frontier);

                // Check read-only mode.
                if state.read_only && read_only_rx.has_changed().unwrap_or(false) {
                    if !*read_only_rx.borrow_and_update() {
                        state.allow_writes();
                    }
                }

                // Try to mint a batch description.
                let mut desc_out = desc_output.activate();
                if let Some(desc) = state.maybe_mint_batch_description() {
                    let lower_ts = *desc.lower.as_option().expect("not empty");
                    let cap = cap_set.delayed(&lower_ts);
                    desc_out.session(&cap).give(desc);

                    // We only emit strictly increasing `lower`s, so we can let our output frontier
                    // advance beyond the current `lower`.
                    cap_set.downgrade([lower_ts.step_forward()]);
                } else {
                    // The next emitted `lower` will be at least the `persist` frontier, so we can
                    // advance our output frontier as far.
                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
                }
            }
        });

        let desired_output_streams = OkErr::new(ok_stream, err_stream);
        (desired_output_streams, desc_stream, sink_frontier)
    }
337
    /// State maintained by the `mint` operator.
    ///
    /// Only the active worker constructs a `State`; all other workers merely pass the
    /// `desired` streams through.
    struct State {
        /// The `GlobalId` of the sink export, used for tracing.
        sink_id: GlobalId,
        /// The number of workers in the Timely cluster.
        worker_count: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontier of the target persist shard.
        persist_frontier: Antichain<Timestamp>,
        /// Receiver for persist frontier updates from the Tokio persist_watch task.
        ///
        /// Dropped once the empty frontier is received (the task's shutdown signal).
        persist_rx: Option<mpsc::UnboundedReceiver<Antichain<Timestamp>>>,
        /// The append worker for the next batch description, chosen in round-robin fashion.
        next_append_worker: usize,
        /// The last `lower` we have emitted in a batch description, if any. Whenever the
        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
        last_lower: Option<Antichain<Timestamp>>,
        /// Whether we are operating in read-only mode.
        ///
        /// In read-only mode, minting of batch descriptions is disabled.
        read_only: bool,
    }
361
    impl State {
        /// Create the `mint` state for the active worker.
        fn new(
            sink_id: GlobalId,
            worker_count: usize,
            as_of: Antichain<Timestamp>,
            read_only: bool,
            persist_rx: mpsc::UnboundedReceiver<Antichain<Timestamp>>,
        ) -> Self {
            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
            // effort writing out snapshots of data that is already in the shard.
            let persist_frontier = as_of;

            Self {
                sink_id,
                worker_count,
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontier,
                persist_rx: Some(persist_rx),
                next_append_worker: 0,
                last_lower: None,
                read_only,
            }
        }

        /// Emit a `trace!` event for the given message, annotated with the current frontiers
        /// and `last_lower`.
        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontier.elements(),
                last_lower = ?self.last_lower,
                message,
            );
        }

        /// Advance the `desired` ok frontier, tracing if it changed.
        fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.trace("advanced `desired` ok frontier");
            }
        }

        /// Advance the `desired` err frontier, tracing if it changed.
        fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.trace("advanced `desired` err frontier");
            }
        }

        /// Advance the `persist` frontier, tracing if it changed.
        fn advance_persist_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
            if advance(&mut self.persist_frontier, frontier) {
                self.trace("advanced `persist` frontier");
            }
        }

        /// Drain persist frontier updates from the Tokio task.
        ///
        /// Frontiers from the `persist_watch` task are monotonically increasing, so only the
        /// most recent one matters. We drain all queued messages and apply just the latest,
        /// avoiding redundant `advance_persist_frontier`/`trace!` calls when several updates
        /// arrived between activations.
        ///
        /// The task sends the empty frontier as its final message before exiting. Once
        /// received, we drop the receiver.
        fn drain_persist_rx(&mut self, shared_frontier: &RefCell<Antichain<Timestamp>>) {
            let Some(mut rx) = self.persist_rx.take() else {
                // Already received the empty frontier; nothing more will arrive.
                return;
            };
            let mut latest: Option<Antichain<Timestamp>> = None;
            let mut closed = false;
            loop {
                match rx.try_recv() {
                    Ok(frontier) => {
                        let done = frontier.is_empty();
                        latest = Some(frontier);
                        if done {
                            closed = true;
                            break;
                        }
                    }
                    Err(mpsc::error::TryRecvError::Empty) => break,
                    Err(mpsc::error::TryRecvError::Disconnected) => {
                        // The task is expected to send the empty frontier before exiting, so a
                        // disconnect while we still hold the receiver indicates a bug.
                        panic!("mint persist_watch task unexpectedly gone");
                    }
                }
            }
            if let Some(frontier) = latest {
                // Publish the frontier to the controller-visible handle before using it to
                // gate minting.
                shared_frontier.borrow_mut().clone_from(&frontier);
                self.advance_persist_frontier(frontier.borrow());
            }
            if !closed {
                self.persist_rx = Some(rx);
            }
        }

        /// Leave read-only mode, enabling the minting of batch descriptions.
        fn allow_writes(&mut self) {
            if self.read_only {
                self.read_only = false;
                self.trace("switched to write mode");
            }
        }

        /// Mint a new batch description, if the preconditions are satisfied.
        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = &self.persist_frontier;

            // We only mint new batch descriptions when:
            // 1. We are _not_ in read-only mode.
            // 2. The `desired` frontier is ahead of the `persist` frontier.
            // 3. The `persist` frontier advanced since we last emitted a batch description.
            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
                PartialOrder::less_than(lower, persist_frontier)
            });

            if self.read_only || !desired_ahead || !persist_advanced {
                return None;
            }

            let lower = persist_frontier.clone();
            let upper = desired_frontier.clone();
            let append_worker = self.next_append_worker;
            let desc = BatchDescription::new(lower, upper, append_worker);

            // Rotate the append responsibility and remember the emitted `lower` for
            // precondition (3) above.
            self.next_append_worker = (append_worker + 1) % self.worker_count;
            self.last_lower = Some(desc.lower.clone());

            self.trace(format!("minted batch description: {desc:?}"));
            Some(desc)
        }
    }
492}
493
494/// Implementation of the `write` operator.
495mod write {
496 use super::*;
497
498 use mz_timely_util::activator::ArcActivator;
499
    /// Commands sent from the Timely operator to the Tokio write task.
    ///
    /// Commands are processed FIFO; see the module docs for the required ordering between
    /// `Batch` and `WriteBatch` within a single activation.
    enum WriteCommand {
        /// A coalesced batch of work gathered during a single operator activation.
        ///
        /// The Timely closure accumulates all updates observed across the four data inputs plus
        /// any frontier advancement and forced-consolidation flag, and sends a single
        /// `WriteCommand::Batch` per activation. This keeps the channel overhead independent of
        /// the number of Timely chunks processed per activation.
        Batch(BatchUpdates),
        /// Write a batch with the given description. The task drains corrections and writes
        /// them to persist.
        WriteBatch(BatchDescription),
    }
513
    /// The payload of a coalesced [`WriteCommand::Batch`].
    ///
    /// `desired` updates enter the corrections buffer as positive contributions and `persist`
    /// updates as negative ones, so after application the buffer holds `desired - persist`.
    struct BatchUpdates {
        /// Positive contributions from the `desired` ok input.
        desired_ok: Vec<(Row, Timestamp, Diff)>,
        /// Positive contributions from the `desired` err input.
        desired_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
        /// Negative contributions from the `persist` ok input.
        persist_ok: Vec<(Row, Timestamp, Diff)>,
        /// Negative contributions from the `persist` err input.
        persist_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
        /// The new persist frontier, if it advanced this activation.
        persist_frontier: Option<Antichain<Timestamp>>,
        /// Whether a consolidation of the corrections buffer should be forced.
        force_consolidation: bool,
    }
529
530 impl BatchUpdates {
531 fn new() -> Self {
532 Self {
533 desired_ok: Vec::new(),
534 desired_err: Vec::new(),
535 persist_ok: Vec::new(),
536 persist_err: Vec::new(),
537 persist_frontier: None,
538 force_consolidation: false,
539 }
540 }
541
542 /// Returns true if there is no work in this batch.
543 fn is_empty(&self) -> bool {
544 self.desired_ok.is_empty()
545 && self.desired_err.is_empty()
546 && self.persist_ok.is_empty()
547 && self.persist_err.is_empty()
548 && self.persist_frontier.is_none()
549 && !self.force_consolidation
550 }
551 }
552
553 /// A response from the Tokio write task back to the Timely operator.
    /// A response from the Tokio write task back to the Timely operator.
    ///
    /// The Timely closure pairs each response with the currently in-flight batch description
    /// and, if a batch was produced, emits the pair on the batches output.
    struct WriteResponse {
        /// The written batch, or `None` if the corrections buffer had no updates.
        batch: Option<ProtoBatch>,
    }
559 /// Render the `write` operator.
    /// Render the `write` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `as_of`: The first time for which the sink may produce output.
    /// * `desired`: The ok/err streams that should be sinked to persist.
    /// * `persist`: The ok/err streams read back from the output persist shard.
    /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
    ///
    /// Returns the stream of written batches, each paired with the description it was
    /// written for, to be consumed by the `append` operator.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        desired: DesiredStreams<'s>,
        persist: PersistStreams<'s>,
        descs: DescsStream<'s>,
        worker_config: Rc<ConfigSet>,
    ) -> BatchesStream<'s> {
        let scope = desired.ok.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "write");
        let mut builder = OperatorBuilderRc::new(name, scope.clone());
        let info = builder.operator_info();

        // Set up correction buffer logging. CorrectionLogger is not Send (uses timely
        // loggers), so the Tokio task uses ChannelLogging to send events back to the
        // Timely thread for application by the CorrectionLogger.
        let mut channel_logging = None;
        let mut correction_logger = None;
        if let (Some(compute_logger), Some(differential_logger)) = (
            scope.worker().logger_for("materialize/compute"),
            scope.worker().logger_for("differential/arrange"),
        ) {
            let operator_info = builder.operator_info();
            let (tx, rx) = mpsc::unbounded_channel();
            channel_logging = Some(ChannelLogging::new(tx));
            correction_logger = Some(CorrectionLogger::new(
                compute_logger,
                differential_logger.into(),
                operator_info.global_id,
                operator_info.address.to_vec(),
                rx,
            ));
        }

        // It is important that we exchange the `desired` and `persist` data the same way, so
        // updates that cancel each other out end up on the same worker.
        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
        let exchange_err = |(d, _, _): &(DataflowErrorSer, Timestamp, Diff)| d.hashed();

        // Data inputs are created before the output, so they are not connected to it.
        let mut desired_ok_input = builder.new_input(desired.ok, Exchange::new(exchange_ok));
        let mut desired_err_input = builder.new_input(desired.err, Exchange::new(exchange_err));
        let mut persist_ok_input = builder.new_input(persist.ok, Exchange::new(exchange_ok));
        let mut persist_err_input = builder.new_input(persist.err, Exchange::new(exchange_err));
        let mut descs_input = builder.new_input(descs, Pipeline);

        // Only descs (input 4) is connected to the batches output.
        let (batches_output, batches_output_stream) =
            builder.new_output_connection([(4, Antichain::from_elem(Default::default()))]);
        let mut batches_output = OutputBuilder::from(batches_output);

        // Obtain SinkMetrics synchronously from the persist client cache, rather than through
        // a WriteHandle, to avoid async I/O on the Timely thread.
        let sink_metrics = persist_api.persist_clients.metrics().sink.clone();

        // Construct corrections on the Timely thread (reads ConfigSet), then move to the
        // Tokio task. The ChannelLogging sends events back to the Timely thread.
        let worker_metrics = sink_metrics.for_worker(worker_id);
        let mut corrections: OkErr<Correction<Row>, Correction<DataflowErrorSer>> = OkErr::new(
            Correction::new(
                sink_metrics.clone(),
                worker_metrics.clone(),
                channel_logging.clone(),
                &worker_config,
            ),
            Correction::new(
                sink_metrics.clone(),
                worker_metrics,
                channel_logging,
                &worker_config,
            ),
        );

        // Read `MV_SINK_ADVANCE_PERSIST_FRONTIERS` exactly once and reuse the captured value for
        // both the Tokio-side `corrections.since` initialization below and the Timely-side
        // `persist_frontiers` initialization in `State::new`. Re-reading the dyncfg per init site
        // would let the value flip between reads and produce the very inconsistency this fix
        // addresses: `persist_frontiers = as_of` (gate open) with `corrections.since = MIN`
        // (snapshot updates not advanced) reproduces the original `UpdateNotBeyondLower` panic.
        let advance_persist_frontiers_at_startup =
            MV_SINK_ADVANCE_PERSIST_FRONTIERS.get(&worker_config);

        // Mirror the persist-frontier initialization performed by `State::new` below. With the
        // flag enabled, `State` advances its Timely-side `persist_frontiers` to `as_of`, opening
        // the `maybe_start_batch` write gate (`desc.lower <= persist_frontiers.frontier()`)
        // immediately for the first description minted with `lower = as_of`. The corrections
        // buffer lives on the Tokio task and only learns of frontier advancements through
        // `WriteCommand::Batch { persist_frontier, .. }`, which the Timely closure populates only
        // when an input frontier actually moves. On startup, the input frontiers begin at
        // `Timestamp::MIN`, so no `Batch` carries `persist_frontier` until the persist input
        // catches up — yet a `WriteBatch(desc)` with `desc.lower = as_of` can already be sent.
        // Snapshot-replay updates inserted in the meantime stay at their original timestamps
        // (`Correction::insert` rounds to `max(t, since)` and `since == MIN`), and slip into the
        // batch, tripping persist's `UpdateNotBeyondLower` invariant. Advancing
        // `corrections.since` here keeps the Tokio side in lockstep with the Timely side, the
        // same invariant `materialized_view::write::State::new` upholds via
        // `apply_persist_frontier_advancement`.
        if advance_persist_frontiers_at_startup {
            corrections.ok.advance_since(as_of.clone());
            corrections.err.advance_since(as_of.clone());
        }

        // Channels for commands and responses.
        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<WriteCommand>();
        let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<WriteResponse>();

        // Spawn Tokio task that owns the WriteHandle and corrections buffer.
        let (activator, activation_ack) = ArcActivator::new(scope, &info);
        let write_task_handle = {
            mz_ore::task::spawn(
                || operator_name(sink_id, "write::batch_writer"),
                async move {
                    let mut writer = persist_api.open_writer().await;

                    while let Some(cmd) = cmd_rx.recv().await {
                        apply_command(&mut corrections, &mut writer, cmd, &resp_tx).await;
                        // Activate the operator to drain logging events and process batch responses.
                        // ArcActivator suppresses redundant activations, so this is cheap.
                        activator.activate();
                    }
                },
            )
            .abort_on_drop()
        };

        builder.build(move |capabilities| {
            // We will use the data capabilities from the `descs` input to produce output, so no
            // need to hold onto the initial capabilities.
            drop(capabilities);

            let mut state = State::new(
                sink_id,
                worker_id,
                as_of,
                advance_persist_frontiers_at_startup,
            );

            // Whether a batch write is currently in flight in the Tokio task.
            let mut batch_in_flight: Option<(BatchDescription, Capability<Timestamp>)> = None;

            // CorrectionLogger lives on the Timely thread and drains events from
            // the channel each activation. On drop, it drains remaining events and
            // retracts all logged state.
            let mut correction_logger = correction_logger;

            move |frontiers| {
                // Keep task handle alive so it is aborted when the operator is dropped.
                let _ = &write_task_handle;

                // Acknowledge activation so the Tokio task can activate us again.
                activation_ack.ack();

                // Drain logging events from the Tokio task's ChannelLogging.
                if let Some(logger) = &mut correction_logger {
                    logger.apply_events();
                }
                // Coalesce all work from this activation into a single command. This keeps
                // per-chunk channel overhead low, which matters for hydration when many small
                // Timely chunks arrive in a single activation sweep.
                let mut batch = BatchUpdates::new();
                desired_ok_input.for_each(|_cap, data| {
                    batch.desired_ok.append(data);
                });
                desired_err_input.for_each(|_cap, data| {
                    batch.desired_err.append(data);
                });
                persist_ok_input.for_each(|_cap, data| {
                    batch.persist_ok.append(data);
                });
                persist_err_input.for_each(|_cap, data| {
                    batch.persist_err.append(data);
                });

                // Accept batch descriptions.
                descs_input.for_each(|cap, data| {
                    let cap = cap.retain(0);
                    for desc in data.drain(..) {
                        state.absorb_batch_description(desc, cap.clone());
                    }
                });

                // Track frontiers. Include the new persist frontier in the coalesced batch for
                // `advance_since`. Note the non-short-circuiting `|`: both advancement calls
                // must run even when the first one already reported a change.
                state.advance_desired_ok_frontier(frontiers[0].frontier());
                state.advance_desired_err_frontier(frontiers[1].frontier());
                if state.advance_persist_ok_frontier(frontiers[2].frontier())
                    | state.advance_persist_err_frontier(frontiers[3].frontier())
                {
                    batch.persist_frontier = Some(state.persist_frontiers.frontier().to_owned());
                }
                if state.should_force_consolidation() {
                    batch.force_consolidation = true;
                }

                // Send the coalesced data *before* any `WriteBatch` issued below, per the
                // module-level channel ordering requirements.
                if !batch.is_empty() {
                    cmd_tx
                        .send(WriteCommand::Batch(batch))
                        .expect("write task unexpectedly gone");
                }

                // Try to receive batch results from the Tokio task. Only one batch write may be
                // in flight at a time, so a response corresponds to the current
                // `batch_in_flight` entry.
                loop {
                    match resp_rx.try_recv() {
                        Ok(resp) => {
                            if let Some((desc, cap)) = batch_in_flight.take() {
                                if let Some(batch) = resp.batch {
                                    let mut out = batches_output.activate();
                                    out.session(&cap).give((desc, batch));
                                    state.trace("wrote a batch");
                                } else {
                                    state.trace("skipping empty batch");
                                }
                            }
                        }
                        Err(mpsc::error::TryRecvError::Empty) => break,
                        Err(mpsc::error::TryRecvError::Disconnected) => {
                            panic!("write task unexpectedly gone");
                        }
                    }
                }

                // If no batch in flight, try to write a new batch.
                if batch_in_flight.is_none() {
                    if let Some((desc, cap)) = state.maybe_start_batch(&cmd_tx) {
                        batch_in_flight = Some((desc, cap));
                    }
                }
            }
        });

        batches_output_stream
    }
803
    /// Apply a single command to the task state.
    ///
    /// `desired` updates enter `corrections` as positive contributions and `persist` updates as
    /// negative contributions, so the buffer contains `desired - persist`, i.e. the updates that
    /// need to be written to bring the shard in line with `desired`.
    ///
    /// A [`WriteCommand::Batch`] folds one activation's worth of input into `corrections` and
    /// produces no response. A [`WriteCommand::WriteBatch`] writes the corrections before the
    /// description's `upper` to persist and reports the result on `resp_tx`: `Some(batch)` for a
    /// successful write, `None` when there was nothing to write. If the response receiver has
    /// been dropped, the just-written batch is deleted so its parts don't leak in persist.
    async fn apply_command(
        corrections: &mut OkErr<Correction<Row>, Correction<DataflowErrorSer>>,
        writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        cmd: WriteCommand,
        resp_tx: &mpsc::UnboundedSender<WriteResponse>,
    ) {
        match cmd {
            WriteCommand::Batch(mut batch) => {
                // Apply the same logical sequence of operations that the per-chunk commands
                // used to: positive desired inserts, negated persist inserts, then optional
                // frontier advancement and forced consolidation.
                if !batch.desired_ok.is_empty() {
                    corrections.ok.insert(&mut batch.desired_ok);
                }
                if !batch.desired_err.is_empty() {
                    corrections.err.insert(&mut batch.desired_err);
                }
                if !batch.persist_ok.is_empty() {
                    corrections.ok.insert_negated(&mut batch.persist_ok);
                }
                if !batch.persist_err.is_empty() {
                    corrections.err.insert_negated(&mut batch.persist_err);
                }
                if let Some(frontier) = batch.persist_frontier {
                    // We will only emit times at or after the `persist` frontier, so now is a
                    // good time to advance the times of stashed updates.
                    corrections.ok.advance_since(frontier.clone());
                    corrections.err.advance_since(frontier);
                }
                if batch.force_consolidation {
                    corrections.ok.consolidate_at_since();
                    corrections.err.consolidate_at_since();
                }
            }
            WriteCommand::WriteBatch(desc) => {
                // Chain ok and err correction iterators directly, avoiding an
                // intermediate Vec allocation.
                let oks = corrections
                    .ok
                    .updates_before(&desc.upper)
                    .map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
                let errs = corrections
                    .err
                    .updates_before(&desc.upper)
                    .map(|(d, t, r)| ((SourceData(Err(d.deserialize())), ()), t, r.into_inner()));
                let mut updates = oks.chain(errs).peekable();

                if updates.peek().is_none() {
                    // No corrections to write.
                    let _ = resp_tx.send(WriteResponse { batch: None });
                    return;
                }

                let batch = writer
                    .batch(updates, desc.lower, desc.upper)
                    .await
                    .expect("valid usage");
                let proto_batch = batch.into_transmittable_batch();
                if let Err(err) = resp_tx.send(WriteResponse {
                    batch: Some(proto_batch),
                }) {
                    // The response receiver is gone; reclaim the batch we just wrote so its
                    // parts don't leak in persist.
                    let batch =
                        writer.batch_from_transmittable_batch(err.0.batch.expect("just sent"));
                    batch.delete().await;
                }
            }
        }
    }
877
    /// State maintained by the `write` operator on the Timely thread.
    struct State {
        /// ID of the sink export this operator belongs to; used in trace logging.
        sink_id: GlobalId,
        /// Index of the Timely worker running this operator instance; used in trace logging.
        worker_id: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontiers of the `persist` inputs.
        ///
        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
        /// usually is, but during snapshot processing, these frontiers will start at the shard's
        /// read frontier, so they can be beyond its write frontier. This is important as it means
        /// we must not discard batch descriptions based on these persist frontiers: A batch
        /// description might still be valid even if its `lower` is before the persist frontiers we
        /// observe.
        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The current valid batch description and associated output capability, if any.
        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
        /// A request to force a consolidation of corrections once both `desired_frontiers` and
        /// `persist_frontiers` become greater than the given frontier.
        ///
        /// Normally we force a consolidation whenever we write a batch, but there are periods
        /// (like read-only mode) when that doesn't happen, and we need to manually force
        /// consolidation instead. Currently this is only used to ensure we quickly get rid of the
        /// snapshot updates.
        force_consolidation_after: Option<Antichain<Timestamp>>,
    }
904
    impl State {
        /// Create the Timely-side state for one instance of the `write` operator.
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            as_of: Antichain<Timestamp>,
            advance_persist_frontiers_at_startup: bool,
        ) -> Self {
            // Force a consolidation of corrections after the snapshot updates have been fully
            // processed, to ensure we get rid of those as quickly as possible.
            let force_consolidation_after = Some(as_of.clone());

            let mut state = Self {
                sink_id,
                worker_id,
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontiers: OkErr::new_frontiers(),
                batch_description: None,
                force_consolidation_after,
            };

            // Immediately advance the persist frontier tracking to the `as_of`.
            // This is important to ensure the persist sink doesn't get stuck if the output shard's
            // initial frontier is less than the `as_of`. The `mint` operator first emits a batch
            // description with `lower = as_of`, and the `write` operator only emits a batch when
            // its observed persist frontier is >= the batch description's `lower`, which (assuming
            // no other writers) would be never if we didn't advance the observed persist frontier
            // to the `as_of`.
            //
            // The `must_use` bool returned by the advance helpers signals that a corresponding
            // `corrections.advance_since` must be queued to the Tokio task. We drop it here
            // because the Tokio-side `corrections.since` is initialized to the same `as_of` in
            // `write::render` before the task is spawned (using the same captured flag value),
            // keeping both sides in lockstep without an additional channel send.
            if advance_persist_frontiers_at_startup {
                let _ = state.advance_persist_ok_frontier(as_of.borrow());
                let _ = state.advance_persist_err_frontier(as_of.borrow());
            }

            state
        }

        /// Emit a `trace!` event for the given message, annotated with the operator's current
        /// frontiers and batch description.
        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontiers.frontier().elements(),
                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
                message,
            );
        }

        /// Advance the tracked frontier of the ok `desired` input.
        fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.trace("advanced `desired` ok frontier");
            }
        }

        /// Advance the tracked frontier of the err `desired` input.
        fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.trace("advanced `desired` err frontier");
            }
        }

        /// Returns true if the persist frontier advanced.
        ///
        /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
        /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
        /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
        /// which can let updates with timestamps below `desc.lower` slip into a written batch.
        #[must_use = "advance_persist_ok_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
        fn advance_persist_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
            if advance(&mut self.persist_frontiers.ok, frontier) {
                self.trace("advanced `persist` ok frontier");
                true
            } else {
                false
            }
        }

        /// Returns true if the persist frontier advanced.
        ///
        /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
        /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
        /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
        /// which can let updates with timestamps below `desc.lower` slip into a written batch.
        #[must_use = "advance_persist_err_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
        fn advance_persist_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
            if advance(&mut self.persist_frontiers.err, frontier) {
                self.trace("advanced `persist` err frontier");
                true
            } else {
                false
            }
        }

        /// Check if a forced consolidation should be triggered.
        ///
        /// Consumes the pending request (`force_consolidation_after`) when it fires, so a
        /// request triggers at most once.
        fn should_force_consolidation(&mut self) -> bool {
            let Some(request) = &self.force_consolidation_after else {
                return false;
            };

            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = self.persist_frontiers.frontier();
            if PartialOrder::less_than(request, desired_frontier)
                && PartialOrder::less_than(request, persist_frontier)
            {
                self.trace("requesting correction consolidation");
                self.force_consolidation_after = None;
                true
            } else {
                false
            }
        }

        /// Absorb a new batch description and its output capability, unless it is outdated.
        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
            // Enforce monotonicity: drop descriptions whose `lower` regresses below the one we
            // already hold. The `mint` operator only emits strictly increasing `lower`s
            // (invariant 1), so a regression means this description is outdated. We cannot use
            // `persist_frontiers` for the same check, because during snapshot processing those
            // frontiers can be ahead of the shard's write frontier and a still-valid description
            // may have a `lower` below them.
            if let Some((prev, _)) = &self.batch_description {
                if PartialOrder::less_than(&desc.lower, &prev.lower) {
                    self.trace(format!("skipping outdated batch description: {desc:?}"));
                    return;
                }
            }

            self.batch_description = Some((desc, cap));
            self.trace("set batch description");
        }

        /// Check if a batch can be written and send a write command to the Tokio task if so.
        ///
        /// On success, the current batch description is consumed and returned together with its
        /// capability; the caller stashes it as the in-flight batch until the task responds.
        fn maybe_start_batch(
            &mut self,
            cmd_tx: &mpsc::UnboundedSender<WriteCommand>,
        ) -> Option<(BatchDescription, Capability<Timestamp>)> {
            let (desc, _cap) = self.batch_description.as_ref()?;

            // We can write a new batch if we have seen all `persist` updates before `lower` and
            // all `desired` updates before `upper`.
            let persist_ready =
                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
            let desired_ready =
                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
            if !persist_ready || !desired_ready {
                return None;
            }

            self.trace("write batch description");
            let (desc, cap) = self.batch_description.take()?;
            cmd_tx
                .send(WriteCommand::WriteBatch(desc.clone()))
                .expect("write task unexpectedly gone");
            Some((desc, cap))
        }
    }
1064}
1065
1066/// Implementation of the `append` operator.
1067mod append {
1068 use super::*;
1069
    /// Commands sent from the Timely operator to the Tokio append task.
    ///
    /// Sent over a single `mpsc` channel and processed in receive order; see the module-level
    /// docs for why `BatchesFrontier` must follow the batches it covers.
    enum AppendCommand {
        /// A new batch description has been received.
        Description(BatchDescription),
        /// A written batch has been received.
        Batch(ProtoBatch),
        /// The batches frontier has advanced.
        BatchesFrontier(Antichain<Timestamp>),
    }
1079
    /// Render the `append` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
    /// * `batches`: The stream of written batches produced by the `write` operator.
    ///
    /// The operator produces no output stream; its effect is the appending of batches to the
    /// output persist shard, performed by the Tokio task spawned here.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        descs: DescsStream<'s>,
        batches: BatchesStream<'s>,
    ) {
        let scope = descs.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "append");
        let mut builder = OperatorBuilderRc::new(name, scope.clone());
        let mut descs_input = builder.new_input(descs, Pipeline);
        // Route each batch to the append worker elected by its description.
        let batch_exchange =
            Exchange::new(|(desc, _): &(BatchDescription, _)| u64::cast_from(desc.append_worker));
        let mut batches_input = builder.new_input(batches, batch_exchange);

        // Channel for commands to the Tokio append task.
        let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<AppendCommand>();

        // Spawn Tokio task that owns the append state machine.
        let append_task_handle =
            mz_ore::task::spawn(|| operator_name(sink_id, "append"), async move {
                let writer = persist_api.open_writer().await;
                let mut state = State::new(sink_id, worker_id, writer);

                // Commands are processed strictly in receive order. `maybe_append_batches` is
                // only attempted on `Description` and `BatchesFrontier`, the two commands that
                // can newly make an append possible.
                while let Some(cmd) = cmd_rx.recv().await {
                    match cmd {
                        AppendCommand::Description(desc) => {
                            state.absorb_batch_description(desc).await;
                            state.maybe_append_batches().await;
                        }
                        AppendCommand::Batch(batch) => {
                            state.absorb_batch(batch).await;
                        }
                        AppendCommand::BatchesFrontier(frontier) => {
                            state.advance_batches_frontier(frontier.borrow());
                            state.maybe_append_batches().await;
                        }
                    }
                }
            })
            .abort_on_drop();

        builder.build(move |_capabilities| {
            // The last batches frontier forwarded to the task; used to avoid resending an
            // unchanged frontier on every activation.
            let mut prev_batches_frontier = Antichain::from_elem(Timestamp::MIN);

            move |frontiers| {
                // Keep task handle alive so it is aborted when the operator is dropped.
                let _ = &append_task_handle;

                // Forward batch descriptions to the Tokio task.
                descs_input.for_each(|_cap, data| {
                    for desc in data.drain(..) {
                        cmd_tx
                            .send(AppendCommand::Description(desc))
                            .expect("append task unexpectedly gone");
                    }
                });

                // Forward batches to the Tokio task.
                batches_input.for_each(|_cap, data| {
                    for (_desc, batch) in data.drain(..) {
                        // The batch description is only used for routing and we ignore it
                        // here since we already get one from `descs_input`.
                        cmd_tx
                            .send(AppendCommand::Batch(batch))
                            .expect("append task unexpectedly gone");
                    }
                });

                // Forward batches frontier advancements *after* the per-activation
                // `Description`/`Batch` sends above. The Tokio task drains commands FIFO and only
                // calls `maybe_append_batches` on `Description`/`BatchesFrontier`; if a frontier
                // advance arrived before its batches, the task could append an incomplete set.
                // See module-level docs for the full ordering invariant.
                let new_batches_frontier = frontiers[1].frontier();
                if PartialOrder::less_than(&prev_batches_frontier.borrow(), &new_batches_frontier) {
                    prev_batches_frontier.clear();
                    prev_batches_frontier.extend(new_batches_frontier.iter().cloned());
                    cmd_tx
                        .send(AppendCommand::BatchesFrontier(
                            new_batches_frontier.to_owned(),
                        ))
                        .expect("append task unexpectedly gone");
                }
            }
        });
    }
1175
    /// State maintained by the `append` Tokio task.
    struct State {
        /// ID of the sink export this operator belongs to; used in trace logging.
        sink_id: GlobalId,
        /// Index of the Timely worker that spawned this task; used in trace logging and to
        /// decide whether this worker is a batch description's elected `append_worker`.
        worker_id: usize,
        /// Write handle for the output persist shard.
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// The current input frontier of `batches`.
        batches_frontier: Antichain<Timestamp>,
        /// The greatest observed `lower` from both `descs` and `batches`.
        lower: Antichain<Timestamp>,
        /// The batch description for `lower`, if any.
        batch_description: Option<BatchDescription>,
        /// Batches received for `lower`.
        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
    }
1190
1191 impl State {
1192 fn new(
1193 sink_id: GlobalId,
1194 worker_id: usize,
1195 persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1196 ) -> Self {
1197 Self {
1198 sink_id,
1199 worker_id,
1200 persist_writer,
1201 batches_frontier: Antichain::from_elem(Timestamp::MIN),
1202 lower: Antichain::from_elem(Timestamp::MIN),
1203 batch_description: None,
1204 batches: Default::default(),
1205 }
1206 }
1207
1208 fn trace<S: AsRef<str>>(&self, message: S) {
1209 let message = message.as_ref();
1210 trace!(
1211 sink_id = %self.sink_id,
1212 worker = %self.worker_id,
1213 batches_frontier = ?self.batches_frontier.elements(),
1214 lower = ?self.lower.elements(),
1215 batch_description = ?self.batch_description,
1216 message,
1217 );
1218 }
1219
1220 fn advance_batches_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
1221 if advance(&mut self.batches_frontier, frontier) {
1222 self.trace("advanced `batches` frontier");
1223 }
1224 }
1225
1226 /// Advance the current `lower`.
1227 ///
1228 /// Discards all currently stashed batches and batch descriptions, assuming that they are
1229 /// now invalid.
1230 async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
1231 assert!(PartialOrder::less_than(&self.lower, &frontier));
1232
1233 self.lower = frontier;
1234 self.batch_description = None;
1235
1236 // Remove stashed batches, cleaning up those we didn't append.
1237 for batch in self.batches.drain(..) {
1238 batch.delete().await;
1239 }
1240
1241 self.trace("advanced `lower`");
1242 }
1243
1244 /// Absorb the given batch description into the state, provided it is not outdated.
1245 async fn absorb_batch_description(&mut self, desc: BatchDescription) {
1246 if PartialOrder::less_than(&self.lower, &desc.lower) {
1247 self.advance_lower(desc.lower.clone()).await;
1248 } else if &self.lower != &desc.lower {
1249 self.trace(format!("skipping outdated batch description: {desc:?}"));
1250 return;
1251 }
1252
1253 if desc.append_worker == self.worker_id {
1254 self.batch_description = Some(desc);
1255 self.trace("set batch description");
1256 }
1257 }
1258
1259 /// Absorb the given batch into the state, provided it is not outdated.
1260 async fn absorb_batch(&mut self, batch: ProtoBatch) {
1261 let batch = self.persist_writer.batch_from_transmittable_batch(batch);
1262 if PartialOrder::less_than(&self.lower, batch.lower()) {
1263 self.advance_lower(batch.lower().clone()).await;
1264 } else if &self.lower != batch.lower() {
1265 self.trace(format!(
1266 "skipping outdated batch: ({:?}, {:?})",
1267 batch.lower().elements(),
1268 batch.upper().elements(),
1269 ));
1270
1271 // Ensure the batch's data gets properly cleaned up before dropping it.
1272 batch.delete().await;
1273 return;
1274 }
1275
1276 self.batches.push(batch);
1277 self.trace("absorbed a batch");
1278 }
1279
1280 async fn maybe_append_batches(&mut self) {
1281 let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
1282 if !batches_complete {
1283 return;
1284 }
1285
1286 let Some(desc) = self.batch_description.take() else {
1287 return;
1288 };
1289
1290 let new_lower = match self.append_batches(desc).await {
1291 Ok(shard_upper) => {
1292 self.trace("appended a batch");
1293 shard_upper
1294 }
1295 Err(shard_upper) => {
1296 // Failing the append is expected in the presence of concurrent replicas. There
1297 // is nothing special to do here: The self-correcting feedback mechanism
1298 // ensures that we observe the concurrent changes, compute their consequences,
1299 // and append them at a future time.
1300 self.trace(format!(
1301 "append failed due to `lower` mismatch: {:?}",
1302 shard_upper.elements(),
1303 ));
1304 shard_upper
1305 }
1306 };
1307
1308 self.advance_lower(new_lower).await;
1309 }
1310
1311 /// Append the current `batches` to the output shard.
1312 ///
1313 /// Returns whether the append was successful or not, and the current shard upper in either
1314 /// case.
1315 ///
1316 /// This method advances the shard upper to the batch `lower` if necessary. This is the
1317 /// mechanism that brings the shard upper to the sink as-of when appending the initial
1318 /// batch.
1319 ///
1320 /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
1321 /// a single append at operator startup. The reason we are doing it here instead is that it
1322 /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
1323 /// persist writes, including the initial upper bump. Having only a single place that
1324 /// performs writes makes it easy to ensure we are doing that correctly.
1325 async fn append_batches(
1326 &mut self,
1327 desc: BatchDescription,
1328 ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
1329 let (lower, upper) = (desc.lower, desc.upper);
1330 let mut to_append: Vec<_> = self.batches.iter_mut().collect();
1331
1332 loop {
1333 let result = self
1334 .persist_writer
1335 .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
1336 .await
1337 .expect("valid usage");
1338
1339 match result {
1340 Ok(()) => return Ok(upper),
1341 Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
1342 advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
1343
1344 // At this point the shard's since and upper are likely the same, a state
1345 // that is likely to hit edge-cases in logic reasoning about frontiers.
1346 fail::fail_point!("mv_advanced_upper");
1347 }
1348 Err(mismatch) => return Err(mismatch.current),
1349 }
1350 }
1351 }
1352 }
1353
1354 /// Advance the frontier of the given writer's shard to at least the given `upper`.
1355 async fn advance_shard_upper(
1356 persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1357 upper: Antichain<Timestamp>,
1358 ) {
1359 let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1360 let lower = Antichain::from_elem(Timestamp::MIN);
1361 persist_writer
1362 .append(empty_updates, lower, upper)
1363 .await
1364 .expect("valid usage")
1365 .expect("should always succeed");
1366 }
1367}