mz_compute/sink/materialized_view_v2.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Sync Timely operator implementation of the MV sink.
11//!
12//! This module provides an alternative implementation of `persist_sink` that uses sync Timely
13//! operators communicating with Tokio tasks via channels, instead of async Timely operators.
14//! Gated behind the `ENABLE_SYNC_MV_SINK` dyncfg.
15//!
16//! See the [main module](super::materialized_view) for the operator graph and design docs.
17//!
18//! ### Channel ordering requirements
19//!
20//! Each operator splits state across a Timely thread (which observes inputs and frontiers) and a
21//! Tokio task (which owns persist I/O state). They communicate via `mpsc` command channels, which
22//! preserve send order on a single sender. Each operator instance constructs its own
23//! `(tx, rx)` pair inside `render` and never clones the sender, so there is exactly one producer
24//! per channel — sends are totally ordered. Different worker instances of the same operator
25//! never share a channel, so cross-worker ordering is not a concern. The correctness of the
26//! operators relies on a few ordering invariants between the messages sent within a single Timely
27//! activation:
28//!
29//! * **`mint`**: the `persist_watch` Tokio task is the sole producer of persist-frontier updates
30//! and emits them in monotonically increasing order, terminated by the empty frontier. The
31//! Timely closure drains the receiver each activation; processing in receive order is therefore
32//! sufficient. No cross-channel ordering is needed because `mint` only has the one channel.
33//!
//! * **`write`**: per-activation, the Timely closure first coalesces all observed input data
//!   into a single `WriteCommand::Batch`, sends it, and only then sends a
//!   `WriteCommand::WriteBatch` (issued from `maybe_start_batch` after frontier checks). The
//!   Tokio task processes commands FIFO, so a `WriteBatch` is guaranteed to find the `Batch`
//!   from the same activation (and every earlier one) already applied to the corrections
//!   buffer. Reversing this order would let the task write a batch that is missing updates
//!   the Timely closure already observed.
40//!
41//! * **`append`**: per-activation, the Timely closure forwards messages in the order
42//! `Description` → `Batch` → `BatchesFrontier`. The first two carry the data the task needs
43//! to absorb; `BatchesFrontier` is the trigger that allows `maybe_append_batches` to fire.
44//! Sending `BatchesFrontier` *after* its corresponding `Batch` messages ensures the task does
45//! not append a batch description before all batches contributing to it have been absorbed.
46//! If the order were reversed, `maybe_append_batches` could fire on an incomplete `batches`
47//! set and miss writes.
48
49use std::any::Any;
50use std::cell::RefCell;
51use std::rc::Rc;
52use std::sync::Arc;
53
54use differential_dataflow::{Hashable, VecCollection};
55use mz_compute_types::dyncfgs::MV_SINK_ADVANCE_PERSIST_FRONTIERS;
56use mz_dyncfg::ConfigSet;
57use mz_ore::cast::CastFrom;
58use mz_persist_client::batch::{Batch, ProtoBatch};
59use mz_persist_client::write::WriteHandle;
60use mz_repr::{Diff, GlobalId, Row, Timestamp};
61use mz_storage_types::StorageDiff;
62use mz_storage_types::sources::SourceData;
63use timely::PartialOrder;
64use timely::dataflow::channels::pact::{Exchange, Pipeline};
65use timely::dataflow::operators::generic::OutputBuilder;
66use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
67use timely::dataflow::operators::vec::Broadcast;
68use timely::dataflow::operators::{Capability, CapabilitySet};
69use timely::progress::Antichain;
70use timely::progress::frontier::AntichainRef;
71use tokio::sync::{mpsc, watch};
72use tracing::trace;
73
74use crate::compute_state::ComputeState;
75use crate::render::StartSignal;
76use crate::render::errors::DataflowErrorSer;
77use crate::sink::correction::{ChannelLogging, Correction, CorrectionLogger};
78use crate::sink::materialized_view::{
79 BatchDescription, BatchesStream, DescsStream, DesiredStreams, OkErr, PersistApi,
80 PersistStreams, SharedSinkFrontier, advance, operator_name, persist_source,
81};
82
83/// Renders an MV sink writing the given desired collection into the `target` persist collection.
84///
85/// This is the sync Timely operator implementation, using Tokio tasks for I/O.
86pub(super) fn persist_sink<'s>(
87 sink_id: GlobalId,
88 target: &mz_storage_types::controller::CollectionMetadata,
89 ok_collection: VecCollection<'s, Timestamp, Row, Diff>,
90 err_collection: VecCollection<'s, Timestamp, DataflowErrorSer, Diff>,
91 as_of: Antichain<Timestamp>,
92 compute_state: &mut ComputeState,
93 start_signal: StartSignal,
94 read_only_rx: watch::Receiver<bool>,
95) -> Rc<dyn Any> {
96 let scope = ok_collection.scope();
97 let desired = OkErr::new(ok_collection.inner, err_collection.inner);
98
99 // Read back the persist shard.
100 let (persist, persist_token) =
101 persist_source(scope, sink_id, target.clone(), compute_state, start_signal);
102
103 let persist_api = PersistApi {
104 persist_clients: Arc::clone(&compute_state.persist_clients),
105 collection: target.clone(),
106 shard_name: sink_id.to_string(),
107 purpose: format!("MV sink {sink_id}"),
108 };
109
110 let (desired, descs, sink_frontier) = mint::render(
111 sink_id,
112 persist_api.clone(),
113 as_of.clone(),
114 read_only_rx.clone(),
115 desired,
116 );
117
118 // Broadcast batch descriptions to all workers, regardless of whether or not they are
119 // responsible for the append, to give them a chance to clean up any outdated state they
120 // might still hold.
121 let descs = descs.broadcast();
122
123 let batches = write::render(
124 sink_id,
125 persist_api.clone(),
126 as_of,
127 desired,
128 persist,
129 descs.clone(),
130 read_only_rx,
131 Rc::clone(&compute_state.worker_config),
132 );
133
134 append::render(sink_id, persist_api, descs, batches);
135
136 // Report sink frontier updates to the `ComputeState`.
137 let collection = compute_state.expect_collection_mut(sink_id);
138 collection.sink_write_frontier = Some(sink_frontier);
139
140 Rc::new(persist_token)
141}
142
143/// Implementation of the `mint` operator.
144mod mint {
145 use super::*;
146 use timely::progress::frontier::AntichainRef;
147
148 /// Render the `mint` operator.
149 ///
150 /// The parameters passed in are:
151 /// * `sink_id`: The `GlobalId` of the sink export.
152 /// * `persist_api`: An object providing access to the output persist shard.
153 /// * `as_of`: The first time for which the sink may produce output.
154 /// * `read_only_rx`: A receiver that reports the sink is in read-only mode.
155 /// * `desired`: The ok/err streams that should be sinked to persist.
156 pub fn render<'s>(
157 sink_id: GlobalId,
158 persist_api: PersistApi,
159 as_of: Antichain<Timestamp>,
160 mut read_only_rx: watch::Receiver<bool>,
161 desired: DesiredStreams<'s>,
162 ) -> (DesiredStreams<'s>, DescsStream<'s>, SharedSinkFrontier) {
163 let scope = desired.ok.scope();
164 let worker_id = scope.index();
165 let worker_count = scope.peers();
166
167 // Determine the active worker for the mint operator.
168 let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();
169
170 let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
171 let shared_frontier = Rc::clone(&sink_frontier);
172
173 let name = operator_name(sink_id, "mint");
174 let mut builder = OperatorBuilderRc::new(name, scope.clone());
175 let info = builder.operator_info();
176
177 // Create outputs (before inputs, so no input connections yet).
178 let (ok_output, ok_stream) = builder.new_output();
179 let (err_output, err_stream) = builder.new_output();
180 let (desc_output, desc_stream) = builder.new_output();
181
182 let mut ok_output = OutputBuilder::from(ok_output);
183 let mut err_output = OutputBuilder::from(err_output);
184 let mut desc_output = OutputBuilder::from(desc_output);
185
186 // desired_ok -> output 0 (ok passthrough)
187 let mut desired_ok_input = builder.new_input_connection(
188 desired.ok,
189 Pipeline,
190 [(0, Antichain::from_elem(Default::default()))],
191 );
192 // desired_err -> output 1 (err passthrough)
193 let mut desired_err_input = builder.new_input_connection(
194 desired.err,
195 Pipeline,
196 [(1, Antichain::from_elem(Default::default()))],
197 );
198
199 // Set up background tasks and state for the active worker only.
200 let mut task_handles = Vec::new();
201 let read_only = *read_only_rx.borrow_and_update();
202 let mut state = None;
203 if worker_id == active_worker_id {
204 // Spawn a Tokio task to watch the persist shard's upper frontier.
205 //
206 // We collect the persist frontier from a write handle directly, rather than
207 // inspecting the `persist` stream, because the latter has two annoying glitches:
208 // (a) It starts at the shard's read frontier, not its write frontier.
209 // (b) It can lag behind if there are spikes in ingested data.
210 //
211 // The decoupling from the `persist` stream is load-bearing: that stream can fall
212 // arbitrarily behind the shard upper during snapshot replay or write spikes. Using
213 // it would delay both (1) the controller-visible sink frontier (`shared_frontier`),
214 // which previously caused a CrossJoin feature-bench regression where the controller
215 // held a finished MV dataflow open waiting for the empty frontier, and (2) the
216 // `state.persist_frontier` that gates batch-description minting, stalling mint until
217 // the read-back stream catches up. The goal isn't tick-granular descriptions per
218 // se — it's avoiding the stream-induced stall. See 5eab5ff896 for the original
219 // regression and rationale.
220 //
221 // The task sends the empty frontier as its final message before exiting. The
222 // operator drops `persist_rx` once it receives the empty frontier.
223 let (persist_tx, persist_rx) = mpsc::unbounded_channel();
224 let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
225 let handle = mz_ore::task::spawn(
226 || operator_name(sink_id, "mint::persist_watch"),
227 async move {
228 let mut writer = persist_api.open_writer().await;
229 let mut frontier = Antichain::from_elem(Timestamp::MIN);
230 loop {
231 writer.wait_for_upper_past(&frontier).await;
232 frontier = writer.upper().clone();
233 if persist_tx.send(frontier.clone()).is_err() {
234 return;
235 }
236 if sync_activator.activate().is_err() {
237 return;
238 }
239 if frontier.is_empty() {
240 return;
241 }
242 }
243 },
244 );
245 task_handles.push(handle.abort_on_drop());
246
247 // Spawn a Tokio task to wake the operator when read-only mode changes.
248 if read_only {
249 let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
250 let mut rx = read_only_rx.clone();
251 let handle = mz_ore::task::spawn(
252 || format!("mv_sink({sink_id})::mint::read_only_watch"),
253 async move {
254 let _ = rx.changed().await;
255 let _ = sync_activator.activate();
256 },
257 );
258 task_handles.push(handle.abort_on_drop());
259 }
260
261 state = Some(State::new(
262 sink_id,
263 worker_count,
264 as_of,
265 read_only,
266 persist_rx,
267 ));
268 }
269
270 builder.build(move |capabilities| {
271 // Passing through the `desired` streams only requires data capabilities, so we can
272 // immediately drop their initial capabilities here.
273 let [_, _, desc_cap]: [_; 3] =
274 capabilities.try_into().expect("one capability per output");
275
276 let mut cap_set = if state.is_some() {
277 Some(CapabilitySet::from_elem(desc_cap))
278 } else {
279 drop(desc_cap);
280 shared_frontier.borrow_mut().clear();
281 None
282 };
283
284 move |frontiers| {
285 // Keep task handles alive so they are aborted when the operator is dropped.
286 let _ = &task_handles;
287
288 // Pass through desired data.
289 let mut ok_out = ok_output.activate();
290 desired_ok_input.for_each(|cap, data| {
291 ok_out.session(&cap).give_container(data);
292 });
293 let mut err_out = err_output.activate();
294 desired_err_input.for_each(|cap, data| {
295 err_out.session(&cap).give_container(data);
296 });
297
298 let Some(state) = &mut state else {
299 // Non-active worker: just pass through data.
300 return;
301 };
302 let cap_set = cap_set.as_mut().unwrap();
303
304 // Track desired frontiers.
305 state.advance_desired_ok_frontier(frontiers[0].frontier());
306 state.advance_desired_err_frontier(frontiers[1].frontier());
307
308 state.drain_persist_rx(&shared_frontier);
309
310 // Check read-only mode.
311 if state.read_only && read_only_rx.has_changed().unwrap_or(false) {
312 if !*read_only_rx.borrow_and_update() {
313 state.allow_writes();
314 }
315 }
316
317 // Try to mint a batch description.
318 let mut desc_out = desc_output.activate();
319 if let Some(desc) = state.maybe_mint_batch_description() {
320 let lower_ts = *desc.lower.as_option().expect("not empty");
321 let cap = cap_set.delayed(&lower_ts);
322 desc_out.session(&cap).give(desc);
323
324 // We only emit strictly increasing `lower`s, so we can let our output frontier
325 // advance beyond the current `lower`.
326 cap_set.downgrade([lower_ts.step_forward()]);
327 } else {
328 // The next emitted `lower` will be at least the `persist` frontier, so we can
329 // advance our output frontier as far.
330 let _ = cap_set.try_downgrade(state.persist_frontier.iter());
331 }
332 }
333 });
334
335 let desired_output_streams = OkErr::new(ok_stream, err_stream);
336 (desired_output_streams, desc_stream, sink_frontier)
337 }
338
339 /// State maintained by the `mint` operator.
340 struct State {
341 sink_id: GlobalId,
342 /// The number of workers in the Timely cluster.
343 worker_count: usize,
344 /// The frontiers of the `desired` inputs.
345 desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
346 /// The frontier of the target persist shard.
347 persist_frontier: Antichain<Timestamp>,
348 /// Receiver for persist frontier updates from the Tokio persist_watch task.
349 ///
350 /// Dropped once the empty frontier is received (the task's shutdown signal).
351 persist_rx: Option<mpsc::UnboundedReceiver<Antichain<Timestamp>>>,
352 /// The append worker for the next batch description, chosen in round-robin fashion.
353 next_append_worker: usize,
354 /// The last `lower` we have emitted in a batch description, if any. Whenever the
355 /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
356 last_lower: Option<Antichain<Timestamp>>,
357 /// Whether we are operating in read-only mode.
358 ///
359 /// In read-only mode, minting of batch descriptions is disabled.
360 read_only: bool,
361 }
362
363 impl State {
364 fn new(
365 sink_id: GlobalId,
366 worker_count: usize,
367 as_of: Antichain<Timestamp>,
368 read_only: bool,
369 persist_rx: mpsc::UnboundedReceiver<Antichain<Timestamp>>,
370 ) -> Self {
371 // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
372 // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
373 // effort writing out snapshots of data that is already in the shard.
374 let persist_frontier = as_of;
375
376 Self {
377 sink_id,
378 worker_count,
379 desired_frontiers: OkErr::new_frontiers(),
380 persist_frontier,
381 persist_rx: Some(persist_rx),
382 next_append_worker: 0,
383 last_lower: None,
384 read_only,
385 }
386 }
387
388 fn trace<S: AsRef<str>>(&self, message: S) {
389 let message = message.as_ref();
390 trace!(
391 sink_id = %self.sink_id,
392 desired_frontier = ?self.desired_frontiers.frontier().elements(),
393 persist_frontier = ?self.persist_frontier.elements(),
394 last_lower = ?self.last_lower,
395 message,
396 );
397 }
398
399 fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
400 if advance(&mut self.desired_frontiers.ok, frontier) {
401 self.trace("advanced `desired` ok frontier");
402 }
403 }
404
405 fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
406 if advance(&mut self.desired_frontiers.err, frontier) {
407 self.trace("advanced `desired` err frontier");
408 }
409 }
410
411 fn advance_persist_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
412 if advance(&mut self.persist_frontier, frontier) {
413 self.trace("advanced `persist` frontier");
414 }
415 }
416
417 /// Drain persist frontier updates from the Tokio task.
418 ///
419 /// Frontiers from the `persist_watch` task are monotonically increasing, so only the
420 /// most recent one matters. We drain all queued messages and apply just the latest,
421 /// avoiding redundant `advance_persist_frontier`/`trace!` calls when several updates
422 /// arrived between activations.
423 ///
424 /// The task sends the empty frontier as its final message before exiting. Once
425 /// received, we drop the receiver.
426 fn drain_persist_rx(&mut self, shared_frontier: &RefCell<Antichain<Timestamp>>) {
427 let Some(mut rx) = self.persist_rx.take() else {
428 return;
429 };
430 let mut latest: Option<Antichain<Timestamp>> = None;
431 let mut closed = false;
432 loop {
433 match rx.try_recv() {
434 Ok(frontier) => {
435 let done = frontier.is_empty();
436 latest = Some(frontier);
437 if done {
438 closed = true;
439 break;
440 }
441 }
442 Err(mpsc::error::TryRecvError::Empty) => break,
443 Err(mpsc::error::TryRecvError::Disconnected) => {
444 panic!("mint persist_watch task unexpectedly gone");
445 }
446 }
447 }
448 if let Some(frontier) = latest {
449 shared_frontier.borrow_mut().clone_from(&frontier);
450 self.advance_persist_frontier(frontier.borrow());
451 }
452 if !closed {
453 self.persist_rx = Some(rx);
454 }
455 }
456
457 fn allow_writes(&mut self) {
458 if self.read_only {
459 self.read_only = false;
460 self.trace("switched to write mode");
461 }
462 }
463
464 fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
465 let desired_frontier = self.desired_frontiers.frontier();
466 let persist_frontier = &self.persist_frontier;
467
468 // We only mint new batch descriptions when:
469 // 1. We are _not_ in read-only mode.
470 // 2. The `desired` frontier is ahead of the `persist` frontier.
471 // 3. The `persist` frontier advanced since we last emitted a batch description.
472 let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
473 let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
474 PartialOrder::less_than(lower, persist_frontier)
475 });
476
477 if self.read_only || !desired_ahead || !persist_advanced {
478 return None;
479 }
480
481 let lower = persist_frontier.clone();
482 let upper = desired_frontier.clone();
483 let append_worker = self.next_append_worker;
484 let desc = BatchDescription::new(lower, upper, append_worker);
485
486 self.next_append_worker = (append_worker + 1) % self.worker_count;
487 self.last_lower = Some(desc.lower.clone());
488
489 self.trace(format!("minted batch description: {desc:?}"));
490 Some(desc)
491 }
492 }
493}
494
495/// Implementation of the `write` operator.
496mod write {
497 use super::*;
498
499 use mz_timely_util::activator::ArcActivator;
500
/// Commands sent from the Timely operator to the Tokio write task.
///
/// Commands travel over a single unbounded `mpsc` channel and are applied by the task in the
/// order they were sent.
enum WriteCommand {
    /// A coalesced batch of work gathered during a single operator activation.
    ///
    /// The Timely closure accumulates all updates observed across the four data inputs plus
    /// any frontier advancement and forced-consolidation flag, and sends a single
    /// `WriteCommand::Batch` per activation. This keeps the channel overhead independent of
    /// the number of Timely chunks processed per activation.
    ///
    /// An activation that gathered no work sends no `Batch` at all.
    Batch(BatchUpdates),
    /// Write a batch with the given description. The task drains corrections and writes
    /// them to persist.
    WriteBatch(BatchDescription),
}
514
/// The payload of a coalesced [`WriteCommand::Batch`].
///
/// The update vectors hold the updates exactly as they were received on the operator inputs;
/// per the `apply_command` documentation, the write task inserts `desired` updates as
/// positive and `persist` updates as negative contributions to the corrections buffer.
struct BatchUpdates {
    /// Updates from the `desired` ok input (positive contributions).
    desired_ok: Vec<(Row, Timestamp, Diff)>,
    /// Updates from the `desired` err input (positive contributions).
    desired_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
    /// Updates from the `persist` ok input (negative contributions).
    persist_ok: Vec<(Row, Timestamp, Diff)>,
    /// Updates from the `persist` err input (negative contributions).
    persist_err: Vec<(DataflowErrorSer, Timestamp, Diff)>,
    /// The new persist frontier, if it advanced this activation.
    persist_frontier: Option<Antichain<Timestamp>>,
    /// Whether a consolidation of the corrections buffer should be forced.
    force_consolidation: bool,
}
530
531 impl BatchUpdates {
532 fn new() -> Self {
533 Self {
534 desired_ok: Vec::new(),
535 desired_err: Vec::new(),
536 persist_ok: Vec::new(),
537 persist_err: Vec::new(),
538 persist_frontier: None,
539 force_consolidation: false,
540 }
541 }
542
543 /// Returns true if there is no work in this batch.
544 fn is_empty(&self) -> bool {
545 self.desired_ok.is_empty()
546 && self.desired_err.is_empty()
547 && self.persist_ok.is_empty()
548 && self.persist_err.is_empty()
549 && self.persist_frontier.is_none()
550 && !self.force_consolidation
551 }
552 }
553
/// A response from the Tokio write task back to the Timely operator.
///
/// The operator pairs each response with the batch description it currently has in flight.
// NOTE(review): presumably the task sends exactly one response per `WriteCommand::WriteBatch`;
// confirm against `apply_command`.
struct WriteResponse {
    /// The written batch, or `None` if the corrections buffer had no updates.
    batch: Option<ProtoBatch>,
}
559
560 /// Render the `write` operator.
561 ///
562 /// The parameters passed in are:
563 /// * `sink_id`: The `GlobalId` of the sink export.
564 /// * `persist_api`: An object providing access to the output persist shard.
565 /// * `as_of`: The first time for which the sink may produce output.
566 /// * `desired`: The ok/err streams that should be sinked to persist.
567 /// * `persist`: The ok/err streams read back from the output persist shard.
568 /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
569 pub fn render<'s>(
570 sink_id: GlobalId,
571 persist_api: PersistApi,
572 as_of: Antichain<Timestamp>,
573 desired: DesiredStreams<'s>,
574 persist: PersistStreams<'s>,
575 descs: DescsStream<'s>,
576 mut read_only_rx: watch::Receiver<bool>,
577 worker_config: Rc<ConfigSet>,
578 ) -> BatchesStream<'s> {
579 let scope = desired.ok.scope();
580 let worker_id = scope.index();
581
582 let name = operator_name(sink_id, "write");
583 let mut builder = OperatorBuilderRc::new(name, scope.clone());
584 let info = builder.operator_info();
585
586 // Set up correction buffer logging. CorrectionLogger is not Send (uses timely
587 // loggers), so the Tokio task uses ChannelLogging to send events back to the
588 // Timely thread for application by the CorrectionLogger.
589 let mut channel_logging = None;
590 let mut correction_logger = None;
591 if let (Some(compute_logger), Some(differential_logger)) = (
592 scope.worker().logger_for("materialize/compute"),
593 scope.worker().logger_for("differential/arrange"),
594 ) {
595 let operator_info = builder.operator_info();
596 let (tx, rx) = mpsc::unbounded_channel();
597 channel_logging = Some(ChannelLogging::new(tx));
598 correction_logger = Some(CorrectionLogger::new(
599 compute_logger,
600 differential_logger.into(),
601 operator_info.global_id,
602 operator_info.address.to_vec(),
603 rx,
604 ));
605 }
606
607 // It is important that we exchange the `desired` and `persist` data the same way, so
608 // updates that cancel each other out end up on the same worker.
609 let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
610 let exchange_err = |(d, _, _): &(DataflowErrorSer, Timestamp, Diff)| d.hashed();
611
612 // Data inputs are created before the output, so they are not connected to it.
613 let mut desired_ok_input = builder.new_input(desired.ok, Exchange::new(exchange_ok));
614 let mut desired_err_input = builder.new_input(desired.err, Exchange::new(exchange_err));
615 let mut persist_ok_input = builder.new_input(persist.ok, Exchange::new(exchange_ok));
616 let mut persist_err_input = builder.new_input(persist.err, Exchange::new(exchange_err));
617 let mut descs_input = builder.new_input(descs, Pipeline);
618
619 // Only descs (input 4) is connected to the batches output.
620 let (batches_output, batches_output_stream) =
621 builder.new_output_connection([(4, Antichain::from_elem(Default::default()))]);
622 let mut batches_output = OutputBuilder::from(batches_output);
623
624 // Obtain SinkMetrics synchronously from the persist client cache, rather than through
625 // a WriteHandle, to avoid async I/O on the Timely thread.
626 let sink_metrics = persist_api.persist_clients.metrics().sink.clone();
627
628 // Construct corrections on the Timely thread (reads ConfigSet), then move to the
629 // Tokio task. The ChannelLogging sends events back to the Timely thread.
630 let worker_metrics = sink_metrics.for_worker(worker_id);
631 let mut corrections: OkErr<Correction<Row>, Correction<DataflowErrorSer>> = OkErr::new(
632 Correction::new(
633 sink_metrics.clone(),
634 worker_metrics.clone(),
635 channel_logging.clone(),
636 &worker_config,
637 ),
638 Correction::new(
639 sink_metrics.clone(),
640 worker_metrics,
641 channel_logging,
642 &worker_config,
643 ),
644 );
645
646 // Read `MV_SINK_ADVANCE_PERSIST_FRONTIERS` exactly once and reuse the captured value for
647 // both the Tokio-side `corrections.since` initialization below and the Timely-side
648 // `persist_frontiers` initialization in `State::new`. Re-reading the dyncfg per init site
649 // would let the value flip between reads and produce the very inconsistency this fix
650 // addresses: `persist_frontiers = as_of` (gate open) with `corrections.since = MIN`
651 // (snapshot updates not advanced) reproduces the original `UpdateNotBeyondLower` panic.
652 let advance_persist_frontiers_at_startup =
653 MV_SINK_ADVANCE_PERSIST_FRONTIERS.get(&worker_config);
654
655 // Mirror the persist-frontier initialization performed by `State::new` below. With the
656 // flag enabled, `State` advances its Timely-side `persist_frontiers` to `as_of`, opening
657 // the `maybe_start_batch` write gate (`desc.lower <= persist_frontiers.frontier()`)
658 // immediately for the first description minted with `lower = as_of`. The corrections
659 // buffer lives on the Tokio task and only learns of frontier advancements through
660 // `WriteCommand::Batch { persist_frontier, .. }`, which the Timely closure populates only
661 // when an input frontier actually moves. On startup, the input frontiers begin at
662 // `Timestamp::MIN`, so no `Batch` carries `persist_frontier` until the persist input
663 // catches up — yet a `WriteBatch(desc)` with `desc.lower = as_of` can already be sent.
664 // Snapshot-replay updates inserted in the meantime stay at their original timestamps
665 // (`Correction::insert` rounds to `max(t, since)` and `since == MIN`), and slip into the
666 // batch, tripping persist's `UpdateNotBeyondLower` invariant. Advancing
667 // `corrections.since` here keeps the Tokio side in lockstep with the Timely side, the
668 // same invariant `materialized_view::write::State::new` upholds via
669 // `apply_persist_frontier_advancement`.
670 if advance_persist_frontiers_at_startup {
671 corrections.ok.advance_since(as_of.clone());
672 corrections.err.advance_since(as_of.clone());
673 }
674
675 // Channels for commands and responses.
676 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<WriteCommand>();
677 let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<WriteResponse>();
678
679 // Spawn Tokio task that owns the WriteHandle and corrections buffer.
680 let (activator, activation_ack) = ArcActivator::new(scope, &info);
681 let write_task_handle = {
682 mz_ore::task::spawn(
683 || operator_name(sink_id, "write::batch_writer"),
684 async move {
685 let mut writer = persist_api.open_writer().await;
686
687 while let Some(cmd) = cmd_rx.recv().await {
688 apply_command(&mut corrections, &mut writer, cmd, &resp_tx).await;
689 // Activate the operator to drain logging events and process batch responses.
690 // ArcActivator suppresses redundant activations, so this is cheap.
691 activator.activate();
692 }
693 },
694 )
695 .abort_on_drop()
696 };
697
698 // In read-only mode, wake the operator when writes are allowed so the forced
699 // consolidation stops re-arming and the `WriteBatch` path takes over promptly. Without
700 // this the operator only learns of the transition on its next input activation, which an
701 // idle sink may not get for a while.
702 let read_only_watch_handle = (*read_only_rx.borrow()).then(|| {
703 let sync_activator = scope.worker().sync_activator_for(info.address.to_vec());
704 let mut rx = read_only_rx.clone();
705 mz_ore::task::spawn(
706 || operator_name(sink_id, "write::read_only_watch"),
707 async move {
708 let _ = rx.changed().await;
709 let _ = sync_activator.activate();
710 },
711 )
712 .abort_on_drop()
713 });
714
715 builder.build(move |capabilities| {
716 // We will use the data capabilities from the `descs` input to produce output, so no
717 // need to hold onto the initial capabilities.
718 drop(capabilities);
719
720 let read_only = *read_only_rx.borrow_and_update();
721 let mut state = State::new(
722 sink_id,
723 worker_id,
724 as_of,
725 advance_persist_frontiers_at_startup,
726 read_only,
727 );
728
729 // Whether a batch write is currently in flight in the Tokio task.
730 let mut batch_in_flight: Option<(BatchDescription, Capability<Timestamp>)> = None;
731
732 // CorrectionLogger lives on the Timely thread and drains events from
733 // the channel each activation. On drop, it drains remaining events and
734 // retracts all logged state.
735 let mut correction_logger = correction_logger;
736
737 move |frontiers| {
738 // Keep task handles alive so they are aborted when the operator is dropped.
739 let _ = &write_task_handle;
740 let _ = &read_only_watch_handle;
741
742 // Acknowledge activation so the Tokio task can activate us again.
743 activation_ack.ack();
744
745 // Drain logging events from the Tokio task's ChannelLogging.
746 if let Some(logger) = &mut correction_logger {
747 logger.apply_events();
748 }
749 // Coalesce all work from this activation into a single command. This keeps
750 // per-chunk channel overhead low, which matters for hydration when many small
751 // Timely chunks arrive in a single activation sweep.
752 let mut batch = BatchUpdates::new();
753 desired_ok_input.for_each(|_cap, data| {
754 batch.desired_ok.append(data);
755 });
756 desired_err_input.for_each(|_cap, data| {
757 batch.desired_err.append(data);
758 });
759 persist_ok_input.for_each(|_cap, data| {
760 batch.persist_ok.append(data);
761 });
762 persist_err_input.for_each(|_cap, data| {
763 batch.persist_err.append(data);
764 });
765
766 // Accept batch descriptions.
767 descs_input.for_each(|cap, data| {
768 let cap = cap.retain(0);
769 for desc in data.drain(..) {
770 state.absorb_batch_description(desc, cap.clone());
771 }
772 });
773
774 // Track frontiers. Include the new persist frontier in the coalesced batch for
775 // `advance_since`.
776 state.advance_desired_ok_frontier(frontiers[0].frontier());
777 state.advance_desired_err_frontier(frontiers[1].frontier());
778 if state.advance_persist_ok_frontier(frontiers[2].frontier())
779 | state.advance_persist_err_frontier(frontiers[3].frontier())
780 {
781 batch.persist_frontier = Some(state.persist_frontiers.frontier().to_owned());
782 }
783 // Track read-only mode so the forced consolidation stops re-arming once writes
784 // are allowed and the `WriteBatch` path takes over driving consolidation.
785 if state.read_only && read_only_rx.has_changed().unwrap_or(false) {
786 if !*read_only_rx.borrow_and_update() {
787 state.set_read_only(false);
788 }
789 }
790
791 if state.should_force_consolidation() {
792 batch.force_consolidation = true;
793 }
794
795 if !batch.is_empty() {
796 cmd_tx
797 .send(WriteCommand::Batch(batch))
798 .expect("write task unexpectedly gone");
799 }
800
801 // Try to receive batch results from the Tokio task.
802 loop {
803 match resp_rx.try_recv() {
804 Ok(resp) => {
805 if let Some((desc, cap)) = batch_in_flight.take() {
806 if let Some(batch) = resp.batch {
807 let mut out = batches_output.activate();
808 out.session(&cap).give((desc, batch));
809 state.trace("wrote a batch");
810 } else {
811 state.trace("skipping empty batch");
812 }
813 }
814 }
815 Err(mpsc::error::TryRecvError::Empty) => break,
816 Err(mpsc::error::TryRecvError::Disconnected) => {
817 panic!("write task unexpectedly gone");
818 }
819 }
820 }
821
822 // If no batch in flight, try to write a new batch.
823 if batch_in_flight.is_none() {
824 if let Some((desc, cap)) = state.maybe_start_batch(&cmd_tx) {
825 batch_in_flight = Some((desc, cap));
826 }
827 }
828 }
829 });
830
831 batches_output_stream
832 }
833
834 /// Apply a single command to the task state.
835 ///
836 /// `desired` updates enter `corrections` as positive contributions and `persist` updates as
837 /// negative contributions, so the buffer contains `desired - persist`, i.e. the updates that
838 /// need to be written to bring the shard in line with `desired`.
839 async fn apply_command(
840 corrections: &mut OkErr<Correction<Row>, Correction<DataflowErrorSer>>,
841 writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
842 cmd: WriteCommand,
843 resp_tx: &mpsc::UnboundedSender<WriteResponse>,
844 ) {
845 match cmd {
846 WriteCommand::Batch(mut batch) => {
847 // Apply the same logical sequence of operations that the per-chunk commands
848 // used to: positive desired inserts, negated persist inserts, then optional
849 // frontier advancement and forced consolidation.
850 if !batch.desired_ok.is_empty() {
851 corrections.ok.insert(&mut batch.desired_ok);
852 }
853 if !batch.desired_err.is_empty() {
854 corrections.err.insert(&mut batch.desired_err);
855 }
856 if !batch.persist_ok.is_empty() {
857 corrections.ok.insert_negated(&mut batch.persist_ok);
858 }
859 if !batch.persist_err.is_empty() {
860 corrections.err.insert_negated(&mut batch.persist_err);
861 }
862 if let Some(frontier) = batch.persist_frontier {
863 // We will only emit times at or after the `persist` frontier, so now is a
864 // good time to advance the times of stashed updates.
865 corrections.ok.advance_since(frontier.clone());
866 corrections.err.advance_since(frontier);
867 }
868 if batch.force_consolidation {
869 corrections.ok.consolidate_at_since();
870 corrections.err.consolidate_at_since();
871 }
872 }
873 WriteCommand::WriteBatch(desc) => {
874 // Chain ok and err correction iterators directly, avoiding an
875 // intermediate Vec allocation.
876 let oks = corrections
877 .ok
878 .updates_before(&desc.upper)
879 .map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
880 let errs = corrections
881 .err
882 .updates_before(&desc.upper)
883 .map(|(d, t, r)| ((SourceData(Err(d.deserialize())), ()), t, r.into_inner()));
884 let mut updates = oks.chain(errs).peekable();
885
886 if updates.peek().is_none() {
887 // No corrections to write.
888 let _ = resp_tx.send(WriteResponse { batch: None });
889 return;
890 }
891
892 let batch = writer
893 .batch(updates, desc.lower, desc.upper)
894 .await
895 .expect("valid usage");
896 let proto_batch = batch.into_transmittable_batch();
897 if let Err(err) = resp_tx.send(WriteResponse {
898 batch: Some(proto_batch),
899 }) {
900 let batch =
901 writer.batch_from_transmittable_batch(err.0.batch.expect("just sent"));
902 batch.delete().await;
903 }
904 }
905 }
906 }
907
908 /// State maintained by the `write` operator on the Timely thread.
909 struct State {
910 sink_id: GlobalId,
911 worker_id: usize,
912 /// The frontiers of the `desired` inputs.
913 desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
914 /// The frontiers of the `persist` inputs.
915 ///
916 /// Note that this is _not_ the same as the write frontier of the output persist shard! It
917 /// usually is, but during snapshot processing, these frontiers will start at the shard's
918 /// read frontier, so they can be beyond its write frontier. This is important as it means
919 /// we must not discard batch descriptions based on these persist frontiers: A batch
920 /// description might still be valid even if its `lower` is before the persist frontiers we
921 /// observe.
922 persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
923 /// The current valid batch description and associated output capability, if any.
924 batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
925 /// A request to force a consolidation of corrections once both `desired_frontiers` and
926 /// `persist_frontiers` become greater than the given frontier.
927 ///
928 /// Normally we force a consolidation whenever we write a batch, but there are periods
929 /// (like read-only mode) when that doesn't happen, and we need to manually force
930 /// consolidation instead. In read-only mode this is re-armed after each firing so it
931 /// keeps sweeping forward as the frontiers advance (see `should_force_consolidation`).
932 force_consolidation_after: Option<Antichain<Timestamp>>,
933 /// Whether the sink is in read-only mode. While read-only the `write` operator mints no
934 /// batches, so the `WriteBatch` path never sweeps `consolidate_before(upper)` forward;
935 /// the forced consolidation stands in for it and is re-armed as long as this holds.
936 read_only: bool,
937 }
938
939 impl State {
940 fn new(
941 sink_id: GlobalId,
942 worker_id: usize,
943 as_of: Antichain<Timestamp>,
944 advance_persist_frontiers_at_startup: bool,
945 read_only: bool,
946 ) -> Self {
947 // Force a consolidation of corrections after the snapshot updates have been fully
948 // processed, to ensure we get rid of those as quickly as possible.
949 let force_consolidation_after = Some(as_of.clone());
950
951 let mut state = Self {
952 sink_id,
953 worker_id,
954 desired_frontiers: OkErr::new_frontiers(),
955 persist_frontiers: OkErr::new_frontiers(),
956 batch_description: None,
957 force_consolidation_after,
958 read_only,
959 };
960
961 // Immediately advance the persist frontier tracking to the `as_of`.
962 // This is important to ensure the persist sink doesn't get stuck if the output shard's
963 // initial frontier is less than the `as_of`. The `mint` operator first emits a batch
964 // description with `lower = as_of`, and the `write` operator only emits a batch when
965 // its observed persist frontier is >= the batch description's `lower`, which (assuming
966 // no other writers) would be never if we didn't advance the observed persist frontier
967 // to the `as_of`.
968 //
969 // The `must_use` bool returned by the advance helpers signals that a corresponding
970 // `corrections.advance_since` must be queued to the Tokio task. We drop it here
971 // because the Tokio-side `corrections.since` is initialized to the same `as_of` in
972 // `write::render` before the task is spawned (using the same captured flag value),
973 // keeping both sides in lockstep without an additional channel send.
974 if advance_persist_frontiers_at_startup {
975 let _ = state.advance_persist_ok_frontier(as_of.borrow());
976 let _ = state.advance_persist_err_frontier(as_of.borrow());
977 }
978
979 state
980 }
981
982 fn trace<S: AsRef<str>>(&self, message: S) {
983 let message = message.as_ref();
984 trace!(
985 sink_id = %self.sink_id,
986 worker = %self.worker_id,
987 desired_frontier = ?self.desired_frontiers.frontier().elements(),
988 persist_frontier = ?self.persist_frontiers.frontier().elements(),
989 batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
990 message,
991 );
992 }
993
994 fn advance_desired_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
995 if advance(&mut self.desired_frontiers.ok, frontier) {
996 self.trace("advanced `desired` ok frontier");
997 }
998 }
999
1000 fn advance_desired_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
1001 if advance(&mut self.desired_frontiers.err, frontier) {
1002 self.trace("advanced `desired` err frontier");
1003 }
1004 }
1005
1006 /// Returns true if the persist frontier advanced.
1007 ///
1008 /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
1009 /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
1010 /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
1011 /// which can let updates with timestamps below `desc.lower` slip into a written batch.
1012 #[must_use = "advance_persist_ok_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
1013 fn advance_persist_ok_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
1014 if advance(&mut self.persist_frontiers.ok, frontier) {
1015 self.trace("advanced `persist` ok frontier");
1016 true
1017 } else {
1018 false
1019 }
1020 }
1021
1022 /// Returns true if the persist frontier advanced.
1023 ///
1024 /// The caller must propagate a `true` return value into the next `WriteCommand::Batch`'s
1025 /// `persist_frontier` field so the Tokio task advances `corrections.since` accordingly.
1026 /// Dropping the bool leaves the Tokio-side `since` lagging behind `persist_frontiers`,
1027 /// which can let updates with timestamps below `desc.lower` slip into a written batch.
1028 #[must_use = "advance_persist_err_frontier's return value gates a `corrections.advance_since` send to the Tokio task"]
1029 fn advance_persist_err_frontier(&mut self, frontier: AntichainRef<Timestamp>) -> bool {
1030 if advance(&mut self.persist_frontiers.err, frontier) {
1031 self.trace("advanced `persist` err frontier");
1032 true
1033 } else {
1034 false
1035 }
1036 }
1037
1038 /// Check if a forced consolidation should be triggered.
1039 fn should_force_consolidation(&mut self) -> bool {
1040 let Some(request) = &self.force_consolidation_after else {
1041 return false;
1042 };
1043
1044 let desired_frontier = self.desired_frontiers.frontier();
1045 let persist_frontier = self.persist_frontiers.frontier();
1046 if PartialOrder::less_than(request, desired_frontier)
1047 && PartialOrder::less_than(request, persist_frontier)
1048 {
1049 self.trace("requesting correction consolidation");
1050 // In read-only mode the sink mints no batches, so the `WriteBatch` path never
1051 // sweeps `consolidate_before(upper)` forward and a single forced consolidation
1052 // only covers the band below the `since` at the moment it fires, leaving the
1053 // forward region uncancelled for the whole read-only window (CLU-131). Re-arm at
1054 // the current persist frontier so the forced consolidation keeps sweeping forward
1055 // as `desired`/`persist` advance, mirroring the read-write path. Once writes are
1056 // allowed, the `WriteBatch` path takes over and we disarm.
1057 self.force_consolidation_after = if self.read_only {
1058 Some(persist_frontier.to_owned())
1059 } else {
1060 None
1061 };
1062 true
1063 } else {
1064 false
1065 }
1066 }
1067
1068 /// Update read-only mode. While read-only the forced consolidation re-arms itself; once
1069 /// writes are allowed the still-armed request fires one final time and then disarms, after
1070 /// which the `WriteBatch` path drives consolidation.
1071 fn set_read_only(&mut self, read_only: bool) {
1072 self.read_only = read_only;
1073 }
1074
1075 fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
1076 // Enforce monotonicity: drop descriptions whose `lower` regresses below the one we
1077 // already hold. The `mint` operator only emits strictly increasing `lower`s
1078 // (invariant 1), so a regression means this description is outdated. We cannot use
1079 // `persist_frontiers` for the same check, because during snapshot processing those
1080 // frontiers can be ahead of the shard's write frontier and a still-valid description
1081 // may have a `lower` below them.
1082 if let Some((prev, _)) = &self.batch_description {
1083 if PartialOrder::less_than(&desc.lower, &prev.lower) {
1084 self.trace(format!("skipping outdated batch description: {desc:?}"));
1085 return;
1086 }
1087 }
1088
1089 self.batch_description = Some((desc, cap));
1090 self.trace("set batch description");
1091 }
1092
1093 /// Check if a batch can be written and send a write command to the Tokio task if so.
1094 fn maybe_start_batch(
1095 &mut self,
1096 cmd_tx: &mpsc::UnboundedSender<WriteCommand>,
1097 ) -> Option<(BatchDescription, Capability<Timestamp>)> {
1098 let (desc, _cap) = self.batch_description.as_ref()?;
1099
1100 // We can write a new batch if we have seen all `persist` updates before `lower` and
1101 // all `desired` updates before `upper`.
1102 let persist_ready =
1103 PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
1104 let desired_ready =
1105 PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
1106 if !persist_ready || !desired_ready {
1107 return None;
1108 }
1109
1110 self.trace("write batch description");
1111 let (desc, cap) = self.batch_description.take()?;
1112 cmd_tx
1113 .send(WriteCommand::WriteBatch(desc.clone()))
1114 .expect("write task unexpectedly gone");
1115 Some((desc, cap))
1116 }
1117 }
1118}
1119
1120/// Implementation of the `append` operator.
1121mod append {
1122 use super::*;
1123
1124 /// Commands sent from the Timely operator to the Tokio append task.
1125 enum AppendCommand {
1126 /// A new batch description has been received.
1127 Description(BatchDescription),
1128 /// A written batch has been received.
1129 Batch(ProtoBatch),
1130 /// The batches frontier has advanced.
1131 BatchesFrontier(Antichain<Timestamp>),
1132 }
1133
1134 /// Render the `append` operator.
1135 ///
1136 /// The parameters passed in are:
1137 /// * `sink_id`: The `GlobalId` of the sink export.
1138 /// * `persist_api`: An object providing access to the output persist shard.
1139 /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
1140 /// * `batches`: The stream of written batches produced by the `write` operator.
1141 pub fn render<'s>(
1142 sink_id: GlobalId,
1143 persist_api: PersistApi,
1144 descs: DescsStream<'s>,
1145 batches: BatchesStream<'s>,
1146 ) {
1147 let scope = descs.scope();
1148 let worker_id = scope.index();
1149
1150 let name = operator_name(sink_id, "append");
1151 let mut builder = OperatorBuilderRc::new(name, scope.clone());
1152 let mut descs_input = builder.new_input(descs, Pipeline);
1153 let batch_exchange =
1154 Exchange::new(|(desc, _): &(BatchDescription, _)| u64::cast_from(desc.append_worker));
1155 let mut batches_input = builder.new_input(batches, batch_exchange);
1156
1157 // Channel for commands to the Tokio append task.
1158 let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<AppendCommand>();
1159
1160 // Spawn Tokio task that owns the append state machine.
1161 let append_task_handle =
1162 mz_ore::task::spawn(|| operator_name(sink_id, "append"), async move {
1163 let writer = persist_api.open_writer().await;
1164 let mut state = State::new(sink_id, worker_id, writer);
1165
1166 while let Some(cmd) = cmd_rx.recv().await {
1167 match cmd {
1168 AppendCommand::Description(desc) => {
1169 state.absorb_batch_description(desc).await;
1170 state.maybe_append_batches().await;
1171 }
1172 AppendCommand::Batch(batch) => {
1173 state.absorb_batch(batch).await;
1174 }
1175 AppendCommand::BatchesFrontier(frontier) => {
1176 state.advance_batches_frontier(frontier.borrow());
1177 state.maybe_append_batches().await;
1178 }
1179 }
1180 }
1181 })
1182 .abort_on_drop();
1183
1184 builder.build(move |_capabilities| {
1185 let mut prev_batches_frontier = Antichain::from_elem(Timestamp::MIN);
1186
1187 move |frontiers| {
1188 // Keep task handle alive so it is aborted when the operator is dropped.
1189 let _ = &append_task_handle;
1190
1191 // Forward batch descriptions to the Tokio task.
1192 descs_input.for_each(|_cap, data| {
1193 for desc in data.drain(..) {
1194 cmd_tx
1195 .send(AppendCommand::Description(desc))
1196 .expect("append task unexpectedly gone");
1197 }
1198 });
1199
1200 // Forward batches to the Tokio task.
1201 batches_input.for_each(|_cap, data| {
1202 for (_desc, batch) in data.drain(..) {
1203 // The batch description is only used for routing and we ignore it
1204 // here since we already get one from `descs_input`.
1205 cmd_tx
1206 .send(AppendCommand::Batch(batch))
1207 .expect("append task unexpectedly gone");
1208 }
1209 });
1210
1211 // Forward batches frontier advancements *after* the per-activation
1212 // `Description`/`Batch` sends above. The Tokio task drains commands FIFO and only
1213 // calls `maybe_append_batches` on `Description`/`BatchesFrontier`; if a frontier
1214 // advance arrived before its batches, the task could append an incomplete set.
1215 // See module-level docs for the full ordering invariant.
1216 let new_batches_frontier = frontiers[1].frontier();
1217 if PartialOrder::less_than(&prev_batches_frontier.borrow(), &new_batches_frontier) {
1218 prev_batches_frontier.clear();
1219 prev_batches_frontier.extend(new_batches_frontier.iter().cloned());
1220 cmd_tx
1221 .send(AppendCommand::BatchesFrontier(
1222 new_batches_frontier.to_owned(),
1223 ))
1224 .expect("append task unexpectedly gone");
1225 }
1226 }
1227 });
1228 }
1229
1230 /// State maintained by the `append` Tokio task.
1231 struct State {
1232 sink_id: GlobalId,
1233 worker_id: usize,
1234 persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1235 /// The current input frontier of `batches`.
1236 batches_frontier: Antichain<Timestamp>,
1237 /// The greatest observed `lower` from both `descs` and `batches`.
1238 lower: Antichain<Timestamp>,
1239 /// The batch description for `lower`, if any.
1240 batch_description: Option<BatchDescription>,
1241 /// Batches received for `lower`.
1242 batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
1243 }
1244
1245 impl State {
1246 fn new(
1247 sink_id: GlobalId,
1248 worker_id: usize,
1249 persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1250 ) -> Self {
1251 Self {
1252 sink_id,
1253 worker_id,
1254 persist_writer,
1255 batches_frontier: Antichain::from_elem(Timestamp::MIN),
1256 lower: Antichain::from_elem(Timestamp::MIN),
1257 batch_description: None,
1258 batches: Default::default(),
1259 }
1260 }
1261
1262 fn trace<S: AsRef<str>>(&self, message: S) {
1263 let message = message.as_ref();
1264 trace!(
1265 sink_id = %self.sink_id,
1266 worker = %self.worker_id,
1267 batches_frontier = ?self.batches_frontier.elements(),
1268 lower = ?self.lower.elements(),
1269 batch_description = ?self.batch_description,
1270 message,
1271 );
1272 }
1273
1274 fn advance_batches_frontier(&mut self, frontier: AntichainRef<Timestamp>) {
1275 if advance(&mut self.batches_frontier, frontier) {
1276 self.trace("advanced `batches` frontier");
1277 }
1278 }
1279
1280 /// Advance the current `lower`.
1281 ///
1282 /// Discards all currently stashed batches and batch descriptions, assuming that they are
1283 /// now invalid.
1284 async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
1285 assert!(PartialOrder::less_than(&self.lower, &frontier));
1286
1287 self.lower = frontier;
1288 self.batch_description = None;
1289
1290 // Remove stashed batches, cleaning up those we didn't append.
1291 for batch in self.batches.drain(..) {
1292 batch.delete().await;
1293 }
1294
1295 self.trace("advanced `lower`");
1296 }
1297
1298 /// Absorb the given batch description into the state, provided it is not outdated.
1299 async fn absorb_batch_description(&mut self, desc: BatchDescription) {
1300 if PartialOrder::less_than(&self.lower, &desc.lower) {
1301 self.advance_lower(desc.lower.clone()).await;
1302 } else if &self.lower != &desc.lower {
1303 self.trace(format!("skipping outdated batch description: {desc:?}"));
1304 return;
1305 }
1306
1307 if desc.append_worker == self.worker_id {
1308 self.batch_description = Some(desc);
1309 self.trace("set batch description");
1310 }
1311 }
1312
1313 /// Absorb the given batch into the state, provided it is not outdated.
1314 async fn absorb_batch(&mut self, batch: ProtoBatch) {
1315 let batch = self.persist_writer.batch_from_transmittable_batch(batch);
1316 if PartialOrder::less_than(&self.lower, batch.lower()) {
1317 self.advance_lower(batch.lower().clone()).await;
1318 } else if &self.lower != batch.lower() {
1319 self.trace(format!(
1320 "skipping outdated batch: ({:?}, {:?})",
1321 batch.lower().elements(),
1322 batch.upper().elements(),
1323 ));
1324
1325 // Ensure the batch's data gets properly cleaned up before dropping it.
1326 batch.delete().await;
1327 return;
1328 }
1329
1330 self.batches.push(batch);
1331 self.trace("absorbed a batch");
1332 }
1333
1334 async fn maybe_append_batches(&mut self) {
1335 let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
1336 if !batches_complete {
1337 return;
1338 }
1339
1340 let Some(desc) = self.batch_description.take() else {
1341 return;
1342 };
1343
1344 let new_lower = match self.append_batches(desc).await {
1345 Ok(shard_upper) => {
1346 self.trace("appended a batch");
1347 shard_upper
1348 }
1349 Err(shard_upper) => {
1350 // Failing the append is expected in the presence of concurrent replicas. There
1351 // is nothing special to do here: The self-correcting feedback mechanism
1352 // ensures that we observe the concurrent changes, compute their consequences,
1353 // and append them at a future time.
1354 self.trace(format!(
1355 "append failed due to `lower` mismatch: {:?}",
1356 shard_upper.elements(),
1357 ));
1358 shard_upper
1359 }
1360 };
1361
1362 self.advance_lower(new_lower).await;
1363 }
1364
1365 /// Append the current `batches` to the output shard.
1366 ///
1367 /// Returns whether the append was successful or not, and the current shard upper in either
1368 /// case.
1369 ///
1370 /// This method advances the shard upper to the batch `lower` if necessary. This is the
1371 /// mechanism that brings the shard upper to the sink as-of when appending the initial
1372 /// batch.
1373 ///
1374 /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
1375 /// a single append at operator startup. The reason we are doing it here instead is that it
1376 /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
1377 /// persist writes, including the initial upper bump. Having only a single place that
1378 /// performs writes makes it easy to ensure we are doing that correctly.
1379 async fn append_batches(
1380 &mut self,
1381 desc: BatchDescription,
1382 ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
1383 let (lower, upper) = (desc.lower, desc.upper);
1384 let mut to_append: Vec<_> = self.batches.iter_mut().collect();
1385
1386 loop {
1387 let result = self
1388 .persist_writer
1389 .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
1390 .await
1391 .expect("valid usage");
1392
1393 match result {
1394 Ok(()) => return Ok(upper),
1395 Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
1396 advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
1397
1398 // At this point the shard's since and upper are likely the same, a state
1399 // that is likely to hit edge-cases in logic reasoning about frontiers.
1400 fail::fail_point!("mv_advanced_upper");
1401 }
1402 Err(mismatch) => return Err(mismatch.current),
1403 }
1404 }
1405 }
1406 }
1407
1408 /// Advance the frontier of the given writer's shard to at least the given `upper`.
1409 async fn advance_shard_upper(
1410 persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1411 upper: Antichain<Timestamp>,
1412 ) {
1413 let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1414 let lower = Antichain::from_elem(Timestamp::MIN);
1415 persist_writer
1416 .append(empty_updates, lower, upper)
1417 .await
1418 .expect("valid usage")
1419 .expect("should always succeed");
1420 }
1421}