mz_compute/sink/materialized_view.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A dataflow sink that writes input records to a persist shard.
//!
//! This implementation is both parallel and self-correcting.
//!
//! * parallel: Multiple workers can participate in writing updates for the same times, letting
//!   sink throughput scale with the number of workers allocated to the replica.
//! * self-correcting: The sink continually compares the contents of the persist shard with the
//!   contents of the input collection and writes down the difference. If the persist shard ends
//!   up with undesired contents for any reason, this is corrected the next time the sink manages
//!   to append to the shard. (See the sketch below.)
//!
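//! Conceptually, the correction step computes the difference between the desired and the
//! persisted contents and writes only that difference back. A minimal, illustrative sketch of
//! the idea (a hypothetical helper, not the actual operator code, which works incrementally on
//! streams of timestamped updates):
//!
//! ```ignore
//! use std::collections::BTreeMap;
//!
//! /// Illustrative only: `desired` and `persist` stand for fully consolidated contents.
//! fn correction(desired: &BTreeMap<Row, i64>, persist: &BTreeMap<Row, i64>) -> Vec<(Row, i64)> {
//!     let mut diffs = Vec::new();
//!     // Rows missing from or under/over-counted in the shard get a compensating diff.
//!     for (row, desired_count) in desired {
//!         let persisted = persist.get(row).copied().unwrap_or(0);
//!         if *desired_count != persisted {
//!             diffs.push((row.clone(), desired_count - persisted));
//!         }
//!     }
//!     // Rows present in the shard but not desired get retracted.
//!     for (row, persisted) in persist {
//!         if !desired.contains_key(row) {
//!             diffs.push((row.clone(), -persisted));
//!         }
//!     }
//!     diffs
//! }
//! ```
//!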
//! ### Operators
//!
//! The persist sink consists of a graph of operators.
//!
//!        desired                    persist  <---------------.
//!           |                          |                     |
//!           |                          |                     |
//!           |---------------------.    |                     |
//!           |                     |    |                     |
//!           |                     |    |                     |
//!           v                     v    v                     |
//!       +--------+              +--------+              +--------+
//!       |  mint  | --descs-.--> | write  | --batches--> | append |
//!       +--------+          \   +--------+          .-> +--------+
//!                            \_____________________/
//!
//! * `mint` mints batch descriptions, i.e., `(lower, upper)` bounds of batches that should be
//!   written. The persist API requires that all workers write batches with the same bounds, so
//!   they can be appended as a single logical batch. To ensure this, the `mint` operator only
//!   runs on a single worker that broadcasts minted descriptions to all workers. Batch bounds are
//!   picked based on the frontiers of the `desired` stream and the output persist shard.
//! * `write` stages batch data in persist, based on the batch descriptions received from the
//!   `mint` operator, but without appending it to the persist shard. This is a multi-worker
//!   operator, with each worker writing batches of the data that arrives at its local inputs. To
//!   do so it reads from the `desired` and `persist` streams and produces the difference between
//!   them to write back out, ensuring that the final contents of the persist shard match
//!   `desired`.
//! * `append` appends the batches minted by `mint` and written by `write` to the persist shard.
//!   This is a multi-worker operator, where workers are responsible for different subsets of
//!   batch descriptions. If a worker is responsible for a given batch description, it waits for
//!   all workers to stage their batches for that batch description, then appends all the batches
//!   together as a single logical batch.
//!
//! Note that while the above graph suggests that `mint` and `write` both receive copies of the
//! `desired` stream, the actual implementation passes that stream through `mint` and lets `write`
//! read the passed-through stream, to avoid cloning data.
//!
//! Also note that the `append` operator's implementation would perhaps be more natural as a
//! single-worker implementation. The purpose of sharing the work between all workers is to avoid a
//! work imbalance where one worker is overloaded (doing both appends and the consequent persist
//! maintenance work) while others are comparatively idle.
//!
//! The persist sink is written to be robust to the presence of other conflicting instances (e.g.
//! from other replicas) writing to the same persist shard. Each of the three operators needs to be
//! able to handle conflicting writes that unexpectedly change the contents of the output persist
//! shard.
//!
//! ### Frontiers
//!
//! The `desired` frontier tracks the progress of the upstream dataflow, but may be rounded up to
//! the next refresh time for dataflows that follow a refresh schedule other than "on commit".
//!
//! The `persist` frontier tracks the `upper` frontier of the target persist shard, with one
//! exception: When the `persist_source` that reads back the shard is rendered, it will start
//! reading at its `since` frontier. So if the shard's `since` is initially greater than its
//! `upper`, the `persist` frontier too will be in advance of the shard `upper`, until the `upper`
//! has caught up. To avoid getting confused by this edge case, the `mint` operator does not use
//! the `persist` stream to observe the shard frontier but keeps its own `WriteHandle` instead.
//!
//! The `descs` frontier communicates which `lower` bounds may still be emitted in batch
//! descriptions. All future batch descriptions will have a `lower` that is greater than or equal
//! to the current `descs` frontier.
//!
//! The `batches` frontier communicates for which `lower` bounds batches may still be written. All
//! batches for descriptions with `lower`s less than the current `batches` frontier have already
//! been written.
//!
//! ### Invariants
//!
//! The implementation upholds several invariants that can be relied upon to simplify the
//! implementation:
//!
//!  1. `lower`s in minted batch descriptions are unique and strictly increasing. That is, the
//!     `mint` operator will never mint the same `lower` twice and a minted `lower` is always
//!     greater than any previously minted ones.
//!  2. `upper`s in minted batch descriptions are monotonically increasing.
//!  3. From (1) it follows that there is always at most one "valid" batch description in flight
//!     in the operator graph. "Valid" here means that the described batch can be appended to the
//!     persist shard.
//!
//! The main simplification these invariants allow is that operators only need to keep track of the
//! most recent batch description and/or `lower`. Previous batch descriptions are not valid
//! anymore, so there is no reason to hold any state or perform any work in support of them.
//!
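//! For illustration, a hypothetical sequence of minted batch descriptions (the bounds are made
//! up) that satisfies these invariants, shown in the `Debug` rendering used below:
//!
//! ```ignore
//! // ([0],  [10]) @ 0
//! // ([10], [17]) @ 1   -- `lower`s strictly increase, `upper`s monotonically increase
//! // ([17], [23]) @ 2   -- once the shard upper reaches [17], only this description is valid
//! ```
//!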
//! ### Read-only Mode
//!
//! The persist sink can optionally be initialized in read-only mode. In this mode it is passive
//! and avoids any writes to persist. When `read_only_rx` switches to `false`, the sink
//! transitions into write mode and commences normal operation.
//!
//! Read-only mode is implemented by the `mint` operator. To disable writes, the `mint` operator
//! simply avoids minting any batch descriptions. Since both the `write` and the `append` operator
//! require batch descriptions to write/append batches, this suppresses any persist communication.
//! At the same time, the `write` operator still observes changes to the `desired` and `persist`
//! collections, allowing it to keep its correction buffer up-to-date.

use std::any::Any;
use std::cell::RefCell;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;

use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::metrics::SinkMetrics;
use mz_persist_client::operators::shard_source::{ErrorHandler, SnapshotMode};
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, Row, Timestamp};
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_timely_util::builder_async::{Event, OperatorBuilder};
use mz_timely_util::probe::{Handle, ProbeNotify};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Broadcast, Capability, CapabilitySet, probe};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tokio::sync::watch;
use tracing::trace;

use crate::compute_state::ComputeState;
use crate::render::StartSignal;
use crate::render::sinks::SinkRender;
use crate::sink::correction::Correction;
use crate::sink::refresh::apply_refresh;

impl<G> SinkRender<G> for MaterializedViewSinkConnection<CollectionMetadata>
where
    G: Scope<Timestamp = Timestamp>,
{
    fn render_sink(
        &self,
        compute_state: &mut ComputeState,
        sink: &ComputeSinkDesc<CollectionMetadata>,
        sink_id: GlobalId,
        as_of: Antichain<Timestamp>,
        start_signal: StartSignal,
        mut ok_collection: VecCollection<G, Row, Diff>,
        mut err_collection: VecCollection<G, DataflowError, Diff>,
        _ct_times: Option<VecCollection<G, (), Diff>>,
        output_probe: &Handle<Timestamp>,
    ) -> Option<Rc<dyn Any>> {
        // Attach probes reporting the compute frontier.
        // The `apply_refresh` operator can round up frontiers, making it impossible to accurately
        // track the progress of the computation, so we need to attach probes before it.
        let probe = probe::Handle::default();
        ok_collection = ok_collection
            .probe_with(&probe)
            .inner
            .probe_notify_with(vec![output_probe.clone()])
            .as_collection();
        let collection_state = compute_state.expect_collection_mut(sink_id);
        collection_state.compute_probe = Some(probe);

        // If a `RefreshSchedule` was specified, round up timestamps.
        if let Some(refresh_schedule) = &sink.refresh_schedule {
            ok_collection = apply_refresh(ok_collection, refresh_schedule.clone());
            err_collection = apply_refresh(err_collection, refresh_schedule.clone());
        }

        if sink.up_to != Antichain::default() {
            unimplemented!(
                "UP TO is not supported for persist sinks yet, and shouldn't have been accepted during parsing/planning"
            )
        }

        let read_only_rx = collection_state.read_only_rx.clone();

        let token = persist_sink(
            sink_id,
            &self.storage_metadata,
            ok_collection,
            err_collection,
            as_of,
            compute_state,
            start_signal,
            read_only_rx,
        );
        Some(token)
    }
}

/// Type of the `desired` stream, split into `Ok` and `Err` streams.
type DesiredStreams<S> =
    OkErr<Stream<S, (Row, Timestamp, Diff)>, Stream<S, (DataflowError, Timestamp, Diff)>>;

/// Type of the `persist` stream, split into `Ok` and `Err` streams.
type PersistStreams<S> =
    OkErr<Stream<S, (Row, Timestamp, Diff)>, Stream<S, (DataflowError, Timestamp, Diff)>>;

/// Type of the `descs` stream.
type DescsStream<S> = Stream<S, BatchDescription>;

/// Type of the `batches` stream.
type BatchesStream<S> = Stream<S, (BatchDescription, ProtoBatch)>;

/// Type of the shared sink write frontier.
type SharedSinkFrontier = Rc<RefCell<Antichain<Timestamp>>>;

/// Renders an MV sink writing the given desired collection into the `target` persist collection.
pub(super) fn persist_sink<S>(
    sink_id: GlobalId,
    target: &CollectionMetadata,
    ok_collection: VecCollection<S, Row, Diff>,
    err_collection: VecCollection<S, DataflowError, Diff>,
    as_of: Antichain<Timestamp>,
    compute_state: &mut ComputeState,
    start_signal: StartSignal,
    read_only_rx: watch::Receiver<bool>,
) -> Rc<dyn Any>
where
    S: Scope<Timestamp = Timestamp>,
{
    let mut scope = ok_collection.scope();
    let desired = OkErr::new(ok_collection.inner, err_collection.inner);

    // Read back the persist shard.
    let (persist, persist_token) = persist_source(
        &mut scope,
        sink_id,
        target.clone(),
        compute_state,
        start_signal,
    );

    let persist_api = PersistApi {
        persist_clients: Arc::clone(&compute_state.persist_clients),
        collection: target.clone(),
        shard_name: sink_id.to_string(),
        purpose: format!("MV sink {sink_id}"),
    };

    let (desired, descs, sink_frontier, mint_token) = mint::render(
        sink_id,
        persist_api.clone(),
        as_of.clone(),
        read_only_rx,
        &desired,
    );

    let (batches, write_token) = write::render(
        sink_id,
        persist_api.clone(),
        as_of,
        &desired,
        &persist,
        &descs,
        Rc::clone(&compute_state.worker_config),
    );

    let append_token = append::render(sink_id, persist_api, &descs, &batches);

    // Report sink frontier updates to the `ComputeState`.
    let collection = compute_state.expect_collection_mut(sink_id);
    collection.sink_write_frontier = Some(sink_frontier);

    Rc::new((persist_token, mint_token, write_token, append_token))
}

/// Generic wrapper around ok/err pairs (e.g. streams, frontiers), to simplify code dealing with
/// such pairs.
struct OkErr<O, E> {
    ok: O,
    err: E,
}

impl<O, E> OkErr<O, E> {
    fn new(ok: O, err: E) -> Self {
        Self { ok, err }
    }
}

impl OkErr<Antichain<Timestamp>, Antichain<Timestamp>> {
    fn new_frontiers() -> Self {
        Self {
            ok: Antichain::from_elem(Timestamp::MIN),
            err: Antichain::from_elem(Timestamp::MIN),
        }
    }

    /// Return the overall frontier, i.e., the minimum of `ok` and `err`.
    fn frontier(&self) -> &Antichain<Timestamp> {
        if PartialOrder::less_equal(&self.ok, &self.err) {
            &self.ok
        } else {
            &self.err
        }
    }
}

/// Advance the given `frontier` to `new`, if the latter one is greater.
///
/// Returns whether `frontier` was advanced.
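///
/// A minimal illustration of the intended semantics (hypothetical values, not taken from a
/// running sink):
///
/// ```ignore
/// let mut frontier = Antichain::from_elem(Timestamp::from(0u64));
/// // Advancing to a later frontier succeeds and reports `true`.
/// assert!(advance(&mut frontier, Antichain::from_elem(Timestamp::from(5u64))));
/// // "Advancing" to an earlier (or equal) frontier leaves it unchanged and reports `false`.
/// assert!(!advance(&mut frontier, Antichain::from_elem(Timestamp::from(3u64))));
/// ```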
fn advance(frontier: &mut Antichain<Timestamp>, new: Antichain<Timestamp>) -> bool {
    if PartialOrder::less_than(frontier, &new) {
        *frontier = new;
        true
    } else {
        false
    }
}

/// A persist API specialized to a single collection.
#[derive(Clone)]
struct PersistApi {
    persist_clients: Arc<PersistClientCache>,
    collection: CollectionMetadata,
    shard_name: String,
    purpose: String,
}

impl PersistApi {
    async fn open_client(&self) -> PersistClient {
        self.persist_clients
            .open(self.collection.persist_location.clone())
            .await
            .unwrap_or_else(|error| panic!("error opening persist client: {error}"))
    }

    async fn open_writer(&self) -> WriteHandle<SourceData, (), Timestamp, StorageDiff> {
        self.open_client()
            .await
            .open_writer(
                self.collection.data_shard,
                Arc::new(self.collection.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: self.shard_name.clone(),
                    handle_purpose: self.purpose.clone(),
                },
            )
            .await
            .unwrap_or_else(|error| panic!("error opening persist writer: {error}"))
    }

    async fn open_metrics(&self) -> SinkMetrics {
        let client = self.open_client().await;
        client.metrics().sink.clone()
    }
}

/// Instantiate a persist source reading back the `target` collection.
fn persist_source<S>(
    scope: &mut S,
    sink_id: GlobalId,
    target: CollectionMetadata,
    compute_state: &ComputeState,
    start_signal: StartSignal,
) -> (PersistStreams<S>, Vec<PressOnDropButton>)
where
    S: Scope<Timestamp = Timestamp>,
{
    // There is no guarantee that the sink as-of is beyond the persist shard's since. If it isn't,
    // instantiating a `persist_source` with it would panic. So instead we leave it to
    // `persist_source` to select an appropriate as-of. We only care about times beyond the current
    // shard upper anyway.
    //
    // TODO(teskje): Ideally we would select the as-of as `join(sink_as_of, since, upper)`, to
    // allow `persist_source` to omit as much historical detail as possible. However, we don't know
    // the shard frontiers and we cannot get them here as that requires an `async` context. We
    // should consider extending the `persist_source` API to allow as-of selection based on the
    // shard's current frontiers.
    let as_of = None;

    let until = Antichain::new();
    let map_filter_project = None;

    let (ok_stream, err_stream, token) = mz_storage_operators::persist_source::persist_source(
        scope,
        sink_id,
        Arc::clone(&compute_state.persist_clients),
        &compute_state.txns_ctx,
        target,
        None,
        as_of,
        SnapshotMode::Include,
        until,
        map_filter_project,
        compute_state.dataflow_max_inflight_bytes(),
        start_signal,
        ErrorHandler::Halt("compute persist sink"),
    );

    let streams = OkErr::new(ok_stream, err_stream);
    (streams, token)
}

/// A description for a batch of updates to be written.
///
/// Batch descriptions are produced by the `mint` operator and consumed by the `write` and `append`
/// operators, where they inform which batches should be written or appended, respectively.
///
/// Each batch description also contains the index of its "append worker", i.e. the worker that is
/// responsible for appending the written batches to the output shard.
#[derive(Clone, Serialize, Deserialize)]
struct BatchDescription {
    lower: Antichain<Timestamp>,
    upper: Antichain<Timestamp>,
    append_worker: usize,
}

impl BatchDescription {
    fn new(lower: Antichain<Timestamp>, upper: Antichain<Timestamp>, append_worker: usize) -> Self {
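        // A batch must describe a non-empty time interval; `mint` never produces descriptions
        // with `lower >= upper`.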
        assert!(PartialOrder::less_than(&lower, &upper));
        Self {
            lower,
            upper,
            append_worker,
        }
    }
}

impl std::fmt::Debug for BatchDescription {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "({:?}, {:?})@{}",
            self.lower.elements(),
            self.upper.elements(),
            self.append_worker,
        )
    }
}

/// Construct a name for the given sub-operator.
fn operator_name(sink_id: GlobalId, sub_operator: &str) -> String {
    format!("mv_sink({sink_id})::{sub_operator}")
}

/// Implementation of the `mint` operator.
mod mint {
    use super::*;

    /// Render the `mint` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `as_of`: The first time for which the sink may produce output.
    /// * `read_only_rx`: A receiver reporting whether the sink is in read-only mode.
    /// * `desired`: The ok/err streams that should be sinked to persist.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        mut read_only_rx: watch::Receiver<bool>,
        desired: &DesiredStreams<S>,
    ) -> (
        DesiredStreams<S>,
        DescsStream<S>,
        SharedSinkFrontier,
        PressOnDropButton,
    )
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = desired.ok.scope();
        let worker_id = scope.index();
        let worker_count = scope.peers();

        // Determine the active worker for the mint operator.
        let active_worker_id = usize::cast_from(sink_id.hashed()) % scope.peers();

        let sink_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::MIN)));
        let shared_frontier = Rc::clone(&sink_frontier);

        let name = operator_name(sink_id, "mint");
        let mut op = OperatorBuilder::new(name, scope);

        let (ok_output, ok_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let (err_output, err_stream) = op.new_output::<CapacityContainerBuilder<_>>();
        let desired_outputs = OkErr::new(ok_output, err_output);
        let desired_output_streams = OkErr::new(ok_stream, err_stream);

        let (desc_output, desc_output_stream) = op.new_output::<CapacityContainerBuilder<_>>();

        let mut desired_inputs = OkErr {
            ok: op.new_input_for(&desired.ok, Pipeline, &desired_outputs.ok),
            err: op.new_input_for(&desired.err, Pipeline, &desired_outputs.err),
        };

        let button = op.build(move |capabilities| async move {
            // Passing through the `desired` streams only requires data capabilities, so we can
            // immediately drop their initial capabilities here.
            let [_, _, desc_cap]: [_; 3] =
                capabilities.try_into().expect("one capability per output");

            // Non-active workers just pass the `desired` and `persist` data through.
            if worker_id != active_worker_id {
                drop(desc_cap);
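                // Clearing the shared frontier to the empty antichain ensures this non-active
                // worker places no constraint on the reported sink frontier.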
                shared_frontier.borrow_mut().clear();

                loop {
                    tokio::select! {
                        Some(event) = desired_inputs.ok.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.ok.give_container(&cap, &mut data);
                            }
                        }
                        Some(event) = desired_inputs.err.next() => {
                            if let Event::Data(cap, mut data) = event {
                                desired_outputs.err.give_container(&cap, &mut data);
                            }
                        }
                        // All inputs are exhausted, so we can shut down.
                        else => return,
                    }
                }
            }

            let mut cap_set = CapabilitySet::from_elem(desc_cap);

            let read_only = *read_only_rx.borrow_and_update();
            let mut state = State::new(sink_id, worker_count, as_of, read_only);

            // Create a stream that reports advancements of the target shard's frontier and updates
            // the shared sink frontier.
            //
            // We collect the persist frontier from a write handle directly, rather than inspecting
            // the `persist` stream, because the latter has two annoying glitches:
            //  (a) It starts at the shard's read frontier, not its write frontier.
            //  (b) It can lag behind if there are spikes in ingested data.
            let mut persist_frontiers = pin!(async_stream::stream! {
                let mut writer = persist_api.open_writer().await;
                let mut frontier = Antichain::from_elem(Timestamp::MIN);
                while !frontier.is_empty() {
                    writer.wait_for_upper_past(&frontier).await;
                    frontier = writer.upper().clone();
                    shared_frontier.borrow_mut().clone_from(&frontier);
                    yield frontier.clone();
                }
            });

            loop {
                // Read from the inputs, pass through all data to the respective outputs, and keep
                // track of the input frontiers. When a frontier advances we might have to mint a
                // new batch description.
                let maybe_desc = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.ok.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(cap, mut data) => {
                                desired_outputs.err.give_container(&cap, &mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_mint_batch_description()
                            }
                        }
                    }
                    Some(frontier) = persist_frontiers.next() => {
                        state.advance_persist_frontier(frontier);
                        state.maybe_mint_batch_description()
                    }
                    Ok(()) = read_only_rx.changed(), if read_only => {
                        state.allow_writes();
                        state.maybe_mint_batch_description()
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some(desc) = maybe_desc {
                    let lower_ts = *desc.lower.as_option().expect("not empty");
                    let cap = cap_set.delayed(&lower_ts);
                    desc_output.give(&cap, desc);

                    // We only emit strictly increasing `lower`s, so we can let our output frontier
                    // advance beyond the current `lower`.
                    cap_set.downgrade([lower_ts.step_forward()]);
                } else {
                    // The next emitted `lower` will be at least the `persist` frontier, so we can
                    // advance our output frontier as far.
                    let _ = cap_set.try_downgrade(state.persist_frontier.iter());
                }
            }
        });

        (
            desired_output_streams,
            desc_output_stream,
            sink_frontier,
            button.press_on_drop(),
        )
    }

    /// State maintained by the `mint` operator.
    struct State {
        sink_id: GlobalId,
        /// The number of workers in the Timely cluster.
        worker_count: usize,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontier of the target persist shard.
        persist_frontier: Antichain<Timestamp>,
        /// The append worker for the next batch description, chosen in round-robin fashion.
        next_append_worker: usize,
        /// The last `lower` we have emitted in a batch description, if any. Whenever the
        /// `persist_frontier` moves beyond this frontier, we need to mint a new description.
        last_lower: Option<Antichain<Timestamp>>,
        /// Whether we are operating in read-only mode.
        ///
        /// In read-only mode, minting of batch descriptions is disabled.
        read_only: bool,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_count: usize,
            as_of: Antichain<Timestamp>,
            read_only: bool,
        ) -> Self {
            // Initializing `persist_frontier` to the `as_of` ensures that the first minted batch
            // description will have a `lower` of `as_of` or beyond, and thus that we don't spend
            // work needlessly writing batches at previous times.
            let persist_frontier = as_of;

            Self {
                sink_id,
                worker_count,
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontier,
                next_append_worker: 0,
                last_lower: None,
                read_only,
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontier.elements(),
                last_lower = ?self.last_lower.as_ref().map(|f| f.elements()),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontier, frontier) {
                self.trace("advanced `persist` frontier");
            }
        }

        fn allow_writes(&mut self) {
            if self.read_only {
                self.read_only = false;
                self.trace("disabled read-only mode");
            }
        }

        fn maybe_mint_batch_description(&mut self) -> Option<BatchDescription> {
            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = &self.persist_frontier;

            // We only mint new batch descriptions when:
            //  1. We are _not_ in read-only mode.
            //  2. The `desired` frontier is ahead of the `persist` frontier.
            //  3. The `persist` frontier advanced since we last emitted a batch description.
            let desired_ahead = PartialOrder::less_than(persist_frontier, desired_frontier);
            let persist_advanced = self.last_lower.as_ref().map_or(true, |lower| {
                PartialOrder::less_than(lower, persist_frontier)
            });

            if self.read_only || !desired_ahead || !persist_advanced {
                return None;
            }

            let lower = persist_frontier.clone();
            let upper = desired_frontier.clone();
            let append_worker = self.next_append_worker;
            let desc = BatchDescription::new(lower, upper, append_worker);

            self.next_append_worker = (append_worker + 1) % self.worker_count;
            self.last_lower = Some(desc.lower.clone());

            self.trace(format!("minted batch description: {desc:?}"));
            Some(desc)
        }
    }
}

/// Implementation of the `write` operator.
mod write {
    use super::*;

    /// Render the `write` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `as_of`: The first time for which the sink may produce output.
    /// * `desired`: The ok/err streams that should be sinked to persist.
    /// * `persist`: The ok/err streams read back from the output persist shard.
    /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        as_of: Antichain<Timestamp>,
        desired: &DesiredStreams<S>,
        persist: &PersistStreams<S>,
        descs: &Stream<S, BatchDescription>,
        worker_config: Rc<ConfigSet>,
    ) -> (BatchesStream<S>, PressOnDropButton)
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = desired.ok.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "write");
        let mut op = OperatorBuilder::new(name, scope);

        let (batches_output, batches_output_stream) =
            op.new_output::<CapacityContainerBuilder<_>>();

        // It is important that we exchange the `desired` and `persist` data the same way, so
        // updates that cancel each other out end up on the same worker.
        let exchange_ok = |(d, _, _): &(Row, Timestamp, Diff)| d.hashed();
        let exchange_err = |(d, _, _): &(DataflowError, Timestamp, Diff)| d.hashed();

        let mut desired_inputs = OkErr::new(
            op.new_disconnected_input(&desired.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(&desired.err, Exchange::new(exchange_err)),
        );
        let mut persist_inputs = OkErr::new(
            op.new_disconnected_input(&persist.ok, Exchange::new(exchange_ok)),
            op.new_disconnected_input(&persist.err, Exchange::new(exchange_err)),
        );
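        // `descs` is produced on a single worker, so broadcast it to give every worker a chance
        // to write batches for the data arriving at its local inputs.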
        let mut descs_input = op.new_input_for(&descs.broadcast(), Pipeline, &batches_output);

        let button = op.build(move |capabilities| async move {
            // We will use the data capabilities from the `descs` input to produce output, so no
            // need to hold onto the initial capabilities.
            drop(capabilities);

            let writer = persist_api.open_writer().await;
            let sink_metrics = persist_api.open_metrics().await;
            let mut state = State::new(
                sink_id,
                worker_id,
                writer,
                sink_metrics,
                as_of,
                &worker_config,
            );

            loop {
                // Read from the inputs, extract `desired` updates as positive contributions to
                // `correction` and `persist` updates as negative contributions. If either the
                // `desired` or `persist` frontier advances, or if we receive a new batch description,
                // we might have to write a new batch.
                let maybe_batch = tokio::select! {
                    Some(event) = desired_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = desired_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_desired_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.ok.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.ok.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_ok_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = persist_inputs.err.next() => {
                        match event {
                            Event::Data(_cap, mut data) => {
                                state.corrections.err.insert_negated(&mut data);
                                None
                            }
                            Event::Progress(frontier) => {
                                state.advance_persist_err_frontier(frontier);
                                state.maybe_write_batch().await
                            }
                        }
                    }
                    Some(event) = descs_input.next() => {
                        match event {
                            Event::Data(cap, data) => {
                                for desc in data {
                                    state.absorb_batch_description(desc, cap.clone());
                                }
                                state.maybe_write_batch().await
                            }
                            Event::Progress(_frontier) => None,
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                };

                if let Some((index, batch, cap)) = maybe_batch {
                    batches_output.give(&cap, (index, batch));
                }
            }
        });

        (batches_output_stream, button.press_on_drop())
    }

    /// State maintained by the `write` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// Contains `desired - persist`, reflecting the updates we would like to commit to
        /// `persist` in order to "correct" it to track `desired`. This collection is only modified
        /// by updates received from either the `desired` or `persist` inputs.
        corrections: OkErr<Correction<Row>, Correction<DataflowError>>,
        /// The frontiers of the `desired` inputs.
        desired_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The frontiers of the `persist` inputs.
        ///
        /// Note that this is _not_ the same as the write frontier of the output persist shard! It
        /// usually is, but during snapshot processing, these frontiers will start at the shard's
        /// read frontier, so they can be beyond its write frontier. This is important as it means
        /// we must not discard batch descriptions based on these persist frontiers: A batch
        /// description might still be valid even if its `lower` is before the persist frontiers we
        /// observe.
        persist_frontiers: OkErr<Antichain<Timestamp>, Antichain<Timestamp>>,
        /// The current valid batch description and associated output capability, if any.
        batch_description: Option<(BatchDescription, Capability<Timestamp>)>,
        /// A request to force a consolidation of `corrections` once both `desired_frontiers` and
        /// `persist_frontiers` become greater than the given frontier.
        ///
        /// Normally we force a consolidation whenever we write a batch, but there are periods
        /// (like read-only mode) when that doesn't happen, and we need to manually force
        /// consolidation instead. Currently this is only used to ensure we quickly get rid of the
        /// snapshot updates.
        force_consolidation_after: Option<Antichain<Timestamp>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
            metrics: SinkMetrics,
            as_of: Antichain<Timestamp>,
            worker_config: &ConfigSet,
        ) -> Self {
            let worker_metrics = metrics.for_worker(worker_id);

            // Force a consolidation of `corrections` after the snapshot updates have been fully
            // processed, to ensure we get rid of those as quickly as possible.
            let force_consolidation_after = Some(as_of);

            Self {
                sink_id,
                worker_id,
                persist_writer,
                corrections: OkErr::new(
                    Correction::new(metrics.clone(), worker_metrics.clone(), worker_config),
                    Correction::new(metrics, worker_metrics, worker_config),
                ),
                desired_frontiers: OkErr::new_frontiers(),
                persist_frontiers: OkErr::new_frontiers(),
                batch_description: None,
                force_consolidation_after,
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                desired_frontier = ?self.desired_frontiers.frontier().elements(),
                persist_frontier = ?self.persist_frontiers.frontier().elements(),
                batch_description = ?self.batch_description.as_ref().map(|(d, _)| d),
                message,
            );
        }

        fn advance_desired_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.ok, frontier) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` ok frontier");
            }
        }

        fn advance_desired_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.desired_frontiers.err, frontier) {
                self.apply_desired_frontier_advancement();
                self.trace("advanced `desired` err frontier");
            }
        }

        fn advance_persist_ok_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.ok, frontier) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` ok frontier");
            }
        }

        fn advance_persist_err_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.persist_frontiers.err, frontier) {
                self.apply_persist_frontier_advancement();
                self.trace("advanced `persist` err frontier");
            }
        }

        /// Apply the effects of a previous `desired` frontier advancement.
        fn apply_desired_frontier_advancement(&mut self) {
            self.maybe_force_consolidation();
        }

        /// Apply the effects of a previous `persist` frontier advancement.
        fn apply_persist_frontier_advancement(&mut self) {
            let frontier = self.persist_frontiers.frontier();

            // We will only emit times at or after the `persist` frontier, so now is a good time to
            // advance the times of stashed updates.
            self.corrections.ok.advance_since(frontier.clone());
            self.corrections.err.advance_since(frontier.clone());

            self.maybe_force_consolidation();
        }

        /// If the current consolidation request has become applicable, apply it.
        fn maybe_force_consolidation(&mut self) {
            let Some(request) = &self.force_consolidation_after else {
                return;
            };

            let desired_frontier = self.desired_frontiers.frontier();
            let persist_frontier = self.persist_frontiers.frontier();
            if PartialOrder::less_than(request, desired_frontier)
                && PartialOrder::less_than(request, persist_frontier)
            {
                self.trace("forcing correction consolidation");
                self.corrections.ok.consolidate_at_since();
                self.corrections.err.consolidate_at_since();

                // Remove the consolidation request, now that we have fulfilled it.
                self.force_consolidation_after = None;
            }
        }

        fn absorb_batch_description(&mut self, desc: BatchDescription, cap: Capability<Timestamp>) {
            // The incoming batch description is outdated if we already have a batch description
            // with a greater `lower`.
            //
            // Note that we cannot assume a description is outdated based on the comparison of its
            // `lower` with the `persist_frontier`. The persist frontier observed by the `write`
            // operator is initialized with the shard's read frontier, so it can be greater than
            // the shard's write frontier.
            if let Some((prev, _)) = &self.batch_description {
                if PartialOrder::less_than(&desc.lower, &prev.lower) {
                    self.trace(format!("skipping outdated batch description: {desc:?}"));
                    return;
                }
            }

            self.batch_description = Some((desc, cap));
            self.trace("set batch description");
        }

        async fn maybe_write_batch(
            &mut self,
        ) -> Option<(BatchDescription, ProtoBatch, Capability<Timestamp>)> {
            let (desc, _cap) = self.batch_description.as_ref()?;

            // We can write a new batch if we have seen all `persist` updates before `lower` and
            // all `desired` updates up to `upper`.
            let persist_complete =
                PartialOrder::less_equal(&desc.lower, self.persist_frontiers.frontier());
            let desired_complete =
                PartialOrder::less_equal(&desc.upper, self.desired_frontiers.frontier());
            if !persist_complete || !desired_complete {
                return None;
            }

            let (desc, cap) = self.batch_description.take()?;

            let ok_updates = self.corrections.ok.updates_before(&desc.upper);
            let err_updates = self.corrections.err.updates_before(&desc.upper);

            let oks = ok_updates.map(|(d, t, r)| ((SourceData(Ok(d)), ()), t, r.into_inner()));
            let errs = err_updates.map(|(d, t, r)| ((SourceData(Err(d)), ()), t, r.into_inner()));
            let mut updates = oks.chain(errs).peekable();

            // Don't write empty batches.
            if updates.peek().is_none() {
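                // Explicitly drop the update iterators to release their borrow of
                // `self.corrections` before calling `self.trace` below.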
                drop(updates);
                self.trace("skipping empty batch");
                return None;
            }

            let batch = self
                .persist_writer
                .batch(updates, desc.lower.clone(), desc.upper.clone())
                .await
                .expect("valid usage")
                .into_transmittable_batch();

            self.trace("wrote a batch");
            Some((desc, batch, cap))
        }
    }
}

/// Implementation of the `append` operator.
mod append {
    use super::*;

    /// Render the `append` operator.
    ///
    /// The parameters passed in are:
    /// * `sink_id`: The `GlobalId` of the sink export.
    /// * `persist_api`: An object providing access to the output persist shard.
    /// * `descs`: The stream of batch descriptions produced by the `mint` operator.
    /// * `batches`: The stream of written batches produced by the `write` operator.
    pub fn render<S>(
        sink_id: GlobalId,
        persist_api: PersistApi,
        descs: &DescsStream<S>,
        batches: &BatchesStream<S>,
    ) -> PressOnDropButton
    where
        S: Scope<Timestamp = Timestamp>,
    {
        let scope = descs.scope();
        let worker_id = scope.index();

        let name = operator_name(sink_id, "append");
        let mut op = OperatorBuilder::new(name, scope);

        // Broadcast batch descriptions to all workers, regardless of whether or not they are
        // responsible for the append, to give them a chance to clean up any outdated state they
        // might still hold.
        let mut descs_input = op.new_disconnected_input(&descs.broadcast(), Pipeline);
        let mut batches_input = op.new_disconnected_input(
            batches,
            Exchange::new(move |(desc, _): &(BatchDescription, _)| {
                u64::cast_from(desc.append_worker)
            }),
        );

        let button = op.build(move |_capabilities| async move {
            let writer = persist_api.open_writer().await;
            let mut state = State::new(sink_id, worker_id, writer);

            loop {
                // Read from the inputs, absorb batch descriptions and batches. If the `batches`
                // frontier advances, or if we receive a new batch description, we might have to
                // append a new batch.
                tokio::select! {
                    Some(event) = descs_input.next() => {
                        if let Event::Data(_cap, data) = event {
                            for desc in data {
                                state.absorb_batch_description(desc).await;
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    Some(event) = batches_input.next() => {
                        match event {
                            Event::Data(_cap, data) => {
                                // The batch description is only used for routing and we ignore it
                                // here since we already get one from `descs_input`.
                                for (_desc, batch) in data {
                                    state.absorb_batch(batch).await;
                                }
                            }
                            Event::Progress(frontier) => {
                                state.advance_batches_frontier(frontier);
                                state.maybe_append_batches().await;
                            }
                        }
                    }
                    // All inputs are exhausted, so we can shut down.
                    else => return,
                }
            }
        });

        button.press_on_drop()
    }

    /// State maintained by the `append` operator.
    struct State {
        sink_id: GlobalId,
        worker_id: usize,
        persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        /// The current input frontier of `batches`.
        batches_frontier: Antichain<Timestamp>,
        /// The greatest observed `lower` from both `descs` and `batches`.
        lower: Antichain<Timestamp>,
        /// The batch description for `lower`, if any.
        batch_description: Option<BatchDescription>,
        /// Batches received for `lower`.
        batches: Vec<Batch<SourceData, (), Timestamp, StorageDiff>>,
    }

    impl State {
        fn new(
            sink_id: GlobalId,
            worker_id: usize,
            persist_writer: WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        ) -> Self {
            Self {
                sink_id,
                worker_id,
                persist_writer,
                batches_frontier: Antichain::from_elem(Timestamp::MIN),
                lower: Antichain::from_elem(Timestamp::MIN),
                batch_description: None,
                batches: Default::default(),
            }
        }

        fn trace<S: AsRef<str>>(&self, message: S) {
            let message = message.as_ref();
            trace!(
                sink_id = %self.sink_id,
                worker = %self.worker_id,
                batches_frontier = ?self.batches_frontier.elements(),
                lower = ?self.lower.elements(),
                batch_description = ?self.batch_description,
                message,
            );
        }

        fn advance_batches_frontier(&mut self, frontier: Antichain<Timestamp>) {
            if advance(&mut self.batches_frontier, frontier) {
                self.trace("advanced `batches` frontier");
            }
        }

        /// Advance the current `lower`.
        ///
        /// Discards all currently stashed batches and batch descriptions, assuming that they are
        /// now invalid.
        async fn advance_lower(&mut self, frontier: Antichain<Timestamp>) {
            assert!(PartialOrder::less_than(&self.lower, &frontier));

            self.lower = frontier;
            self.batch_description = None;

            // Remove stashed batches, cleaning up those we didn't append.
            for batch in self.batches.drain(..) {
                batch.delete().await;
            }

            self.trace("advanced `lower`");
        }

        /// Absorb the given batch description into the state, provided it is not outdated.
        async fn absorb_batch_description(&mut self, desc: BatchDescription) {
            if PartialOrder::less_than(&self.lower, &desc.lower) {
                self.advance_lower(desc.lower.clone()).await;
            } else if &self.lower != &desc.lower {
                self.trace(format!("skipping outdated batch description: {desc:?}"));
                return;
            }

            if desc.append_worker == self.worker_id {
                self.batch_description = Some(desc);
                self.trace("set batch description");
            }
        }

        /// Absorb the given batch into the state, provided it is not outdated.
        async fn absorb_batch(&mut self, batch: ProtoBatch) {
            let batch = self.persist_writer.batch_from_transmittable_batch(batch);
            if PartialOrder::less_than(&self.lower, batch.lower()) {
                self.advance_lower(batch.lower().clone()).await;
            } else if &self.lower != batch.lower() {
                self.trace(format!(
                    "skipping outdated batch: ({:?}, {:?})",
                    batch.lower().elements(),
                    batch.upper().elements(),
                ));

                // Ensure the batch's data gets properly cleaned up before dropping it.
                batch.delete().await;
                return;
            }

            self.batches.push(batch);
            self.trace("absorbed a batch");
        }

        async fn maybe_append_batches(&mut self) {
            let batches_complete = PartialOrder::less_than(&self.lower, &self.batches_frontier);
            if !batches_complete {
                return;
            }

            let Some(desc) = self.batch_description.take() else {
                return;
            };

            let new_lower = match self.append_batches(desc).await {
                Ok(shard_upper) => {
                    self.trace("appended a batch");
                    shard_upper
                }
                Err(shard_upper) => {
                    // Failing the append is expected in the presence of concurrent replicas. There
                    // is nothing special to do here: The self-correcting feedback mechanism
                    // ensures that we observe the concurrent changes, compute their consequences,
                    // and append them at a future time.
                    self.trace(format!(
                        "append failed due to `lower` mismatch: {:?}",
                        shard_upper.elements(),
                    ));
                    shard_upper
                }
            };

            self.advance_lower(new_lower).await;
        }

        /// Append the current `batches` to the output shard.
        ///
        /// Returns whether the append was successful or not, and the current shard upper in either
        /// case.
        ///
        /// This method advances the shard upper to the batch `lower` if necessary. This is the
        /// mechanism that brings the shard upper to the sink as-of when appending the initial
        /// batch.
        ///
        /// An alternative mechanism for bringing the shard upper to the sink as-of would be making
        /// a single append at operator startup. The reason we are doing it here instead is that it
        /// simplifies the implementation of read-only mode. In read-only mode we have to defer any
        /// persist writes, including the initial upper bump. Having only a single place that
        /// performs writes makes it easy to ensure we are doing that correctly.
        async fn append_batches(
            &mut self,
            desc: BatchDescription,
        ) -> Result<Antichain<Timestamp>, Antichain<Timestamp>> {
            let (lower, upper) = (desc.lower, desc.upper);
            let mut to_append: Vec<_> = self.batches.iter_mut().collect();

            loop {
                let result = self
                    .persist_writer
                    .compare_and_append_batch(&mut to_append, lower.clone(), upper.clone(), true)
                    .await
                    .expect("valid usage");

                match result {
                    Ok(()) => return Ok(upper),
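                    // The shard upper is still behind our batch `lower` (e.g. on the very first
                    // append). Bump it to `lower` and retry the append.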
                    Err(mismatch) if PartialOrder::less_than(&mismatch.current, &lower) => {
                        advance_shard_upper(&mut self.persist_writer, lower.clone()).await;
                    }
                    Err(mismatch) => return Err(mismatch.current),
                }
            }
        }
    }

    /// Advance the frontier of the given writer's shard to at least the given `upper`.
    async fn advance_shard_upper(
        persist_writer: &mut WriteHandle<SourceData, (), Timestamp, StorageDiff>,
        upper: Antichain<Timestamp>,
    ) {
        let empty_updates: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
        let lower = Antichain::from_elem(Timestamp::MIN);
        persist_writer
            .append(empty_updates, lower, upper)
            .await
            .expect("valid usage")
            .expect("should always succeed");
    }
}