mz_compute/sink/materialized_view.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A dataflow sink that writes input records to a persist shard.
//!
//! This implementation is both parallel and self-correcting.
//!
//! * parallel: Multiple workers can participate in writing updates for the same times, letting
//!   sink throughput scale with the number of workers allocated to the replica.
//! * self-correcting: The sink continually compares the contents of the persist shard with the
//!   contents of the input collection and writes down the difference. If the persist shard ends
//!   up with undesired contents for any reason, this is corrected the next time the sink manages
//!   to append to the shard.
//!
//! ### Operators
//!
//! The persist sink consists of a graph of operators.
//!
//! ```text
//!     desired                 persist <------------------------.
//!        |                       |                             |
//!        |                       |                             |
//!        |-------------------.   |                             |
//!        |                   |   |                             |
//!        |                   |   |                             |
//!        v                   v   v                             |
//!    +--------+           +--------+              +--------+   |
//!    |  mint  |--descs-.->| write  | --batches--> | append |---'
//!    +--------+         \ +--------+           .->+--------+
//!                        \_____________________/
//! ```
//!
//! * `mint` mints batch descriptions, i.e., `(lower, upper)` bounds of batches that should be
//!   written. The persist API requires that all workers write batches with the same bounds, so
//!   they can be appended as a single logical batch. To ensure this, the `mint` operator only
//!   runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds
//!   are picked based on the frontiers of the `desired` stream and the output persist shard.
//! * `write` stages batch data in persist, based on the batch descriptions received from the
//!   `mint` operator, but without appending it to the persist shard. This is a multi-worker
//!   operator, with each worker writing batches of the data that arrives at its local inputs.
//!   To do so it reads from the `desired` and `persist` streams and produces the difference
//!   between them to write back out, ensuring that the final contents of the persist shard
//!   match `desired`.
//! * `append` appends the batches minted by `mint` and written by `write` to the persist shard.
//!   This is a multi-worker operator, where workers are responsible for different subsets of
//!   batch descriptions. If a worker is responsible for a given batch description, it waits for
//!   all workers to stage their batches for that batch description, then appends all the
//!   batches together as a single logical batch.
//!
//! Note that while the above graph suggests that `mint` and `write` both receive copies of the
//! `desired` stream, the actual implementation passes that stream through `mint` and lets
//! `write` read the passed-through stream, to avoid cloning data.
//!
//! Also note that the `append` operator's implementation would perhaps be more natural as a
//! single-worker implementation. The purpose of sharing the work between all workers is to
//! avoid a work imbalance where one worker is overloaded (doing both appends and the consequent
//! persist maintenance work) while others are comparatively idle.
//!
//! The persist sink is written to be robust to the presence of other conflicting instances
//! (e.g. from other replicas) writing to the same persist shard. Each of the three operators
//! needs to be able to handle conflicting writes that unexpectedly change the contents of the
//! output persist shard.
//!
//! ### Frontiers
//!
//! The `desired` frontier tracks the progress of the upstream dataflow, but may be rounded up
//! to the next refresh time for dataflows that follow a refresh schedule other than "on commit".
//!
//! The `persist` frontier tracks the `upper` frontier of the target persist shard, with one
//! exception: When the `persist_source` that reads back the shard is rendered, it will start
//! reading at its `since` frontier. So if the shard's `since` is initially greater than its
//! `upper`, the `persist` frontier too will be in advance of the shard `upper`, until the
//! `upper` has caught up. To avoid getting confused by this edge case, the `mint` operator does
//! not use the `persist` stream to observe the shard frontier but keeps its own `WriteHandle`
//! instead.
//!
//! The `descs` frontier communicates which `lower` bounds may still be emitted in batch
//! descriptions. All future batch descriptions will have a `lower` that is greater than or
//! equal to the current `descs` frontier.
//!
//! The `batches` frontier communicates for which `lower` bounds batches may still be written.
//! All batches for descriptions with `lower`s less than the current `batches` frontier have
//! already been written.
//!
//! ### Invariants
//!
//! The implementation upholds several invariants that can be relied upon to simplify the
//! implementation:
//!
//! 1. `lower`s in minted batch descriptions are unique and strictly increasing. That is, the
//!    `mint` operator will never mint the same `lower` twice and a minted `lower` is always
//!    greater than any previously minted ones.
//! 2. `upper`s in minted batch descriptions are monotonically increasing.
//! 3. From (1) it follows that there is always at most one "valid" batch description in flight
//!    in the operator graph. "Valid" here means that the described batch can be appended to the
//!    persist shard.
//!
//! The main simplification these invariants allow is that operators only need to keep track of
//! the most recent batch description and/or `lower`. Previous batch descriptions are not valid
//! anymore, so there is no reason to hold any state or perform any work in support of them.
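//!
//! For example, a run of minted batch descriptions on a replica with three workers might look
//! like this (illustrative values only), with strictly increasing `lower`s, monotonically
//! increasing `upper`s, and append workers assigned round-robin:
//!
//! ```text
//! ([3], [5])@0    ([5], [8])@1    ([8], [9])@2    ([9], [12])@0
//! ```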
//!
//! ### Read-only Mode
//!
//! The persist sink can optionally be initialized in read-only mode. In this mode it is passive
//! and avoids any writes to persist. Activating the `read_only_rx` transitions the sink into
//! write mode, where it commences normal operation.
//!
//! Read-only mode is implemented by the `mint` operator. To disable writes, the `mint` operator
//! simply avoids minting any batch descriptions. Since both the `write` and the `append`
//! operator require batch descriptions to write/append batches, this suppresses any persist
//! communication. At the same time, the `write` operator still observes changes to the
//! `desired` and `persist` collections, allowing it to keep its correction buffer up-to-date.

use std::any::Any;
use std::cell::RefCell;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use mz_compute_types::dyncfgs::MV_SINK_ADVANCE_PERSIST_FRONTIERS;
use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::metrics::SinkMetrics;
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::builder_async::{Event, OperatorBuilder};
use mz_timely_util::probe::{Handle, ProbeNotify};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::vec::Broadcast;
use timely::dataflow::operators::{Capability, CapabilitySet, probe};
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tokio::sync::watch;
use tracing::trace;

use crate::compute_state::ComputeState;
use crate::render::StartSignal;
use crate::render::errors::DataflowErrorSer;
use crate::render::sinks::SinkRender;
use crate::sink::correction::{ChannelLogging, Correction, CorrectionLogger};
use crate::sink::materialized_view_v2;
use crate::sink::refresh::apply_refresh;

impl<'scope> SinkRender<'scope> for MaterializedViewSinkConnection<CollectionMetadata> {
    fn render_sink(
        &self,
        compute_state: &mut ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        start_signal: StartSignal,
        mut ok_collection: VecCollection<'scope, Timestamp, Row, Diff>,
        mut err_collection: VecCollection<'scope, Timestamp, DataflowErrorSer, Diff>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        // Attach probes reporting the compute frontier.
        // The `apply_refresh` operator can round up frontiers, making it impossible to
        // accurately track the progress of the computation, so we need to attach probes
        // before it.
        let probe = probe::Handle::default();
        ok_collection = ok_collection
            .probe_with(&probe)
            .inner
            .probe_notify_with(vec![output_probe.clone()])
            .as_collection();
        let collection_state = compute_state.expect_collection_mut(sink_id);
        collection_state.compute_probe = Some(probe);

        // If a `RefreshSchedule` was specified, round up timestamps.
        if let Some(refresh_schedule) = &sink.refresh_schedule {
            ok_collection = apply_refresh(ok_collection, refresh_schedule.clone());
            err_collection = apply_refresh(err_collection, refresh_schedule.clone());
        }

        if sink.up_to != Antichain::default() {
            unimplemented!(
                "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning"
            )
        }

        let read_only_rx = collection_state.read_only_rx.clone();

        let token = persist_sink(
            sink_id,
            &self.storage_metadata,
            ok_collection,
            err_collection,
            as_of,
            compute_state,
            start_signal,
            read_only_rx,
        );
        Some(token)
    }
}

/// Type of the `desired` stream, split into `Ok` and `Err` streams.
pub(super) type DesiredStreams<'s> = OkErr<
    StreamVec<'s, Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'s, Timestamp, (DataflowErrorSer, Timestamp, Diff)>,
>;

/// Type of the `persist` stream, split into `Ok` and `Err` streams.
pub(super) type PersistStreams<'s> = OkErr<
    StreamVec<'s, Timestamp, (Row, Timestamp, Diff)>,
    StreamVec<'s, Timestamp, (DataflowErrorSer, Timestamp, Diff)>,
>;

/// Type of the `descs` stream.
pub(super) type DescsStream<'s> = StreamVec<'s, Timestamp, BatchDescription>;

/// Type of the `batches` stream.
pub(super) type BatchesStream<'s> = StreamVec<'s, Timestamp, (BatchDescription, ProtoBatch)>;

/// Type of the shared sink write frontier.
pub(super) type SharedSinkFrontier = Rc<RefCell<Antichain<Timestamp>>>;

/// Renders an MV sink writing the given desired collection into the `target` persist collection.
pub(super) fn persist_sink<'s>(
    sink_id: GlobalId,
    target: &CollectionMetadata,
    ok_collection: VecCollection<'s, Timestamp, Row, Diff>,
    err_collection: VecCollection<'s, Timestamp, DataflowErrorSer, Diff>,
    as_of: Antichain<Timestamp>,
    compute_state: &mut ComputeState,
    start_signal: StartSignal,
    read_only_rx: watch::Receiver<bool>,
) -> Rc<dyn Any> {
    if mz_compute_types::dyncfgs::ENABLE_SYNC_MV_SINK.get(&compute_state.worker_config) {
        return materialized_view_v2::persist_sink(
            sink_id,
            target,
            ok_collection,
            err_collection,
            as_of,
            compute_state,
            start_signal,
            read_only_rx,
        );
    }

    let scope = ok_collection.scope();
    let desired = OkErr::new(ok_collection.inner, err_collection.inner);

    // Read back the persist shard.
    let (persist, persist_token) =
        persist_source(scope, sink_id, target.clone(), compute_state, start_signal);

    let persist_api = PersistApi {
        persist_clients: Arc::clone(&compute_state.persist_clients),
        collection: target.clone(),
        shard_name: sink_id.to_string(),
        purpose: format!("MV sink {sink_id}"),
    };

    let (desired, descs, sink_frontier, mint_token) = mint::render(
        sink_id,
        persist_api.clone(),
        as_of.clone(),
        read_only_rx,
        desired,
    );

    let (batches, write_token) = write::render(
        sink_id,
        persist_api.clone(),
        as_of,
        desired,
        persist,
        descs.clone(),
        Rc::clone(&compute_state.worker_config),
    );

    let append_token = append::render(sink_id, persist_api, descs, batches);

    // Report sink frontier updates to the `ComputeState`.
    let collection = compute_state.expect_collection_mut(sink_id);
    collection.sink_write_frontier = Some(sink_frontier);

    Rc::new((persist_token, mint_token, write_token, append_token))
}

/// Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with
/// such pairs.
pub(super) struct OkErr<O, E> {
    pub(super) ok: O,
    pub(super) err: E,
}

impl<O, E> OkErr<O, E> {
    pub(super) fn new(ok: O, err: E) -> Self {
        Self { ok, err }
    }
}

impl OkErr<Antichain<Timestamp>, Antichain<Timestamp>> {
    pub(super) fn new_frontiers() -> Self {
        Self {
            ok: Antichain::from_elem(Timestamp::MIN),
            err: Antichain::from_elem(Timestamp::MIN),
        }
    }

    /// Return the overall frontier, i.e., the minimum of `ok` and `err`.
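    ///
    /// Illustrative sketch of the intended behavior (not a doctest; values chosen for
    /// brevity):
    ///
    /// ```ignore
    /// let frontiers = OkErr::new(
    ///     Antichain::from_elem(Timestamp::from(3u64)),
    ///     Antichain::from_elem(Timestamp::from(5u64)),
    /// );
    /// assert_eq!(frontiers.frontier(), &Antichain::from_elem(Timestamp::from(3u64)));
    /// ```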
    pub(super) fn frontier(&self) -> &Antichain<Timestamp> {
        if PartialOrder::less_equal(&self.ok, &self.err) {
            &self.ok
        } else {
            &self.err
        }
    }
}

/// Advance the given `frontier` to `new`, if the latter one is greater.
///
/// Returns whether `frontier` was advanced.
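///
/// A minimal sketch of the semantics (illustrative, not a doctest):
///
/// ```ignore
/// use timely::progress::frontier::AntichainRef;
///
/// let mut frontier = Antichain::from_elem(Timestamp::from(3u64));
/// // `new` is greater, so the frontier advances.
/// assert!(advance(&mut frontier, AntichainRef::new(&[Timestamp::from(5u64)])));
/// // `new` is not greater, so the frontier is unchanged.
/// assert!(!advance(&mut frontier, AntichainRef::new(&[Timestamp::from(4u64)])));
/// ```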
pub(super) fn advance(
    frontier: &mut Antichain<Timestamp>,
    new: timely::progress::frontier::AntichainRef<'_, Timestamp>,
) -> bool {
    if PartialOrder::less_than(&frontier.borrow(), &new) {
        frontier.clear();
        frontier.extend(new.iter().cloned());
        true
    } else {
        false
    }
}

/// A persist API specialized to a single collection.
#[derive(Clone)]
pub(super) struct PersistApi {
    pub(super) persist_clients: Arc<PersistClientCache>,
    pub(super) collection: CollectionMetadata,
    pub(super) shard_name: String,
    pub(super) purpose: String,
}

impl PersistApi {
    pub(super) async fn open_client(&self) -> PersistClient {
        self.persist_clients
            .open(self.collection.persist_location.clone())
            .await
            .unwrap_or_else(|error| panic!("error opening persist client: {error}"))
    }

    pub(super) async fn open_writer(&self) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
        self.open_client()
            .await
            .open_writer(
                self.collection.data_shard,
                Arc::new(self.collection.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: self.shard_name.clone(),
                    handle_purpose: self.purpose.clone(),
                },
            )
            .await
            .unwrap_or_else(|error| panic!("error opening persist writer: {error}"))
    }

    async fn open_metrics(&self) -> SinkMetrics {
        let client = self.open_client().await;
        client.metrics().sink.clone()
    }
}

/// Instantiate a persist source reading back the `target` collection.
pub(super) fn persist_source<'s>(
    scope: Scope<'s, Timestamp>,
    sink_id: GlobalId,
    target: CollectionMetadata,
    compute_state: &ComputeState,
    start_signal: StartSignal,
) -> (PersistStreams<'s>, Vec<PressOnDropButton>) {
    // There is no guarantee that the sink as-of is beyond the persist shard's since. If it
    // isn't, instantiating a `persist_source` with it would panic. So instead we leave it to
    // `persist_source` to select an appropriate as-of. We only care about times beyond the
    // current shard upper anyway.
    //
    // TODO(teskje): Ideally we would select the as-of as `join(sink_as_of, since, upper)`, to
    // allow `persist_source` to omit as much historical detail as possible. However, we don't
    // know the shard frontiers and we cannot get them here as that requires an `async` context.
    // We should consider extending the `persist_source` API to allow as-of selection based on
    // the shard's current frontiers.
    let as_of = None;

    let until = Antichain::new();
    let map_filter_project = None;

    let (ok_stream, err_stream, token) =
        mz_storage_operators::persist_source::persist_source::<DataflowErrorSer>(
            scope,
            sink_id,
            Arc::clone(&compute_state.persist_clients),
            &compute_state.txns_ctx,
            target,
            None,
            as_of,
            SnapshotMode::Include,
            until,
            map_filter_project,
            compute_state.dataflow_max_inflight_bytes(),
            start_signal.into_send_future(),
            ErrorHandler::Halt("compute persist sink"),
        );

    let streams = OkErr::new(ok_stream, err_stream);
    (streams, token)
}

/// A description for a batch of updates to be written.
///
/// Batch descriptions are produced by the `mint` operator and consumed by the `write` and
/// `append` operators, where they inform which batches should be written or appended,
/// respectively.
///
/// Each batch description also contains the index of its "append worker", i.e. the worker that
/// is responsible for appending the written batches to the output shard.
#[derive(Clone, Serialize, Deserialize)]
pub(super) struct BatchDescription {
    pub(super) lower: Antichain<Timestamp>,
    pub(super) upper: Antichain<Timestamp>,
    pub(super) append_worker: usize,
}

impl BatchDescription {
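    /// Create a new batch description.
    ///
    /// Illustrative sketch (not a doctest): `lower` must be strictly less than `upper`, or the
    /// constructor panics. Assuming `Timestamp`'s `Debug` renders the raw number, the `Debug`
    /// output looks like this:
    ///
    /// ```ignore
    /// let desc = BatchDescription::new(
    ///     Antichain::from_elem(Timestamp::from(3u64)),
    ///     Antichain::from_elem(Timestamp::from(5u64)),
    ///     0,
    /// );
    /// assert_eq!(format!("{desc:?}"), "([3], [5])@0");
    /// ```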
    pub(super) fn new(
        lower: Antichain<Timestamp>,
        upper: Antichain<Timestamp>,
        append_worker: usize,
    ) -> Self {
        assert!(PartialOrder::less_than(&lower, &upper));
        Self {
            lower,
            upper,
            append_worker,
        }
    }
}

impl std::fmt::Debug for BatchDescription {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "({:?}, {:?})@{}",
            self.lower.elements(),
            self.upper.elements(),
            self.append_worker,
        )
    }
}

/// Construct a name for the given sub-operator.
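///
/// For example (illustrative): `operator_name(GlobalId::User(42), "mint")` yields
/// `"mv_sink(u42)::mint"`.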
pub(super) fn operator_name(sink_id: GlobalId, sub_operator: &str) -> String {
    format!("mv_sink({sink_id})::{sub_operator}")
}

/// Implementation of the `mint` operator.
mod mint {
    use super::*;

    /// Render the `mint` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `as_of`: The first time for which the sink may produce output.
    /// * `read_only_rx`: A receiver that reports whether the sink is in read-only mode.
    /// * `desired`: The ok/err streams that should be sinked to persist.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        mut read_only_rx: watch::Receiver<bool>,
        desired: DesiredStreams<'s>,
    ) -> (
        DesiredStreams<'s>,
        DescsStream<'s>,
        SharedSinkFrontier,
        PressOnDropButton,
    ) {
        let scope = desired.ok.scope();
        let worker_id = scope.index();
        let worker_count = scope.peers();

        // Determine the active worker for the mint operator.
        let active_worker_id = usize::cast_from(sink_id.hashed()) % worker_count;

        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
        let shared_frontier = Rc::clone(&sink_frontier);

        let name = operator_name(sink_id, "mint");
        let mut op = OperatorBuilder::new(name, scope);

        let (ok_output, ok_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let (err_output, err_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let desired_outputs = OkErr::new(ok_output, err_output);
        let desired_output_streams = OkErr::new(ok_stream, err_stream);

        let (desc_output, desc_output_stream) = op.new_output::<CapacityContainerBuilder<_>>();

        let mut desired_inputs = OkErr {
            ok: op.new_input_for(desired.ok, Pipeline, &desired_outputs.ok),
            err: op.new_input_for(desired.err, Pipeline, &desired_outputs.err),
        };

        let button = op.build(move |capabilities| async move {
            // Passing through the `desired` streams only requires data capabilities, so we can
            // immediately drop their initial capabilities here.
            let [_, _, desc_cap]: [_; 3] =
                capabilities.try_into().expect("one capability per output");

            // Non-active workers just pass the `desired` and `persist` data through.
            if worker_id != active_worker_id {
                drop(desc_cap);
                shared_frontier.borrow_mut().clear();

                loop {
                    tokio::select! {
                        Some(event) = desired_inputs.ok.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.ok.give_container(&cap, &mut data);
                            }
                        }
                        Some(event) = desired_inputs.err.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.err.give_container(&cap, &mut data);
                            }
                        }
                        // All inputs are exhausted, so we can shut down.
                        else => return,
                    }
                }
            }

            let mut cap_set = CapabilitySet::from_elem(desc_cap);

            let read_only = *read_only_rx.borrow_and_update();
            let mut state = State::new(sink_id, worker_count, as_of, read_only);

            // Create a stream that reports advancements of the target shard's frontier and
            // updates the shared sink frontier.
            //
            // We collect the persist frontier from a write handle directly, rather than
            // inspecting the `persist` stream, because the latter has two annoying glitches:
            //  (a) It starts at the shard's read frontier, not its write frontier.
            //  (b) It can fall arbitrarily behind the shard upper, e.g. during snapshot
            //      processing or spikes in ingested data.
            //
            // Relying on the `persist` stream would thus delay both the controller-visible
            // sink frontier (`shared_frontier`), which once caused a CrossJoin feature-bench
            // regression where the controller held a finished MV dataflow open waiting for the
            // empty frontier, and the `state.persist_frontier` that gates batch-description
            // minting, stalling minting until the read-back stream catches up. See 5eab5ff896
            // for the original regression and rationale.
            let mut persist_frontiers = pin!(async_stream::stream! {
                let mut writer = persist_api.open_writer().await;
                let mut frontier = Antichain::from_elem(Timestamp::MIN);
                while !frontier.is_empty() {
                    writer.wait_for_upper_past(&frontier).await;
                    frontier = writer.upper().clone();
                    shared_frontier.borrow_mut().clone_from(&frontier);
                    yield frontier.clone();
                }
            });

            loop {
                // Read from the inputs, pass through all data to the respective outputs, and
                // keep track of the input frontiers. When a frontier advances we might have to
                // mint a new batch description.
                let maybe_desc = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.ok.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.err.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(frontier) = persist_frontiers.next() => {
                        state.advance_persist_frontier(frontier);
                        state.maybe_mint_batch_description()
                    }
                    Ok(()) = read_only_rx.changed(), if read_only => {
                        state.allow_writes();
                        state.maybe_mint_batch_description()
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some(desc) = maybe_desc {
                    let lower_ts = *desc.lower.as_option().expect("not empty");
                    let cap = cap_set.delayed(&lower_ts);
                    desc_output.give(&cap, desc);

                    // We only emit strictly increasing `lower`s, so we can let our output
                    // frontier advance beyond the current `lower`.
                    cap_set.downgrade([lower_ts.step_forward()]);
                } else {
                    // The next emitted `lower` will be at least the `persist` frontier, so we
                    // can advance our output frontier as far.
                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
                }
            }
        });

        (
            desired_output_streams,
            desc_output_stream,
            sink_frontier,
            button.press_on_drop(),
        )
    }

    /// State maintained by the `mint` operator.
    struct State {
        sink_id: GlobalId,
        /// The number of workers in the Timely cluster.
        worker_count: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontier of the target persist shard.
        persist_frontier: Antichain<Timestamp>,
        /// The append worker for the next batch description, chosen in round-robin fashion.
        next_append_worker: usize,
        /// The last `lower` we have emitted in a batch description, if any. Whenever the
        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
        last_lower: Option<Antichain<Timestamp>>,
        /// Whether we are operating in read-only mode.
        ///
        /// In read-only mode, minting of batch descriptions is disabled.
        read_only: bool,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_count: usize,
            as_of: Antichain<Timestamp>,
            read_only: bool,
        ) -> Self {
            // Initializing `persist_frontier` to the `as_of` ensures that the first minted
            // batch description will have a `lower` of `as_of` or beyond, and thus that we
            // don't spend work needlessly writing batches at previous times.
            let persist_frontier = as_of;

            Self {
                sink_id,
                worker_count,
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontier,
                next_append_worker: 0,
                last_lower: None,
                read_only,
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontier.elements(),
                last_lower = ?self.last_lower.as_ref().map(|f| f.elements()),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier.borrow()) {
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier.borrow()) {
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontier, frontier.borrow()) {
                self.trace("advanced `persist` frontier");
            }
        }

        fn allow_writes(&mut self) {
            if self.read_only {
                self.read_only = false;
                self.trace("disabled read-only mode");
            }
        }

        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = &self.persist_frontier;

            // We only mint new batch descriptions when:
            //  1. We are _not_ in read-only mode.
            //  2. The `desired` frontier is ahead of the `persist` frontier.
            //  3. The `persist` frontier advanced since we last emitted a batch description.
            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
                PartialOrder::less_than(lower, persist_frontier)
            });

            if self.read_only || !desired_ahead || !persist_advanced {
                return None;
            }

            let lower = persist_frontier.clone();
            let upper = desired_frontier.clone();
            let append_worker = self.next_append_worker;
            let desc = BatchDescription::new(lower, upper, append_worker);

            self.next_append_worker = (append_worker + 1) % self.worker_count;
            self.last_lower = Some(desc.lower.clone());

            self.trace(format!("minted batch description: {desc:?}"));
            Some(desc)
        }
    }
}

/// Implementation of the `write` operator.
mod write {
    use super::*;

    /// Render the `write` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `as_of`: The first time for which the sink may produce output.
    /// * `desired`: The ok/err streams that should be sinked to persist.
    /// * `persist`: The ok/err streams read back from the output persist shard.
    /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        desired: DesiredStreams<'s>,
        persist: PersistStreams<'s>,
        descs: DescsStream<'s>,
        worker_config: Rc<ConfigSet>,
    ) -> (BatchesStream<'s>, PressOnDropButton) {
        let scope = desired.ok.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "write");
        let mut op = OperatorBuilder::new(name, scope.clone());

        let mut channel_logging = None;
        let mut correction_logger = None;
        if let (Some(compute_logger), Some(differential_logger)) = (
            scope.worker().logger_for("materialize/compute"),
            scope.worker().logger_for("differential/arrange"),
        ) {
            let operator_info = op.operator_info();
            let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
            channel_logging = Some(ChannelLogging::new(tx));
            correction_logger = Some(CorrectionLogger::new(
                compute_logger,
                differential_logger.into(),
                operator_info.global_id,
                operator_info.address.to_vec(),
                rx,
            ));
        }

        let (batches_output, batches_output_stream) =
            op.new_output::<CapacityContainerBuilder<_>>();

        // It is important that we exchange the `desired` and `persist` data the same way, so
        // updates that cancel each other out end up on the same worker.
        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
        let exchange_err = |(d, _, _): &(DataflowErrorSer, Timestamp, Diff)| d.hashed();

        let mut desired_inputs = OkErr::new(
            op.new_disconnected_input(desired.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(desired.err, Exchange::new(exchange_err)),
        );
        let mut persist_inputs = OkErr::new(
            op.new_disconnected_input(persist.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(persist.err, Exchange::new(exchange_err)),
        );
        let mut descs_input = op.new_input_for(descs.broadcast(), Pipeline, &batches_output);

        let button = op.build(move |capabilities| async move {
            // We will use the data capabilities from the `descs` input to produce output, so
            // no need to hold onto the initial capabilities.
            drop(capabilities);

            let writer = persist_api.open_writer().await;
            let sink_metrics = persist_api.open_metrics().await;
            let mut state = State::new(
                sink_id,
                worker_id,
                writer,
                sink_metrics,
                channel_logging,
                as_of,
                &worker_config,
            );
            let mut correction_logger = correction_logger;

            loop {
                // Drain correction logging events from the channel.
                if let Some(logger) = &mut correction_logger {
                    logger.apply_events();
                }

                // Read from the inputs, extract `desired` updates as positive contributions to
                // `correction` and `persist` updates as negative contributions. If either the
                // `desired` or `persist` frontier advances, or if we receive a new batch
                // description, we might have to write a new batch.
                let maybe_batch = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = descs_input.next() => {
                        match event {
                            Event::Data(cap, data) => {
                                for desc in data {
                                    state.absorb_batch_description(desc, cap.clone());
                                }
                                state.maybe_write_batch().await
                            }
                            Event::Progress(_frontier) => None,
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some((desc, batch, cap)) = maybe_batch {
                    batches_output.give(&cap, (desc, batch));
                }
            }
        });

        (batches_output_stream, button.press_on_drop())
    }

    /// State maintained by the `write` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// Contains `desired - persist`, reflecting the updates we would like to commit to
        /// `persist` in order to "correct" it to track `desired`. This collection is only
        /// modified by updates received from either the `desired` or `persist` inputs.
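        ///
        /// For instance (illustrative): if `desired` contains an update `(row, t, +1)` and the
        /// shard already contains the same update, the negated read-back cancels it out here
        /// and nothing is written for it.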
        corrections: OkErr<Correction<Row>, Correction<DataflowErrorSer>>,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontiers of the `persist` inputs.
        ///
        /// Note that this is _not_ the same as the write frontier of the output persist shard!
        /// It usually is, but during snapshot processing, these frontiers will start at the
        /// shard's read frontier, so they can be beyond its write frontier. This is important
        /// as it means we must not discard batch descriptions based on these persist frontiers:
        /// A batch description might still be valid even if its `lower` is before the persist
        /// frontiers we observe.
        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The current valid batch description and associated output capability, if any.
        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
        /// A request to force a consolidation of `corrections` once both `desired_frontiers`
        /// and `persist_frontiers` become greater than the given frontier.
        ///
        /// Normally we force a consolidation whenever we write a batch, but there are periods
        /// (like read-only mode) when that doesn't happen, and we need to manually force
        /// consolidation instead. Currently this is only used to ensure we quickly get rid of
        /// the snapshot updates.
        force_consolidation_after: Option<Antichain<Timestamp>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
            metrics: SinkMetrics,
            logging: Option<ChannelLogging>,
            as_of: Antichain<Timestamp>,
            worker_config: &ConfigSet,
        ) -> Self {
            let worker_metrics = metrics.for_worker(worker_id);

            // Force a consolidation of `corrections` after the snapshot updates have been
            // fully processed, to ensure we get rid of those as quickly as possible.
            let force_consolidation_after = Some(as_of.clone());

            let mut state = Self {
                sink_id,
                worker_id,
                persist_writer,
                corrections: OkErr::new(
                    Correction::new(
                        metrics.clone(),
                        worker_metrics.clone(),
                        logging.clone(),
                        worker_config,
                    ),
                    Correction::new(metrics, worker_metrics, logging, worker_config),
                ),
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontiers: OkErr::new_frontiers(),
                batch_description: None,
                force_consolidation_after,
            };

            // Immediately advance the persist frontier tracking to the `as_of`.
            // This is important to ensure the persist sink doesn't get stuck if the output
            // shard's initial frontier is less than the `as_of`. The `mint` operator first
            // emits a batch description with `lower = as_of`, and the `write` operator only
            // emits a batch when its observed persist frontier is >= the batch description's
            // `lower`, which (assuming no other writers) would be never if we didn't advance
            // the observed persist frontier to the `as_of`.
            if MV_SINK_ADVANCE_PERSIST_FRONTIERS.get(worker_config) {
                state.advance_persist_ok_frontier(as_of.clone());
                state.advance_persist_err_frontier(as_of);
            }

            state
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontiers.frontier().elements(),
                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier.borrow()) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier.borrow()) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.ok, frontier.borrow()) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` ok frontier");
            }
        }

        fn advance_persist_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.err, frontier.borrow()) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` err frontier");
            }
        }

        /// Apply the effects of a previous `desired` frontier advancement.
        fn apply_desired_frontier_advancement(&mut self) {
            self.maybe_force_consolidation();
        }

        /// Apply the effects of a previous `persist` frontier advancement.
        fn apply_persist_frontier_advancement(&mut self) {
            let frontier = self.persist_frontiers.frontier();

            // We will only emit times at or after the `persist` frontier, so now is a good
            // time to advance the times of stashed updates.
            self.corrections.ok.advance_since(frontier.clone());
            self.corrections.err.advance_since(frontier.clone());

            self.maybe_force_consolidation();
        }

        /// If the current consolidation request has become applicable, apply it.
        fn maybe_force_consolidation(&mut self) {
            let Some(request) = &self.force_consolidation_after else {
                return;
            };

            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = self.persist_frontiers.frontier();
            if PartialOrder::less_than(request, desired_frontier)
                && PartialOrder::less_than(request, persist_frontier)
            {
                self.trace("forcing correction consolidation");
                self.corrections.ok.consolidate_at_since();
                self.corrections.err.consolidate_at_since();

                // Remove the consolidation request, now that we have fulfilled it.
                self.force_consolidation_after = None;
            }
        }

        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
            // The incoming batch description is outdated if we already have a batch
            // description with a greater `lower`.
            //
            // Note that we cannot assume a description is outdated based on the comparison of
            // its `lower` with the `persist_frontier`. The persist frontier observed by the
            // `write` operator is initialized with the shard's read frontier, so it can be
            // greater than the shard's write frontier.
            if let Some((prev, _)) = &self.batch_description {
                if PartialOrder::less_than(&desc.lower, &prev.lower) {
                    self.trace(format!("skipping outdated batch description: {desc:?}"));
                    return;
                }
            }

            self.batch_description = Some((desc, cap));
            self.trace("set batch description");
        }

        async fn maybe_write_batch(
            &mut self,
        ) -> Option<(BatchDescription, ProtoBatch, Capability<Timestamp>)> {
            let (desc, _cap) = self.batch_description.as_ref()?;

            // We can write a new batch if we have seen all `persist` updates before `lower`
            // and all `desired` updates up to `upper`.
            let persist_complete =
                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
            let desired_complete =
                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
            if !persist_complete || !desired_complete {
                return None;
            }

            let (desc, cap) = self.batch_description.take()?;

            let ok_updates = self.corrections.ok.updates_before(&desc.upper);
            let err_updates = self.corrections.err.updates_before(&desc.upper);

            let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
            let errs = err_updates
                .map(|(d, t, r)| ((SourceData(Err(d.deserialize())), ()), t, r.into_inner()));
            let mut updates = oks.chain(errs).peekable();

            // Don't write empty batches.
            if updates.peek().is_none() {
                drop(updates);
                self.trace("skipping empty batch");
                return None;
            }

            let batch = self
                .persist_writer
                .batch(updates, desc.lower.clone(), desc.upper.clone())
                .await
                .expect("valid usage")
                .into_transmittable_batch();

            self.trace("wrote a batch");
            Some((desc, batch, cap))
        }
    }
}

/// Implementation of the `append` operator.
mod append {
    use super::*;

    /// Render the `append` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
    /// * `batches`: The stream of written batches produced by the `write` operator.
    pub fn render<'s>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        descs: DescsStream<'s>,
        batches: BatchesStream<'s>,
    ) -> PressOnDropButton {
        let scope = descs.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "append");
        let mut op = OperatorBuilder::new(name, scope);

        // Broadcast batch descriptions to all workers, regardless of whether or not they are
        // responsible for the append, to give them a chance to clean up any outdated state
        // they might still hold.
        let mut descs_input = op.new_disconnected_input(descs.broadcast(), Pipeline);
        let mut batches_input = op.new_disconnected_input(
            batches,
            Exchange::new(move |(desc, _): &(BatchDescription, _)| {
                u64::cast_from(desc.append_worker)
            }),
        );

        let button = op.build(move |_capabilities| async move {
            let writer = persist_api.open_writer().await;
            let mut state = State::new(sink_id, worker_id, writer);

            loop {
                // Read from the inputs, absorb batch descriptions and batches. If the
                // `batches` frontier advances, or if we receive a new batch description, we
                // might have to append a new batch.
                tokio::select! {
                    Some(event) = descs_input.next() => {
                        if let Event::Data(_cap, data) = event {
                            for desc in data {
                                state.absorb_batch_description(desc).await;
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    Some(event) = batches_input.next() => {
                        match event {
                            Event::Data(_cap, data) => {
                                // The batch description is only used for routing and we ignore
                                // it here since we already get one from `descs_input`.
                                for (_desc, batch) in data {
                                    state.absorb_batch(batch).await;
                                }
                            }
                            Event::Progress(frontier) => {
                                state.advance_batches_frontier(frontier);
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                }
            }
        });

        button.press_on_drop()
    }

    /// State maintained by the `append` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// The current input frontier of `batches`.
        batches_frontier: Antichain<Timestamp>,
        /// The greatest observed `lower` from both `descs` and `batches`.
        lower: Antichain<Timestamp>,
        /// The batch description for `lower`, if any.
        batch_description: Option<BatchDescription>,
        /// Batches received for `lower`.
        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        ) -> Self {
            Self {
                sink_id,
                worker_id,
                persist_writer,
                batches_frontier: Antichain::from_elem(Timestamp::MIN),
                lower: Antichain::from_elem(Timestamp::MIN),
                batch_description: None,
                batches: Default::default(),
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                batches_frontier = ?self.batches_frontier.elements(),
                lower = ?self.lower.elements(),
                batch_description = ?self.batch_description,
                message,
            );
        }

        fn advance_batches_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.batches_frontier, frontier.borrow()) {
                self.trace("advanced `batches` frontier");
            }
        }

        /// Advance the current `lower`.
        ///
        /// Discards all currently stashed batches and batch descriptions, assuming that they
        /// are now invalid.
        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
            assert!(PartialOrder::less_than(&self.lower, &frontier));

            self.lower = frontier;
            self.batch_description = None;

            // Remove stashed batches, cleaning up those we didn't append.
            for batch in self.batches.drain(..) {
                batch.delete().await;
            }

            self.trace("advanced `lower`");
        }

        /// Absorb the given batch description into the state, provided it is not outdated.
        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
            if PartialOrder::less_than(&self.lower, &desc.lower) {
                self.advance_lower(desc.lower.clone()).await;
            } else if self.lower != desc.lower {
                self.trace(format!("skipping outdated batch description: {desc:?}"));
                return;
            }

            if desc.append_worker == self.worker_id {
                self.batch_description = Some(desc);
                self.trace("set batch description");
            }
        }

        /// Absorb the given batch into the state, provided it is not outdated.
        async fn absorb_batch(&mut self, batch: ProtoBatch) {
            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
            if PartialOrder::less_than(&self.lower, batch.lower()) {
                self.advance_lower(batch.lower().clone()).await;
            } else if &self.lower != batch.lower() {
                self.trace(format!(
                    "skipping outdated batch: ({:?}, {:?})",
                    batch.lower().elements(),
                    batch.upper().elements(),
                ));

                // Ensure the batch's data gets properly cleaned up before dropping it.
                batch.delete().await;
                return;
            }

            self.batches.push(batch);
            self.trace("absorbed a batch");
        }

        async fn maybe_append_batches(&mut self) {
            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
            if !batches_complete {
                return;
            }

            let Some(desc) = self.batch_description.take() else {
                return;
            };

            let new_lower = match self.append_batches(desc).await {
                Ok(shard_upper) => {
                    self.trace("appended a batch");
                    shard_upper
                }
                Err(shard_upper) => {
                    // Failing the append is expected in the presence of concurrent replicas.
                    // There is nothing special to do here: The self-correcting feedback
                    // mechanism ensures that we observe the concurrent changes, compute their
                    // consequences, and append them at a future time.
                    self.trace(format!(
                        "append failed due to `lower` mismatch: {:?}",
                        shard_upper.elements(),
                    ));
                    shard_upper
                }
            };

            self.advance_lower(new_lower).await;
        }

        /// Append the current `batches` to the output shard.
        ///
        /// Returns whether the append was successful or not, and the current shard upper in
        /// either case.
        ///
        /// This method advances the shard upper to the batch `lower` if necessary. This is the
        /// mechanism that brings the shard upper to the sink as-of when appending the initial
        /// batch.
        ///
        /// An alternative mechanism for bringing the shard upper to the sink as-of would be
        /// making a single append at operator startup. The reason we are doing it here instead
        /// is that it simplifies the implementation of read-only mode. In read-only mode we
        /// have to defer any persist writes, including the initial upper bump. Having only a
        /// single place that performs writes makes it easy to ensure we are doing that
        /// correctly.
        async fn append_batches(
            &mut self,
            desc: BatchDescription,
        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
            let (lower, upper) = (desc.lower, desc.upper);
            let mut to_append: Vec<_> = self.batches.iter_mut().collect();

            loop {
                let result = self
                    .persist_writer
                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
                    .await
                    .expect("valid usage");

                match result {
                    Ok(()) => return Ok(upper),
                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;

                        // At this point the shard's since and upper are likely the same, a
                        // state that is likely to hit edge-cases in logic reasoning about
                        // frontiers.
                        fail::fail_point!("mv_advanced_upper");
                    }
                    Err(mismatch) => return Err(mismatch.current),
                }
            }
        }
    }

    /// Advance the frontier of the given writer's shard to at least the given `upper`.
    async fn advance_shard_upper(
        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        upper: Antichain<Timestamp>,
    ) {
        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
        let lower = Antichain::from_elem(Timestamp::MIN);
        persist_writer
            .append(empty_updates, lower, upper)
            .await
            .expect("valid usage")
            .expect("should always succeed");
    }
}