mz_storage/
upsert.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cell::RefCell;
11use std::cmp::Reverse;
12use std::convert::AsRef;
13use std::fmt::Debug;
14use std::hash::{Hash, Hasher};
15use std::sync::Arc;
16
17use differential_dataflow::hashable::Hashable;
18use differential_dataflow::{AsCollection, Collection};
19use futures::StreamExt;
20use futures::future::FutureExt;
21use indexmap::map::Entry;
22use itertools::Itertools;
23use mz_ore::error::ErrorExt;
24use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
25use mz_rocksdb::ValueIterator;
26use mz_storage_operators::metrics::BackpressureMetrics;
27use mz_storage_types::configuration::StorageConfiguration;
28use mz_storage_types::dyncfgs;
29use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
30use mz_storage_types::sources::envelope::UpsertEnvelope;
31use mz_timely_util::builder_async::{
32    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
33    PressOnDropButton,
34};
35use serde::{Deserialize, Serialize};
36use sha2::{Digest, Sha256};
37use timely::dataflow::channels::pact::Exchange;
38use timely::dataflow::channels::pushers::Tee;
39use timely::dataflow::operators::{Capability, InputCapability, Operator};
40use timely::dataflow::{Scope, ScopeParent, Stream};
41use timely::order::{PartialOrder, TotalOrder};
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::healthcheck::HealthStatusUpdate;
46use crate::metrics::upsert::UpsertMetrics;
47use crate::storage_state::StorageInstanceContext;
48use crate::upsert_continual_feedback;
49use autospill::AutoSpillBackend;
50use memory::InMemoryHashMap;
51use types::{
52    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, Value, consolidating_merge_function,
53    upsert_bincode_opts,
54};
55
56mod autospill;
57mod memory;
58mod rocksdb;
59// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
60pub(crate) mod types;
61
/// The value carried by an upsert command: either a [`Row`] or the [`UpsertError`] that
/// takes its place.
pub type UpsertValue = Result<Row, UpsertError>;

/// A fixed-size, 32-byte identifier for an upsert key.
///
/// Instances are produced either by hashing a key with [`KeyHash`] (see
/// `UpsertKey::from_iter`) or by copying 32 raw bytes (see the `From<&[u8]>` impl).
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);
66
67impl AsRef<[u8]> for UpsertKey {
68    #[inline(always)]
69    // Note we do 1 `multi_get` and 1 `multi_put` while processing a _batch of updates_. Within the
70    // batch, we effectively consolidate each key, before persisting that consolidated value.
71    // Easy!!
72    fn as_ref(&self) -> &[u8] {
73        &self.0
74    }
75}
76
77impl From<&[u8]> for UpsertKey {
78    fn from(bytes: &[u8]) -> Self {
79        UpsertKey(bytes.try_into().expect("invalid key length"))
80    }
81}
82
/// The hash function used to map upsert keys. It is important that this hash is a cryptographic
/// hash so that there is no risk of collisions. Collisions on SHA256 have a probability of 2^-128
/// which is many orders of magnitude smaller than many other events that we don't even think about
/// (e.g. bit flips). In short, we can safely assume that sha256(a) == sha256(b) iff a == b.
type KeyHash = Sha256;
88
impl UpsertKey {
    /// Computes the key for a key row, or for the error that takes the place of that row.
    ///
    /// This simply forwards the row's datums (or the error) to [`UpsertKey::from_iter`].
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    /// Computes the key from a full value row by projecting the datums found at
    /// `key_indices` (in the order given), or from the error that takes the place of the row.
    ///
    /// The projected datums are forwarded to [`UpsertKey::from_iter`]. Each index is used
    /// to index directly into the row's datums, so an out-of-range index presumably panics.
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            // Unpack the row into the reusable datum buffer so it can be indexed below.
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    /// Computes the key from an iterator of key datums, or from an error.
    ///
    /// What gets hashed is a `Result<&[Datum], Datum>`:
    /// - `Ok(datums)` hashes as `Ok` of the collected datums;
    /// - `UpsertError::Value` hashes as `Ok` of the datums in the error's `for_key`;
    /// - `UpsertError::KeyDecode` hashes as `Err(Datum::Bytes(..))` of the error's `raw` bytes;
    /// - `UpsertError::NullKey` hashes as `Err(Datum::Null)`.
    ///
    /// The value is fed through a [`DigestHasher`] wrapping [`KeyHash`], and the finalized
    /// digest becomes the key bytes.
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the DatumVec gives us a temporary buffer to store datums in that will be
            // automatically cleared on Drop. See the DatumVec docs for more details.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            // Drive the `Hash` impl of the value above into the digest; `finish` is never
            // called, the digest itself is finalized instead.
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}
142
143struct DigestHasher<H: Digest>(H);
144
145impl<H: Digest> Hasher for DigestHasher<H> {
146    fn write(&mut self, bytes: &[u8]) {
147        self.0.update(bytes);
148    }
149
150    fn finish(&self) -> u64 {
151        panic!("digest wrapper used to produce a hash");
152    }
153}
154
155use std::convert::Infallible;
156use timely::container::CapacityContainerBuilder;
157use timely::dataflow::channels::pact::Pipeline;
158
159use self::types::ValueMetadata;
160
161/// This leaf operator drops `token` after the input reaches the `resume_upper`.
162/// This is useful to take coordinated actions across all workers, after the `upsert`
163/// operator has rehydrated.
164pub fn rehydration_finished<G, T>(
165    scope: G,
166    source_config: &crate::source::RawSourceCreationConfig,
167    // A token that we can drop to signal we are finished rehydrating.
168    token: impl std::any::Any + 'static,
169    resume_upper: Antichain<T>,
170    input: &Stream<G, Infallible>,
171) where
172    G: Scope<Timestamp = T>,
173    T: Timestamp,
174{
175    let worker_id = source_config.worker_id;
176    let id = source_config.id;
177    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id}"), scope);
178    let mut input = builder.new_disconnected_input(input, Pipeline);
179
180    builder.build(move |_capabilities| async move {
181        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
182        // Ensure this operator finishes if the resume upper is `[0]`
183        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
184            let Some(event) = input.next().await else {
185                break;
186            };
187            if let AsyncEvent::Progress(upper) = event {
188                input_upper = upper;
189            }
190        }
191        tracing::info!(
192            %worker_id,
193            source_id = %id,
194            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
195        );
196        drop(token);
197    });
198}
199
/// Resumes an upsert computation at `resume_upper` given as inputs a collection of upsert commands
/// and the collection of the previous output of this operator.
///
/// The input is first thinned with `upsert_thinning`, then handed to `upsert_operator` together
/// with a constructor for the state backend. The backend is chosen as follows:
/// - if `instance_context.scratch_directory` is set, state lives in RocksDB under
///   `<scratch>/storage/upsert/<source id>/<worker id>`; when the auto-spill configuration
///   allows spilling to disk, the RocksDB constructor is wrapped in an `AutoSpillBackend`;
/// - otherwise an `InMemoryHashMap` is used.
///
/// Returns a tuple of
/// - A collection of the computed upsert operator,
/// - A health update stream to propagate errors,
/// - A `Stream<G, Infallible>` (presumably the snapshot-progress stream used for
///   backpressure; it is produced by the operator implementation, confirm there), and
/// - A `PressOnDropButton` (presumably the operator's shutdown button; confirm in the
///   operator implementation).
///
/// # Panics
///
/// The RocksDB constructor `unwrap`s the result of creating the RocksDB instance.
// NOTE(review): `dataflow_paramters` is a typo of "parameters", and the two
// `G::Timestamp` bounds in the where clause overlap; both are left as-is here.
pub(crate) fn upsert<G: Scope, FromTime>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    dataflow_paramters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    // Whether or not to partially drain the input buffer
    // to prevent buffering of the _upstream_ snapshot.
    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    // If the above is true, the number of timely batches to process at once.
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    // Whether we should provide the upsert state merge operator to the RocksDB instance
    // (for faster performance during snapshot hydration).
    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    // Thin out the input before it is handed to the operator implementation.
    let thin_input = upsert_thinning(input);

    // A scratch directory means disk is available: use RocksDB-backed state.
    if let Some(scratch_directory) = instance_context.scratch_directory.as_ref() {
        let tuning = dataflow_paramters.upsert_rocksdb_tuning_config.clone();

        let allow_auto_spill = storage_configuration
            .parameters
            .upsert_auto_spill_config
            .allow_spilling_to_disk;
        let spill_threshold = storage_configuration
            .parameters
            .upsert_auto_spill_config
            .spill_to_disk_threshold_bytes;

        tracing::info!(
            worker_id = %source_config.worker_id,
            source_id = %source_config.id,
            ?tuning,
            ?storage_configuration.parameters.upsert_auto_spill_config,
            ?rocksdb_use_native_merge_operator,
            "rendering upsert source with rocksdb-backed upsert state"
        );
        let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
        let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);
        // Each (source, worker) pair gets its own directory below the scratch directory.
        let rocksdb_dir = scratch_directory
            .join("storage")
            .join("upsert")
            .join(source_config.id.to_string())
            .join(source_config.worker_id.to_string());

        let env = instance_context.rocksdb_env.clone();

        let rocksdb_in_use_metric = Arc::clone(&upsert_metrics.rocksdb_autospill_in_use);

        // A closure that will initialize and return a configured RocksDB instance
        let rocksdb_init_fn = move || async move {
            // Only hand RocksDB a merge operator when the dyncfg above enables it.
            let merge_operator =
                if rocksdb_use_native_merge_operator {
                    Some((
                        "upsert_state_snapshot_merge_v1".to_string(),
                        |a: &[u8],
                         b: ValueIterator<
                            BincodeOpts,
                            StateValue<G::Timestamp, Option<FromTime>>,
                        >| {
                            consolidating_merge_function::<G::Timestamp, Option<FromTime>>(
                                a.into(),
                                b,
                            )
                        },
                    ))
                } else {
                    None
                };
            rocksdb::RocksDB::new(
                mz_rocksdb::RocksDBInstance::new(
                    &rocksdb_dir,
                    mz_rocksdb::InstanceOptions::new(
                        env,
                        rocksdb_cleanup_tries,
                        merge_operator,
                        // For now, just use the same config as the one used for
                        // merging snapshots.
                        upsert_bincode_opts(),
                    ),
                    tuning,
                    rocksdb_shared_metrics,
                    rocksdb_instance_metrics,
                )
                .await
                .unwrap(),
            )
        };

        // TODO(aljoscha): I don't like how we have basically the same call
        // three times here, but it's hard working around those impl Futures
        // that return an impl Trait. Oh well...
        if allow_auto_spill {
            // Auto-spill: the backend receives the RocksDB constructor and the spill threshold.
            upsert_operator(
                &thin_input,
                upsert_envelope.key_indices,
                resume_upper,
                previous,
                previous_token,
                upsert_metrics,
                source_config,
                move || async move {
                    AutoSpillBackend::new(rocksdb_init_fn, spill_threshold, rocksdb_in_use_metric)
                },
                upsert_config,
                storage_configuration,
                prevent_snapshot_buffering,
                snapshot_buffering_max,
            )
        } else {
            // No auto-spill: use the RocksDB constructor directly as the state backend.
            upsert_operator(
                &thin_input,
                upsert_envelope.key_indices,
                resume_upper,
                previous,
                previous_token,
                upsert_metrics,
                source_config,
                rocksdb_init_fn,
                upsert_config,
                storage_configuration,
                prevent_snapshot_buffering,
                snapshot_buffering_max,
            )
        }
    } else {
        // No scratch directory: keep all upsert state in memory.
        tracing::info!(
            worker_id = %source_config.worker_id,
            source_id = %source_config.id,
            "rendering upsert source with memory-backed upsert state",
        );
        upsert_operator(
            &thin_input,
            upsert_envelope.key_indices,
            resume_upper,
            previous,
            previous_token,
            upsert_metrics,
            source_config,
            || async { InMemoryHashMap::default() },
            upsert_config,
            storage_configuration,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}
387
388// A shim so we can dispatch based on the dyncfg that tells us which upsert
389// operator to use.
390fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
391    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
392    key_indices: Vec<usize>,
393    resume_upper: Antichain<G::Timestamp>,
394    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
395    persist_token: Option<Vec<PressOnDropButton>>,
396    upsert_metrics: UpsertMetrics,
397    source_config: crate::source::SourceExportCreationConfig,
398    state: F,
399    upsert_config: UpsertConfig,
400    _storage_configuration: &StorageConfiguration,
401    prevent_snapshot_buffering: bool,
402    snapshot_buffering_max: Option<usize>,
403) -> (
404    Collection<G, Result<Row, DataflowError>, Diff>,
405    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
406    Stream<G, Infallible>,
407    PressOnDropButton,
408)
409where
410    G::Timestamp: TotalOrder + Sync,
411    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
412    F: FnOnce() -> Fut + 'static,
413    Fut: std::future::Future<Output = US>,
414    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
415    FromTime: Debug + timely::ExchangeData + Ord + Sync,
416{
417    // Hard-coded to true because classic UPSERT cannot be used safely with
418    // concurrent ingestions, which we need for both 0dt upgrades and
419    // multi-replica ingestions.
420    let use_continual_feedback_upsert = true;
421
422    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");
423
424    if use_continual_feedback_upsert {
425        upsert_continual_feedback::upsert_inner(
426            input,
427            key_indices,
428            resume_upper,
429            persist_input,
430            persist_token,
431            upsert_metrics,
432            source_config,
433            state,
434            upsert_config,
435            prevent_snapshot_buffering,
436            snapshot_buffering_max,
437        )
438    } else {
439        upsert_classic(
440            input,
441            key_indices,
442            resume_upper,
443            persist_input,
444            persist_token,
445            upsert_metrics,
446            source_config,
447            state,
448            upsert_config,
449            prevent_snapshot_buffering,
450            snapshot_buffering_max,
451        )
452    }
453}
454
455/// Renders an operator that discards updates that are known to not affect the outcome of upsert in
456/// a streaming fashion. For each distinct (key, time) in the input it emits the value with the
457/// highest from_time. Its purpose is to thin out data as much as possible before exchanging them
458/// across workers.
459fn upsert_thinning<G, K, V, FromTime>(
460    input: &Collection<G, (K, V, FromTime), Diff>,
461) -> Collection<G, (K, V, FromTime), Diff>
462where
463    G: Scope,
464    G::Timestamp: TotalOrder,
465    K: timely::Data + Eq + Ord,
466    V: timely::Data,
467    FromTime: Timestamp,
468{
469    input
470        .inner
471        .unary(Pipeline, "UpsertThinning", |_, _| {
472            // A capability suitable to emit all updates in `updates`, if any.
473            let mut capability: Option<InputCapability<G::Timestamp>> = None;
474            // A batch of received updates
475            let mut updates = Vec::new();
476            move |input, output| {
477                while let Some((cap, data)) = input.next() {
478                    assert!(
479                        data.iter().all(|(_, _, diff)| diff.is_positive()),
480                        "invalid upsert input"
481                    );
482                    updates.append(data);
483                    match capability.as_mut() {
484                        Some(capability) => {
485                            if cap.time() <= capability.time() {
486                                *capability = cap;
487                            }
488                        }
489                        None => capability = Some(cap),
490                    }
491                }
492                if let Some(capability) = capability.take() {
493                    // Sort by (key, time, Reverse(from_time)) so that deduping by (key, time) gives
494                    // the latest change for that key.
495                    updates.sort_unstable_by(|a, b| {
496                        let ((key1, _, from_time1), time1, _) = a;
497                        let ((key2, _, from_time2), time2, _) = b;
498                        Ord::cmp(
499                            &(key1, time1, Reverse(from_time1)),
500                            &(key2, time2, Reverse(from_time2)),
501                        )
502                    });
503                    let mut session = output.session(&capability);
504                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
505                        let ((key1, _, _), time1, _) = a;
506                        let ((key2, _, _), time2, _) = b;
507                        (key1, time1) == (key2, time2)
508                    }))
509                }
510            }
511        })
512        .as_collection()
513}
514
515/// Helper method for `upsert_classic` used to stage `data` updates
516/// from the input/source timely edge.
517fn stage_input<T, FromTime>(
518    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
519    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
520    input_upper: &Antichain<T>,
521    resume_upper: &Antichain<T>,
522    storage_shrink_upsert_unused_buffers_by_ratio: usize,
523) where
524    T: PartialOrder,
525    FromTime: Ord,
526{
527    if PartialOrder::less_equal(input_upper, resume_upper) {
528        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
529    }
530
531    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
532        assert!(diff.is_positive(), "invalid upsert input");
533        (time, key, Reverse(order), value)
534    }));
535
536    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
537        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
538        if reduced_capacity > stash.len() {
539            stash.shrink_to(reduced_capacity);
540        }
541    }
542}
543
/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain every stashed update whose timestamp is not at or beyond the given upper
    /// (i.e. `!upper.less_equal(ts)`).
    ToUpper(&'a Antichain<T>),
    /// Drain every stashed update whose timestamp is less than or equal to the given time.
    AtTime(T),
}
551
/// Helper method for `upsert_inner` that drains staged updates from `stash` and applies
/// them to the upsert `state`.
///
/// The stash is sorted and the prefix selected by `drain_style` is removed from it. For
/// every key in that prefix the current value is fetched from `state` with one `multi_get`,
/// the commands are applied on top of it in `commands_state`, and the result is written
/// back with one `multi_put`. Retractions of replaced values and additions of new values
/// are appended to `output_updates`.
///
/// `commands_state` and `multi_get_scratch` are caller-provided scratch buffers; both are
/// cleared before use and left drained afterwards.
///
/// Failures from `multi_get` / `multi_put` are reported through `error_emitter`.
///
/// # Panics
///
/// Panics if a drained key is missing from `commands_state`, or if a previous value's
/// size does not fit the target integer type when building the `ValueMetadata`.
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<
        UpsertKey,
        types::UpsertValueAndSize<T, Option<FromTime>>,
    >,
    output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, Option<FromTime>>,
) where
    S: UpsertStateBackend<T, Option<FromTime>>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    // Sorts by (time, key, Reverse(from_time), value), so within a (time, key) group the
    // entry with the greatest `from_time` comes first.
    stash.sort_unstable();

    // Find the prefix that we can emit
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Read the previous values _per key_ out of `state`, recording it
    // along with the value with the _latest timestamp for that key_.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    // These iterators iterate in the same order because `commands_state`
    // is an `IndexMap`.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            // NOTE(review): presumably `emit` does not return normally; if it did,
            // processing would continue with unseeded `commands_state` — confirm.
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix that can be emitted we can deduplicate based on (ts, key) in
    // order to only process the command with the maximum order within the (ts,
    // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.;
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    // Upsert the values into `commands_state`, by recording the latest
    // value (or deletion). These will be synced at the end to the `state`.
    //
    // Note that we are effectively doing "mini-upsert" here, using
    // `command_state`. This "mini-upsert" is seeded with data from `state`, using
    // a single `multi_get` above, and the final state is written out into
    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
    // implementations, and reduces the number of reads and write we need to do.
    //
    // This "mini-upsert" technique is actually useful in `UpsertState`'s
    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
    // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
    while let Some((ts, key, from_time, value)) = commands.next() {
        // Every drained key was inserted into `commands_state` above, so a vacant entry
        // here is an invariant violation.
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        // Make sure any existing value is decoded before its order is inspected.
        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts);
        }

        // Skip this command if its order key is below the one in the upsert state.
        // Note that the existing order key may be `None` if the existing value
        // is from snapshotting, which always sorts below new values/deletes.
        let existing_order = existing_value.as_ref().and_then(|cs| cs.order().as_ref());
        if existing_order >= Some(&from_time.0) {
            // Skip this update. If no later updates adjust this key, then we just
            // end up writing the same value back to state. If there
            // is nothing in the state, `existing_order` is `None`, and this
            // does not occur.
            continue;
        }

        match value {
            // An insert or update: store the new value, retract the old one (if it was a
            // finalized value) and emit the new one.
            Some(value) => {
                if let Some(old_value) = existing_value.replace(StateValue::finalized_value(
                    value.clone(),
                    Some(from_time.0.clone()),
                )) {
                    if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            // A delete: retract the old value (if it was a finalized value).
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                // Record a tombstone for deletes.
                *existing_value = Some(StateValue::tombstone(Some(from_time.0.clone())));
            }
        }
    }

    // Write the final per-key values back to `state` in a single batch, passing along the
    // metadata of each previous value.
    match state
        .multi_put(
            true, // Do update per-update stats.
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}
702
/// Holds the configuration for upsert operators, so that new configs don't require a new
/// method parameter.
pub(crate) struct UpsertConfig {
    /// Ratio used when shrinking unused upsert buffers: in `stage_input`, a buffer is
    /// shrunk to `capacity / ratio` when that still exceeds its length, and a value of
    /// `0` disables shrinking there.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}
708
709fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
710    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
711    key_indices: Vec<usize>,
712    resume_upper: Antichain<G::Timestamp>,
713    previous: Collection<G, Result<Row, DataflowError>, Diff>,
714    previous_token: Option<Vec<PressOnDropButton>>,
715    upsert_metrics: UpsertMetrics,
716    source_config: crate::source::SourceExportCreationConfig,
717    state: F,
718    upsert_config: UpsertConfig,
719    prevent_snapshot_buffering: bool,
720    snapshot_buffering_max: Option<usize>,
721) -> (
722    Collection<G, Result<Row, DataflowError>, Diff>,
723    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
724    Stream<G, Infallible>,
725    PressOnDropButton,
726)
727where
728    G::Timestamp: TotalOrder + Sync,
729    F: FnOnce() -> Fut + 'static,
730    Fut: std::future::Future<Output = US>,
731    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
732    FromTime: timely::ExchangeData + Ord + Sync,
733{
734    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());
735
736    // We only care about UpsertValueError since this is the only error that we can retract
737    let previous = previous.flat_map(move |result| {
738        let value = match result {
739            Ok(ok) => Ok(ok),
740            Err(DataflowError::EnvelopeError(err)) => match *err {
741                EnvelopeError::Upsert(err) => Err(err),
742                _ => return None,
743            },
744            Err(_) => return None,
745        };
746        Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
747    });
748    let (output_handle, output) = builder.new_output();
749
750    // An output that just reports progress of the snapshot consolidation process upstream to the
751    // persist source to ensure that backpressure is applied
752    let (_snapshot_handle, snapshot_stream) =
753        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
754
755    let (mut health_output, health_stream) = builder.new_output();
756    let mut input = builder.new_input_for(
757        &input.inner,
758        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
759        &output_handle,
760    );
761
762    let mut previous = builder.new_input_for(
763        &previous.inner,
764        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
765        &output_handle,
766    );
767
768    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
769    let shutdown_button = builder.build(move |caps| async move {
770        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
771
772        // The order key of the `UpsertState` is `Option<FromTime>`, which implements `Default`
773        // (as required for `consolidate_chunk`), with slightly more efficient serialization
774        // than a default `Partitioned`.
775        let mut state = UpsertState::<_, _, Option<FromTime>>::new(
776            state().await,
777            upsert_shared_metrics,
778            &upsert_metrics,
779            source_config.source_statistics,
780            upsert_config.shrink_upsert_unused_buffers_by_ratio,
781        );
782        let mut events = vec![];
783        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());
784
785        let mut stash = vec![];
786
787        let mut error_emitter = (&mut health_output, &health_cap);
788
789        tracing::info!(
790            ?resume_upper,
791            ?snapshot_upper,
792            "timely-{} upsert source {} starting rehydration",
793            source_config.worker_id,
794            source_config.id
795        );
796        // Read and consolidate the snapshot from the 'previous' input until it
797        // reaches the `resume_upper`.
798        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
799            previous.ready().await;
800            while let Some(event) = previous.next_sync() {
801                match event {
802                    AsyncEvent::Data(_cap, data) => {
803                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
804                            if !resume_upper.less_equal(&ts) {
805                                Some((key, value, diff))
806                            } else {
807                                None
808                            }
809                        }))
810                    }
811                    AsyncEvent::Progress(upper) => {
812                        snapshot_upper = upper;
813                    }
814                };
815            }
816
817            match state
818                .consolidate_chunk(
819                    events.drain(..),
820                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
821                )
822                .await
823            {
824                Ok(_) => {
825                    if let Some(ts) = snapshot_upper.clone().into_option() {
826                        // As we shutdown, we could ostensibly get data from later than the
827                        // `resume_upper`, which we ignore above. We don't want our output capability to make
828                        // it further than the `resume_upper`.
829                        if !resume_upper.less_equal(&ts) {
830                            snapshot_cap.downgrade(&ts);
831                            output_cap.downgrade(&ts);
832                        }
833                    }
834                }
835                Err(e) => {
836                    UpsertErrorEmitter::<G>::emit(
837                        &mut error_emitter,
838                        "Failed to rehydrate state".to_string(),
839                        e,
840                    )
841                    .await;
842                }
843            }
844        }
845
846        drop(events);
847        drop(previous_token);
848        drop(snapshot_cap);
849
        // Exhaust the previous input. It is expected to immediately reach the empty
851        // antichain since we have dropped its token.
852        //
853        // Note that we do not need to also process the `input` during this, as the dropped token
854        // will shutdown the `backpressure` operator
855        while let Some(_event) = previous.next().await {}
856
857        // After snapshotting, our output frontier is exactly the `resume_upper`
858        if let Some(ts) = resume_upper.as_option() {
859            output_cap.downgrade(ts);
860        }
861
862        tracing::info!(
863            "timely-{} upsert source {} finished rehydration",
864            source_config.worker_id,
865            source_config.id
866        );
867
868        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
869        // and have a consistent iteration order.
870        let mut commands_state: indexmap::IndexMap<
871            _,
872            types::UpsertValueAndSize<G::Timestamp, Option<FromTime>>,
873        > = indexmap::IndexMap::new();
874        let mut multi_get_scratch = Vec::new();
875
        // Now we can resume consuming the collection
877        let mut output_updates = vec![];
878        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
879
880        while let Some(event) = input.next().await {
881            // Buffer as many events as possible. This should be bounded, as new data can't be
882            // produced in this worker until we yield to timely.
883            let events = [event]
884                .into_iter()
885                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
886                .enumerate();
887
888            let mut partial_drain_time = None;
889            for (i, event) in events {
890                match event {
891                    AsyncEvent::Data(cap, mut data) => {
892                        tracing::trace!(
893                            time=?cap.time(),
894                            updates=%data.len(),
895                            "received data in upsert"
896                        );
897                        stage_input(
898                            &mut stash,
899                            &mut data,
900                            &input_upper,
901                            &resume_upper,
902                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
903                        );
904
905                        let event_time = cap.time();
906                        // If the data is at _exactly_ the output frontier, we can preemptively drain it into the state.
                        // Data within this set of events that is strictly beyond this time is staged as
908                        // normal.
909                        //
910                        // This is a load-bearing optimization, as it is required to avoid buffering
911                        // the entire source snapshot in the `stash`.
912                        if prevent_snapshot_buffering && output_cap.time() == event_time {
913                            partial_drain_time = Some(event_time.clone());
914                        }
915                    }
916                    AsyncEvent::Progress(upper) => {
917                        tracing::trace!(?upper, "received progress in upsert");
918                        // Ignore progress updates before the `resume_upper`, which is our initial
919                        // capability post-snapshotting.
920                        if PartialOrder::less_than(&upper, &resume_upper) {
921                            continue;
922                        }
923
924                        // Disable the partial drain as this progress event covers
925                        // the `output_cap` time.
926                        partial_drain_time = None;
927                        drain_staged_input::<_, G, _, _, _>(
928                            &mut stash,
929                            &mut commands_state,
930                            &mut output_updates,
931                            &mut multi_get_scratch,
932                            DrainStyle::ToUpper(&upper),
933                            &mut error_emitter,
934                            &mut state,
935                        )
936                        .await;
937
938                        output_handle.give_container(&output_cap, &mut output_updates);
939
940                        if let Some(ts) = upper.as_option() {
941                            output_cap.downgrade(ts);
942                        }
943                        input_upper = upper;
944                    }
945                }
946                let events_processed = i + 1;
947                if let Some(max) = snapshot_buffering_max {
948                    if events_processed >= max {
949                        break;
950                    }
951                }
952            }
953
954            // If there were staged events that occurred at the capability time, drain
955            // them. This is safe because out-of-order updates to the same key that are
956            // drained in separate calls to `drain_staged_input` are correctly ordered by
957            // their `FromTime` in `drain_staged_input`.
958            //
959            // Note also that this may result in more updates in the output collection than
960            // the minimum. However, because the frontier only advances on `Progress` updates,
961            // the collection always accumulates correctly for all keys.
962            if let Some(partial_drain_time) = partial_drain_time {
963                drain_staged_input::<_, G, _, _, _>(
964                    &mut stash,
965                    &mut commands_state,
966                    &mut output_updates,
967                    &mut multi_get_scratch,
968                    DrainStyle::AtTime(partial_drain_time),
969                    &mut error_emitter,
970                    &mut state,
971                )
972                .await;
973
974                output_handle.give_container(&output_cap, &mut output_updates);
975            }
976        }
977    });
978
979    (
980        output.as_collection().map(|result| match result {
981            Ok(ok) => Ok(ok),
982            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
983        }),
984        health_stream,
985        snapshot_stream,
986        shutdown_button.press_on_drop(),
987    )
988}
989
/// A sink for errors raised while operating on upsert state.
///
/// `G` is carried as a type parameter only; the trait itself places no bounds on it. The
/// implementation in this file (for a health output handle paired with a capability)
/// requires `G: Scope`.
// `?Send` asks `async_trait` not to require the generated futures to be `Send`.
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<G> {
    /// Reports the error `e`, with `context` describing what was being attempted.
    ///
    /// NOTE: the implementation in this file never returns from this call: it publishes a
    /// halting health status and then awaits a future that never resolves. Callers should
    /// not rely on code after `emit(..).await` running for that implementation.
    async fn emit(&mut self, context: String, e: anyhow::Error);
}
994
995#[async_trait::async_trait(?Send)]
996impl<G: Scope> UpsertErrorEmitter<G>
997    for (
998        &mut AsyncOutputHandle<
999            <G as ScopeParent>::Timestamp,
1000            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1001            Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1002        >,
1003        &Capability<<G as ScopeParent>::Timestamp>,
1004    )
1005{
1006    async fn emit(&mut self, context: String, e: anyhow::Error) {
1007        process_upsert_state_error::<G>(context, e, self.0, self.1).await
1008    }
1009}
1010
1011/// Emit the given error, and stall till the dataflow is restarted.
1012async fn process_upsert_state_error<G: Scope>(
1013    context: String,
1014    e: anyhow::Error,
1015    health_output: &AsyncOutputHandle<
1016        <G as ScopeParent>::Timestamp,
1017        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1018        Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1019    >,
1020    health_cap: &Capability<<G as ScopeParent>::Timestamp>,
1021) {
1022    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
1023    health_output.give(health_cap, (None, update));
1024    std::future::pending::<()>().await;
1025    unreachable!("pending future never returns");
1026}