Skip to main content

mz_storage/render/
persist_sink.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Render an operator that persists a source collection.
11//!
12//! ## Implementation
13//!
14//! This module defines the `persist_sink` operator, that writes
15//! a collection produced by source rendering into a persist shard.
16//!
17//! It attempts to use all workers to write data to persist, and uses
18//! single-instance workers to coordinate work. The below diagram
//! is an overview of how it is shaped. There is more information
20//! in the doc comments of the top-level functions of this module.
21//!
22//!```text
23//!
24//!                                       ,------------.
25//!                                       | source     |
26//!                                       | collection |
27//!                                       +---+--------+
28//!                                       /   |
29//!                                      /    |
30//!                                     /     |
31//!                                    /      |
32//!                                   /       |
33//!                                  /        |
34//!                                 /         |
35//!                                /          |
36//!                               /     ,-+-----------------------.
37//!                              /      | mint_batch_descriptions |
38//!                             /       | one arbitrary worker    |
39//!                            |        +-,--,--------+----+------+
40//!                           ,----------´.-´         |     \
41//!                       _.-´ |       .-´            |      \
42//!                   _.-´     |    .-´               |       \
43//!                .-´  .------+----|-------+---------|--------\-----.
44//!               /    /            |       |         |         \     \
45//!        ,--------------.   ,-----------------.     |     ,-----------------.
46//!        | write_batches|   |  write_batches  |     |     |  write_batches  |
47//!        | worker 0     |   | worker 1        |     |     | worker N        |
48//!        +-----+--------+   +-+---------------+     |     +--+--------------+
49//!               \              \                    |        /
50//!                `-.            `,                  |       /
51//!                   `-._          `-.               |      /
52//!                       `-._         `-.            |     /
53//!                           `---------. `-.         |    /
54//!                                     +`---`---+-------------,
55//!                                     | append_batches       |
56//!                                     | one arbitrary worker |
57//!                                     +------+---------------+
58//!```
59//!
60//! ## Similarities with `mz_compute::sink::persist_sink`
61//!
62//! This module has many similarities with the compute version of
63//! the same concept, and in fact, is entirely derived from it.
64//!
65//! Compute requires that its `persist_sink` is _self-correcting_;
66//! that is, it corrects what the collection in persist
67//! accumulates to if the collection has values changed at
68//! previous timestamps. It does this by continually comparing
69//! the input stream with the collection as read back from persist.
70//!
//! Source collections, while definite, cannot be reliably
//! re-produced once written down, which means compute's
73//! `persist_sink`'s self-correction mechanism would need to be
74//! skipped on operator startup, and would cause unnecessary read
75//! load on persist.
76//!
77//! Additionally, persisting sources requires we use bounded
78//! amounts of memory, even if a single timestamp represents
79//! a huge amount of data. This is not (currently) possible
80//! to guarantee while also performing self-correction.
81//!
82//! Because of this, we have ripped out the self-correction
83//! mechanism, and aggressively simplified the sub-operators.
84//! Some, particularly `append_batches` could be merged with
85//! the compute version, but that requires some amount of
86//! onerous refactoring that we have chosen to skip for now.
87//!
// TODO(guswynn): merge at least the `append_batches` operator
89
90use std::cmp::Ordering;
91use std::collections::{BTreeMap, VecDeque};
92use std::fmt::Debug;
93use std::ops::AddAssign;
94use std::rc::Rc;
95use std::sync::Arc;
96use std::time::Duration;
97
98use differential_dataflow::difference::Monoid;
99use differential_dataflow::lattice::Lattice;
100use differential_dataflow::{AsCollection, Hashable, VecCollection};
101use futures::{StreamExt, future};
102use itertools::Itertools;
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::HashMap;
105use mz_persist_client::Diagnostics;
106use mz_persist_client::batch::{Batch, BatchBuilder, ProtoBatch};
107use mz_persist_client::cache::PersistClientCache;
108use mz_persist_client::error::UpperMismatch;
109use mz_persist_types::codec_impls::UnitSchema;
110use mz_persist_types::{Codec, Codec64};
111use mz_repr::{Diff, GlobalId, Row};
112use mz_storage_types::controller::CollectionMetadata;
113use mz_storage_types::errors::DataflowError;
114use mz_storage_types::sources::SourceData;
115use mz_storage_types::{StorageDiff, dyncfgs};
116use mz_timely_util::builder_async::{
117    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
118};
119use serde::{Deserialize, Serialize};
120use timely::PartialOrder;
121use timely::container::CapacityContainerBuilder;
122use timely::dataflow::channels::pact::{Exchange, Pipeline};
123use timely::dataflow::operators::vec::Broadcast;
124use timely::dataflow::operators::{Capability, CapabilitySet, Inspect};
125use timely::dataflow::{Scope, Stream, StreamVec};
126use timely::progress::{Antichain, Timestamp};
127use tokio::sync::Semaphore;
128use tracing::trace;
129
130use crate::metrics::source::SourcePersistSinkMetrics;
131use crate::storage_state::StorageState;
132
133/// Metrics about batches.
134#[derive(Clone, Debug, Default, Deserialize, Serialize)]
135struct BatchMetrics {
136    inserts: u64,
137    retractions: u64,
138    error_inserts: u64,
139    error_retractions: u64,
140}
141
142impl AddAssign<&BatchMetrics> for BatchMetrics {
143    fn add_assign(&mut self, rhs: &BatchMetrics) {
144        let BatchMetrics {
145            inserts: self_inserts,
146            retractions: self_retractions,
147            error_inserts: self_error_inserts,
148            error_retractions: self_error_retractions,
149        } = self;
150        let BatchMetrics {
151            inserts: rhs_inserts,
152            retractions: rhs_retractions,
153            error_inserts: rhs_error_inserts,
154            error_retractions: rhs_error_retractions,
155        } = rhs;
156        *self_inserts += rhs_inserts;
157        *self_retractions += rhs_retractions;
158        *self_error_inserts += rhs_error_inserts;
159        *self_error_retractions += rhs_error_retractions;
160    }
161}
162
163/// Manages batches and metrics.
164struct BatchBuilderAndMetadata<K, V, T, D>
165where
166    K: Codec,
167    V: Codec,
168    T: Timestamp + Lattice + Codec64,
169{
170    builder: BatchBuilder<K, V, T, D>,
171    data_ts: T,
172    metrics: BatchMetrics,
173}
174
175impl<K, V, T, D> BatchBuilderAndMetadata<K, V, T, D>
176where
177    K: Codec + Debug,
178    V: Codec + Debug,
179    T: Timestamp + Lattice + Codec64,
180    D: Monoid + Codec64,
181{
182    /// Creates a new batch.
183    ///
184    /// NOTE(benesch): temporary restriction: all updates added to the batch
185    /// must be at the specified timestamp `data_ts`.
186    fn new(builder: BatchBuilder<K, V, T, D>, data_ts: T) -> Self {
187        BatchBuilderAndMetadata {
188            builder,
189            data_ts,
190            metrics: Default::default(),
191        }
192    }
193
194    /// Adds an update to the batch.
195    ///
196    /// NOTE(benesch): temporary restriction: all updates added to the batch
197    /// must be at the timestamp specified during creation.
198    async fn add(&mut self, k: &K, v: &V, t: &T, d: &D) {
199        assert_eq!(
200            self.data_ts, *t,
201            "BatchBuilderAndMetadata::add called with a timestamp {t:?} that does not match creation timestamp {:?}",
202            self.data_ts
203        );
204
205        self.builder.add(k, v, t, d).await.expect("invalid usage");
206    }
207
208    async fn finish(self, lower: Antichain<T>, upper: Antichain<T>) -> HollowBatchAndMetadata<T> {
209        let batch = self
210            .builder
211            .finish(upper.clone())
212            .await
213            .expect("invalid usage");
214        HollowBatchAndMetadata {
215            lower,
216            upper,
217            data_ts: self.data_ts,
218            batch: batch.into_transmittable_batch(),
219            metrics: self.metrics,
220        }
221    }
222}
223
224/// A batch or data + metrics moved from `write_batches` to `append_batches`.
225#[derive(Clone, Debug, Deserialize, Serialize)]
226#[serde(bound(
227    serialize = "T: Timestamp + Codec64",
228    deserialize = "T: Timestamp + Codec64"
229))]
230struct HollowBatchAndMetadata<T> {
231    lower: Antichain<T>,
232    upper: Antichain<T>,
233    data_ts: T,
234    batch: ProtoBatch,
235    metrics: BatchMetrics,
236}
237
238/// Holds finished batches for `append_batches`.
239#[derive(Debug, Default)]
240struct BatchSet {
241    finished: Vec<FinishedBatch>,
242    batch_metrics: BatchMetrics,
243}
244
245#[derive(Debug)]
246struct FinishedBatch {
247    batch: Batch<SourceData, (), mz_repr::Timestamp, StorageDiff>,
248    data_ts: mz_repr::Timestamp,
249}
250
251/// Continuously writes the `desired_stream` into persist
252/// This is done via a multi-stage operator graph:
253///
254/// 1. `mint_batch_descriptions` emits new batch descriptions whenever the
255///    frontier of `desired_collection` advances. A batch description is
256///    a pair of `(lower, upper)` that tells write operators
257///    which updates to write and in the end tells the append operator
258///    what frontiers to use when calling `append`/`compare_and_append`.
259///    This is a single-worker operator.
260/// 2. `write_batches` writes the `desired_collection` to persist as
261///    batches and sends those batches along.
262///    This does not yet append the batches to the persist shard, the update are
263///    only uploaded/prepared to be appended to a shard. Also: we only write
264///    updates for batch descriptions that we learned about from
265///    `mint_batch_descriptions`.
266/// 3. `append_batches` takes as input the minted batch descriptions and written
267///    batches. Whenever the frontiers sufficiently advance, we take a batch
268///    description and all the batches that belong to it and append it to the
269///    persist shard.
270///
271/// This operator assumes that the `desired_collection` comes pre-sharded.
272///
273/// Note that `mint_batch_descriptions` inspects the frontier of
274/// `desired_collection`, and passes the data through to `write_batches`.
275/// This is done to avoid a clone of the underlying data so that both
276/// operators can have the collection as input.
277pub(crate) fn render<G>(
278    scope: &G,
279    collection_id: GlobalId,
280    target: CollectionMetadata,
281    desired_collection: VecCollection<G, Result<Row, DataflowError>, Diff>,
282    storage_state: &StorageState,
283    metrics: SourcePersistSinkMetrics,
284    busy_signal: Arc<Semaphore>,
285) -> (
286    StreamVec<G, ()>,
287    StreamVec<G, Rc<anyhow::Error>>,
288    Vec<PressOnDropButton>,
289)
290where
291    G: Scope<Timestamp = mz_repr::Timestamp>,
292{
293    let persist_clients = Arc::clone(&storage_state.persist_clients);
294
295    let operator_name = format!("persist_sink({})", collection_id);
296
297    let (batch_descriptions, passthrough_desired_stream, mint_token) = mint_batch_descriptions(
298        scope,
299        collection_id,
300        &operator_name,
301        &target,
302        desired_collection,
303        Arc::clone(&persist_clients),
304    );
305
306    let (written_batches, write_token) = write_batches(
307        scope,
308        collection_id.clone(),
309        &operator_name,
310        &target,
311        batch_descriptions.clone(),
312        passthrough_desired_stream.as_collection(),
313        Arc::clone(&persist_clients),
314        storage_state,
315        Arc::clone(&busy_signal),
316    );
317
318    let (upper_stream, append_errors, append_token) = append_batches(
319        scope,
320        collection_id.clone(),
321        operator_name,
322        &target,
323        batch_descriptions,
324        written_batches,
325        persist_clients,
326        storage_state,
327        metrics,
328        Arc::clone(&busy_signal),
329    );
330
331    (
332        upper_stream,
333        append_errors,
334        vec![mint_token, write_token, append_token],
335    )
336}
337
338/// Whenever the frontier advances, this mints a new batch description (lower
339/// and upper) that writers should use for writing the next set of batches to
340/// persist.
341///
342/// Only one of the workers does this, meaning there will only be one
343/// description in the stream, even in case of multiple timely workers. Use
344/// `broadcast()` to, ahem, broadcast, the one description to all downstream
345/// write operators/workers.
346fn mint_batch_descriptions<G>(
347    scope: &G,
348    collection_id: GlobalId,
349    operator_name: &str,
350    target: &CollectionMetadata,
351    desired_collection: VecCollection<G, Result<Row, DataflowError>, Diff>,
352    persist_clients: Arc<PersistClientCache>,
353) -> (
354    StreamVec<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
355    StreamVec<G, (Result<Row, DataflowError>, mz_repr::Timestamp, Diff)>,
356    PressOnDropButton,
357)
358where
359    G: Scope<Timestamp = mz_repr::Timestamp>,
360{
361    let persist_location = target.persist_location.clone();
362    let shard_id = target.data_shard;
363    let target_relation_desc = target.relation_desc.clone();
364
365    // Only one worker is responsible for determining batch descriptions. All
366    // workers must write batches with the same description, to ensure that they
367    // can be combined into one batch that gets appended to Consensus state.
368    let hashed_id = collection_id.hashed();
369    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
370
371    // Only the "active" operator will mint batches. All other workers have an
372    // empty frontier. It's necessary to insert all of these into
373    // `compute_state.sink_write_frontier` below so we properly clear out
374    // default frontiers of non-active workers.
375
376    let mut mint_op = AsyncOperatorBuilder::new(
377        format!("{} mint_batch_descriptions", operator_name),
378        scope.clone(),
379    );
380
381    let (output, output_stream) = mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
382    let (data_output, data_output_stream) =
383        mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
384
385    // The description and the data-passthrough outputs are both driven by this input, so
386    // they use a standard input connection.
387    let mut desired_input =
388        mint_op.new_input_for_many(desired_collection.inner, Pipeline, [&output, &data_output]);
389
390    let shutdown_button = mint_op.build(move |capabilities| async move {
391        // Non-active workers should just pass the data through.
392        if !active_worker {
393            // The description output is entirely driven by the active worker, so we drop
394            // its capability here. The data-passthrough output just uses the data
395            // capabilities.
396            drop(capabilities);
397            while let Some(event) = desired_input.next().await {
398                match event {
399                    Event::Data([_output_cap, data_output_cap], mut data) => {
400                        data_output.give_container(&data_output_cap, &mut data);
401                    }
402                    Event::Progress(_) => {}
403                }
404            }
405            return;
406        }
407        // The data-passthrough output should will use the data capabilities, so we drop
408        // its capability here.
409        let [desc_cap, _]: [_; 2] = capabilities.try_into().expect("one capability per output");
410        let mut cap_set = CapabilitySet::from_elem(desc_cap);
411
412        // Initialize this operators's `upper` to the `upper` of the persist shard we are writing
413        // to. Data from the source not beyond this time will be dropped, as it has already
414        // been persisted.
415        // In the future, sources will avoid passing through data not beyond this upper
416        let mut current_upper = {
417            // TODO(aljoscha): We need to figure out what to do with error
418            // results from these calls.
419            let persist_client = persist_clients
420                .open(persist_location)
421                .await
422                .expect("could not open persist client");
423
424            let mut write = persist_client
425                .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
426                    shard_id,
427                    Arc::new(target_relation_desc),
428                    Arc::new(UnitSchema),
429                    Diagnostics {
430                        shard_name: collection_id.to_string(),
431                        handle_purpose: format!(
432                            "storage::persist_sink::mint_batch_descriptions {}",
433                            collection_id
434                        ),
435                    },
436                )
437                .await
438                .expect("could not open persist shard");
439
440            // TODO: this sink currently cannot tolerate a stale upper... which is bad because the
441            // upper can become stale as soon as it is read. (For example, if another concurrent
442            // instance of the sink has updated it.) Fetching a recent upper helps to mitigate this,
443            // but ideally we would just skip ahead if we discover that our upper is stale.
444            let upper = write.fetch_recent_upper().await.clone();
445            // explicitly expire the once-used write handle.
446            write.expire().await;
447            upper
448        };
449
450        // The current input frontiers.
451        let mut desired_frontier;
452
453        loop {
454            if let Some(event) = desired_input.next().await {
455                match event {
456                    Event::Data([_output_cap, data_output_cap], mut data) => {
457                        // Just passthrough the data.
458                        data_output.give_container(&data_output_cap, &mut data);
459                        continue;
460                    }
461                    Event::Progress(frontier) => {
462                        desired_frontier = frontier;
463                    }
464                }
465            } else {
466                // Input is exhausted, so we can shut down.
467                return;
468            };
469
470            // If the new frontier for the data input has progressed, produce a batch description.
471            if PartialOrder::less_than(&current_upper, &desired_frontier) {
472                // The maximal description range we can produce.
473                let batch_description = (current_upper.to_owned(), desired_frontier.to_owned());
474
475                let lower = batch_description.0.as_option().copied().unwrap();
476
477                let cap = cap_set
478                    .try_delayed(&lower)
479                    .ok_or_else(|| {
480                        format!(
481                            "minter cannot delay {:?} to {:?}. \
482                                Likely because we already emitted a \
483                                batch description and delayed.",
484                            cap_set, lower
485                        )
486                    })
487                    .unwrap();
488
489                trace!(
490                    "persist_sink {collection_id}/{shard_id}: \
491                        new batch_description: {:?}",
492                    batch_description
493                );
494
495                output.give(&cap, batch_description);
496
497                // We downgrade our capability to the batch
498                // description upper, as there will never be
499                // any overlapping descriptions.
500                trace!(
501                    "persist_sink {collection_id}/{shard_id}: \
502                        downgrading to {:?}",
503                    desired_frontier
504                );
505                cap_set.downgrade(desired_frontier.iter());
506
507                // After successfully emitting a new description, we can update the upper for the
508                // operator.
509                current_upper.clone_from(&desired_frontier);
510            }
511        }
512    });
513
514    (
515        output_stream,
516        data_output_stream,
517        shutdown_button.press_on_drop(),
518    )
519}
520
521/// Writes `desired_collection` to persist, but only for updates
522/// that fall into batch a description that we get via `batch_descriptions`.
523/// This forwards a `HollowBatch` (with additional metadata)
524/// for any batch of updates that was written.
525///
526/// This operator assumes that the `desired_collection` comes pre-sharded.
527///
528/// This also and updates various metrics.
529fn write_batches<G>(
530    scope: &G,
531    collection_id: GlobalId,
532    operator_name: &str,
533    target: &CollectionMetadata,
534    batch_descriptions: Stream<
535        G,
536        Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
537    >,
538    desired_collection: VecCollection<G, Result<Row, DataflowError>, Diff>,
539    persist_clients: Arc<PersistClientCache>,
540    storage_state: &StorageState,
541    busy_signal: Arc<Semaphore>,
542) -> (
543    StreamVec<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
544    PressOnDropButton,
545)
546where
547    G: Scope<Timestamp = mz_repr::Timestamp>,
548{
549    let worker_index = scope.index();
550
551    let persist_location = target.persist_location.clone();
552    let shard_id = target.data_shard;
553    let target_relation_desc = target.relation_desc.clone();
554
555    let source_statistics = storage_state
556        .aggregated_statistics
557        .get_source(&collection_id)
558        .expect("statistics initialized")
559        .clone();
560
561    let mut write_op =
562        AsyncOperatorBuilder::new(format!("{} write_batches", operator_name), scope.clone());
563
564    let (output, output_stream) = write_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
565
566    let mut descriptions_input =
567        write_op.new_input_for(batch_descriptions.broadcast(), Pipeline, &output);
568    let mut desired_input = write_op.new_disconnected_input(desired_collection.inner, Pipeline);
569
570    // This operator accepts the current and desired update streams for a `persist` shard.
571    // It attempts to write out updates, starting from the current's upper frontier, that
572    // will cause the changes of desired to be committed to persist, _but only those also past the
573    // upper_.
574
575    let shutdown_button = write_op.build(move |_capabilities| async move {
576        // In-progress batches of data, keyed by timestamp.
577        let mut stashed_batches = BTreeMap::new();
578
579        // Contains descriptions of batches for which we know that we can
580        // write data. We got these from the "centralized" operator that
581        // determines batch descriptions for all writers.
582        //
583        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeMap`. We need to search
584        // through the map, so we cannot use the `mz_ore` wrapper either.
585        #[allow(clippy::disallowed_types)]
586        let mut in_flight_batches = std::collections::HashMap::<
587            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
588            Capability<mz_repr::Timestamp>,
589        >::new();
590
591        // TODO(aljoscha): We need to figure out what to do with error results from these calls.
592        let persist_client = persist_clients
593            .open(persist_location)
594            .await
595            .expect("could not open persist client");
596
597        let write = persist_client
598            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
599                shard_id,
600                Arc::new(target_relation_desc),
601                Arc::new(UnitSchema),
602                Diagnostics {
603                    shard_name: collection_id.to_string(),
604                    handle_purpose: format!(
605                        "storage::persist_sink::write_batches {}",
606                        collection_id
607                    ),
608                },
609            )
610            .await
611            .expect("could not open persist shard");
612
613        // The current input frontiers.
614        let mut batch_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
615        let mut desired_frontier = Antichain::from_elem(Timestamp::minimum());
616
617        // The frontiers of the inputs we have processed, used to avoid redoing work
618        let mut processed_desired_frontier = Antichain::from_elem(Timestamp::minimum());
619        let mut processed_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
620
621        // A "safe" choice for the lower of new batches we are creating.
622        let mut operator_batch_lower = Antichain::from_elem(Timestamp::minimum());
623
624        while !(batch_descriptions_frontier.is_empty() && desired_frontier.is_empty()) {
625            // Wait for either inputs to become ready
626            tokio::select! {
627                _ = descriptions_input.ready() => {},
628                _ = desired_input.ready() => {},
629            }
630
631            // Collect ready work from both inputs
632            while let Some(event) = descriptions_input.next_sync() {
633                match event {
634                    Event::Data(cap, data) => {
635                        // Ingest new batch descriptions.
636                        for description in data {
637                            if collection_id.is_user() {
638                                trace!(
639                                    "persist_sink {collection_id}/{shard_id}: \
640                                        write_batches: \
641                                        new_description: {:?}, \
642                                        desired_frontier: {:?}, \
643                                        batch_descriptions_frontier: {:?}",
644                                    description, desired_frontier, batch_descriptions_frontier,
645                                );
646                            }
647                            match in_flight_batches.entry(description) {
648                                std::collections::hash_map::Entry::Vacant(v) => {
649                                    // This _should_ be `.retain`, but rust
650                                    // currently thinks we can't use `cap`
651                                    // as an owned value when using the
652                                    // match guard `Some(event)`
653                                    v.insert(cap.delayed(cap.time()));
654                                }
655                                std::collections::hash_map::Entry::Occupied(o) => {
656                                    let (description, _) = o.remove_entry();
657                                    panic!(
658                                        "write_batches: sink {} got more than one \
659                                            batch for description {:?}, in-flight: {:?}",
660                                        collection_id, description, in_flight_batches
661                                    );
662                                }
663                            }
664                        }
665                    }
666                    Event::Progress(frontier) => {
667                        batch_descriptions_frontier = frontier;
668                    }
669                }
670            }
671
672            let ready_events = std::iter::from_fn(|| desired_input.next_sync()).collect_vec();
673
674            // We know start the async work for the input we received. Until we finish the dataflow
675            // should be marked as busy.
676            let permit = busy_signal.acquire().await;
677
678            for event in ready_events {
679                match event {
680                    Event::Data(_cap, data) => {
681                        // Extract desired rows as positive contributions to `correction`.
682                        if collection_id.is_user() && !data.is_empty() {
683                            trace!(
684                                "persist_sink {collection_id}/{shard_id}: \
685                                    updates: {:?}, \
686                                    in-flight-batches: {:?}, \
687                                    desired_frontier: {:?}, \
688                                    batch_descriptions_frontier: {:?}",
689                                data,
690                                in_flight_batches,
691                                desired_frontier,
692                                batch_descriptions_frontier,
693                            );
694                        }
695
696                        for (row, ts, diff) in data {
697                            if write.upper().less_equal(&ts) {
698                                let builder = stashed_batches.entry(ts).or_insert_with(|| {
699                                    BatchBuilderAndMetadata::new(
700                                        write.builder(operator_batch_lower.clone()),
701                                        ts,
702                                    )
703                                });
704
705                                let is_value = row.is_ok();
706
707                                builder
708                                    .add(&SourceData(row), &(), &ts, &diff.into_inner())
709                                    .await;
710
711                                source_statistics.inc_updates_staged_by(1);
712
713                                // Note that we assume `diff` is either +1 or -1 here, being anything
714                                // else is a logic bug we can't handle at the metric layer. We also
715                                // assume this addition doesn't overflow.
716                                match (is_value, diff.is_positive()) {
717                                    (true, true) => builder.metrics.inserts += diff.unsigned_abs(),
718                                    (true, false) => {
719                                        builder.metrics.retractions += diff.unsigned_abs()
720                                    }
721                                    (false, true) => {
722                                        builder.metrics.error_inserts += diff.unsigned_abs()
723                                    }
724                                    (false, false) => {
725                                        builder.metrics.error_retractions += diff.unsigned_abs()
726                                    }
727                                }
728                            }
729                        }
730                    }
731                    Event::Progress(frontier) => {
732                        desired_frontier = frontier;
733                    }
734                }
735            }
736            // We may have the opportunity to commit updates, if either frontier
737            // has moved
738            if PartialOrder::less_equal(&processed_desired_frontier, &desired_frontier)
739                || PartialOrder::less_equal(
740                    &processed_descriptions_frontier,
741                    &batch_descriptions_frontier,
742                )
743            {
744                trace!(
745                    "persist_sink {collection_id}/{shard_id}: \
746                        CAN emit: \
747                        processed_desired_frontier: {:?}, \
748                        processed_descriptions_frontier: {:?}, \
749                        desired_frontier: {:?}, \
750                        batch_descriptions_frontier: {:?}",
751                    processed_desired_frontier,
752                    processed_descriptions_frontier,
753                    desired_frontier,
754                    batch_descriptions_frontier,
755                );
756
757                trace!(
758                    "persist_sink {collection_id}/{shard_id}: \
759                        in-flight batches: {:?}, \
760                        batch_descriptions_frontier: {:?}, \
761                        desired_frontier: {:?}",
762                    in_flight_batches, batch_descriptions_frontier, desired_frontier,
763                );
764
765                // We can write updates for a given batch description when
766                // a) the batch is not beyond `batch_descriptions_frontier`,
767                // and b) we know that we have seen all updates that would
768                // fall into the batch, from `desired_frontier`.
769                let ready_batches = in_flight_batches
770                    .keys()
771                    .filter(|(lower, upper)| {
772                        !PartialOrder::less_equal(&batch_descriptions_frontier, lower)
773                            && !PartialOrder::less_than(&desired_frontier, upper)
774                    })
775                    .cloned()
776                    .collect::<Vec<_>>();
777
778                trace!(
779                    "persist_sink {collection_id}/{shard_id}: \
780                        ready batches: {:?}",
781                    ready_batches,
782                );
783
784                for batch_description in ready_batches {
785                    let cap = in_flight_batches.remove(&batch_description).unwrap();
786
787                    if collection_id.is_user() {
788                        trace!(
789                            "persist_sink {collection_id}/{shard_id}: \
790                                emitting done batch: {:?}, cap: {:?}",
791                            batch_description, cap
792                        );
793                    }
794
795                    let (batch_lower, batch_upper) = batch_description;
796
797                    let finalized_timestamps: Vec<_> = stashed_batches
798                        .keys()
799                        .filter(|time| {
800                            batch_lower.less_equal(time) && !batch_upper.less_equal(time)
801                        })
802                        .copied()
803                        .collect();
804
805                    let mut batch_tokens = vec![];
806                    for ts in finalized_timestamps {
807                        let batch_builder = stashed_batches.remove(&ts).unwrap();
808
809                        if collection_id.is_user() {
810                            trace!(
811                                "persist_sink {collection_id}/{shard_id}: \
812                                    wrote batch from worker {}: ({:?}, {:?}),
813                                    containing {:?}",
814                                worker_index, batch_lower, batch_upper, batch_builder.metrics
815                            );
816                        }
817
818                        let batch = batch_builder
819                            .finish(batch_lower.clone(), batch_upper.clone())
820                            .await;
821
822                        // The next "safe" lower for batches is the join (max) of all the emitted
823                        // batches. These uppers all are not beyond the `desired_frontier`, which
824                        // means all updates received by this operator will be beyond this lower.
825                        // Additionally, the `mint_batch_descriptions` operator ensures that
826                        // later-received batch descriptions will start beyond these uppers as
827                        // well.
828                        //
829                        // It is impossible to emit a batch description that is
830                        // beyond a not-yet emitted description in `in_flight_batches`, as
831                        // that description would also have been chosen as ready above.
832                        operator_batch_lower = operator_batch_lower.join(&batch_upper);
833                        batch_tokens.push(batch);
834                    }
835
836                    output.give_container(&cap, &mut batch_tokens);
837
838                    processed_desired_frontier.clone_from(&desired_frontier);
839                    processed_descriptions_frontier.clone_from(&batch_descriptions_frontier);
840                }
841            } else {
842                trace!(
843                    "persist_sink {collection_id}/{shard_id}: \
844                        cannot emit: processed_desired_frontier: {:?}, \
845                        processed_descriptions_frontier: {:?}, \
846                        desired_frontier: {:?}",
847                    processed_desired_frontier, processed_descriptions_frontier, desired_frontier
848                );
849            }
850            drop(permit);
851        }
852    });
853
854    let output_stream = if collection_id.is_user() {
855        output_stream.inspect(|d| trace!("batch: {:?}", d))
856    } else {
857        output_stream
858    };
859
860    (output_stream, shutdown_button.press_on_drop())
861}
862
863/// Fuses written batches together and appends them to persist with `compare_and_append`:
864/// one call for all ready descriptions, or one per description when part bounds are validated
865/// on write. Writing only happens for batch descriptions where we know that no future batches
866/// will arrive, that is, for those batch descriptions that are not beyond the frontier of both
867/// the `batch_descriptions` and `batches` inputs.
868///
869/// This also keeps the shared frontier that is stored in `storage_state` in
870/// sync with the upper of the persist shard, and updates various metrics
871/// and statistics objects.
872fn append_batches<G>(
873    scope: &G,
874    collection_id: GlobalId,
875    operator_name: String,
876    target: &CollectionMetadata,
877    batch_descriptions: Stream<
878        G,
879        Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
880    >,
881    batches: StreamVec<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
882    persist_clients: Arc<PersistClientCache>,
883    storage_state: &StorageState,
884    metrics: SourcePersistSinkMetrics,
885    busy_signal: Arc<Semaphore>,
886) -> (
887    StreamVec<G, ()>,
888    StreamVec<G, Rc<anyhow::Error>>,
889    PressOnDropButton,
890)
891where
892    G: Scope<Timestamp = mz_repr::Timestamp>,
893{
894    let persist_location = target.persist_location.clone();
895    let shard_id = target.data_shard;
896    let target_relation_desc = target.relation_desc.clone();
897
898    // We can only be lenient with concurrent modifications when we know that
899    // this source pipeline is using the feedback upsert operator, which works
900    // correctly when multiple instances of an ingestion pipeline produce
901    // different updates, because of concurrency/non-determinism.
902    let use_continual_feedback_upsert = dyncfgs::STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT
903        .get(storage_state.storage_configuration.config_set());
904    let bail_on_concurrent_modification = !use_continual_feedback_upsert;
905
906    let mut read_only_rx = storage_state.read_only_rx.clone();
907
908    let operator_name = format!("{} append_batches", operator_name);
909    let mut append_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
910
911    let hashed_id = collection_id.hashed();
912    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
913    let worker_id = scope.index();
914
915    // Both of these inputs are disconnected from the output capabilities of this operator, as
916    // any output of this operator is entirely driven by the `compare_and_append`s. The only
917    // output is the upper output created below, which tracks the last successful append; more
918    // outputs may be added in the future, when merging with the compute `persist_sink`.
919    let mut descriptions_input =
920        append_op.new_disconnected_input(batch_descriptions, Exchange::new(move |_| hashed_id));
921    let mut batches_input =
922        append_op.new_disconnected_input(batches, Exchange::new(move |_| hashed_id));
923
924    let current_upper = Rc::clone(&storage_state.source_uppers[&collection_id]);
925    if !active_worker {
926        // This worker is not writing, so make sure it's "taken out" of the
927        // calculation by advancing to the empty frontier.
928        current_upper.borrow_mut().clear();
929    }
930
931    let source_statistics = storage_state
932        .aggregated_statistics
933        .get_source(&collection_id)
934        .expect("statistics initialized")
935        .clone();
936
937    // An output whose frontier tracks the last successful compare and append of this operator
938    let (_upper_output, upper_stream) = append_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
939
940    // This operator accepts the batch descriptions and tokens that represent
941    // written batches. Written batches get appended to persist when we learn
942    // from our input frontiers that we have seen all batches for a given batch
943    // description.
944
945    let (shutdown_button, errors) = append_op.build_fallible(move |caps| Box::pin(async move {
946        let [upper_cap_set]: &mut [_; 1] = caps.try_into().unwrap();
947
948        // This may SEEM unnecessary, but metrics contains extra
949        // `DeleteOnDrop`-wrapped fields that will NOT be moved into this
950        // closure otherwise, dropping and destroying
951        // those metrics. This is because rust now only moves the
952        // explicitly-referenced fields into closures.
953        let metrics = metrics;
954
955        // Contains descriptions of batches for which we know that we can
956        // write data. We got these from the "centralized" operator that
957        // determines batch descriptions for all writers.
958        //
959        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeSet`. We need to search
960        // through the set, so we cannot use the `mz_ore` wrapper either.
961        #[allow(clippy::disallowed_types)]
962        let mut in_flight_descriptions = std::collections::HashSet::<(
963            Antichain<mz_repr::Timestamp>,
964            Antichain<mz_repr::Timestamp>,
965        )>::new();
966
967        // In flight batches that haven't been `compare_and_append`'d yet, plus metrics about
968        // the batch.
969        let mut in_flight_batches = HashMap::<
970            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
971            BatchSet,
972        >::new();
973
974        source_statistics.initialize_rehydration_latency_ms();
975        if !active_worker {
976            // The non-active workers report that they are done snapshotting and hydrating.
977            let empty_frontier = Antichain::new();
978            source_statistics.initialize_snapshot_committed(&empty_frontier);
979            source_statistics.update_rehydration_latency_ms(&empty_frontier);
980            return Ok(());
981        }
982
983        let persist_client = persist_clients
984            .open(persist_location)
985            .await?;
986
987        let mut write = persist_client
988            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
989                shard_id,
990                Arc::new(target_relation_desc),
991                Arc::new(UnitSchema),
992                Diagnostics {
993                    shard_name:collection_id.to_string(),
994                    handle_purpose: format!("persist_sink::append_batches {}", collection_id)
995                },
996            )
997            .await?;
998
999        // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
1000        // to. Data from the source not beyond this time will be dropped, as it has already
1001        // been persisted.
1002        // In the future, sources will avoid passing through data not beyond this upper.
1003        // VERY IMPORTANT: Only the active write worker must change the
1004        // shared upper. All other workers have already cleared this
1005        // upper above.
1006        current_upper.borrow_mut().clone_from(write.upper());
1007        upper_cap_set.downgrade(current_upper.borrow().iter());
1008        source_statistics.initialize_snapshot_committed(write.upper());
1009
1010        // The current input frontiers.
1011        let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1012        let mut batches_frontier = Antichain::from_elem(Timestamp::minimum());
1013
1014        loop {
1015            tokio::select! {
1016                Some(event) = descriptions_input.next() => {
1017                    match event {
1018                        Event::Data(_cap, data) => {
1019                            // Ingest new batch descriptions.
1020                            for batch_description in data {
1021                                if collection_id.is_user() {
1022                                    trace!(
1023                                        "persist_sink {collection_id}/{shard_id}: \
1024                                            append_batches: sink {}, \
1025                                            new description: {:?}, \
1026                                            batch_description_frontier: {:?}",
1027                                        collection_id,
1028                                        batch_description,
1029                                        batch_description_frontier
1030                                    );
1031                                }
1032
1033                                // This line has to be broken up, or
1034                                // rustfmt fails in the whole function :(
1035                                let is_new = in_flight_descriptions.insert(
1036                                    batch_description.clone()
1037                                );
1038
1039                                assert!(
1040                                    is_new,
1041                                    "append_batches: sink {} got more than one batch \
1042                                        for a given description in-flight: {:?}",
1043                                    collection_id, in_flight_batches
1044                                );
1045                            }
1046
1047                            continue;
1048                        }
1049                        Event::Progress(frontier) => {
1050                            batch_description_frontier = frontier;
1051                        }
1052                    }
1053                }
1054                Some(event) = batches_input.next() => {
1055                    match event {
1056                        Event::Data(_cap, data) => {
1057                            for batch in data {
1058                                let batch_description = (batch.lower.clone(), batch.upper.clone());
1059
1060                                let batches = in_flight_batches
1061                                    .entry(batch_description)
1062                                    .or_default();
1063
1064                                batches.finished.push(FinishedBatch {
1065                                    batch: write.batch_from_transmittable_batch(batch.batch),
1066                                    data_ts: batch.data_ts,
1067                                });
1068                                batches.batch_metrics += &batch.metrics;
1069                            }
1070                            continue;
1071                        }
1072                        Event::Progress(frontier) => {
1073                            batches_frontier = frontier;
1074                        }
1075                    }
1076                }
1077                else => {
1078                    // All inputs are exhausted, so we can shut down.
1079                    return Ok(());
1080                }
1081            };
1082
1083            // Peel off any batches that are not beyond the frontier
1084            // anymore.
1085            //
1086            // It is correct to consider batches that are not beyond the
1087            // `batches_frontier` because it is held back by the writer
1088            // operator as long as a) the `batch_description_frontier` did
1089            // not advance and b) as long as the `desired_frontier` has not
1090            // advanced to the `upper` of a given batch description.
1091
1092            let mut done_batches = in_flight_descriptions
1093                .iter()
1094                .filter(|(lower, _upper)| !PartialOrder::less_equal(&batches_frontier, lower))
1095                .cloned()
1096                .collect::<Vec<_>>();
1097
1098            trace!(
1099                "persist_sink {collection_id}/{shard_id}: \
1100                    append_batches: in_flight: {:?}, \
1101                    done: {:?}, \
1102                    batch_frontier: {:?}, \
1103                    batch_description_frontier: {:?}",
1104                in_flight_descriptions,
1105                done_batches,
1106                batches_frontier,
1107                batch_description_frontier
1108            );
1109
1110            // Append batches in order, to ensure that their `lower` and
1111            // `upper` line up.
1112            done_batches.sort_by(|a, b| {
1113                if PartialOrder::less_than(a, b) {
1114                    Ordering::Less
1115                } else if PartialOrder::less_than(b, a) {
1116                    Ordering::Greater
1117                } else {
1118                    Ordering::Equal
1119                }
1120            });
1121
1122            let validate_part_bounds_on_write = write.validate_part_bounds_on_write();
1123            let mut todo = VecDeque::new();
1124
1125            if validate_part_bounds_on_write {
1126                // Persist will expect each batch's bounds to match the append-time bounds; write them separately.
1127                for done_batch_metadata in done_batches.drain(..) {
1128                    in_flight_descriptions.remove(&done_batch_metadata);
1129                    let batch_set = in_flight_batches
1130                        .remove(&done_batch_metadata)
1131                        .unwrap_or_default();
1132                    todo.push_back((done_batch_metadata, batch_set));
1133                }
1134            } else {
1135                // Persist should allow batches to be written as part of a single append even when the bounds don't
1136                // match exactly; group all eligible batches together.
1137                let mut combined_batch_metadata = None;
1138                let mut combined_batch_set = BatchSet::default();
1139                for done_batch_metadata in done_batches.drain(..) {
1140                    in_flight_descriptions.remove(&done_batch_metadata);
1141                    let mut batch_set = in_flight_batches
1142                        .remove(&done_batch_metadata)
1143                        .unwrap_or_default();
1144                    match combined_batch_metadata.as_mut() {
1145                        Some((_, upper)) => *upper = done_batch_metadata.1,
1146                        None => combined_batch_metadata = Some(done_batch_metadata),
1147                    }
1148                    combined_batch_set.batch_metrics += &batch_set.batch_metrics;
1149                    combined_batch_set.finished.append(&mut batch_set.finished);
1150                }
1151                if let Some(done_batch_metadata) = combined_batch_metadata {
1152                    todo.push_back((done_batch_metadata, combined_batch_set))
1153                }
1154            };
1155
1156            while let Some((done_batch_metadata, batch_set)) = todo.pop_front() {
1157                in_flight_descriptions.remove(&done_batch_metadata);
1158
1159                let mut batches = batch_set.finished;
1160
1161                trace!(
1162                    "persist_sink {collection_id}/{shard_id}: \
1163                        done batch: {:?}, {:?}",
1164                    done_batch_metadata,
1165                    batches
1166                );
1167
1168                let (batch_lower, batch_upper) = done_batch_metadata;
1169
1170                let batch_metrics = batch_set.batch_metrics;
1171
1172                let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::<Vec<_>>();
1173
1174                let result = {
1175                    let maybe_err = if *read_only_rx.borrow() {
1176
1177                        // We have to wait for either us coming out of read-only
1178                        // mode or someone else applying a write that covers our
1179                        // batch.
1180                        //
1181                        // If we didn't wait for the latter here, and just go
1182                        // around the loop again, we might miss a moment where
1183                        // _we_ have to write down a batch. For example when our
1184                        // input frontier advances to a state where we can
1185                        // write, and the read-write instance sees the same
1186                        // update but then crashes before it can append a batch.
1187
1188                        let maybe_err = loop {
1189                            if collection_id.is_user() {
1190                                tracing::debug!(
1191                                    %worker_id,
1192                                    %collection_id,
1193                                    %shard_id,
1194                                    ?batch_lower,
1195                                    ?batch_upper,
1196                                    ?current_upper,
1197                                    "persist_sink is in read-only mode, waiting until we come out of it or the shard upper advances"
1198                                );
1199                            }
1200
1201                            // We don't try to be smart here, and for example
1202                            // use `wait_for_upper_past()`. We'd have to use a
1203                            // select!, which would require cancel safety of
1204                            // `wait_for_upper_past()`, which it doesn't
1205                            // advertise.
1206                            let _ = tokio::time::timeout(
1207                                Duration::from_secs(1),
1208                                read_only_rx.changed(),
1209                            )
1210                            .await;
1211
1212                            if !*read_only_rx.borrow() {
1213                                if collection_id.is_user() {
1214                                    tracing::debug!(
1215                                        %worker_id,
1216                                        %collection_id,
1217                                        %shard_id,
1218                                        ?batch_lower,
1219                                        ?batch_upper,
1220                                        ?current_upper,
1221                                        "persist_sink has come out of read-only mode"
1222                                    );
1223                                }
1224
1225                                // It's okay to write now.
1226                                break Ok(());
1227                            }
1228
1229                            let current_upper = write.fetch_recent_upper().await;
1230
1231                            if PartialOrder::less_than(&batch_upper, current_upper) {
1232                                // We synthesize an `UpperMismatch` so that we can go
1233                                // through the same logic below for trimming down our
1234                                // batches.
1235                                //
1236                                // Notably, we are not trying to be smart, and teach the
1237                                // write operator about read-only mode. Writing down
1238                                // those batches does not append anything to the persist
1239                                // shard, and it would be a hassle to figure out in the
1240                                // write workers how to trim down batches in read-only
1241                                // mode, when the shard upper advances.
1242                                //
1243                                // Right here, in the logic below, we have all we need
1244                                // for figuring out how to trim our batches.
1245
1246                                if collection_id.is_user() {
1247                                    tracing::debug!(
1248                                        %worker_id,
1249                                        %collection_id,
1250                                        %shard_id,
1251                                        ?batch_lower,
1252                                        ?batch_upper,
1253                                        ?current_upper,
1254                                        "persist_sink not appending in read-only mode"
1255                                    );
1256                                }
1257
1258                                break Err(UpperMismatch {
1259                                    current: current_upper.clone(),
1260                                    expected: batch_lower.clone()}
1261                                );
1262                            }
1263                        };
1264
1265                        maybe_err
1266                    } else {
1267                        // It's okay to proceed with the write.
1268                        Ok(())
1269                    };
1270
1271                    match maybe_err {
1272                        Ok(()) => {
1273                            let _permit = busy_signal.acquire().await;
1274
1275                            write.compare_and_append_batch(
1276                                &mut to_append[..],
1277                                batch_lower.clone(),
1278                                batch_upper.clone(),
1279                                validate_part_bounds_on_write,
1280                            )
1281                            .await
1282                            .expect("Invalid usage")
1283                        },
1284                        Err(e) => {
1285                            // We forward the synthesized error, so that
1286                            // we go through the batch cleanup logic below.
1287                            Err(e)
1288                        }
1289                    }
1290                };
1291
1292
1293                // These metrics are independent of whether it was _us_ or
1294                // _someone_ that managed to commit a batch that advanced the
1295                // upper.
1296                source_statistics.update_snapshot_committed(&batch_upper);
1297                source_statistics.update_rehydration_latency_ms(&batch_upper);
1298                metrics
1299                    .progress
1300                    .set(mz_persist_client::metrics::encode_ts_metric(&batch_upper));
1301
1302                if collection_id.is_user() {
1303                    trace!(
1304                        "persist_sink {collection_id}/{shard_id}: \
1305                            append result for batch ({:?} -> {:?}): {:?}",
1306                        batch_lower,
1307                        batch_upper,
1308                        result
1309                    );
1310                }
1311
1312                match result {
1313                    Ok(()) => {
1314                        // Only update these metrics when we know that _we_ were
1315                        // successful.
1316                        let committed =
1317                            batch_metrics.inserts + batch_metrics.retractions;
1318                        source_statistics
1319                            .inc_updates_committed_by(committed);
1320                        metrics.processed_batches.inc();
1321                        metrics.row_inserts.inc_by(batch_metrics.inserts);
1322                        metrics.row_retractions.inc_by(batch_metrics.retractions);
1323                        metrics.error_inserts.inc_by(batch_metrics.error_inserts);
1324                        metrics
1325                            .error_retractions
1326                            .inc_by(batch_metrics.error_retractions);
1327
1328                        current_upper.borrow_mut().clone_from(&batch_upper);
1329                        upper_cap_set.downgrade(current_upper.borrow().iter());
1330                    }
1331                    Err(mismatch) => {
1332                        // We tried to do a non-contiguous append, that won't work.
1333                        if PartialOrder::less_than(&mismatch.current, &batch_lower) {
1334                            // Best-effort attempt to delete unneeded batches.
1335                            future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1336
1337                            // We always bail when this happens, regardless of
1338                            // `bail_on_concurrent_modification`.
1339                            tracing::warn!(
1340                                "persist_sink({}): invalid upper! \
1341                                    Tried to append batch ({:?} -> {:?}) but upper \
1342                                    is {:?}. This is surpising and likely indicates \
1343                                    a bug in the persist sink, but we'll restart the \
1344                                    dataflow and try again.",
1345                                collection_id, batch_lower, batch_upper, mismatch.current,
1346                            );
1347                            anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1348                        } else if PartialOrder::less_than(&mismatch.current, &batch_upper) {
1349                            // The shard's upper was ahead of our batch's lower
1350                            // but not ahead of our upper. Cut down the
1351                            // description by advancing its lower to the current
1352                            // shard upper and try again. IMPORTANT: We can only
1353                            // advance the lower, meaning we cut updates away,
1354                            // we must not "extend" the batch by changing to a
1355                            // lower that is not beyond the current lower. This
1356                            // invariant is checked by the first if branch: if
1357                            // `!(current_upper < lower)` then it holds that
1358                            // `lower <= current_upper`.
1359
1360                            // First, construct a new batch description with the
1361                            // lower advanced to the current shard upper.
1362                            let new_batch_lower = mismatch.current.clone();
1363                            let new_done_batch_metadata =
1364                                (new_batch_lower.clone(), batch_upper.clone());
1365
1366                            // Retain any batches that are still in advance of
1367                            // the new lower, and delete any batches that are
1368                            // not.
1369                            let mut batch_delete_futures = vec![];
1370                            let mut new_batch_set = BatchSet::default();
1371                            for batch in batches {
1372                                if new_batch_lower.less_equal(&batch.data_ts) {
1373                                    new_batch_set.finished.push(batch);
1374                                } else {
1375                                    batch_delete_futures.push(batch.batch.delete());
1376                                }
1377                            }
1378
1379                            // Re-add the new batch to the list of batches to process.
1380                            todo.push_front((new_done_batch_metadata, new_batch_set));
1381
1382                            // Best-effort attempt to delete unneeded batches.
1383                            future::join_all(batch_delete_futures).await;
1384                        } else {
1385                            // Best-effort attempt to delete unneeded batches.
1386                            future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1387                        }
1388
1389                        if bail_on_concurrent_modification {
1390                            tracing::warn!(
1391                                "persist_sink({}): invalid upper! \
1392                                    Tried to append batch ({:?} -> {:?}) but upper \
1393                                    is {:?}. This is not a problem, it just means \
1394                                    someone else was faster than us. We will try \
1395                                    again with a new batch description.",
1396                                collection_id, batch_lower, batch_upper, mismatch.current,
1397                            );
1398                            anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1399                        }
1400                    }
1401                }
1402            }
1403        }
1404    }));
1405
1406    (upper_stream, errors, shutdown_button.press_on_drop())
1407}