mz_storage/upsert/
types.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! # State-management for UPSERT.
//!
//! This module provides structures for use within an UPSERT
//! operator implementation.
//!
//! UPSERT is effectively a process which transforms a `Stream<(Key, Option<Data>)>`
//! into a differential collection, by indexing the data based on the key.
//!
//! _This module does not implement this transformation, instead exposing APIs designed
//! for use within an UPSERT operator._ There is one exception to this: `consolidate_chunk`
//! implements an efficient upsert-like transformation to re-index a collection using the
//! _output collection_ of an upsert transformation. More on this below.
//!
//! ## `UpsertState`
//!
//! Its primary export is `UpsertState`, which wraps an `UpsertStateBackend` and provides 3 APIs:
//!
//! ### `multi_get`
//! `multi_get` returns the current value for a (unique) set of keys. To keep implementations
//! efficient, the set of keys is an iterator, and results are written back into another parallel
//! iterator. In addition to returning the current values, implementations must also return the
//! _size_ of those values _as they are stored within the implementation_. Implementations are
//! required to chunk large iterators if they need to operate over smaller batches.
//!
//! `multi_get` is implemented directly with `UpsertStateBackend::multi_get`.
//!
//! ### `multi_put`
//! Update or delete values for a set of keys. To keep implementations efficient, the set
//! of updates is an iterator. Implementations are also required to return the difference
//! in values and total size after processing the updates. To simplify this (and because
//! in the `upsert` use case we have this data readily available), the updates are input
//! with the size of the current value (if any) that was returned from a previous `multi_get`.
//! Implementations are required to chunk large iterators if they need to operate over smaller
//! batches.
//!
//! `multi_put` is implemented directly with `UpsertStateBackend::multi_put`.
//!
//! ### `consolidate_chunk`
//!
//! `consolidate_chunk` re-indexes an UPSERT collection based on its _output collection_ (as
//! opposed to its _input `Stream`_). Please see the docs on `consolidate_chunk` and `StateValue`
//! for more information.
//!
//! `consolidate_chunk` is implemented with both `UpsertStateBackend::multi_put` and
//! `UpsertStateBackend::multi_get`.
//!
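//! ### Example
//!
//! A minimal sketch of how an operator loop might combine `multi_get` and `multi_put`. The
//! wrapper methods are assumed here to mirror the `UpsertStateBackend` trait below, and
//! `decide_new_value` stands in for whatever upsert logic picks the new
//! `Option<StateValue<_, _>>` for a key.
//!
//! ```ignore
//! let mut results = vec![UpsertValueAndSize::default(); keys.len()];
//! state.multi_get(keys.iter().cloned(), results.iter_mut()).await?;
//!
//! let puts = keys.into_iter().zip(results.iter()).map(|(key, prev)| {
//!     // Feed the previously returned metadata back in, so the backend can
//!     // compute statistics without re-reading the old value.
//!     let previous_value_metadata = prev.metadata.map(|m| ValueMetadata {
//!         size: m.size.try_into().expect("value size fits in i64"),
//!         is_tombstone: m.is_tombstone,
//!     });
//!     let value = decide_new_value(&key, prev.value.as_ref());
//!     (key, PutValue { value, previous_value_metadata })
//! });
//! state.multi_put(puts).await?;
//! ```
//!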
//! ## Order Keys
//!
//! In practice, the input stream for UPSERT collections includes an _order key_. This is used to
//! sort data with the same key occurring at the same timestamp. This module provides support
//! for serializing and deserializing order keys with their associated data. Being able to ingest
//! data on non-frontier boundaries requires this support.
//!
//! A consequence of this is that tombstones with an order key can be stored within the state.
//! There is currently no support for cleaning these tombstones up, as they are considered rare and
//! small enough.
//!
//! Because `consolidate_chunk` handles data that consolidates correctly, it does not handle
//! order keys.
//!
//! ## A note on state size
//!
//! The `UpsertStateBackend` trait requires that implementations report _relatively accurate_
//! information about how the state size changes over time. Note that it does NOT ask the
//! implementations to give accurate information about actual resource consumption (like disk
//! space including space amplification); instead it is just asking about the size of the values,
//! after they have been encoded. For implementations like `RocksDB`, this may be highly accurate
//! (it literally reports the encoded size as written to the RocksDB API), and for others like the
//! `InMemoryHashMap`, it may be a rough estimate of actual memory usage. See
//! `StateValue::memory_size` for more information.
//!
//! Note also that after consolidation, additional space may be used if `StateValue` is
//! used.
//!

86use std::fmt;
87use std::num::Wrapping;
88use std::sync::Arc;
89use std::time::Instant;
90
91use bincode::Options;
92use itertools::Itertools;
93use mz_ore::cast::CastFrom;
94use mz_ore::error::ErrorExt;
95use mz_repr::{Diff, GlobalId};
96use serde::{Serialize, de::DeserializeOwned};
97
98use crate::metrics::upsert::{UpsertMetrics, UpsertSharedMetrics};
99use crate::statistics::SourceStatistics;
100use crate::upsert::{UpsertKey, UpsertValue};
101
102/// The default set of `bincode` options used for consolidating
103/// upsert updates (and writing values to RocksDB).
104pub type BincodeOpts = bincode::config::DefaultOptions;
105
106/// Build the default `BincodeOpts`.
107pub fn upsert_bincode_opts() -> BincodeOpts {
108    // We don't allow trailing bytes, for now,
109    // and use varint encoding for space saving.
110    bincode::DefaultOptions::new()
111}
112
/// The result type for `multi_get`.
/// The value and size are stored in individual `Option`s so callers
/// can reuse this struct as they overwrite its value, keeping
/// track of the previous metadata. Additionally, values
/// may be `None` for tombstones.
#[derive(Clone)]
pub struct UpsertValueAndSize<T, O> {
    /// The value, if there was one.
    pub value: Option<StateValue<T, O>>,
    /// The size of the original `value` as persisted.
    /// Useful for users keeping track of statistics.
    pub metadata: Option<ValueMetadata<u64>>,
}
126
127impl<T, O> std::fmt::Debug for UpsertValueAndSize<T, O> {
128    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129        f.debug_struct("UpsertValueAndSize")
130            .field("value", &self.value)
131            .field("metadata", &self.metadata)
132            .finish()
133    }
134}
135
136impl<T, O> Default for UpsertValueAndSize<T, O> {
137    fn default() -> Self {
138        Self {
139            value: None,
140            metadata: None,
141        }
142    }
143}
144
145/// Metadata about an existing value in the upsert state backend, as returned
146/// by `multi_get`.
147#[derive(Copy, Clone, Debug)]
148pub struct ValueMetadata<S> {
149    /// The size of the value.
150    pub size: S,
151    /// If the value is a tombstone.
152    pub is_tombstone: bool,
153}
154
/// A value to put in with `multi_put`.
#[derive(Clone, Debug)]
pub struct PutValue<V> {
    /// The new value, or a `None` to indicate a delete.
    pub value: Option<V>,
    /// The metadata of the previous value for this key, if known.
    /// Passed in to efficiently calculate statistics.
    pub previous_value_metadata: Option<ValueMetadata<i64>>,
}
164
165/// A value to put in with a `multi_merge`.
166pub struct MergeValue<V> {
167    /// The value of the merge operand to write to the backend.
168    pub value: V,
169    /// The 'diff' of this merge operand value, used to estimate the overall size diff
170    /// of the working set after this merge operand is merged by the backend.
171    pub diff: Diff,
172}
173
/// `UpsertState` has 2 modes:
/// - Normal operation
/// - Consolidation.
///
/// This struct and its substructs are helpers to simplify the logic that
/// individual `UpsertState` implementations need to do to manage these 2 modes.
///
/// Normal operation is simple, we just store an ordinary `UpsertValue`, and allow the implementer
/// to store it any way they want. During consolidation, the logic is more complex.
/// See the docs on `StateValue::merge_update` for more information.
///
/// Note also that this type is designed to support _partial updates_. All values are
/// associated with an _order key_ `O` that can be used to determine if a value existing in the
/// `UpsertStateBackend` occurred before or after a value being considered for insertion.
///
/// `O` is typically required to be `: Default`, with the default value sorting below all others.
/// During consolidation, values consolidate correctly (as they are actual
/// differential updates with diffs), so order keys are not required.
192#[derive(Clone, serde::Serialize, serde::Deserialize)]
193pub enum StateValue<T, O> {
194    Consolidating(Consolidating),
195    Value(Value<T, O>),
196}
197
198impl<T, O> std::fmt::Debug for StateValue<T, O> {
199    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200        match self {
201            StateValue::Consolidating(_) => write!(f, "Consolidating"),
202            StateValue::Value(_) => write!(f, "Value"),
203        }
204    }
205}
206
/// A totally consolidated value stored within the `UpsertStateBackend`.
///
/// This type contains support for _tombstones_, which carry an _order key_,
/// and for provisional values.
///
/// What is considered finalized and provisional depends on the implementation
/// of the UPSERT operator: it might consider everything that it writes to its
/// state finalized, and assume that what it emits will be written down in the
/// output exactly as presented. Or it might consider everything it writes down
/// provisional, and only consider updates that it _knows_ to be persisted as
/// finalized.
///
/// Provisional values should only be considered while still "working off"
/// updates with the same timestamp at which the provisional update was
/// recorded.
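///
/// A minimal sketch of the provisional lifecycle, using the constructors on
/// `StateValue` (which wraps this type); the construction of concrete
/// `UpsertValue`s is elided with `todo!()`:
///
/// ```ignore
/// let old: UpsertValue = todo!("some previously finalized value");
/// let new: UpsertValue = todo!("a newer value for the same key");
///
/// let state = StateValue::<u64, ()>::finalized_value(old, ());
/// let state = state.into_provisional_value(new, 5u64, ());
///
/// // While processing updates at timestamp 5, callers observe the provisional value ...
/// assert!(state.provisional_value_ref(&5).is_some());
/// // ... at any other timestamp they fall back to the retained finalized value.
/// assert!(state.provisional_value_ref(&6).is_some());
/// ```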
222#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
223pub enum Value<T, O> {
224    FinalizedValue(UpsertValue, O),
225    Tombstone(O),
226    ProvisionalValue {
227        // We keep the finalized value around, because the provisional value is
228        // only valid when processing updates at the same timestamp. And at any
229        // point we might still require access to the finalized value.
230        finalized_value: Option<Box<(UpsertValue, O)>>,
231        // A provisional value of `None` is a provisional tombstone.
232        //
233        // WIP: We can also box this, to keep the size of StateValue as it was
234        // previously.
235        provisional_value: (Option<UpsertValue>, T, O),
236    },
237}
238
239/// A value as produced during consolidation.
240#[derive(Clone, Default, serde::Serialize, serde::Deserialize, Debug)]
241pub struct Consolidating {
242    #[serde(with = "serde_bytes")]
243    value_xor: Vec<u8>,
244    len_sum: Wrapping<i64>,
245    checksum_sum: Wrapping<i64>,
246    diff_sum: Wrapping<i64>,
247}
248
249impl fmt::Display for Consolidating {
250    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251        f.debug_struct("Consolidating")
252            .field("len_sum", &self.len_sum)
253            .field("checksum_sum", &self.checksum_sum)
254            .field("diff_sum", &self.diff_sum)
255            .finish_non_exhaustive()
256    }
257}
258
259impl<T, O> StateValue<T, O> {
260    /// A finalized, that is (assumed) persistent, value occurring at some order
261    /// key.
262    pub fn finalized_value(value: UpsertValue, order: O) -> Self {
263        Self::Value(Value::FinalizedValue(value, order))
264    }
265
266    #[allow(unused)]
267    /// A tombstoned value occurring at some order key.
268    pub fn tombstone(order: O) -> Self {
269        Self::Value(Value::Tombstone(order))
270    }
271
272    /// Whether the value is a tombstone.
273    pub fn is_tombstone(&self) -> bool {
274        match self {
275            Self::Value(Value::Tombstone(_)) => true,
276            _ => false,
277        }
278    }
279
280    /// Pull out the order for the given `Value`, assuming `ensure_decoded` has been called.
281    pub fn order(&self) -> &O {
282        match self {
283            Self::Value(Value::FinalizedValue(_, order)) => order,
284            Self::Value(Value::ProvisionalValue { .. }) => {
285                panic!("order() called on provisional value")
286            }
287            Self::Value(Value::Tombstone(order)) => order,
288            _ => panic!("called `order` without calling `ensure_decoded`"),
289        }
290    }
291
292    /// Pull out the `Value` value for a `StateValue`, after `ensure_decoded` has been called.
293    pub fn into_decoded(self) -> Value<T, O> {
294        match self {
295            Self::Value(value) => value,
            _ => panic!("called `into_decoded` without calling `ensure_decoded`"),
297        }
298    }
299
300    /// The size of a `StateValue`, in memory. This is:
301    /// 1. only used in the `InMemoryHashMap` implementation.
302    /// 2. An estimate (it only looks at value sizes, and not errors)
303    ///
304    /// Other implementations may use more accurate accounting.
305    pub fn memory_size(&self) -> u64 {
306        match self {
307            // Similar to `Row::byte_len`, we add the heap size and the size of the value itself.
308            Self::Consolidating(Consolidating { value_xor, .. }) => {
309                u64::cast_from(value_xor.len()) + u64::cast_from(std::mem::size_of::<Self>())
310            }
311            Self::Value(Value::FinalizedValue(Ok(row), ..)) => {
312                // `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
313                // subtract it.
314                u64::cast_from(row.byte_len()) - u64::cast_from(std::mem::size_of::<mz_repr::Row>())
315                // This assumes the size of any `O` instantiation is meaningful (i.e. not a heap
316                // object).
317                + u64::cast_from(std::mem::size_of::<Self>())
318            }
319            Self::Value(Value::ProvisionalValue {
320                finalized_value,
321                provisional_value,
322            }) => {
323                finalized_value.as_ref().map(|v| match v.as_ref() {
324                    (Ok(row), _order) =>
325                        // The finalized value is boxed, so the size of Row is
326                        // not included in the outer size of Self. We therefore
327                        // don't subtract it here like for the other branches.
328                        u64::cast_from(row.byte_len())
329                        // Add the size of the order, because it's also behind
330                        // the box.
331                        + u64::cast_from(std::mem::size_of::<O>()),
332                    // Assume errors are rare enough to not move the needle.
333                    (Err(_), _order) => 0,
334                }).unwrap_or(0)
335                +
336                provisional_value.0.as_ref().map(|v| match v{
337                    Ok(row) =>
338                        // `Row::byte_len` includes the size of `Row`, which is
339                        // also in `Self`, so we subtract it.
340                        u64::cast_from(row.byte_len()) - u64::cast_from(std::mem::size_of::<mz_repr::Row>()),
341                        // The size of order is already included in the outer
342                        // size of self.
343                    // Assume errors are rare enough to not move the needle.
344                    Err(_) => 0,
345                }).unwrap_or(0)
346                // This assumes the size of any `O` instantiation is meaningful (i.e. not a heap
347                // object).
348                + u64::cast_from(std::mem::size_of::<Self>())
349            }
350            Self::Value(Value::Tombstone(_)) => {
351                // This assumes the size of any `O` instantiation is meaningful (i.e. not a heap
352                // object).
353                u64::cast_from(std::mem::size_of::<Self>())
354            }
355            Self::Value(Value::FinalizedValue(Err(_), ..)) => {
356                // Assume errors are rare enough to not move the needle.
357                0
358            }
359        }
360    }
361}
362
363impl<T: Eq, O> StateValue<T, O> {
364    /// Creates a new provisional value, occurring at some order key, observed
365    /// at the given timestamp.
366    pub fn new_provisional_value(
367        provisional_value: UpsertValue,
368        provisional_ts: T,
369        order: O,
370    ) -> Self {
371        Self::Value(Value::ProvisionalValue {
372            finalized_value: None,
373            provisional_value: (Some(provisional_value), provisional_ts, order),
374        })
375    }
376
377    /// Creates a provisional value, that retains the finalized value along with
378    /// its order in this `StateValue`, if any.
379    ///
380    /// We record the finalized value, so that we can present it when needed or
381    /// when trying to read a provisional value at a different timestamp.
382    pub fn into_provisional_value(
383        self,
384        provisional_value: UpsertValue,
385        provisional_ts: T,
386        provisional_order: O,
387    ) -> Self {
388        match self {
389            StateValue::Value(Value::FinalizedValue(value, order)) => {
390                StateValue::Value(Value::ProvisionalValue {
391                    finalized_value: Some(Box::new((value, order))),
392                    provisional_value: (Some(provisional_value), provisional_ts, provisional_order),
393                })
394            }
395            StateValue::Value(Value::Tombstone(_)) => StateValue::Value(Value::ProvisionalValue {
396                finalized_value: None,
397                provisional_value: (Some(provisional_value), provisional_ts, provisional_order),
398            }),
399            StateValue::Value(Value::ProvisionalValue {
400                finalized_value,
401                provisional_value: _,
402            }) => StateValue::Value(Value::ProvisionalValue {
403                finalized_value,
404                provisional_value: (Some(provisional_value), provisional_ts, provisional_order),
405            }),
406            StateValue::Consolidating(_) => {
407                panic!("called `into_provisional_value` without calling `ensure_decoded`")
408            }
409        }
410    }
411
412    /// Creates a new provisional tombstone occurring at some order key,
413    /// observed at the given timestamp.
414    pub fn new_provisional_tombstone(provisional_ts: T, order: O) -> Self {
415        Self::Value(Value::ProvisionalValue {
416            finalized_value: None,
417            provisional_value: (None, provisional_ts, order),
418        })
419    }
420
421    /// Creates a provisional tombstone, that retains the finalized value along
422    /// with its order in this `StateValue`, if any.
423    ///
424    /// We record the current finalized value, so that we can present it when
425    /// needed or when trying to read a provisional value at a different
426    /// timestamp.
427    pub fn into_provisional_tombstone(self, provisional_ts: T, provisional_order: O) -> Self {
428        match self {
429            StateValue::Value(Value::FinalizedValue(value, order)) => {
430                StateValue::Value(Value::ProvisionalValue {
431                    finalized_value: Some(Box::new((value, order))),
432                    provisional_value: (None, provisional_ts, provisional_order),
433                })
434            }
435            StateValue::Value(Value::Tombstone(order)) => {
436                StateValue::Value(Value::ProvisionalValue {
437                    finalized_value: None,
438                    provisional_value: (None, provisional_ts, order),
439                })
440            }
441            StateValue::Value(Value::ProvisionalValue {
442                finalized_value,
443                provisional_value: _,
444            }) => StateValue::Value(Value::ProvisionalValue {
445                finalized_value,
446                provisional_value: (None, provisional_ts, provisional_order),
447            }),
448            StateValue::Consolidating(_) => {
449                panic!("called `into_provisional_tombstone` without calling `ensure_decoded`")
450            }
451        }
452    }
453
454    /// Returns the order of a provisional value at the given timestamp. If that
455    /// doesn't exist, the order of the finalized value.
456    ///
457    /// Returns `None` if none of the above exist.
458    pub fn provisional_order(&self, ts: &T) -> Option<&O> {
459        match self {
460            Self::Value(Value::FinalizedValue(_, order)) => Some(order),
461            Self::Value(Value::Tombstone(order)) => Some(order),
462            Self::Value(Value::ProvisionalValue {
463                finalized_value: _,
464                provisional_value: (_, provisional_ts, provisional_order),
465            }) if provisional_ts == ts => Some(provisional_order),
466            Self::Value(Value::ProvisionalValue {
467                finalized_value,
468                provisional_value: _,
469            }) => finalized_value.as_ref().map(|v| &v.1),
470            Self::Consolidating(_) => {
471                panic!("called `provisional_order` without calling `ensure_decoded`")
472            }
473        }
474    }
475
476    // WIP: We don't need these after all, but leave in for review.
477    ///// Returns the order of this value, if a finalized value is present.
478    //pub fn finalized_order(&self) -> Option<&O> {
479    //    match self {
480    //        Self::Value(Value::FinalizedValue(_, order)) => Some(order),
481    //        Self::Value(Value::Tombstone(order)) => Some(order),
482    //        Self::Value(Value::ProvisionalValue {
483    //            finalized_value,
484    //            provisional_value: _,
485    //        }) => finalized_value.as_ref().map(|v| &v.1),
486    //        Self::Consolidating(_) => {
487    //            panic!("called `finalized_order` without calling `ensure_decoded`")
488    //        }
489    //    }
490    //}
491
492    ///// Returns the provisional value, if one is present at the given timestamp.
493    ///// Falls back to the finalized value, or `None` if there is neither.
494    //pub fn into_provisional_value(self, ts: &T) -> Option<(UpsertValue, O)> {
495    //    match self {
496    //        Self::Value(Value::FinalizedValue(value, order)) => Some((value, order)),
497    //        Self::Value(Value::Tombstone(_)) => None,
498    //        Self::Value(Value::ProvisionalValue {
499    //            finalized_value: _,
500    //            provisional_value: (provisional_value, provisional_ts, provisional_order),
501    //        }) if provisional_ts == *ts => provisional_value.map(|v| (v, provisional_order)),
502    //        Self::Value(Value::ProvisionalValue {
503    //            finalized_value,
504    //            provisional_value: _,
505    //        }) => finalized_value.map(|boxed| *boxed),
506    //        Self::Consolidating(_) => {
507    //            panic!("called `into_provisional_value` without calling `ensure_decoded`")
508    //        }
509    //    }
510    //}
511
512    /// Returns the provisional value, if one is present at the given timestamp.
513    /// Falls back to the finalized value, or `None` if there is neither.
514    pub fn provisional_value_ref(&self, ts: &T) -> Option<&UpsertValue> {
515        match self {
516            Self::Value(Value::FinalizedValue(value, _order)) => Some(value),
517            Self::Value(Value::Tombstone(_)) => None,
518            Self::Value(Value::ProvisionalValue {
519                finalized_value: _,
520                provisional_value: (provisional_value, provisional_ts, _provisional_order),
521            }) if provisional_ts == ts => provisional_value.as_ref(),
522            Self::Value(Value::ProvisionalValue {
523                finalized_value,
524                provisional_value: _,
525            }) => finalized_value.as_ref().map(|boxed| &boxed.0),
526            Self::Consolidating(_) => {
527                panic!("called `provisional_value_ref` without calling `ensure_decoded`")
528            }
529        }
530    }
531
    /// Returns the finalized value, if one is present.
533    pub fn into_finalized_value(self) -> Option<(UpsertValue, O)> {
534        match self {
535            Self::Value(Value::FinalizedValue(value, order)) => Some((value, order)),
536            Self::Value(Value::Tombstone(_order)) => None,
537            Self::Value(Value::ProvisionalValue {
538                finalized_value,
539                provisional_value: _,
540            }) => finalized_value.map(|boxed| *boxed),
            _ => panic!("called `into_finalized_value` without calling `ensure_decoded`"),
542        }
543    }
544}
545
546impl<T: Eq, O: Default> StateValue<T, O> {
    /// We use a XOR trick in order to accumulate the values without having to store the full
    /// unconsolidated history in memory. For all (value, diff) updates of a key we track:
    /// - diff_sum = SUM(diff)
    /// - checksum_sum = SUM(checksum(bincode(value)) * diff)
    /// - len_sum = SUM(len(bincode(value)) * diff)
    /// - value_xor = XOR(bincode(value))
    ///
    /// ## Return value
    /// Returns a `bool` indicating whether or not the current merged value is able to be deleted.
    ///
    /// ## Correctness
    ///
    /// The method is correct because a well formed upsert collection at a given
    /// timestamp will have for each key:
    /// - Zero or one updates of the form (cur_value, +1)
    /// - Zero or more pairs of updates of the form (prev_value, +1), (prev_value, -1)
    ///
    /// We are interested in extracting the cur_value of each key and discarding all prev_values
    /// that might be included in the stream. Since the history of prev_values always comes in
    /// pairs, computing the XOR of those is always going to cancel their effects out. Also,
    /// since XOR is commutative this property is true independent of the order. The same is
    /// true for the summations of the length and checksum since the sum will contain the
    /// unrelated values zero times.
    ///
    /// Therefore the accumulators will end up precisely in one of two states:
    /// 1. diff == 0, checksum == 0, value == [0..] => the key is not present
    /// 2. diff == 1, checksum == checksum(cur_value), value == cur_value => the key is present
    ///
    /// ## Robustness
    ///
    /// In the absence of bugs, accumulating the diff and checksum is not required since we know
    /// that a well formed collection always satisfies XOR(bincode(values)) == bincode(cur_value).
    /// However bugs may happen and so storing 16 more bytes per key to have a very high
    /// guarantee that we're not decoding garbage is more than worth it.
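    ///
    /// ## Example
    ///
    /// A minimal sketch of the accumulation (the concrete `UpsertValue`s and the
    /// `GlobalId` passed to `ensure_decoded` are elided with `todo!()`):
    ///
    /// ```ignore
    /// let opts = upsert_bincode_opts();
    /// let mut buf = Vec::new();
    /// let mut state: StateValue<u64, ()> = StateValue::default();
    ///
    /// // A well-formed history for one key: (prev, +1), (prev, -1), (cur, +1).
    /// let prev: UpsertValue = todo!();
    /// let cur: UpsertValue = todo!();
    /// state.merge_update(prev.clone(), Diff::ONE, opts, &mut buf);
    /// state.merge_update(prev, Diff::from(-1), opts, &mut buf);
    /// let can_delete = state.merge_update(cur, Diff::ONE, opts, &mut buf);
    /// assert!(!can_delete);
    ///
    /// // The `prev` pair cancels out of `value_xor`, `len_sum`, and `checksum_sum`,
    /// // so decoding recovers `cur` as a finalized value.
    /// state.ensure_decoded(opts, todo!());
    /// assert!(!state.is_tombstone());
    /// ```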
582    #[allow(clippy::as_conversions)]
583    pub fn merge_update(
584        &mut self,
585        value: UpsertValue,
586        diff: mz_repr::Diff,
587        bincode_opts: BincodeOpts,
588        bincode_buffer: &mut Vec<u8>,
589    ) -> bool {
590        match self {
591            Self::Consolidating(Consolidating {
592                value_xor,
593                len_sum,
594                checksum_sum,
595                diff_sum,
596            }) => {
597                bincode_buffer.clear();
598                bincode_opts
599                    .serialize_into(&mut *bincode_buffer, &value)
600                    .unwrap();
601                let len = i64::try_from(bincode_buffer.len()).unwrap();
602
603                *diff_sum += diff.into_inner();
604                *len_sum += len.wrapping_mul(diff.into_inner());
605                // Truncation is fine (using `as`) as this is just a checksum
606                *checksum_sum +=
607                    (seahash::hash(&*bincode_buffer) as i64).wrapping_mul(diff.into_inner());
608
609                // XOR of even diffs cancel out, so we only do it if diff is odd
610                if diff.abs() % Diff::from(2) == Diff::ONE {
611                    if value_xor.len() < bincode_buffer.len() {
612                        value_xor.resize(bincode_buffer.len(), 0);
613                    }
614                    // Note that if the new value is _smaller_ than the `value_xor`, and
615                    // the values at the end are zeroed out, we can shrink the buffer. This
616                    // is extremely sensitive code, so we don't (yet) do that.
617                    for (acc, val) in value_xor.iter_mut().zip(bincode_buffer.drain(..)) {
618                        *acc ^= val;
619                    }
620                }
621
622                // Returns whether or not the value can be deleted. This allows
623                // us to delete values in `UpsertState::consolidate_chunk` (even
624                // if they come back later), to minimize space usage.
625                diff_sum.0 == 0 && checksum_sum.0 == 0 && value_xor.iter().all(|&x| x == 0)
626            }
627            StateValue::Value(_value) => {
628                // We can turn a Value back into a Consolidating state:
629                // `std::mem::take` will leave behind a default value, which
630                // happens to be a default `Consolidating` `StateValue`.
631                let this = std::mem::take(self);
632
633                let finalized_value = this.into_finalized_value();
634                if let Some((finalized_value, _order)) = finalized_value {
635                    // If we had a value before, merge it into the
636                    // now-consolidating state first.
637                    let _ =
638                        self.merge_update(finalized_value, Diff::ONE, bincode_opts, bincode_buffer);
639
640                    // Then merge the new value in.
641                    self.merge_update(value, diff, bincode_opts, bincode_buffer)
642                } else {
643                    // We didn't have a value before, might have been a
644                    // tombstone. So just merge in the new value.
645                    self.merge_update(value, diff, bincode_opts, bincode_buffer)
646                }
647            }
648        }
649    }
650
651    /// Merge an existing StateValue into this one, using the same method described in `merge_update`.
652    /// See the docstring above for more information on correctness and robustness.
653    pub fn merge_update_state(&mut self, other: &Self) {
654        match (self, other) {
655            (
656                Self::Consolidating(Consolidating {
657                    value_xor,
658                    len_sum,
659                    checksum_sum,
660                    diff_sum,
661                }),
662                Self::Consolidating(other_consolidating),
663            ) => {
664                *diff_sum += other_consolidating.diff_sum;
665                *len_sum += other_consolidating.len_sum;
666                *checksum_sum += other_consolidating.checksum_sum;
667                if other_consolidating.value_xor.len() > value_xor.len() {
668                    value_xor.resize(other_consolidating.value_xor.len(), 0);
669                }
670                for (acc, val) in value_xor
671                    .iter_mut()
672                    .zip(other_consolidating.value_xor.iter())
673                {
674                    *acc ^= val;
675                }
676            }
677            _ => panic!("`merge_update_state` called with non-consolidating state"),
678        }
679    }
680
    /// During and after consolidation, we assume that values in the `UpsertStateBackend` implementation
    /// can be `Self::Consolidating`, with a `diff_sum` of 1 (or 0, if they have been deleted).
    /// Afterwards, if we need to retract one of these values, we need to assert that it's in this
    /// correct state, then mutate it to its `Value` state, so the `upsert` operator can use it.
685    #[allow(clippy::as_conversions)]
686    pub fn ensure_decoded(&mut self, bincode_opts: BincodeOpts, source_id: GlobalId) {
687        match self {
688            StateValue::Consolidating(consolidating) => {
689                match consolidating.diff_sum.0 {
690                    1 => {
691                        let len = usize::try_from(consolidating.len_sum.0)
692                            .map_err(|_| {
693                                format!(
694                                    "len_sum can't be made into a usize, state: {}, {}",
695                                    consolidating, source_id,
696                                )
697                            })
698                            .expect("invalid upsert state");
699                        let value = &consolidating
700                            .value_xor
701                            .get(..len)
702                            .ok_or_else(|| {
703                                format!(
704                                    "value_xor is not the same length ({}) as len ({}), state: {}, {}",
705                                    consolidating.value_xor.len(),
706                                    len,
707                                    consolidating,
708                                    source_id,
709                                )
710                            })
711                            .expect("invalid upsert state");
712                        // Truncation is fine (using `as`) as this is just a checksum
713                        assert_eq!(
714                            consolidating.checksum_sum.0,
715                            // Hash the value, not the full buffer, which may have extra 0's
716                            seahash::hash(value) as i64,
717                            "invalid upsert state: checksum_sum does not match, state: {}, {}",
718                            consolidating,
719                            source_id,
720                        );
721                        *self = Self::Value(Value::FinalizedValue(
722                            bincode_opts.deserialize(value).unwrap(),
723                            Default::default(),
724                        ));
725                    }
726                    0 => {
727                        assert_eq!(
728                            consolidating.len_sum.0, 0,
729                            "invalid upsert state: len_sum is non-0, state: {}, {}",
730                            consolidating, source_id,
731                        );
732                        assert_eq!(
733                            consolidating.checksum_sum.0, 0,
734                            "invalid upsert state: checksum_sum is non-0, state: {}, {}",
735                            consolidating, source_id,
736                        );
737                        assert!(
738                            consolidating.value_xor.iter().all(|&x| x == 0),
739                            "invalid upsert state: value_xor not all 0s with 0 diff. \
740                            Non-zero positions: {:?}, state: {}, {}",
741                            consolidating
742                                .value_xor
743                                .iter()
744                                .positions(|&x| x != 0)
745                                .collect::<Vec<_>>(),
746                            consolidating,
747                            source_id,
748                        );
749                        *self = Self::Value(Value::Tombstone(Default::default()));
750                    }
751                    other => panic!(
752                        "invalid upsert state: non 0/1 diff_sum: {}, state: {}, {}",
753                        other, consolidating, source_id
754                    ),
755                }
756            }
757            _ => {}
758        }
759    }
760}
761
762impl<T, O> Default for StateValue<T, O> {
763    fn default() -> Self {
764        Self::Consolidating(Consolidating::default())
765    }
766}
767
768/// Statistics for a single call to `consolidate_chunk`.
769#[derive(Clone, Default, Debug)]
770pub struct SnapshotStats {
771    /// The number of updates processed.
772    pub updates: u64,
773    /// The aggregated number of values inserted or deleted into `state`.
774    pub values_diff: Diff,
775    /// The total aggregated size of values inserted, deleted, or updated in `state`.
776    /// If the current call to `consolidate_chunk` deletes a lot of values,
777    /// or updates values to smaller ones, this can be negative!
778    pub size_diff: i64,
779    /// The number of inserts i.e. +1 diff
780    pub inserts: u64,
781    /// The number of deletes i.e. -1 diffs
782    pub deletes: u64,
783}
784
785impl std::ops::AddAssign for SnapshotStats {
786    fn add_assign(&mut self, rhs: Self) {
787        self.updates += rhs.updates;
788        self.values_diff += rhs.values_diff;
789        self.size_diff += rhs.size_diff;
790        self.inserts += rhs.inserts;
791        self.deletes += rhs.deletes;
792    }
793}
794
795/// Statistics for a single call to `multi_merge`.
796#[derive(Clone, Default, Debug)]
797pub struct MergeStats {
798    /// The number of updates written as merge operands to the backend, for the backend
799    /// to process async in the `consolidating_merge_function`.
800    /// Should be equal to number of inserts + deletes
801    pub written_merge_operands: u64,
802    /// The total size of values provided to `multi_merge`. The backend will write these
803    /// down and then later merge them in the `consolidating_merge_function`.
804    pub size_written: u64,
805    /// The estimated diff of the total size of the working set after the merge operands
806    /// are merged by the backend. This is an estimate since it can't account for the
807    /// size overhead of `StateValue` for values that consolidate to 0 (tombstoned-values).
808    pub size_diff: i64,
809}
810
811/// Statistics for a single call to `multi_put`.
812#[derive(Clone, Default, Debug)]
813pub struct PutStats {
814    /// The number of puts/deletes processed
815    /// Should be equal to number of inserts + updates + deletes
816    pub processed_puts: u64,
817    /// The aggregated number of non-tombstone values inserted or deleted into `state`.
818    pub values_diff: i64,
819    /// The aggregated number of tombstones inserted or deleted into `state`
820    pub tombstones_diff: i64,
821    /// The total aggregated size of values inserted, deleted, or updated in `state`.
822    /// If the current call to `multi_put` deletes a lot of values,
823    /// or updates values to smaller ones, this can be negative!
824    pub size_diff: i64,
825    /// The number of inserts
826    pub inserts: u64,
827    /// The number of updates
828    pub updates: u64,
829    /// The number of deletes
830    pub deletes: u64,
831}
832
impl PutStats {
    /// Adjust the `PutStats` based on the new value and the previous metadata.
    ///
    /// The size parameter is separate, as its value is backend-dependent. It's optional,
    /// as some backends increase the total size after an entire batch is processed.
    ///
    /// This method is provided for implementors of `UpsertStateBackend::multi_put`.
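    ///
    /// A minimal sketch of how a backend's `multi_put` might use it (the surrounding
    /// names, and how values are encoded, are illustrative):
    ///
    /// ```ignore
    /// let mut stats = PutStats::default();
    /// for (key, put) in puts {
    ///     // Only `Some` values are written; `None` means delete.
    ///     let new_size = match &put.value {
    ///         Some(value) => {
    ///             let encoded = bincode_opts.serialize(value)?;
    ///             // ... write `encoded` for `key` to the store ...
    ///             Some(i64::try_from(encoded.len()).expect("encoded size fits in i64"))
    ///         }
    ///         None => {
    ///             // ... delete `key` from the store ...
    ///             None
    ///         }
    ///     };
    ///     stats.adjust(put.value.as_ref(), new_size, &put.previous_value_metadata);
    /// }
    /// ```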
    pub fn adjust<T, O>(
        &mut self,
        new_value: Option<&StateValue<T, O>>,
        new_size: Option<i64>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        self.adjust_size(new_value, new_size, previous_metadata);
        self.adjust_values(new_value, previous_metadata);
        self.adjust_tombstone(new_value, previous_metadata);
    }
850
851    fn adjust_size<T, O>(
852        &mut self,
853        new_value: Option<&StateValue<T, O>>,
854        new_size: Option<i64>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        match (&new_value, previous_metadata.as_ref()) {
858            (Some(_), Some(ps)) => {
859                self.size_diff -= ps.size;
860                if let Some(new_size) = new_size {
861                    self.size_diff += new_size;
862                }
863            }
864            (None, Some(ps)) => {
865                self.size_diff -= ps.size;
866            }
867            (Some(_), None) => {
868                if let Some(new_size) = new_size {
869                    self.size_diff += new_size;
870                }
871            }
872            (None, None) => {}
873        }
874    }
875
876    fn adjust_values<T, O>(
877        &mut self,
878        new_value: Option<&StateValue<T, O>>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        let truly_new_value = new_value.map_or(false, |v| !v.is_tombstone());
        let truly_old_value = previous_metadata.map_or(false, |v| !v.is_tombstone);
883
884        match (truly_new_value, truly_old_value) {
885            (false, true) => {
886                self.values_diff -= 1;
887            }
888            (true, false) => {
889                self.values_diff += 1;
890            }
891            _ => {}
892        }
893    }
894
895    fn adjust_tombstone<T, O>(
896        &mut self,
897        new_value: Option<&StateValue<T, O>>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        let new_tombstone = new_value.map_or(false, |v| v.is_tombstone());
        let old_tombstone = previous_metadata.map_or(false, |v| v.is_tombstone);
902
903        match (new_tombstone, old_tombstone) {
904            (false, true) => {
905                self.tombstones_diff -= 1;
906            }
907            (true, false) => {
908                self.tombstones_diff += 1;
909            }
910            _ => {}
911        }
912    }
913}
914
915/// Statistics for a single call to `multi_get`.
916#[derive(Clone, Default, Debug)]
917pub struct GetStats {
918    /// The number of gets processed
919    pub processed_gets: u64,
920    /// The total size in bytes returned
921    pub processed_gets_size: u64,
922    /// The number of non-empty records returned
923    pub returned_gets: u64,
924}
925
926/// A trait that defines the fundamental primitives required by a state-backing of
927/// `UpsertState`.
928///
929/// Implementors of this trait are blind maps that associate keys and values. They need
930/// not understand the semantics of `StateValue`, tombstones, or anything else related
931/// to a correct `upsert` implementation. The singular exception to this is that they
932/// **must** produce accurate `PutStats` and `GetStats`. The reasoning for this is two-fold:
933/// - efficiency: this avoids additional buffer allocation.
934/// - value sizes: only the backend implementation understands the size of values as recorded
935///
936/// This **must** is not a correctness requirement (we won't panic when emitting statistics), but
937/// rather a requirement to ensure the upsert operator is introspectable.
938#[async_trait::async_trait(?Send)]
939pub trait UpsertStateBackend<T, O>
940where
941    T: 'static,
942    O: 'static,
943{
944    /// Whether this backend supports the `multi_merge` operation.
945    fn supports_merge(&self) -> bool;
946
947    /// Insert or delete for all `puts` keys, prioritizing the last value for
948    /// repeated keys.
949    ///
    /// The `PutValue` is _guaranteed_ to have an accurate and up-to-date
    /// record of the metadata for the existing value for the given key (if one existed),
    /// as reported by a previous call to `multi_get`.
953    ///
954    /// `PutStats` **must** be populated correctly, according to these semantics:
955    /// - `values_diff` must record the difference in number of new non-tombstone values being
956    /// inserted into the backend.
957    /// - `tombstones_diff` must record the difference in number of tombstone values being
958    /// inserted into the backend.
959    /// - `size_diff` must record the change in size for the values being inserted/deleted/updated
960    /// in the backend, regardless of whether the values are tombstones or not.
961    async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
962    where
963        P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>;
964
965    /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
966    ///
967    /// Panics if `gets` and `results_out` are not the same length.
968    async fn multi_get<'r, G, R>(
969        &mut self,
970        gets: G,
971        results_out: R,
972    ) -> Result<GetStats, anyhow::Error>
973    where
974        G: IntoIterator<Item = UpsertKey>,
975        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>;
976
977    /// For each key in `merges` writes a 'merge operand' to the backend. The backend stores these
978    /// merge operands and periodically calls the `consolidating_merge_function` to merge them into
979    /// any existing value for each key. The backend will merge the merge operands in the order
980    /// they are provided, and the merge function will always be run for a given key when a `get`
981    /// operation is performed on that key, or when the backend decides to run the merge based
982    /// on its own internal logic.
983    /// This allows avoiding the read-modify-write method of updating many values to
984    /// improve performance.
985    ///
986    /// The `MergeValue` should include a `diff` field that represents the update diff for the
987    /// value. This is used to estimate the overall size diff of the working set
988    /// after the merge operands are merged by the backend `sum[merges: m](m.diff * m.size)`.
989    ///
990    /// `MergeStats` **must** be populated correctly, according to these semantics:
991    /// - `written_merge_operands` must record the number of merge operands written to the backend.
992    /// - `size_written` must record the total size of values written to the backend.
993    ///     Note that the size of the post-merge values are not known, so this is the size of the
994    ///     values written to the backend as merge operands.
995    /// - `size_diff` must record the estimated diff of the total size of the working set after the
996    ///    merge operands are merged by the backend.
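    ///
    /// A minimal sketch of the `MergeStats` bookkeeping (the surrounding names, and how
    /// operands are encoded, are illustrative):
    ///
    /// ```ignore
    /// let mut stats = MergeStats::default();
    /// for (key, merge) in merges {
    ///     let encoded = bincode_opts.serialize(&merge.value)?;
    ///     let size = i64::try_from(encoded.len()).expect("encoded size fits in i64");
    ///     // ... enqueue `encoded` as a merge operand for `key` ...
    ///     stats.written_merge_operands += 1;
    ///     stats.size_written += u64::try_from(size).expect("non-negative");
    ///     // Estimated effect on the working set: the sum over operands of `diff * size`.
    ///     stats.size_diff += merge.diff.into_inner() * size;
    /// }
    /// ```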
997    async fn multi_merge<P>(&mut self, merges: P) -> Result<MergeStats, anyhow::Error>
998    where
999        P: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>;
1000}
1001
1002/// A function that merges a set of updates for a key into the existing value
1003/// for the key. This is called by the backend implementation when it has
1004/// accumulated a set of updates for a key, and needs to merge them into the
1005/// existing value for the key.
1006///
1007/// The function is called with the following arguments:
1008/// - The key for which the merge is being performed.
1009/// - An iterator over any current value and merge operands queued for the key.
1010///
1011/// The function should return the new value for the key after merging all the updates.
1012pub(crate) fn consolidating_merge_function<T, O>(
1013    _key: UpsertKey,
1014    updates: impl Iterator<Item = StateValue<T, O>>,
1015) -> StateValue<T, O>
1016where
1017    O: Default,
1018    T: std::cmp::Eq,
1019{
1020    let mut current: StateValue<T, O> = Default::default();
1021
1022    let mut bincode_buf = Vec::new();
1023    for update in updates {
1024        match update {
1025            StateValue::Consolidating(_) => {
1026                current.merge_update_state(&update);
1027            }
1028            StateValue::Value(_) => {
1029                // This branch is more expensive, but we hopefully rarely hit
1030                // it.
1031                if let Some((finalized_value, _order)) = update.into_finalized_value() {
1032                    let mut update = StateValue::default();
1033                    update.merge_update(
1034                        finalized_value,
1035                        Diff::ONE,
1036                        upsert_bincode_opts(),
1037                        &mut bincode_buf,
1038                    );
1039                    current.merge_update_state(&update);
1040                }
1041            }
1042        }
1043    }
1044
1045    current
1046}
1047
1048/// An `UpsertStateBackend` wrapper that supports consolidating merging, and
1049/// reports basic metrics about the usage of the `UpsertStateBackend`.
1050pub struct UpsertState<'metrics, S, T, O> {
1051    inner: S,
1052
1053    // The status, start time, and stats about calls to `consolidate_chunk`.
1054    pub snapshot_start: Instant,
1055    snapshot_stats: SnapshotStats,
1056    snapshot_completed: bool,
1057
1058    // Metrics shared across all workers running the `upsert` operator.
1059    metrics: Arc<UpsertSharedMetrics>,
1060    // Metrics for a specific worker.
1061    worker_metrics: &'metrics UpsertMetrics,
1062    // User-facing statistics.
1063    stats: SourceStatistics,
1064
1065    // Bincode options and buffer used in `consolidate_chunk`.
1066    bincode_opts: BincodeOpts,
1067    bincode_buffer: Vec<u8>,
1068
1069    // We need to iterate over `updates` in `consolidate_chunk` twice, so we
1070    // have a scratch vector for this.
1071    consolidate_scratch: Vec<(UpsertKey, UpsertValue, mz_repr::Diff)>,
1072    // "mini-upsert" map used in `consolidate_chunk`
1073    consolidate_upsert_scratch: indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, O>>,
1074    // a scratch vector for calling `multi_get`
1075    multi_get_scratch: Vec<UpsertKey>,
1076    shrink_upsert_unused_buffers_by_ratio: usize,
1077}
1078
1079impl<'metrics, S, T, O> UpsertState<'metrics, S, T, O> {
1080    pub(crate) fn new(
1081        inner: S,
1082        metrics: Arc<UpsertSharedMetrics>,
1083        worker_metrics: &'metrics UpsertMetrics,
1084        stats: SourceStatistics,
1085        shrink_upsert_unused_buffers_by_ratio: usize,
1086    ) -> Self {
1087        Self {
1088            inner,
1089            snapshot_start: Instant::now(),
1090            snapshot_stats: SnapshotStats::default(),
1091            snapshot_completed: false,
1092            metrics,
1093            worker_metrics,
1094            stats,
1095            bincode_opts: upsert_bincode_opts(),
1096            bincode_buffer: Vec::new(),
1097            consolidate_scratch: Vec::new(),
1098            consolidate_upsert_scratch: indexmap::IndexMap::new(),
1099            multi_get_scratch: Vec::new(),
1100            shrink_upsert_unused_buffers_by_ratio,
1101        }
1102    }
1103}
1104
1105impl<S, T, O> UpsertState<'_, S, T, O>
1106where
1107    S: UpsertStateBackend<T, O>,
1108    T: Eq + Clone + Send + Sync + Serialize + 'static,
1109    O: Default + Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
1110{
    /// Consolidate the following differential updates into the state. Updates
    /// provided to this method can be assumed to consolidate into a single
    /// value per-key, after all chunks of updates for a given timestamp have
    /// been processed.
    ///
    /// Therefore, after all updates of a given timestamp have been
    /// `consolidated`, all values must be in the correct state (as determined
    /// by `StateValue::ensure_decoded`).
1119    ///
1120    /// The `completed` boolean communicates whether or not this is the final
1121    /// chunk of updates for the initial "snapshot" from persist.
1122    ///
1123    /// If the backend supports it, this method will use `multi_merge` to
1124    /// consolidate the updates to avoid having to read the existing value for
1125    /// each key first. On some backends (like RocksDB), this can be
1126    /// significantly faster than the read-then-write consolidation strategy.
1127    ///
1128    /// Also note that we use `self.inner.multi_*`, not `self.multi_*`. This is
1129    /// to avoid erroneously changing metric and stats values.
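    ///
    /// A minimal sketch of a call site (the buffering around it is illustrative):
    ///
    /// ```ignore
    /// // `buffered: Vec<(UpsertKey, UpsertValue, Diff)>` holds one chunk of the snapshot;
    /// // `last_chunk` is true for the final chunk of the initial snapshot.
    /// upsert_state
    ///     .consolidate_chunk(buffered.drain(..), last_chunk)
    ///     .await?;
    /// ```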
1130    pub async fn consolidate_chunk<U>(
1131        &mut self,
1132        updates: U,
1133        completed: bool,
1134    ) -> Result<(), anyhow::Error>
1135    where
1136        U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1137    {
1138        fail::fail_point!("fail_consolidate_chunk", |_| {
1139            Err(anyhow::anyhow!("Error consolidating values"))
1140        });
1141
1142        if completed && self.snapshot_completed {
1143            panic!("attempted completion of already completed upsert snapshot")
1144        }
1145
1146        let now = Instant::now();
1147        let batch_size = updates.len();
1148
1149        self.consolidate_scratch.clear();
1150        self.consolidate_upsert_scratch.clear();
1151        self.multi_get_scratch.clear();
1152
        // Shrink the scratch vectors if their capacity is significantly larger than the batch size.
1154        if self.shrink_upsert_unused_buffers_by_ratio > 0 {
1155            let reduced_capacity =
1156                self.consolidate_scratch.capacity() / self.shrink_upsert_unused_buffers_by_ratio;
1157            if reduced_capacity > batch_size {
1158                // These vectors have already been cleared above and should be empty here
1159                self.consolidate_scratch.shrink_to(reduced_capacity);
1160                self.consolidate_upsert_scratch.shrink_to(reduced_capacity);
1161                self.multi_get_scratch.shrink_to(reduced_capacity);
1162            }
1163        }
1164
1165        // Depending on if the backend supports multi_merge, call the appropriate method.
1166        // This can change during the lifetime of the `UpsertState` instance (e.g.
1167        // the Autospill backend will switch from in-memory to rocksdb after a certain
1168        // number of updates have been processed and begin supporting multi_merge).
1169        let stats = if self.inner.supports_merge() {
1170            self.consolidate_merge_inner(updates).await?
1171        } else {
1172            self.consolidate_read_write_inner(updates).await?
1173        };
1174
        // NOTE: These metrics use the term `merge` to refer to the consolidation of values.
        // This is because they were introduced before the `multi_merge` operation was added.
1177        self.metrics
1178            .merge_snapshot_latency
1179            .observe(now.elapsed().as_secs_f64());
1180        self.worker_metrics
1181            .merge_snapshot_updates
1182            .inc_by(stats.updates);
1183        self.worker_metrics
1184            .merge_snapshot_inserts
1185            .inc_by(stats.inserts);
1186        self.worker_metrics
1187            .merge_snapshot_deletes
1188            .inc_by(stats.deletes);
1189
1190        self.stats.update_bytes_indexed_by(stats.size_diff);
1191        self.stats
1192            .update_records_indexed_by(stats.values_diff.into_inner());
1193
1194        self.snapshot_stats += stats;
1195
1196        if !self.snapshot_completed {
1197            // Updating the metrics
1198            self.worker_metrics.rehydration_total.set(
1199                self.snapshot_stats
1200                    .values_diff
1201                    .into_inner()
1202                    .try_into()
1203                    .unwrap_or_else(|e: std::num::TryFromIntError| {
1204                        tracing::warn!(
1205                            "rehydration_total metric overflowed or is negative \
                        and is inaccurate: {}. Defaulting to 0",
1207                            e.display_with_causes(),
1208                        );
1209
1210                        0
1211                    }),
1212            );
1213            self.worker_metrics
1214                .rehydration_updates
1215                .set(self.snapshot_stats.updates);
1216        }
1217
1218        if completed {
1219            if self.shrink_upsert_unused_buffers_by_ratio > 0 {
                // After rehydration is done, these scratch buffers should now be empty,
                // so we shrink them entirely.
1222                self.consolidate_scratch.shrink_to_fit();
1223                self.consolidate_upsert_scratch.shrink_to_fit();
1224                self.multi_get_scratch.shrink_to_fit();
1225            }
1226
1227            self.worker_metrics
1228                .rehydration_latency
1229                .set(self.snapshot_start.elapsed().as_secs_f64());
1230
1231            self.snapshot_completed = true;
1232        }
1233        Ok(())
1234    }
1235
1236    /// Consolidates the updates into the state. This method requires that the
1237    /// backend support the `multi_merge` operation; callers must check
1238    /// `self.inner.supports_merge()` first, as calling this method on a backend
1239    /// without merge support will panic. `multi_merge` writes the updates as
1240    /// 'merge operands' to the backend, which then consolidates those updates with
1241    /// any existing state using the `consolidating_merge_function`.
1242    ///
1243    /// This method can have significant performance benefits over the
1244    /// read-then-write method of `consolidate_read_write_inner`.
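    ///
    /// A minimal sketch of the intended call pattern (not compiled; `state` and
    /// `updates` are hypothetical placeholders for an `UpsertState` whose backend
    /// supports merging and a batch of `(key, value, diff)` triples):
    ///
    /// ```ignore
    /// assert!(state.inner.supports_merge());
    /// // Each update becomes a merge operand; the backend consolidates the
    /// // operands with any existing state via the `consolidating_merge_function`.
    /// let stats = state.consolidate_merge_inner(updates.into_iter()).await?;
    /// ```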
1245    async fn consolidate_merge_inner<U>(
1246        &mut self,
1247        updates: U,
1248    ) -> Result<SnapshotStats, anyhow::Error>
1249    where
1250        U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1251    {
1252        let mut updates = updates.into_iter().peekable();
1253
1254        let mut stats = SnapshotStats::default();
1255
1256        if updates.peek().is_some() {
1257            let m_stats = self
1258                .inner
1259                .multi_merge(updates.map(|(k, v, diff)| {
1260                    // Transform into a `StateValue<O>` that can be used by the
1261                    // `consolidating_merge_function` to merge with any existing
1262                    // value for the key.
1263                    let mut val: StateValue<T, O> = Default::default();
1264                    val.merge_update(v, diff, self.bincode_opts, &mut self.bincode_buffer);
1265
1266                    stats.updates += 1;
1267                    if diff.is_positive() {
1268                        stats.inserts += 1;
1269                    } else if diff.is_negative() {
1270                        stats.deletes += 1;
1271                    }
1272
1273                    // To keep track of the overall `values_diff` we use the sum of the
1274                    // diffs, which should equal the number of non-tombstoned values in the
1275                    // backend. This is slightly misleading, as it represents the eventual
1276                    // state after the `consolidating_merge_function` has merged all the
1277                    // updates, not the state immediately after this `multi_merge` call.
1278                    //
1279                    // It also does not accurately account for values that consolidate to
1280                    // diff == 0, as tracking that per-key is extremely difficult.
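                    //
                    // Purely as an illustration (with hypothetical keys `k1` and `k2`):
                    // the updates `[(k1, v, +1), (k1, v, -1), (k2, v, +1)]` contribute a
                    // `values_diff` of +1, matching the single non-tombstoned value (`k2`)
                    // that remains once everything has been merged.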
1281                    stats.values_diff += diff;
1282
1283                    (k, MergeValue { value: val, diff })
1284                }))
1285                .await?;
1286
1287            stats.size_diff = m_stats.size_diff;
1288        }
1289
1290        Ok(stats)
1291    }
1292
1293    /// Consolidates the updates into the state. This method reads the existing
1294    /// values for each key, consolidates the updates, and writes the new values
1295    /// back to the state.
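    ///
    /// A rough sketch of the flow (not compiled; `state` and `updates` are
    /// hypothetical placeholders, as in the sketch for `consolidate_merge_inner`):
    ///
    /// ```ignore
    /// // 1. `multi_get` the current value for every key in the batch.
    /// // 2. `merge_update` each (value, diff) pair into that current value.
    /// // 3. `multi_put` the consolidated values back into the backend.
    /// let stats = state.consolidate_read_write_inner(updates.into_iter()).await?;
    /// ```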
1296    async fn consolidate_read_write_inner<U>(
1297        &mut self,
1298        updates: U,
1299    ) -> Result<SnapshotStats, anyhow::Error>
1300    where
1301        U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1302    {
1303        let mut updates = updates.into_iter().peekable();
1304
1305        let mut stats = SnapshotStats::default();
1306
1307        if updates.peek().is_some() {
1308            self.consolidate_scratch.extend(updates);
1309            self.consolidate_upsert_scratch.extend(
1310                self.consolidate_scratch
1311                    .iter()
1312                    .map(|(k, _, _)| (*k, UpsertValueAndSize::default())),
1313            );
1314            self.multi_get_scratch
1315                .extend(self.consolidate_upsert_scratch.iter().map(|(k, _)| *k));
1316            self.inner
1317                .multi_get(
1318                    self.multi_get_scratch.drain(..),
1319                    self.consolidate_upsert_scratch.iter_mut().map(|(_, v)| v),
1320                )
1321                .await?;
1322
1323            for (key, value, diff) in self.consolidate_scratch.drain(..) {
1324                stats.updates += 1;
1325                if diff.is_positive() {
1326                    stats.inserts += 1;
1327                } else if diff.is_negative() {
1328                    stats.deletes += 1;
1329                }
1330
1331                // We rely on the diffs in our input instead of the result of
1332                // multi_put below. This makes sure we report the same stats as
1333                // `consolidate_merge_inner`, regardless of what values
1334                // there were in state before.
1335                stats.values_diff += diff;
1336
1337                let entry = self.consolidate_upsert_scratch.get_mut(&key).unwrap();
1338                let val = entry.value.get_or_insert_with(Default::default);
1339
1340                if val.merge_update(value, diff, self.bincode_opts, &mut self.bincode_buffer) {
1341                    entry.value = None;
1342                }
1343            }
1344
1345            // Note that we do one `multi_get` and one `multi_put` while processing a
1346            // _batch of updates_. Within the batch, we effectively consolidate each key
1347            // before persisting that consolidated value. Easy!!
1348            let p_stats = self
1349                .inner
1350                .multi_put(self.consolidate_upsert_scratch.drain(..).map(|(k, v)| {
1351                    (
1352                        k,
1353                        PutValue {
1354                            value: v.value,
1355                            previous_value_metadata: v.metadata.map(|v| ValueMetadata {
1356                                size: v.size.try_into().expect("less than i64 size"),
1357                                is_tombstone: v.is_tombstone,
1358                            }),
1359                        },
1360                    )
1361                }))
1362                .await?;
1363
1364            stats.size_diff = p_stats.size_diff;
1365        }
1366
1367        Ok(stats)
1368    }
1369
1370    /// Insert or delete values for all keys in `puts`, prioritizing the last value
1371    /// for repeated keys.
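    ///
    /// A hedged usage sketch (not compiled; `state`, `key`, and `value` are
    /// placeholders, and the previous-value metadata would normally be derived
    /// from an earlier `multi_get`):
    ///
    /// ```ignore
    /// state
    ///     .multi_put(
    ///         true, // update per-record statistics
    ///         [(
    ///             key,
    ///             PutValue {
    ///                 value: Some(value),
    ///                 previous_value_metadata: None, // no previous value known
    ///             },
    ///         )],
    ///     )
    ///     .await?;
    /// ```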
1372    pub async fn multi_put<P>(
1373        &mut self,
1374        update_per_record_stats: bool,
1375        puts: P,
1376    ) -> Result<(), anyhow::Error>
1377    where
1378        P: IntoIterator<Item = (UpsertKey, PutValue<Value<T, O>>)>,
1379    {
1380        fail::fail_point!("fail_state_multi_put", |_| {
1381            Err(anyhow::anyhow!("Error putting values into state"))
1382        });
1383        let now = Instant::now();
1384        let stats = self
1385            .inner
1386            .multi_put(puts.into_iter().map(|(k, pv)| {
1387                (
1388                    k,
1389                    PutValue {
1390                        value: pv.value.map(StateValue::Value),
1391                        previous_value_metadata: pv.previous_value_metadata,
1392                    },
1393                )
1394            }))
1395            .await?;
1396
1397        self.metrics
1398            .multi_put_latency
1399            .observe(now.elapsed().as_secs_f64());
1400        self.worker_metrics
1401            .multi_put_size
1402            .inc_by(stats.processed_puts);
1403
1404        if update_per_record_stats {
1405            self.worker_metrics.upsert_inserts.inc_by(stats.inserts);
1406            self.worker_metrics.upsert_updates.inc_by(stats.updates);
1407            self.worker_metrics.upsert_deletes.inc_by(stats.deletes);
1408
1409            self.stats.update_bytes_indexed_by(stats.size_diff);
1410            self.stats.update_records_indexed_by(stats.values_diff);
1411            self.stats
1412                .update_envelope_state_tombstones_by(stats.tombstones_diff);
1413        }
1414
1415        Ok(())
1416    }
1417
1418    /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
1419    ///
1420    /// Panics if `gets` and `results_out` are not the same length.
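    ///
    /// A minimal sketch (not compiled; assumes some `state` and a deduplicated
    /// `keys: Vec<UpsertKey>` are in scope):
    ///
    /// ```ignore
    /// let mut results: Vec<_> = keys.iter().map(|_| UpsertValueAndSize::default()).collect();
    /// state.multi_get(keys.iter().copied(), results.iter_mut()).await?;
    /// ```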
1421    pub async fn multi_get<'r, G, R>(
1422        &mut self,
1423        gets: G,
1424        results_out: R,
1425    ) -> Result<(), anyhow::Error>
1426    where
1427        G: IntoIterator<Item = UpsertKey>,
1428        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
1429        O: 'r,
1430    {
1431        fail::fail_point!("fail_state_multi_get", |_| {
1432            Err(anyhow::anyhow!("Error getting values from state"))
1433        });
1434        let now = Instant::now();
1435        let stats = self.inner.multi_get(gets, results_out).await?;
1436
1437        self.metrics
1438            .multi_get_latency
1439            .observe(now.elapsed().as_secs_f64());
1440        self.worker_metrics
1441            .multi_get_size
1442            .inc_by(stats.processed_gets);
1443        self.worker_metrics
1444            .multi_get_result_count
1445            .inc_by(stats.returned_gets);
1446        self.worker_metrics
1447            .multi_get_result_bytes
1448            .inc_by(stats.processed_gets_size);
1449
1450        Ok(())
1451    }
1452}
1453
1454#[cfg(test)]
1455mod tests {
1456    use mz_repr::Row;
1457
1458    use super::*;
1459    #[mz_ore::test]
1460    fn test_merge_update() {
1461        let mut buf = Vec::new();
1462        let opts = upsert_bincode_opts();
1463
1464        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1465
1466        let small_row = Ok(Row::default());
1467        let longer_row = Ok(Row::pack([mz_repr::Datum::Null]));
1468        s.merge_update(small_row, Diff::ONE, opts, &mut buf);
1469        s.merge_update(longer_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1470        // This clears the retraction of the `longer_row`, but the
1471        // `value_xor` is the length of the `longer_row`. This tests
1472        // that we are tracking checksums correctly.
1473        s.merge_update(longer_row, Diff::ONE, opts, &mut buf);
1474
1475        // Assert that the `Consolidating` value is fully merged.
1476        s.ensure_decoded(opts, GlobalId::User(1));
1477    }
1478
1479    // We guard some of our assumptions here: increasing the in-memory size of
1480    // `StateValue` has a direct impact on the memory usage of in-memory UPSERT sources.
1481    #[mz_ore::test]
1482    fn test_memory_size() {
1483        let finalized_value: StateValue<(), ()> =
1484            StateValue::finalized_value(Ok(Row::default()), ());
1485        assert!(
1486            finalized_value.memory_size() <= 88,
1487            "memory size is {}",
1488            finalized_value.memory_size(),
1489        );
1490
1491        let provisional_value_with_finalized_value: StateValue<(), ()> =
1492            finalized_value.into_provisional_value(Ok(Row::default()), (), ());
1493        assert!(
1494            provisional_value_with_finalized_value.memory_size() <= 112,
1495            "memory size is {}",
1496            provisional_value_with_finalized_value.memory_size(),
1497        );
1498
1499        let provisional_value_without_finalized_value: StateValue<(), ()> =
1500            StateValue::new_provisional_value(Ok(Row::default()), (), ());
1501        assert!(
1502            provisional_value_without_finalized_value.memory_size() <= 88,
1503            "memory size is {}",
1504            provisional_value_without_finalized_value.memory_size(),
1505        );
1506
1507        let mut consolidating_value: StateValue<(), ()> = StateValue::default();
1508        consolidating_value.merge_update(
1509            Ok(Row::default()),
1510            Diff::ONE,
1511            upsert_bincode_opts(),
1512            &mut Vec::new(),
1513        );
1514        assert!(
1515            consolidating_value.memory_size() <= 90,
1516            "memory size is {}",
1517            consolidating_value.memory_size(),
1518        );
1519    }
1520
1521    #[mz_ore::test]
1522    #[should_panic(
1523        expected = "invalid upsert state: len_sum is non-0, state: Consolidating { len_sum: 1"
1524    )]
1525    fn test_merge_update_len_0_assert() {
1526        let mut buf = Vec::new();
1527        let opts = upsert_bincode_opts();
1528
1529        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1530
1531        let small_row = Ok(mz_repr::Row::default());
1532        let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
1533        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1534        s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1535
1536        s.ensure_decoded(opts, GlobalId::User(1));
1537    }
1538
1539    #[mz_ore::test]
1540    #[should_panic(
1541        expected = "invalid upsert state: \"value_xor is not the same length (3) as len (4), state: Consolidating { len_sum: 4"
1542    )]
1543    fn test_merge_update_len_too_long_assert() {
1544        let mut buf = Vec::new();
1545        let opts = upsert_bincode_opts();
1546
1547        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1548
1549        let small_row = Ok(mz_repr::Row::default());
1550        let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
1551        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1552        s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1553        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1554
1555        s.ensure_decoded(opts, GlobalId::User(1));
1556    }
1557
1558    #[mz_ore::test]
1559    #[should_panic(expected = "invalid upsert state: checksum_sum does not match")]
1560    fn test_merge_update_checksum_doesnt_match() {
1561        let mut buf = Vec::new();
1562        let opts = upsert_bincode_opts();
1563
1564        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1565
1566        let small_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(2)]));
1567        let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(1)]));
1568        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1569        s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1570        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1571
1572        s.ensure_decoded(opts, GlobalId::User(1));
1573    }
1574}