mz_storage/render/persist_sink.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Render an operator that persists a source collection.
11//!
12//! ## Implementation
13//!
14//! This module defines the `persist_sink` operator, that writes
15//! a collection produced by source rendering into a persist shard.
16//!
17//! It attempts to use all workers to write data to persist, and uses
18//! single-instance workers to coordinate work. The below diagram
//! is an overview of how it is shaped. There is more information
20//! in the doc comments of the top-level functions of this module.
21//!
22//!```text
23//!
24//! ,------------.
25//! | source |
26//! | collection |
27//! +---+--------+
28//! / |
29//! / |
30//! / |
31//! / |
32//! / |
33//! / |
34//! / |
35//! / |
36//! / ,-+-----------------------.
37//! / | mint_batch_descriptions |
38//! / | one arbitrary worker |
39//! | +-,--,--------+----+------+
40//! ,----------´.-´ | \
41//! _.-´ | .-´ | \
42//! _.-´ | .-´ | \
43//! .-´ .------+----|-------+---------|--------\-----.
44//! / / | | | \ \
45//! ,--------------. ,-----------------. | ,-----------------.
46//! | write_batches| | write_batches | | | write_batches |
47//! | worker 0 | | worker 1 | | | worker N |
48//! +-----+--------+ +-+---------------+ | +--+--------------+
49//! \ \ | /
50//! `-. `, | /
51//! `-._ `-. | /
52//! `-._ `-. | /
53//! `---------. `-. | /
54//! +`---`---+-------------,
55//! | append_batches |
56//! | one arbitrary worker |
57//! +------+---------------+
58//!```
59//!
60//! ## Similarities with `mz_compute::sink::persist_sink`
61//!
62//! This module has many similarities with the compute version of
63//! the same concept, and in fact, is entirely derived from it.
64//!
65//! Compute requires that its `persist_sink` is _self-correcting_;
66//! that is, it corrects what the collection in persist
67//! accumulates to if the collection has values changed at
68//! previous timestamps. It does this by continually comparing
69//! the input stream with the collection as read back from persist.
70//!
//! Source collections, while definite, cannot be reliably
//! re-produced once written down, which means compute's
73//! `persist_sink`'s self-correction mechanism would need to be
74//! skipped on operator startup, and would cause unnecessary read
75//! load on persist.
76//!
77//! Additionally, persisting sources requires we use bounded
78//! amounts of memory, even if a single timestamp represents
79//! a huge amount of data. This is not (currently) possible
80//! to guarantee while also performing self-correction.
81//!
82//! Because of this, we have ripped out the self-correction
83//! mechanism, and aggressively simplified the sub-operators.
84//! Some, particularly `append_batches` could be merged with
85//! the compute version, but that requires some amount of
86//! onerous refactoring that we have chosen to skip for now.
87//!
// TODO(guswynn): merge at least the `append_batches` operator
89
90use std::cmp::Ordering;
91use std::collections::BTreeMap;
92use std::fmt::Debug;
93use std::ops::AddAssign;
94use std::rc::Rc;
95use std::sync::Arc;
96use std::time::Duration;
97
98use differential_dataflow::difference::Semigroup;
99use differential_dataflow::lattice::Lattice;
100use differential_dataflow::{AsCollection, Collection, Hashable};
101use futures::{StreamExt, future};
102use itertools::Itertools;
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::HashMap;
105use mz_persist_client::Diagnostics;
106use mz_persist_client::batch::{Batch, BatchBuilder, ProtoBatch};
107use mz_persist_client::cache::PersistClientCache;
108use mz_persist_client::error::UpperMismatch;
109use mz_persist_types::codec_impls::UnitSchema;
110use mz_persist_types::{Codec, Codec64};
111use mz_repr::{Diff, GlobalId, Row};
112use mz_storage_types::controller::CollectionMetadata;
113use mz_storage_types::errors::DataflowError;
114use mz_storage_types::sources::SourceData;
115use mz_storage_types::{StorageDiff, dyncfgs};
116use mz_timely_util::builder_async::{
117 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
118};
119use serde::{Deserialize, Serialize};
120use timely::PartialOrder;
121use timely::container::CapacityContainerBuilder;
122use timely::dataflow::channels::pact::{Exchange, Pipeline};
123use timely::dataflow::operators::{Broadcast, Capability, CapabilitySet, Inspect};
124use timely::dataflow::{Scope, Stream};
125use timely::progress::{Antichain, Timestamp};
126use tokio::sync::Semaphore;
127use tracing::trace;
128
129use crate::metrics::source::SourcePersistSinkMetrics;
130use crate::storage_state::StorageState;
131
/// Metrics about batches.
///
/// Every field is a running count. `write_batches` increases exactly one of them per staged
/// update, by the absolute value of that update's diff.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
struct BatchMetrics {
    /// Count for `Ok` (value) updates whose diff is positive.
    inserts: u64,
    /// Count for `Ok` (value) updates whose diff is not positive.
    retractions: u64,
    /// Count for `Err` updates whose diff is positive.
    error_inserts: u64,
    /// Count for `Err` updates whose diff is not positive.
    error_retractions: u64,
}
140
141impl AddAssign<&BatchMetrics> for BatchMetrics {
142 fn add_assign(&mut self, rhs: &BatchMetrics) {
143 let BatchMetrics {
144 inserts: self_inserts,
145 retractions: self_retractions,
146 error_inserts: self_error_inserts,
147 error_retractions: self_error_retractions,
148 } = self;
149 let BatchMetrics {
150 inserts: rhs_inserts,
151 retractions: rhs_retractions,
152 error_inserts: rhs_error_inserts,
153 error_retractions: rhs_error_retractions,
154 } = rhs;
155 *self_inserts += rhs_inserts;
156 *self_retractions += rhs_retractions;
157 *self_error_inserts += rhs_error_inserts;
158 *self_error_retractions += rhs_error_retractions;
159 }
160}
161
/// Manages batches and metrics.
///
/// Bundles a persist `BatchBuilder` with the single timestamp its updates are required to
/// carry and with the metrics describing those updates.
struct BatchBuilderAndMetadata<K, V, T, D>
where
    K: Codec,
    V: Codec,
    T: Timestamp + Lattice + Codec64,
{
    /// The underlying builder that `add` forwards updates to and `finish` consumes.
    builder: BatchBuilder<K, V, T, D>,
    /// The timestamp every update added to this batch must have; `add` asserts this.
    data_ts: T,
    /// Metrics for the updates in this batch. `add` does not modify them; the code that
    /// calls `add` updates them directly.
    metrics: BatchMetrics,
}
173
174impl<K, V, T, D> BatchBuilderAndMetadata<K, V, T, D>
175where
176 K: Codec + Debug,
177 V: Codec + Debug,
178 T: Timestamp + Lattice + Codec64,
179 D: Semigroup + Codec64,
180{
181 /// Creates a new batch.
182 ///
183 /// NOTE(benesch): temporary restriction: all updates added to the batch
184 /// must be at the specified timestamp `data_ts`.
185 fn new(builder: BatchBuilder<K, V, T, D>, data_ts: T) -> Self {
186 BatchBuilderAndMetadata {
187 builder,
188 data_ts,
189 metrics: Default::default(),
190 }
191 }
192
193 /// Adds an update to the batch.
194 ///
195 /// NOTE(benesch): temporary restriction: all updates added to the batch
196 /// must be at the timestamp specified during creation.
197 async fn add(&mut self, k: &K, v: &V, t: &T, d: &D) {
198 assert_eq!(
199 self.data_ts, *t,
200 "BatchBuilderAndMetadata::add called with a timestamp {t:?} that does not match creation timestamp {:?}",
201 self.data_ts
202 );
203
204 self.builder.add(k, v, t, d).await.expect("invalid usage");
205 }
206
207 async fn finish(self, lower: Antichain<T>, upper: Antichain<T>) -> HollowBatchAndMetadata<T> {
208 let batch = self
209 .builder
210 .finish(upper.clone())
211 .await
212 .expect("invalid usage");
213 HollowBatchAndMetadata {
214 lower,
215 upper,
216 data_ts: self.data_ts,
217 batch: batch.into_transmittable_batch(),
218 metrics: self.metrics,
219 }
220 }
221}
222
/// A batch or data + metrics moved from `write_batches` to `append_batches`.
///
/// Values of this type are produced by `BatchBuilderAndMetadata::finish`. The type is
/// serializable so that it can be sent along a dataflow stream.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(bound(
    serialize = "T: Timestamp + Codec64",
    deserialize = "T: Timestamp + Codec64"
))]
struct HollowBatchAndMetadata<T> {
    /// The lower of the batch description this batch was written for.
    lower: Antichain<T>,
    /// The upper of the batch description this batch was written for; the batch builder was
    /// finished at this frontier.
    upper: Antichain<T>,
    /// The single timestamp of all updates contained in the batch.
    data_ts: T,
    /// The transmittable form of the written batch, as returned by
    /// `into_transmittable_batch`.
    batch: ProtoBatch,
    /// Metrics about the updates contained in the batch.
    metrics: BatchMetrics,
}
236
/// Holds finished batches for `append_batches`.
#[derive(Debug, Default)]
struct BatchSet {
    /// The finished batches collected so far.
    finished: Vec<FinishedBatch>,
    /// Metrics for the batches in `finished`.
    // NOTE(review): presumably the sum of the per-batch metrics — confirm in `append_batches`.
    batch_metrics: BatchMetrics,
}
243
/// A persist batch together with the timestamp of its data.
#[derive(Debug)]
struct FinishedBatch {
    /// The persist batch of `SourceData` updates.
    batch: Batch<SourceData, (), mz_repr::Timestamp, StorageDiff>,
    /// The timestamp associated with the batch.
    // NOTE(review): presumably mirrors `HollowBatchAndMetadata::data_ts` — confirm at the
    // construction site.
    data_ts: mz_repr::Timestamp,
}
249
250/// Continuously writes the `desired_stream` into persist
251/// This is done via a multi-stage operator graph:
252///
253/// 1. `mint_batch_descriptions` emits new batch descriptions whenever the
254/// frontier of `desired_collection` advances. A batch description is
255/// a pair of `(lower, upper)` that tells write operators
256/// which updates to write and in the end tells the append operator
257/// what frontiers to use when calling `append`/`compare_and_append`.
258/// This is a single-worker operator.
259/// 2. `write_batches` writes the `desired_collection` to persist as
260/// batches and sends those batches along.
261/// This does not yet append the batches to the persist shard, the update are
262/// only uploaded/prepared to be appended to a shard. Also: we only write
263/// updates for batch descriptions that we learned about from
264/// `mint_batch_descriptions`.
265/// 3. `append_batches` takes as input the minted batch descriptions and written
266/// batches. Whenever the frontiers sufficiently advance, we take a batch
267/// description and all the batches that belong to it and append it to the
268/// persist shard.
269///
270/// This operator assumes that the `desired_collection` comes pre-sharded.
271///
272/// Note that `mint_batch_descriptions` inspects the frontier of
273/// `desired_collection`, and passes the data through to `write_batches`.
274/// This is done to avoid a clone of the underlying data so that both
275/// operators can have the collection as input.
276pub(crate) fn render<G>(
277 scope: &G,
278 collection_id: GlobalId,
279 target: CollectionMetadata,
280 desired_collection: Collection<G, Result<Row, DataflowError>, Diff>,
281 storage_state: &StorageState,
282 metrics: SourcePersistSinkMetrics,
283 busy_signal: Arc<Semaphore>,
284) -> (
285 Stream<G, ()>,
286 Stream<G, Rc<anyhow::Error>>,
287 Vec<PressOnDropButton>,
288)
289where
290 G: Scope<Timestamp = mz_repr::Timestamp>,
291{
292 let persist_clients = Arc::clone(&storage_state.persist_clients);
293
294 let operator_name = format!("persist_sink({})", collection_id);
295
296 let (batch_descriptions, passthrough_desired_stream, mint_token) = mint_batch_descriptions(
297 scope,
298 collection_id,
299 &operator_name,
300 &target,
301 &desired_collection,
302 Arc::clone(&persist_clients),
303 );
304
305 let (written_batches, write_token) = write_batches(
306 scope,
307 collection_id.clone(),
308 &operator_name,
309 &target,
310 &batch_descriptions,
311 &passthrough_desired_stream.as_collection(),
312 Arc::clone(&persist_clients),
313 storage_state,
314 Arc::clone(&busy_signal),
315 );
316
317 let (upper_stream, append_errors, append_token) = append_batches(
318 scope,
319 collection_id.clone(),
320 operator_name,
321 &target,
322 &batch_descriptions,
323 &written_batches,
324 persist_clients,
325 storage_state,
326 metrics,
327 Arc::clone(&busy_signal),
328 );
329
330 (
331 upper_stream,
332 append_errors,
333 vec![mint_token, write_token, append_token],
334 )
335}
336
337/// Whenever the frontier advances, this mints a new batch description (lower
338/// and upper) that writers should use for writing the next set of batches to
339/// persist.
340///
341/// Only one of the workers does this, meaning there will only be one
342/// description in the stream, even in case of multiple timely workers. Use
343/// `broadcast()` to, ahem, broadcast, the one description to all downstream
344/// write operators/workers.
345fn mint_batch_descriptions<G>(
346 scope: &G,
347 collection_id: GlobalId,
348 operator_name: &str,
349 target: &CollectionMetadata,
350 desired_collection: &Collection<G, Result<Row, DataflowError>, Diff>,
351 persist_clients: Arc<PersistClientCache>,
352) -> (
353 Stream<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
354 Stream<G, (Result<Row, DataflowError>, mz_repr::Timestamp, Diff)>,
355 PressOnDropButton,
356)
357where
358 G: Scope<Timestamp = mz_repr::Timestamp>,
359{
360 let persist_location = target.persist_location.clone();
361 let shard_id = target.data_shard;
362 let target_relation_desc = target.relation_desc.clone();
363
364 // Only one worker is responsible for determining batch descriptions. All
365 // workers must write batches with the same description, to ensure that they
366 // can be combined into one batch that gets appended to Consensus state.
367 let hashed_id = collection_id.hashed();
368 let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
369
370 // Only the "active" operator will mint batches. All other workers have an
371 // empty frontier. It's necessary to insert all of these into
372 // `compute_state.sink_write_frontier` below so we properly clear out
373 // default frontiers of non-active workers.
374
375 let mut mint_op = AsyncOperatorBuilder::new(
376 format!("{} mint_batch_descriptions", operator_name),
377 scope.clone(),
378 );
379
380 let (output, output_stream) = mint_op.new_output();
381 let (data_output, data_output_stream) = mint_op.new_output::<CapacityContainerBuilder<_>>();
382
383 // The description and the data-passthrough outputs are both driven by this input, so
384 // they use a standard input connection.
385 let mut desired_input =
386 mint_op.new_input_for_many(&desired_collection.inner, Pipeline, [&output, &data_output]);
387
388 let shutdown_button = mint_op.build(move |capabilities| async move {
389 // Non-active workers should just pass the data through.
390 if !active_worker {
391 // The description output is entirely driven by the active worker, so we drop
392 // its capability here. The data-passthrough output just uses the data
393 // capabilities.
394 drop(capabilities);
395 while let Some(event) = desired_input.next().await {
396 match event {
397 Event::Data([_output_cap, data_output_cap], mut data) => {
398 data_output.give_container(&data_output_cap, &mut data);
399 }
400 Event::Progress(_) => {}
401 }
402 }
403 return;
404 }
405 // The data-passthrough output should will use the data capabilities, so we drop
406 // its capability here.
407 let [desc_cap, _]: [_; 2] = capabilities.try_into().expect("one capability per output");
408 let mut cap_set = CapabilitySet::from_elem(desc_cap);
409
410 // Initialize this operators's `upper` to the `upper` of the persist shard we are writing
411 // to. Data from the source not beyond this time will be dropped, as it has already
412 // been persisted.
413 // In the future, sources will avoid passing through data not beyond this upper
414 let mut current_upper = {
415 // TODO(aljoscha): We need to figure out what to do with error
416 // results from these calls.
417 let persist_client = persist_clients
418 .open(persist_location)
419 .await
420 .expect("could not open persist client");
421
422 let mut write = persist_client
423 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
424 shard_id,
425 Arc::new(target_relation_desc),
426 Arc::new(UnitSchema),
427 Diagnostics {
428 shard_name: collection_id.to_string(),
429 handle_purpose: format!(
430 "storage::persist_sink::mint_batch_descriptions {}",
431 collection_id
432 ),
433 },
434 )
435 .await
436 .expect("could not open persist shard");
437
438 // TODO: this sink currently cannot tolerate a stale upper... which is bad because the
439 // upper can become stale as soon as it is read. (For example, if another concurrent
440 // instance of the sink has updated it.) Fetching a recent upper helps to mitigate this,
441 // but ideally we would just skip ahead if we discover that our upper is stale.
442 let upper = write.fetch_recent_upper().await.clone();
443 // explicitly expire the once-used write handle.
444 write.expire().await;
445 upper
446 };
447
448 // The current input frontiers.
449 let mut desired_frontier;
450
451 loop {
452 if let Some(event) = desired_input.next().await {
453 match event {
454 Event::Data([_output_cap, data_output_cap], mut data) => {
455 // Just passthrough the data.
456 data_output.give_container(&data_output_cap, &mut data);
457 continue;
458 }
459 Event::Progress(frontier) => {
460 desired_frontier = frontier;
461 }
462 }
463 } else {
464 // Input is exhausted, so we can shut down.
465 return;
466 };
467
468 // If the new frontier for the data input has progressed, produce a batch description.
469 if PartialOrder::less_than(¤t_upper, &desired_frontier) {
470 // The maximal description range we can produce.
471 let batch_description = (current_upper.to_owned(), desired_frontier.to_owned());
472
473 let lower = batch_description.0.as_option().copied().unwrap();
474
475 let cap = cap_set
476 .try_delayed(&lower)
477 .ok_or_else(|| {
478 format!(
479 "minter cannot delay {:?} to {:?}. \
480 Likely because we already emitted a \
481 batch description and delayed.",
482 cap_set, lower
483 )
484 })
485 .unwrap();
486
487 trace!(
488 "persist_sink {collection_id}/{shard_id}: \
489 new batch_description: {:?}",
490 batch_description
491 );
492
493 output.give(&cap, batch_description);
494
495 // We downgrade our capability to the batch
496 // description upper, as there will never be
497 // any overlapping descriptions.
498 trace!(
499 "persist_sink {collection_id}/{shard_id}: \
500 downgrading to {:?}",
501 desired_frontier
502 );
503 cap_set.downgrade(desired_frontier.iter());
504
505 // After successfully emitting a new description, we can update the upper for the
506 // operator.
507 current_upper.clone_from(&desired_frontier);
508 }
509 }
510 });
511
512 (
513 output_stream,
514 data_output_stream,
515 shutdown_button.press_on_drop(),
516 )
517}
518
519/// Writes `desired_collection` to persist, but only for updates
520/// that fall into batch a description that we get via `batch_descriptions`.
521/// This forwards a `HollowBatch` (with additional metadata)
522/// for any batch of updates that was written.
523///
524/// This operator assumes that the `desired_collection` comes pre-sharded.
525///
526/// This also and updates various metrics.
527fn write_batches<G>(
528 scope: &G,
529 collection_id: GlobalId,
530 operator_name: &str,
531 target: &CollectionMetadata,
532 batch_descriptions: &Stream<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
533 desired_collection: &Collection<G, Result<Row, DataflowError>, Diff>,
534 persist_clients: Arc<PersistClientCache>,
535 storage_state: &StorageState,
536 busy_signal: Arc<Semaphore>,
537) -> (
538 Stream<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
539 PressOnDropButton,
540)
541where
542 G: Scope<Timestamp = mz_repr::Timestamp>,
543{
544 let worker_index = scope.index();
545
546 let persist_location = target.persist_location.clone();
547 let shard_id = target.data_shard;
548 let target_relation_desc = target.relation_desc.clone();
549
550 let source_statistics = storage_state
551 .aggregated_statistics
552 .get_source(&collection_id)
553 .expect("statistics initialized")
554 .clone();
555
556 let mut write_op =
557 AsyncOperatorBuilder::new(format!("{} write_batches", operator_name), scope.clone());
558
559 let (output, output_stream) = write_op.new_output::<CapacityContainerBuilder<_>>();
560
561 let mut descriptions_input =
562 write_op.new_input_for(&batch_descriptions.broadcast(), Pipeline, &output);
563 let mut desired_input = write_op.new_disconnected_input(&desired_collection.inner, Pipeline);
564
565 // This operator accepts the current and desired update streams for a `persist` shard.
566 // It attempts to write out updates, starting from the current's upper frontier, that
567 // will cause the changes of desired to be committed to persist, _but only those also past the
568 // upper_.
569
570 let shutdown_button = write_op.build(move |_capabilities| async move {
571 // In-progress batches of data, keyed by timestamp.
572 let mut stashed_batches = BTreeMap::new();
573
574 // Contains descriptions of batches for which we know that we can
575 // write data. We got these from the "centralized" operator that
576 // determines batch descriptions for all writers.
577 //
578 // `Antichain` does not implement `Ord`, so we cannot use a `BTreeMap`. We need to search
579 // through the map, so we cannot use the `mz_ore` wrapper either.
580 #[allow(clippy::disallowed_types)]
581 let mut in_flight_batches = std::collections::HashMap::<
582 (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
583 Capability<mz_repr::Timestamp>,
584 >::new();
585
586 // TODO(aljoscha): We need to figure out what to do with error results from these calls.
587 let persist_client = persist_clients
588 .open(persist_location)
589 .await
590 .expect("could not open persist client");
591
592 let write = persist_client
593 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
594 shard_id,
595 Arc::new(target_relation_desc),
596 Arc::new(UnitSchema),
597 Diagnostics {
598 shard_name: collection_id.to_string(),
599 handle_purpose: format!(
600 "storage::persist_sink::write_batches {}",
601 collection_id
602 ),
603 },
604 )
605 .await
606 .expect("could not open persist shard");
607
608 // The current input frontiers.
609 let mut batch_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
610 let mut desired_frontier = Antichain::from_elem(Timestamp::minimum());
611
612 // The frontiers of the inputs we have processed, used to avoid redoing work
613 let mut processed_desired_frontier = Antichain::from_elem(Timestamp::minimum());
614 let mut processed_descriptions_frontier = Antichain::from_elem(Timestamp::minimum());
615
616 // A "safe" choice for the lower of new batches we are creating.
617 let mut operator_batch_lower = Antichain::from_elem(Timestamp::minimum());
618
619 while !(batch_descriptions_frontier.is_empty() && desired_frontier.is_empty()) {
620 // Wait for either inputs to become ready
621 tokio::select! {
622 _ = descriptions_input.ready() => {},
623 _ = desired_input.ready() => {},
624 }
625
626 // Collect ready work from both inputs
627 while let Some(event) = descriptions_input.next_sync() {
628 match event {
629 Event::Data(cap, data) => {
630 // Ingest new batch descriptions.
631 for description in data {
632 if collection_id.is_user() {
633 trace!(
634 "persist_sink {collection_id}/{shard_id}: \
635 write_batches: \
636 new_description: {:?}, \
637 desired_frontier: {:?}, \
638 batch_descriptions_frontier: {:?}",
639 description, desired_frontier, batch_descriptions_frontier,
640 );
641 }
642 match in_flight_batches.entry(description) {
643 std::collections::hash_map::Entry::Vacant(v) => {
644 // This _should_ be `.retain`, but rust
645 // currently thinks we can't use `cap`
646 // as an owned value when using the
647 // match guard `Some(event)`
648 v.insert(cap.delayed(cap.time()));
649 }
650 std::collections::hash_map::Entry::Occupied(o) => {
651 let (description, _) = o.remove_entry();
652 panic!(
653 "write_batches: sink {} got more than one \
654 batch for description {:?}, in-flight: {:?}",
655 collection_id, description, in_flight_batches
656 );
657 }
658 }
659 }
660 }
661 Event::Progress(frontier) => {
662 batch_descriptions_frontier = frontier;
663 }
664 }
665 }
666
667 let ready_events = std::iter::from_fn(|| desired_input.next_sync()).collect_vec();
668
669 // We know start the async work for the input we received. Until we finish the dataflow
670 // should be marked as busy.
671 let permit = busy_signal.acquire().await;
672
673 for event in ready_events {
674 match event {
675 Event::Data(_cap, data) => {
676 // Extract desired rows as positive contributions to `correction`.
677 if collection_id.is_user() && !data.is_empty() {
678 trace!(
679 "persist_sink {collection_id}/{shard_id}: \
680 updates: {:?}, \
681 in-flight-batches: {:?}, \
682 desired_frontier: {:?}, \
683 batch_descriptions_frontier: {:?}",
684 data,
685 in_flight_batches,
686 desired_frontier,
687 batch_descriptions_frontier,
688 );
689 }
690
691 for (row, ts, diff) in data {
692 if write.upper().less_equal(&ts) {
693 let builder = stashed_batches.entry(ts).or_insert_with(|| {
694 BatchBuilderAndMetadata::new(
695 write.builder(operator_batch_lower.clone()),
696 ts,
697 )
698 });
699
700 let is_value = row.is_ok();
701
702 builder
703 .add(&SourceData(row), &(), &ts, &diff.into_inner())
704 .await;
705
706 source_statistics.inc_updates_staged_by(1);
707
708 // Note that we assume `diff` is either +1 or -1 here, being anything
709 // else is a logic bug we can't handle at the metric layer. We also
710 // assume this addition doesn't overflow.
711 match (is_value, diff.is_positive()) {
712 (true, true) => builder.metrics.inserts += diff.unsigned_abs(),
713 (true, false) => {
714 builder.metrics.retractions += diff.unsigned_abs()
715 }
716 (false, true) => {
717 builder.metrics.error_inserts += diff.unsigned_abs()
718 }
719 (false, false) => {
720 builder.metrics.error_retractions += diff.unsigned_abs()
721 }
722 }
723 }
724 }
725 }
726 Event::Progress(frontier) => {
727 desired_frontier = frontier;
728 }
729 }
730 }
731 // We may have the opportunity to commit updates, if either frontier
732 // has moved
733 if PartialOrder::less_equal(&processed_desired_frontier, &desired_frontier)
734 || PartialOrder::less_equal(
735 &processed_descriptions_frontier,
736 &batch_descriptions_frontier,
737 )
738 {
739 trace!(
740 "persist_sink {collection_id}/{shard_id}: \
741 CAN emit: \
742 processed_desired_frontier: {:?}, \
743 processed_descriptions_frontier: {:?}, \
744 desired_frontier: {:?}, \
745 batch_descriptions_frontier: {:?}",
746 processed_desired_frontier,
747 processed_descriptions_frontier,
748 desired_frontier,
749 batch_descriptions_frontier,
750 );
751
752 trace!(
753 "persist_sink {collection_id}/{shard_id}: \
754 in-flight batches: {:?}, \
755 batch_descriptions_frontier: {:?}, \
756 desired_frontier: {:?}",
757 in_flight_batches, batch_descriptions_frontier, desired_frontier,
758 );
759
760 // We can write updates for a given batch description when
761 // a) the batch is not beyond `batch_descriptions_frontier`,
762 // and b) we know that we have seen all updates that would
763 // fall into the batch, from `desired_frontier`.
764 let ready_batches = in_flight_batches
765 .keys()
766 .filter(|(lower, upper)| {
767 !PartialOrder::less_equal(&batch_descriptions_frontier, lower)
768 && !PartialOrder::less_than(&desired_frontier, upper)
769 })
770 .cloned()
771 .collect::<Vec<_>>();
772
773 trace!(
774 "persist_sink {collection_id}/{shard_id}: \
775 ready batches: {:?}",
776 ready_batches,
777 );
778
779 for batch_description in ready_batches {
780 let cap = in_flight_batches.remove(&batch_description).unwrap();
781
782 if collection_id.is_user() {
783 trace!(
784 "persist_sink {collection_id}/{shard_id}: \
785 emitting done batch: {:?}, cap: {:?}",
786 batch_description, cap
787 );
788 }
789
790 let (batch_lower, batch_upper) = batch_description;
791
792 let finalized_timestamps: Vec<_> = stashed_batches
793 .keys()
794 .filter(|time| {
795 batch_lower.less_equal(time) && !batch_upper.less_equal(time)
796 })
797 .copied()
798 .collect();
799
800 let mut batch_tokens = vec![];
801 for ts in finalized_timestamps {
802 let batch_builder = stashed_batches.remove(&ts).unwrap();
803
804 if collection_id.is_user() {
805 trace!(
806 "persist_sink {collection_id}/{shard_id}: \
807 wrote batch from worker {}: ({:?}, {:?}),
808 containing {:?}",
809 worker_index, batch_lower, batch_upper, batch_builder.metrics
810 );
811 }
812
813 let batch = batch_builder
814 .finish(batch_lower.clone(), batch_upper.clone())
815 .await;
816
817 // The next "safe" lower for batches is the meet (max) of all the emitted
818 // batches. These uppers all are not beyond the `desired_frontier`, which
819 // means all updates received by this operator will be beyond this lower.
820 // Additionally, the `mint_batch_descriptions` operator ensures that
821 // later-received batch descriptions will start beyond these uppers as
822 // well.
823 //
824 // It is impossible to emit a batch description that is
825 // beyond a not-yet emitted description in `in_flight_batches`, as
826 // a that description would also have been chosen as ready above.
827 operator_batch_lower = operator_batch_lower.join(&batch_upper);
828 batch_tokens.push(batch);
829 }
830
831 output.give_container(&cap, &mut batch_tokens);
832
833 processed_desired_frontier.clone_from(&desired_frontier);
834 processed_descriptions_frontier.clone_from(&batch_descriptions_frontier);
835 }
836 } else {
837 trace!(
838 "persist_sink {collection_id}/{shard_id}: \
839 cannot emit: processed_desired_frontier: {:?}, \
840 processed_descriptions_frontier: {:?}, \
841 desired_frontier: {:?}",
842 processed_desired_frontier, processed_descriptions_frontier, desired_frontier
843 );
844 }
845 drop(permit);
846 }
847 });
848
849 if collection_id.is_user() {
850 output_stream.inspect(|d| trace!("batch: {:?}", d));
851 }
852
853 (output_stream, shutdown_button.press_on_drop())
854}
855
856/// Fuses written batches together and appends them to persist using one
857/// `compare_and_append` call. Writing only happens for batch descriptions where
858/// we know that no future batches will arrive, that is, for those batch
859/// descriptions that are not beyond the frontier of both the
860/// `batch_descriptions` and `batches` inputs.
861///
862/// This also keeps the shared frontier that is stored in `compute_state` in
863/// sync with the upper of the persist shard, and updates various metrics
864/// and statistics objects.
865fn append_batches<G>(
866 scope: &G,
867 collection_id: GlobalId,
868 operator_name: String,
869 target: &CollectionMetadata,
870 batch_descriptions: &Stream<G, (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>)>,
871 batches: &Stream<G, HollowBatchAndMetadata<mz_repr::Timestamp>>,
872 persist_clients: Arc<PersistClientCache>,
873 storage_state: &StorageState,
874 metrics: SourcePersistSinkMetrics,
875 busy_signal: Arc<Semaphore>,
876) -> (
877 Stream<G, ()>,
878 Stream<G, Rc<anyhow::Error>>,
879 PressOnDropButton,
880)
881where
882 G: Scope<Timestamp = mz_repr::Timestamp>,
883{
884 let persist_location = target.persist_location.clone();
885 let shard_id = target.data_shard;
886 let target_relation_desc = target.relation_desc.clone();
887
888 // We can only be lenient with concurrent modifications when we know that
889 // this source pipeline is using the feedback upsert operator, which works
890 // correctly when multiple instances of an ingestion pipeline produce
891 // different updates, because of concurrency/non-determinism.
892 let use_continual_feedback_upsert = dyncfgs::STORAGE_USE_CONTINUAL_FEEDBACK_UPSERT
893 .get(storage_state.storage_configuration.config_set());
894 let bail_on_concurrent_modification = !use_continual_feedback_upsert;
895
896 let mut read_only_rx = storage_state.read_only_rx.clone();
897
898 let operator_name = format!("{} append_batches", operator_name);
899 let mut append_op = AsyncOperatorBuilder::new(operator_name, scope.clone());
900
901 let hashed_id = collection_id.hashed();
902 let active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
903 let worker_id = scope.index();
904
905 // Both of these inputs are disconnected from the output capabilities of this operator, as
906 // any output of this operator is entirely driven by the `compare_and_append`s. Currently
907 // this operator has no outputs, but they may be added in the future, when merging with
908 // the compute `persist_sink`.
909 let mut descriptions_input =
910 append_op.new_disconnected_input(batch_descriptions, Exchange::new(move |_| hashed_id));
911 let mut batches_input =
912 append_op.new_disconnected_input(batches, Exchange::new(move |_| hashed_id));
913
914 let current_upper = Rc::clone(&storage_state.source_uppers[&collection_id]);
915 if !active_worker {
916 // This worker is not writing, so make sure it's "taken out" of the
917 // calculation by advancing to the empty frontier.
918 current_upper.borrow_mut().clear();
919 }
920
921 let source_statistics = storage_state
922 .aggregated_statistics
923 .get_source(&collection_id)
924 .expect("statistics initialized")
925 .clone();
926
927 // An output whose frontier tracks the last successful compare and append of this operator
928 let (_upper_output, upper_stream) = append_op.new_output::<CapacityContainerBuilder<_>>();
929
930 // This operator accepts the batch descriptions and tokens that represent
931 // written batches. Written batches get appended to persist when we learn
932 // from our input frontiers that we have seen all batches for a given batch
933 // description.
934
935 let (shutdown_button, errors) = append_op.build_fallible(move |caps| Box::pin(async move {
936 let [upper_cap_set]: &mut [_; 1] = caps.try_into().unwrap();
937
938 // This may SEEM unnecessary, but metrics contains extra
939 // `DeleteOnDrop`-wrapped fields that will NOT be moved into this
940 // closure otherwise, dropping and destroying
941 // those metrics. This is because rust now only moves the
942 // explicitly-referenced fields into closures.
943 let metrics = metrics;
944
945 // Contains descriptions of batches for which we know that we can
946 // write data. We got these from the "centralized" operator that
947 // determines batch descriptions for all writers.
948 //
949 // `Antichain` does not implement `Ord`, so we cannot use a `BTreeSet`. We need to search
950 // through the set, so we cannot use the `mz_ore` wrapper either.
951 #[allow(clippy::disallowed_types)]
952 let mut in_flight_descriptions = std::collections::HashSet::<(
953 Antichain<mz_repr::Timestamp>,
954 Antichain<mz_repr::Timestamp>,
955 )>::new();
956
957 // In flight batches that haven't been `compare_and_append`'d yet, plus metrics about
958 // the batch.
959 let mut in_flight_batches = HashMap::<
960 (Antichain<mz_repr::Timestamp>, Antichain<mz_repr::Timestamp>),
961 BatchSet,
962 >::new();
963
964 source_statistics.initialize_rehydration_latency_ms();
965 if !active_worker {
966 // The non-active workers report that they are done snapshotting and hydrating.
967 let empty_frontier = Antichain::new();
968 source_statistics.initialize_snapshot_committed(&empty_frontier);
969 source_statistics.update_rehydration_latency_ms(&empty_frontier);
970 return Ok(());
971 }
972
973 let persist_client = persist_clients
974 .open(persist_location)
975 .await?;
976
977 let mut write = persist_client
978 .open_writer::<SourceData, (), mz_repr::Timestamp, StorageDiff>(
979 shard_id,
980 Arc::new(target_relation_desc),
981 Arc::new(UnitSchema),
982 Diagnostics {
983 shard_name:collection_id.to_string(),
984 handle_purpose: format!("persist_sink::append_batches {}", collection_id)
985 },
986 )
987 .await?;
988
989 // Initialize this sink's `upper` to the `upper` of the persist shard we are writing
990 // to. Data from the source not beyond this time will be dropped, as it has already
991 // been persisted.
992 // In the future, sources will avoid passing through data not beyond this upper.
993 // VERY IMPORTANT: Only the active write worker must change the
994 // shared upper. All other workers have already cleared this
995 // upper above.
996 current_upper.borrow_mut().clone_from(write.upper());
997 upper_cap_set.downgrade(current_upper.borrow().iter());
998 source_statistics.initialize_snapshot_committed(write.upper());
999
1000 // The current input frontiers.
1001 let mut batch_description_frontier = Antichain::from_elem(Timestamp::minimum());
1002 let mut batches_frontier = Antichain::from_elem(Timestamp::minimum());
1003
1004 loop {
1005 tokio::select! {
1006 Some(event) = descriptions_input.next() => {
1007 match event {
1008 Event::Data(_cap, data) => {
1009 // Ingest new batch descriptions.
1010 for batch_description in data {
1011 if collection_id.is_user() {
1012 trace!(
1013 "persist_sink {collection_id}/{shard_id}: \
1014 append_batches: sink {}, \
1015 new description: {:?}, \
1016 batch_description_frontier: {:?}",
1017 collection_id,
1018 batch_description,
1019 batch_description_frontier
1020 );
1021 }
1022
1023 // This line has to be broken up, or
1024 // rustfmt fails in the whole function :(
1025 let is_new = in_flight_descriptions.insert(
1026 batch_description.clone()
1027 );
1028
1029 assert!(
1030 is_new,
1031 "append_batches: sink {} got more than one batch \
1032 for a given description in-flight: {:?}",
1033 collection_id, in_flight_batches
1034 );
1035 }
1036
1037 continue;
1038 }
1039 Event::Progress(frontier) => {
1040 batch_description_frontier = frontier;
1041 }
1042 }
1043 }
1044 Some(event) = batches_input.next() => {
1045 match event {
1046 Event::Data(_cap, data) => {
1047 for batch in data {
1048 let batch_description = (batch.lower.clone(), batch.upper.clone());
1049
1050 let batches = in_flight_batches
1051 .entry(batch_description)
1052 .or_default();
1053
1054 batches.finished.push(FinishedBatch {
1055 batch: write.batch_from_transmittable_batch(batch.batch),
1056 data_ts: batch.data_ts,
1057 });
1058 batches.batch_metrics += &batch.metrics;
1059 }
1060 continue;
1061 }
1062 Event::Progress(frontier) => {
1063 batches_frontier = frontier;
1064 }
1065 }
1066 }
1067 else => {
1068 // All inputs are exhausted, so we can shut down.
1069 return Ok(());
1070 }
1071 };
1072
1073 // Peel off any batches that are not beyond the frontier
1074 // anymore.
1075 //
1076 // It is correct to consider batches that are not beyond the
1077 // `batches_frontier` because it is held back by the writer
1078 // operator as long as a) the `batch_description_frontier` did
1079 // not advance and b) as long as the `desired_frontier` has not
1080 // advanced to the `upper` of a given batch description.
1081
1082 let mut done_batches = in_flight_descriptions
1083 .iter()
1084 .filter(|(lower, _upper)| !PartialOrder::less_equal(&batches_frontier, lower))
1085 .cloned()
1086 .collect::<Vec<_>>();
1087
1088 trace!(
1089 "persist_sink {collection_id}/{shard_id}: \
1090 append_batches: in_flight: {:?}, \
1091 done: {:?}, \
1092 batch_frontier: {:?}, \
1093 batch_description_frontier: {:?}",
1094 in_flight_descriptions,
1095 done_batches,
1096 batches_frontier,
1097 batch_description_frontier
1098 );
1099
1100 // Append batches in order, to ensure that their `lower` and
1101 // `upper` line up.
1102 done_batches.sort_by(|a, b| {
1103 if PartialOrder::less_than(a, b) {
1104 Ordering::Less
1105 } else if PartialOrder::less_than(b, a) {
1106 Ordering::Greater
1107 } else {
1108 Ordering::Equal
1109 }
1110 });
1111
1112 // Reverse, as we'll pop batches off the end of the queue.
1113 done_batches.reverse();
1114
1115 while let Some(done_batch_metadata) = done_batches.pop() {
1116 in_flight_descriptions.remove(&done_batch_metadata);
1117
1118 let batch_set = in_flight_batches
1119 .remove(&done_batch_metadata)
1120 .unwrap_or_default();
1121
1122 let mut batches = batch_set.finished;
1123
1124 trace!(
1125 "persist_sink {collection_id}/{shard_id}: \
1126 done batch: {:?}, {:?}",
1127 done_batch_metadata,
1128 batches
1129 );
1130
1131 let (batch_lower, batch_upper) = done_batch_metadata;
1132
1133 let batch_metrics = batch_set.batch_metrics;
1134
1135 let mut to_append = batches.iter_mut().map(|b| &mut b.batch).collect::<Vec<_>>();
1136
1137 let result = {
1138 let maybe_err = if *read_only_rx.borrow() {
1139
1140 // We have to wait for either us coming out of read-only
1141 // mode or someone else applying a write that covers our
1142 // batch.
1143 //
1144 // If we didn't wait for the latter here, and just go
1145 // around the loop again, we might miss a moment where
1146 // _we_ have to write down a batch. For example when our
1147 // input frontier advances to a state where we can
1148 // write, and the read-write instance sees the same
1149 // update but then crashes before it can append a batch.
1150
1151 let maybe_err = loop {
1152 if collection_id.is_user() {
1153 tracing::debug!(
1154 %worker_id,
1155 %collection_id,
1156 %shard_id,
1157 ?batch_lower,
1158 ?batch_upper,
1159 ?current_upper,
1160 "persist_sink is in read-only mode, waiting until we come out of it or the shard upper advances"
1161 );
1162 }
1163
1164 // We don't try to be smart here, and for example
1165 // use `wait_for_upper_past()`. We'd have to use a
1166 // select!, which would require cancel safety of
1167 // `wait_for_upper_past()`, which it doesn't
1168 // advertise.
1169 let _ = tokio::time::timeout(Duration::from_secs(1), read_only_rx.changed()).await;
1170
1171 if !*read_only_rx.borrow() {
1172 if collection_id.is_user() {
1173 tracing::debug!(
1174 %worker_id,
1175 %collection_id,
1176 %shard_id,
1177 ?batch_lower,
1178 ?batch_upper,
1179 ?current_upper,
1180 "persist_sink has come out of read-only mode"
1181 );
1182 }
1183
1184 // It's okay to write now.
1185 break Ok(());
1186 }
1187
1188 let current_upper = write.fetch_recent_upper().await;
1189
1190 if PartialOrder::less_than(&batch_upper, current_upper) {
1191 // We synthesize an `UpperMismatch` so that we can go
1192 // through the same logic below for trimming down our
1193 // batches.
1194 //
1195 // Notably, we are not trying to be smart, and teach the
1196 // write operator about read-only mode. Writing down
1197 // those batches does not append anything to the persist
1198 // shard, and it would be a hassle to figure out in the
1199 // write workers how to trim down batches in read-only
1200 // mode, when the shard upper advances.
1201 //
1202 // Right here, in the logic below, we have all we need
1203 // for figuring out how to trim our batches.
1204
1205 if collection_id.is_user() {
1206 tracing::debug!(
1207 %worker_id,
1208 %collection_id,
1209 %shard_id,
1210 ?batch_lower,
1211 ?batch_upper,
1212 ?current_upper,
1213 "persist_sink not appending in read-only mode"
1214 );
1215 }
1216
1217 break Err(UpperMismatch {
1218 current: current_upper.clone(),
1219 expected: batch_lower.clone()}
1220 );
1221 }
1222 };
1223
1224 maybe_err
1225 } else {
1226 // It's okay to proceed with the write.
1227 Ok(())
1228 };
1229
1230 match maybe_err {
1231 Ok(()) => {
1232 let _permit = busy_signal.acquire().await;
1233
1234 write.compare_and_append_batch(
1235 &mut to_append[..],
1236 batch_lower.clone(),
1237 batch_upper.clone(),
1238 )
1239 .await
1240 .expect("Invalid usage")
1241 },
1242 Err(e) => {
1243 // We forward the synthesized error, so that
1244 // we go through the batch cleanup logic below.
1245 Err(e)
1246 }
1247 }
1248 };
1249
1250
1251 // These metrics are independent of whether it was _us_ or
1252 // _someone_ that managed to commit a batch that advanced the
1253 // upper.
1254 source_statistics.update_snapshot_committed(&batch_upper);
1255 source_statistics.update_rehydration_latency_ms(&batch_upper);
1256 metrics
1257 .progress
1258 .set(mz_persist_client::metrics::encode_ts_metric(&batch_upper));
1259
1260 if collection_id.is_user() {
1261 trace!(
1262 "persist_sink {collection_id}/{shard_id}: \
1263 append result for batch ({:?} -> {:?}): {:?}",
1264 batch_lower,
1265 batch_upper,
1266 result
1267 );
1268 }
1269
1270 match result {
1271 Ok(()) => {
1272 // Only update these metrics when we know that _we_ were
1273 // successful.
1274 source_statistics
1275 .inc_updates_committed_by(batch_metrics.inserts + batch_metrics.retractions);
1276 metrics.processed_batches.inc();
1277 metrics.row_inserts.inc_by(batch_metrics.inserts);
1278 metrics.row_retractions.inc_by(batch_metrics.retractions);
1279 metrics.error_inserts.inc_by(batch_metrics.error_inserts);
1280 metrics
1281 .error_retractions
1282 .inc_by(batch_metrics.error_retractions);
1283
1284 current_upper.borrow_mut().clone_from(&batch_upper);
1285 upper_cap_set.downgrade(current_upper.borrow().iter());
1286 }
1287 Err(mismatch) => {
1288 // We tried to do a non-contiguous append, that won't work.
1289 if PartialOrder::less_than(&mismatch.current, &batch_lower) {
1290 // Best-effort attempt to delete unneeded batches.
1291 future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1292
1293 // We always bail when this happens, regardless of
1294 // `bail_on_concurrent_modification`.
1295 tracing::warn!(
1296 "persist_sink({}): invalid upper! \
1297 Tried to append batch ({:?} -> {:?}) but upper \
1298 is {:?}. This is surpising and likely indicates \
1299 a bug in the persist sink, but we'll restart the \
1300 dataflow and try again.",
1301 collection_id, batch_lower, batch_upper, mismatch.current,
1302 );
1303 anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1304 } else if PartialOrder::less_than(&mismatch.current, &batch_upper) {
1305 // The shard's upper was ahead of our batch's lower
1306 // but not ahead of our upper. Cut down the
1307 // description by advancing its lower to the current
1308 // shard upper and try again. IMPORTANT: We can only
1309 // advance the lower, meaning we cut updates away,
1310 // we must not "extend" the batch by changing to a
1311 // lower that is not beyond the current lower. This
1312 // invariant is checked by the first if branch: if
1313 // `!(current_upper < lower)` then it holds that
1314 // `lower <= current_upper`.
1315
1316 // First, construct a new batch description with the
1317 // lower advanced to the current shard upper.
1318 let new_batch_lower = mismatch.current.clone();
1319 let new_done_batch_metadata = (new_batch_lower.clone(), batch_upper.clone());
1320
1321 // Re-add the new batch to the list of batches to
1322 // process.
1323 done_batches.push(new_done_batch_metadata.clone());
1324
1325 // Retain any batches that are still in advance of
1326 // the new lower, and delete any batches that are
1327 // not.
1328 //
1329 // Temporary measure: this bookkeeping is made
1330 // possible by the fact that each batch only
1331 // contains data at a single timestamp, even though
1332 // it might declare a larger lower or upper. In the
1333 // future, we'll want to use persist's `append` API
1334 // and let persist handle the truncation internally.
1335 let new_batch_set = in_flight_batches.entry(new_done_batch_metadata).or_default();
1336 let mut batch_delete_futures = vec![];
1337 for batch in batches {
1338 if new_batch_lower.less_equal(&batch.data_ts) {
1339 new_batch_set.finished.push(batch);
1340 } else {
1341 batch_delete_futures.push(batch.batch.delete());
1342 }
1343 }
1344
1345 // Best-effort attempt to delete unneeded batches.
1346 future::join_all(batch_delete_futures).await;
1347 } else {
1348 // Best-effort attempt to delete unneeded batches.
1349 future::join_all(batches.into_iter().map(|b| b.batch.delete())).await;
1350 }
1351
1352 if bail_on_concurrent_modification {
1353 tracing::warn!(
1354 "persist_sink({}): invalid upper! \
1355 Tried to append batch ({:?} -> {:?}) but upper \
1356 is {:?}. This is not a problem, it just means \
1357 someone else was faster than us. We will try \
1358 again with a new batch description.",
1359 collection_id, batch_lower, batch_upper, mismatch.current,
1360 );
1361 anyhow::bail!("collection concurrently modified. Ingestion dataflow will be restarted");
1362 }
1363 }
1364 }
1365 }
1366 }
1367 }));
1368
1369 (upper_stream, errors, shutdown_button.press_on_drop())
1370}