mz_storage/render/persist_sink.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Render an operator that persists a source collection.
//!
//! ## Implementation
//!
//! This module defines the `persist_sink` operator, which writes
//! a collection produced by source rendering into a persist shard.
//!
//! It attempts to use all workers to write data to persist, and uses
//! single-instance workers to coordinate work. The diagram below gives
//! an overview of how it is shaped. There is more information in the
//! doc comments of the top-level functions of this module.
//!
//! ```text
//!
//!                                       ,------------.
//!                                       | source     |
//!                                       | collection |
//!                                       +---+--------+
//!                                       /   |
//!                                      /    |
//!                                     /     |
//!                                    /      |
//!                                   /       |
//!                                  /        |
//!                                 /         |
//!                                /          |
//!                               /     ,-+-----------------------.
//!                              /      | mint_batch_descriptions |
//!                             /       | one arbitrary worker    |
//!                            |        +-,--,--------+----+------+
//!                           ,----------´.-´         |     \
//!                       _.-´ |       .-´            |      \
//!                   _.-´     |    .-´               |       \
//!                .-´  .------+----|-------+---------|--------\-----.
//!               /    /            |       |         |         \     \
//!        ,--------------.   ,-----------------.     |     ,-----------------.
//!        | write_batches|   |  write_batches  |     |     |  write_batches  |
//!        | worker 0     |   | worker 1        |     |     | worker N        |
//!        +-----+--------+   +-+---------------+     |     +--+--------------+
//!               \              \                    |        /
//!                `-.            `,                  |       /
//!                   `-._          `-.               |      /
//!                       `-._         `-.            |     /
//!                           `---------. `-.         |    /
//!                                     +`---`---+-------------,
//!                                     | append_batches       |
//!                                     | one arbitrary worker |
//!                                     +------+---------------+
//! ```
//!
//! ## Similarities with `mz_compute::sink::persist_sink`
//!
//! This module has many similarities with the compute version of
//! the same concept, and in fact, is entirely derived from it.
//!
//! Compute requires that its `persist_sink` is _self-correcting_;
//! that is, it corrects what the collection in persist accumulates
//! to, if values at previously-written timestamps change. It does
//! this by continually comparing the input stream with the collection
//! as read back from persist.
//!
//! Source collections, while definite, cannot reliably be
//! re-produced once written down, which means compute's
//! `persist_sink`'s self-correction mechanism would need to be
//! skipped on operator startup, and would cause unnecessary read
//! load on persist.
//!
//! Additionally, persisting sources requires that we use bounded
//! amounts of memory, even if a single timestamp represents
//! a huge amount of data. This is not (currently) possible
//! to guarantee while also performing self-correction.
//!
//! Because of this, we have ripped out the self-correction
//! mechanism and aggressively simplified the sub-operators.
//! Some, particularly `append_batches`, could be merged with
//! the compute version, but that requires some amount of
//! onerous refactoring that we have chosen to skip for now.
//!
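//! ## Batch descriptions
//!
//! A batch description is a `(lower, upper)` pair of frontiers. Descriptions
//! minted by `mint_batch_descriptions` never overlap: each new description's
//! `lower` is the previous description's `upper`. A hypothetical sequence,
//! for illustration only:
//!
//! ```text
//! ([0], [5])    // first description; lower = the shard's upper at startup
//! ([5], [9])    // lower = previous upper
//! ([9], [13])
//! ```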
// TODO(guswynn): merge at least the `append_batches` operator

use std::cmp::Ordering;
use std::collections::{BTreeMap, VecDeque};
use std::fmt::Debug;
use std::ops::AddAssign;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::{StreamExt, future};
use itertools::Itertools;
use mz_ore::cast::CastFrom;
use mz_ore::collections::HashMap;
use mz_persist_client::Diagnostics;
use mz_persist_client::batch::{Batch, BatchBuilder, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::error::UpperMismatch;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::{Codec, Codec64};
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::SourceData;
use mz_storage_types::{StorageDiff, dyncfgs};
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::vec::Broadcast;
use timely::dataflow::operators::{Capability, CapabilitySet, Inspect};
use timely::dataflow::{Scope, Stream, StreamVec};
use timely::progress::{Antichain, Timestamp};
use tokio::sync::Semaphore;
use tracing::trace;

use crate::metrics::source::SourcePersistSinkMetrics;
use crate::storage_state::StorageState;

/// Metrics about batches.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct BatchMetrics {
    inserts: u64,
    retractions: u64,
    error_inserts: u64,
    error_retractions: u64,
}

impl AddAssign<&BatchMetrics> for BatchMetrics {
    fn add_assign(&mut self, rhs: &BatchMetrics) {
        let BatchMetrics {
            inserts: self_inserts,
            retractions: self_retractions,
            error_inserts: self_error_inserts,
            error_retractions: self_error_retractions,
        } = self;
        let BatchMetrics {
            inserts: rhs_inserts,
            retractions: rhs_retractions,
            error_inserts: rhs_error_inserts,
            error_retractions: rhs_error_retractions,
        } = rhs;
        *self_inserts += rhs_inserts;
        *self_retractions += rhs_retractions;
        *self_error_inserts += rhs_error_inserts;
        *self_error_retractions += rhs_error_retractions;
    }
}
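
// A minimal sketch of how these metrics accumulate across batches via the
// `AddAssign` impl above (hypothetical values, for illustration only):
//
//     let mut total = BatchMetrics::default();
//     total += &BatchMetrics { inserts: 2, retractions: 1, ..Default::default() };
//     total += &BatchMetrics { inserts: 3, ..Default::default() };
//     assert_eq!((total.inserts, total.retractions), (5, 1));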

/// Manages batches and metrics.
struct BatchBuilderAndMetadata<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    builder: BatchBuilder<K, V, T, D>,
    data_ts: T,
    metrics: BatchMetrics,
}

impl<K, V, T, D> BatchBuilderAndMetadata<K, V, T, D>
where
    K: Codec + Debug,
    V: Codec + Debug,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64,
{
    /// Creates a new batch.
    ///
    /// NOTE(benesch): temporary restriction: all updates added to the batch
    /// must be at the specified timestamp `data_ts`.
    fn new(builder: BatchBuilder<K, V, T, D>, data_ts: T) -> Self {
        BatchBuilderAndMetadata {
            builder,
            data_ts,
            metrics: Default::default(),
        }
    }

    /// Adds an update to the batch.
    ///
    /// NOTE(benesch): temporary restriction: all updates added to the batch
    /// must be at the timestamp specified during creation.
    async fn add(&mut self, k: &K, v: &V, t: &T, d: &D) {
        assert_eq!(
            self.data_ts, *t,
            "BatchBuilderAndMetadata::add called with a timestamp {t:?} that does not match creation timestamp {:?}",
            self.data_ts
        );

        self.builder.add(k, v, t, d).await.expect("invalid usage");
    }

    async fn finish(self, lower: Antichain<T>, upper: Antichain<T>) -> HollowBatchAndMetadata<T> {
        let batch = self
            .builder
            .finish(upper.clone())
            .await
            .expect("invalid usage");
        HollowBatchAndMetadata {
            lower,
            upper,
            data_ts: self.data_ts,
            batch: batch.into_transmittable_batch(),
            metrics: self.metrics,
        }
    }
}
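
// A sketch of the intended lifecycle, as driven by `write_batches` below
// (`write`, `lower`, `upper`, `ts`, `row`, and `diff` are stand-ins for values
// owned by that operator):
//
//     let mut b = BatchBuilderAndMetadata::new(write.builder(lower.clone()), ts);
//     b.add(&SourceData(row), &(), &ts, &diff).await; // every `add` must use `ts`
//     let hollow: HollowBatchAndMetadata<_> = b.finish(lower, upper).await;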

/// A batch of data plus metrics, moved from `write_batches` to `append_batches`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(bound(
    serialize = "T: Timestamp + Codec64",
    deserialize = "T: Timestamp + Codec64"
))]
struct HollowBatchAndMetadata<T> {
    lower: Antichain<T>,
    upper: Antichain<T>,
    data_ts: T,
    batch: ProtoBatch,
    metrics: BatchMetrics,
}

/// Holds finished batches for `append_batches`.
#[derive(Debug, Default)]
struct BatchSet {
    finished: Vec<FinishedBatch>,
    batch_metrics: BatchMetrics,
}

#[derive(Debug)]
struct FinishedBatch {
    batch: Batch<SourceData, (), mz_repr::Timestamp, StorageDiff>,
    data_ts: mz_repr::Timestamp,
}

/// Continuously writes the `desired_collection` into persist.
/// This is done via a multi-stage operator graph:
///
/// 1. `mint_batch_descriptions` emits new batch descriptions whenever the
///    frontier of `desired_collection` advances. A batch description is
///    a pair of `(lower, upper)` frontiers that tells write operators
///    which updates to write and, in the end, tells the append operator
///    what frontiers to use when calling `append`/`compare_and_append`.
///    This is a single-worker operator.
/// 2. `write_batches` writes the `desired_collection` to persist as
///    batches and sends those batches along. This does not yet append
///    the batches to the persist shard; the updates are only
///    uploaded/prepared to be appended to a shard. Also: we only write
///    updates for batch descriptions that we learned about from
///    `mint_batch_descriptions`.
/// 3. `append_batches` takes as input the minted batch descriptions and written
///    batches. Whenever the frontiers sufficiently advance, we take a batch
///    description and all the batches that belong to it and append them to the
///    persist shard.
///
/// This operator assumes that the `desired_collection` comes pre-sharded.
///
/// Note that `mint_batch_descriptions` inspects the frontier of
/// `desired_collection`, and passes the data through to `write_batches`.
/// This is done to avoid a clone of the underlying data so that both
/// operators can have the collection as input.
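///
/// A sketch of what flows between the stages for a single batch description
/// (the frontiers shown are hypothetical, for illustration only):
///
/// ```text
/// mint_batch_descriptions:  ([5], [9])                        one description
/// write_batches:            HollowBatchAndMetadata {          one per worker
///                               lower: [5], upper: [9], ..    and timestamp
///                           }
/// append_batches:           compare_and_append(batches, lower=[5], upper=[9])
/// ```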
pub(crate) fn render<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    collection_id: GlobalId,
    target: CollectionMetadata,
    desired_collection: VecCollection<'scope, mz_repr::Timestamp, Result<Row, DataflowError>, Diff>,
    storage_state: &StorageState,
    metrics: SourcePersistSinkMetrics,
    busy_signal: Arc<Semaphore>,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, ()>,
    StreamVec<'scope, mz_repr::Timestamp, Rc<anyhow::Error>>,
    Vec<PressOnDropButton>,
) {
    let persist_clients = Arc::clone(&storage_state.persist_clients);

    let operator_name = format!("persist_sink({})", collection_id);

    let (batch_descriptions, passthrough_desired_stream, mint_token) = mint_batch_descriptions(
        scope,
        collection_id,
        &operator_name,
        &target,
        desired_collection,
        Arc::clone(&persist_clients),
    );

    let (written_batches, write_token) = write_batches(
        scope,
        collection_id.clone(),
        &operator_name,
        &target,
        batch_descriptions.clone(),
        passthrough_desired_stream.as_collection(),
        Arc::clone(&persist_clients),
        storage_state,
        Arc::clone(&busy_signal),
    );

    let (upper_stream, append_errors, append_token) = append_batches(
        scope,
        collection_id.clone(),
        operator_name,
        &target,
        batch_descriptions,
        written_batches,
        persist_clients,
        storage_state,
        metrics,
        Arc::clone(&busy_signal),
    );

    (
        upper_stream,
        append_errors,
        vec![mint_token, write_token, append_token],
    )
}

/// Whenever the frontier advances, this mints a new batch description (lower
/// and upper) that writers should use for writing the next set of batches to
/// persist.
///
/// Only one of the workers does this, meaning there will only be one
/// description in the stream, even in case of multiple timely workers. Use
/// `broadcast()` to, ahem, broadcast the one description to all downstream
/// write operators/workers.
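///
/// Downstream operators are expected to fan the single description out
/// themselves; for example, `write_batches` below attaches this stream via
/// `broadcast()` (a sketch mirroring the call made in this module):
///
/// ```ignore
/// let mut descriptions_input =
///     write_op.new_input_for(batch_descriptions.broadcast(), Pipeline, &output);
/// ```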
fn mint_batch_descriptions<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    collection_id: GlobalId,
    operator_name: &str,
    target: &CollectionMetadata,
    desired_collection: VecCollection<'scope, mz_repr::Timestamp, Result<Row, DataflowError>, Diff>,
    persist_clients: Arc<PersistClientCache>,
) -> (
    StreamVec<
        'scope,
        mz_repr::Timestamp,
        (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
    >,
    StreamVec<'scope, mz_repr::Timestamp, (Result<Row, DataflowError>, mz_repr::Timestamp, Diff)>,
    PressOnDropButton,
) {
    let persist_location = target.persist_location.clone();
    let shard_id = target.data_shard;
    let target_relation_desc = target.relation_desc.clone();

    // Only one worker is responsible for determining batch descriptions. All
    // workers must write batches with the same description, to ensure that they
    // can be combined into one batch that gets appended to Consensus state.
    let hashed_id = collection_id.hashed();
    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();

    // Only the "active" operator will mint batches. All other workers have an
    // empty frontier.

    let mut mint_op = AsyncOperatorBuilder::new(
        format!("{} mint_batch_descriptions", operator_name),
        scope.clone(),
    );

    let (output, output_stream) = mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (data_output, data_output_stream) =
        mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // The description and the data-passthrough outputs are both driven by this input, so
    // they use a standard input connection.
    let mut desired_input =
        mint_op.new_input_for_many(desired_collection.inner, Pipeline, [&output, &data_output]);

    let shutdown_button = mint_op.build(move |capabilities| async move {
        // Non-active workers should just pass the data through.
        if !active_worker {
            // The description output is entirely driven by the active worker, so we drop
            // its capability here. The data-passthrough output just uses the data
            // capabilities.
            drop(capabilities);
            while let Some(event) = desired_input.next().await {
                match event {
                    Event::Data([_output_cap, data_output_cap], mut data) => {
                        data_output.give_container(&data_output_cap, &mut data);
                    }
                    Event::Progress(_) => {}
                }
            }
            return;
        }
        // The data-passthrough output will use the data capabilities, so we drop
        // its capability here.
        let [desc_cap, _]: [_; 2] = capabilities.try_into().expect("one capability per output");
        let mut cap_set = CapabilitySet::from_elem(desc_cap);

        // Initialize this operator's `upper` to the `upper` of the persist shard we are writing
        // to. Data from the source not beyond this time will be dropped, as it has already
        // been persisted.
        // In the future, sources will avoid passing through data not beyond this upper.
        let mut current_upper = {
            // TODO(aljoscha): We need to figure out what to do with error
            // results from these calls.
            let persist_client = persist_clients
                .open(persist_location)
                .await
                .expect("could not open persist client");

            let mut write = persist_client
                .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                    shard_id,
                    Arc::new(target_relation_desc),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: collection_id.to_string(),
                        handle_purpose: format!(
                            "storage::persist_sink::mint_batch_descriptions {}",
                            collection_id
                        ),
                    },
                )
                .await
                .expect("could not open persist shard");

            // TODO: this sink currently cannot tolerate a stale upper... which is bad because the
            // upper can become stale as soon as it is read. (For example, if another concurrent
            // instance of the sink has updated it.) Fetching a recent upper helps to mitigate this,
            // but ideally we would just skip ahead if we discover that our upper is stale.
            let upper = write.fetch_recent_upper().await.clone();
            // Explicitly expire the once-used write handle.
            write.expire().await;
            upper
        };

        // The current input frontiers.
        let mut desired_frontier;

        loop {
            if let Some(event) = desired_input.next().await {
                match event {
                    Event::Data([_output_cap, data_output_cap], mut data) => {
                        // Just pass the data through.
                        data_output.give_container(&data_output_cap, &mut data);
                        continue;
                    }
                    Event::Progress(frontier) => {
                        desired_frontier = frontier;
                    }
                }
            } else {
                // Input is exhausted, so we can shut down.
                return;
            };

            // If the new frontier for the data input has progressed, produce a batch description.
            if PartialOrder::less_than(&current_upper, &desired_frontier) {
                // The maximal description range we can produce.
                let batch_description = (current_upper.to_owned(), desired_frontier.to_owned());

                let lower = batch_description.0.as_option().copied().unwrap();

                let cap = cap_set
                    .try_delayed(&lower)
                    .ok_or_else(|| {
                        format!(
                            "minter cannot delay {:?} to {:?}. \
                                Likely because we already emitted a \
                                batch description and delayed.",
                            cap_set, lower
                        )
                    })
                    .unwrap();

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        new batch_description: {:?}",
                    batch_description
                );

                output.give(&cap, batch_description);

                // We downgrade our capability to the batch
                // description upper, as there will never be
                // any overlapping descriptions.
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        downgrading to {:?}",
                    desired_frontier
                );
                cap_set.downgrade(desired_frontier.iter());

                // After successfully emitting a new description, we can update the upper for the
                // operator.
                current_upper.clone_from(&desired_frontier);
            }
        }
    });

    (
        output_stream,
        data_output_stream,
        shutdown_button.press_on_drop(),
    )
}

/// Writes `desired_collection` to persist, but only for updates
/// that fall into a batch description that we get via `batch_descriptions`.
/// This forwards a `HollowBatch` (with additional metadata)
/// for any batch of updates that was written.
///
/// This operator assumes that the `desired_collection` comes pre-sharded.
///
/// This also updates various metrics.
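///
/// An update at time `t` belongs to a description `(lower, upper)` when
/// `lower <= t < upper`. This is the membership test this operator uses when
/// draining its stashed batches (a sketch of the filter applied below):
///
/// ```ignore
/// let in_description = |t: &mz_repr::Timestamp| {
///     batch_lower.less_equal(t) && !batch_upper.less_equal(t)
/// };
/// ```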
fn write_batches<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    collection_id: GlobalId,
    operator_name: &str,
    target: &CollectionMetadata,
    batch_descriptions: Stream<
        'scope,
        mz_repr::Timestamp,
        Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
    >,
    desired_collection: VecCollection<'scope, mz_repr::Timestamp, Result<Row, DataflowError>, Diff>,
    persist_clients: Arc<PersistClientCache>,
    storage_state: &StorageState,
    busy_signal: Arc<Semaphore>,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, HollowBatchAndMetadata<mz_repr::Timestamp>>,
    PressOnDropButton,
) {
    let worker_index = scope.index();

    let persist_location = target.persist_location.clone();
    let shard_id = target.data_shard;
    let target_relation_desc = target.relation_desc.clone();

    let source_statistics = storage_state
        .aggregated_statistics
        .get_source(&collection_id)
        .expect("statistics initialized")
        .clone();

    let mut write_op =
        AsyncOperatorBuilder::new(format!("{} write_batches", operator_name), scope.clone());

    let (output, output_stream) = write_op.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let mut descriptions_input =
        write_op.new_input_for(batch_descriptions.broadcast(), Pipeline, &output);
    let mut desired_input = write_op.new_disconnected_input(desired_collection.inner, Pipeline);

    // This operator accepts the current and desired update streams for a `persist` shard.
    // It attempts to write out updates, starting from the shard's current upper frontier,
    // that will cause the changes of desired to be committed to persist, _but only those
    // also past the upper_.

    let shutdown_button = write_op.build(move |_capabilities| async move {
        // In-progress batches of data, keyed by timestamp.
        let mut stashed_batches = BTreeMap::new();

        // Contains descriptions of batches for which we know that we can
        // write data. We got these from the "centralized" operator that
        // determines batch descriptions for all writers.
        //
        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeMap`. We need to search
        // through the map, so we cannot use the `mz_ore` wrapper either.
        #[allow(clippy::disallowed_types)]
        let mut in_flight_batches = std::collections::HashMap::<
            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
            Capability<mz_repr::Timestamp>,
        >::new();

        // TODO(aljoscha): We need to figure out what to do with error results from these calls.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("could not open persist client");

        let write = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(target_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: collection_id.to_string(),
                    handle_purpose: format!(
                        "storage::persist_sink::write_batches {}",
                        collection_id
                    ),
                },
            )
            .await
            .expect("could not open persist shard");

        // The current input frontiers.
        let mut batch_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut desired_frontier = Antichain::from_elem(Timestamp::minimum());

        // The frontiers of the inputs we have processed, used to avoid redoing work.
        let mut processed_desired_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut processed_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());

        // A "safe" choice for the lower of new batches we are creating.
        let mut operator_batch_lower = Antichain::from_elem(Timestamp::minimum());

        while !(batch_descriptions_frontier.is_empty() && desired_frontier.is_empty()) {
            // Wait for either input to become ready.
            tokio::select! {
                _ = descriptions_input.ready() => {},
                _ = desired_input.ready() => {},
            }

            // Collect ready work from both inputs.
            while let Some(event) = descriptions_input.next_sync() {
                match event {
                    Event::Data(cap, data) => {
                        // Ingest new batch descriptions.
                        for description in data {
                            if collection_id.is_user() {
                                trace!(
                                    "persist_sink {collection_id}/{shard_id}: \
                                        write_batches: \
                                        new_description: {:?}, \
                                        desired_frontier: {:?}, \
                                        batch_descriptions_frontier: {:?}",
                                    description, desired_frontier, batch_descriptions_frontier,
                                );
                            }
                            match in_flight_batches.entry(description) {
                                std::collections::hash_map::Entry::Vacant(v) => {
                                    // This _should_ be `.retain`, but rust
                                    // currently thinks we can't use `cap`
                                    // as an owned value when using the
                                    // match guard `Some(event)`
                                    v.insert(cap.delayed(cap.time()));
                                }
                                std::collections::hash_map::Entry::Occupied(o) => {
                                    let (description, _) = o.remove_entry();
                                    panic!(
                                        "write_batches: sink {} got more than one \
                                            batch for description {:?}, in-flight: {:?}",
                                        collection_id, description, in_flight_batches
                                    );
                                }
                            }
                        }
                    }
                    Event::Progress(frontier) => {
                        batch_descriptions_frontier = frontier;
                    }
                }
            }

            let ready_events = std::iter::from_fn(|| desired_input.next_sync()).collect_vec();

            // We now start the async work for the input we received. Until we finish, the
            // dataflow should be marked as busy.
            let permit = busy_signal.acquire().await;

            for event in ready_events {
                match event {
                    Event::Data(_cap, data) => {
                        // Stash updates that are beyond the shard's upper into
                        // per-timestamp batch builders.
                        if collection_id.is_user() && !data.is_empty() {
                            trace!(
                                "persist_sink {collection_id}/{shard_id}: \
                                    updates: {:?}, \
                                    in-flight-batches: {:?}, \
                                    desired_frontier: {:?}, \
                                    batch_descriptions_frontier: {:?}",
                                data,
                                in_flight_batches,
                                desired_frontier,
                                batch_descriptions_frontier,
                            );
                        }

                        for (row, ts, diff) in data {
                            if write.upper().less_equal(&ts) {
                                let builder = stashed_batches.entry(ts).or_insert_with(|| {
                                    BatchBuilderAndMetadata::new(
                                        write.builder(operator_batch_lower.clone()),
                                        ts,
                                    )
                                });

                                let is_value = row.is_ok();

                                builder
                                    .add(&SourceData(row), &(), &ts, &diff.into_inner())
                                    .await;

                                source_statistics.inc_updates_staged_by(1);

                                // Note that we assume `diff` is either +1 or -1 here; anything
                                // else is a logic bug we can't handle at the metric layer. We also
                                // assume this addition doesn't overflow.
                                match (is_value, diff.is_positive()) {
                                    (true, true) => builder.metrics.inserts += diff.unsigned_abs(),
                                    (true, false) => {
                                        builder.metrics.retractions += diff.unsigned_abs()
                                    }
                                    (false, true) => {
                                        builder.metrics.error_inserts += diff.unsigned_abs()
                                    }
                                    (false, false) => {
                                        builder.metrics.error_retractions += diff.unsigned_abs()
                                    }
                                }
                            }
                        }
                    }
                    Event::Progress(frontier) => {
                        desired_frontier = frontier;
                    }
                }
            }
            // We may have the opportunity to commit updates, if either frontier
            // has moved.
            if PartialOrder::less_equal(&processed_desired_frontier, &desired_frontier)
                || PartialOrder::less_equal(
                    &processed_descriptions_frontier,
                    &batch_descriptions_frontier,
                )
            {
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        CAN emit: \
                        processed_desired_frontier: {:?}, \
                        processed_descriptions_frontier: {:?}, \
                        desired_frontier: {:?}, \
                        batch_descriptions_frontier: {:?}",
                    processed_desired_frontier,
                    processed_descriptions_frontier,
                    desired_frontier,
                    batch_descriptions_frontier,
                );

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        in-flight batches: {:?}, \
                        batch_descriptions_frontier: {:?}, \
                        desired_frontier: {:?}",
                    in_flight_batches, batch_descriptions_frontier, desired_frontier,
                );

                // We can write updates for a given batch description when
                // a) the batch is not beyond `batch_descriptions_frontier`,
                // and b) we know that we have seen all updates that would
                // fall into the batch, from `desired_frontier`.
                let ready_batches = in_flight_batches
                    .keys()
                    .filter(|(lower, upper)| {
                        !PartialOrder::less_equal(&batch_descriptions_frontier, lower)
                            && !PartialOrder::less_than(&desired_frontier, upper)
                    })
                    .cloned()
                    .collect::<Vec<_>>();

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        ready batches: {:?}",
                    ready_batches,
                );

                for batch_description in ready_batches {
                    let cap = in_flight_batches.remove(&batch_description).unwrap();

                    if collection_id.is_user() {
                        trace!(
                            "persist_sink {collection_id}/{shard_id}: \
                                emitting done batch: {:?}, cap: {:?}",
                            batch_description, cap
                        );
                    }

                    let (batch_lower, batch_upper) = batch_description;

                    let finalized_timestamps: Vec<_> = stashed_batches
                        .keys()
                        .filter(|time| {
                            batch_lower.less_equal(time) && !batch_upper.less_equal(time)
                        })
                        .copied()
                        .collect();

                    let mut batch_tokens = vec![];
                    for ts in finalized_timestamps {
                        let batch_builder = stashed_batches.remove(&ts).unwrap();

                        if collection_id.is_user() {
                            trace!(
                                "persist_sink {collection_id}/{shard_id}: \
                                    wrote batch from worker {}: ({:?}, {:?}),
                                    containing {:?}",
                                worker_index, batch_lower, batch_upper, batch_builder.metrics
                            );
                        }

                        let batch = batch_builder
                            .finish(batch_lower.clone(), batch_upper.clone())
                            .await;

                        // The next "safe" lower for batches is the join (max) of the uppers of
                        // all the emitted batches. These uppers are all not beyond the
                        // `desired_frontier`, which means all updates received by this operator
                        // will be beyond this lower. Additionally, the `mint_batch_descriptions`
                        // operator ensures that later-received batch descriptions will start
                        // beyond these uppers as well.
                        //
                        // It is impossible to emit a batch description that is
                        // beyond a not-yet emitted description in `in_flight_batches`, as
                        // that description would also have been chosen as ready above.
                        operator_batch_lower = operator_batch_lower.join(&batch_upper);
                        batch_tokens.push(batch);
                    }

                    output.give_container(&cap, &mut batch_tokens);

                    processed_desired_frontier.clone_from(&desired_frontier);
                    processed_descriptions_frontier.clone_from(&batch_descriptions_frontier);
                }
            } else {
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        cannot emit: processed_desired_frontier: {:?}, \
                        processed_descriptions_frontier: {:?}, \
                        desired_frontier: {:?}",
                    processed_desired_frontier, processed_descriptions_frontier, desired_frontier
                );
            }
            drop(permit);
        }
    });

    let output_stream = if collection_id.is_user() {
        output_stream.inspect(|d| trace!("batch: {:?}", d))
    } else {
        output_stream
    };

    (output_stream, shutdown_button.press_on_drop())
}

/// Fuses written batches together and appends them to persist using one
/// `compare_and_append` call. Writing only happens for batch descriptions where
/// we know that no future batches will arrive, that is, for those batch
/// descriptions that are not beyond the frontier of both the
/// `batch_descriptions` and `batches` inputs.
///
/// This also keeps the shared frontier that is stored in
/// `storage_state.source_uppers` in sync with the upper of the persist shard,
/// and updates various metrics and statistics objects.
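///
/// The `compare_and_append` contract is what makes concurrent writers safe:
/// the append succeeds only if the shard's current upper equals the given
/// `lower`, and otherwise reports an `UpperMismatch`. A sketch of the call
/// made below (the frontiers come from a minted batch description):
///
/// ```ignore
/// // Succeeds iff the shard upper is `batch_lower`; on success the shard
/// // upper advances to `batch_upper`.
/// write.compare_and_append_batch(
///     &mut to_append[..],
///     batch_lower.clone(),
///     batch_upper.clone(),
///     validate_part_bounds_on_write,
/// )
/// .await
/// ```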
fn append_batches<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    collection_id: GlobalId,
    operator_name: String,
    target: &CollectionMetadata,
    batch_descriptions: Stream<
        'scope,
        mz_repr::Timestamp,
        Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
    >,
    batches: StreamVec<'scope, mz_repr::Timestamp, HollowBatchAndMetadata<mz_repr::Timestamp>>,
    persist_clients: Arc<PersistClientCache>,
    storage_state: &StorageState,
    metrics: SourcePersistSinkMetrics,
    busy_signal: Arc<Semaphore>,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, ()>,
    StreamVec<'scope, mz_repr::Timestamp, Rc<anyhow::Error>>,
    PressOnDropButton,
) {
    let persist_location = target.persist_location.clone();
    let shard_id = target.data_shard;
    let target_relation_desc = target.relation_desc.clone();

    // We can only be lenient with concurrent modifications when we know that
    // this source pipeline is using the feedback upsert operator, which works
    // correctly when multiple instances of an ingestion pipeline produce
    // different updates, because of concurrency/non-determinism.
    let use_continual_feedback_upsert = dyncfgs::STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT
        .get(storage_state.storage_configuration.config_set());
    let bail_on_concurrent_modification = !use_continual_feedback_upsert;

    let mut read_only_rx = storage_state.read_only_rx.clone();

    let operator_name = format!("{} append_batches", operator_name);
    let mut append_op = AsyncOperatorBuilder::new(operator_name, scope.clone());

    let hashed_id = collection_id.hashed();
    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let worker_id = scope.index();

    // Both of these inputs are disconnected from the output capabilities of this operator, as
    // any output of this operator is entirely driven by the `compare_and_append`s. Currently
    // this operator has no outputs, but they may be added in the future, when merging with
    // the compute `persist_sink`.
    let mut descriptions_input =
        append_op.new_disconnected_input(batch_descriptions, Exchange::new(move |_| hashed_id));
    let mut batches_input =
        append_op.new_disconnected_input(batches, Exchange::new(move |_| hashed_id));

    let current_upper = Rc::clone(&storage_state.source_uppers[&collection_id]);
    if !active_worker {
        // This worker is not writing, so make sure it's "taken out" of the
        // calculation by advancing to the empty frontier.
        current_upper.borrow_mut().clear();
    }

    let source_statistics = storage_state
        .aggregated_statistics
        .get_source(&collection_id)
        .expect("statistics initialized")
        .clone();

    // An output whose frontier tracks the last successful compare and append of this operator.
    let (_upper_output, upper_stream) = append_op.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // This operator accepts the batch descriptions and tokens that represent
    // written batches. Written batches get appended to persist when we learn
    // from our input frontiers that we have seen all batches for a given batch
    // description.

    let (shutdown_button, errors) = append_op.build_fallible(move |caps| Box::pin(async move {
        let [upper_cap_set]: &mut [_; 1] = caps.try_into().unwrap();

        // This may SEEM unnecessary, but `metrics` contains extra
        // `DeleteOnDrop`-wrapped fields that will NOT be moved into this
        // closure otherwise, dropping and destroying
        // those metrics. This is because Rust now only moves the
        // explicitly-referenced fields into closures.
        let metrics = metrics;

        // Contains descriptions of batches for which we know that we can
        // write data. We got these from the "centralized" operator that
        // determines batch descriptions for all writers.
        //
        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeSet`. We need to search
        // through the set, so we cannot use the `mz_ore` wrapper either.
        #[allow(clippy::disallowed_types)]
        let mut in_flight_descriptions = std::collections::HashSet::<(
            Antichain<mz_repr::Timestamp>,
            Antichain<mz_repr::Timestamp>,
        )>::new();

        // In-flight batches that haven't been `compare_and_append`'d yet, plus metrics about
        // the batch.
        let mut in_flight_batches = HashMap::<
            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
            BatchSet,
        >::new();

        source_statistics.initialize_rehydration_latency_ms();
        if !active_worker {
            // The non-active workers report that they are done snapshotting and hydrating.
            let empty_frontier = Antichain::new();
            source_statistics.initialize_snapshot_committed(&empty_frontier);
            source_statistics.update_rehydration_latency_ms(&empty_frontier);
            return Ok(());
        }

        let persist_client = persist_clients
            .open(persist_location)
            .await?;

        let mut write = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(target_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: collection_id.to_string(),
                    handle_purpose: format!("persist_sink::append_batches {}", collection_id)
                },
            )
            .await?;

        // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
        // to. Data from the source not beyond this time will be dropped, as it has already
        // been persisted.
        // In the future, sources will avoid passing through data not beyond this upper.
        // VERY IMPORTANT: Only the active write worker must change the
        // shared upper. All other workers have already cleared this
        // upper above.
        current_upper.borrow_mut().clone_from(write.upper());
        upper_cap_set.downgrade(current_upper.borrow().iter());
        source_statistics.initialize_snapshot_committed(write.upper());

        // The current input frontiers.
        let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut batches_frontier = Antichain::from_elem(Timestamp::minimum());

        loop {
            tokio::select! {
                Some(event) = descriptions_input.next() => {
                    match event {
                        Event::Data(_cap, data) => {
                            // Ingest new batch descriptions.
                            for batch_description in data {
                                if collection_id.is_user() {
                                    trace!(
                                        "persist_sink {collection_id}/{shard_id}: \
                                            append_batches: sink {}, \
                                            new description: {:?}, \
                                            batch_description_frontier: {:?}",
                                        collection_id,
                                        batch_description,
                                        batch_description_frontier
                                    );
                                }

                                // This line has to be broken up, or
                                // rustfmt fails in the whole function :(
                                let is_new = in_flight_descriptions.insert(
                                    batch_description.clone()
                                );

                                assert!(
                                    is_new,
                                    "append_batches: sink {} got more than one batch \
                                        for a given description in-flight: {:?}",
                                    collection_id, in_flight_batches
                                );
                            }

                            continue;
                        }
                        Event::Progress(frontier) => {
                            batch_description_frontier = frontier;
                        }
                    }
                }
                Some(event) = batches_input.next() => {
                    match event {
                        Event::Data(_cap, data) => {
                            for batch in data {
                                let batch_description = (batch.lower.clone(), batch.upper.clone());

                                let batches = in_flight_batches
                                    .entry(batch_description)
                                    .or_default();

                                batches.finished.push(FinishedBatch {
                                    batch: write.batch_from_transmittable_batch(batch.batch),
                                    data_ts: batch.data_ts,
                                });
                                batches.batch_metrics += &batch.metrics;
                            }
                            continue;
                        }
                        Event::Progress(frontier) => {
                            batches_frontier = frontier;
                        }
                    }
                }
                else => {
                    // All inputs are exhausted, so we can shut down.
                    return Ok(());
                }
            };

            // Peel off any batches that are not beyond the frontier
            // anymore.
            //
            // It is correct to consider batches that are not beyond the
            // `batches_frontier` because it is held back by the writer
            // operator as long as a) the `batch_description_frontier` did
            // not advance and b) as long as the `desired_frontier` has not
            // advanced to the `upper` of a given batch description.

            let mut done_batches = in_flight_descriptions
                .iter()
                .filter(|(lower, _upper)| !PartialOrder::less_equal(&batches_frontier, lower))
                .cloned()
                .collect::<Vec<_>>();

            trace!(
                "persist_sink {collection_id}/{shard_id}: \
                    append_batches: in_flight: {:?}, \
                    done: {:?}, \
                    batch_frontier: {:?}, \
                    batch_description_frontier: {:?}",
                in_flight_descriptions,
                done_batches,
                batches_frontier,
                batch_description_frontier
            );

            // Append batches in order, to ensure that their `lower` and
            // `upper` line up.
            done_batches.sort_by(|a, b| {
                if PartialOrder::less_than(a, b) {
                    Ordering::Less
                } else if PartialOrder::less_than(b, a) {
                    Ordering::Greater
                } else {
                    Ordering::Equal
                }
            });
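
            // For example (hypothetical frontiers), a description ([0], [5])
            // sorts before ([5], [9]), so consecutive descriptions are
            // appended in the order in which their bounds chain together.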

            let validate_part_bounds_on_write = write.validate_part_bounds_on_write();
            let mut todo = VecDeque::new();

            if validate_part_bounds_on_write {
                // Persist will expect each batch's bounds to match the append-time bounds; write them separately.
                for done_batch_metadata in done_batches.drain(..) {
                    in_flight_descriptions.remove(&done_batch_metadata);
                    let batch_set = in_flight_batches
                        .remove(&done_batch_metadata)
                        .unwrap_or_default();
                    todo.push_back((done_batch_metadata, batch_set));
                }
            } else {
                // Persist should allow batches to be written as part of a single append even when the bounds don't
                // match exactly; group all eligible batches together.
                let mut combined_batch_metadata = None;
                let mut combined_batch_set = BatchSet::default();
                for done_batch_metadata in done_batches.drain(..) {
                    in_flight_descriptions.remove(&done_batch_metadata);
                    let mut batch_set = in_flight_batches
                        .remove(&done_batch_metadata)
                        .unwrap_or_default();
                    match combined_batch_metadata.as_mut() {
                        Some((_, upper)) => *upper = done_batch_metadata.1,
                        None => combined_batch_metadata = Some(done_batch_metadata),
                    }
                    combined_batch_set.batch_metrics += &batch_set.batch_metrics;
                    combined_batch_set.finished.append(&mut batch_set.finished);
                }
                if let Some(done_batch_metadata) = combined_batch_metadata {
                    todo.push_back((done_batch_metadata, combined_batch_set))
                }
            };
1149
1150            while let Some((done_batch_metadata, batch_set)) = todo.pop_front() {
1151                in_flight_descriptions.remove(&done_batch_metadata);
1152
1153                let mut batches = batch_set.finished;
1154
1155                trace!(
1156                    "persist_sink {collection_id}/{shard_id}: \
1157                        done batch: {:?}, {:?}",
1158                    done_batch_metadata,
1159                    batches
1160                );
1161
1162                let (batch_lower, batch_upper) = done_batch_metadata;
1163
1164                let batch_metrics = batch_set.batch_metrics;
1165
1166                let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::<Vec<_>>();
1167
1168                let result = {
1169                    let maybe_err = if *read_only_rx.borrow() {
1170
1171                        // We have to wait for either us coming out of read-only
1172                        // mode or someone else applying a write that covers our
1173                        // batch.
1174                        //
1175                        // If we didn't wait for the latter here, and just go
1176                        // around the loop again, we might miss a moment where
1177                        // _we_ have to write down a batch. For example when our
1178                        // input frontier advances to a state where we can
1179                        // write, and the read-write instance sees the same
1180                        // update but then crashes before it can append a batch.
1181
                        let maybe_err = loop {
                            if collection_id.is_user() {
                                tracing::debug!(
                                    %worker_id,
                                    %collection_id,
                                    %shard_id,
                                    ?batch_lower,
                                    ?batch_upper,
                                    ?current_upper,
                                    "persist_sink is in read-only mode, waiting until we come out of it or the shard upper advances"
                                );
                            }

                            // We don't try to be smart here, and for example
                            // use `wait_for_upper_past()`. We'd have to use a
                            // select!, which would require cancel safety of
                            // `wait_for_upper_past()`, which it doesn't
                            // advertise.
                            let _ = tokio::time::timeout(
                                Duration::from_secs(1),
                                read_only_rx.changed(),
                            )
                            .await;

                            if !*read_only_rx.borrow() {
                                if collection_id.is_user() {
                                    tracing::debug!(
                                        %worker_id,
                                        %collection_id,
                                        %shard_id,
                                        ?batch_lower,
                                        ?batch_upper,
                                        ?current_upper,
                                        "persist_sink has come out of read-only mode"
                                    );
                                }

                                // It's okay to write now.
                                break Ok(());
                            }

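                            // Check whether someone else has already appended
                            // past our batch while we were waiting.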
                            let current_upper = write.fetch_recent_upper().await;

                            if PartialOrder::less_than(&batch_upper, current_upper) {
                                // We synthesize an `UpperMismatch` so that we can go
                                // through the same logic below for trimming down our
                                // batches.
                                //
                                // Notably, we are not trying to be smart and
                                // teach the write operator about read-only
                                // mode. Writing down those batches does not
                                // append anything to the persist shard, and it
                                // would be a hassle to figure out in the write
                                // workers how to trim down batches in
                                // read-only mode when the shard upper
                                // advances.
                                //
                                // Right here, in the logic below, we have all
                                // we need for figuring out how to trim our
                                // batches.

                                if collection_id.is_user() {
                                    tracing::debug!(
                                        %worker_id,
                                        %collection_id,
                                        %shard_id,
                                        ?batch_lower,
                                        ?batch_upper,
                                        ?current_upper,
                                        "persist_sink not appending in read-only mode"
                                    );
                                }

                                break Err(UpperMismatch {
                                    current: current_upper.clone(),
                                    expected: batch_lower.clone(),
                                });
                            }
                        };

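                        // `maybe_err` is `Ok(())` once it is safe for us to
                        // write, or the synthesized `UpperMismatch` when
                        // another process has already covered our batch.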
                        maybe_err
                    } else {
                        // It's okay to proceed with the write.
                        Ok(())
                    };

                    match maybe_err {
                        Ok(()) => {
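                            // Hold the shared busy-signal permit for the
                            // duration of the append, then atomically append
                            // all batches for `[batch_lower, batch_upper)`.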
                            let _permit = busy_signal.acquire().await;

                            write.compare_and_append_batch(
                                &mut to_append[..],
                                batch_lower.clone(),
                                batch_upper.clone(),
                                validate_part_bounds_on_write,
                            )
                            .await
                            .expect("Invalid usage")
                        },
                        Err(e) => {
                            // We forward the synthesized error, so that we go
                            // through the batch cleanup logic below.
                            Err(e)
                        }
                    }
                };

                // These metrics are updated independently of whether it was
                // _us_ or _someone else_ who managed to commit a batch that
                // advanced the upper.
                source_statistics.update_snapshot_committed(&batch_upper);
                source_statistics.update_rehydration_latency_ms(&batch_upper);
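                // Export the upper we just learned about as this operator's
                // progress metric.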
                metrics
                    .progress
                    .set(mz_persist_client::metrics::encode_ts_metric(&batch_upper));

                if collection_id.is_user() {
                    trace!(
                        "persist_sink {collection_id}/{shard_id}: \
                            append result for batch ({:?} -> {:?}): {:?}",
                        batch_lower,
                        batch_upper,
                        result
                    );
                }

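                // `result` is `Ok(())` if our batches were appended
                // atomically at `batch_lower`, or `Err(UpperMismatch)` if the
                // shard upper was not where we expected it: either because of
                // a genuine concurrent append or because of the mismatch we
                // synthesized above in read-only mode.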
                match result {
                    Ok(()) => {
                        // Only update these metrics when we know that _we_ were
                        // successful.
                        let committed =
                            batch_metrics.inserts + batch_metrics.retractions;
                        source_statistics
                            .inc_updates_committed_by(committed);
                        metrics.processed_batches.inc();
                        metrics.row_inserts.inc_by(batch_metrics.inserts);
                        metrics.row_retractions.inc_by(batch_metrics.retractions);
                        metrics.error_inserts.inc_by(batch_metrics.error_inserts);
                        metrics
                            .error_retractions
                            .inc_by(batch_metrics.error_retractions);

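                        // Record the new upper and downgrade our output
                        // capability, which communicates the frontier advance
                        // on the upper output to any downstream consumers.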
                        current_upper.borrow_mut().clone_from(&batch_upper);
                        upper_cap_set.downgrade(current_upper.borrow().iter());
                    }
                    Err(mismatch) => {
                        // We tried to do a non-contiguous append; that won't work.
                        if PartialOrder::less_than(&mismatch.current, &batch_lower) {
                            // Best-effort attempt to delete unneeded batches.
                            future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;

                            // We always bail when this happens, regardless of
                            // `bail_on_concurrent_modification`.
                            tracing::warn!(
                                "persist_sink({}): invalid upper! \
                                    Tried to append batch ({:?} -> {:?}) but upper \
                                    is {:?}. This is surprising and likely indicates \
                                    a bug in the persist sink, but we'll restart the \
                                    dataflow and try again.",
                                collection_id, batch_lower, batch_upper, mismatch.current,
                            );
                            anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
                        } else if PartialOrder::less_than(&mismatch.current, &batch_upper) {
                            // The shard's upper was ahead of our batch's lower
                            // but not ahead of our upper. Cut down the
                            // description by advancing its lower to the current
                            // shard upper and try again. IMPORTANT: We can only
                            // advance the lower, meaning we cut updates away;
                            // we must not "extend" the batch by moving its
                            // lower back before the current lower. This
                            // invariant is guaranteed by the first if branch:
                            // if `!(current_upper < lower)`, then it holds
                            // that `lower <= current_upper`.
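                            // For example (hypothetical frontiers): if our
                            // batch covers `[5, 10)` and someone already
                            // appended up to `7`, we retry with a description
                            // of `[7, 10)`, keeping only batches whose
                            // `data_ts` is `>= 7` and deleting the rest.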

                            // First, construct a new batch description with the
                            // lower advanced to the current shard upper.
                            let new_batch_lower = mismatch.current.clone();
                            let new_done_batch_metadata =
                                (new_batch_lower.clone(), batch_upper.clone());

                            // Retain any batches that are still in advance of
                            // the new lower, and delete any batches that are
                            // not.
                            let mut batch_delete_futures = vec![];
                            let mut new_batch_set = BatchSet::default();
                            for batch in batches {
                                if new_batch_lower.less_equal(&batch.data_ts) {
                                    new_batch_set.finished.push(batch);
                                } else {
                                    batch_delete_futures.push(batch.batch.delete());
                                }
                            }

                            // Re-add the new batch to the list of batches to process.
                            todo.push_front((new_done_batch_metadata, new_batch_set));

                            // Best-effort attempt to delete unneeded batches.
                            future::join_all(batch_delete_futures).await;
                        } else {
                            // Best-effort attempt to delete unneeded batches.
                            future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
                        }

                        if bail_on_concurrent_modification {
                            tracing::warn!(
                                "persist_sink({}): invalid upper! \
                                    Tried to append batch ({:?} -> {:?}) but upper \
                                    is {:?}. This is not a problem, it just means \
                                    someone else was faster than us. We will try \
                                    again with a new batch description.",
                                collection_id, batch_lower, batch_upper, mismatch.current,
                            );
                            anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
                        }
                    }
                }
            }
        }
    }));

    (upper_stream, errors, shutdown_button.press_on_drop())
}