// mz_storage/render/persist_sink.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Render an operator that persists a source collection.
11//!
12//! ## Implementation
13//!
14//! This module defines the `persist_sink` operator, that writes
15//! a collection produced by source rendering into a persist shard.
16//!
17//! It attempts to use all workers to write data to persist, and uses
18//! single-instance workers to coordinate work. The below diagram
//! is an overview of how it is shaped. There is more information
20//! in the doc comments of the top-level functions of this module.
21//!
22//!```text
23//!
24//! ,------------.
25//! | source |
26//! | collection |
27//! +---+--------+
28//! / |
29//! / |
30//! / |
31//! / |
32//! / |
33//! / |
34//! / |
35//! / |
36//! / ,-+-----------------------.
37//! / | mint_batch_descriptions |
38//! / | one arbitrary worker |
39//! | +-,--,--------+----+------+
40//! ,----------´.-´ | \
41//! _.-´ | .-´ | \
42//! _.-´ | .-´ | \
43//! .-´ .------+----|-------+---------|--------\-----.
44//! / / | | | \ \
45//! ,--------------. ,-----------------. | ,-----------------.
46//! | write_batches| | write_batches | | | write_batches |
47//! | worker 0 | | worker 1 | | | worker N |
48//! +-----+--------+ +-+---------------+ | +--+--------------+
49//! \ \ | /
50//! `-. `, | /
51//! `-._ `-. | /
52//! `-._ `-. | /
53//! `---------. `-. | /
54//! +`---`---+-------------,
55//! | append_batches |
56//! | one arbitrary worker |
57//! +------+---------------+
58//!```
59//!
60//! ## Similarities with `mz_compute::sink::persist_sink`
61//!
62//! This module has many similarities with the compute version of
63//! the same concept, and in fact, is entirely derived from it.
64//!
65//! Compute requires that its `persist_sink` is _self-correcting_;
66//! that is, it corrects what the collection in persist
67//! accumulates to if the collection has values changed at
68//! previous timestamps. It does this by continually comparing
69//! the input stream with the collection as read back from persist.
70//!
//! Source collections, while definite, cannot be reliably
//! re-produced once written down, which means compute's
73//! `persist_sink`'s self-correction mechanism would need to be
74//! skipped on operator startup, and would cause unnecessary read
75//! load on persist.
76//!
77//! Additionally, persisting sources requires we use bounded
78//! amounts of memory, even if a single timestamp represents
79//! a huge amount of data. This is not (currently) possible
80//! to guarantee while also performing self-correction.
81//!
82//! Because of this, we have ripped out the self-correction
83//! mechanism, and aggressively simplified the sub-operators.
84//! Some, particularly `append_batches` could be merged with
85//! the compute version, but that requires some amount of
86//! onerous refactoring that we have chosen to skip for now.
87//!
88// TODO(guswynn): merge at least the `append_batches` operator`
89
90use std::cmp::Ordering;
91use std::collections::{BTreeMap, VecDeque};
92use std::fmt::Debug;
93use std::ops::AddAssign;
94use std::rc::Rc;
95use std::sync::Arc;
96use std::time::Duration;
97
98use differential_dataflow::difference::Monoid;
99use differential_dataflow::lattice::Lattice;
100use differential_dataflow::{AsCollection, Hashable, VecCollection};
101use futures::{StreamExt, future};
102use itertools::Itertools;
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::HashMap;
105use mz_persist_client::Diagnostics;
106use mz_persist_client::batch::{Batch, BatchBuilder, ProtoBatch};
107use mz_persist_client::cache::PersistClientCache;
108use mz_persist_client::error::UpperMismatch;
109use mz_persist_types::codec_impls::UnitSchema;
110use mz_persist_types::{Codec, Codec64};
111use mz_repr::{Diff, GlobalId, Row};
112use mz_storage_types::controller::CollectionMetadata;
113use mz_storage_types::errors::DataflowError;
114use mz_storage_types::sources::SourceData;
115use mz_storage_types::{StorageDiff, dyncfgs};
116use mz_timely_util::builder_async::{
117 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
118};
119use serde::{Deserialize, Serialize};
120use timely::PartialOrder;
121use timely::container::CapacityContainerBuilder;
122use timely::dataflow::channels::pact::{Exchange, Pipeline};
123use timely::dataflow::operators::vec::Broadcast;
124use timely::dataflow::operators::{Capability, CapabilitySet, Inspect};
125use timely::dataflow::{Scope, Stream, StreamVec};
126use timely::progress::{Antichain, Timestamp};
127use tokio::sync::Semaphore;
128use tracing::trace;
129
130use crate::metrics::source::SourcePersistSinkMetrics;
131use crate::storage_state::StorageState;
132
/// Metrics about batches.
///
/// Counts are bucketed by whether the row was an `Ok` value or an `Err`
/// (`DataflowError`) and by the sign of its diff; see the
/// `(is_value, diff.is_positive())` match in `write_batches`.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct BatchMetrics {
    // `Ok` rows with a positive diff.
    inserts: u64,
    // `Ok` rows with a negative diff.
    retractions: u64,
    // `Err` rows with a positive diff.
    error_inserts: u64,
    // `Err` rows with a negative diff.
    error_retractions: u64,
}
141
142impl AddAssign<&BatchMetrics> for BatchMetrics {
143 fn add_assign(&mut self, rhs: &BatchMetrics) {
144 let BatchMetrics {
145 inserts: self_inserts,
146 retractions: self_retractions,
147 error_inserts: self_error_inserts,
148 error_retractions: self_error_retractions,
149 } = self;
150 let BatchMetrics {
151 inserts: rhs_inserts,
152 retractions: rhs_retractions,
153 error_inserts: rhs_error_inserts,
154 error_retractions: rhs_error_retractions,
155 } = rhs;
156 *self_inserts += rhs_inserts;
157 *self_retractions += rhs_retractions;
158 *self_error_inserts += rhs_error_inserts;
159 *self_error_retractions += rhs_error_retractions;
160 }
161}
162
/// A persist batch under construction, bundled with the metrics
/// accumulated for the updates added to it.
struct BatchBuilderAndMetadata<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    // The underlying persist batch builder.
    builder: BatchBuilder<K, V, T, D>,
    // The single timestamp at which all updates in this batch occur; see the
    // NOTE on `BatchBuilderAndMetadata::new`.
    data_ts: T,
    // Counts of the updates added so far, maintained by the caller
    // (`write_batches` increments these alongside `add`).
    metrics: BatchMetrics,
}
174
175impl<K, V, T, D> BatchBuilderAndMetadata<K, V, T, D>
176where
177 K: Codec + Debug,
178 V: Codec + Debug,
179 T: Timestamp + Lattice + Codec64,
180 D: Monoid + Codec64,
181{
182 /// Creates a new batch.
183 ///
184 /// NOTE(benesch): temporary restriction: all updates added to the batch
185 /// must be at the specified timestamp `data_ts`.
186 fn new(builder: BatchBuilder<K, V, T, D>, data_ts: T) -> Self {
187 BatchBuilderAndMetadata {
188 builder,
189 data_ts,
190 metrics: Default::default(),
191 }
192 }
193
194 /// Adds an update to the batch.
195 ///
196 /// NOTE(benesch): temporary restriction: all updates added to the batch
197 /// must be at the timestamp specified during creation.
198 async fn add(&mut self, k: &K, v: &V, t: &T, d: &D) {
199 assert_eq!(
200 self.data_ts, *t,
201 "BatchBuilderAndMetadata::add called with a timestamp {t:?} that does not match creation timestamp {:?}",
202 self.data_ts
203 );
204
205 self.builder.add(k, v, t, d).await.expect("invalid usage");
206 }
207
208 async fn finish(self, lower: Antichain<T>, upper: Antichain<T>) -> HollowBatchAndMetadata<T> {
209 let batch = self
210 .builder
211 .finish(upper.clone())
212 .await
213 .expect("invalid usage");
214 HollowBatchAndMetadata {
215 lower,
216 upper,
217 data_ts: self.data_ts,
218 batch: batch.into_transmittable_batch(),
219 metrics: self.metrics,
220 }
221 }
222}
223
/// A batch or data + metrics moved from `write_batches` to `append_batches`.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(bound(
    serialize = "T: Timestamp + Codec64",
    deserialize = "T: Timestamp + Codec64"
))]
struct HollowBatchAndMetadata<T> {
    // Lower bound of the batch description this batch was written for.
    lower: Antichain<T>,
    // Upper bound of the batch description this batch was written for.
    upper: Antichain<T>,
    // The single timestamp at which all updates in `batch` occur.
    data_ts: T,
    // The written-but-not-yet-appended batch, in transmittable proto form.
    batch: ProtoBatch,
    // Counts of the updates contained in `batch`.
    metrics: BatchMetrics,
}
237
/// Holds finished batches for `append_batches`.
#[derive(Debug, Default)]
struct BatchSet {
    // Batches that are fully written and awaiting `compare_and_append`.
    finished: Vec<FinishedBatch>,
    // Aggregate metrics across all batches in `finished`.
    batch_metrics: BatchMetrics,
}
244
/// A fully-written persist batch together with the timestamp of its data.
#[derive(Debug)]
struct FinishedBatch {
    // The written (not yet appended) persist batch.
    batch: Batch<SourceData, (), mz_repr::Timestamp, StorageDiff>,
    // The single timestamp at which this batch's data occurs.
    data_ts: mz_repr::Timestamp,
}
250
251/// Continuously writes the `desired_stream` into persist
252/// This is done via a multi-stage operator graph:
253///
254/// 1. `mint_batch_descriptions` emits new batch descriptions whenever the
255/// frontier of `desired_collection` advances. A batch description is
256/// a pair of `(lower, upper)` that tells write operators
257/// which updates to write and in the end tells the append operator
258/// what frontiers to use when calling `append`/`compare_and_append`.
259/// This is a single-worker operator.
260/// 2. `write_batches` writes the `desired_collection` to persist as
261/// batches and sends those batches along.
262/// This does not yet append the batches to the persist shard, the update are
263/// only uploaded/prepared to be appended to a shard. Also: we only write
264/// updates for batch descriptions that we learned about from
265/// `mint_batch_descriptions`.
266/// 3. `append_batches` takes as input the minted batch descriptions and written
267/// batches. Whenever the frontiers sufficiently advance, we take a batch
268/// description and all the batches that belong to it and append it to the
269/// persist shard.
270///
271/// This operator assumes that the `desired_collection` comes pre-sharded.
272///
273/// Note that `mint_batch_descriptions` inspects the frontier of
274/// `desired_collection`, and passes the data through to `write_batches`.
275/// This is done to avoid a clone of the underlying data so that both
276/// operators can have the collection as input.
277pub(crate) fn render<'scope>(
278 scope: Scope<'scope, mz_repr::Timestamp>,
279 collection_id: GlobalId,
280 target: CollectionMetadata,
281 desired_collection: VecCollection<'scope, mz_repr::Timestamp, Result<Row, DataflowError>, Diff>,
282 storage_state: &StorageState,
283 metrics: SourcePersistSinkMetrics,
284 busy_signal: Arc<Semaphore>,
285) -> (
286 StreamVec<'scope, mz_repr::Timestamp, ()>,
287 StreamVec<'scope, mz_repr::Timestamp, Rc<anyhow::Error>>,
288 Vec<PressOnDropButton>,
289) {
290 let persist_clients = Arc::clone(&storage_state.persist_clients);
291
292 let operator_name = format!("persist_sink({})", collection_id);
293
294 let (batch_descriptions, passthrough_desired_stream, mint_token) = mint_batch_descriptions(
295 scope,
296 collection_id,
297 &operator_name,
298 &target,
299 desired_collection,
300 Arc::clone(&persist_clients),
301 );
302
303 let (written_batches, write_token) = write_batches(
304 scope,
305 collection_id.clone(),
306 &operator_name,
307 &target,
308 batch_descriptions.clone(),
309 passthrough_desired_stream.as_collection(),
310 Arc::clone(&persist_clients),
311 storage_state,
312 Arc::clone(&busy_signal),
313 );
314
315 let (upper_stream, append_errors, append_token) = append_batches(
316 scope,
317 collection_id.clone(),
318 operator_name,
319 &target,
320 batch_descriptions,
321 written_batches,
322 persist_clients,
323 storage_state,
324 metrics,
325 Arc::clone(&busy_signal),
326 );
327
328 (
329 upper_stream,
330 append_errors,
331 vec![mint_token, write_token, append_token],
332 )
333}
334
/// Whenever the frontier advances, this mints a new batch description (lower
/// and upper) that writers should use for writing the next set of batches to
/// persist.
///
/// Only one of the workers does this, meaning there will only be one
/// description in the stream, even in case of multiple timely workers. Use
/// `broadcast()` to, ahem, broadcast, the one description to all downstream
/// write operators/workers.
///
/// Returns the description stream, the data passed through unchanged, and a
/// token that shuts the operator down when dropped.
fn mint_batch_descriptions<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    collection_id: GlobalId,
    operator_name: &str,
    target: &CollectionMetadata,
    desired_collection: VecCollection<'scope, mz_repr::Timestamp, Result<Row, DataflowError>, Diff>,
    persist_clients: Arc<PersistClientCache>,
) -> (
    StreamVec<
        'scope,
        mz_repr::Timestamp,
        (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
    >,
    StreamVec<'scope, mz_repr::Timestamp, (Result<Row, DataflowError>, mz_repr::Timestamp, Diff)>,
    PressOnDropButton,
) {
    let persist_location = target.persist_location.clone();
    let shard_id = target.data_shard;
    let target_relation_desc = target.relation_desc.clone();

    // Only one worker is responsible for determining batch descriptions. All
    // workers must write batches with the same description, to ensure that they
    // can be combined into one batch that gets appended to Consensus state.
    let hashed_id = collection_id.hashed();
    let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();

    // Only the "active" operator will mint batches. All other workers have an
    // empty frontier. It's necessary to insert all of these into
    // `compute_state.sink_write_frontier` below so we properly clear out
    // default frontiers of non-active workers.

    let mut mint_op = AsyncOperatorBuilder::new(
        format!("{} mint_batch_descriptions", operator_name),
        scope.clone(),
    );

    // Two outputs: minted batch descriptions, and the passed-through data.
    let (output, output_stream) = mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (data_output, data_output_stream) =
        mint_op.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // The description and the data-passthrough outputs are both driven by this input, so
    // they use a standard input connection.
    let mut desired_input =
        mint_op.new_input_for_many(desired_collection.inner, Pipeline, [&output, &data_output]);

    let shutdown_button = mint_op.build(move |capabilities| async move {
        // Non-active workers should just pass the data through.
        if !active_worker {
            // The description output is entirely driven by the active worker, so we drop
            // its capability here. The data-passthrough output just uses the data
            // capabilities.
            drop(capabilities);
            while let Some(event) = desired_input.next().await {
                match event {
                    Event::Data([_output_cap, data_output_cap], mut data) => {
                        data_output.give_container(&data_output_cap, &mut data);
                    }
                    Event::Progress(_) => {}
                }
            }
            return;
        }
        // The data-passthrough output will use the data capabilities, so we drop
        // its capability here and keep only the description capability.
        let [desc_cap, _]: [_; 2] = capabilities.try_into().expect("one capability per output");
        let mut cap_set = CapabilitySet::from_elem(desc_cap);

        // Initialize this operator's `upper` to the `upper` of the persist shard we are writing
        // to. Data from the source not beyond this time will be dropped, as it has already
        // been persisted.
        // In the future, sources will avoid passing through data not beyond this upper
        let mut current_upper = {
            // TODO(aljoscha): We need to figure out what to do with error
            // results from these calls.
            let persist_client = persist_clients
                .open(persist_location)
                .await
                .expect("could not open persist client");

            let mut write = persist_client
                .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                    shard_id,
                    Arc::new(target_relation_desc),
                    Arc::new(UnitSchema),
                    Diagnostics {
                        shard_name: collection_id.to_string(),
                        handle_purpose: format!(
                            "storage::persist_sink::mint_batch_descriptions {}",
                            collection_id
                        ),
                    },
                )
                .await
                .expect("could not open persist shard");

            // TODO: this sink currently cannot tolerate a stale upper... which is bad because the
            // upper can become stale as soon as it is read. (For example, if another concurrent
            // instance of the sink has updated it.) Fetching a recent upper helps to mitigate this,
            // but ideally we would just skip ahead if we discover that our upper is stale.
            let upper = write.fetch_recent_upper().await.clone();
            // explicitly expire the once-used write handle.
            write.expire().await;
            upper
        };

        // The current input frontier.
        let mut desired_frontier;

        loop {
            if let Some(event) = desired_input.next().await {
                match event {
                    Event::Data([_output_cap, data_output_cap], mut data) => {
                        // Just passthrough the data.
                        data_output.give_container(&data_output_cap, &mut data);
                        continue;
                    }
                    Event::Progress(frontier) => {
                        desired_frontier = frontier;
                    }
                }
            } else {
                // Input is exhausted, so we can shut down.
                return;
            };

            // If the new frontier for the data input has progressed, produce a batch description.
            if PartialOrder::less_than(&current_upper, &desired_frontier) {
                // The maximal description range we can produce.
                let batch_description = (current_upper.to_owned(), desired_frontier.to_owned());

                let lower = batch_description.0.as_option().copied().unwrap();

                // `try_delayed` can only fail if we already downgraded past
                // `lower`, which would mean we emitted an overlapping
                // description — a bug, hence the panic below.
                let cap = cap_set
                    .try_delayed(&lower)
                    .ok_or_else(|| {
                        format!(
                            "minter cannot delay {:?} to {:?}. \
                            Likely because we already emitted a \
                            batch description and delayed.",
                            cap_set, lower
                        )
                    })
                    .unwrap();

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                    new batch_description: {:?}",
                    batch_description
                );

                output.give(&cap, batch_description);

                // We downgrade our capability to the batch
                // description upper, as there will never be
                // any overlapping descriptions.
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                    downgrading to {:?}",
                    desired_frontier
                );
                cap_set.downgrade(desired_frontier.iter());

                // After successfully emitting a new description, we can update the upper for the
                // operator.
                current_upper.clone_from(&desired_frontier);
            }
        }
    });

    (
        output_stream,
        data_output_stream,
        shutdown_button.press_on_drop(),
    )
}
518
/// Writes `desired_collection` to persist, but only for updates
/// that fall into a batch description that we get via `batch_descriptions`.
/// This forwards a `HollowBatch` (with additional metadata)
/// for any batch of updates that was written.
///
/// This operator assumes that the `desired_collection` comes pre-sharded.
///
/// This also updates various metrics.
fn write_batches<'scope>(
    scope: Scope<'scope, mz_repr::Timestamp>,
    collection_id: GlobalId,
    operator_name: &str,
    target: &CollectionMetadata,
    batch_descriptions: Stream<
        'scope,
        mz_repr::Timestamp,
        Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
    >,
    desired_collection: VecCollection<'scope, mz_repr::Timestamp, Result<Row, DataflowError>, Diff>,
    persist_clients: Arc<PersistClientCache>,
    storage_state: &StorageState,
    busy_signal: Arc<Semaphore>,
) -> (
    StreamVec<'scope, mz_repr::Timestamp, HollowBatchAndMetadata<mz_repr::Timestamp>>,
    PressOnDropButton,
) {
    let worker_index = scope.index();

    let persist_location = target.persist_location.clone();
    let shard_id = target.data_shard;
    let target_relation_desc = target.relation_desc.clone();

    let source_statistics = storage_state
        .aggregated_statistics
        .get_source(&collection_id)
        .expect("statistics initialized")
        .clone();

    let mut write_op =
        AsyncOperatorBuilder::new(format!("{} write_batches", operator_name), scope.clone());

    let (output, output_stream) = write_op.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // Descriptions are broadcast so that every worker sees every description;
    // the data input is disconnected because emitted batches are gated by the
    // description capabilities, not by the data frontier.
    let mut descriptions_input =
        write_op.new_input_for(batch_descriptions.broadcast(), Pipeline, &output);
    let mut desired_input = write_op.new_disconnected_input(desired_collection.inner, Pipeline);

    // This operator accepts the current and desired update streams for a `persist` shard.
    // It attempts to write out updates, starting from the current's upper frontier, that
    // will cause the changes of desired to be committed to persist, _but only those also past the
    // upper_.

    let shutdown_button = write_op.build(move |_capabilities| async move {
        // In-progress batches of data, keyed by timestamp.
        let mut stashed_batches = BTreeMap::new();

        // Contains descriptions of batches for which we know that we can
        // write data. We got these from the "centralized" operator that
        // determines batch descriptions for all writers.
        //
        // `Antichain` does not implement `Ord`, so we cannot use a `BTreeMap`. We need to search
        // through the map, so we cannot use the `mz_ore` wrapper either.
        #[allow(clippy::disallowed_types)]
        let mut in_flight_batches = std::collections::HashMap::<
            (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
            Capability<mz_repr::Timestamp>,
        >::new();

        // TODO(aljoscha): We need to figure out what to do with error results from these calls.
        let persist_client = persist_clients
            .open(persist_location)
            .await
            .expect("could not open persist client");

        let write = persist_client
            .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
                shard_id,
                Arc::new(target_relation_desc),
                Arc::new(UnitSchema),
                Diagnostics {
                    shard_name: collection_id.to_string(),
                    handle_purpose: format!(
                        "storage::persist_sink::write_batches {}",
                        collection_id
                    ),
                },
            )
            .await
            .expect("could not open persist shard");

        // The current input frontiers.
        let mut batch_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut desired_frontier = Antichain::from_elem(Timestamp::minimum());

        // The frontiers of the inputs we have processed, used to avoid redoing work
        let mut processed_desired_frontier = Antichain::from_elem(Timestamp::minimum());
        let mut processed_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());

        // A "safe" choice for the lower of new batches we are creating.
        let mut operator_batch_lower = Antichain::from_elem(Timestamp::minimum());

        // Run until both inputs are closed (empty frontiers).
        while !(batch_descriptions_frontier.is_empty() && desired_frontier.is_empty()) {
            // Wait for either inputs to become ready
            tokio::select! {
                _ = descriptions_input.ready() => {},
                _ = desired_input.ready() => {},
            }

            // Collect ready work from both inputs
            while let Some(event) = descriptions_input.next_sync() {
                match event {
                    Event::Data(cap, data) => {
                        // Ingest new batch descriptions.
                        for description in data {
                            if collection_id.is_user() {
                                trace!(
                                    "persist_sink {collection_id}/{shard_id}: \
                                    write_batches: \
                                    new_description: {:?}, \
                                    desired_frontier: {:?}, \
                                    batch_descriptions_frontier: {:?}",
                                    description, desired_frontier, batch_descriptions_frontier,
                                );
                            }
                            match in_flight_batches.entry(description) {
                                std::collections::hash_map::Entry::Vacant(v) => {
                                    // This _should_ be `.retain`, but rust
                                    // currently thinks we can't use `cap`
                                    // as an owned value when using the
                                    // match guard `Some(event)`
                                    v.insert(cap.delayed(cap.time()));
                                }
                                std::collections::hash_map::Entry::Occupied(o) => {
                                    // Duplicate descriptions violate the
                                    // minting protocol; fail loudly.
                                    let (description, _) = o.remove_entry();
                                    panic!(
                                        "write_batches: sink {} got more than one \
                                        batch for description {:?}, in-flight: {:?}",
                                        collection_id, description, in_flight_batches
                                    );
                                }
                            }
                        }
                    }
                    Event::Progress(frontier) => {
                        batch_descriptions_frontier = frontier;
                    }
                }
            }

            // Drain all currently-ready data events before doing async work.
            let ready_events = std::iter::from_fn(|| desired_input.next_sync()).collect_vec();

            // We now start the async work for the input we received. Until we finish, the dataflow
            // should be marked as busy.
            let permit = busy_signal.acquire().await;

            for event in ready_events {
                match event {
                    Event::Data(_cap, data) => {
                        // Extract desired rows as positive contributions to `correction`.
                        if collection_id.is_user() && !data.is_empty() {
                            trace!(
                                "persist_sink {collection_id}/{shard_id}: \
                                updates: {:?}, \
                                in-flight-batches: {:?}, \
                                desired_frontier: {:?}, \
                                batch_descriptions_frontier: {:?}",
                                data,
                                in_flight_batches,
                                desired_frontier,
                                batch_descriptions_frontier,
                            );
                        }

                        for (row, ts, diff) in data {
                            // Drop updates at or below the shard's upper: they
                            // have already been persisted.
                            if write.upper().less_equal(&ts) {
                                let builder = stashed_batches.entry(ts).or_insert_with(|| {
                                    BatchBuilderAndMetadata::new(
                                        write.builder(operator_batch_lower.clone()),
                                        ts,
                                    )
                                });

                                let is_value = row.is_ok();

                                builder
                                    .add(&SourceData(row), &(), &ts, &diff.into_inner())
                                    .await;

                                source_statistics.inc_updates_staged_by(1);

                                // Note that we assume `diff` is either +1 or -1 here, being anything
                                // else is a logic bug we can't handle at the metric layer. We also
                                // assume this addition doesn't overflow.
                                match (is_value, diff.is_positive()) {
                                    (true, true) => builder.metrics.inserts += diff.unsigned_abs(),
                                    (true, false) => {
                                        builder.metrics.retractions += diff.unsigned_abs()
                                    }
                                    (false, true) => {
                                        builder.metrics.error_inserts += diff.unsigned_abs()
                                    }
                                    (false, false) => {
                                        builder.metrics.error_retractions += diff.unsigned_abs()
                                    }
                                }
                            }
                        }
                    }
                    Event::Progress(frontier) => {
                        desired_frontier = frontier;
                    }
                }
            }
            // We may have the opportunity to commit updates, if either frontier
            // has moved
            // NOTE(review): `less_equal` also holds when a frontier has not
            // advanced at all, so this condition appears to always be true
            // (frontiers never regress); `less_than` may have been intended —
            // confirm before changing, the extra passes look harmless.
            if PartialOrder::less_equal(&processed_desired_frontier, &desired_frontier)
                || PartialOrder::less_equal(
                    &processed_descriptions_frontier,
                    &batch_descriptions_frontier,
                )
            {
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                    CAN emit: \
                    processed_desired_frontier: {:?}, \
                    processed_descriptions_frontier: {:?}, \
                    desired_frontier: {:?}, \
                    batch_descriptions_frontier: {:?}",
                    processed_desired_frontier,
                    processed_descriptions_frontier,
                    desired_frontier,
                    batch_descriptions_frontier,
                );

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                    in-flight batches: {:?}, \
                    batch_descriptions_frontier: {:?}, \
                    desired_frontier: {:?}",
                    in_flight_batches, batch_descriptions_frontier, desired_frontier,
                );

                // We can write updates for a given batch description when
                // a) the batch is not beyond `batch_descriptions_frontier`,
                // and b) we know that we have seen all updates that would
                // fall into the batch, from `desired_frontier`.
                let ready_batches = in_flight_batches
                    .keys()
                    .filter(|(lower, upper)| {
                        !PartialOrder::less_equal(&batch_descriptions_frontier, lower)
                            && !PartialOrder::less_than(&desired_frontier, upper)
                    })
                    .cloned()
                    .collect::<Vec<_>>();

                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                    ready batches: {:?}",
                    ready_batches,
                );

                for batch_description in ready_batches {
                    let cap = in_flight_batches.remove(&batch_description).unwrap();

                    if collection_id.is_user() {
                        trace!(
                            "persist_sink {collection_id}/{shard_id}: \
                            emitting done batch: {:?}, cap: {:?}",
                            batch_description, cap
                        );
                    }

                    let (batch_lower, batch_upper) = batch_description;

                    // All stashed timestamps that fall in [batch_lower, batch_upper).
                    let finalized_timestamps: Vec<_> = stashed_batches
                        .keys()
                        .filter(|time| {
                            batch_lower.less_equal(time) && !batch_upper.less_equal(time)
                        })
                        .copied()
                        .collect();

                    let mut batch_tokens = vec![];
                    for ts in finalized_timestamps {
                        let batch_builder = stashed_batches.remove(&ts).unwrap();

                        if collection_id.is_user() {
                            trace!(
                                "persist_sink {collection_id}/{shard_id}: \
                                wrote batch from worker {}: ({:?}, {:?}),
                                containing {:?}",
                                worker_index, batch_lower, batch_upper, batch_builder.metrics
                            );
                        }

                        let batch = batch_builder
                            .finish(batch_lower.clone(), batch_upper.clone())
                            .await;

                        // The next "safe" lower for batches is the meet (max) of all the emitted
                        // batches. These uppers all are not beyond the `desired_frontier`, which
                        // means all updates received by this operator will be beyond this lower.
                        // Additionally, the `mint_batch_descriptions` operator ensures that
                        // later-received batch descriptions will start beyond these uppers as
                        // well.
                        //
                        // It is impossible to emit a batch description that is
                        // beyond a not-yet emitted description in `in_flight_batches`, as
                        // a that description would also have been chosen as ready above.
                        operator_batch_lower = operator_batch_lower.join(&batch_upper);
                        batch_tokens.push(batch);
                    }

                    output.give_container(&cap, &mut batch_tokens);

                    processed_desired_frontier.clone_from(&desired_frontier);
                    processed_descriptions_frontier.clone_from(&batch_descriptions_frontier);
                }
            } else {
                trace!(
                    "persist_sink {collection_id}/{shard_id}: \
                    cannot emit: processed_desired_frontier: {:?}, \
                    processed_descriptions_frontier: {:?}, \
                    desired_frontier: {:?}",
                    processed_desired_frontier, processed_descriptions_frontier, desired_frontier
                );
            }
            drop(permit);
        }
    });

    // For user collections, additionally trace every emitted batch.
    let output_stream = if collection_id.is_user() {
        output_stream.inspect(|d| trace!("batch: {:?}", d))
    } else {
        output_stream
    };

    (output_stream, shutdown_button.press_on_drop())
}
858
859/// Fuses written batches together and appends them to persist using one
860/// `compare_and_append` call. Writing only happens for batch descriptions where
861/// we know that no future batches will arrive, that is, for those batch
862/// descriptions that are not beyond the frontier of both the
863/// `batch_descriptions` and `batches` inputs.
864///
865/// This also keeps the shared frontier that is stored in `compute_state` in
866/// sync with the upper of the persist shard, and updates various metrics
867/// and statistics objects.
868fn append_batches<'scope>(
869 scope: Scope<'scope, mz_repr::Timestamp>,
870 collection_id: GlobalId,
871 operator_name: String,
872 target: &CollectionMetadata,
873 batch_descriptions: Stream<
874 'scope,
875 mz_repr::Timestamp,
876 Vec<(Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
877 >,
878 batches: StreamVec<'scope, mz_repr::Timestamp, HollowBatchAndMetadata<mz_repr::Timestamp>>,
879 persist_clients: Arc<PersistClientCache>,
880 storage_state: &StorageState,
881 metrics: SourcePersistSinkMetrics,
882 busy_signal: Arc<Semaphore>,
883) -> (
884 StreamVec<'scope, mz_repr::Timestamp, ()>,
885 StreamVec<'scope, mz_repr::Timestamp, Rc<anyhow::Error>>,
886 PressOnDropButton,
887) {
888 let persist_location = target.persist_location.clone();
889 let shard_id = target.data_shard;
890 let target_relation_desc = target.relation_desc.clone();
891
892 // We can only be lenient with concurrent modifications when we know that
893 // this source pipeline is using the feedback upsert operator, which works
894 // correctly when multiple instances of an ingestion pipeline produce
895 // different updates, because of concurrency/non-determinism.
896 let use_continual_feedback_upsert = dyncfgs::STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT
897 .get(storage_state.storage_configuration.config_set());
898 let bail_on_concurrent_modification = !use_continual_feedback_upsert;
899
900 let mut read_only_rx = storage_state.read_only_rx.clone();
901
902 let operator_name = format!("{} append_batches", operator_name);
903 let mut append_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
904
905 let hashed_id = collection_id.hashed();
906 let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
907 let worker_id = scope.index();
908
909 // Both of these inputs are disconnected from the output capabilities of this operator, as
910 // any output of this operator is entirely driven by the `compare_and_append`s. Currently
911 // this operator has no outputs, but they may be added in the future, when merging with
912 // the compute `persist_sink`.
913 let mut descriptions_input =
914 append_op.new_disconnected_input(batch_descriptions, Exchange::new(move |_| hashed_id));
915 let mut batches_input =
916 append_op.new_disconnected_input(batches, Exchange::new(move |_| hashed_id));
917
918 let current_upper = Rc::clone(&storage_state.source_uppers[&collection_id]);
919 if !active_worker {
920 // This worker is not writing, so make sure it's "taken out" of the
921 // calculation by advancing to the empty frontier.
922 current_upper.borrow_mut().clear();
923 }
924
925 let source_statistics = storage_state
926 .aggregated_statistics
927 .get_source(&collection_id)
928 .expect("statistics initialized")
929 .clone();
930
931 // An output whose frontier tracks the last successful compare and append of this operator
932 let (_upper_output, upper_stream) = append_op.new_output::<CapacityContainerBuilder<Vec<_>>>();
933
934 // This operator accepts the batch descriptions and tokens that represent
935 // written batches. Written batches get appended to persist when we learn
936 // from our input frontiers that we have seen all batches for a given batch
937 // description.
938
939 let (shutdown_button, errors) = append_op.build_fallible(move |caps| Box::pin(async move {
940 let [upper_cap_set]: &mut [_; 1] = caps.try_into().unwrap();
941
942 // This may SEEM unnecessary, but metrics contains extra
943 // `DeleteOnDrop`-wrapped fields that will NOT be moved into this
944 // closure otherwise, dropping and destroying
945 // those metrics. This is because rust now only moves the
946 // explicitly-referenced fields into closures.
947 let metrics = metrics;
948
949 // Contains descriptions of batches for which we know that we can
950 // write data. We got these from the "centralized" operator that
951 // determines batch descriptions for all writers.
952 //
953 // `Antichain` does not implement `Ord`, so we cannot use a `BTreeSet`. We need to search
954 // through the set, so we cannot use the `mz_ore` wrapper either.
955 #[allow(clippy::disallowed_types)]
956 let mut in_flight_descriptions = std::collections::HashSet::<(
957 Antichain<mz_repr::Timestamp>,
958 Antichain<mz_repr::Timestamp>,
959 )>::new();
960
961 // In flight batches that haven't been `compare_and_append`'d yet, plus metrics about
962 // the batch.
963 let mut in_flight_batches = HashMap::<
964 (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
965 BatchSet,
966 >::new();
967
968 source_statistics.initialize_rehydration_latency_ms();
969 if !active_worker {
970 // The non-active workers report that they are done snapshotting and hydrating.
971 let empty_frontier = Antichain::new();
972 source_statistics.initialize_snapshot_committed(&empty_frontier);
973 source_statistics.update_rehydration_latency_ms(&empty_frontier);
974 return Ok(());
975 }
976
977 let persist_client = persist_clients
978 .open(persist_location)
979 .await?;
980
981 let mut write = persist_client
982 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
983 shard_id,
984 Arc::new(target_relation_desc),
985 Arc::new(UnitSchema),
986 Diagnostics {
987 shard_name:collection_id.to_string(),
988 handle_purpose: format!("persist_sink::append_batches {}", collection_id)
989 },
990 )
991 .await?;
992
993 // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
994 // to. Data from the source not beyond this time will be dropped, as it has already
995 // been persisted.
996 // In the future, sources will avoid passing through data not beyond this upper
997 // VERY IMPORTANT: Only the active write worker must change the
998 // shared upper. All other workers have already cleared this
999 // upper above.
1000 current_upper.borrow_mut().clone_from(write.upper());
1001 upper_cap_set.downgrade(current_upper.borrow().iter());
1002 source_statistics.initialize_snapshot_committed(write.upper());
1003
1004 // The current input frontiers.
1005 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1006 let mut batches_frontier = Antichain::from_elem(Timestamp::minimum());
1007
1008 loop {
1009 tokio::select! {
1010 Some(event) = descriptions_input.next() => {
1011 match event {
1012 Event::Data(_cap, data) => {
1013 // Ingest new batch descriptions.
1014 for batch_description in data {
1015 if collection_id.is_user() {
1016 trace!(
1017 "persist_sink {collection_id}/{shard_id}: \
1018 append_batches: sink {}, \
1019 new description: {:?}, \
1020 batch_description_frontier: {:?}",
1021 collection_id,
1022 batch_description,
1023 batch_description_frontier
1024 );
1025 }
1026
1027 // This line has to be broken up, or
1028 // rustfmt fails in the whole function :(
1029 let is_new = in_flight_descriptions.insert(
1030 batch_description.clone()
1031 );
1032
1033 assert!(
1034 is_new,
1035 "append_batches: sink {} got more than one batch \
1036 for a given description in-flight: {:?}",
1037 collection_id, in_flight_batches
1038 );
1039 }
1040
1041 continue;
1042 }
1043 Event::Progress(frontier) => {
1044 batch_description_frontier = frontier;
1045 }
1046 }
1047 }
1048 Some(event) = batches_input.next() => {
1049 match event {
1050 Event::Data(_cap, data) => {
1051 for batch in data {
1052 let batch_description = (batch.lower.clone(), batch.upper.clone());
1053
1054 let batches = in_flight_batches
1055 .entry(batch_description)
1056 .or_default();
1057
1058 batches.finished.push(FinishedBatch {
1059 batch: write.batch_from_transmittable_batch(batch.batch),
1060 data_ts: batch.data_ts,
1061 });
1062 batches.batch_metrics += &batch.metrics;
1063 }
1064 continue;
1065 }
1066 Event::Progress(frontier) => {
1067 batches_frontier = frontier;
1068 }
1069 }
1070 }
1071 else => {
1072 // All inputs are exhausted, so we can shut down.
1073 return Ok(());
1074 }
1075 };
1076
1077 // Peel off any batches that are not beyond the frontier
1078 // anymore.
1079 //
1080 // It is correct to consider batches that are not beyond the
1081 // `batches_frontier` because it is held back by the writer
1082 // operator as long as a) the `batch_description_frontier` did
1083 // not advance and b) as long as the `desired_frontier` has not
1084 // advanced to the `upper` of a given batch description.
1085
1086 let mut done_batches = in_flight_descriptions
1087 .iter()
1088 .filter(|(lower, _upper)| !PartialOrder::less_equal(&batches_frontier, lower))
1089 .cloned()
1090 .collect::<Vec<_>>();
1091
1092 trace!(
1093 "persist_sink {collection_id}/{shard_id}: \
1094 append_batches: in_flight: {:?}, \
1095 done: {:?}, \
1096 batch_frontier: {:?}, \
1097 batch_description_frontier: {:?}",
1098 in_flight_descriptions,
1099 done_batches,
1100 batches_frontier,
1101 batch_description_frontier
1102 );
1103
1104 // Append batches in order, to ensure that their `lower` and
1105 // `upper` line up.
1106 done_batches.sort_by(|a, b| {
1107 if PartialOrder::less_than(a, b) {
1108 Ordering::Less
1109 } else if PartialOrder::less_than(b, a) {
1110 Ordering::Greater
1111 } else {
1112 Ordering::Equal
1113 }
1114 });
1115
1116 let validate_part_bounds_on_write = write.validate_part_bounds_on_write();
1117 let mut todo = VecDeque::new();
1118
1119 if validate_part_bounds_on_write {
1120 // Persist will expect each batch's bounds to match the append-time bounds; write them separately.
1121 for done_batch_metadata in done_batches.drain(..) {
1122 in_flight_descriptions.remove(&done_batch_metadata);
1123 let batch_set = in_flight_batches
1124 .remove(&done_batch_metadata)
1125 .unwrap_or_default();
1126 todo.push_back((done_batch_metadata, batch_set));
1127 }
1128 } else {
1129 // Persist should allow batches to be written as part of a single append even when the bounds don't
1130 // match exactly; group all eligible batches together.
1131 let mut combined_batch_metadata = None;
1132 let mut combined_batch_set = BatchSet::default();
1133 for done_batch_metadata in done_batches.drain(..) {
1134 in_flight_descriptions.remove(&done_batch_metadata);
1135 let mut batch_set = in_flight_batches
1136 .remove(&done_batch_metadata)
1137 .unwrap_or_default();
1138 match combined_batch_metadata.as_mut() {
1139 Some((_, upper)) => *upper = done_batch_metadata.1,
1140 None => combined_batch_metadata = Some(done_batch_metadata),
1141 }
1142 combined_batch_set.batch_metrics += &batch_set.batch_metrics;
1143 combined_batch_set.finished.append(&mut batch_set.finished);
1144 }
1145 if let Some(done_batch_metadata) = combined_batch_metadata {
1146 todo.push_back((done_batch_metadata, combined_batch_set))
1147 }
1148 };
1149
1150 while let Some((done_batch_metadata, batch_set)) = todo.pop_front() {
1151 in_flight_descriptions.remove(&done_batch_metadata);
1152
1153 let mut batches = batch_set.finished;
1154
1155 trace!(
1156 "persist_sink {collection_id}/{shard_id}: \
1157 done batch: {:?}, {:?}",
1158 done_batch_metadata,
1159 batches
1160 );
1161
1162 let (batch_lower, batch_upper) = done_batch_metadata;
1163
1164 let batch_metrics = batch_set.batch_metrics;
1165
1166 let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::<Vec<_>>();
1167
1168 let result = {
1169 let maybe_err = if *read_only_rx.borrow() {
1170
1171 // We have to wait for either us coming out of read-only
1172 // mode or someone else applying a write that covers our
1173 // batch.
1174 //
1175 // If we didn't wait for the latter here, and just go
1176 // around the loop again, we might miss a moment where
1177 // _we_ have to write down a batch. For example when our
1178 // input frontier advances to a state where we can
1179 // write, and the read-write instance sees the same
1180 // update but then crashes before it can append a batch.
1181
1182 let maybe_err = loop {
1183 if collection_id.is_user() {
1184 tracing::debug!(
1185 %worker_id,
1186 %collection_id,
1187 %shard_id,
1188 ?batch_lower,
1189 ?batch_upper,
1190 ?current_upper,
1191 "persist_sink is in read-only mode, waiting until we come out of it or the shard upper advances"
1192 );
1193 }
1194
1195 // We don't try to be smart here, and for example
1196 // use `wait_for_upper_past()`. We'd have to use a
1197 // select!, which would require cancel safety of
1198 // `wait_for_upper_past()`, which it doesn't
1199 // advertise.
1200 let _ = tokio::time::timeout(
1201 Duration::from_secs(1),
1202 read_only_rx.changed(),
1203 )
1204 .await;
1205
1206 if !*read_only_rx.borrow() {
1207 if collection_id.is_user() {
1208 tracing::debug!(
1209 %worker_id,
1210 %collection_id,
1211 %shard_id,
1212 ?batch_lower,
1213 ?batch_upper,
1214 ?current_upper,
1215 "persist_sink has come out of read-only mode"
1216 );
1217 }
1218
1219 // It's okay to write now.
1220 break Ok(());
1221 }
1222
1223 let current_upper = write.fetch_recent_upper().await;
1224
1225 if PartialOrder::less_than(&batch_upper, current_upper) {
1226 // We synthesize an `UpperMismatch` so that we can go
1227 // through the same logic below for trimming down our
1228 // batches.
1229 //
1230 // Notably, we are not trying to be smart, and teach the
1231 // write operator about read-only mode. Writing down
1232 // those batches does not append anything to the persist
1233 // shard, and it would be a hassle to figure out in the
1234 // write workers how to trim down batches in read-only
1235 // mode, when the shard upper advances.
1236 //
1237 // Right here, in the logic below, we have all we need
1238 // for figuring out how to trim our batches.
1239
1240 if collection_id.is_user() {
1241 tracing::debug!(
1242 %worker_id,
1243 %collection_id,
1244 %shard_id,
1245 ?batch_lower,
1246 ?batch_upper,
1247 ?current_upper,
1248 "persist_sink not appending in read-only mode"
1249 );
1250 }
1251
1252 break Err(UpperMismatch {
1253 current: current_upper.clone(),
1254 expected: batch_lower.clone()}
1255 );
1256 }
1257 };
1258
1259 maybe_err
1260 } else {
1261 // It's okay to proceed with the write.
1262 Ok(())
1263 };
1264
1265 match maybe_err {
1266 Ok(()) => {
1267 let _permit = busy_signal.acquire().await;
1268
1269 write.compare_and_append_batch(
1270 &mut to_append[..],
1271 batch_lower.clone(),
1272 batch_upper.clone(),
1273 validate_part_bounds_on_write,
1274 )
1275 .await
1276 .expect("Invalid usage")
1277 },
1278 Err(e) => {
1279 // We forward the synthesize error message, so that
1280 // we go though the batch cleanup logic below.
1281 Err(e)
1282 }
1283 }
1284 };
1285
1286
1287 // These metrics are independent of whether it was _us_ or
1288 // _someone_ that managed to commit a batch that advanced the
1289 // upper.
1290 source_statistics.update_snapshot_committed(&batch_upper);
1291 source_statistics.update_rehydration_latency_ms(&batch_upper);
1292 metrics
1293 .progress
1294 .set(mz_persist_client::metrics::encode_ts_metric(&batch_upper));
1295
1296 if collection_id.is_user() {
1297 trace!(
1298 "persist_sink {collection_id}/{shard_id}: \
1299 append result for batch ({:?} -> {:?}): {:?}",
1300 batch_lower,
1301 batch_upper,
1302 result
1303 );
1304 }
1305
1306 match result {
1307 Ok(()) => {
1308 // Only update these metrics when we know that _we_ were
1309 // successful.
1310 let committed =
1311 batch_metrics.inserts + batch_metrics.retractions;
1312 source_statistics
1313 .inc_updates_committed_by(committed);
1314 metrics.processed_batches.inc();
1315 metrics.row_inserts.inc_by(batch_metrics.inserts);
1316 metrics.row_retractions.inc_by(batch_metrics.retractions);
1317 metrics.error_inserts.inc_by(batch_metrics.error_inserts);
1318 metrics
1319 .error_retractions
1320 .inc_by(batch_metrics.error_retractions);
1321
1322 current_upper.borrow_mut().clone_from(&batch_upper);
1323 upper_cap_set.downgrade(current_upper.borrow().iter());
1324 }
1325 Err(mismatch) => {
1326 // We tried to to a non-contiguous append, that won't work.
1327 if PartialOrder::less_than(&mismatch.current, &batch_lower) {
1328 // Best-effort attempt to delete unneeded batches.
1329 future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1330
1331 // We always bail when this happens, regardless of
1332 // `bail_on_concurrent_modification`.
1333 tracing::warn!(
1334 "persist_sink({}): invalid upper! \
1335 Tried to append batch ({:?} -> {:?}) but upper \
1336 is {:?}. This is surpising and likely indicates \
1337 a bug in the persist sink, but we'll restart the \
1338 dataflow and try again.",
1339 collection_id, batch_lower, batch_upper, mismatch.current,
1340 );
1341 anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1342 } else if PartialOrder::less_than(&mismatch.current, &batch_upper) {
1343 // The shard's upper was ahead of our batch's lower
1344 // but not ahead of our upper. Cut down the
1345 // description by advancing its lower to the current
1346 // shard upper and try again. IMPORTANT: We can only
1347 // advance the lower, meaning we cut updates away,
1348 // we must not "extend" the batch by changing to a
1349 // lower that is not beyond the current lower. This
1350 // invariant is checked by the first if branch: if
1351 // `!(current_upper < lower)` then it holds that
1352 // `lower <= current_upper`.
1353
1354 // First, construct a new batch description with the
1355 // lower advanced to the current shard upper.
1356 let new_batch_lower = mismatch.current.clone();
1357 let new_done_batch_metadata =
1358 (new_batch_lower.clone(), batch_upper.clone());
1359
1360 // Retain any batches that are still in advance of
1361 // the new lower, and delete any batches that are
1362 // not.
1363 let mut batch_delete_futures = vec![];
1364 let mut new_batch_set = BatchSet::default();
1365 for batch in batches {
1366 if new_batch_lower.less_equal(&batch.data_ts) {
1367 new_batch_set.finished.push(batch);
1368 } else {
1369 batch_delete_futures.push(batch.batch.delete());
1370 }
1371 }
1372
1373 // Re-add the new batch to the list of batches to process.
1374 todo.push_front((new_done_batch_metadata, new_batch_set));
1375
1376 // Best-effort attempt to delete unneeded batches.
1377 future::join_all(batch_delete_futures).await;
1378 } else {
1379 // Best-effort attempt to delete unneeded batches.
1380 future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1381 }
1382
1383 if bail_on_concurrent_modification {
1384 tracing::warn!(
1385 "persist_sink({}): invalid upper! \
1386 Tried to append batch ({:?} -> {:?}) but upper \
1387 is {:?}. This is not a problem, it just means \
1388 someone else was faster than us. We will try \
1389 again with a new batch description.",
1390 collection_id, batch_lower, batch_upper, mismatch.current,
1391 );
1392 anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1393 }
1394 }
1395 }
1396 }
1397 }
1398 }));
1399
1400 (upper_stream, errors, shutdown_button.press_on_drop())
1401}