mz_compute/sink/materialized_view.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A dataflow sink that writes input records to a persist shard.
11//!
12//! This implementation is both parallel and self-correcting.
13//!
14//! * parallel: Multiple workers can participate in writing updates for the same times, letting
15//! sink throughput scale with the number of workers allocated to the replica.
16//! * self-correcting: The sink continually compares the contents of the persist shard with the
17//! contents of the input collection and writes down the difference. If the persist shard ends
18//! up with undesired contents for any reason, this is corrected the next time the sink manages
19//! to append to the shard.
20//!
21//! ### Operators
22//!
23//! The persist sink consists of a graph of operators.
24//!
25//! desired persist <---------------.
26//! | | |
27//! | | |
28//! |---------------------. | |
29//! | | | |
30//! | | | |
31//! v v v |
32//! +--------+ +--------+ +--------+
33//! | mint | --descs-.--> | write | --batches--> | append |
34//! +--------+ \ +--------+ .-> +--------+
35//! \_____________________/
36//!
37//! * `mint` mints batch descriptions, i.e., `(lower, upper)` bounds of batches that should be
38//! written. The persist API requires that all workers write batches with the same bounds, so
39//! they can be appended as a single logical batch. To ensure this, the `mint` operator only
40//! runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are
41//! picked based on the frontiers of the `desired` stream and the output persist shard.
42//! * `write` stages batch data in persist, based on the batch descriptions received from the
43//! `mint` operator, but without appending it to the persist shard. This is a multi-worker
44//! operator, with each worker writing batches of the data that arrives at its local inputs. To
45//! do so it reads from the `desired` and `persist` streams and produces the difference between
46//! them to write back out, ensuring that the final contents of the persist shard match
47//! `desired`.
48//! * `append` appends the batches minted by `mint` and written by `write` to the persist shard.
49//! This is a multi-worker operator, where workers are responsible for different subsets of
50//! batch descriptions. If a worker is responsible for a given batch description, it waits for
51//! all workers to stage their batches for that batch description, then appends all the batches
52//! together as a single logical batch.
53//!
54//! Note that while the above graph suggests that `mint` and `write` both receive copies of the
55//! `desired` stream, the actual implementation passes that stream through `mint` and lets `write`
56//! read the passed-through stream, to avoid cloning data.
57//!
58//! Also note that the `append` operator's implementation would perhaps be more natural as a
59//! single-worker implementation. The purpose of sharing the work between all workers is to avoid a
60//! work imbalance where one worker is overloaded (doing both appends and the consequent persist
61//! maintenance work) while others are comparatively idle.
62//!
63//! The persist sink is written to be robust to the presence of other conflicting instances (e.g.
64//! from other replicas) writing to the same persist shard. Each of the three operators needs to be
65//! able to handle conflicting writes that unexpectedly change the contents of the output persist
66//! shard.
67//!
68//! ### Frontiers
69//!
70//! The `desired` frontier tracks the progress of the upstream dataflow, but may be rounded up to
71//! the next refresh time for dataflows that follow a refresh schedule other than "on commit".
72//!
73//! The `persist` frontier tracks the `upper` frontier of the target persist shard, with one
74//! exception: When the `persist_source` that reads back the shard is rendered, it will start
75//! reading at its `since` frontier. So if the shard's `since` is initially greater than its
76//! `upper`, the `persist` frontier too will be in advance of the shard `upper`, until the `upper`
77//! has caught up. To avoid getting confused by this edge case, the `mint` operator does not use
78//! the `persist` stream to observe the shard frontier but keeps its own `WriteHandle` instead.
79//!
80//! The `descs` frontier communicates which `lower` bounds may still be emitted in batch
81//! descriptions. All future batch descriptions will have a `lower` that is greater or equal to the
82//! current `descs` frontier.
83//!
84//! The `batches` frontier communicates for which `lower` bounds batches may still be written. All
85//! batches for descriptions with `lower`s less than the current `batches` frontier have already
86//! been written.
87//!
88//! ### Invariants
89//!
90//! The implementation upholds several invariants that can be relied upon to simplify the
91//! implementation:
92//!
93//! 1. `lower`s in minted batch descriptions are unique and strictly increasing. That is, the
94//! `mint` operator will never mint the same `lower` twice and a minted `lower` is always
95//! greater than any previously minted ones.
96//! 2. `upper`s in minted batch descriptions are monotonically increasing.
97//! 3. From (1) follows that there is always at most one "valid" batch description in flight in
98//! the operator graph. "Valid" here means that the described batch can be appended to the
99//! persist shard.
100//!
101//! The main simplification these invariants allow is that operators only need to keep track of the
102//! most recent batch description and/or `lower`. Previous batch descriptions are not valid
103//! anymore, so there is no reason to hold any state or perform any work in support of them.
104//!
105//! ### Read-only Mode
106//!
107//! The persist sink can optionally be initialized in read-only mode. In this mode it is passive
108//! and avoids any writes to persist. Activating the `read_only_rx` transitions the sink into write
109//! mode, where it commences normal operation.
110//!
111//! Read-only mode is implemented by the `mint` operator. To disable writes, the `mint` operator
112//! simply avoids minting any batch descriptions. Since both the `write` and the `append` operator
113//! require batch descriptions to write/append batches, this suppresses any persist communication.
114//! At the same time, the `write` operator still observes changes to the `desired` and `persist`
115//! collections, allowing it to keep its correction buffer up-to-date.
116
117use std::any::Any;
118use std::cell::RefCell;
119use std::pin::pin;
120use std::rc::Rc;
121use std::sync::Arc;
122
123use differential_dataflow::{AsCollection, Hashable, VecCollection};
124use futures::StreamExt;
125use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
126use mz_dyncfg::ConfigSet;
127use mz_ore::cast::CastFrom;
128use mz_persist_client::batch::{Batch, ProtoBatch};
129use mz_persist_client::cache::PersistClientCache;
130use mz_persist_client::metrics::SinkMetrics;
131use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
132use mz_persist_client::write::WriteHandle;
133use mz_persist_client::{Diagnostics, PersistClient};
134use mz_persist_types::codec_impls::UnitSchema;
135use mz_repr::{Diff, GlobalId, Row, Timestamp};
136use mz_storage_types::StorageDiff;
137use mz_storage_types::controller::CollectionMetadata;
138use mz_storage_types::errors::DataflowError;
139use mz_storage_types::sources::SourceData;
140use mz_timely_util::builder_async::PressOnDropButton;
141use mz_timely_util::builder_async::{Event, OperatorBuilder};
142use mz_timely_util::probe::{Handle, ProbeNotify};
143use serde::{Deserialize, Serialize};
144use timely::PartialOrder;
145use timely::container::CapacityContainerBuilder;
146use timely::dataflow::channels::pact::{Exchange, Pipeline};
147use timely::dataflow::operators::vec::Broadcast;
148use timely::dataflow::operators::{Capability, CapabilitySet, probe};
149use timely::dataflow::{Scope, StreamVec};
150use timely::progress::Antichain;
151use tokio::sync::watch;
152use tracing::trace;
153
154use crate::compute_state::ComputeState;
155use crate::render::StartSignal;
156use crate::render::sinks::SinkRender;
157use crate::sink::correction::{Correction, Logging};
158use crate::sink::refresh::apply_refresh;
159
160impl<G> SinkRender<G> for MaterializedViewSinkConnection<CollectionMetadata>
161where
162 G: Scope<Timestamp = Timestamp>,
163{
164 fn render_sink(
165 &self,
166 compute_state: &mut ComputeState,
167 sink: &ComputeSinkDesc<CollectionMetadata>,
168 sink_id: GlobalId,
169 as_of: Antichain<Timestamp>,
170 start_signal: StartSignal,
171 mut ok_collection: VecCollection<G, Row, Diff>,
172 mut err_collection: VecCollection<G, DataflowError, Diff>,
173 _ct_times: Option<VecCollection<G, (), Diff>>,
174 output_probe: &Handle<Timestamp>,
175 ) -> Option<Rc<dyn Any>> {
176 // Attach probes reporting the compute frontier.
177 // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
178 // track the progress of the computation, so we need to attach probes before it.
179 let probe = probe::Handle::default();
180 ok_collection = ok_collection
181 .probe_with(&probe)
182 .inner
183 .probe_notify_with(vec![output_probe.clone()])
184 .as_collection();
185 let collection_state = compute_state.expect_collection_mut(sink_id);
186 collection_state.compute_probe = Some(probe);
187
188 // If a `RefreshSchedule` was specified, round up timestamps.
189 if let Some(refresh_schedule) = &sink.refresh_schedule {
190 ok_collection = apply_refresh(ok_collection, refresh_schedule.clone());
191 err_collection = apply_refresh(err_collection, refresh_schedule.clone());
192 }
193
194 if sink.up_to != Antichain::default() {
195 unimplemented!(
196 "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning"
197 )
198 }
199
200 let read_only_rx = collection_state.read_only_rx.clone();
201
202 let token = persist_sink(
203 sink_id,
204 &self.storage_metadata,
205 ok_collection,
206 err_collection,
207 as_of,
208 compute_state,
209 start_signal,
210 read_only_rx,
211 );
212 Some(token)
213 }
214}
215
216/// Type of the `desired` stream, split into `Ok` and `Err` streams.
217type DesiredStreams<S> =
218 OkErr<StreamVec<S, (Row, Timestamp, Diff)>, StreamVec<S, (DataflowError, Timestamp, Diff)>>;
219
220/// Type of the `persist` stream, split into `Ok` and `Err` streams.
221type PersistStreams<S> =
222 OkErr<StreamVec<S, (Row, Timestamp, Diff)>, StreamVec<S, (DataflowError, Timestamp, Diff)>>;
223
224/// Type of the `descs` stream.
225type DescsStream<S> = StreamVec<S, BatchDescription>;
226
227/// Type of the `batches` stream.
228type BatchesStream<S> = StreamVec<S, (BatchDescription, ProtoBatch)>;
229
230/// Type of the shared sink write frontier.
231type SharedSinkFrontier = Rc<RefCell<Antichain<Timestamp>>>;
232
233/// Renders an MV sink writing the given desired collection into the `target` persist collection.
234pub(super) fn persist_sink<S>(
235 sink_id: GlobalId,
236 target: &CollectionMetadata,
237 ok_collection: VecCollection<S, Row, Diff>,
238 err_collection: VecCollection<S, DataflowError, Diff>,
239 as_of: Antichain<Timestamp>,
240 compute_state: &mut ComputeState,
241 start_signal: StartSignal,
242 read_only_rx: watch::Receiver<bool>,
243) -> Rc<dyn Any>
244where
245 S: Scope<Timestamp = Timestamp>,
246{
247 let mut scope = ok_collection.scope();
248 let desired = OkErr::new(ok_collection.inner, err_collection.inner);
249
250 // Read back the persist shard.
251 let (persist, persist_token) = persist_source(
252 &mut scope,
253 sink_id,
254 target.clone(),
255 compute_state,
256 start_signal,
257 );
258
259 let persist_api = PersistApi {
260 persist_clients: Arc::clone(&compute_state.persist_clients),
261 collection: target.clone(),
262 shard_name: sink_id.to_string(),
263 purpose: format!("MV sink {sink_id}"),
264 };
265
266 let (desired, descs, sink_frontier, mint_token) = mint::render(
267 sink_id,
268 persist_api.clone(),
269 as_of.clone(),
270 read_only_rx,
271 desired,
272 );
273
274 let (batches, write_token) = write::render(
275 sink_id,
276 persist_api.clone(),
277 as_of,
278 desired,
279 persist,
280 descs.clone(),
281 Rc::clone(&compute_state.worker_config),
282 );
283
284 let append_token = append::render(sink_id, persist_api, descs, batches);
285
286 // Report sink frontier updates to the `ComputeState`.
287 let collection = compute_state.expect_collection_mut(sink_id);
288 collection.sink_write_frontier = Some(sink_frontier);
289
290 Rc::new((persist_token, mint_token, write_token, append_token))
291}
292
293/// Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with
294/// such pairs.
295struct OkErr<O, E> {
296 ok: O,
297 err: E,
298}
299
300impl<O, E> OkErr<O, E> {
301 fn new(ok: O, err: E) -> Self {
302 Self { ok, err }
303 }
304}
305
306impl OkErr<Antichain<Timestamp>, Antichain<Timestamp>> {
307 fn new_frontiers() -> Self {
308 Self {
309 ok: Antichain::from_elem(Timestamp::MIN),
310 err: Antichain::from_elem(Timestamp::MIN),
311 }
312 }
313
314 /// Return the overall frontier, i.e., the minimum of `ok` and `err`.
315 fn frontier(&self) -> &Antichain<Timestamp> {
316 if PartialOrder::less_equal(&self.ok, &self.err) {
317 &self.ok
318 } else {
319 &self.err
320 }
321 }
322}
323
324/// Advance the given `frontier` to `new`, if the latter one is greater.
325///
326/// Returns whether `frontier` was advanced.
327fn advance(frontier: &mut Antichain<Timestamp>, new: Antichain<Timestamp>) -> bool {
328 if PartialOrder::less_than(frontier, &new) {
329 *frontier = new;
330 true
331 } else {
332 false
333 }
334}
335
336/// A persist API specialized to a single collection.
337#[derive(Clone)]
338struct PersistApi {
339 persist_clients: Arc<PersistClientCache>,
340 collection: CollectionMetadata,
341 shard_name: String,
342 purpose: String,
343}
344
345impl PersistApi {
346 async fn open_client(&self) -> PersistClient {
347 self.persist_clients
348 .open(self.collection.persist_location.clone())
349 .await
350 .unwrap_or_else(|error| panic!("error opening persist client: {error}"))
351 }
352
353 async fn open_writer(&self) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
354 self.open_client()
355 .await
356 .open_writer(
357 self.collection.data_shard,
358 Arc::new(self.collection.relation_desc.clone()),
359 Arc::new(UnitSchema),
360 Diagnostics {
361 shard_name: self.shard_name.clone(),
362 handle_purpose: self.purpose.clone(),
363 },
364 )
365 .await
366 .unwrap_or_else(|error| panic!("error opening persist writer: {error}"))
367 }
368
369 async fn open_metrics(&self) -> SinkMetrics {
370 let client = self.open_client().await;
371 client.metrics().sink.clone()
372 }
373}
374
375/// Instantiate a persist source reading back the `target` collection.
376fn persist_source<S>(
377 scope: &mut S,
378 sink_id: GlobalId,
379 target: CollectionMetadata,
380 compute_state: &ComputeState,
381 start_signal: StartSignal,
382) -> (PersistStreams<S>, Vec<PressOnDropButton>)
383where
384 S: Scope<Timestamp = Timestamp>,
385{
386 // There is no guarantee that the sink as-of is beyond the persist shard's since. If it isn't,
387 // instantiating a `persist_source` with it would panic. So instead we leave it to
388 // `persist_source` to select an appropriate as-of. We only care about times beyond the current
389 // shard upper anyway.
390 //
391 // TODO(teskje): Ideally we would select the as-of as `join(sink_as_of, since, upper)`, to
392 // allow `persist_source` to omit as much historical detail as possible. However, we don't know
393 // the shard frontiers and we cannot get them here as that requires an `async` context. We
394 // should consider extending the `persist_source` API to allow as-of selection based on the
395 // shard's current frontiers.
396 let as_of = None;
397
398 let until = Antichain::new();
399 let map_filter_project = None;
400
401 let (ok_stream, err_stream, token) = mz_storage_operators::persist_source::persist_source(
402 scope,
403 sink_id,
404 Arc::clone(&compute_state.persist_clients),
405 &compute_state.txns_ctx,
406 target,
407 None,
408 as_of,
409 SnapshotMode::Include,
410 until,
411 map_filter_project,
412 compute_state.dataflow_max_inflight_bytes(),
413 start_signal,
414 ErrorHandler::Halt("compute persist sink"),
415 );
416
417 let streams = OkErr::new(ok_stream, err_stream);
418 (streams, token)
419}
420
421/// A description for a batch of updates to be written.
422///
423/// Batch descriptions are produced by the `mint` operator and consumed by the `write` and `append`
424/// operators, where they inform which batches should be written or appended, respectively.
425///
426/// Each batch description also contains the index of its "append worker", i.e. the worker that is
427/// responsible for appending the written batches to the output shard.
428#[derive(Clone, Serialize, Deserialize)]
429struct BatchDescription {
430 lower: Antichain<Timestamp>,
431 upper: Antichain<Timestamp>,
432 append_worker: usize,
433}
434
435impl BatchDescription {
436 fn new(lower: Antichain<Timestamp>, upper: Antichain<Timestamp>, append_worker: usize) -> Self {
437 assert!(PartialOrder::less_than(&lower, &upper));
438 Self {
439 lower,
440 upper,
441 append_worker,
442 }
443 }
444}
445
446impl std::fmt::Debug for BatchDescription {
447 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
448 write!(
449 f,
450 "({:?}, {:?})@{}",
451 self.lower.elements(),
452 self.upper.elements(),
453 self.append_worker,
454 )
455 }
456}
457
458/// Construct a name for the given sub-operator.
459fn operator_name(sink_id: GlobalId, sub_operator: &str) -> String {
460 format!("mv_sink({sink_id})::{sub_operator}")
461}
462
463/// Implementation of the `mint` operator.
464mod mint {
465 use super::*;
466
467 /// Render the `mint` operator.
468 ///
469 /// The parameters passed in are:
470 /// * `sink_id`: The `GlobalId` of the sink export.
471 /// * `persist_api`: An object providing access to the output persist shard.
472 /// * `as_of`: The first time for which the sink may produce output.
473 /// * `read_only_tx`: A receiver that reports the sink is in read-only mode.
474 /// * `desired`: The ok/err streams that should be sinked to persist.
475 pub fn render<S>(
476 sink_id: GlobalId,
477 persist_api: PersistApi,
478 as_of: Antichain<Timestamp>,
479 mut read_only_rx: watch::Receiver<bool>,
480 desired: DesiredStreams<S>,
481 ) -> (
482 DesiredStreams<S>,
483 DescsStream<S>,
484 SharedSinkFrontier,
485 PressOnDropButton,
486 )
487 where
488 S: Scope<Timestamp = Timestamp>,
489 {
490 let scope = desired.ok.scope();
491 let worker_id = scope.index();
492 let worker_count = scope.peers();
493
494 // Determine the active worker for the mint operator.
495 let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();
496
497 let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
498 let shared_frontier = Rc::clone(&sink_frontier);
499
500 let name = operator_name(sink_id, "mint");
501 let mut op = OperatorBuilder::new(name, scope);
502
503 let (ok_output, ok_stream) = op.new_output::<CapacityContainerBuilder<_>>();
504 let (err_output, err_stream) = op.new_output::<CapacityContainerBuilder<_>>();
505 let desired_outputs = OkErr::new(ok_output, err_output);
506 let desired_output_streams = OkErr::new(ok_stream, err_stream);
507
508 let (desc_output, desc_output_stream) = op.new_output::<CapacityContainerBuilder<_>>();
509
510 let mut desired_inputs = OkErr {
511 ok: op.new_input_for(desired.ok, Pipeline, &desired_outputs.ok),
512 err: op.new_input_for(desired.err, Pipeline, &desired_outputs.err),
513 };
514
515 let button = op.build(move |capabilities| async move {
516 // Passing through the `desired` streams only requires data capabilities, so we can
517 // immediately drop their initial capabilities here.
518 let [_, _, desc_cap]: [_; 3] =
519 capabilities.try_into().expect("one capability per output");
520
521 // Non-active workers just pass the `desired` and `persist` data through.
522 if worker_id != active_worker_id {
523 drop(desc_cap);
524 shared_frontier.borrow_mut().clear();
525
526 loop {
527 tokio::select! {
528 Some(event) = desired_inputs.ok.next() => {
529 if let Event::Data(cap, mut data) = event {
530 desired_outputs.ok.give_container(&cap, &mut data);
531 }
532 }
533 Some(event) = desired_inputs.err.next() => {
534 if let Event::Data(cap, mut data) = event {
535 desired_outputs.err.give_container(&cap, &mut data);
536 }
537 }
538 // All inputs are exhausted, so we can shut down.
539 else => return,
540 }
541 }
542 }
543
544 let mut cap_set = CapabilitySet::from_elem(desc_cap);
545
546 let read_only = *read_only_rx.borrow_and_update();
547 let mut state = State::new(sink_id, worker_count, as_of, read_only);
548
549 // Create a stream that reports advancements of the target shard's frontier and updates
550 // the shared sink frontier.
551 //
552 // We collect the persist frontier from a write handle directly, rather than inspecting
553 // the `persist` stream, because the latter has two annoying glitches:
554 // (a) It starts at the shard's read frontier, not its write frontier.
555 // (b) It can lag behind if there are spikes in ingested data.
556 let mut persist_frontiers = pin!(async_stream::stream! {
557 let mut writer = persist_api.open_writer().await;
558 let mut frontier = Antichain::from_elem(Timestamp::MIN);
559 while !frontier.is_empty() {
560 writer.wait_for_upper_past(&frontier).await;
561 frontier = writer.upper().clone();
562 shared_frontier.borrow_mut().clone_from(&frontier);
563 yield frontier.clone();
564 }
565 });
566
567 loop {
568 // Read from the inputs, pass through all data to the respective outputs, and keep
569 // track of the input frontiers. When a frontier advances we might have to mint a
570 // new batch description.
571 let maybe_desc = tokio::select! {
572 Some(event) = desired_inputs.ok.next() => {
573 match event {
574 Event::Data(cap, mut data) => {
575 desired_outputs.ok.give_container(&cap, &mut data);
576 None
577 }
578 Event::Progress(frontier) => {
579 state.advance_desired_ok_frontier(frontier);
580 state.maybe_mint_batch_description()
581 }
582 }
583 }
584 Some(event) = desired_inputs.err.next() => {
585 match event {
586 Event::Data(cap, mut data) => {
587 desired_outputs.err.give_container(&cap, &mut data);
588 None
589 }
590 Event::Progress(frontier) => {
591 state.advance_desired_err_frontier(frontier);
592 state.maybe_mint_batch_description()
593 }
594 }
595 }
596 Some(frontier) = persist_frontiers.next() => {
597 state.advance_persist_frontier(frontier);
598 state.maybe_mint_batch_description()
599 }
600 Ok(()) = read_only_rx.changed(), if read_only => {
601 state.allow_writes();
602 state.maybe_mint_batch_description()
603 }
604 // All inputs are exhausted, so we can shut down.
605 else => return,
606 };
607
608 if let Some(desc) = maybe_desc {
609 let lower_ts = *desc.lower.as_option().expect("not empty");
610 let cap = cap_set.delayed(&lower_ts);
611 desc_output.give(&cap, desc);
612
613 // We only emit strictly increasing `lower`s, so we can let our output frontier
614 // advance beyond the current `lower`.
615 cap_set.downgrade([lower_ts.step_forward()]);
616 } else {
617 // The next emitted `lower` will be at least the `persist` frontier, so we can
618 // advance our output frontier as far.
619 let _ = cap_set.try_downgrade(state.persist_frontier.iter());
620 }
621 }
622 });
623
624 (
625 desired_output_streams,
626 desc_output_stream,
627 sink_frontier,
628 button.press_on_drop(),
629 )
630 }
631
632 /// State maintained by the `mint` operator.
633 struct State {
634 sink_id: GlobalId,
635 /// The number of workers in the Timely cluster.
636 worker_count: usize,
637 /// The frontiers of the `desired` inputs.
638 desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
639 /// The frontier of the target persist shard.
640 persist_frontier: Antichain<Timestamp>,
641 /// The append worker for the next batch description, chosen in round-robin fashion.
642 next_append_worker: usize,
643 /// The last `lower` we have emitted in a batch description, if any. Whenever the
644 /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
645 last_lower: Option<Antichain<Timestamp>>,
646 /// Whether we are operating in read-only mode.
647 ///
648 /// In read-only mode, minting of batch descriptions is disabled.
649 read_only: bool,
650 }
651
652 impl State {
653 fn new(
654 sink_id: GlobalId,
655 worker_count: usize,
656 as_of: Antichain<Timestamp>,
657 read_only: bool,
658 ) -> Self {
659 // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
660 // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
661 // work needlessly writing batches at previous times.
662 let persist_frontier = as_of;
663
664 Self {
665 sink_id,
666 worker_count,
667 desired_frontiers: OkErr::new_frontiers(),
668 persist_frontier,
669 next_append_worker: 0,
670 last_lower: None,
671 read_only,
672 }
673 }
674
675 fn trace<S: AsRef<str>>(&self, message: S) {
676 let message = message.as_ref();
677 trace!(
678 sink_id = %self.sink_id,
679 desired_frontier = ?self.desired_frontiers.frontier().elements(),
680 persist_frontier = ?self.persist_frontier.elements(),
681 last_lower = ?self.last_lower.as_ref().map(|f| f.elements()),
682 message,
683 );
684 }
685
686 fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
687 if advance(&mut self.desired_frontiers.ok, frontier) {
688 self.trace("advanced `desired` ok frontier");
689 }
690 }
691
692 fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
693 if advance(&mut self.desired_frontiers.err, frontier) {
694 self.trace("advanced `desired` err frontier");
695 }
696 }
697
698 fn advance_persist_frontier(&mut self, frontier: Antichain<Timestamp>) {
699 if advance(&mut self.persist_frontier, frontier) {
700 self.trace("advanced `persist` frontier");
701 }
702 }
703
704 fn allow_writes(&mut self) {
705 if self.read_only {
706 self.read_only = false;
707 self.trace("disabled read-only mode");
708 }
709 }
710
711 fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
712 let desired_frontier = self.desired_frontiers.frontier();
713 let persist_frontier = &self.persist_frontier;
714
715 // We only mint new batch descriptions when:
716 // 1. We are _not_ in read-only mode.
717 // 2. The `desired` frontier is ahead of the `persist` frontier.
718 // 3. The `persist` frontier advanced since we last emitted a batch description.
719 let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
720 let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
721 PartialOrder::less_than(lower, persist_frontier)
722 });
723
724 if self.read_only || !desired_ahead || !persist_advanced {
725 return None;
726 }
727
728 let lower = persist_frontier.clone();
729 let upper = desired_frontier.clone();
730 let append_worker = self.next_append_worker;
731 let desc = BatchDescription::new(lower, upper, append_worker);
732
733 self.next_append_worker = (append_worker + 1) % self.worker_count;
734 self.last_lower = Some(desc.lower.clone());
735
736 self.trace(format!("minted batch description: {desc:?}"));
737 Some(desc)
738 }
739 }
740}
741
742/// Implementation of the `write` operator.
743mod write {
744 use super::*;
745
746 /// Render the `write` operator.
747 ///
748 /// The parameters passed in are:
749 /// * `sink_id`: The `GlobalId` of the sink export.
750 /// * `persist_api`: An object providing access to the output persist shard.
751 /// * `as_of`: The first time for which the sink may produce output.
752 /// * `desired`: The ok/err streams that should be sinked to persist.
753 /// * `persist`: The ok/err streams read back from the output persist shard.
754 /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
755 pub fn render<S>(
756 sink_id: GlobalId,
757 persist_api: PersistApi,
758 as_of: Antichain<Timestamp>,
759 desired: DesiredStreams<S>,
760 persist: PersistStreams<S>,
761 descs: DescsStream<S>,
762 worker_config: Rc<ConfigSet>,
763 ) -> (BatchesStream<S>, PressOnDropButton)
764 where
765 S: Scope<Timestamp = Timestamp>,
766 {
767 let scope = desired.ok.scope();
768 let worker_id = scope.index();
769
770 let name = operator_name(sink_id, "write");
771 let mut op = OperatorBuilder::new(name, scope.clone());
772
773 let mut logging = None;
774 if let (Some(compute_logger), Some(differential_logger)) = (
775 scope.logger_for("materialize/compute"),
776 scope.logger_for("differential/arrange"),
777 ) {
778 let operator_info = op.operator_info();
779 logging = Some(Logging::new(
780 compute_logger,
781 differential_logger.into(),
782 operator_info.global_id,
783 operator_info.address.to_vec(),
784 ));
785 }
786
787 let (batches_output, batches_output_stream) =
788 op.new_output::<CapacityContainerBuilder<_>>();
789
790 // It is important that we exchange the `desired` and `persist` data the same way, so
791 // updates that cancel each other out end up on the same worker.
792 let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
793 let exchange_err = |(d, _, _): &(DataflowError, Timestamp, Diff)| d.hashed();
794
795 let mut desired_inputs = OkErr::new(
796 op.new_disconnected_input(desired.ok, Exchange::new(exchange_ok)),
797 op.new_disconnected_input(desired.err, Exchange::new(exchange_err)),
798 );
799 let mut persist_inputs = OkErr::new(
800 op.new_disconnected_input(persist.ok, Exchange::new(exchange_ok)),
801 op.new_disconnected_input(persist.err, Exchange::new(exchange_err)),
802 );
803 let mut descs_input = op.new_input_for(descs.broadcast(), Pipeline, &batches_output);
804
805 let button = op.build(move |capabilities| async move {
806 // We will use the data capabilities from the `descs` input to produce output, so no
807 // need to hold onto the initial capabilities.
808 drop(capabilities);
809
810 let writer = persist_api.open_writer().await;
811 let sink_metrics = persist_api.open_metrics().await;
812 let mut state = State::new(
813 sink_id,
814 worker_id,
815 writer,
816 sink_metrics,
817 logging,
818 as_of,
819 &worker_config,
820 );
821
822 loop {
823 // Read from the inputs, extract `desired` updates as positive contributions to
824 // `correction` and `persist` updates as negative contributions. If either the
825 // `desired` or `persist` frontier advances, or if we receive a new batch description,
826 // we might have to write a new batch.
827 let maybe_batch = tokio::select! {
828 Some(event) = desired_inputs.ok.next() => {
829 match event {
830 Event::Data(_cap, mut data) => {
831 state.corrections.ok.insert(&mut data);
832 None
833 }
834 Event::Progress(frontier) => {
835 state.advance_desired_ok_frontier(frontier);
836 state.maybe_write_batch().await
837 }
838 }
839 }
840 Some(event) = desired_inputs.err.next() => {
841 match event {
842 Event::Data(_cap, mut data) => {
843 state.corrections.err.insert(&mut data);
844 None
845 }
846 Event::Progress(frontier) => {
847 state.advance_desired_err_frontier(frontier);
848 state.maybe_write_batch().await
849 }
850 }
851 }
852 Some(event) = persist_inputs.ok.next() => {
853 match event {
854 Event::Data(_cap, mut data) => {
855 state.corrections.ok.insert_negated(&mut data);
856 None
857 }
858 Event::Progress(frontier) => {
859 state.advance_persist_ok_frontier(frontier);
860 state.maybe_write_batch().await
861 }
862 }
863 }
864 Some(event) = persist_inputs.err.next() => {
865 match event {
866 Event::Data(_cap, mut data) => {
867 state.corrections.err.insert_negated(&mut data);
868 None
869 }
870 Event::Progress(frontier) => {
871 state.advance_persist_err_frontier(frontier);
872 state.maybe_write_batch().await
873 }
874 }
875 }
876 Some(event) = descs_input.next() => {
877 match event {
878 Event::Data(cap, data) => {
879 for desc in data {
880 state.absorb_batch_description(desc, cap.clone());
881 }
882 state.maybe_write_batch().await
883 }
884 Event::Progress(_frontier) => None,
885 }
886 }
887 // All inputs are exhausted, so we can shut down.
888 else => return,
889 };
890
891 if let Some((index, batch, cap)) = maybe_batch {
892 batches_output.give(&cap, (index, batch));
893 }
894 }
895 });
896
897 (batches_output_stream, button.press_on_drop())
898 }
899
900 /// State maintained by the `write` operator.
901 struct State {
902 sink_id: GlobalId,
903 worker_id: usize,
904 persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
905 /// Contains `desired - persist`, reflecting the updates we would like to commit to
906 /// `persist` in order to "correct" it to track `desired`. This collection is only modified
907 /// by updates received from either the `desired` or `persist` inputs.
908 corrections: OkErr<Correction<Row>, Correction<DataflowError>>,
909 /// The frontiers of the `desired` inputs.
910 desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
911 /// The frontiers of the `persist` inputs.
912 ///
913 /// Note that this is _not_ the same as the write frontier of the output persist shard! It
914 /// usually is, but during snapshot processing, these frontiers will start at the shard's
915 /// read frontier, so they can be beyond its write frontier. This is important as it means
916 /// we must not discard batch descriptions based on these persist frontiers: A batch
917 /// description might still be valid even if its `lower` is before the persist frontiers we
918 /// observe.
919 persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
920 /// The current valid batch description and associated output capability, if any.
921 batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
922 /// A request to force a consolidation of `corrections` once both `desired_frontiers` and
923 /// `persist_frontiers` become greater than the given frontier.
924 ///
925 /// Normally we force a consolidation whenever we write a batch, but there are periods
926 /// (like read-only mode) when that doesn't happen, and we need to manually force
927 /// consolidation instead. Currently this is only used to ensure we quickly get rid of the
928 /// snapshot updates.
929 force_consolidation_after: Option<Antichain<Timestamp>>,
930 }
931
932 impl State {
933 fn new(
934 sink_id: GlobalId,
935 worker_id: usize,
936 persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
937 metrics: SinkMetrics,
938 logging: Option<Logging>,
939 as_of: Antichain<Timestamp>,
940 worker_config: &ConfigSet,
941 ) -> Self {
942 let worker_metrics = metrics.for_worker(worker_id);
943
944 // Force a consolidation of `corrections` after the snapshot updates have been fully
945 // processed, to ensure we get rid of those as quickly as possible.
946 let force_consolidation_after = Some(as_of);
947
948 Self {
949 sink_id,
950 worker_id,
951 persist_writer,
952 corrections: OkErr::new(
953 Correction::new(
954 metrics.clone(),
955 worker_metrics.clone(),
956 logging.clone(),
957 worker_config,
958 ),
959 Correction::new(metrics, worker_metrics, logging, worker_config),
960 ),
961 desired_frontiers: OkErr::new_frontiers(),
962 persist_frontiers: OkErr::new_frontiers(),
963 batch_description: None,
964 force_consolidation_after,
965 }
966 }
967
968 fn trace<S: AsRef<str>>(&self, message: S) {
969 let message = message.as_ref();
970 trace!(
971 sink_id = %self.sink_id,
972 worker = %self.worker_id,
973 desired_frontier = ?self.desired_frontiers.frontier().elements(),
974 persist_frontier = ?self.persist_frontiers.frontier().elements(),
975 batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
976 message,
977 );
978 }
979
980 fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
981 if advance(&mut self.desired_frontiers.ok, frontier) {
982 self.apply_desired_frontier_advancement();
983 self.trace("advanced `desired` ok frontier");
984 }
985 }
986
987 fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
988 if advance(&mut self.desired_frontiers.err, frontier) {
989 self.apply_desired_frontier_advancement();
990 self.trace("advanced `desired` err frontier");
991 }
992 }
993
994 fn advance_persist_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
995 if advance(&mut self.persist_frontiers.ok, frontier) {
996 self.apply_persist_frontier_advancement();
997 self.trace("advanced `persist` ok frontier");
998 }
999 }
1000
1001 fn advance_persist_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
1002 if advance(&mut self.persist_frontiers.err, frontier) {
1003 self.apply_persist_frontier_advancement();
1004 self.trace("advanced `persist` err frontier");
1005 }
1006 }
1007
1008 /// Apply the effects of a previous `desired` frontier advancement.
1009 fn apply_desired_frontier_advancement(&mut self) {
1010 self.maybe_force_consolidation();
1011 }
1012
1013 /// Apply the effects of a previous `persist` frontier advancement.
1014 fn apply_persist_frontier_advancement(&mut self) {
1015 let frontier = self.persist_frontiers.frontier();
1016
1017 // We will only emit times at or after the `persist` frontier, so now is a good time to
1018 // advance the times of stashed updates.
1019 self.corrections.ok.advance_since(frontier.clone());
1020 self.corrections.err.advance_since(frontier.clone());
1021
1022 self.maybe_force_consolidation();
1023 }
1024
1025 /// If the current consolidation request has become applicable, apply it.
1026 fn maybe_force_consolidation(&mut self) {
1027 let Some(request) = &self.force_consolidation_after else {
1028 return;
1029 };
1030
1031 let desired_frontier = self.desired_frontiers.frontier();
1032 let persist_frontier = self.persist_frontiers.frontier();
1033 if PartialOrder::less_than(request, desired_frontier)
1034 && PartialOrder::less_than(request, persist_frontier)
1035 {
1036 self.trace("forcing correction consolidation");
1037 self.corrections.ok.consolidate_at_since();
1038 self.corrections.err.consolidate_at_since();
1039
1040 // Remove the consolidation request, now that we have fulfilled it.
1041 self.force_consolidation_after = None;
1042 }
1043 }
1044
1045 fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
1046 // The incoming batch description is outdated if we already have a batch description
1047 // with a greater `lower`.
1048 //
1049 // Note that we cannot assume a description is outdated based on the comparison of its
1050 // `lower` with the `persist_frontier`. The persist frontier observed by the `write`
1051 // operator is initialized with the shard's read frontier, so it can be greater than
1052 // the shard's write frontier.
1053 if let Some((prev, _)) = &self.batch_description {
1054 if PartialOrder::less_than(&desc.lower, &prev.lower) {
1055 self.trace(format!("skipping outdated batch description: {desc:?}"));
1056 return;
1057 }
1058 }
1059
1060 self.batch_description = Some((desc, cap));
1061 self.trace("set batch description");
1062 }
1063
1064 async fn maybe_write_batch(
1065 &mut self,
1066 ) -> Option<(BatchDescription, ProtoBatch, Capability<Timestamp>)> {
1067 let (desc, _cap) = self.batch_description.as_ref()?;
1068
1069 // We can write a new batch if we have seen all `persist` updates before `lower` and
1070 // all `desired` updates up to `upper`.
1071 let persist_complete =
1072 PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
1073 let desired_complete =
1074 PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
1075 if !persist_complete || !desired_complete {
1076 return None;
1077 }
1078
1079 let (desc, cap) = self.batch_description.take()?;
1080
1081 let ok_updates = self.corrections.ok.updates_before(&desc.upper);
1082 let err_updates = self.corrections.err.updates_before(&desc.upper);
1083
1084 let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
1085 let errs = err_updates.map(|(d, t, r)| ((SourceData(Err(d)), ()), t, r.into_inner()));
1086 let mut updates = oks.chain(errs).peekable();
1087
1088 // Don't write empty batches.
1089 if updates.peek().is_none() {
1090 drop(updates);
1091 self.trace("skipping empty batch");
1092 return None;
1093 }
1094
1095 let batch = self
1096 .persist_writer
1097 .batch(updates, desc.lower.clone(), desc.upper.clone())
1098 .await
1099 .expect("valid usage")
1100 .into_transmittable_batch();
1101
1102 self.trace("wrote a batch");
1103 Some((desc, batch, cap))
1104 }
1105 }
1106}
1107
1108/// Implementation of the `append` operator.
1109mod append {
1110 use super::*;
1111
1112 /// Render the `append` operator.
1113 ///
1114 /// The parameters passed in are:
1115 /// * `sink_id`: The `GlobalId` of the sink export.
1116 /// * `persist_api`: An object providing access to the output persist shard.
1117 /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
1118 /// * `batches`: The stream of written batches produced by the `write` operator.
1119 pub fn render<S>(
1120 sink_id: GlobalId,
1121 persist_api: PersistApi,
1122 descs: DescsStream<S>,
1123 batches: BatchesStream<S>,
1124 ) -> PressOnDropButton
1125 where
1126 S: Scope<Timestamp = Timestamp>,
1127 {
1128 let scope = descs.scope();
1129 let worker_id = scope.index();
1130
1131 let name = operator_name(sink_id, "append");
1132 let mut op = OperatorBuilder::new(name, scope);
1133
1134 // Broadcast batch descriptions to all workers, regardless of whether or not they are
1135 // responsible for the append, to give them a chance to clean up any outdated state they
1136 // might still hold.
1137 let mut descs_input = op.new_disconnected_input(descs.broadcast(), Pipeline);
1138 let mut batches_input = op.new_disconnected_input(
1139 batches,
1140 Exchange::new(move |(desc, _): &(BatchDescription, _)| {
1141 u64::cast_from(desc.append_worker)
1142 }),
1143 );
1144
1145 let button = op.build(move |_capabilities| async move {
1146 let writer = persist_api.open_writer().await;
1147 let mut state = State::new(sink_id, worker_id, writer);
1148
1149 loop {
1150 // Read from the inputs, absorb batch descriptions and batches. If the `batches`
1151 // frontier advances, or if we receive a new batch description, we might have to
1152 // append a new batch.
1153 tokio::select! {
1154 Some(event) = descs_input.next() => {
1155 if let Event::Data(_cap, data) = event {
1156 for desc in data {
1157 state.absorb_batch_description(desc).await;
1158 state.maybe_append_batches().await;
1159 }
1160 }
1161 }
1162 Some(event) = batches_input.next() => {
1163 match event {
1164 Event::Data(_cap, data) => {
1165 // The batch description is only used for routing and we ignore it
1166 // here since we already get one from `descs_input`.
1167 for (_desc, batch) in data {
1168 state.absorb_batch(batch).await;
1169 }
1170 }
1171 Event::Progress(frontier) => {
1172 state.advance_batches_frontier(frontier);
1173 state.maybe_append_batches().await;
1174 }
1175 }
1176 }
1177 // All inputs are exhausted, so we can shut down.
1178 else => return,
1179 }
1180 }
1181 });
1182
1183 button.press_on_drop()
1184 }
1185
1186 /// State maintained by the `append` operator.
1187 struct State {
1188 sink_id: GlobalId,
1189 worker_id: usize,
1190 persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1191 /// The current input frontier of `batches`.
1192 batches_frontier: Antichain<Timestamp>,
1193 /// The greatest observed `lower` from both `descs` and `batches`.
1194 lower: Antichain<Timestamp>,
1195 /// The batch description for `lower`, if any.
1196 batch_description: Option<BatchDescription>,
1197 /// Batches received for `lower`.
1198 batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
1199 }
1200
1201 impl State {
1202 fn new(
1203 sink_id: GlobalId,
1204 worker_id: usize,
1205 persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1206 ) -> Self {
1207 Self {
1208 sink_id,
1209 worker_id,
1210 persist_writer,
1211 batches_frontier: Antichain::from_elem(Timestamp::MIN),
1212 lower: Antichain::from_elem(Timestamp::MIN),
1213 batch_description: None,
1214 batches: Default::default(),
1215 }
1216 }
1217
1218 fn trace<S: AsRef<str>>(&self, message: S) {
1219 let message = message.as_ref();
1220 trace!(
1221 sink_id = %self.sink_id,
1222 worker = %self.worker_id,
1223 batches_frontier = ?self.batches_frontier.elements(),
1224 lower = ?self.lower.elements(),
1225 batch_description = ?self.batch_description,
1226 message,
1227 );
1228 }
1229
1230 fn advance_batches_frontier(&mut self, frontier: Antichain<Timestamp>) {
1231 if advance(&mut self.batches_frontier, frontier) {
1232 self.trace("advanced `batches` frontier");
1233 }
1234 }
1235
1236 /// Advance the current `lower`.
1237 ///
1238 /// Discards all currently stashed batches and batch descriptions, assuming that they are
1239 /// now invalid.
1240 async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
1241 assert!(PartialOrder::less_than(&self.lower, &frontier));
1242
1243 self.lower = frontier;
1244 self.batch_description = None;
1245
1246 // Remove stashed batches, cleaning up those we didn't append.
1247 for batch in self.batches.drain(..) {
1248 batch.delete().await;
1249 }
1250
1251 self.trace("advanced `lower`");
1252 }
1253
1254 /// Absorb the given batch description into the state, provided it is not outdated.
1255 async fn absorb_batch_description(&mut self, desc: BatchDescription) {
1256 if PartialOrder::less_than(&self.lower, &desc.lower) {
1257 self.advance_lower(desc.lower.clone()).await;
1258 } else if &self.lower != &desc.lower {
1259 self.trace(format!("skipping outdated batch description: {desc:?}"));
1260 return;
1261 }
1262
1263 if desc.append_worker == self.worker_id {
1264 self.batch_description = Some(desc);
1265 self.trace("set batch description");
1266 }
1267 }
1268
1269 /// Absorb the given batch into the state, provided it is not outdated.
1270 async fn absorb_batch(&mut self, batch: ProtoBatch) {
1271 let batch = self.persist_writer.batch_from_transmittable_batch(batch);
1272 if PartialOrder::less_than(&self.lower, batch.lower()) {
1273 self.advance_lower(batch.lower().clone()).await;
1274 } else if &self.lower != batch.lower() {
1275 self.trace(format!(
1276 "skipping outdated batch: ({:?}, {:?})",
1277 batch.lower().elements(),
1278 batch.upper().elements(),
1279 ));
1280
1281 // Ensure the batch's data gets properly cleaned up before dropping it.
1282 batch.delete().await;
1283 return;
1284 }
1285
1286 self.batches.push(batch);
1287 self.trace("absorbed a batch");
1288 }
1289
1290 async fn maybe_append_batches(&mut self) {
1291 let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
1292 if !batches_complete {
1293 return;
1294 }
1295
1296 let Some(desc) = self.batch_description.take() else {
1297 return;
1298 };
1299
1300 let new_lower = match self.append_batches(desc).await {
1301 Ok(shard_upper) => {
1302 self.trace("appended a batch");
1303 shard_upper
1304 }
1305 Err(shard_upper) => {
1306 // Failing the append is expected in the presence of concurrent replicas. There
1307 // is nothing special to do here: The self-correcting feedback mechanism
1308 // ensures that we observe the concurrent changes, compute their consequences,
1309 // and append them at a future time.
1310 self.trace(format!(
1311 "append failed due to `lower` mismatch: {:?}",
1312 shard_upper.elements(),
1313 ));
1314 shard_upper
1315 }
1316 };
1317
1318 self.advance_lower(new_lower).await;
1319 }
1320
1321 /// Append the current `batches` to the output shard.
1322 ///
1323 /// Returns whether the append was successful or not, and the current shard upper in either
1324 /// case.
1325 ///
1326 /// This method advances the shard upper to the batch `lower` if necessary. This is the
1327 /// mechanism that brings the shard upper to the sink as-of when appending the initial
1328 /// batch.
1329 ///
1330 /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
1331 /// a single append at operator startup. The reason we are doing it here instead is that it
1332 /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
1333 /// persist writes, including the initial upper bump. Having only a single place that
1334 /// performs writes makes it easy to ensure we are doing that correctly.
1335 async fn append_batches(
1336 &mut self,
1337 desc: BatchDescription,
1338 ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
1339 let (lower, upper) = (desc.lower, desc.upper);
1340 let mut to_append: Vec<_> = self.batches.iter_mut().collect();
1341
1342 loop {
1343 let result = self
1344 .persist_writer
1345 .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
1346 .await
1347 .expect("valid usage");
1348
1349 match result {
1350 Ok(()) => return Ok(upper),
1351 Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
1352 advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
1353
1354 // At this point the shard's since and upper are likely the same, a state
1355 // that is likely to hit edge-cases in logic reasoning about frontiers.
1356 fail::fail_point!("mv_advanced_upper");
1357 }
1358 Err(mismatch) => return Err(mismatch.current),
1359 }
1360 }
1361 }
1362 }
1363
1364 /// Advance the frontier of the given writer's shard to at least the given `upper`.
1365 async fn advance_shard_upper(
1366 persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
1367 upper: Antichain<Timestamp>,
1368 ) {
1369 let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
1370 let lower = Antichain::from_elem(Timestamp::MIN);
1371 persist_writer
1372 .append(empty_updates, lower, upper)
1373 .await
1374 .expect("valid usage")
1375 .expect("should always succeed");
1376 }
1377}