mz_storage/render/persist_sink.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Render an operator that persists a source collection.
11//!
12//! ## Implementation
13//!
//! This module defines the `persist_sink` operator, which writes
//! a collection produced by source rendering into a persist shard.
//!
//! It attempts to use all workers to write data to persist, and uses
//! a single, arbitrarily chosen worker to coordinate work. The diagram
//! below is an overview of how it is shaped. There is more information
//! in the doc comments of the top-level functions of this module.
21//!
22//!```text
23//!
24//!                                       ,------------.
25//!                                       | source     |
26//!                                       | collection |
27//!                                       +---+--------+
28//!                                       /   |
29//!                                      /    |
30//!                                     /     |
31//!                                    /      |
32//!                                   /       |
33//!                                  /        |
34//!                                 /         |
35//!                                /          |
36//!                               /     ,-+-----------------------.
37//!                              /      | mint_batch_descriptions |
38//!                             /       | one arbitrary worker    |
39//!                            |        +-,--,--------+----+------+
40//!                           ,----------´.-´         |     \
41//!                       _.-´ |       .-´            |      \
42//!                   _.-´     |    .-´               |       \
43//!                .-´  .------+----|-------+---------|--------\-----.
44//!               /    /            |       |         |         \     \
45//!        ,--------------.   ,-----------------.     |     ,-----------------.
46//!        | write_batches|   |  write_batches  |     |     |  write_batches  |
47//!        | worker 0     |   | worker 1        |     |     | worker N        |
48//!        +-----+--------+   +-+---------------+     |     +--+--------------+
49//!               \              \                    |        /
50//!                `-.            `,                  |       /
51//!                   `-._          `-.               |      /
52//!                       `-._         `-.            |     /
53//!                           `---------. `-.         |    /
54//!                                     +`---`---+-------------,
55//!                                     | append_batches       |
56//!                                     | one arbitrary worker |
57//!                                     +------+---------------+
58//!```
59//!
60//! ## Similarities with `mz_compute::sink::persist_sink`
61//!
62//! This module has many similarities with the compute version of
63//! the same concept, and in fact, is entirely derived from it.
64//!
65//! Compute requires that its `persist_sink` is _self-correcting_;
66//! that is, it corrects what the collection in persist
67//! accumulates to if the collection has values changed at
68//! previous timestamps. It does this by continually comparing
69//! the input stream with the collection as read back from persist.
70//!
//! Source collections, while definite, cannot reliably be
//! re-produced once written down, which means compute's
//! `persist_sink`'s self-correction mechanism would need to be
//! skipped on operator startup, and would cause unnecessary read
//! load on persist.
76//!
77//! Additionally, persisting sources requires we use bounded
78//! amounts of memory, even if a single timestamp represents
79//! a huge amount of data. This is not (currently) possible
80//! to guarantee while also performing self-correction.
81//!
82//! Because of this, we have ripped out the self-correction
83//! mechanism, and aggressively simplified the sub-operators.
84//! Some, particularly `append_batches` could be merged with
85//! the compute version, but that requires some amount of
86//! onerous refactoring that we have chosen to skip for now.
87//!
// TODO(guswynn): merge at least the `append_batches` operator.
89
90use std::cmp::Ordering;
91use std::collections::{BTreeMap, VecDeque};
92use std::fmt::Debug;
93use std::ops::AddAssign;
94use std::rc::Rc;
95use std::sync::Arc;
96use std::time::Duration;
97
98use differential_dataflow::difference::Semigroup;
99use differential_dataflow::lattice::Lattice;
100use differential_dataflow::{AsCollection, Collection, Hashable};
101use futures::{StreamExt, future};
102use itertools::Itertools;
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::HashMap;
105use mz_persist_client::Diagnostics;
106use mz_persist_client::batch::{Batch, BatchBuilder, ProtoBatch};
107use mz_persist_client::cache::PersistClientCache;
108use mz_persist_client::error::UpperMismatch;
109use mz_persist_types::codec_impls::UnitSchema;
110use mz_persist_types::{Codec, Codec64};
111use mz_repr::{Diff, GlobalId, Row};
112use mz_storage_types::controller::CollectionMetadata;
113use mz_storage_types::errors::DataflowError;
114use mz_storage_types::sources::SourceData;
115use mz_storage_types::{StorageDiff, dyncfgs};
116use mz_timely_util::builder_async::{
117    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
118};
119use serde::{Deserialize, Serialize};
120use timely::PartialOrder;
121use timely::container::CapacityContainerBuilder;
122use timely::dataflow::channels::pact::{Exchange, Pipeline};
123use timely::dataflow::operators::{Broadcast, Capability, CapabilitySet, Inspect};
124use timely::dataflow::{Scope, Stream};
125use timely::progress::{Antichain, Timestamp};
126use tokio::sync::Semaphore;
127use tracing::trace;
128
129use crate::metrics::source::SourcePersistSinkMetrics;
130use crate::storage_state::StorageState;
131
132/// Metrics about batches.
133#[derive(Clone, Debug, Default, Deserialize, Serialize)]
134struct BatchMetrics {
135    inserts: u64,
136    retractions: u64,
137    error_inserts: u64,
138    error_retractions: u64,
139}
140
141impl AddAssign<&BatchMetrics> for BatchMetrics {
142    fn add_assign(&mut self, rhs: &BatchMetrics) {
143        let BatchMetrics {
144            inserts: self_inserts,
145            retractions: self_retractions,
146            error_inserts: self_error_inserts,
147            error_retractions: self_error_retractions,
148        } = self;
149        let BatchMetrics {
150            inserts: rhs_inserts,
151            retractions: rhs_retractions,
152            error_inserts: rhs_error_inserts,
153            error_retractions: rhs_error_retractions,
154        } = rhs;
155        *self_inserts += rhs_inserts;
156        *self_retractions += rhs_retractions;
157        *self_error_inserts += rhs_error_inserts;
158        *self_error_retractions += rhs_error_retractions;
159    }
160}
161
162/// Manages batches and metrics.
163struct BatchBuilderAndMetadata<K, V, T, D>
164where
165    K: Codec,
166    V: Codec,
167    T: Timestamp + Lattice + Codec64,
168{
169    builder: BatchBuilder<K, V, T, D>,
170    data_ts: T,
171    metrics: BatchMetrics,
172}
173
174impl<K, V, T, D> BatchBuilderAndMetadata<K, V, T, D>
175where
176    K: Codec + Debug,
177    V: Codec + Debug,
178    T: Timestamp + Lattice + Codec64,
179    D: Semigroup + Codec64,
180{
181    /// Creates a new batch.
182    ///
183    /// NOTE(benesch): temporary restriction: all updates added to the batch
184    /// must be at the specified timestamp `data_ts`.
185    fn new(builder: BatchBuilder<K, V, T, D>, data_ts: T) -> Self {
186        BatchBuilderAndMetadata {
187            builder,
188            data_ts,
189            metrics: Default::default(),
190        }
191    }
192
193    /// Adds an update to the batch.
194    ///
195    /// NOTE(benesch): temporary restriction: all updates added to the batch
196    /// must be at the timestamp specified during creation.
197    async fn add(&mut self, k: &K, v: &V, t: &T, d: &D) {
198        assert_eq!(
199            self.data_ts, *t,
200            "BatchBuilderAndMetadata::add called with a timestamp {t:?} that does not match creation timestamp {:?}",
201            self.data_ts
202        );
203
204        self.builder.add(k, v, t, d).await.expect("invalid usage");
205    }
206
207    async fn finish(self, lower: Antichain<T>, upper: Antichain<T>) -> HollowBatchAndMetadata<T> {
208        let batch = self
209            .builder
210            .finish(upper.clone())
211            .await
212            .expect("invalid usage");
213        HollowBatchAndMetadata {
214            lower,
215            upper,
216            data_ts: self.data_ts,
217            batch: batch.into_transmittable_batch(),
218            metrics: self.metrics,
219        }
220    }
221}
222
223/// A batch or data + metrics moved from `write_batches` to `append_batches`.
224#[derive(Clone, Debug, Deserialize, Serialize)]
225#[serde(bound(
226    serialize = "T: Timestamp + Codec64",
227    deserialize = "T: Timestamp + Codec64"
228))]
229struct HollowBatchAndMetadata<T> {
230    lower: Antichain<T>,
231    upper: Antichain<T>,
232    data_ts: T,
233    batch: ProtoBatch,
234    metrics: BatchMetrics,
235}
236
237/// Holds finished batches for `append_batches`.
238#[derive(Debug, Default)]
239struct BatchSet {
240    finished: Vec<FinishedBatch>,
241    batch_metrics: BatchMetrics,
242}
243
244#[derive(Debug)]
245struct FinishedBatch {
246    batch: Batch<SourceData, (), mz_repr::Timestamp, StorageDiff>,
247    data_ts: mz_repr::Timestamp,
248}
249
250/// Continuously writes the `desired_stream` into persist
251/// This is done via a multi-stage operator graph:
252///
253/// 1. `mint_batch_descriptions` emits new batch descriptions whenever the
254///    frontier of `desired_collection` advances. A batch description is
255///    a pair of `(lower, upper)` that tells write operators
256///    which updates to write and in the end tells the append operator
257///    what frontiers to use when calling `append`/`compare_and_append`.
258///    This is a single-worker operator.
259/// 2. `write_batches` writes the `desired_collection` to persist as
260///    batches and sends those batches along.
261///    This does not yet append the batches to the persist shard, the update are
262///    only uploaded/prepared to be appended to a shard. Also: we only write
263///    updates for batch descriptions that we learned about from
264///    `mint_batch_descriptions`.
265/// 3. `append_batches` takes as input the minted batch descriptions and written
266///    batches. Whenever the frontiers sufficiently advance, we take a batch
267///    description and all the batches that belong to it and append it to the
268///    persist shard.
269///
270/// This operator assumes that the `desired_collection` comes pre-sharded.
271///
272/// Note that `mint_batch_descriptions` inspects the frontier of
273/// `desired_collection`, and passes the data through to `write_batches`.
274/// This is done to avoid a clone of the underlying data so that both
275/// operators can have the collection as input.
276pub(crate) fn render<G>(
277    scope: &G,
278    collection_id: GlobalId,
279    target: CollectionMetadata,
280    desired_collection: Collection<G, Result<Row, DataflowError>, Diff>,
281    storage_state: &StorageState,
282    metrics: SourcePersistSinkMetrics,
283    busy_signal: Arc<Semaphore>,
284) -> (
285    Stream<G, ()>,
286    Stream<G, Rc<anyhow::Error>>,
287    Vec<PressOnDropButton>,
288)
289where
290    G: Scope<Timestamp = mz_repr::Timestamp>,
291{
292    let persist_clients = Arc::clone(&storage_state.persist_clients);
293
294    let operator_name = format!("persist_sink({})", collection_id);
295
296    let (batch_descriptions, passthrough_desired_stream, mint_token) = mint_batch_descriptions(
297        scope,
298        collection_id,
299        &operator_name,
300        &target,
301        &desired_collection,
302        Arc::clone(&persist_clients),
303    );
304
305    let (written_batches, write_token) = write_batches(
306        scope,
307        collection_id.clone(),
308        &operator_name,
309        &target,
310        &batch_descriptions,
311        &passthrough_desired_stream.as_collection(),
312        Arc::clone(&persist_clients),
313        storage_state,
314        Arc::clone(&busy_signal),
315    );
316
317    let (upper_stream, append_errors, append_token) = append_batches(
318        scope,
319        collection_id.clone(),
320        operator_name,
321        &target,
322        &batch_descriptions,
323        &written_batches,
324        persist_clients,
325        storage_state,
326        metrics,
327        Arc::clone(&busy_signal),
328    );
329
330    (
331        upper_stream,
332        append_errors,
333        vec![mint_token, write_token, append_token],
334    )
335}
336
337/// Whenever the frontier advances, this mints a new batch description (lower
338/// and upper) that writers should use for writing the next set of batches to
339/// persist.
340///
341/// Only one of the workers does this, meaning there will only be one
342/// description in the stream, even in case of multiple timely workers. Use
343/// `broadcast()` to, ahem, broadcast, the one description to all downstream
344/// write operators/workers.
345fn mint_batch_descriptions<G>(
346    scope: &G,
347    collection_id: GlobalId,
348    operator_name: &str,
349    target: &CollectionMetadata,
350    desired_collection: &Collection<G, Result<Row, DataflowError>, Diff>,
351    persist_clients: Arc<PersistClientCache>,
352) -> (
353    Stream<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
354    Stream<G, (Result<Row, DataflowError>, mz_repr::Timestamp, Diff)>,
355    PressOnDropButton,
356)
357where
358    G: Scope<Timestamp = mz_repr::Timestamp>,
359{
360    let persist_location = target.persist_location.clone();
361    let shard_id = target.data_shard;
362    let target_relation_desc = target.relation_desc.clone();
363
364    // Only one worker is responsible for determining batch descriptions. All
365    // workers must write batches with the same description, to ensure that they
366    // can be combined into one batch that gets appended to Consensus state.
367    let hashed_id = collection_id.hashed();
368    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
369
370    // Only the "active" operator will mint batches. All other workers have an
371    // empty frontier. It's necessary to insert all of these into
372    // `compute_state.sink_write_frontier` below so we properly clear out
373    // default frontiers of non-active workers.
374
375    let mut mint_op = AsyncOperatorBuilder::new(
376        format!("{} mint_batch_descriptions", operator_name),
377        scope.clone(),
378    );
379
380    let (output, output_stream) = mint_op.new_output();
381    let (data_output, data_output_stream) = mint_op.new_output::<CapacityContainerBuilder<_>>();
382
383    // The description and the data-passthrough outputs are both driven by this input, so
384    // they use a standard input connection.
385    let mut desired_input =
386        mint_op.new_input_for_many(&desired_collection.inner, Pipeline, [&output, &data_output]);
387
388    let shutdown_button = mint_op.build(move |capabilities| async move {
389        // Non-active workers should just pass the data through.
390        if !active_worker {
391            // The description output is entirely driven by the active worker, so we drop
392            // its capability here. The data-passthrough output just uses the data
393            // capabilities.
394            drop(capabilities);
395            while let Some(event) = desired_input.next().await {
396                match event {
397                    Event::Data([_output_cap, data_output_cap], mut data) => {
398                        data_output.give_container(&data_output_cap, &mut data);
399                    }
400                    Event::Progress(_) => {}
401                }
402            }
403            return;
404        }
405        // The data-passthrough output should will use the data capabilities, so we drop
406        // its capability here.
407        let [desc_cap, _]: [_; 2] = capabilities.try_into().expect("one capability per output");
408        let mut cap_set = CapabilitySet::from_elem(desc_cap);
409
410        // Initialize this operators's `upper` to the `upper` of the persist shard we are writing
411        // to. Data from the source not beyond this time will be dropped, as it has already
412        // been persisted.
413        // In the future, sources will avoid passing through data not beyond this upper
414        let mut current_upper = {
415            // TODO(aljoscha): We need to figure out what to do with error
416            // results from these calls.
417            let persist_client = persist_clients
418                .open(persist_location)
419                .await
420                .expect("could not open persist client");
421
422            let mut write = persist_client
423                .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
424                    shard_id,
425                    Arc::new(target_relation_desc),
426                    Arc::new(UnitSchema),
427                    Diagnostics {
428                        shard_name: collection_id.to_string(),
429                        handle_purpose: format!(
430                            "storage::persist_sink::mint_batch_descriptions {}",
431                            collection_id
432                        ),
433                    },
434                )
435                .await
436                .expect("could not open persist shard");
437
438            // TODO: this sink currently cannot tolerate a stale upper... which is bad because the
439            // upper can become stale as soon as it is read. (For example, if another concurrent
440            // instance of the sink has updated it.) Fetching a recent upper helps to mitigate this,
441            // but ideally we would just skip ahead if we discover that our upper is stale.
442            let upper = write.fetch_recent_upper().await.clone();
443            // explicitly expire the once-used write handle.
444            write.expire().await;
445            upper
446        };
447
448        // The current input frontiers.
449        let mut desired_frontier;
450
451        loop {
452            if let Some(event) = desired_input.next().await {
453                match event {
454                    Event::Data([_output_cap, data_output_cap], mut data) => {
455                        // Just passthrough the data.
456                        data_output.give_container(&data_output_cap, &mut data);
457                        continue;
458                    }
459                    Event::Progress(frontier) => {
460                        desired_frontier = frontier;
461                    }
462                }
463            } else {
464                // Input is exhausted, so we can shut down.
465                return;
466            };
467
468            // If the new frontier for the data input has progressed, produce a batch description.
469            if PartialOrder::less_than(¤t_upper, &desired_frontier) {
470                // The maximal description range we can produce.
471                let batch_description = (current_upper.to_owned(), desired_frontier.to_owned());
472
473                let lower = batch_description.0.as_option().copied().unwrap();
474
475                let cap = cap_set
476                    .try_delayed(&lower)
477                    .ok_or_else(|| {
478                        format!(
479                            "minter cannot delay {:?} to {:?}. \
480                                Likely because we already emitted a \
481                                batch description and delayed.",
482                            cap_set, lower
483                        )
484                    })
485                    .unwrap();
486
487                trace!(
488                    "persist_sink {collection_id}/{shard_id}: \
489                        new batch_description: {:?}",
490                    batch_description
491                );
492
493                output.give(&cap, batch_description);
494
495                // We downgrade our capability to the batch
496                // description upper, as there will never be
497                // any overlapping descriptions.
498                trace!(
499                    "persist_sink {collection_id}/{shard_id}: \
500                        downgrading to {:?}",
501                    desired_frontier
502                );
503                cap_set.downgrade(desired_frontier.iter());
504
505                // After successfully emitting a new description, we can update the upper for the
506                // operator.
507                current_upper.clone_from(&desired_frontier);
508            }
509        }
510    });
511
512    (
513        output_stream,
514        data_output_stream,
515        shutdown_button.press_on_drop(),
516    )
517}
518
519/// Writes `desired_collection` to persist, but only for updates
520/// that fall into batch a description that we get via `batch_descriptions`.
521/// This forwards a `HollowBatch` (with additional metadata)
522/// for any batch of updates that was written.
523///
524/// This operator assumes that the `desired_collection` comes pre-sharded.
525///
526/// This also and updates various metrics.
527fn write_batches<G>(
528    scope: &G,
529    collection_id: GlobalId,
530    operator_name: &str,
531    target: &CollectionMetadata,
532    batch_descriptions: &Stream<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
533    desired_collection: &Collection<G, Result<Row, DataflowError>, Diff>,
534    persist_clients: Arc<PersistClientCache>,
535    storage_state: &StorageState,
536    busy_signal: Arc<Semaphore>,
537) -> (
538    Stream<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
539    PressOnDropButton,
540)
541where
542    G: Scope<Timestamp = mz_repr::Timestamp>,
543{
544    let worker_index = scope.index();
545
546    let persist_location = target.persist_location.clone();
547    let shard_id = target.data_shard;
548    let target_relation_desc = target.relation_desc.clone();
549
550    let source_statistics = storage_state
551        .aggregated_statistics
552        .get_source(&collection_id)
553        .expect("statistics initialized")
554        .clone();
555
556    let mut write_op =
557        AsyncOperatorBuilder::new(format!("{} write_batches", operator_name), scope.clone());
558
559    let (output, output_stream) = write_op.new_output::<CapacityContainerBuilder<_>>();
560
561    let mut descriptions_input =
562        write_op.new_input_for(&batch_descriptions.broadcast(), Pipeline, &output);
563    let mut desired_input = write_op.new_disconnected_input(&desired_collection.inner, Pipeline);
564
565    // This operator accepts the current and desired update streams for a `persist` shard.
566    // It attempts to write out updates, starting from the current's upper frontier, that
567    // will cause the changes of desired to be committed to persist, _but only those also past the
568    // upper_.
569
570    let shutdown_button = write_op.build(move |_capabilities| async move {
571        // In-progress batches of data, keyed by timestamp.
572        let mut stashed_batches = BTreeMap::new();
573
574        // Contains descriptions of batches for which we know that we can
575        // write data. We got these from the "centralized" operator that
576        // determines batch descriptions for all writers.
577        //
578        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeMap`. We need to search
579        // through the map, so we cannot use the `mz_ore` wrapper either.
580        #[allow(clippy::disallowed_types)]
581        let mut in_flight_batches = std::collections::HashMap::<
582            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
583            Capability<mz_repr::Timestamp>,
584        >::new();
585
586        // TODO(aljoscha): We need to figure out what to do with error results from these calls.
587        let persist_client = persist_clients
588            .open(persist_location)
589            .await
590            .expect("could not open persist client");
591
592        let write = persist_client
593            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
594                shard_id,
595                Arc::new(target_relation_desc),
596                Arc::new(UnitSchema),
597                Diagnostics {
598                    shard_name: collection_id.to_string(),
599                    handle_purpose: format!(
600                        "storage::persist_sink::write_batches {}",
601                        collection_id
602                    ),
603                },
604            )
605            .await
606            .expect("could not open persist shard");
607
608        // The current input frontiers.
609        let mut batch_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
610        let mut desired_frontier = Antichain::from_elem(Timestamp::minimum());
611
612        // The frontiers of the inputs we have processed, used to avoid redoing work
613        let mut processed_desired_frontier = Antichain::from_elem(Timestamp::minimum());
614        let mut processed_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
615
616        // A "safe" choice for the lower of new batches we are creating.
617        let mut operator_batch_lower = Antichain::from_elem(Timestamp::minimum());
618
619        while !(batch_descriptions_frontier.is_empty() && desired_frontier.is_empty()) {
620            // Wait for either inputs to become ready
621            tokio::select! {
622                _ = descriptions_input.ready() => {},
623                _ = desired_input.ready() => {},
624            }
625
626            // Collect ready work from both inputs
627            while let Some(event) = descriptions_input.next_sync() {
628                match event {
629                    Event::Data(cap, data) => {
630                        // Ingest new batch descriptions.
631                        for description in data {
632                            if collection_id.is_user() {
633                                trace!(
634                                    "persist_sink {collection_id}/{shard_id}: \
635                                        write_batches: \
636                                        new_description: {:?}, \
637                                        desired_frontier: {:?}, \
638                                        batch_descriptions_frontier: {:?}",
639                                    description, desired_frontier, batch_descriptions_frontier,
640                                );
641                            }
642                            match in_flight_batches.entry(description) {
643                                std::collections::hash_map::Entry::Vacant(v) => {
644                                    // This _should_ be `.retain`, but rust
645                                    // currently thinks we can't use `cap`
646                                    // as an owned value when using the
647                                    // match guard `Some(event)`
648                                    v.insert(cap.delayed(cap.time()));
649                                }
650                                std::collections::hash_map::Entry::Occupied(o) => {
651                                    let (description, _) = o.remove_entry();
652                                    panic!(
653                                        "write_batches: sink {} got more than one \
654                                            batch for description {:?}, in-flight: {:?}",
655                                        collection_id, description, in_flight_batches
656                                    );
657                                }
658                            }
659                        }
660                    }
661                    Event::Progress(frontier) => {
662                        batch_descriptions_frontier = frontier;
663                    }
664                }
665            }
666
667            let ready_events = std::iter::from_fn(|| desired_input.next_sync()).collect_vec();
668
669            // We know start the async work for the input we received. Until we finish the dataflow
670            // should be marked as busy.
671            let permit = busy_signal.acquire().await;
672
673            for event in ready_events {
674                match event {
675                    Event::Data(_cap, data) => {
676                        // Extract desired rows as positive contributions to `correction`.
677                        if collection_id.is_user() && !data.is_empty() {
678                            trace!(
679                                "persist_sink {collection_id}/{shard_id}: \
680                                    updates: {:?}, \
681                                    in-flight-batches: {:?}, \
682                                    desired_frontier: {:?}, \
683                                    batch_descriptions_frontier: {:?}",
684                                data,
685                                in_flight_batches,
686                                desired_frontier,
687                                batch_descriptions_frontier,
688                            );
689                        }
690
691                        for (row, ts, diff) in data {
692                            if write.upper().less_equal(&ts) {
693                                let builder = stashed_batches.entry(ts).or_insert_with(|| {
694                                    BatchBuilderAndMetadata::new(
695                                        write.builder(operator_batch_lower.clone()),
696                                        ts,
697                                    )
698                                });
699
700                                let is_value = row.is_ok();
701
702                                builder
703                                    .add(&SourceData(row), &(), &ts, &diff.into_inner())
704                                    .await;
705
706                                source_statistics.inc_updates_staged_by(1);
707
708                                // Note that we assume `diff` is either +1 or -1 here, being anything
709                                // else is a logic bug we can't handle at the metric layer. We also
710                                // assume this addition doesn't overflow.
711                                match (is_value, diff.is_positive()) {
712                                    (true, true) => builder.metrics.inserts += diff.unsigned_abs(),
713                                    (true, false) => {
714                                        builder.metrics.retractions += diff.unsigned_abs()
715                                    }
716                                    (false, true) => {
717                                        builder.metrics.error_inserts += diff.unsigned_abs()
718                                    }
719                                    (false, false) => {
720                                        builder.metrics.error_retractions += diff.unsigned_abs()
721                                    }
722                                }
723                            }
724                        }
725                    }
726                    Event::Progress(frontier) => {
727                        desired_frontier = frontier;
728                    }
729                }
730            }
731            // We may have the opportunity to commit updates, if either frontier
732            // has moved
733            if PartialOrder::less_equal(&processed_desired_frontier, &desired_frontier)
734                || PartialOrder::less_equal(
735                    &processed_descriptions_frontier,
736                    &batch_descriptions_frontier,
737                )
738            {
739                trace!(
740                    "persist_sink {collection_id}/{shard_id}: \
741                        CAN emit: \
742                        processed_desired_frontier: {:?}, \
743                        processed_descriptions_frontier: {:?}, \
744                        desired_frontier: {:?}, \
745                        batch_descriptions_frontier: {:?}",
746                    processed_desired_frontier,
747                    processed_descriptions_frontier,
748                    desired_frontier,
749                    batch_descriptions_frontier,
750                );
751
752                trace!(
753                    "persist_sink {collection_id}/{shard_id}: \
754                        in-flight batches: {:?}, \
755                        batch_descriptions_frontier: {:?}, \
756                        desired_frontier: {:?}",
757                    in_flight_batches, batch_descriptions_frontier, desired_frontier,
758                );
759
760                // We can write updates for a given batch description when
761                // a) the batch is not beyond `batch_descriptions_frontier`,
762                // and b) we know that we have seen all updates that would
763                // fall into the batch, from `desired_frontier`.
764                let ready_batches = in_flight_batches
765                    .keys()
766                    .filter(|(lower, upper)| {
767                        !PartialOrder::less_equal(&batch_descriptions_frontier, lower)
768                            && !PartialOrder::less_than(&desired_frontier, upper)
769                    })
770                    .cloned()
771                    .collect::<Vec<_>>();
772
773                trace!(
774                    "persist_sink {collection_id}/{shard_id}: \
775                        ready batches: {:?}",
776                    ready_batches,
777                );
778
779                for batch_description in ready_batches {
780                    let cap = in_flight_batches.remove(&batch_description).unwrap();
781
782                    if collection_id.is_user() {
783                        trace!(
784                            "persist_sink {collection_id}/{shard_id}: \
785                                emitting done batch: {:?}, cap: {:?}",
786                            batch_description, cap
787                        );
788                    }
789
790                    let (batch_lower, batch_upper) = batch_description;
791
792                    let finalized_timestamps: Vec<_> = stashed_batches
793                        .keys()
794                        .filter(|time| {
795                            batch_lower.less_equal(time) && !batch_upper.less_equal(time)
796                        })
797                        .copied()
798                        .collect();
799
800                    let mut batch_tokens = vec![];
801                    for ts in finalized_timestamps {
802                        let batch_builder = stashed_batches.remove(&ts).unwrap();
803
804                        if collection_id.is_user() {
805                            trace!(
806                                "persist_sink {collection_id}/{shard_id}: \
807                                    wrote batch from worker {}: ({:?}, {:?}),
808                                    containing {:?}",
809                                worker_index, batch_lower, batch_upper, batch_builder.metrics
810                            );
811                        }
812
813                        let batch = batch_builder
814                            .finish(batch_lower.clone(), batch_upper.clone())
815                            .await;
816
                        // The next "safe" lower for batches is the join (max) of the uppers of
                        // all the emitted batches. These uppers all are not beyond the
                        // `desired_frontier`, which means all updates received by this operator
                        // will be beyond this lower. Additionally, the `mint_batch_descriptions`
                        // operator ensures that later-received batch descriptions will start
                        // beyond these uppers as well.
                        //
                        // It is impossible to emit a batch description that is
                        // beyond a not-yet emitted description in `in_flight_batches`, as
                        // that description would also have been chosen as ready above.
827                        operator_batch_lower = operator_batch_lower.join(&batch_upper);
828                        batch_tokens.push(batch);
829                    }
830
831                    output.give_container(&cap, &mut batch_tokens);
832
833                    processed_desired_frontier.clone_from(&desired_frontier);
834                    processed_descriptions_frontier.clone_from(&batch_descriptions_frontier);
835                }
836            } else {
837                trace!(
838                    "persist_sink {collection_id}/{shard_id}: \
839                        cannot emit: processed_desired_frontier: {:?}, \
840                        processed_descriptions_frontier: {:?}, \
841                        desired_frontier: {:?}",
842                    processed_desired_frontier, processed_descriptions_frontier, desired_frontier
843                );
844            }
845            drop(permit);
846        }
847    });
848
849    if collection_id.is_user() {
850        output_stream.inspect(|d| trace!("batch: {:?}", d));
851    }
852
853    (output_stream, shutdown_button.press_on_drop())
854}
855
/// Fuses written batches together and appends them to persist using
/// `compare_and_append`. Writing only happens for batch descriptions where
/// we know that no future batches will arrive, that is, for those batch
/// descriptions that are not beyond the frontier of both the
/// `batch_descriptions` and `batches` inputs. When the writer validates part
/// bounds on write, each ready description is appended with its own
/// `compare_and_append` call; otherwise all ready descriptions are fused into
/// one combined description and appended together.
///
/// This also keeps the shared frontier that is stored in `storage_state`
/// (`source_uppers`) in sync with the upper of the persist shard, and updates
/// various metrics and statistics objects.
864/// and statistics objects.
865fn append_batches<G>(
866    scope: &G,
867    collection_id: GlobalId,
868    operator_name: String,
869    target: &CollectionMetadata,
870    batch_descriptions: &Stream<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
871    batches: &Stream<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
872    persist_clients: Arc<PersistClientCache>,
873    storage_state: &StorageState,
874    metrics: SourcePersistSinkMetrics,
875    busy_signal: Arc<Semaphore>,
876) -> (
877    Stream<G, ()>,
878    Stream<G, Rc<anyhow::Error>>,
879    PressOnDropButton,
880)
881where
882    G: Scope<Timestamp = mz_repr::Timestamp>,
883{
884    let persist_location = target.persist_location.clone();
885    let shard_id = target.data_shard;
886    let target_relation_desc = target.relation_desc.clone();
887
888    // We can only be lenient with concurrent modifications when we know that
889    // this source pipeline is using the feedback upsert operator, which works
890    // correctly when multiple instances of an ingestion pipeline produce
891    // different updates, because of concurrency/non-determinism.
892    let use_continual_feedback_upsert = dyncfgs::STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT
893        .get(storage_state.storage_configuration.config_set());
894    let bail_on_concurrent_modification = !use_continual_feedback_upsert;
895
896    let mut read_only_rx = storage_state.read_only_rx.clone();
897
898    let operator_name = format!("{} append_batches", operator_name);
899    let mut append_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
900
901    let hashed_id = collection_id.hashed();
902    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
903    let worker_id = scope.index();
904
905    // Both of these inputs are disconnected from the output capabilities of this operator, as
906    // any output of this operator is entirely driven by the `compare_and_append`s. Currently
907    // this operator has no outputs, but they may be added in the future, when merging with
908    // the compute `persist_sink`.
909    let mut descriptions_input =
910        append_op.new_disconnected_input(batch_descriptions, Exchange::new(move |_| hashed_id));
911    let mut batches_input =
912        append_op.new_disconnected_input(batches, Exchange::new(move |_| hashed_id));
913
914    let current_upper = Rc::clone(&storage_state.source_uppers[&collection_id]);
915    if !active_worker {
916        // This worker is not writing, so make sure it's "taken out" of the
917        // calculation by advancing to the empty frontier.
918        current_upper.borrow_mut().clear();
919    }
920
921    let source_statistics = storage_state
922        .aggregated_statistics
923        .get_source(&collection_id)
924        .expect("statistics initialized")
925        .clone();
926
927    // An output whose frontier tracks the last successful compare and append of this operator
928    let (_upper_output, upper_stream) = append_op.new_output::<CapacityContainerBuilder<_>>();
929
930    // This operator accepts the batch descriptions and tokens that represent
931    // written batches. Written batches get appended to persist when we learn
932    // from our input frontiers that we have seen all batches for a given batch
933    // description.
934
935    let (shutdown_button, errors) = append_op.build_fallible(move |caps| Box::pin(async move {
936        let [upper_cap_set]: &mut [_; 1] = caps.try_into().unwrap();
937
938        // This may SEEM unnecessary, but metrics contains extra
939        // `DeleteOnDrop`-wrapped fields that will NOT be moved into this
940        // closure otherwise, dropping and destroying
941        // those metrics. This is because rust now only moves the
942        // explicitly-referenced fields into closures.
943        let metrics = metrics;
944
945        // Contains descriptions of batches for which we know that we can
946        // write data. We got these from the "centralized" operator that
947        // determines batch descriptions for all writers.
948        //
949        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeSet`. We need to search
950        // through the set, so we cannot use the `mz_ore` wrapper either.
951        #[allow(clippy::disallowed_types)]
952        let mut in_flight_descriptions = std::collections::HashSet::<(
953            Antichain<mz_repr::Timestamp>,
954            Antichain<mz_repr::Timestamp>,
955        )>::new();
956
957        // In flight batches that haven't been `compare_and_append`'d yet, plus metrics about
958        // the batch.
959        let mut in_flight_batches = HashMap::<
960            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
961            BatchSet,
962        >::new();
963
964        source_statistics.initialize_rehydration_latency_ms();
965        if !active_worker {
966            // The non-active workers report that they are done snapshotting and hydrating.
967            let empty_frontier = Antichain::new();
968            source_statistics.initialize_snapshot_committed(&empty_frontier);
969            source_statistics.update_rehydration_latency_ms(&empty_frontier);
970            return Ok(());
971        }
972
973        let persist_client = persist_clients
974            .open(persist_location)
975            .await?;
976
977        let mut write = persist_client
978            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
979                shard_id,
980                Arc::new(target_relation_desc),
981                Arc::new(UnitSchema),
982                Diagnostics {
983                    shard_name:collection_id.to_string(),
984                    handle_purpose: format!("persist_sink::append_batches {}", collection_id)
985                },
986            )
987            .await?;
988
989        // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
990        // to. Data from the source not beyond this time will be dropped, as it has already
991        // been persisted.
        // In the future, sources will avoid passing through data not beyond this upper.
        //
        // VERY IMPORTANT: Only the active write worker must change the
        // shared upper. All other workers have already cleared this
        // upper above.
996        current_upper.borrow_mut().clone_from(write.upper());
997        upper_cap_set.downgrade(current_upper.borrow().iter());
998        source_statistics.initialize_snapshot_committed(write.upper());
999
1000        // The current input frontiers.
1001        let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1002        let mut batches_frontier = Antichain::from_elem(Timestamp::minimum());
1003
1004        loop {
1005            tokio::select! {
1006                Some(event) = descriptions_input.next() => {
1007                    match event {
1008                        Event::Data(_cap, data) => {
1009                            // Ingest new batch descriptions.
1010                            for batch_description in data {
1011                                if collection_id.is_user() {
1012                                    trace!(
1013                                        "persist_sink {collection_id}/{shard_id}: \
1014                                            append_batches: sink {}, \
1015                                            new description: {:?}, \
1016                                            batch_description_frontier: {:?}",
1017                                        collection_id,
1018                                        batch_description,
1019                                        batch_description_frontier
1020                                    );
1021                                }
1022
1023                                // This line has to be broken up, or
1024                                // rustfmt fails in the whole function :(
1025                                let is_new = in_flight_descriptions.insert(
1026                                    batch_description.clone()
1027                                );
1028
1029                                assert!(
1030                                    is_new,
1031                                    "append_batches: sink {} got more than one batch \
1032                                        for a given description in-flight: {:?}",
1033                                    collection_id, in_flight_batches
1034                                );
1035                            }
1036
1037                            continue;
1038                        }
1039                        Event::Progress(frontier) => {
1040                            batch_description_frontier = frontier;
1041                        }
1042                    }
1043                }
1044                Some(event) = batches_input.next() => {
1045                    match event {
1046                        Event::Data(_cap, data) => {
1047                            for batch in data {
1048                                let batch_description = (batch.lower.clone(), batch.upper.clone());
1049
1050                                let batches = in_flight_batches
1051                                    .entry(batch_description)
1052                                    .or_default();
1053
1054                                batches.finished.push(FinishedBatch {
1055                                    batch: write.batch_from_transmittable_batch(batch.batch),
1056                                    data_ts: batch.data_ts,
1057                                });
1058                                batches.batch_metrics += &batch.metrics;
1059                            }
1060                            continue;
1061                        }
1062                        Event::Progress(frontier) => {
1063                            batches_frontier = frontier;
1064                        }
1065                    }
1066                }
1067                else => {
1068                    // All inputs are exhausted, so we can shut down.
1069                    return Ok(());
1070                }
1071            };
1072
1073            // Peel off any batches that are not beyond the frontier
1074            // anymore.
1075            //
1076            // It is correct to consider batches that are not beyond the
1077            // `batches_frontier` because it is held back by the writer
1078            // operator as long as a) the `batch_description_frontier` did
1079            // not advance and b) as long as the `desired_frontier` has not
1080            // advanced to the `upper` of a given batch description.
1081
1082            let mut done_batches = in_flight_descriptions
1083                .iter()
1084                .filter(|(lower, _upper)| !PartialOrder::less_equal(&batches_frontier, lower))
1085                .cloned()
1086                .collect::<Vec<_>>();
1087
1088            trace!(
1089                "persist_sink {collection_id}/{shard_id}: \
1090                    append_batches: in_flight: {:?}, \
1091                    done: {:?}, \
1092                    batch_frontier: {:?}, \
1093                    batch_description_frontier: {:?}",
1094                in_flight_descriptions,
1095                done_batches,
1096                batches_frontier,
1097                batch_description_frontier
1098            );
1099
1100            // Append batches in order, to ensure that their `lower` and
1101            // `upper` line up.
1102            done_batches.sort_by(|a, b| {
1103                if PartialOrder::less_than(a, b) {
1104                    Ordering::Less
1105                } else if PartialOrder::less_than(b, a) {
1106                    Ordering::Greater
1107                } else {
1108                    Ordering::Equal
1109                }
1110            });
1111
1112            let validate_part_bounds_on_write = write.validate_part_bounds_on_write();
1113            let mut todo = VecDeque::new();
1114
1115            if !validate_part_bounds_on_write {
1116                let mut combined_batch_metadata = None;
1117                let mut combined_batch_set = BatchSet::default();
1118                for done_batch_metadata in done_batches.drain(..) {
1119                    in_flight_descriptions.remove(&done_batch_metadata);
1120                    let mut batch_set = in_flight_batches
1121                        .remove(&done_batch_metadata)
1122                        .unwrap_or_default();
1123                    match combined_batch_metadata.as_mut() {
1124                        Some((_, upper)) => *upper = done_batch_metadata.1,
1125                        None => combined_batch_metadata = Some(done_batch_metadata),
1126                    }
1127                    combined_batch_set.batch_metrics += &batch_set.batch_metrics;
1128                    combined_batch_set.finished.append(&mut batch_set.finished);
1129                }
1130                if let Some(done_batch_metadata) = combined_batch_metadata {
1131                    todo.push_back((done_batch_metadata, combined_batch_set))
1132                }
1133            } else {
1134                for done_batch_metadata in done_batches.drain(..) {
1135                    in_flight_descriptions.remove(&done_batch_metadata);
1136                    let batch_set = in_flight_batches
1137                        .remove(&done_batch_metadata)
1138                        .unwrap_or_default();
1139                    todo.push_back((done_batch_metadata, batch_set));
1140                }
1141            };
1142
1143            while let Some((done_batch_metadata, batch_set)) = todo.pop_front() {
1144                in_flight_descriptions.remove(&done_batch_metadata);
1145
1146                let mut batches = batch_set.finished;
1147
1148                trace!(
1149                    "persist_sink {collection_id}/{shard_id}: \
1150                        done batch: {:?}, {:?}",
1151                    done_batch_metadata,
1152                    batches
1153                );
1154
1155                let (batch_lower, batch_upper) = done_batch_metadata;
1156
1157                let batch_metrics = batch_set.batch_metrics;
1158
1159                let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::<Vec<_>>();
1160
1161                let result = {
1162                    let maybe_err = if *read_only_rx.borrow() {
1163
1164                        // We have to wait for either us coming out of read-only
1165                        // mode or someone else applying a write that covers our
1166                        // batch.
1167                        //
                        // If we didn't wait for the latter here, and just went
                        // around the loop again, we might miss a moment where
                        // _we_ have to write down a batch. For example, when our
                        // input frontier advances to a state where we can
                        // write, and the read-write instance sees the same
                        // update but then crashes before it can append a batch.
1174
1175                        let maybe_err = loop {
1176                            if collection_id.is_user() {
1177                                tracing::debug!(
1178                                    %worker_id,
1179                                    %collection_id,
1180                                    %shard_id,
1181                                    ?batch_lower,
1182                                    ?batch_upper,
1183                                    ?current_upper,
1184                                    "persist_sink is in read-only mode, waiting until we come out of it or the shard upper advances"
1185                                );
1186                            }
1187
1188                            // We don't try to be smart here, and for example
1189                            // use `wait_for_upper_past()`. We'd have to use a
1190                            // select!, which would require cancel safety of
1191                            // `wait_for_upper_past()`, which it doesn't
1192                            // advertise.
1193                            let _ = tokio::time::timeout(Duration::from_secs(1), read_only_rx.changed()).await;
1194
1195                            if !*read_only_rx.borrow() {
1196                                if collection_id.is_user() {
1197                                    tracing::debug!(
1198                                        %worker_id,
1199                                        %collection_id,
1200                                        %shard_id,
1201                                        ?batch_lower,
1202                                        ?batch_upper,
1203                                        ?current_upper,
1204                                        "persist_sink has come out of read-only mode"
1205                                    );
1206                                }
1207
1208                                // It's okay to write now.
1209                                break Ok(());
1210                            }
1211
1212                            let current_upper = write.fetch_recent_upper().await;
1213
1214                            if PartialOrder::less_than(&batch_upper, current_upper) {
1215                                // We synthesize an `UpperMismatch` so that we can go
1216                                // through the same logic below for trimming down our
1217                                // batches.
1218                                //
1219                                // Notably, we are not trying to be smart, and teach the
1220                                // write operator about read-only mode. Writing down
1221                                // those batches does not append anything to the persist
1222                                // shard, and it would be a hassle to figure out in the
1223                                // write workers how to trim down batches in read-only
1224                                // mode, when the shard upper advances.
1225                                //
1226                                // Right here, in the logic below, we have all we need
1227                                // for figuring out how to trim our batches.
1228
1229                                if collection_id.is_user() {
1230                                    tracing::debug!(
1231                                        %worker_id,
1232                                        %collection_id,
1233                                        %shard_id,
1234                                        ?batch_lower,
1235                                        ?batch_upper,
1236                                        ?current_upper,
1237                                        "persist_sink not appending in read-only mode"
1238                                    );
1239                                }
1240
1241                                break Err(UpperMismatch {
1242                                    current: current_upper.clone(),
1243                                    expected: batch_lower.clone()}
1244                                );
1245                            }
1246                        };
1247
1248                        maybe_err
1249                    } else {
1250                        // It's okay to proceed with the write.
1251                        Ok(())
1252                    };
1253
1254                    match maybe_err {
1255                        Ok(()) => {
1256                            let _permit = busy_signal.acquire().await;
1257
1258                            write.compare_and_append_batch(
1259                                &mut to_append[..],
1260                                batch_lower.clone(),
1261                                batch_upper.clone(),
1262                                validate_part_bounds_on_write,
1263                            )
1264                            .await
1265                            .expect("Invalid usage")
1266                        },
1267                        Err(e) => {
                            // We forward the synthesized error, so that we go
                            // through the batch cleanup logic below.
1270                            Err(e)
1271                        }
1272                    }
1273                };
1274
1275
1276                // These metrics are independent of whether it was _us_ or
1277                // _someone_ that managed to commit a batch that advanced the
1278                // upper.
1279                source_statistics.update_snapshot_committed(&batch_upper);
1280                source_statistics.update_rehydration_latency_ms(&batch_upper);
1281                metrics
1282                    .progress
1283                    .set(mz_persist_client::metrics::encode_ts_metric(&batch_upper));
1284
1285                if collection_id.is_user() {
1286                    trace!(
1287                        "persist_sink {collection_id}/{shard_id}: \
1288                            append result for batch ({:?} -> {:?}): {:?}",
1289                        batch_lower,
1290                        batch_upper,
1291                        result
1292                    );
1293                }
1294
1295                match result {
1296                    Ok(()) => {
1297                        // Only update these metrics when we know that _we_ were
1298                        // successful.
1299                        source_statistics
1300                            .inc_updates_committed_by(batch_metrics.inserts + batch_metrics.retractions);
1301                        metrics.processed_batches.inc();
1302                        metrics.row_inserts.inc_by(batch_metrics.inserts);
1303                        metrics.row_retractions.inc_by(batch_metrics.retractions);
1304                        metrics.error_inserts.inc_by(batch_metrics.error_inserts);
1305                        metrics
1306                            .error_retractions
1307                            .inc_by(batch_metrics.error_retractions);
1308
1309                        current_upper.borrow_mut().clone_from(&batch_upper);
1310                        upper_cap_set.downgrade(current_upper.borrow().iter());
1311                    }
1312                    Err(mismatch) => {
                        // We tried to do a non-contiguous append; that won't work.
1314                        if PartialOrder::less_than(&mismatch.current, &batch_lower) {
1315                            // Best-effort attempt to delete unneeded batches.
1316                            future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1317
1318                            // We always bail when this happens, regardless of
1319                            // `bail_on_concurrent_modification`.
1320                            tracing::warn!(
1321                                "persist_sink({}): invalid upper! \
1322                                    Tried to append batch ({:?} -> {:?}) but upper \
1323                                    is {:?}. This is surpising and likely indicates \
1324                                    a bug in the persist sink, but we'll restart the \
1325                                    dataflow and try again.",
1326                                collection_id, batch_lower, batch_upper, mismatch.current,
1327                            );
1328                            anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1329                        } else if PartialOrder::less_than(&mismatch.current, &batch_upper) {
1330                            // The shard's upper was ahead of our batch's lower
1331                            // but not ahead of our upper. Cut down the
1332                            // description by advancing its lower to the current
1333                            // shard upper and try again. IMPORTANT: We can only
1334                            // advance the lower, meaning we cut updates away,
1335                            // we must not "extend" the batch by changing to a
1336                            // lower that is not beyond the current lower. This
1337                            // invariant is checked by the first if branch: if
1338                            // `!(current_upper < lower)` then it holds that
1339                            // `lower <= current_upper`.
1340
1341                            // First, construct a new batch description with the
1342                            // lower advanced to the current shard upper.
1343                            let new_batch_lower = mismatch.current.clone();
1344                            let new_done_batch_metadata = (new_batch_lower.clone(), batch_upper.clone());
1345
1346                            // Retain any batches that are still in advance of
1347                            // the new lower, and delete any batches that are
1348                            // not.
1349                            let mut batch_delete_futures = vec![];
1350                            let mut new_batch_set = BatchSet::default();
1351                            for batch in batches {
1352                                if new_batch_lower.less_equal(&batch.data_ts) {
1353                                    new_batch_set.finished.push(batch);
1354                                } else {
1355                                    batch_delete_futures.push(batch.batch.delete());
1356                                }
1357                            }
1358
1359                            // Re-add the new batch to the list of batches to process.
1360                            todo.push_front((new_done_batch_metadata, new_batch_set));
1361
1362                            // Best-effort attempt to delete unneeded batches.
1363                            future::join_all(batch_delete_futures).await;
1364                        } else {
1365                            // Best-effort attempt to delete unneeded batches.
1366                            future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1367                        }
1368
1369                        if bail_on_concurrent_modification {
1370                            tracing::warn!(
1371                                "persist_sink({}): invalid upper! \
1372                                    Tried to append batch ({:?} -> {:?}) but upper \
1373                                    is {:?}. This is not a problem, it just means \
1374                                    someone else was faster than us. We will try \
1375                                    again with a new batch description.",
1376                                collection_id, batch_lower, batch_upper, mismatch.current,
1377                            );
1378                            anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1379                        }
1380                    }
1381                }
1382            }
1383        }
1384    }));
1385
1386    (upper_stream, errors, shutdown_button.press_on_drop())
1387}