// mz_storage/upsert.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::AsRef;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, Collection};
use futures::StreamExt;
use futures::future::FutureExt;
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::error::ErrorExt;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
use mz_rocksdb::ValueIterator;
use mz_storage_operators::metrics::BackpressureMetrics;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_storage_types::sources::envelope::UpsertEnvelope;
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::{Capability, InputCapability, Operator};
use timely::dataflow::{Scope, ScopeParent, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::storage_state::StorageInstanceContext;
use crate::upsert_continual_feedback;
use self::types::{
    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
    upsert_bincode_opts,
};

#[cfg(test)]
pub mod memory;
pub(crate) mod rocksdb;
// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
pub(crate) mod types;

pub type UpsertValue = Result<Row, Box<UpsertError>>;

#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);

impl Debug for UpsertKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "0x")?;
        for byte in self.0 {
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}

impl AsRef<[u8]> for UpsertKey {
    #[inline(always)]
    // Note we do 1 `multi_get` and 1 `multi_put` while processing a _batch of updates_. Within the
    // batch, we effectively consolidate each key, before persisting that consolidated value.
    // Easy!!
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl From<&[u8]> for UpsertKey {
    fn from(bytes: &[u8]) -> Self {
        UpsertKey(bytes.try_into().expect("invalid key length"))
    }
}

/// The hash function used to map upsert keys. It is important that this hash is a cryptographic
/// hash so that there is no risk of collisions. Collisions on SHA256 have a probability of 2^-128,
/// which is many orders of magnitude smaller than the probability of many other events that we
/// don't even think about (e.g. bit flips). In short, we can safely assume that
/// sha256(a) == sha256(b) iff a == b.
type KeyHash = Sha256;
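
// For illustration only (a sketch of the invariant, not a doctest; `row` is a hypothetical
// `mz_repr::Row`): because the digest is deterministic, hashing the same key twice yields the
// same `UpsertKey`, which is what lets the fixed-size digest stand in for the key both in the
// upsert state and when exchanging updates between workers:
//
//     let a = UpsertKey::from_key(Ok(&row));
//     let b = UpsertKey::from_key(Ok(&row));
//     assert_eq!(a, b);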

impl UpsertKey {
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the DatumVec gives us a temporary buffer to store datums in that will be
            // automatically cleared on Drop. See the DatumVec docs for more details.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}

struct DigestHasher<H: Digest>(H);

impl<H: Digest> Hasher for DigestHasher<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.0.update(bytes);
    }

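    // This wrapper only exists to route the bytes produced by `Hash::hash` into the digest via
    // `write` above; nothing here ever asks it for a `u64` hash, so `finish` is expected to be
    // unreachable (hence the panic below).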
    fn finish(&self) -> u64 {
        panic!("digest wrapper used to produce a hash");
    }
}

use std::convert::Infallible;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;

use self::types::ValueMetadata;

/// This leaf operator drops `token` after the input reaches the `resume_upper`.
/// This is useful to take coordinated actions across all workers, after the `upsert`
/// operator has rehydrated.
pub fn rehydration_finished<G, T>(
    scope: G,
    source_config: &crate::source::RawSourceCreationConfig,
    // A token that we can drop to signal we are finished rehydrating.
    token: impl std::any::Any + 'static,
    resume_upper: Antichain<T>,
    input: &Stream<G, Infallible>,
) where
    G: Scope<Timestamp = T>,
    T: Timestamp,
{
    let worker_id = source_config.worker_id;
    let id = source_config.id;
    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id})"), scope);
    let mut input = builder.new_disconnected_input(input, Pipeline);

    builder.build(move |_capabilities| async move {
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        // Ensure this operator finishes if the resume upper is `[0]`
        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
            let Some(event) = input.next().await else {
                break;
            };
            if let AsyncEvent::Progress(upper) = event {
                input_upper = upper;
            }
        }
        tracing::info!(
            %worker_id,
            source_id = %id,
            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
        );
        drop(token);
    });
}

/// Resumes an upsert computation at `resume_upper` given as inputs a collection of upsert commands
/// and the collection of the previous output of this operator.
///
/// Returns a tuple of
/// - A collection of the computed upsert output,
/// - A health update stream used to propagate errors,
/// - A stream reporting progress of the snapshot consolidation process, and
/// - A button that shuts the operator down when dropped.
pub(crate) fn upsert<G: Scope, FromTime>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    dataflow_parameters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    // Whether or not to partially drain the input buffer
    // to prevent buffering of the _upstream_ snapshot.
    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    // If the above is true, the number of timely batches to process at once.
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    // Whether we should provide the upsert state merge operator to the RocksDB instance
    // (for faster performance during snapshot hydration).
    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    let thin_input = upsert_thinning(input);

    let tuning = dataflow_parameters.upsert_rocksdb_tuning_config.clone();

    // When running RocksDB in memory, the file system is emulated. However, we still need to
    // pick a path that exists because RocksDB will attempt to create the working directory
    // (see https://github.com/rust-rocksdb/rust-rocksdb/issues/1015) and write a lock file,
    // so we need to ensure the directory is unique per worker.
    let rocksdb_dir = instance_context
        .scratch_directory
        .clone()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join("storage")
        .join("upsert")
        .join(source_config.id.to_string())
        .join(source_config.worker_id.to_string());

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?rocksdb_dir,
        ?tuning,
        ?rocksdb_use_native_merge_operator,
        "rendering upsert source"
    );

    let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
    let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

    let env = instance_context.rocksdb_env.clone();

    // A closure that will initialize and return a configured RocksDB instance
    let rocksdb_init_fn = move || async move {
        let merge_operator = if rocksdb_use_native_merge_operator {
            Some((
                "upsert_state_snapshot_merge_v1".to_string(),
                |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<G::Timestamp, FromTime>>| {
                    consolidating_merge_function::<G::Timestamp, FromTime>(a.into(), b)
                },
            ))
        } else {
            None
        };
        rocksdb::RocksDB::new(
            mz_rocksdb::RocksDBInstance::new(
                &rocksdb_dir,
                mz_rocksdb::InstanceOptions::new(
                    env,
                    rocksdb_cleanup_tries,
                    merge_operator,
                    // For now, just use the same config as the one used for
                    // merging snapshots.
                    upsert_bincode_opts(),
                ),
                tuning,
                rocksdb_shared_metrics,
                rocksdb_instance_metrics,
            )
            .unwrap(),
        )
    };

    upsert_operator(
        &thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
        rocksdb_init_fn,
        upsert_config,
        storage_configuration,
        prevent_snapshot_buffering,
        snapshot_buffering_max,
    )
}

// A shim so we can dispatch between the classic and the continual-feedback upsert operator
// implementations (currently hard-coded to the latter, see `use_continual_feedback_upsert` below).
fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    // Hard-coded to true because classic UPSERT cannot be used safely with
    // concurrent ingestions, which we need for both 0dt upgrades and
    // multi-replica ingestions.
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}

/// Renders an operator that discards updates that are known to not affect the outcome of upsert in
/// a streaming fashion. For each distinct (key, time) in the input it emits the value with the
/// highest from_time. Its purpose is to thin out data as much as possible before exchanging them
/// across workers.
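///
/// For illustration (a behavioral sketch, not a doctest): if a worker sees two updates for the
/// same key at the same timestamp,
///
/// ```text
/// (key_a, value_1, from_time = 3)
/// (key_a, value_2, from_time = 7)
/// ```
///
/// only the update carrying `from_time = 7` survives thinning, because within each `(key, time)`
/// group the value with the highest `from_time` supersedes the others.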
fn upsert_thinning<G, K, V, FromTime>(
    input: &Collection<G, (K, V, FromTime), Diff>,
) -> Collection<G, (K, V, FromTime), Diff>
where
    G: Scope,
    G::Timestamp: TotalOrder,
    K: timely::Data + Eq + Ord,
    V: timely::Data,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // A capability suitable to emit all updates in `updates`, if any.
            let mut capability: Option<InputCapability<G::Timestamp>> = None;
            // A batch of received updates
            let mut updates = Vec::new();
            move |input, output| {
                while let Some((cap, data)) = input.next() {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                }
                if let Some(capability) = capability.take() {
                    // Sort by (key, time, Reverse(from_time)) so that deduping by (key, time) gives
                    // the latest change for that key.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}

/// Helper method for `upsert_classic` used to stage `data` updates
/// from the input/source timely edge.
fn stage_input<T, FromTime>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
    input_upper: &Antichain<T>,
    resume_upper: &Antichain<T>,
    storage_shrink_upsert_unused_buffers_by_ratio: usize,
) where
    T: PartialOrder,
    FromTime: Ord,
{
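    // While the input frontier has not yet advanced past the `resume_upper`, drop updates that
    // are not themselves beyond the `resume_upper`: such times are assumed to already be
    // reflected in the snapshot that `upsert_classic` rehydrates from, so they must not be
    // staged again.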
    if PartialOrder::less_equal(input_upper, resume_upper) {
        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
    }

    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
        assert!(diff.is_positive(), "invalid upsert input");
        (time, key, Reverse(order), value)
    }));

    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
        if reduced_capacity > stash.len() {
            stash.shrink_to(reduced_capacity);
        }
    }
}

/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    ToUpper(&'a Antichain<T>),
    AtTime(T),
}
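
// A sketch of how the two styles select the drainable prefix (see the `partition_point` call in
// `drain_staged_input` below): `ToUpper(upper)` drains updates whose times are not yet at or
// beyond `upper`, while `AtTime(t)` drains updates at times less than or equal to `t`. The latter
// may run before all values for those times have been seen, which is why it must leave tombstones
// behind for deleted values.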

/// Helper method for `upsert_classic` used to drain the staged updates from the `stash`,
/// applying them to the upsert `state` and producing output updates.
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    stash.sort_unstable();

    // Find the prefix that we can emit
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Read the previous values _per key_ out of `state`, recording it
    // along with the value with the _latest timestamp for that key_.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    // These iterators iterate in the same order because `commands_state`
    // is an `IndexMap`.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix that can be emitted we can deduplicate based on (ts, key) in
    // order to only process the command with the maximum order within the (ts,
    // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    // Upsert the values into `commands_state`, by recording the latest
    // value (or deletion). These will be synced at the end to the `state`.
    //
    // Note that we are effectively doing "mini-upsert" here, using
    // `command_state`. This "mini-upsert" is seeded with data from `state`, using
    // a single `multi_get` above, and the final state is written out into
    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
    // implementations, and reduces the number of reads and writes we need to do.
    //
    // This "mini-upsert" technique is actually useful in `UpsertState`'s
    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
    // the `UpsertStateBackend` implementations. In some sense, it's "upsert all the way down".
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id);
        }

        // Skip this command if its order key is below the one in the upsert state.
        // Note that the existing order key may be `None` if the existing value
        // is from snapshotting, which always sorts below new values/deletes.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            // Skip this update. If no later updates adjust this key, then we just
            // end up writing the same value back to state. If there
            // is nothing in the state, `existing_order` is `None`, and this
            // does not occur.
            continue;
        }

        match value {
            Some(value) => {
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                // Record a tombstone for deletes.
                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    match state
        .multi_put(
            true, // Do update per-update stats.
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}

// A struct to hold the configs for upserts, so that adding a new config doesn't
// require a new method parameter.
pub(crate) struct UpsertConfig {
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}

fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // We only care about UpsertValueError since this is the only error that we can retract
    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                _ => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // An output that just reports progress of the snapshot consolidation process upstream to the
    // persist source to ensure that backpressure is applied
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        &previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

        let mut state = UpsertState::<_, _, FromTime>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
        // Read and consolidate the snapshot from the 'previous' input until it
        // reaches the `resume_upper`.
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    if let Some(ts) = snapshot_upper.clone().into_option() {
                        // As we shut down, we could ostensibly get data from later than the
                        // `resume_upper`, which we ignore above. We don't want our output capability to make
                        // it further than the `resume_upper`.
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<G>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        drop(events);
        drop(previous_token);
        drop(snapshot_cap);

        // Exhaust the previous input. It is expected to immediately reach the empty
        // antichain since we have dropped its token.
        //
        // Note that we do not need to also process the `input` during this, as the dropped token
        // will shut down the `backpressure` operator.
        while let Some(_event) = previous.next().await {}

        // After snapshotting, our output frontier is exactly the `resume_upper`
        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
        // and have a consistent iteration order.
        let mut commands_state: indexmap::IndexMap<
            _,
            types::UpsertValueAndSize<G::Timestamp, FromTime>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        // Now we can resume consuming the collection.
        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        while let Some(event) = input.next().await {
            // Buffer as many events as possible. This should be bounded, as new data can't be
            // produced in this worker until we yield to timely.
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        let event_time = cap.time();
                        // If the data is at _exactly_ the output frontier, we can preemptively drain it into the state.
                        // Data within this set of events strictly beyond this time is staged as
                        // normal.
                        //
                        // This is a load-bearing optimization, as it is required to avoid buffering
                        // the entire source snapshot in the `stash`.
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
                        // Ignore progress updates before the `resume_upper`, which is our initial
                        // capability post-snapshotting.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

                        // Disable the partial drain as this progress event covers
                        // the `output_cap` time.
                        partial_drain_time = None;
                        drain_staged_input::<_, G, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                            &source_config,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

            // If there were staged events that occurred at the capability time, drain
            // them. This is safe because out-of-order updates to the same key that are
            // drained in separate calls to `drain_staged_input` are correctly ordered by
            // their `FromTime` in `drain_staged_input`.
            //
            // Note also that this may result in more updates in the output collection than
            // the minimum. However, because the frontier only advances on `Progress` updates,
            // the collection always accumulates correctly for all keys.
            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}

#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<G> {
    async fn emit(&mut self, context: String, e: anyhow::Error);
}

#[async_trait::async_trait(?Send)]
impl<G: Scope> UpsertErrorEmitter<G>
    for (
        &mut AsyncOutputHandle<
            <G as ScopeParent>::Timestamp,
            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
            Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        >,
        &Capability<<G as ScopeParent>::Timestamp>,
    )
{
    async fn emit(&mut self, context: String, e: anyhow::Error) {
        process_upsert_state_error::<G>(context, e, self.0, self.1).await
    }
}

/// Emit the given error, and stall till the dataflow is restarted.
async fn process_upsert_state_error<G: Scope>(
    context: String,
    e: anyhow::Error,
    health_output: &AsyncOutputHandle<
        <G as ScopeParent>::Timestamp,
        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
    >,
    health_cap: &Capability<<G as ScopeParent>::Timestamp>,
) {
    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
    health_output.give(health_cap, (None, update));
    std::future::pending::<()>().await;
    unreachable!("pending future never returns");
}