// mz_storage/upsert.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cell::RefCell;
11use std::cmp::Reverse;
12use std::convert::AsRef;
13use std::fmt::Debug;
14use std::hash::{Hash, Hasher};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use differential_dataflow::hashable::Hashable;
19use differential_dataflow::{AsCollection, VecCollection};
20use futures::StreamExt;
21use futures::future::FutureExt;
22use indexmap::map::Entry;
23use itertools::Itertools;
24use mz_ore::error::ErrorExt;
25use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
26use mz_rocksdb::ValueIterator;
27use mz_storage_operators::metrics::BackpressureMetrics;
28use mz_storage_types::configuration::StorageConfiguration;
29use mz_storage_types::dyncfgs;
30use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
31use mz_storage_types::sources::envelope::UpsertEnvelope;
32use mz_timely_util::builder_async::{
33    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
34    PressOnDropButton,
35};
36use serde::{Deserialize, Serialize};
37use sha2::{Digest, Sha256};
38use timely::dataflow::channels::pact::Exchange;
39use timely::dataflow::operators::{Capability, InputCapability, Operator};
40use timely::dataflow::{Scope, StreamVec};
41use timely::order::{PartialOrder, TotalOrder};
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::healthcheck::HealthStatusUpdate;
46use crate::metrics::upsert::UpsertMetrics;
47use crate::storage_state::StorageInstanceContext;
48use crate::{upsert_continual_feedback, upsert_continual_feedback_v2};
49use types::{
50    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
51    upsert_bincode_opts,
52};
53
54#[cfg(test)]
55pub mod memory;
56pub(crate) mod rocksdb;
57// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
58pub(crate) mod types;
59
/// The value side of an upsert command: either a successfully decoded [`Row`]
/// or the [`UpsertError`] produced while decoding it.
pub type UpsertValue = Result<Row, Box<UpsertError>>;
61
/// The key under which upsert state is stored: a 32-byte SHA-256 digest of the
/// upstream key datums (or of key material recovered from a decode error).
/// See [`KeyHash`] and [`UpsertKey::from_iter`] for how it is computed.
#[derive(
    Copy,
    Clone,
    Hash,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
pub struct UpsertKey([u8; 32]);
74
75impl Debug for UpsertKey {
76    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77        write!(f, "0x")?;
78        for byte in self.0 {
79            write!(f, "{:02x}", byte)?;
80        }
81        Ok(())
82    }
83}
84
impl AsRef<[u8]> for UpsertKey {
    #[inline(always)]
    // Expose the raw 32-byte digest, e.g. so state backends can use it
    // directly as a byte key.
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}
94
impl From<&[u8]> for UpsertKey {
    /// Reconstructs an `UpsertKey` from its raw byte representation.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is not exactly 32 bytes long.
    fn from(bytes: &[u8]) -> Self {
        UpsertKey(bytes.try_into().expect("invalid key length"))
    }
}
100
/// The hash function used to map upsert keys. It is important that this hash is a cryptographic
/// hash so that there is no risk of collisions. Collisions on SHA256 have a probability of 2^-128
/// which is many orders of magnitude smaller than many other events that we don't even think about
/// (e.g bit flips). In short, we can safely assume that sha256(a) == sha256(b) iff a == b.
type KeyHash = Sha256;
106
impl UpsertKey {
    /// Hashes a decoded upstream key (or the error produced while decoding it)
    /// into an `UpsertKey`.
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    /// Computes the `UpsertKey` of a *value* row by projecting out the datums
    /// at `key_indices`. Errors are passed through to `from_iter`, which
    /// recovers whatever key material the error carries.
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            // Unpack the row into datums once; the borrow is cleared on drop.
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    /// Hashes an iterator of key datums (or an error) into an `UpsertKey`.
    ///
    /// Errors are mapped to key material as follows: a value-decoding error
    /// hashes the key datums it carries, a key-decoding error hashes the raw
    /// undecoded bytes, and a null-key error hashes a null datum.
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the DatumVec gives us a temporary buffer to store datums in that will be
            // automatically cleared on Drop. See the DatumVec docs for more details.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            // Feed the `Hash` impl of the key into a SHA-256 digest via the
            // `DigestHasher` adapter and read out the 32-byte result.
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}
160
/// Adapts a cryptographic [`Digest`] to the [`Hasher`] interface so that
/// `Hash` impls can stream their bytes directly into the digest.
struct DigestHasher<H: Digest>(H);

impl<H: Digest> Hasher for DigestHasher<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.0.update(bytes);
    }

    // This wrapper only collects bytes via `write`; callers read the final
    // digest out of the inner hasher directly (see `UpsertKey::from_iter`),
    // so producing a `u64` hash here would be a bug.
    fn finish(&self) -> u64 {
        panic!("digest wrapper used to produce a hash");
    }
}
172
173use std::convert::Infallible;
174use timely::container::CapacityContainerBuilder;
175use timely::dataflow::channels::pact::Pipeline;
176
177use self::types::ValueMetadata;
178
179/// This leaf operator drops `token` after the input reaches the `resume_upper`.
180/// This is useful to take coordinated actions across all workers, after the `upsert`
181/// operator has rehydrated.
182pub fn rehydration_finished<'scope, T: Timestamp>(
183    scope: Scope<'scope, T>,
184    source_config: &crate::source::RawSourceCreationConfig,
185    // A token that we can drop to signal we are finished rehydrating.
186    token: impl std::any::Any + 'static,
187    resume_upper: Antichain<T>,
188    input: StreamVec<'scope, T, Infallible>,
189) {
190    let worker_id = source_config.worker_id;
191    let id = source_config.id;
192    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id}"), scope);
193    let mut input = builder.new_disconnected_input(input, Pipeline);
194
195    builder.build(move |_capabilities| async move {
196        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
197        // Ensure this operator finishes if the resume upper is `[0]`
198        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
199            let Some(event) = input.next().await else {
200                break;
201            };
202            if let AsyncEvent::Progress(upper) = event {
203                input_upper = upper;
204            }
205        }
206        tracing::info!(
207            %worker_id,
208            source_id = %id,
209            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
210        );
211        drop(token);
212    });
213}
214
/// Resumes an upsert computation at `resume_upper` given as inputs a collection of upsert commands
/// and the collection of the previous output of this operator.
/// Returns a tuple of
/// - A collection of the computed upsert operator and,
/// - A health update stream to propagate errors
/// - A stream reporting snapshot-consolidation progress (for backpressure)
/// - A shutdown button for the operator
pub(crate) fn upsert<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<T>,
    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    // NOTE(review): parameter name is misspelled ("paramters") — TODO fix in a
    // dedicated rename.
    dataflow_paramters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Clone + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    // Whether or not to partially drain the input buffer
    // to prevent buffering of the _upstream_ snapshot.
    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    // If the above is true, the number of timely batches to process at once.
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    // Whether we should provide the upsert state merge operator to the RocksDB instance
    // (for faster performance during snapshot hydration).
    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    // Discard updates that cannot affect the upsert outcome before the data
    // is exchanged across workers (see `upsert_thinning`).
    let thin_input = upsert_thinning(input);

    let tuning = dataflow_paramters.upsert_rocksdb_tuning_config.clone();

    // When running RocksDB in memory, the file system is emulated. However, we still need to
    // pick a path that exists because RocksDB will attempt to create the working directory
    // (see https://github.com/rust-rocksdb/rust-rocksdb/issues/1015) and write a lock file,
    // so we need to ensure the directory is unique per worker.
    let rocksdb_dir = instance_context
        .scratch_directory
        .clone()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join("storage")
        .join("upsert")
        .join(source_config.id.to_string())
        .join(source_config.worker_id.to_string());

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?rocksdb_dir,
        ?tuning,
        ?rocksdb_use_native_merge_operator,
        "rendering upsert source"
    );

    let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
    let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

    let env = instance_context.rocksdb_env.clone();

    // A closure that will initialize and return a configured RocksDB instance.
    // It is only invoked lazily, inside the operator that needs the state.
    let rocksdb_init_fn = move || async move {
        let merge_operator = if rocksdb_use_native_merge_operator {
            Some((
                "upsert_state_snapshot_merge_v1".to_string(),
                |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<T, FromTime>>| {
                    consolidating_merge_function::<T, FromTime>(a.into(), b)
                },
            ))
        } else {
            None
        };
        rocksdb::RocksDB::new(
            mz_rocksdb::RocksDBInstance::new(
                &rocksdb_dir,
                mz_rocksdb::InstanceOptions::new(
                    env,
                    rocksdb_cleanup_tries,
                    merge_operator,
                    // For now, just use the same config as the one used for
                    // merging snapshots.
                    upsert_bincode_opts(),
                ),
                tuning,
                rocksdb_shared_metrics,
                rocksdb_instance_metrics,
            )
            .unwrap(),
        )
    };

    upsert_operator(
        thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
        rocksdb_init_fn,
        upsert_config,
        storage_configuration,
        prevent_snapshot_buffering,
        snapshot_buffering_max,
    )
}
347
/// An experimental upsert implementation loosely described in this doc:
/// [Upsert V2 Much Simpler Boogaloo](https://www.notion.so/materialize/Upsert-V2-Much-Simpler-Boogaloo-31913f48d37b807fa88bdeafc27c02d9?source=copy_link)
///
/// Instead of using rocksdb as a state backend, this implementation uses a differential dataflow collection to hold the key state,
/// and performs consolidation of updates with matching keys and MZ timestamps, using max FromTime to choose winners,
/// resulting in only one record per key per time.
pub(crate) fn upsert_v2<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<T>,
    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + differential_dataflow::lattice::Lattice + Sync,
    FromTime: Timestamp + Clone + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    // Same pre-exchange thinning as the classic path; see `upsert_thinning`.
    let thin_input = upsert_thinning(input);

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        "rendering upsert source (btreemap backend)"
    );

    upsert_continual_feedback_v2::upsert_inner(
        thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
    )
}
397
// A shim so we can dispatch based on the dyncfg that tells us which upsert
// operator to use.
//
// Currently the choice is hard-wired (see `use_continual_feedback_upsert`
// below), so the `upsert_classic` branch is effectively dead code that is
// retained for reference.
fn upsert_operator<'scope, T, FromTime, F, Fut, US>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<T, FromTime>,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
{
    // Hard-coded to true because classic UPSERT cannot be used safely with
    // concurrent ingestions, which we need for both 0dt upgrades and
    // multi-replica ingestions.
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}
464
/// Renders an operator that discards updates that are known to not affect the outcome of upsert in
/// a streaming fashion. For each distinct (key, time) in the input it emits the value with the
/// highest from_time. Its purpose is to thin out data as much as possible before exchanging them
/// across workers.
fn upsert_thinning<'scope, T, K, V, FromTime>(
    input: VecCollection<'scope, T, (K, V, FromTime), Diff>,
) -> VecCollection<'scope, T, (K, V, FromTime), Diff>
where
    T: Timestamp + TotalOrder,
    K: timely::ExchangeData + Clone + Eq + Ord,
    V: timely::ExchangeData + Clone,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // A capability suitable to emit all updates in `updates`, if any.
            let mut capability: Option<InputCapability<T>> = None;
            // A batch of received updates
            let mut updates = Vec::new();
            move |input, output| {
                input.for_each(|cap, data| {
                    // Upsert commands are insert-only; retractions are derived
                    // downstream by the upsert operator itself.
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    // Track the capability with the *earliest* time so a
                    // single session can emit every buffered update.
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                });
                if let Some(capability) = capability.take() {
                    // Sort by (key, time, Reverse(from_time)) so that deduping by (key, time) gives
                    // the latest change for that key.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    // `dedup_by` keeps the *first* of each run of equal
                    // (key, time) pairs, which after the sort above is the one
                    // with the highest from_time.
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}
523
524/// Helper method for `upsert_classic` used to stage `data` updates
525/// from the input/source timely edge.
526fn stage_input<T, FromTime>(
527    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
528    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
529    input_upper: &Antichain<T>,
530    resume_upper: &Antichain<T>,
531    storage_shrink_upsert_unused_buffers_by_ratio: usize,
532) where
533    T: PartialOrder,
534    FromTime: Ord,
535{
536    if PartialOrder::less_equal(input_upper, resume_upper) {
537        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
538    }
539
540    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
541        assert!(diff.is_positive(), "invalid upsert input");
542        (time, key, Reverse(order), value)
543    }));
544
545    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
546        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
547        if reduced_capacity > stash.len() {
548            stash.shrink_to(reduced_capacity);
549        }
550    }
551}
552
/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain all staged updates whose time is strictly before the frontier.
    ToUpper(&'a Antichain<T>),
    /// Drain all staged updates at or before the given time.
    AtTime(T),
}
560
/// Helper method for `upsert_inner` that drains the ready prefix of `stash`
/// (as determined by `drain_style`), applies the staged commands to `state`,
/// and pushes the resulting retractions/insertions into `output_updates`.
///
/// Previous values are read with a single `multi_get` and the final per-key
/// results are written back with a single `multi_put`. Failures of either
/// call are reported through `error_emitter`.
async fn drain_staged_input<S, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
    E: UpsertErrorEmitter<T>,
{
    stash.sort_unstable();

    // Find the prefix that we can emit
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Read the previous values _per key_ out of `state`, recording it
    // along with the value with the _latest timestamp for that key_.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    // These iterators iterate in the same order because `commands_state`
    // is an `IndexMap`.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix that can be emitted we can deduplicate based on (ts, key) in
    // order to only process the command with the maximum order within the (ts,
    // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    // Upsert the values into `commands_state`, by recording the latest
    // value (or deletion). These will be synced at the end to the `state`.
    //
    // Note that we are effectively doing "mini-upsert" here, using
    // `command_state`. This "mini-upsert" is seeded with data from `state`, using
    // a single `multi_get` above, and the final state is written out into
    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
    // implementations, and reduces the number of reads and write we need to do.
    //
    // This "mini-upsert" technique is actually useful in `UpsertState`'s
    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
    // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
    while let Some((ts, key, from_time, value)) = commands.next() {
        // Every drained key was inserted into `commands_state` above, so a
        // missing entry indicates a logic error.
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id, Some(&key));
        }

        // Skip this command if its order key is below the one in the upsert state.
        // Note that the existing order key may be `None` if the existing value
        // is from snapshotting, which always sorts below new values/deletes.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            // Skip this update. If no later updates adjust this key, then we just
            // end up writing the same value back to state. If there
            // is nothing in the state, `existing_order` is `None`, and this
            // does not occur.
            continue;
        }

        match value {
            Some(value) => {
                // An insert/update: retract any previous finalized value and
                // emit the new one.
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                // A delete: retract any previous finalized value.
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                // Record a tombstone for deletes.
                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    match state
        .multi_put(
            true, // Do update per-update stats.
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}
709
/// Configuration knobs for the upsert operators, collected in a struct so
/// that adding a new config does not require a new method parameter.
pub(crate) struct UpsertConfig {
    // Ratio by which unused stash capacity is shrunk; 0 disables shrinking
    // (see `stage_input`).
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}
715
716fn upsert_classic<'scope, T, FromTime, F, Fut, US>(
717    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
718    key_indices: Vec<usize>,
719    resume_upper: Antichain<T>,
720    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
721    previous_token: Option<Vec<PressOnDropButton>>,
722    upsert_metrics: UpsertMetrics,
723    source_config: crate::source::SourceExportCreationConfig,
724    state: F,
725    upsert_config: UpsertConfig,
726    prevent_snapshot_buffering: bool,
727    snapshot_buffering_max: Option<usize>,
728) -> (
729    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
730    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
731    StreamVec<'scope, T, Infallible>,
732    PressOnDropButton,
733)
734where
735    T: Timestamp + TotalOrder + Sync,
736    F: FnOnce() -> Fut + 'static,
737    Fut: std::future::Future<Output = US>,
738    US: UpsertStateBackend<T, FromTime>,
739    FromTime: timely::ExchangeData + Clone + Ord + Sync,
740{
741    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());
742
743    // We only care about UpsertValueError since this is the only error that we can retract
744    let previous = previous.flat_map(move |result| {
745        let value = match result {
746            Ok(ok) => Ok(ok),
747            Err(DataflowError::EnvelopeError(err)) => match *err {
748                EnvelopeError::Upsert(err) => Err(Box::new(err)),
749                EnvelopeError::Flat(_) => return None,
750            },
751            Err(_) => return None,
752        };
753        let value_ref = match value {
754            Ok(ref row) => Ok(row),
755            Err(ref err) => Err(&**err),
756        };
757        Some((UpsertKey::from_value(value_ref, &key_indices), value))
758    });
759    let (output_handle, output) = builder.new_output();
760
761    // An output that just reports progress of the snapshot consolidation process upstream to the
762    // persist source to ensure that backpressure is applied
763    let (_snapshot_handle, snapshot_stream) =
764        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
765
766    let (mut health_output, health_stream) = builder.new_output();
767    let mut input = builder.new_input_for(
768        input.inner,
769        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
770        &output_handle,
771    );
772
773    let mut previous = builder.new_input_for(
774        previous.inner,
775        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
776        &output_handle,
777    );
778
779    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
780    let shutdown_button = builder.build(move |caps| async move {
781        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
782
783        let mut state = UpsertState::<_, _, FromTime>::new(
784            state().await,
785            upsert_shared_metrics,
786            &upsert_metrics,
787            source_config.source_statistics.clone(),
788            upsert_config.shrink_upsert_unused_buffers_by_ratio,
789        );
790        let mut events = vec![];
791        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());
792
793        let mut stash = vec![];
794
795        let mut error_emitter = (&mut health_output, &health_cap);
796
797        tracing::info!(
798            ?resume_upper,
799            ?snapshot_upper,
800            "timely-{} upsert source {} starting rehydration",
801            source_config.worker_id,
802            source_config.id
803        );
804        // Read and consolidate the snapshot from the 'previous' input until it
805        // reaches the `resume_upper`.
806        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
807            previous.ready().await;
808            while let Some(event) = previous.next_sync() {
809                match event {
810                    AsyncEvent::Data(_cap, data) => {
811                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
812                            if !resume_upper.less_equal(&ts) {
813                                Some((key, value, diff))
814                            } else {
815                                None
816                            }
817                        }))
818                    }
819                    AsyncEvent::Progress(upper) => {
820                        snapshot_upper = upper;
821                    }
822                };
823            }
824
825            match state
826                .consolidate_chunk(
827                    events.drain(..),
828                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
829                )
830                .await
831            {
832                Ok(_) => {
833                    if let Some(ts) = snapshot_upper.clone().into_option() {
834                        // As we shutdown, we could ostensibly get data from later than the
835                        // `resume_upper`, which we ignore above. We don't want our output capability to make
836                        // it further than the `resume_upper`.
837                        if !resume_upper.less_equal(&ts) {
838                            snapshot_cap.downgrade(&ts);
839                            output_cap.downgrade(&ts);
840                        }
841                    }
842                }
843                Err(e) => {
844                    UpsertErrorEmitter::<T>::emit(
845                        &mut error_emitter,
846                        "Failed to rehydrate state".to_string(),
847                        e,
848                    )
849                    .await;
850                }
851            }
852        }
853
854        drop(events);
855        drop(previous_token);
856        drop(snapshot_cap);
857
        // Exhaust the previous input. It is expected to immediately reach the empty
        // antichain since we have dropped its token.
860        //
861        // Note that we do not need to also process the `input` during this, as the dropped token
862        // will shutdown the `backpressure` operator
863        while let Some(_event) = previous.next().await {}
864
865        // After snapshotting, our output frontier is exactly the `resume_upper`
866        if let Some(ts) = resume_upper.as_option() {
867            output_cap.downgrade(ts);
868        }
869
870        tracing::info!(
871            "timely-{} upsert source {} finished rehydration",
872            source_config.worker_id,
873            source_config.id
874        );
875
876        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
877        // and have a consistent iteration order.
878        let mut commands_state: indexmap::IndexMap<_, types::UpsertValueAndSize<T, FromTime>> =
879            indexmap::IndexMap::new();
880        let mut multi_get_scratch = Vec::new();
881
        // Now we can resume consuming the collection
883        let mut output_updates = vec![];
884        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
885
886        while let Some(event) = input.next().await {
887            // Buffer as many events as possible. This should be bounded, as new data can't be
888            // produced in this worker until we yield to timely.
889            let events = [event]
890                .into_iter()
891                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
892                .enumerate();
893
894            let mut partial_drain_time = None;
895            for (i, event) in events {
896                match event {
897                    AsyncEvent::Data(cap, mut data) => {
898                        tracing::trace!(
899                            time=?cap.time(),
900                            updates=%data.len(),
901                            "received data in upsert"
902                        );
903                        stage_input(
904                            &mut stash,
905                            &mut data,
906                            &input_upper,
907                            &resume_upper,
908                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
909                        );
910
911                        let event_time = cap.time();
912                        // If the data is at _exactly_ the output frontier, we can preemptively drain it into the state.
                        // Data within this set of events strictly beyond this time is staged as
                        // normal.
915                        //
916                        // This is a load-bearing optimization, as it is required to avoid buffering
917                        // the entire source snapshot in the `stash`.
918                        if prevent_snapshot_buffering && output_cap.time() == event_time {
919                            partial_drain_time = Some(event_time.clone());
920                        }
921                    }
922                    AsyncEvent::Progress(upper) => {
923                        tracing::trace!(?upper, "received progress in upsert");
924                        // Ignore progress updates before the `resume_upper`, which is our initial
925                        // capability post-snapshotting.
926                        if PartialOrder::less_than(&upper, &resume_upper) {
927                            continue;
928                        }
929
930                        // Disable the partial drain as this progress event covers
931                        // the `output_cap` time.
932                        partial_drain_time = None;
933                        drain_staged_input::<_, _, _, _>(
934                            &mut stash,
935                            &mut commands_state,
936                            &mut output_updates,
937                            &mut multi_get_scratch,
938                            DrainStyle::ToUpper(&upper),
939                            &mut error_emitter,
940                            &mut state,
941                            &source_config,
942                        )
943                        .await;
944
945                        output_handle.give_container(&output_cap, &mut output_updates);
946
947                        if let Some(ts) = upper.as_option() {
948                            output_cap.downgrade(ts);
949                        }
950                        input_upper = upper;
951                    }
952                }
953                let events_processed = i + 1;
954                if let Some(max) = snapshot_buffering_max {
955                    if events_processed >= max {
956                        break;
957                    }
958                }
959            }
960
961            // If there were staged events that occurred at the capability time, drain
962            // them. This is safe because out-of-order updates to the same key that are
963            // drained in separate calls to `drain_staged_input` are correctly ordered by
964            // their `FromTime` in `drain_staged_input`.
965            //
966            // Note also that this may result in more updates in the output collection than
967            // the minimum. However, because the frontier only advances on `Progress` updates,
968            // the collection always accumulates correctly for all keys.
969            if let Some(partial_drain_time) = partial_drain_time {
970                drain_staged_input::<_, _, _, _>(
971                    &mut stash,
972                    &mut commands_state,
973                    &mut output_updates,
974                    &mut multi_get_scratch,
975                    DrainStyle::AtTime(partial_drain_time),
976                    &mut error_emitter,
977                    &mut state,
978                    &source_config,
979                )
980                .await;
981
982                output_handle.give_container(&output_cap, &mut output_updates);
983            }
984        }
985    });
986
987    (
988        output.as_collection().map(|result| match result {
989            Ok(ok) => Ok(ok),
990            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
991        }),
992        health_stream,
993        snapshot_stream,
994        shutdown_button.press_on_drop(),
995    )
996}
997
/// A sink for unrecoverable errors encountered while managing upsert state.
///
/// The implementation in this file forwards the error to the operator's
/// health output and then stalls the dataflow (see
/// `process_upsert_state_error`), so callers should not expect `emit` to
/// return in that case.
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<T> {
    /// Emit the error `e`, annotated with the human-readable `context`.
    async fn emit(&mut self, context: String, e: anyhow::Error);
}
1002
1003#[async_trait::async_trait(?Send)]
1004impl<T: Timestamp> UpsertErrorEmitter<T>
1005    for (
1006        &mut AsyncOutputHandle<
1007            T,
1008            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1009        >,
1010        &Capability<T>,
1011    )
1012{
1013    async fn emit(&mut self, context: String, e: anyhow::Error) {
1014        process_upsert_state_error::<T>(context, e, self.0, self.1).await
1015    }
1016}
1017
1018/// Emit the given error, and stall till the dataflow is restarted.
1019async fn process_upsert_state_error<T: Timestamp>(
1020    context: String,
1021    e: anyhow::Error,
1022    health_output: &AsyncOutputHandle<
1023        T,
1024        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1025    >,
1026    health_cap: &Capability<T>,
1027) {
1028    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
1029    health_output.give(health_cap, (None, update));
1030    std::future::pending::<()>().await;
1031    unreachable!("pending future never returns");
1032}