mz_storage/render/persist_sink.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Render an operator that persists a source collection.
11//!
12//! ## Implementation
13//!
14//! This module defines the `persist_sink` operator, that writes
15//! a collection produced by source rendering into a persist shard.
16//!
17//! It attempts to use all workers to write data to persist, and uses
18//! single-instance workers to coordinate work. The below diagram
//! is an overview of how it is shaped. There is more information
20//! in the doc comments of the top-level functions of this module.
21//!
22//!```text
23//!
24//! ,------------.
25//! | source |
26//! | collection |
27//! +---+--------+
28//! / |
29//! / |
30//! / |
31//! / |
32//! / |
33//! / |
34//! / |
35//! / |
36//! / ,-+-----------------------.
37//! / | mint_batch_descriptions |
38//! / | one arbitrary worker |
39//! | +-,--,--------+----+------+
40//! ,----------´.-´ | \
41//! _.-´ | .-´ | \
42//! _.-´ | .-´ | \
43//! .-´ .------+----|-------+---------|--------\-----.
44//! / / | | | \ \
45//! ,--------------. ,-----------------. | ,-----------------.
46//! | write_batches| | write_batches | | | write_batches |
47//! | worker 0 | | worker 1 | | | worker N |
48//! +-----+--------+ +-+---------------+ | +--+--------------+
49//! \ \ | /
50//! `-. `, | /
51//! `-._ `-. | /
52//! `-._ `-. | /
53//! `---------. `-. | /
54//! +`---`---+-------------,
55//! | append_batches |
56//! | one arbitrary worker |
57//! +------+---------------+
58//!```
59//!
60//! ## Similarities with `mz_compute::sink::persist_sink`
61//!
62//! This module has many similarities with the compute version of
63//! the same concept, and in fact, is entirely derived from it.
64//!
65//! Compute requires that its `persist_sink` is _self-correcting_;
66//! that is, it corrects what the collection in persist
67//! accumulates to if the collection has values changed at
68//! previous timestamps. It does this by continually comparing
69//! the input stream with the collection as read back from persist.
70//!
//! Source collections, while definite, cannot reliably be
72//! re-produced once written down, which means compute's
73//! `persist_sink`'s self-correction mechanism would need to be
74//! skipped on operator startup, and would cause unnecessary read
75//! load on persist.
76//!
77//! Additionally, persisting sources requires we use bounded
78//! amounts of memory, even if a single timestamp represents
79//! a huge amount of data. This is not (currently) possible
80//! to guarantee while also performing self-correction.
81//!
82//! Because of this, we have ripped out the self-correction
83//! mechanism, and aggressively simplified the sub-operators.
84//! Some, particularly `append_batches` could be merged with
85//! the compute version, but that requires some amount of
86//! onerous refactoring that we have chosen to skip for now.
87//!
// TODO(guswynn): merge at least the `append_batches` operator
89
90use std::cmp::Ordering;
91use std::collections::{BTreeMap, VecDeque};
92use std::fmt::Debug;
93use std::ops::AddAssign;
94use std::rc::Rc;
95use std::sync::Arc;
96use std::time::Duration;
97
98use differential_dataflow::difference::Monoid;
99use differential_dataflow::lattice::Lattice;
100use differential_dataflow::{AsCollection, Hashable, VecCollection};
101use futures::{StreamExt, future};
102use itertools::Itertools;
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::HashMap;
105use mz_persist_client::Diagnostics;
106use mz_persist_client::batch::{Batch, BatchBuilder, ProtoBatch};
107use mz_persist_client::cache::PersistClientCache;
108use mz_persist_client::error::UpperMismatch;
109use mz_persist_types::codec_impls::UnitSchema;
110use mz_persist_types::{Codec, Codec64};
111use mz_repr::{Diff, GlobalId, Row};
112use mz_storage_types::controller::CollectionMetadata;
113use mz_storage_types::errors::DataflowError;
114use mz_storage_types::sources::SourceData;
115use mz_storage_types::{StorageDiff, dyncfgs};
116use mz_timely_util::builder_async::{
117 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
118};
119use serde::{Deserialize, Serialize};
120use timely::PartialOrder;
121use timely::container::CapacityContainerBuilder;
122use timely::dataflow::channels::pact::{Exchange, Pipeline};
123use timely::dataflow::operators::vec::Broadcast;
124use timely::dataflow::operators::{Capability, CapabilitySet, Inspect};
125use timely::dataflow::{Scope, Stream, StreamVec};
126use timely::progress::{Antichain, Timestamp};
127use tokio::sync::Semaphore;
128use tracing::trace;
129
130use crate::metrics::source::SourcePersistSinkMetrics;
131use crate::storage_state::StorageState;
132
/// Counts of the updates that went into a batch, bucketed by whether each
/// update carried an `Ok` row or an `Err`, and whether its diff was positive
/// (insert) or negative (retraction). Populated in `write_batches`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct BatchMetrics {
    // `Ok` rows with a positive diff.
    inserts: u64,
    // `Ok` rows with a negative diff.
    retractions: u64,
    // `Err` rows with a positive diff.
    error_inserts: u64,
    // `Err` rows with a negative diff.
    error_retractions: u64,
}
141
142impl AddAssign<&BatchMetrics> for BatchMetrics {
143 fn add_assign(&mut self, rhs: &BatchMetrics) {
144 let BatchMetrics {
145 inserts: self_inserts,
146 retractions: self_retractions,
147 error_inserts: self_error_inserts,
148 error_retractions: self_error_retractions,
149 } = self;
150 let BatchMetrics {
151 inserts: rhs_inserts,
152 retractions: rhs_retractions,
153 error_inserts: rhs_error_inserts,
154 error_retractions: rhs_error_retractions,
155 } = rhs;
156 *self_inserts += rhs_inserts;
157 *self_retractions += rhs_retractions;
158 *self_error_inserts += rhs_error_inserts;
159 *self_error_retractions += rhs_error_retractions;
160 }
161}
162
/// A persist `BatchBuilder` bundled with the single timestamp its data is
/// written at and the metrics accumulated for that data.
struct BatchBuilderAndMetadata<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    // The underlying persist batch builder.
    builder: BatchBuilder<K, V, T, D>,
    // The one timestamp every update added to `builder` must carry
    // (enforced by the assertion in `add`).
    data_ts: T,
    // Counts of the updates added so far.
    metrics: BatchMetrics,
}
174
175impl<K, V, T, D> BatchBuilderAndMetadata<K, V, T, D>
176where
177 K: Codec + Debug,
178 V: Codec + Debug,
179 T: Timestamp + Lattice + Codec64,
180 D: Monoid + Codec64,
181{
182 /// Creates a new batch.
183 ///
184 /// NOTE(benesch): temporary restriction: all updates added to the batch
185 /// must be at the specified timestamp `data_ts`.
186 fn new(builder: BatchBuilder<K, V, T, D>, data_ts: T) -> Self {
187 BatchBuilderAndMetadata {
188 builder,
189 data_ts,
190 metrics: Default::default(),
191 }
192 }
193
194 /// Adds an update to the batch.
195 ///
196 /// NOTE(benesch): temporary restriction: all updates added to the batch
197 /// must be at the timestamp specified during creation.
198 async fn add(&mut self, k: &K, v: &V, t: &T, d: &D) {
199 assert_eq!(
200 self.data_ts, *t,
201 "BatchBuilderAndMetadata::add called with a timestamp {t:?} that does not match creation timestamp {:?}",
202 self.data_ts
203 );
204
205 self.builder.add(k, v, t, d).await.expect("invalid usage");
206 }
207
208 async fn finish(self, lower: Antichain<T>, upper: Antichain<T>) -> HollowBatchAndMetadata<T> {
209 let batch = self
210 .builder
211 .finish(upper.clone())
212 .await
213 .expect("invalid usage");
214 HollowBatchAndMetadata {
215 lower,
216 upper,
217 data_ts: self.data_ts,
218 batch: batch.into_transmittable_batch(),
219 metrics: self.metrics,
220 }
221 }
222}
223
/// A batch of data + metrics moved from `write_batches` to `append_batches`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(bound(
    serialize = "T: Timestamp + Codec64",
    deserialize = "T: Timestamp + Codec64"
))]
struct HollowBatchAndMetadata<T> {
    // Lower bound of the batch description this batch was written for.
    lower: Antichain<T>,
    // Upper bound of the batch description this batch was written for.
    upper: Antichain<T>,
    // The single timestamp at which all updates in `batch` occur.
    data_ts: T,
    // The written-but-not-yet-appended batch, in transmittable proto form.
    batch: ProtoBatch,
    // Counts of the updates contained in `batch`.
    metrics: BatchMetrics,
}
237
/// Holds finished batches for `append_batches`.
#[derive(Debug, Default)]
struct BatchSet {
    // Batches that have been fully written and await being appended.
    finished: Vec<FinishedBatch>,
    // Metrics aggregated over all batches in `finished`.
    batch_metrics: BatchMetrics,
}
244
/// A decoded persist batch paired with its data timestamp.
#[derive(Debug)]
struct FinishedBatch {
    batch: Batch<SourceData, (), mz_repr::Timestamp, StorageDiff>,
    // Presumably mirrors `HollowBatchAndMetadata::data_ts`, i.e. the single
    // timestamp of all updates in `batch` — TODO(review) confirm at the
    // construction site in `append_batches`.
    data_ts: mz_repr::Timestamp,
}
250
251/// Continuously writes the `desired_stream` into persist
252/// This is done via a multi-stage operator graph:
253///
254/// 1. `mint_batch_descriptions` emits new batch descriptions whenever the
255/// frontier of `desired_collection` advances. A batch description is
256/// a pair of `(lower, upper)` that tells write operators
257/// which updates to write and in the end tells the append operator
258/// what frontiers to use when calling `append`/`compare_and_append`.
259/// This is a single-worker operator.
260/// 2. `write_batches` writes the `desired_collection` to persist as
261/// batches and sends those batches along.
262/// This does not yet append the batches to the persist shard, the update are
263/// only uploaded/prepared to be appended to a shard. Also: we only write
264/// updates for batch descriptions that we learned about from
265/// `mint_batch_descriptions`.
266/// 3. `append_batches` takes as input the minted batch descriptions and written
267/// batches. Whenever the frontiers sufficiently advance, we take a batch
268/// description and all the batches that belong to it and append it to the
269/// persist shard.
270///
271/// This operator assumes that the `desired_collection` comes pre-sharded.
272///
273/// Note that `mint_batch_descriptions` inspects the frontier of
274/// `desired_collection`, and passes the data through to `write_batches`.
275/// This is done to avoid a clone of the underlying data so that both
276/// operators can have the collection as input.
277pub(crate) fn render<G>(
278 scope: &G,
279 collection_id: GlobalId,
280 target: CollectionMetadata,
281 desired_collection: VecCollection<G, Result<Row, DataflowError>, Diff>,
282 storage_state: &StorageState,
283 metrics: SourcePersistSinkMetrics,
284 busy_signal: Arc<Semaphore>,
285) -> (
286 StreamVec<G, ()>,
287 StreamVec<G, Rc<anyhow::Error>>,
288 Vec<PressOnDropButton>,
289)
290where
291 G: Scope<Timestamp = mz_repr::Timestamp>,
292{
293 let persist_clients = Arc::clone(&storage_state.persist_clients);
294
295 let operator_name = format!("persist_sink({})", collection_id);
296
297 let (batch_descriptions, passthrough_desired_stream, mint_token) = mint_batch_descriptions(
298 scope,
299 collection_id,
300 &operator_name,
301 &target,
302 desired_collection,
303 Arc::clone(&persist_clients),
304 );
305
306 let (written_batches, write_token) = write_batches(
307 scope,
308 collection_id.clone(),
309 &operator_name,
310 &target,
311 batch_descriptions.clone(),
312 passthrough_desired_stream.as_collection(),
313 Arc::clone(&persist_clients),
314 storage_state,
315 Arc::clone(&busy_signal),
316 );
317
318 let (upper_stream, append_errors, append_token) = append_batches(
319 scope,
320 collection_id.clone(),
321 operator_name,
322 &target,
323 batch_descriptions,
324 written_batches,
325 persist_clients,
326 storage_state,
327 metrics,
328 Arc::clone(&busy_signal),
329 );
330
331 (
332 upper_stream,
333 append_errors,
334 vec![mint_token, write_token, append_token],
335 )
336}
337
/// Whenever the frontier advances, this mints a new batch description (lower
/// and upper) that writers should use for writing the next set of batches to
/// persist.
///
/// Only one of the workers does this, meaning there will only be one
/// description in the stream, even in case of multiple timely workers. Use
/// `broadcast()` to, ahem, broadcast, the one description to all downstream
/// write operators/workers.
///
/// Returns the description stream, the passthrough stream of the input data,
/// and the operator's shutdown token.
fn mint_batch_descriptions<G>(
    scope: &G,
    collection_id: GlobalId,
    operator_name: &str,
    target: &CollectionMetadata,
    desired_collection: VecCollection<G, Result<Row, DataflowError>, Diff>,
    persist_clients: Arc<PersistClientCache>,
) -> (
    StreamVec<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
    StreamVec<G, (Result<Row, DataflowError>, mz_repr::Timestamp, Diff)>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let persist_location = target.persist_location.clone();
    let shard_id = target.data_shard;
    let target_relation_desc = target.relation_desc.clone();

    // Only one worker is responsible for determining batch descriptions. All
    // workers must write batches with the same description, to ensure that they
    // can be combined into one batch that gets appended to Consensus state.
    let hashed_id = collection_id.hashed();
    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();

    // Only the "active" operator will mint batches. All other workers have an
    // empty frontier. It's necessary to insert all of these into
    // `compute_state.sink_write_frontier` below so we properly clear out
    // default frontiers of non-active workers.

    let mut mint_op = AsyncOperatorBuilder::new(
        format!("{} mint_batch_descriptions", operator_name),
        scope.clone(),
    );

    let (output, output_stream) = mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (data_output, data_output_stream) =
        mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // The description and the data-passthrough outputs are both driven by this input, so
    // they use a standard input connection.
    let mut desired_input =
        mint_op.new_input_for_many(desired_collection.inner, Pipeline, [&output, &data_output]);

    let shutdown_button = mint_op.build(move |capabilities| async move {
        // Non-active workers should just pass the data through.
        if !active_worker {
            // The description output is entirely driven by the active worker, so we drop
            // its capability here. The data-passthrough output just uses the data
            // capabilities.
            drop(capabilities);
            while let Some(event) = desired_input.next().await {
                match event {
                    Event::Data([_output_cap, data_output_cap], mut data) => {
                        data_output.give_container(&data_output_cap, &mut data);
                    }
                    Event::Progress(_) => {}
                }
            }
            return;
        }
        // The data-passthrough output will use the data capabilities, so we drop
        // its capability here and keep only the description capability.
        let [desc_cap, _]: [_; 2] = capabilities.try_into().expect("one capability per output");
        let mut cap_set = CapabilitySet::from_elem(desc_cap);

        // Initialize this operator's `upper` to the `upper` of the persist shard we are writing
        // to. Data from the source not beyond this time will be dropped, as it has already
        // been persisted.
        // In the future, sources will avoid passing through data not beyond this upper.
        let mut current_upper = {
            // TODO(aljoscha): We need to figure out what to do with error
            // results from these calls.
            let persist_client = persist_clients
                .open(persist_location)
                .await
                .expect("could not open persist client");

            let mut write = persist_client
                .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                    shard_id,
                    Arc::new(target_relation_desc),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: collection_id.to_string(),
                        handle_purpose: format!(
                            "storage::persist_sink::mint_batch_descriptions {}",
                            collection_id
                        ),
                    },
                )
                .await
                .expect("could not open persist shard");

            // TODO: this sink currently cannot tolerate a stale upper... which is bad because the
            // upper can become stale as soon as it is read. (For example, if another concurrent
            // instance of the sink has updated it.) Fetching a recent upper helps to mitigate this,
            // but ideally we would just skip ahead if we discover that our upper is stale.
            let upper = write.fetch_recent_upper().await.clone();
            // Explicitly expire the once-used write handle.
            write.expire().await;
            upper
        };

        // The current input frontier.
        let mut desired_frontier;

        loop {
            if let Some(event) = desired_input.next().await {
                match event {
                    Event::Data([_output_cap, data_output_cap], mut data) => {
                        // Just pass the data through; only frontier progress
                        // drives the minting of descriptions below.
                        data_output.give_container(&data_output_cap, &mut data);
                        continue;
                    }
                    Event::Progress(frontier) => {
                        desired_frontier = frontier;
                    }
                }
            } else {
                // Input is exhausted, so we can shut down.
                return;
            };

            // If the new frontier for the data input has progressed, produce a batch description.
            if PartialOrder::less_than(&current_upper, &desired_frontier) {
                // The maximal description range we can produce.
                let batch_description = (current_upper.to_owned(), desired_frontier.to_owned());

                // NOTE(review): assumes the lower is a singleton antichain
                // (timestamps are totally ordered), so `as_option()` is always
                // `Some` here — confirm this invariant holds for all callers.
                let lower = batch_description.0.as_option().copied().unwrap();

                let cap = cap_set
                    .try_delayed(&lower)
                    .ok_or_else(|| {
                        format!(
                            "minter cannot delay {:?} to {:?}. \
                                Likely because we already emitted a \
                                batch description and delayed.",
                            cap_set, lower
                        )
                    })
                    .unwrap();

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        new batch_description: {:?}",
                    batch_description
                );

                output.give(&cap, batch_description);

                // We downgrade our capability to the batch
                // description upper, as there will never be
                // any overlapping descriptions.
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        downgrading to {:?}",
                    desired_frontier
                );
                cap_set.downgrade(desired_frontier.iter());

                // After successfully emitting a new description, we can update the upper for the
                // operator.
                current_upper.clone_from(&desired_frontier);
            }
        }
    });

    (
        output_stream,
        data_output_stream,
        shutdown_button.press_on_drop(),
    )
}
520
/// Writes `desired_collection` to persist, but only for updates
/// that fall into a batch description that we get via `batch_descriptions`.
/// This forwards a `HollowBatch` (with additional metadata)
/// for any batch of updates that was written.
///
/// This operator assumes that the `desired_collection` comes pre-sharded.
///
/// This also updates various metrics.
fn write_batches<G>(
    scope: &G,
    collection_id: GlobalId,
    operator_name: &str,
    target: &CollectionMetadata,
    batch_descriptions: Stream<
        G,
        Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
    >,
    desired_collection: VecCollection<G, Result<Row, DataflowError>, Diff>,
    persist_clients: Arc<PersistClientCache>,
    storage_state: &StorageState,
    busy_signal: Arc<Semaphore>,
) -> (
    StreamVec<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
    PressOnDropButton,
)
where
    G: Scope<Timestamp = mz_repr::Timestamp>,
{
    let worker_index = scope.index();

    let persist_location = target.persist_location.clone();
    let shard_id = target.data_shard;
    let target_relation_desc = target.relation_desc.clone();

    let source_statistics = storage_state
        .aggregated_statistics
        .get_source(&collection_id)
        .expect("statistics initialized")
        .clone();

    let mut write_op =
        AsyncOperatorBuilder::new(format!("{} write_batches", operator_name), scope.clone());

    let (output, output_stream) = write_op.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // The single minted description stream is broadcast so every worker learns
    // about every description.
    let mut descriptions_input =
        write_op.new_input_for(batch_descriptions.broadcast(), Pipeline, &output);
    let mut desired_input = write_op.new_disconnected_input(desired_collection.inner, Pipeline);

    // This operator accepts the current and desired update streams for a `persist` shard.
    // It attempts to write out updates, starting from the current's upper frontier, that
    // will cause the changes of desired to be committed to persist, _but only those also past the
    // upper_.

    let shutdown_button = write_op.build(move |_capabilities| async move {
        // In-progress batches of data, keyed by timestamp.
        let mut stashed_batches = BTreeMap::new();

        // Contains descriptions of batches for which we know that we can
        // write data. We got these from the "centralized" operator that
        // determines batch descriptions for all writers.
        //
        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeMap`. We need to search
        // through the map, so we cannot use the `mz_ore` wrapper either.
        #[allow(clippy::disallowed_types)]
        let mut in_flight_batches = std::collections::HashMap::<
            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
            Capability<mz_repr::Timestamp>,
        >::new();

        // TODO(aljoscha): We need to figure out what to do with error results from these calls.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("could not open persist client");

        let write = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(target_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: collection_id.to_string(),
                    handle_purpose: format!(
                        "storage::persist_sink::write_batches {}",
                        collection_id
                    ),
                },
            )
            .await
            .expect("could not open persist shard");

        // The current input frontiers.
        let mut batch_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut desired_frontier = Antichain::from_elem(Timestamp::minimum());

        // The frontiers of the inputs we have processed, used to avoid redoing work.
        let mut processed_desired_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut processed_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());

        // A "safe" choice for the lower of new batches we are creating.
        let mut operator_batch_lower = Antichain::from_elem(Timestamp::minimum());

        // Run until both inputs are exhausted (empty frontiers).
        while !(batch_descriptions_frontier.is_empty() && desired_frontier.is_empty()) {
            // Wait for either input to become ready.
            tokio::select! {
                _ = descriptions_input.ready() => {},
                _ = desired_input.ready() => {},
            }

            // Collect ready work from both inputs.
            while let Some(event) = descriptions_input.next_sync() {
                match event {
                    Event::Data(cap, data) => {
                        // Ingest new batch descriptions.
                        for description in data {
                            if collection_id.is_user() {
                                trace!(
                                    "persist_sink {collection_id}/{shard_id}: \
                                        write_batches: \
                                        new_description: {:?}, \
                                        desired_frontier: {:?}, \
                                        batch_descriptions_frontier: {:?}",
                                    description, desired_frontier, batch_descriptions_frontier,
                                );
                            }
                            match in_flight_batches.entry(description) {
                                std::collections::hash_map::Entry::Vacant(v) => {
                                    // This _should_ be `.retain`, but rust
                                    // currently thinks we can't use `cap`
                                    // as an owned value when using the
                                    // match guard `Some(event)`
                                    v.insert(cap.delayed(cap.time()));
                                }
                                std::collections::hash_map::Entry::Occupied(o) => {
                                    // A duplicate description indicates a bug in the
                                    // minting operator, so we fail loudly.
                                    let (description, _) = o.remove_entry();
                                    panic!(
                                        "write_batches: sink {} got more than one \
                                            batch for description {:?}, in-flight: {:?}",
                                        collection_id, description, in_flight_batches
                                    );
                                }
                            }
                        }
                    }
                    Event::Progress(frontier) => {
                        batch_descriptions_frontier = frontier;
                    }
                }
            }

            // Drain all currently-available desired events before starting the
            // (potentially slow) async write work below.
            let ready_events = std::iter::from_fn(|| desired_input.next_sync()).collect_vec();

            // We now start the async work for the input we received. Until we
            // finish, the dataflow should be marked as busy.
            let permit = busy_signal.acquire().await;

            for event in ready_events {
                match event {
                    Event::Data(_cap, data) => {
                        // Extract desired rows as positive contributions to `correction`.
                        if collection_id.is_user() && !data.is_empty() {
                            trace!(
                                "persist_sink {collection_id}/{shard_id}: \
                                    updates: {:?}, \
                                    in-flight-batches: {:?}, \
                                    desired_frontier: {:?}, \
                                    batch_descriptions_frontier: {:?}",
                                data,
                                in_flight_batches,
                                desired_frontier,
                                batch_descriptions_frontier,
                            );
                        }

                        for (row, ts, diff) in data {
                            // Updates not beyond the shard's upper have already
                            // been persisted, so we skip them.
                            if write.upper().less_equal(&ts) {
                                let builder = stashed_batches.entry(ts).or_insert_with(|| {
                                    BatchBuilderAndMetadata::new(
                                        write.builder(operator_batch_lower.clone()),
                                        ts,
                                    )
                                });

                                let is_value = row.is_ok();

                                builder
                                    .add(&SourceData(row), &(), &ts, &diff.into_inner())
                                    .await;

                                source_statistics.inc_updates_staged_by(1);

                                // Note that we assume `diff` is either +1 or -1 here, being anything
                                // else is a logic bug we can't handle at the metric layer. We also
                                // assume this addition doesn't overflow.
                                match (is_value, diff.is_positive()) {
                                    (true, true) => builder.metrics.inserts += diff.unsigned_abs(),
                                    (true, false) => {
                                        builder.metrics.retractions += diff.unsigned_abs()
                                    }
                                    (false, true) => {
                                        builder.metrics.error_inserts += diff.unsigned_abs()
                                    }
                                    (false, false) => {
                                        builder.metrics.error_retractions += diff.unsigned_abs()
                                    }
                                }
                            }
                        }
                    }
                    Event::Progress(frontier) => {
                        desired_frontier = frontier;
                    }
                }
            }
            // We may have the opportunity to commit updates, if either frontier
            // has moved.
            // NOTE(review): with `less_equal` this condition holds on every
            // iteration (a processed frontier can never get beyond its input
            // frontier), which makes the `else` branch below look unreachable;
            // `less_than` may have been intended — confirm before changing.
            if PartialOrder::less_equal(&processed_desired_frontier, &desired_frontier)
                || PartialOrder::less_equal(
                    &processed_descriptions_frontier,
                    &batch_descriptions_frontier,
                )
            {
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        CAN emit: \
                        processed_desired_frontier: {:?}, \
                        processed_descriptions_frontier: {:?}, \
                        desired_frontier: {:?}, \
                        batch_descriptions_frontier: {:?}",
                    processed_desired_frontier,
                    processed_descriptions_frontier,
                    desired_frontier,
                    batch_descriptions_frontier,
                );

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        in-flight batches: {:?}, \
                        batch_descriptions_frontier: {:?}, \
                        desired_frontier: {:?}",
                    in_flight_batches, batch_descriptions_frontier, desired_frontier,
                );

                // We can write updates for a given batch description when
                // a) the batch is not beyond `batch_descriptions_frontier`,
                // and b) we know that we have seen all updates that would
                // fall into the batch, from `desired_frontier`.
                let ready_batches = in_flight_batches
                    .keys()
                    .filter(|(lower, upper)| {
                        !PartialOrder::less_equal(&batch_descriptions_frontier, lower)
                            && !PartialOrder::less_than(&desired_frontier, upper)
                    })
                    .cloned()
                    .collect::<Vec<_>>();

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        ready batches: {:?}",
                    ready_batches,
                );

                for batch_description in ready_batches {
                    let cap = in_flight_batches.remove(&batch_description).unwrap();

                    if collection_id.is_user() {
                        trace!(
                            "persist_sink {collection_id}/{shard_id}: \
                                emitting done batch: {:?}, cap: {:?}",
                            batch_description, cap
                        );
                    }

                    let (batch_lower, batch_upper) = batch_description;

                    // Every stashed timestamp inside this description's
                    // `[lower, upper)` range is finalized into a batch.
                    let finalized_timestamps: Vec<_> = stashed_batches
                        .keys()
                        .filter(|time| {
                            batch_lower.less_equal(time) && !batch_upper.less_equal(time)
                        })
                        .copied()
                        .collect();

                    let mut batch_tokens = vec![];
                    for ts in finalized_timestamps {
                        let batch_builder = stashed_batches.remove(&ts).unwrap();

                        if collection_id.is_user() {
                            trace!(
                                "persist_sink {collection_id}/{shard_id}: \
                                    wrote batch from worker {}: ({:?}, {:?}),
                                    containing {:?}",
                                worker_index, batch_lower, batch_upper, batch_builder.metrics
                            );
                        }

                        let batch = batch_builder
                            .finish(batch_lower.clone(), batch_upper.clone())
                            .await;

                        // The next "safe" lower for batches is the join (max) of all the emitted
                        // batches' uppers. These uppers all are not beyond the `desired_frontier`,
                        // which means all updates received by this operator will be beyond this
                        // lower. Additionally, the `mint_batch_descriptions` operator ensures
                        // that later-received batch descriptions will start beyond these uppers
                        // as well.
                        //
                        // It is impossible to emit a batch description that is
                        // beyond a not-yet emitted description in `in_flight_batches`, as
                        // that description would also have been chosen as ready above.
                        operator_batch_lower = operator_batch_lower.join(&batch_upper);
                        batch_tokens.push(batch);
                    }

                    output.give_container(&cap, &mut batch_tokens);

                    processed_desired_frontier.clone_from(&desired_frontier);
                    processed_descriptions_frontier.clone_from(&batch_descriptions_frontier);
                }
            } else {
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                        cannot emit: processed_desired_frontier: {:?}, \
                        processed_descriptions_frontier: {:?}, \
                        desired_frontier: {:?}",
                    processed_desired_frontier, processed_descriptions_frontier, desired_frontier
                );
            }
            drop(permit);
        }
    });

    // For user collections, log every emitted batch at trace level.
    let output_stream = if collection_id.is_user() {
        output_stream.inspect(|d| trace!("batch: {:?}", d))
    } else {
        output_stream
    };

    (output_stream, shutdown_button.press_on_drop())
}
862
863/// Fuses written batches together and appends them to persist using one
864/// `compare_and_append` call. Writing only happens for batch descriptions where
865/// we know that no future batches will arrive, that is, for those batch
866/// descriptions that are not beyond the frontier of both the
867/// `batch_descriptions` and `batches` inputs.
868///
869/// This also keeps the shared frontier that is stored in `compute_state` in
870/// sync with the upper of the persist shard, and updates various metrics
871/// and statistics objects.
872fn append_batches<G>(
873 scope: &G,
874 collection_id: GlobalId,
875 operator_name: String,
876 target: &CollectionMetadata,
877 batch_descriptions: Stream<
878 G,
879 Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
880 >,
881 batches: StreamVec<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
882 persist_clients: Arc<PersistClientCache>,
883 storage_state: &StorageState,
884 metrics: SourcePersistSinkMetrics,
885 busy_signal: Arc<Semaphore>,
886) -> (
887 StreamVec<G, ()>,
888 StreamVec<G, Rc<anyhow::Error>>,
889 PressOnDropButton,
890)
891where
892 G: Scope<Timestamp = mz_repr::Timestamp>,
893{
894 let persist_location = target.persist_location.clone();
895 let shard_id = target.data_shard;
896 let target_relation_desc = target.relation_desc.clone();
897
898 // We can only be lenient with concurrent modifications when we know that
899 // this source pipeline is using the feedback upsert operator, which works
900 // correctly when multiple instances of an ingestion pipeline produce
901 // different updates, because of concurrency/non-determinism.
902 let use_continual_feedback_upsert = dyncfgs::STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT
903 .get(storage_state.storage_configuration.config_set());
904 let bail_on_concurrent_modification = !use_continual_feedback_upsert;
905
906 let mut read_only_rx = storage_state.read_only_rx.clone();
907
908 let operator_name = format!("{} append_batches", operator_name);
909 let mut append_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
910
911 let hashed_id = collection_id.hashed();
912 let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
913 let worker_id = scope.index();
914
915 // Both of these inputs are disconnected from the output capabilities of this operator, as
916 // any output of this operator is entirely driven by the `compare_and_append`s. Currently
917 // this operator has no outputs, but they may be added in the future, when merging with
918 // the compute `persist_sink`.
919 let mut descriptions_input =
920 append_op.new_disconnected_input(batch_descriptions, Exchange::new(move |_| hashed_id));
921 let mut batches_input =
922 append_op.new_disconnected_input(batches, Exchange::new(move |_| hashed_id));
923
924 let current_upper = Rc::clone(&storage_state.source_uppers[&collection_id]);
925 if !active_worker {
926 // This worker is not writing, so make sure it's "taken out" of the
927 // calculation by advancing to the empty frontier.
928 current_upper.borrow_mut().clear();
929 }
930
931 let source_statistics = storage_state
932 .aggregated_statistics
933 .get_source(&collection_id)
934 .expect("statistics initialized")
935 .clone();
936
937 // An output whose frontier tracks the last successful compare and append of this operator
938 let (_upper_output, upper_stream) = append_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
939
940 // This operator accepts the batch descriptions and tokens that represent
941 // written batches. Written batches get appended to persist when we learn
942 // from our input frontiers that we have seen all batches for a given batch
943 // description.
944
945 let (shutdown_button, errors) = append_op.build_fallible(move |caps| Box::pin(async move {
946 let [upper_cap_set]: &mut [_; 1] = caps.try_into().unwrap();
947
948 // This may SEEM unnecessary, but metrics contains extra
949 // `DeleteOnDrop`-wrapped fields that will NOT be moved into this
950 // closure otherwise, dropping and destroying
951 // those metrics. This is because rust now only moves the
952 // explicitly-referenced fields into closures.
953 let metrics = metrics;
954
955 // Contains descriptions of batches for which we know that we can
956 // write data. We got these from the "centralized" operator that
957 // determines batch descriptions for all writers.
958 //
959 // `Antichain` does not implement `Ord`, so we cannot use a `BTreeSet`. We need to search
960 // through the set, so we cannot use the `mz_ore` wrapper either.
961 #[allow(clippy::disallowed_types)]
962 let mut in_flight_descriptions = std::collections::HashSet::<(
963 Antichain<mz_repr::Timestamp>,
964 Antichain<mz_repr::Timestamp>,
965 )>::new();
966
967 // In flight batches that haven't been `compare_and_append`'d yet, plus metrics about
968 // the batch.
969 let mut in_flight_batches = HashMap::<
970 (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
971 BatchSet,
972 >::new();
973
974 source_statistics.initialize_rehydration_latency_ms();
975 if !active_worker {
976 // The non-active workers report that they are done snapshotting and hydrating.
977 let empty_frontier = Antichain::new();
978 source_statistics.initialize_snapshot_committed(&empty_frontier);
979 source_statistics.update_rehydration_latency_ms(&empty_frontier);
980 return Ok(());
981 }
982
983 let persist_client = persist_clients
984 .open(persist_location)
985 .await?;
986
987 let mut write = persist_client
988 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
989 shard_id,
990 Arc::new(target_relation_desc),
991 Arc::new(UnitSchema),
992 Diagnostics {
993 shard_name:collection_id.to_string(),
994 handle_purpose: format!("persist_sink::append_batches {}", collection_id)
995 },
996 )
997 .await?;
998
999 // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
1000 // to. Data from the source not beyond this time will be dropped, as it has already
1001 // been persisted.
1002 // In the future, sources will avoid passing through data not beyond this upper
1003 // VERY IMPORTANT: Only the active write worker must change the
1004 // shared upper. All other workers have already cleared this
1005 // upper above.
1006 current_upper.borrow_mut().clone_from(write.upper());
1007 upper_cap_set.downgrade(current_upper.borrow().iter());
1008 source_statistics.initialize_snapshot_committed(write.upper());
1009
1010 // The current input frontiers.
1011 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1012 let mut batches_frontier = Antichain::from_elem(Timestamp::minimum());
1013
1014 loop {
1015 tokio::select! {
1016 Some(event) = descriptions_input.next() => {
1017 match event {
1018 Event::Data(_cap, data) => {
1019 // Ingest new batch descriptions.
1020 for batch_description in data {
1021 if collection_id.is_user() {
1022 trace!(
1023 "persist_sink {collection_id}/{shard_id}: \
1024 append_batches: sink {}, \
1025 new description: {:?}, \
1026 batch_description_frontier: {:?}",
1027 collection_id,
1028 batch_description,
1029 batch_description_frontier
1030 );
1031 }
1032
1033 // This line has to be broken up, or
1034 // rustfmt fails in the whole function :(
1035 let is_new = in_flight_descriptions.insert(
1036 batch_description.clone()
1037 );
1038
1039 assert!(
1040 is_new,
1041 "append_batches: sink {} got more than one batch \
1042 for a given description in-flight: {:?}",
1043 collection_id, in_flight_batches
1044 );
1045 }
1046
1047 continue;
1048 }
1049 Event::Progress(frontier) => {
1050 batch_description_frontier = frontier;
1051 }
1052 }
1053 }
1054 Some(event) = batches_input.next() => {
1055 match event {
1056 Event::Data(_cap, data) => {
1057 for batch in data {
1058 let batch_description = (batch.lower.clone(), batch.upper.clone());
1059
1060 let batches = in_flight_batches
1061 .entry(batch_description)
1062 .or_default();
1063
1064 batches.finished.push(FinishedBatch {
1065 batch: write.batch_from_transmittable_batch(batch.batch),
1066 data_ts: batch.data_ts,
1067 });
1068 batches.batch_metrics += &batch.metrics;
1069 }
1070 continue;
1071 }
1072 Event::Progress(frontier) => {
1073 batches_frontier = frontier;
1074 }
1075 }
1076 }
1077 else => {
1078 // All inputs are exhausted, so we can shut down.
1079 return Ok(());
1080 }
1081 };
1082
1083 // Peel off any batches that are not beyond the frontier
1084 // anymore.
1085 //
1086 // It is correct to consider batches that are not beyond the
1087 // `batches_frontier` because it is held back by the writer
1088 // operator as long as a) the `batch_description_frontier` did
1089 // not advance and b) as long as the `desired_frontier` has not
1090 // advanced to the `upper` of a given batch description.
1091
1092 let mut done_batches = in_flight_descriptions
1093 .iter()
1094 .filter(|(lower, _upper)| !PartialOrder::less_equal(&batches_frontier, lower))
1095 .cloned()
1096 .collect::<Vec<_>>();
1097
1098 trace!(
1099 "persist_sink {collection_id}/{shard_id}: \
1100 append_batches: in_flight: {:?}, \
1101 done: {:?}, \
1102 batch_frontier: {:?}, \
1103 batch_description_frontier: {:?}",
1104 in_flight_descriptions,
1105 done_batches,
1106 batches_frontier,
1107 batch_description_frontier
1108 );
1109
1110 // Append batches in order, to ensure that their `lower` and
1111 // `upper` line up.
1112 done_batches.sort_by(|a, b| {
1113 if PartialOrder::less_than(a, b) {
1114 Ordering::Less
1115 } else if PartialOrder::less_than(b, a) {
1116 Ordering::Greater
1117 } else {
1118 Ordering::Equal
1119 }
1120 });
1121
1122 let validate_part_bounds_on_write = write.validate_part_bounds_on_write();
1123 let mut todo = VecDeque::new();
1124
1125 if validate_part_bounds_on_write {
1126 // Persist will expect each batch's bounds to match the append-time bounds; write them separately.
1127 for done_batch_metadata in done_batches.drain(..) {
1128 in_flight_descriptions.remove(&done_batch_metadata);
1129 let batch_set = in_flight_batches
1130 .remove(&done_batch_metadata)
1131 .unwrap_or_default();
1132 todo.push_back((done_batch_metadata, batch_set));
1133 }
1134 } else {
1135 // Persist should allow batches to be written as part of a single append even when the bounds don't
1136 // match exactly; group all eligible batches together.
1137 let mut combined_batch_metadata = None;
1138 let mut combined_batch_set = BatchSet::default();
1139 for done_batch_metadata in done_batches.drain(..) {
1140 in_flight_descriptions.remove(&done_batch_metadata);
1141 let mut batch_set = in_flight_batches
1142 .remove(&done_batch_metadata)
1143 .unwrap_or_default();
1144 match combined_batch_metadata.as_mut() {
1145 Some((_, upper)) => *upper = done_batch_metadata.1,
1146 None => combined_batch_metadata = Some(done_batch_metadata),
1147 }
1148 combined_batch_set.batch_metrics += &batch_set.batch_metrics;
1149 combined_batch_set.finished.append(&mut batch_set.finished);
1150 }
1151 if let Some(done_batch_metadata) = combined_batch_metadata {
1152 todo.push_back((done_batch_metadata, combined_batch_set))
1153 }
1154 };
1155
1156 while let Some((done_batch_metadata, batch_set)) = todo.pop_front() {
1157 in_flight_descriptions.remove(&done_batch_metadata);
1158
1159 let mut batches = batch_set.finished;
1160
1161 trace!(
1162 "persist_sink {collection_id}/{shard_id}: \
1163 done batch: {:?}, {:?}",
1164 done_batch_metadata,
1165 batches
1166 );
1167
1168 let (batch_lower, batch_upper) = done_batch_metadata;
1169
1170 let batch_metrics = batch_set.batch_metrics;
1171
1172 let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::<Vec<_>>();
1173
1174 let result = {
1175 let maybe_err = if *read_only_rx.borrow() {
1176
1177 // We have to wait for either us coming out of read-only
1178 // mode or someone else applying a write that covers our
1179 // batch.
1180 //
1181 // If we didn't wait for the latter here, and just go
1182 // around the loop again, we might miss a moment where
1183 // _we_ have to write down a batch. For example when our
1184 // input frontier advances to a state where we can
1185 // write, and the read-write instance sees the same
1186 // update but then crashes before it can append a batch.
1187
1188 let maybe_err = loop {
1189 if collection_id.is_user() {
1190 tracing::debug!(
1191 %worker_id,
1192 %collection_id,
1193 %shard_id,
1194 ?batch_lower,
1195 ?batch_upper,
1196 ?current_upper,
1197 "persist_sink is in read-only mode, waiting until we come out of it or the shard upper advances"
1198 );
1199 }
1200
1201 // We don't try to be smart here, and for example
1202 // use `wait_for_upper_past()`. We'd have to use a
1203 // select!, which would require cancel safety of
1204 // `wait_for_upper_past()`, which it doesn't
1205 // advertise.
1206 let _ = tokio::time::timeout(
1207 Duration::from_secs(1),
1208 read_only_rx.changed(),
1209 )
1210 .await;
1211
1212 if !*read_only_rx.borrow() {
1213 if collection_id.is_user() {
1214 tracing::debug!(
1215 %worker_id,
1216 %collection_id,
1217 %shard_id,
1218 ?batch_lower,
1219 ?batch_upper,
1220 ?current_upper,
1221 "persist_sink has come out of read-only mode"
1222 );
1223 }
1224
1225 // It's okay to write now.
1226 break Ok(());
1227 }
1228
1229 let current_upper = write.fetch_recent_upper().await;
1230
1231 if PartialOrder::less_than(&batch_upper, current_upper) {
1232 // We synthesize an `UpperMismatch` so that we can go
1233 // through the same logic below for trimming down our
1234 // batches.
1235 //
1236 // Notably, we are not trying to be smart, and teach the
1237 // write operator about read-only mode. Writing down
1238 // those batches does not append anything to the persist
1239 // shard, and it would be a hassle to figure out in the
1240 // write workers how to trim down batches in read-only
1241 // mode, when the shard upper advances.
1242 //
1243 // Right here, in the logic below, we have all we need
1244 // for figuring out how to trim our batches.
1245
1246 if collection_id.is_user() {
1247 tracing::debug!(
1248 %worker_id,
1249 %collection_id,
1250 %shard_id,
1251 ?batch_lower,
1252 ?batch_upper,
1253 ?current_upper,
1254 "persist_sink not appending in read-only mode"
1255 );
1256 }
1257
1258 break Err(UpperMismatch {
1259 current: current_upper.clone(),
1260 expected: batch_lower.clone()}
1261 );
1262 }
1263 };
1264
1265 maybe_err
1266 } else {
1267 // It's okay to proceed with the write.
1268 Ok(())
1269 };
1270
1271 match maybe_err {
1272 Ok(()) => {
1273 let _permit = busy_signal.acquire().await;
1274
1275 write.compare_and_append_batch(
1276 &mut to_append[..],
1277 batch_lower.clone(),
1278 batch_upper.clone(),
1279 validate_part_bounds_on_write,
1280 )
1281 .await
1282 .expect("Invalid usage")
1283 },
1284 Err(e) => {
                                // We forward the synthesized error, so that
                                // we go through the batch cleanup logic below.
1287 Err(e)
1288 }
1289 }
1290 };
1291
1292
1293 // These metrics are independent of whether it was _us_ or
1294 // _someone_ that managed to commit a batch that advanced the
1295 // upper.
1296 source_statistics.update_snapshot_committed(&batch_upper);
1297 source_statistics.update_rehydration_latency_ms(&batch_upper);
1298 metrics
1299 .progress
1300 .set(mz_persist_client::metrics::encode_ts_metric(&batch_upper));
1301
1302 if collection_id.is_user() {
1303 trace!(
1304 "persist_sink {collection_id}/{shard_id}: \
1305 append result for batch ({:?} -> {:?}): {:?}",
1306 batch_lower,
1307 batch_upper,
1308 result
1309 );
1310 }
1311
1312 match result {
1313 Ok(()) => {
1314 // Only update these metrics when we know that _we_ were
1315 // successful.
1316 let committed =
1317 batch_metrics.inserts + batch_metrics.retractions;
1318 source_statistics
1319 .inc_updates_committed_by(committed);
1320 metrics.processed_batches.inc();
1321 metrics.row_inserts.inc_by(batch_metrics.inserts);
1322 metrics.row_retractions.inc_by(batch_metrics.retractions);
1323 metrics.error_inserts.inc_by(batch_metrics.error_inserts);
1324 metrics
1325 .error_retractions
1326 .inc_by(batch_metrics.error_retractions);
1327
1328 current_upper.borrow_mut().clone_from(&batch_upper);
1329 upper_cap_set.downgrade(current_upper.borrow().iter());
1330 }
1331 Err(mismatch) => {
                        // We tried to do a non-contiguous append; that won't work.
1333 if PartialOrder::less_than(&mismatch.current, &batch_lower) {
1334 // Best-effort attempt to delete unneeded batches.
1335 future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1336
1337 // We always bail when this happens, regardless of
1338 // `bail_on_concurrent_modification`.
1339 tracing::warn!(
1340 "persist_sink({}): invalid upper! \
1341 Tried to append batch ({:?} -> {:?}) but upper \
1342 is {:?}. This is surpising and likely indicates \
1343 a bug in the persist sink, but we'll restart the \
1344 dataflow and try again.",
1345 collection_id, batch_lower, batch_upper, mismatch.current,
1346 );
1347 anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1348 } else if PartialOrder::less_than(&mismatch.current, &batch_upper) {
1349 // The shard's upper was ahead of our batch's lower
1350 // but not ahead of our upper. Cut down the
1351 // description by advancing its lower to the current
1352 // shard upper and try again. IMPORTANT: We can only
1353 // advance the lower, meaning we cut updates away,
1354 // we must not "extend" the batch by changing to a
1355 // lower that is not beyond the current lower. This
1356 // invariant is checked by the first if branch: if
1357 // `!(current_upper < lower)` then it holds that
1358 // `lower <= current_upper`.
1359
1360 // First, construct a new batch description with the
1361 // lower advanced to the current shard upper.
1362 let new_batch_lower = mismatch.current.clone();
1363 let new_done_batch_metadata =
1364 (new_batch_lower.clone(), batch_upper.clone());
1365
1366 // Retain any batches that are still in advance of
1367 // the new lower, and delete any batches that are
1368 // not.
1369 let mut batch_delete_futures = vec![];
1370 let mut new_batch_set = BatchSet::default();
1371 for batch in batches {
1372 if new_batch_lower.less_equal(&batch.data_ts) {
1373 new_batch_set.finished.push(batch);
1374 } else {
1375 batch_delete_futures.push(batch.batch.delete());
1376 }
1377 }
1378
1379 // Re-add the new batch to the list of batches to process.
1380 todo.push_front((new_done_batch_metadata, new_batch_set));
1381
1382 // Best-effort attempt to delete unneeded batches.
1383 future::join_all(batch_delete_futures).await;
1384 } else {
1385 // Best-effort attempt to delete unneeded batches.
1386 future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1387 }
1388
1389 if bail_on_concurrent_modification {
1390 tracing::warn!(
1391 "persist_sink({}): invalid upper! \
1392 Tried to append batch ({:?} -> {:?}) but upper \
1393 is {:?}. This is not a problem, it just means \
1394 someone else was faster than us. We will try \
1395 again with a new batch description.",
1396 collection_id, batch_lower, batch_upper, mismatch.current,
1397 );
1398 anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1399 }
1400 }
1401 }
1402 }
1403 }
1404 }));
1405
1406 (upper_stream, errors, shutdown_button.press_on_drop())
1407}