Skip to main content

mz_storage/
upsert.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cell::RefCell;
11use std::cmp::Reverse;
12use std::convert::AsRef;
13use std::fmt::Debug;
14use std::hash::{Hash, Hasher};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use differential_dataflow::hashable::Hashable;
19use differential_dataflow::{AsCollection, VecCollection};
20use futures::StreamExt;
21use futures::future::FutureExt;
22use indexmap::map::Entry;
23use itertools::Itertools;
24use mz_ore::error::ErrorExt;
25use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
26use mz_rocksdb::ValueIterator;
27use mz_sql_server_util::cdc::Lsn;
28use mz_storage_operators::metrics::BackpressureMetrics;
29use mz_storage_types::configuration::StorageConfiguration;
30use mz_storage_types::dyncfgs;
31use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
32use mz_storage_types::sources::MzOffset;
33use mz_storage_types::sources::envelope::UpsertEnvelope;
34use mz_storage_types::sources::kafka::{KafkaTimestamp, RangeBound};
35use mz_storage_types::sources::mysql::GtidPartition;
36use mz_timely_util::builder_async::{
37    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
38    PressOnDropButton,
39};
40use serde::{Deserialize, Serialize};
41use sha2::{Digest, Sha256};
42use timely::dataflow::channels::pact::Exchange;
43use timely::dataflow::operators::{Capability, InputCapability, Operator};
44use timely::dataflow::{Scope, StreamVec};
45use timely::order::{PartialOrder, TotalOrder};
46use timely::progress::timestamp::Refines;
47use timely::progress::{Antichain, Timestamp};
48
49use crate::healthcheck::HealthStatusUpdate;
50use crate::metrics::upsert::UpsertMetrics;
51use crate::storage_state::StorageInstanceContext;
52use crate::{upsert_continual_feedback, upsert_continual_feedback_v2};
53use types::{
54    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
55    upsert_bincode_opts,
56};
57
58#[cfg(test)]
59pub mod memory;
60pub(crate) mod rocksdb;
61// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
62pub(crate) mod types;
63
/// The value carried by an upsert update: `Ok` holds the [`Row`], `Err` holds a boxed
/// [`UpsertError`]. The error is boxed, which keeps the `Err` variant pointer-sized.
pub type UpsertValue = Result<Row, Box<UpsertError>>;
65
/// A fixed-width, 32-byte identifier for an upsert key.
///
/// Values are normally produced by hashing the key's datums (see `UpsertKey::from_iter`),
/// so two keys compare equal exactly when their hashes are equal. The derived `Ord` is the
/// lexicographic order of the underlying bytes.
///
/// The type is `#[repr(transparent)]` over `[u8; 32]` and derives the `bytemuck` marker
/// traits, which is what allows slices of keys to be reinterpreted as raw bytes (and back)
/// without copying.
#[derive(
    Copy,
    Clone,
    Hash,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    bytemuck::AnyBitPattern,
    bytemuck::NoUninit
)]
#[repr(transparent)]
pub struct UpsertKey([u8; 32]);
81
/// `columnation` support for `UpsertKey`. The key is `Copy` and owns no heap data, so it
/// uses the crate's `CopyRegion` rather than a custom region.
impl columnation::Columnation for UpsertKey {
    type InnerRegion = columnation::CopyRegion<UpsertKey>;
}
85
/// Columnar (the `columnar` crate, distinct from `columnation`) support for
/// `UpsertKey`, so the upsert-v2 source stash can use a paged columnar merge
/// batcher keyed natively by `UpsertKey` (no `Row` packing). `UpsertKey` is a
/// POD `[u8; 32]` newtype, so the container is a fixed-stride byte column.
///
/// This is hand-rolled rather than `#[derive(Columnar)]`d for one load-bearing
/// reason: the reference type must be `&UpsertKey`. `&UpsertKey` is `Copy + Ord`
/// (the lexicographic `[u8; 32]` order the persist-feedback trace is keyed on),
/// which both satisfies the merge batcher's `Ref: Copy + Ord` requirement and —
/// crucially — matches the read item of the feedback arrangement's
/// `ColumnationStack<UpsertKey>` key container, so the paged `ValRow` builder
/// can reconcile the `Column` input against the spine (its `BuilderInput` bound
/// is `ReadItem: PartialEq<Ref<UpsertKey>>`). A derived impl would yield a
/// generated `UpsertKeyReference` (and route `[u8; 32]` through the generic,
/// non-fixed-stride array container), breaking that reconciliation.
mod columnar_upsert_key {
    use super::UpsertKey;
    use columnar::Columnar;
    use mz_ore::cast::CastFrom;
    use std::ops::Range;

    /// A newtype wrapper for a vector of `UpsertKey` values.
    ///
    /// `T` is the backing storage. In this module it is either the owned
    /// `Vec<UpsertKey>` (the container) or a borrowed `&[UpsertKey]` (the
    /// borrowed view of that container).
    #[derive(Clone, Copy, Default, Debug)]
    pub struct UpsertKeys<T>(T);
    // `Push`, `Clear` and `Len` simply forward to the wrapped storage.
    impl<D, T: columnar::Push<D>> columnar::Push<D> for UpsertKeys<T> {
        #[inline(always)]
        fn push(&mut self, item: D) {
            self.0.push(item)
        }
    }
    impl<T: columnar::Clear> columnar::Clear for UpsertKeys<T> {
        #[inline(always)]
        fn clear(&mut self) {
            self.0.clear()
        }
    }
    impl<T: columnar::Len> columnar::Len for UpsertKeys<T> {
        #[inline(always)]
        fn len(&self) -> usize {
            self.0.len()
        }
    }
    /// Indexing the borrowed view yields a plain `&UpsertKey`; this is the
    /// reference type the module docs describe as load-bearing.
    impl<'a> columnar::Index for UpsertKeys<&'a [UpsertKey]> {
        type Ref = &'a UpsertKey;

        /// Returns the key at `index`. Panics if `index` is out of bounds
        /// (ordinary slice indexing).
        #[inline(always)]
        fn get(&self, index: usize) -> Self::Ref {
            &self.0[index]
        }
    }

    impl Columnar for UpsertKey {
        /// `UpsertKey` is `Copy`, so the owned value is just a copy of the referent.
        #[inline(always)]
        fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
            *other
        }
        type Container = UpsertKeys<Vec<UpsertKey>>;
        /// Shortens the lifetime of a reference; a no-op for `&UpsertKey`.
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
        where
            Self: 'a,
        {
            thing
        }
    }

    impl columnar::Borrow for UpsertKeys<Vec<UpsertKey>> {
        type Ref<'a> = &'a UpsertKey;
        type Borrowed<'a>
            = UpsertKeys<&'a [UpsertKey]>
        where
            Self: 'a;
        /// Borrows the owned vector as a slice-backed view.
        #[inline(always)]
        fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
            UpsertKeys(self.0.as_slice())
        }
        /// Re-wraps the same slice under a shorter lifetime.
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
        where
            Self: 'a,
        {
            UpsertKeys(item.0)
        }
        /// Shortens the lifetime of an element reference; a no-op.
        #[inline(always)]
        fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
        where
            Self: 'a,
        {
            item
        }
    }

    // Both methods delegate to the `Vec<UpsertKey>` implementation after
    // unwrapping the newtype.
    impl columnar::Container for UpsertKeys<Vec<UpsertKey>> {
        #[inline(always)]
        fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
            self.0.extend_from_self(other.0, range)
        }
        #[inline(always)]
        fn reserve_for<'a, I>(&mut self, selves: I)
        where
            Self: 'a,
            I: Iterator<Item = Self::Borrowed<'a>> + Clone,
        {
            self.0.reserve_for(selves.map(|s| s.0));
        }
    }

    /// Serialization view: the whole column is exposed as exactly one byte
    /// slice (the keys reinterpreted as raw bytes), tagged with the alignment
    /// of `UpsertKey`.
    impl<'a> columnar::AsBytes<'a> for UpsertKeys<&'a [UpsertKey]> {
        const SLICE_COUNT: usize = 1;
        #[inline(always)]
        fn get_byte_slice(&self, index: usize) -> (u64, &'a [u8]) {
            // There is only one slice, so `index` does not select anything;
            // it is merely validated in debug builds.
            debug_assert!(index < Self::SLICE_COUNT);
            (
                u64::cast_from(align_of::<UpsertKey>()),
                bytemuck::cast_slice(self.0),
            )
        }
        #[inline(always)]
        fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
            std::iter::once((
                u64::cast_from(align_of::<UpsertKey>()),
                bytemuck::cast_slice(self.0),
            ))
        }
    }
    /// Deserialization view: the inverse of the `AsBytes` impl above.
    impl<'a> columnar::FromBytes<'a> for UpsertKeys<&'a [UpsertKey]> {
        const SLICE_COUNT: usize = 1;
        /// Consumes one byte slice from `bytes` and reinterprets it as keys.
        ///
        /// Panics if the iterator is already exhausted, or (via
        /// `bytemuck::cast_slice`) if the bytes cannot be viewed as a whole
        /// number of `UpsertKey`s.
        #[inline(always)]
        fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
            UpsertKeys(bytemuck::cast_slice(
                bytes.next().expect("Iterator exhausted prematurely"),
            ))
        }
    }
}
221
/// Projects a source's native `FromTime` to a columnar, totally-ordered key
/// used by the upsert source stash to keep the latest update per `(key, time)`.
///
/// The upsert stash is a paged columnar merge batcher; its diff carries this
/// projection rather than the raw `FromTime`, so the only columnar type the
/// stash needs is `Order` — never the (possibly structurally complex) source
/// timestamp. This is what keeps the columnar requirement off the generic
/// source-render path: that path only ever needs `FromTime: UpsertSourceTime`.
/// Only the relative order matters; the value is never read back.
///
/// The upsert envelope is rendered for Kafka and the KEY VALUE load generator,
/// so those source times (`KafkaTimestamp`, `MzOffset`) project to a real order
/// key. The remaining source times implement the trait only for coherence on
/// the generic render path — their sources never render upsert — so their
/// projection is a panicking guard rather than a real key.
pub trait UpsertSourceTime
where
    // The borrowed (columnar reference) form of the order key must be totally
    // ordered as well, not just the owned `Order` value.
    for<'a> columnar::Ref<'a, Self::Order>: Ord,
{
    /// Columnar order key. Must order consistently with the source time it is
    /// projected from.
    type Order: columnar::Columnar + Clone + Default + Ord + Send + Sync + 'static;
    /// Project the source time onto its order key.
    ///
    /// Implementations for sources that never render upsert may panic instead
    /// of returning (see the trait-level docs).
    fn upsert_order(&self) -> Self::Order;
}
247
248impl UpsertSourceTime for KafkaTimestamp {
249    /// Per-record Kafka source times are exact singletons (a single partition
250    /// at a single offset; see the source reader), and `KafkaTimestamp`'s
251    /// derived `Ord` is lexicographic on `(partition, offset)`, so this flat
252    /// projection is order-preserving. `RangeBound`'s infinities map to the
253    /// `i64` extrema to remain order-consistent for any non-singleton bound.
254    type Order = (i64, u64);
255    fn upsert_order(&self) -> (i64, u64) {
256        let partition = match self.interval().lower {
257            RangeBound::NegInfinity => i64::MIN,
258            RangeBound::Elem(p, _) => i64::from(p),
259            RangeBound::PosInfinity => i64::MAX,
260        };
261        (partition, self.timestamp().offset)
262    }
263}
264
265/// Load-generator (and Postgres) source time. The KEY VALUE load generator is
266/// the one non-Kafka source that renders the upsert envelope (see
267/// `apply_source_envelope_encoding` in the planner), so this projects to the
268/// record offset: offsets increase with each update, so "max order wins" is
269/// exactly "latest update wins" for dedup.
270impl UpsertSourceTime for MzOffset {
271    type Order = u64;
272    fn upsert_order(&self) -> u64 {
273        self.offset
274    }
275}
276
/// Source times whose sources never render the upsert envelope (MySQL and SQL
/// Server CDC). `Order = ()` keeps the generic render path free of any columnar
/// requirement on the source time. The projection panics rather than returning:
/// `()` would collapse every `from_time` to equal, silently breaking "latest
/// offset wins" dedup, so if such a source ever reaches the upsert path we want
/// a loud failure, not arbitrary per-key output.
///
/// Usage: `upsert_source_time_unit!(TypeA, TypeB, ...)` emits one
/// `UpsertSourceTime` impl per listed type; a trailing comma is accepted.
macro_rules! upsert_source_time_unit {
    ($($ty:ty),+ $(,)?) => {$(
        impl UpsertSourceTime for $ty {
            type Order = ();
            // The return type is `()` (i.e. `Self::Order`); the body never
            // returns normally.
            fn upsert_order(&self) {
                unreachable!(
                    "upsert source stash is not rendered for this source, but \
                     {} reached the projection",
                    std::any::type_name::<Self>(),
                )
            }
        }
    )+};
}
// The MySQL (`GtidPartition`) and SQL Server (`Lsn`) source times.
upsert_source_time_unit!(GtidPartition, Lsn);
298
299/// Pager for the upsert-v2 source stash.
300///
301/// This draws from the same process-wide [`TieredPolicy`] budget pool as the
302/// compute column-paged batcher — there is one budget and one underlying
303/// `mz_ore::pager` — but whether the stash *uses* it is gated by storage's own
304/// `enable_upsert_paged_spill` flag, independently of compute's
305/// `enable_column_paged_batcher_spill`. The shared pool's budget / backend /
306/// codec are configured by compute's `apply_tiered_config` (storage and compute
307/// run in the same `clusterd` process).
308///
309/// [`TieredPolicy`]: mz_timely_util::column_pager::policy::TieredPolicy
310pub mod upsert_stash_pager {
311    use std::sync::{LazyLock, RwLock};
312
313    use mz_timely_util::column_pager::{ColumnPager, shared_pager};
314
315    /// Active pager handed to upsert source-stash batchers. Defaults to
316    /// disabled (every chunk resident) until [`set_enabled`] turns it on.
317    static PAGER: LazyLock<RwLock<ColumnPager>> =
318        LazyLock::new(|| RwLock::new(ColumnPager::disabled()));
319
320    /// Enable or disable the stash's use of the shared column pager. When
321    /// enabled, the stash spills through the shared budget pool; when disabled
322    /// it keeps every chunk resident.
323    pub fn set_enabled(enabled: bool) {
324        *PAGER.write().expect("upsert stash pager poisoned") = shared_pager(enabled);
325    }
326
327    /// The current upsert-stash pager. Cheap: clones the inner `Arc`.
328    pub fn pager() -> ColumnPager {
329        PAGER.read().expect("upsert stash pager poisoned").clone()
330    }
331}
332
333impl Debug for UpsertKey {
334    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335        write!(f, "0x")?;
336        for byte in self.0 {
337            write!(f, "{:02x}", byte)?;
338        }
339        Ok(())
340    }
341}
342
343impl AsRef<[u8]> for UpsertKey {
344    #[inline(always)]
345    // Note we do 1 `multi_get` and 1 `multi_put` while processing a _batch of updates_. Within the
346    // batch, we effectively consolidate each key, before persisting that consolidated value.
347    // Easy!!
348    fn as_ref(&self) -> &[u8] {
349        &self.0
350    }
351}
352
353impl From<&[u8]> for UpsertKey {
354    fn from(bytes: &[u8]) -> Self {
355        UpsertKey(bytes.try_into().expect("invalid key length"))
356    }
357}
358
/// The hash function used to map upsert keys. It is important that this hash is a cryptographic
/// hash so that there is no practical risk of collisions. Collisions on SHA256 have a probability
/// on the order of 2^-128, which is many orders of magnitude smaller than many other events that
/// we don't even think about (e.g. bit flips). In short, we can safely assume that
/// sha256(a) == sha256(b) iff a == b.
type KeyHash = Sha256;
364
impl UpsertKey {
    /// Computes the key for an already-extracted key row (or for the error that
    /// took its place) by hashing the row's datums via [`UpsertKey::from_iter`].
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    /// Computes the key for a full value row by picking out the datums at
    /// `key_indices` (in the order given) and hashing them via
    /// [`UpsertKey::from_iter`]. An `Err` value is forwarded unchanged.
    ///
    /// Indexing with an out-of-range entry of `key_indices` panics.
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            // Unpack the value row into the reusable datum buffer.
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    /// Hashes a key, given as an iterator of datums or as an error, into an `UpsertKey`.
    ///
    /// What gets hashed is a `Result<&[Datum], Datum>`:
    /// - `Ok(datums)`: the datums are hashed as `Ok(..)`.
    /// - `Err(UpsertError::Value(_))`: the error's `for_key` datums are hashed as `Ok(..)`,
    ///   i.e. exactly as if the key itself had been supplied.
    /// - `Err(UpsertError::KeyDecode(_))`: the raw bytes are hashed as `Err(Datum::Bytes(..))`.
    /// - `Err(UpsertError::NullKey(_))`: hashed as `Err(Datum::Null)`.
    ///
    /// The `Hash` output is fed into the `KeyHash` digest and the finalized digest becomes
    /// the key.
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            // First borrow: the `RefCell` guard around the thread-local `DatumVec`.
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the DatumVec gives us a temporary buffer to store datums in that will be
            // automatically cleared on Drop. See the DatumVec docs for more details.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            // `DigestHasher` routes everything `Hash` writes into the digest; the digest
            // (not `Hasher::finish`) is then read out directly.
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}
418
419struct DigestHasher<H: Digest>(H);
420
421impl<H: Digest> Hasher for DigestHasher<H> {
422    fn write(&mut self, bytes: &[u8]) {
423        self.0.update(bytes);
424    }
425
426    fn finish(&self) -> u64 {
427        panic!("digest wrapper used to produce a hash");
428    }
429}
430
431use std::convert::Infallible;
432use timely::container::CapacityContainerBuilder;
433use timely::dataflow::channels::pact::Pipeline;
434
435use self::types::ValueMetadata;
436
437/// This leaf operator drops `token` after the input reaches the `resume_upper`.
438/// This is useful to take coordinated actions across all workers, after the `upsert`
439/// operator has rehydrated.
440pub fn rehydration_finished<'scope, T: Timestamp>(
441    scope: Scope<'scope, T>,
442    source_config: &crate::source::RawSourceCreationConfig,
443    // A token that we can drop to signal we are finished rehydrating.
444    token: impl std::any::Any + 'static,
445    resume_upper: Antichain<T>,
446    input: StreamVec<'scope, T, Infallible>,
447) {
448    let worker_id = source_config.worker_id;
449    let id = source_config.id;
450    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id}"), scope);
451    let mut input = builder.new_disconnected_input(input, Pipeline);
452
453    builder.build(move |_capabilities| async move {
454        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
455        // Ensure this operator finishes if the resume upper is `[0]`
456        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
457            let Some(event) = input.next().await else {
458                break;
459            };
460            if let AsyncEvent::Progress(upper) = event {
461                input_upper = upper;
462            }
463        }
464        tracing::info!(
465            %worker_id,
466            source_id = %id,
467            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
468        );
469        drop(token);
470    });
471}
472
/// Resumes an upsert computation at `resume_upper` given as inputs a collection of upsert commands
/// and the collection of the previous output of this operator.
///
/// This entry point reads the relevant dyncfgs, thins the input, prepares a RocksDB-backed
/// state initializer and hands everything to `upsert_operator`.
///
/// Returns a tuple of
/// - A collection of the computed upsert operator,
/// - A health update stream to propagate errors,
/// - A data-less (`Infallible`) stream (NOTE(review): presumably a progress/snapshot signal;
///   confirm against `upsert_inner`), and
/// - A `PressOnDropButton` token for the rendered operator.
pub(crate) fn upsert<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<T>,
    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    // NOTE(review): the parameter name is misspelled ("paramters").
    dataflow_paramters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    // NOTE(review): `TotalOrder + Sync` is stated twice for `T`; the repetition is redundant.
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Clone + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    // Whether or not to partially drain the input buffer
    // to prevent buffering of the _upstream_ snapshot.
    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    // If the above is true, the number of timely batches to process at once.
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    // Whether we should provide the upsert state merge operator to the RocksDB instance
    // (for faster performance during snapshot hydration).
    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    // Drop updates that cannot affect the result before anything else sees them.
    let thin_input = upsert_thinning(input);

    let tuning = dataflow_paramters.upsert_rocksdb_tuning_config.clone();

    // When running RocksDB in memory, the file system is emulated. However, we still need to
    // pick a path that exists because RocksDB will attempt to create the working directory
    // (see https://github.com/rust-rocksdb/rust-rocksdb/issues/1015) and write a lock file,
    // so we need to ensure the directory is unique per worker.
    //
    // The resulting path is `<scratch or /tmp>/storage/upsert/<source id>/<worker id>`.
    let rocksdb_dir = instance_context
        .scratch_directory
        .clone()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join("storage")
        .join("upsert")
        .join(source_config.id.to_string())
        .join(source_config.worker_id.to_string());

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?rocksdb_dir,
        ?tuning,
        ?rocksdb_use_native_merge_operator,
        "rendering upsert source"
    );

    let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
    let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

    let env = instance_context.rocksdb_env.clone();

    // A closure that will initialize and return a configured RocksDB instance
    let rocksdb_init_fn = move || async move {
        // The merge operator is only installed when the dyncfg asks for it.
        let merge_operator = if rocksdb_use_native_merge_operator {
            Some((
                "upsert_state_snapshot_merge_v1".to_string(),
                |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<T, FromTime>>| {
                    consolidating_merge_function::<T, FromTime>(a.into(), b)
                },
            ))
        } else {
            None
        };
        rocksdb::RocksDB::new(
            mz_rocksdb::RocksDBInstance::new(
                &rocksdb_dir,
                mz_rocksdb::InstanceOptions::new(
                    env,
                    rocksdb_cleanup_tries,
                    merge_operator,
                    // For now, just use the same config as the one used for
                    // merging snapshots.
                    upsert_bincode_opts(),
                ),
                tuning,
                rocksdb_shared_metrics,
                rocksdb_instance_metrics,
            )
            // Failing to create the instance panics here.
            .unwrap(),
        )
    };

    upsert_operator(
        thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
        rocksdb_init_fn,
        upsert_config,
        storage_configuration,
        prevent_snapshot_buffering,
        snapshot_buffering_max,
    )
}
605
/// An experimental upsert implementation loosely described in this doc:
/// [Upsert V2 Much Simpler Boogaloo](https://www.notion.so/materialize/Upsert-V2-Much-Simpler-Boogaloo-31913f48d37b807fa88bdeafc27c02d9?source=copy_link)
///
/// Instead of using rocksdb as a state backend, this implementation uses a differential dataflow collection to hold the key state,
/// and performs consolidation of updates with matching keys and MZ timestamps, using max FromTime to choose winners,
/// resulting in only one record per key per time.
///
/// The function itself only fetches the upsert metrics, thins the input and delegates to
/// `upsert_continual_feedback_v2::upsert_inner`; the returned tuple is that function's result.
pub(crate) fn upsert_v2<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<T>,
    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + differential_dataflow::lattice::Lattice,
    T: columnation::Columnation,
    T: columnar::Columnar + Default,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    FromTime: Timestamp + Clone + Sync,
    // Supplies the columnar order key used to pick the winning update.
    FromTime: UpsertSourceTime,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    // Drop updates that cannot affect the result before anything else sees them.
    let thin_input = upsert_thinning(input);

    // NOTE(review): the message says "btreemap backend" while the docs above describe a
    // differential collection as the state holder; confirm which wording is current.
    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        "rendering upsert source (btreemap backend)"
    );

    upsert_continual_feedback_v2::upsert_inner(
        thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
    )
}
659
// A shim so we can dispatch based on the dyncfg that tells us which upsert
// operator to use.
//
// As written, the choice is hard-coded: the continual-feedback implementation is always
// selected and the `upsert_classic` branch is never taken. `_storage_configuration` is
// accepted but unused. `state` is a one-shot initializer whose future resolves to the
// state backend.
fn upsert_operator<'scope, T, FromTime, F, Fut, US>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<T, FromTime>,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
{
    // Hard-coded to true because classic UPSERT cannot be used safely with
    // concurrent ingestions, which we need for both 0dt upgrades and
    // multi-replica ingestions.
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    // Both branches receive the same arguments in the same order.
    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}
726
/// Renders an operator that discards updates that are known to not affect the outcome of upsert in
/// a streaming fashion. For each distinct (key, time) in the input it emits the value with the
/// highest from_time. Its purpose is to thin out data as much as possible before exchanging them
/// across workers.
///
/// Thinning is applied per operator invocation: everything received in one invocation is
/// sorted, deduplicated and emitted, so duplicates that arrive in different invocations are
/// not merged here.
///
/// # Panics
///
/// Panics with "invalid upsert input" if any incoming update has a non-positive diff.
fn upsert_thinning<'scope, T, K, V, FromTime>(
    input: VecCollection<'scope, T, (K, V, FromTime), Diff>,
) -> VecCollection<'scope, T, (K, V, FromTime), Diff>
where
    T: Timestamp + TotalOrder,
    K: timely::ExchangeData + Clone + Eq + Ord,
    V: timely::ExchangeData + Clone,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // A capability suitable to emit all updates in `updates`, if any.
            let mut capability: Option<InputCapability<T>> = None;
            // A batch of received updates
            let mut updates = Vec::new();
            move |input, output| {
                input.for_each(|cap, data| {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    // Keep the capability with the smallest time seen so far: it is valid for
                    // every buffered update.
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                });
                // `take()` resets the capability, so each invocation starts from scratch.
                if let Some(capability) = capability.take() {
                    // Sort by (key, time, Reverse(from_time)) so that deduping by (key, time) gives
                    // the latest change for that key.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    // Of each run of equal (key, time) the first element survives, which after
                    // the sort above is the one with the highest from_time.
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}
785
786/// Helper method for `upsert_classic` used to stage `data` updates
787/// from the input/source timely edge.
788fn stage_input<T, FromTime>(
789    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
790    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
791    input_upper: &Antichain<T>,
792    resume_upper: &Antichain<T>,
793    storage_shrink_upsert_unused_buffers_by_ratio: usize,
794) where
795    T: PartialOrder,
796    FromTime: Ord,
797{
798    if PartialOrder::less_equal(input_upper, resume_upper) {
799        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
800    }
801
802    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
803        assert!(diff.is_positive(), "invalid upsert input");
804        (time, key, Reverse(order), value)
805    }));
806
807    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
808        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
809        if reduced_capacity > stash.len() {
810            stash.shrink_to(reduced_capacity);
811        }
812    }
813}
814
/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
///
/// `drain_staged_input` uses this to decide how long a prefix of the (sorted) stash
/// it processes.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain every staged update whose timestamp `ts` satisfies `!upper.less_equal(ts)`
    /// for the given frontier.
    ToUpper(&'a Antichain<T>),
    /// Drain every staged update whose timestamp is `<=` the given time.
    AtTime(T),
}
822
823/// Helper method for `upsert_inner` used to stage `data` updates
824/// from the input timely edge.
825async fn drain_staged_input<S, T, FromTime, E>(
826    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
827    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
828    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
829    multi_get_scratch: &mut Vec<UpsertKey>,
830    drain_style: DrainStyle<'_, T>,
831    error_emitter: &mut E,
832    state: &mut UpsertState<'_, S, T, FromTime>,
833    source_config: &crate::source::SourceExportCreationConfig,
834) where
835    S: UpsertStateBackend<T, FromTime>,
836    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
837    FromTime: timely::ExchangeData + Clone + Ord + Sync,
838    E: UpsertErrorEmitter<T>,
839{
840    stash.sort_unstable();
841
842    // Find the prefix that we can emit
843    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
844        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
845        DrainStyle::AtTime(time) => ts <= time,
846    });
847
848    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");
849
850    // Read the previous values _per key_ out of `state`, recording it
851    // along with the value with the _latest timestamp for that key_.
852    commands_state.clear();
853    for (_, key, _, _) in stash.iter().take(idx) {
854        commands_state.entry(*key).or_default();
855    }
856
857    // These iterators iterate in the same order because `commands_state`
858    // is an `IndexMap`.
859    multi_get_scratch.clear();
860    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
861    match state
862        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
863        .await
864    {
865        Ok(_) => {}
866        Err(e) => {
867            error_emitter
868                .emit("Failed to fetch records from state".to_string(), e)
869                .await;
870        }
871    }
872
873    // From the prefix that can be emitted we can deduplicate based on (ts, key) in
874    // order to only process the command with the maximum order within the (ts,
875    // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.;
876    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
877        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
878        a_ts == b_ts && a_key == b_key
879    });
880
881    let bincode_opts = types::upsert_bincode_opts();
882    // Upsert the values into `commands_state`, by recording the latest
883    // value (or deletion). These will be synced at the end to the `state`.
884    //
885    // Note that we are effectively doing "mini-upsert" here, using
886    // `command_state`. This "mini-upsert" is seeded with data from `state`, using
887    // a single `multi_get` above, and the final state is written out into
888    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
889    // implementations, and reduces the number of reads and write we need to do.
890    //
891    // This "mini-upsert" technique is actually useful in `UpsertState`'s
892    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
893    // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
894    while let Some((ts, key, from_time, value)) = commands.next() {
895        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
896            command_state
897        } else {
898            panic!("key missing from commands_state");
899        };
900
901        let existing_value = &mut command_state.get_mut().value;
902
903        if let Some(cs) = existing_value.as_mut() {
904            cs.ensure_decoded(bincode_opts, source_config.id, Some(&key));
905        }
906
907        // Skip this command if its order key is below the one in the upsert state.
908        // Note that the existing order key may be `None` if the existing value
909        // is from snapshotting, which always sorts below new values/deletes.
910        let existing_order = existing_value
911            .as_ref()
912            .and_then(|cs| cs.provisional_order(&ts));
913        if existing_order >= Some(&from_time.0) {
914            // Skip this update. If no later updates adjust this key, then we just
915            // end up writing the same value back to state. If there
916            // is nothing in the state, `existing_order` is `None`, and this
917            // does not occur.
918            continue;
919        }
920
921        match value {
922            Some(value) => {
923                if let Some(old_value) =
924                    existing_value.replace(StateValue::finalized_value(value.clone()))
925                {
926                    if let Some(old_value) = old_value.into_decoded().finalized {
927                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
928                    }
929                }
930                output_updates.push((value, ts, Diff::ONE));
931            }
932            None => {
933                if let Some(old_value) = existing_value.take() {
934                    if let Some(old_value) = old_value.into_decoded().finalized {
935                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
936                    }
937                }
938
939                // Record a tombstone for deletes.
940                *existing_value = Some(StateValue::tombstone());
941            }
942        }
943    }
944
945    match state
946        .multi_put(
947            true, // Do update per-update stats.
948            commands_state.drain(..).map(|(k, cv)| {
949                (
950                    k,
951                    types::PutValue {
952                        value: cv.value.map(|cv| cv.into_decoded()),
953                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
954                            size: v.size.try_into().expect("less than i64 size"),
955                            is_tombstone: v.is_tombstone,
956                        }),
957                    },
958                )
959            }),
960        )
961        .await
962    {
963        Ok(_) => {}
964        Err(e) => {
965            error_emitter
966                .emit("Failed to update records in state".to_string(), e)
967                .await;
968        }
969    }
970}
971
/// Holds the configs for upserts.
///
/// Bundled into a struct so that new configs don't require a new method parameter.
pub(crate) struct UpsertConfig {
    /// Divisor used when shrinking unused buffer capacity. `upsert_classic` passes it
    /// both to `UpsertState::new` and to `stage_input`; in `stage_input` a value of `0`
    /// disables shrinking, otherwise the stash is shrunk to `capacity / ratio` whenever
    /// that is still larger than its length.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}
977
978fn upsert_classic<'scope, T, FromTime, F, Fut, US>(
979    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
980    key_indices: Vec<usize>,
981    resume_upper: Antichain<T>,
982    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
983    previous_token: Option<Vec<PressOnDropButton>>,
984    upsert_metrics: UpsertMetrics,
985    source_config: crate::source::SourceExportCreationConfig,
986    state: F,
987    upsert_config: UpsertConfig,
988    prevent_snapshot_buffering: bool,
989    snapshot_buffering_max: Option<usize>,
990) -> (
991    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
992    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
993    StreamVec<'scope, T, Infallible>,
994    PressOnDropButton,
995)
996where
997    T: Timestamp + TotalOrder + Sync,
998    F: FnOnce() -> Fut + 'static,
999    Fut: std::future::Future<Output = US>,
1000    US: UpsertStateBackend<T, FromTime>,
1001    FromTime: timely::ExchangeData + Clone + Ord + Sync,
1002{
1003    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());
1004
1005    // We only care about UpsertValueError since this is the only error that we can retract
1006    let previous = previous.flat_map(move |result| {
1007        let value = match result {
1008            Ok(ok) => Ok(ok),
1009            Err(DataflowError::EnvelopeError(err)) => match *err {
1010                EnvelopeError::Upsert(err) => Err(Box::new(err)),
1011                EnvelopeError::Flat(_) => return None,
1012            },
1013            Err(_) => return None,
1014        };
1015        let value_ref = match value {
1016            Ok(ref row) => Ok(row),
1017            Err(ref err) => Err(&**err),
1018        };
1019        Some((UpsertKey::from_value(value_ref, &key_indices), value))
1020    });
1021    let (output_handle, output) = builder.new_output();
1022
1023    // An output that just reports progress of the snapshot consolidation process upstream to the
1024    // persist source to ensure that backpressure is applied
1025    let (_snapshot_handle, snapshot_stream) =
1026        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
1027
1028    let (mut health_output, health_stream) = builder.new_output();
1029    let mut input = builder.new_input_for(
1030        input.inner,
1031        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
1032        &output_handle,
1033    );
1034
1035    let mut previous = builder.new_input_for(
1036        previous.inner,
1037        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
1038        &output_handle,
1039    );
1040
1041    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
1042    let shutdown_button = builder.build(move |caps| async move {
1043        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
1044
1045        let mut state = UpsertState::<_, _, FromTime>::new(
1046            state().await,
1047            upsert_shared_metrics,
1048            &upsert_metrics,
1049            source_config.source_statistics.clone(),
1050            upsert_config.shrink_upsert_unused_buffers_by_ratio,
1051        );
1052        let mut events = vec![];
1053        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());
1054
1055        let mut stash = vec![];
1056
1057        let mut error_emitter = (&mut health_output, &health_cap);
1058
1059        tracing::info!(
1060            ?resume_upper,
1061            ?snapshot_upper,
1062            "timely-{} upsert source {} starting rehydration",
1063            source_config.worker_id,
1064            source_config.id
1065        );
1066        // Read and consolidate the snapshot from the 'previous' input until it
1067        // reaches the `resume_upper`.
1068        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
1069            previous.ready().await;
1070            while let Some(event) = previous.next_sync() {
1071                match event {
1072                    AsyncEvent::Data(_cap, data) => {
1073                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
1074                            if !resume_upper.less_equal(&ts) {
1075                                Some((key, value, diff))
1076                            } else {
1077                                None
1078                            }
1079                        }))
1080                    }
1081                    AsyncEvent::Progress(upper) => {
1082                        snapshot_upper = upper;
1083                    }
1084                };
1085            }
1086
1087            match state
1088                .consolidate_chunk(
1089                    events.drain(..),
1090                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
1091                )
1092                .await
1093            {
1094                Ok(_) => {
1095                    if let Some(ts) = snapshot_upper.clone().into_option() {
1096                        // As we shutdown, we could ostensibly get data from later than the
1097                        // `resume_upper`, which we ignore above. We don't want our output capability to make
1098                        // it further than the `resume_upper`.
1099                        if !resume_upper.less_equal(&ts) {
1100                            snapshot_cap.downgrade(&ts);
1101                            output_cap.downgrade(&ts);
1102                        }
1103                    }
1104                }
1105                Err(e) => {
1106                    UpsertErrorEmitter::<T>::emit(
1107                        &mut error_emitter,
1108                        "Failed to rehydrate state".to_string(),
1109                        e,
1110                    )
1111                    .await;
1112                }
1113            }
1114        }
1115
1116        drop(events);
1117        drop(previous_token);
1118        drop(snapshot_cap);
1119
1120        // Exchaust the previous input. It is expected to immediately reach the empty
1121        // antichain since we have dropped its token.
1122        //
1123        // Note that we do not need to also process the `input` during this, as the dropped token
1124        // will shutdown the `backpressure` operator
1125        while let Some(_event) = previous.next().await {}
1126
1127        // After snapshotting, our output frontier is exactly the `resume_upper`
1128        if let Some(ts) = resume_upper.as_option() {
1129            output_cap.downgrade(ts);
1130        }
1131
1132        tracing::info!(
1133            "timely-{} upsert source {} finished rehydration",
1134            source_config.worker_id,
1135            source_config.id
1136        );
1137
1138        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
1139        // and have a consistent iteration order.
1140        let mut commands_state: indexmap::IndexMap<_, types::UpsertValueAndSize<T, FromTime>> =
1141            indexmap::IndexMap::new();
1142        let mut multi_get_scratch = Vec::new();
1143
1144        // Now can can resume consuming the collection
1145        let mut output_updates = vec![];
1146        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
1147
1148        while let Some(event) = input.next().await {
1149            // Buffer as many events as possible. This should be bounded, as new data can't be
1150            // produced in this worker until we yield to timely.
1151            let events = [event]
1152                .into_iter()
1153                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
1154                .enumerate();
1155
1156            let mut partial_drain_time = None;
1157            for (i, event) in events {
1158                match event {
1159                    AsyncEvent::Data(cap, mut data) => {
1160                        tracing::trace!(
1161                            time=?cap.time(),
1162                            updates=%data.len(),
1163                            "received data in upsert"
1164                        );
1165                        stage_input(
1166                            &mut stash,
1167                            &mut data,
1168                            &input_upper,
1169                            &resume_upper,
1170                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
1171                        );
1172
1173                        let event_time = cap.time();
1174                        // If the data is at _exactly_ the output frontier, we can preemptively drain it into the state.
1175                        // Data within this set events strictly beyond this time are staged as
1176                        // normal.
1177                        //
1178                        // This is a load-bearing optimization, as it is required to avoid buffering
1179                        // the entire source snapshot in the `stash`.
1180                        if prevent_snapshot_buffering && output_cap.time() == event_time {
1181                            partial_drain_time = Some(event_time.clone());
1182                        }
1183                    }
1184                    AsyncEvent::Progress(upper) => {
1185                        tracing::trace!(?upper, "received progress in upsert");
1186                        // Ignore progress updates before the `resume_upper`, which is our initial
1187                        // capability post-snapshotting.
1188                        if PartialOrder::less_than(&upper, &resume_upper) {
1189                            continue;
1190                        }
1191
1192                        // Disable the partial drain as this progress event covers
1193                        // the `output_cap` time.
1194                        partial_drain_time = None;
1195                        drain_staged_input::<_, _, _, _>(
1196                            &mut stash,
1197                            &mut commands_state,
1198                            &mut output_updates,
1199                            &mut multi_get_scratch,
1200                            DrainStyle::ToUpper(&upper),
1201                            &mut error_emitter,
1202                            &mut state,
1203                            &source_config,
1204                        )
1205                        .await;
1206
1207                        output_handle.give_container(&output_cap, &mut output_updates);
1208
1209                        if let Some(ts) = upper.as_option() {
1210                            output_cap.downgrade(ts);
1211                        }
1212                        input_upper = upper;
1213                    }
1214                }
1215                let events_processed = i + 1;
1216                if let Some(max) = snapshot_buffering_max {
1217                    if events_processed >= max {
1218                        break;
1219                    }
1220                }
1221            }
1222
1223            // If there were staged events that occurred at the capability time, drain
1224            // them. This is safe because out-of-order updates to the same key that are
1225            // drained in separate calls to `drain_staged_input` are correctly ordered by
1226            // their `FromTime` in `drain_staged_input`.
1227            //
1228            // Note also that this may result in more updates in the output collection than
1229            // the minimum. However, because the frontier only advances on `Progress` updates,
1230            // the collection always accumulates correctly for all keys.
1231            if let Some(partial_drain_time) = partial_drain_time {
1232                drain_staged_input::<_, _, _, _>(
1233                    &mut stash,
1234                    &mut commands_state,
1235                    &mut output_updates,
1236                    &mut multi_get_scratch,
1237                    DrainStyle::AtTime(partial_drain_time),
1238                    &mut error_emitter,
1239                    &mut state,
1240                    &source_config,
1241                )
1242                .await;
1243
1244                output_handle.give_container(&output_cap, &mut output_updates);
1245            }
1246        }
1247    });
1248
1249    (
1250        output.as_collection().map(|result| match result {
1251            Ok(ok) => Ok(ok),
1252            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
1253        }),
1254        health_stream,
1255        snapshot_stream,
1256        shutdown_button.press_on_drop(),
1257    )
1258}
1259
/// A sink for errors encountered while working with upsert state.
///
/// The implementation in this file (for a health output handle paired with a
/// capability) forwards to `process_upsert_state_error`, which reports the error and
/// then never returns.
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<T> {
    /// Reports the error `e`, annotated with the human-readable `context`.
    async fn emit(&mut self, context: String, e: anyhow::Error);
}
1264
1265#[async_trait::async_trait(?Send)]
1266impl<T: Timestamp> UpsertErrorEmitter<T>
1267    for (
1268        &mut AsyncOutputHandle<
1269            T,
1270            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1271        >,
1272        &Capability<T>,
1273    )
1274{
1275    async fn emit(&mut self, context: String, e: anyhow::Error) {
1276        process_upsert_state_error::<T>(context, e, self.0, self.1).await
1277    }
1278}
1279
1280/// Emit the given error, and stall till the dataflow is restarted.
1281async fn process_upsert_state_error<T: Timestamp>(
1282    context: String,
1283    e: anyhow::Error,
1284    health_output: &AsyncOutputHandle<
1285        T,
1286        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1287    >,
1288    health_cap: &Capability<T>,
1289) {
1290    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
1291    health_output.give(health_cap, (None, update));
1292    std::future::pending::<()>().await;
1293    unreachable!("pending future never returns");
1294}