mz_storage/
upsert.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::cell::RefCell;
11use std::cmp::Reverse;
12use std::convert::AsRef;
13use std::fmt::Debug;
14use std::hash::{Hash, Hasher};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use differential_dataflow::hashable::Hashable;
19use differential_dataflow::{AsCollection, VecCollection};
20use futures::StreamExt;
21use futures::future::FutureExt;
22use indexmap::map::Entry;
23use itertools::Itertools;
24use mz_ore::error::ErrorExt;
25use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
26use mz_rocksdb::ValueIterator;
27use mz_storage_operators::metrics::BackpressureMetrics;
28use mz_storage_types::configuration::StorageConfiguration;
29use mz_storage_types::dyncfgs;
30use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
31use mz_storage_types::sources::envelope::UpsertEnvelope;
32use mz_timely_util::builder_async::{
33    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
34    PressOnDropButton,
35};
36use serde::{Deserialize, Serialize};
37use sha2::{Digest, Sha256};
38use timely::dataflow::channels::pact::Exchange;
39use timely::dataflow::operators::{Capability, InputCapability, Operator};
40use timely::dataflow::{Scope, ScopeParent, Stream};
41use timely::order::{PartialOrder, TotalOrder};
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::healthcheck::HealthStatusUpdate;
46use crate::metrics::upsert::UpsertMetrics;
47use crate::storage_state::StorageInstanceContext;
48use crate::upsert_continual_feedback;
49use types::{
50    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
51    upsert_bincode_opts,
52};
53
54#[cfg(test)]
55pub mod memory;
56pub(crate) mod rocksdb;
57// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
58pub(crate) mod types;
59
60pub type UpsertValue = Result<Row, Box<UpsertError>>;
61
/// A fixed-width, 32-byte key identifying a record in the upsert state.
///
/// Keys are either computed by `UpsertKey::from_iter` (and its `from_key` / `from_value`
/// wrappers), which store the finalized `KeyHash` digest of the key, or reconstructed from an
/// existing 32-byte slice via `From<&[u8]>`.
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);
64
65impl Debug for UpsertKey {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "0x")?;
68        for byte in self.0 {
69            write!(f, "{:02x}", byte)?;
70        }
71        Ok(())
72    }
73}
74
75impl AsRef<[u8]> for UpsertKey {
76    #[inline(always)]
77    // Note we do 1 `multi_get` and 1 `multi_put` while processing a _batch of updates_. Within the
78    // batch, we effectively consolidate each key, before persisting that consolidated value.
79    // Easy!!
80    fn as_ref(&self) -> &[u8] {
81        &self.0
82    }
83}
84
85impl From<&[u8]> for UpsertKey {
86    fn from(bytes: &[u8]) -> Self {
87        UpsertKey(bytes.try_into().expect("invalid key length"))
88    }
89}
90
/// The hash function used to map upsert keys. It is important that this hash is a cryptographic
/// hash so that there is no risk of collisions. Collisions on SHA256 have a probability on the
/// order of 2^-128, which is many orders of magnitude smaller than many other events that we
/// don't even think about (e.g. bit flips). In short, we can safely assume that
/// sha256(a) == sha256(b) iff a == b.
type KeyHash = Sha256;
96
impl UpsertKey {
    /// Computes the key for a decoded key row, or for the error that took the key's place.
    ///
    /// This is a thin wrapper: an `Ok` row is turned into an iterator over its datums and the
    /// result is handed to [`UpsertKey::from_iter`].
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    /// Computes the key for a full value row by selecting the datums at `key_indices`, or for
    /// the error that took the value's place.
    ///
    /// # Panics
    ///
    /// Panics if an index in `key_indices` is out of bounds for the datums of `value`.
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            // `borrow_with` presumably fills the reusable buffer with the datums of the row so
            // that they can be indexed below — see the DatumVec docs.
            let value = value.map(|v| value_datums.borrow_with(v));
            // Lazily project out the key columns; errors are passed through untouched.
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    /// Computes the key by hashing `key` with `KeyHash` and keeping the finalized digest bytes.
    ///
    /// What is fed to the hasher depends on `key`:
    /// - `Ok(datums)`: `Ok` of the slice of datums yielded by the iterator.
    /// - `Err(UpsertError::Value(err))`: `Ok` of the slice of datums in `err.for_key`, i.e. it is
    ///   hashed the same way as a successfully decoded key made of those datums.
    /// - `Err(UpsertError::KeyDecode(err))`: `Err(Datum::Bytes(&err.raw))`.
    /// - `Err(UpsertError::NullKey(_))`: `Err(Datum::Null)`.
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the DatumVec gives us a temporary buffer to store datums in that will be
            // automatically cleared on Drop. See the DatumVec docs for more details.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            // Route the `Hash` impl of the `Result` through the digest and keep the full digest
            // (not a `u64`) as the key.
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}
150
151struct DigestHasher<H: Digest>(H);
152
153impl<H: Digest> Hasher for DigestHasher<H> {
154    fn write(&mut self, bytes: &[u8]) {
155        self.0.update(bytes);
156    }
157
158    fn finish(&self) -> u64 {
159        panic!("digest wrapper used to produce a hash");
160    }
161}
162
163use std::convert::Infallible;
164use timely::container::CapacityContainerBuilder;
165use timely::dataflow::channels::pact::Pipeline;
166
167use self::types::ValueMetadata;
168
169/// This leaf operator drops `token` after the input reaches the `resume_upper`.
170/// This is useful to take coordinated actions across all workers, after the `upsert`
171/// operator has rehydrated.
172pub fn rehydration_finished<G, T>(
173    scope: G,
174    source_config: &crate::source::RawSourceCreationConfig,
175    // A token that we can drop to signal we are finished rehydrating.
176    token: impl std::any::Any + 'static,
177    resume_upper: Antichain<T>,
178    input: &Stream<G, Infallible>,
179) where
180    G: Scope<Timestamp = T>,
181    T: Timestamp,
182{
183    let worker_id = source_config.worker_id;
184    let id = source_config.id;
185    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id}"), scope);
186    let mut input = builder.new_disconnected_input(input, Pipeline);
187
188    builder.build(move |_capabilities| async move {
189        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
190        // Ensure this operator finishes if the resume upper is `[0]`
191        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
192            let Some(event) = input.next().await else {
193                break;
194            };
195            if let AsyncEvent::Progress(upper) = event {
196                input_upper = upper;
197            }
198        }
199        tracing::info!(
200            %worker_id,
201            source_id = %id,
202            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
203        );
204        drop(token);
205    });
206}
207
/// Resumes an upsert computation at `resume_upper` given as inputs a collection of upsert commands
/// and the collection of the previous output of this operator.
///
/// This function gathers the configuration (dyncfgs, RocksDB tuning, metrics, scratch directory),
/// thins the input with `upsert_thinning`, prepares a closure that creates the RocksDB-backed
/// state, and then delegates to `upsert_operator`.
///
/// Returns a tuple of
/// - A collection of the computed upsert operator,
/// - A health update stream to propagate errors,
/// - A `Stream<G, Infallible>` (NOTE(review): presumably a progress-only stream used for
///   backpressure on the snapshot — confirm against the operator implementation), and
/// - A `PressOnDropButton` (NOTE(review): presumably the shutdown button of the rendered
///   operator — confirm).
///
/// # Panics
///
/// The state-initialization closure unwraps the result of `mz_rocksdb::RocksDBInstance::new`, so
/// a failure to create the RocksDB instance panics when that closure is run.
pub(crate) fn upsert<G: Scope, FromTime>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<G::Timestamp>,
    previous: VecCollection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    dataflow_paramters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    // Whether or not to partially drain the input buffer
    // to prevent buffering of the _upstream_ snapshot.
    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    // If the above is true, the number of timely batches to process at once.
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    // Whether we should provide the upsert state merge operator to the RocksDB instance
    // (for faster performance during snapshot hydration).
    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    // Thin out the input before it is handed to the operator (see `upsert_thinning`).
    let thin_input = upsert_thinning(input);

    let tuning = dataflow_paramters.upsert_rocksdb_tuning_config.clone();

    // When running RocksDB in memory, the file system is emulated. However, we still need to
    // pick a path that exists because RocksDB will attempt to create the working directory
    // (see https://github.com/rust-rocksdb/rust-rocksdb/issues/1015) and write a lock file,
    // so we need to ensure the directory is unique per worker.
    //
    // The resulting layout is `<scratch or /tmp>/storage/upsert/<source id>/<worker id>`.
    let rocksdb_dir = instance_context
        .scratch_directory
        .clone()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join("storage")
        .join("upsert")
        .join(source_config.id.to_string())
        .join(source_config.worker_id.to_string());

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?rocksdb_dir,
        ?tuning,
        ?rocksdb_use_native_merge_operator,
        "rendering upsert source"
    );

    let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
    let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

    let env = instance_context.rocksdb_env.clone();

    // A closure that will initialize and return a configured RocksDB instance
    //
    // It owns everything it needs (directory, env, tuning, metrics) so that it can be run later
    // by the operator rather than here.
    let rocksdb_init_fn = move || async move {
        // Only install the merge operator when the dyncfg asks for it.
        let merge_operator = if rocksdb_use_native_merge_operator {
            Some((
                "upsert_state_snapshot_merge_v1".to_string(),
                |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<G::Timestamp, FromTime>>| {
                    consolidating_merge_function::<G::Timestamp, FromTime>(a.into(), b)
                },
            ))
        } else {
            None
        };
        rocksdb::RocksDB::new(
            mz_rocksdb::RocksDBInstance::new(
                &rocksdb_dir,
                mz_rocksdb::InstanceOptions::new(
                    env,
                    rocksdb_cleanup_tries,
                    merge_operator,
                    // For now, just use the same config as the one used for
                    // merging snapshots.
                    upsert_bincode_opts(),
                ),
                tuning,
                rocksdb_shared_metrics,
                rocksdb_instance_metrics,
            )
            .unwrap(),
        )
    };

    upsert_operator(
        &thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
        rocksdb_init_fn,
        upsert_config,
        storage_configuration,
        prevent_snapshot_buffering,
        snapshot_buffering_max,
    )
}
340
341// A shim so we can dispatch based on the dyncfg that tells us which upsert
342// operator to use.
343fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
344    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
345    key_indices: Vec<usize>,
346    resume_upper: Antichain<G::Timestamp>,
347    persist_input: VecCollection<G, Result<Row, DataflowError>, Diff>,
348    persist_token: Option<Vec<PressOnDropButton>>,
349    upsert_metrics: UpsertMetrics,
350    source_config: crate::source::SourceExportCreationConfig,
351    state: F,
352    upsert_config: UpsertConfig,
353    _storage_configuration: &StorageConfiguration,
354    prevent_snapshot_buffering: bool,
355    snapshot_buffering_max: Option<usize>,
356) -> (
357    VecCollection<G, Result<Row, DataflowError>, Diff>,
358    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
359    Stream<G, Infallible>,
360    PressOnDropButton,
361)
362where
363    G::Timestamp: TotalOrder + Sync,
364    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
365    F: FnOnce() -> Fut + 'static,
366    Fut: std::future::Future<Output = US>,
367    US: UpsertStateBackend<G::Timestamp, FromTime>,
368    FromTime: Debug + timely::ExchangeData + Ord + Sync,
369{
370    // Hard-coded to true because classic UPSERT cannot be used safely with
371    // concurrent ingestions, which we need for both 0dt upgrades and
372    // multi-replica ingestions.
373    let use_continual_feedback_upsert = true;
374
375    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");
376
377    if use_continual_feedback_upsert {
378        upsert_continual_feedback::upsert_inner(
379            input,
380            key_indices,
381            resume_upper,
382            persist_input,
383            persist_token,
384            upsert_metrics,
385            source_config,
386            state,
387            upsert_config,
388            prevent_snapshot_buffering,
389            snapshot_buffering_max,
390        )
391    } else {
392        upsert_classic(
393            input,
394            key_indices,
395            resume_upper,
396            persist_input,
397            persist_token,
398            upsert_metrics,
399            source_config,
400            state,
401            upsert_config,
402            prevent_snapshot_buffering,
403            snapshot_buffering_max,
404        )
405    }
406}
407
/// Renders an operator that discards updates that are known to not affect the outcome of upsert in
/// a streaming fashion. For each distinct (key, time) in the input it emits the value with the
/// highest from_time. Its purpose is to thin out data as much as possible before exchanging them
/// across workers.
///
/// Thinning is applied to the updates received within a single invocation of the operator: the
/// buffer is sorted, deduplicated and fully drained every time any input was received.
///
/// # Panics
///
/// Panics with "invalid upsert input" if any received update has a non-positive diff.
fn upsert_thinning<G, K, V, FromTime>(
    input: &VecCollection<G, (K, V, FromTime), Diff>,
) -> VecCollection<G, (K, V, FromTime), Diff>
where
    G: Scope,
    G::Timestamp: TotalOrder,
    K: timely::Data + Eq + Ord,
    V: timely::Data,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // A capability suitable to emit all updates in `updates`, if any.
            let mut capability: Option<InputCapability<G::Timestamp>> = None;
            // A batch of received updates
            let mut updates = Vec::new();
            move |input, output| {
                input.for_each(|cap, data| {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    // Hold on to the capability with the smallest time seen in this invocation:
                    // a new capability replaces the held one when its time is less than or equal
                    // to the held one's.
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                });
                // `take()` releases the capability after emitting, so nothing is held between
                // invocations.
                if let Some(capability) = capability.take() {
                    // Sort by (key, time, Reverse(from_time)) so that deduping by (key, time) gives
                    // the latest change for that key.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    // `dedup_by` keeps the first element of every run of equal (key, time), which
                    // after the sort above is the one with the highest `from_time`.
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}
467
468/// Helper method for `upsert_classic` used to stage `data` updates
469/// from the input/source timely edge.
470fn stage_input<T, FromTime>(
471    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
472    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
473    input_upper: &Antichain<T>,
474    resume_upper: &Antichain<T>,
475    storage_shrink_upsert_unused_buffers_by_ratio: usize,
476) where
477    T: PartialOrder,
478    FromTime: Ord,
479{
480    if PartialOrder::less_equal(input_upper, resume_upper) {
481        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
482    }
483
484    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
485        assert!(diff.is_positive(), "invalid upsert input");
486        (time, key, Reverse(order), value)
487    }));
488
489    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
490        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
491        if reduced_capacity > stash.len() {
492            stash.shrink_to(reduced_capacity);
493        }
494    }
495}
496
/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain every stashed update whose time is not beyond the given frontier, i.e. for which
    /// `upper.less_equal(ts)` is false.
    ToUpper(&'a Antichain<T>),
    /// Drain every stashed update whose time is less than or equal to the given time.
    AtTime(T),
}
504
/// Helper method used to drain staged updates out of `stash` and apply them to the upsert
/// `state`.
///
/// The updates selected by `drain_style` are removed from `stash`, applied per key on top of the
/// values currently in `state`, and the resulting retractions/additions are appended to
/// `output_updates`. The state is read with a single `multi_get` and written back with a single
/// `multi_put`.
///
/// `commands_state` and `multi_get_scratch` are caller-provided scratch buffers; both are cleared
/// before use and left empty afterwards.
///
/// Failures of `multi_get` / `multi_put` are reported through `error_emitter`.
/// NOTE(review): this function keeps going after `emit` returns; presumably `emit` does not
/// return in practice — confirm against the `UpsertErrorEmitter` implementations.
///
/// NOTE(review): this doc previously named `upsert_inner` as the user of this helper and
/// described it as staging data; the call site is not visible from here — confirm.
///
/// # Panics
///
/// Panics if a drained key is missing from `commands_state`, or if a previous value's size does
/// not fit the target type of the `try_into` conversion ("less than i64 size").
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    // Orders the stash by (time, key, Reverse(from_time), value), which both the prefix search
    // and the (time, key) dedup below rely on.
    stash.sort_unstable();

    // Find the prefix that we can emit
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Read the previous values _per key_ out of `state`, recording it
    // along with the value with the _latest timestamp for that key_.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    // These iterators iterate in the same order because `commands_state`
    // is an `IndexMap`.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix that can be emitted we can deduplicate based on (ts, key) in
    // order to only process the command with the maximum order within the (ts,
    // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    // Upsert the values into `commands_state`, by recording the latest
    // value (or deletion). These will be synced at the end to the `state`.
    //
    // Note that we are effectively doing "mini-upsert" here, using
    // `commands_state`. This "mini-upsert" is seeded with data from `state`, using
    // a single `multi_get` above, and the final state is written out into
    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
    // implementations, and reduces the number of reads and writes we need to do.
    //
    // This "mini-upsert" technique is actually useful in `UpsertState`'s
    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
    // the `UpsertStateBackend` implementations. In some sense, it's "upsert all the way down".
    while let Some((ts, key, from_time, value)) = commands.next() {
        // Every drained key was inserted into `commands_state` above, so a vacant entry here
        // would be a bug.
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id);
        }

        // Skip this command if its order key is below the one in the upsert state.
        // Note that the existing order key may be `None` if the existing value
        // is from snapshotting, which always sorts below new values/deletes.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            // Skip this update. If no later updates adjust this key, then we just
            // end up writing the same value back to state. If there
            // is nothing in the state, `existing_order` is `None`, and this
            // does not occur.
            continue;
        }

        match value {
            // Insert or update: retract the previous finalized value (if any) and emit the new
            // one at the same time.
            Some(value) => {
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            // Delete: retract the previous finalized value (if any); nothing new is emitted.
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                // Record a tombstone for deletes.
                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    // Write the per-key results back in one batch, passing along the metadata of the value
    // that was read for each key.
    match state
        .multi_put(
            true, // Do update per-update stats.
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}
654
/// Holds the configuration for the upsert operators, so that adding a new setting does not
/// require adding a new function parameter.
pub(crate) struct UpsertConfig {
    /// Ratio used when shrinking unused buffer capacity. It is read from
    /// `StorageConfiguration::parameters` and handed to `UpsertState::new`.
    // NOTE(review): presumably buffers are shrunk to `capacity / ratio` and `0` disables
    // shrinking, as `stage_input` does with its ratio argument — confirm.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}
660
661fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
662    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
663    key_indices: Vec<usize>,
664    resume_upper: Antichain<G::Timestamp>,
665    previous: VecCollection<G, Result<Row, DataflowError>, Diff>,
666    previous_token: Option<Vec<PressOnDropButton>>,
667    upsert_metrics: UpsertMetrics,
668    source_config: crate::source::SourceExportCreationConfig,
669    state: F,
670    upsert_config: UpsertConfig,
671    prevent_snapshot_buffering: bool,
672    snapshot_buffering_max: Option<usize>,
673) -> (
674    VecCollection<G, Result<Row, DataflowError>, Diff>,
675    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
676    Stream<G, Infallible>,
677    PressOnDropButton,
678)
679where
680    G::Timestamp: TotalOrder + Sync,
681    F: FnOnce() -> Fut + 'static,
682    Fut: std::future::Future<Output = US>,
683    US: UpsertStateBackend<G::Timestamp, FromTime>,
684    FromTime: timely::ExchangeData + Ord + Sync,
685{
686    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());
687
688    // We only care about UpsertValueError since this is the only error that we can retract
689    let previous = previous.flat_map(move |result| {
690        let value = match result {
691            Ok(ok) => Ok(ok),
692            Err(DataflowError::EnvelopeError(err)) => match *err {
693                EnvelopeError::Upsert(err) => Err(Box::new(err)),
694                _ => return None,
695            },
696            Err(_) => return None,
697        };
698        let value_ref = match value {
699            Ok(ref row) => Ok(row),
700            Err(ref err) => Err(&**err),
701        };
702        Some((UpsertKey::from_value(value_ref, &key_indices), value))
703    });
704    let (output_handle, output) = builder.new_output();
705
706    // An output that just reports progress of the snapshot consolidation process upstream to the
707    // persist source to ensure that backpressure is applied
708    let (_snapshot_handle, snapshot_stream) =
709        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
710
711    let (mut health_output, health_stream) = builder.new_output();
712    let mut input = builder.new_input_for(
713        &input.inner,
714        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
715        &output_handle,
716    );
717
718    let mut previous = builder.new_input_for(
719        &previous.inner,
720        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
721        &output_handle,
722    );
723
724    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
725    let shutdown_button = builder.build(move |caps| async move {
726        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
727
728        let mut state = UpsertState::<_, _, FromTime>::new(
729            state().await,
730            upsert_shared_metrics,
731            &upsert_metrics,
732            source_config.source_statistics.clone(),
733            upsert_config.shrink_upsert_unused_buffers_by_ratio,
734        );
735        let mut events = vec![];
736        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());
737
738        let mut stash = vec![];
739
740        let mut error_emitter = (&mut health_output, &health_cap);
741
742        tracing::info!(
743            ?resume_upper,
744            ?snapshot_upper,
745            "timely-{} upsert source {} starting rehydration",
746            source_config.worker_id,
747            source_config.id
748        );
749        // Read and consolidate the snapshot from the 'previous' input until it
750        // reaches the `resume_upper`.
751        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
752            previous.ready().await;
753            while let Some(event) = previous.next_sync() {
754                match event {
755                    AsyncEvent::Data(_cap, data) => {
756                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
757                            if !resume_upper.less_equal(&ts) {
758                                Some((key, value, diff))
759                            } else {
760                                None
761                            }
762                        }))
763                    }
764                    AsyncEvent::Progress(upper) => {
765                        snapshot_upper = upper;
766                    }
767                };
768            }
769
770            match state
771                .consolidate_chunk(
772                    events.drain(..),
773                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
774                )
775                .await
776            {
777                Ok(_) => {
778                    if let Some(ts) = snapshot_upper.clone().into_option() {
779                        // As we shutdown, we could ostensibly get data from later than the
780                        // `resume_upper`, which we ignore above. We don't want our output capability to make
781                        // it further than the `resume_upper`.
782                        if !resume_upper.less_equal(&ts) {
783                            snapshot_cap.downgrade(&ts);
784                            output_cap.downgrade(&ts);
785                        }
786                    }
787                }
788                Err(e) => {
789                    UpsertErrorEmitter::<G>::emit(
790                        &mut error_emitter,
791                        "Failed to rehydrate state".to_string(),
792                        e,
793                    )
794                    .await;
795                }
796            }
797        }
798
799        drop(events);
800        drop(previous_token);
801        drop(snapshot_cap);
802
803        // Exhaust the previous input. It is expected to immediately reach the empty
804        // antichain since we have dropped its token.
805        //
806        // Note that we do not need to also process the `input` during this, as the dropped token
807        // will shutdown the `backpressure` operator
808        while let Some(_event) = previous.next().await {}
809
810        // After snapshotting, our output frontier is exactly the `resume_upper`
811        if let Some(ts) = resume_upper.as_option() {
812            output_cap.downgrade(ts);
813        }
814
815        tracing::info!(
816            "timely-{} upsert source {} finished rehydration",
817            source_config.worker_id,
818            source_config.id
819        );
820
821        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
822        // and have a consistent iteration order.
823        let mut commands_state: indexmap::IndexMap<
824            _,
825            types::UpsertValueAndSize<G::Timestamp, FromTime>,
826        > = indexmap::IndexMap::new();
827        let mut multi_get_scratch = Vec::new();
828
829        // Now we can resume consuming the collection
830        let mut output_updates = vec![];
831        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
832
833        while let Some(event) = input.next().await {
834            // Buffer as many events as possible. This should be bounded, as new data can't be
835            // produced in this worker until we yield to timely.
836            let events = [event]
837                .into_iter()
838                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
839                .enumerate();
840
841            let mut partial_drain_time = None;
842            for (i, event) in events {
843                match event {
844                    AsyncEvent::Data(cap, mut data) => {
845                        tracing::trace!(
846                            time=?cap.time(),
847                            updates=%data.len(),
848                            "received data in upsert"
849                        );
850                        stage_input(
851                            &mut stash,
852                            &mut data,
853                            &input_upper,
854                            &resume_upper,
855                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
856                        );
857
858                        let event_time = cap.time();
859                        // If the data is at _exactly_ the output frontier, we can preemptively drain it into the state.
860                        // Data within this set of events strictly beyond this time is staged as
861                        // normal.
862                        //
863                        // This is a load-bearing optimization, as it is required to avoid buffering
864                        // the entire source snapshot in the `stash`.
865                        if prevent_snapshot_buffering && output_cap.time() == event_time {
866                            partial_drain_time = Some(event_time.clone());
867                        }
868                    }
869                    AsyncEvent::Progress(upper) => {
870                        tracing::trace!(?upper, "received progress in upsert");
871                        // Ignore progress updates before the `resume_upper`, which is our initial
872                        // capability post-snapshotting.
873                        if PartialOrder::less_than(&upper, &resume_upper) {
874                            continue;
875                        }
876
877                        // Disable the partial drain as this progress event covers
878                        // the `output_cap` time.
879                        partial_drain_time = None;
880                        drain_staged_input::<_, G, _, _, _>(
881                            &mut stash,
882                            &mut commands_state,
883                            &mut output_updates,
884                            &mut multi_get_scratch,
885                            DrainStyle::ToUpper(&upper),
886                            &mut error_emitter,
887                            &mut state,
888                            &source_config,
889                        )
890                        .await;
891
892                        output_handle.give_container(&output_cap, &mut output_updates);
893
894                        if let Some(ts) = upper.as_option() {
895                            output_cap.downgrade(ts);
896                        }
897                        input_upper = upper;
898                    }
899                }
900                let events_processed = i + 1;
901                if let Some(max) = snapshot_buffering_max {
902                    if events_processed >= max {
903                        break;
904                    }
905                }
906            }
907
908            // If there were staged events that occurred at the capability time, drain
909            // them. This is safe because out-of-order updates to the same key that are
910            // drained in separate calls to `drain_staged_input` are correctly ordered by
911            // their `FromTime` in `drain_staged_input`.
912            //
913            // Note also that this may result in more updates in the output collection than
914            // the minimum. However, because the frontier only advances on `Progress` updates,
915            // the collection always accumulates correctly for all keys.
916            if let Some(partial_drain_time) = partial_drain_time {
917                drain_staged_input::<_, G, _, _, _>(
918                    &mut stash,
919                    &mut commands_state,
920                    &mut output_updates,
921                    &mut multi_get_scratch,
922                    DrainStyle::AtTime(partial_drain_time),
923                    &mut error_emitter,
924                    &mut state,
925                    &source_config,
926                )
927                .await;
928
929                output_handle.give_container(&output_cap, &mut output_updates);
930            }
931        }
932    });
933
934    (
935        output.as_collection().map(|result| match result {
936            Ok(ok) => Ok(ok),
937            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
938        }),
939        health_stream,
940        snapshot_stream,
941        shutdown_button.press_on_drop(),
942    )
943}
944
945#[async_trait::async_trait(?Send)]
946pub(crate) trait UpsertErrorEmitter<G> {
947    async fn emit(&mut self, context: String, e: anyhow::Error);
948}
949
950#[async_trait::async_trait(?Send)]
951impl<G: Scope> UpsertErrorEmitter<G>
952    for (
953        &mut AsyncOutputHandle<
954            <G as ScopeParent>::Timestamp,
955            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
956        >,
957        &Capability<<G as ScopeParent>::Timestamp>,
958    )
959{
960    async fn emit(&mut self, context: String, e: anyhow::Error) {
961        process_upsert_state_error::<G>(context, e, self.0, self.1).await
962    }
963}
964
965/// Emit the given error, and stall till the dataflow is restarted.
966async fn process_upsert_state_error<G: Scope>(
967    context: String,
968    e: anyhow::Error,
969    health_output: &AsyncOutputHandle<
970        <G as ScopeParent>::Timestamp,
971        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
972    >,
973    health_cap: &Capability<<G as ScopeParent>::Timestamp>,
974) {
975    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
976    health_output.give(health_cap, (None, update));
977    std::future::pending::<()>().await;
978    unreachable!("pending future never returns");
979}