
mz_storage/upsert/types.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! # State-management for UPSERT.
//!
//! This module provides structures for use within an UPSERT
//! operator implementation.
//!
//! UPSERT is effectively a process which transforms a `Stream<(Key, Option<Data>)>`
//! into a differential collection, by indexing the data based on the key.
//!
//! This module does not implement this transformation, instead exposing APIs designed
//! for use within an UPSERT operator. There is one exception to this: `consolidate_chunk`
//! implements an efficient upsert-like transformation to re-index a collection using the
//! _output collection_ of an upsert transformation. More on this below.
//!
//! ## `UpsertState`
//!
//! Its primary export is `UpsertState`, which wraps an `UpsertStateBackend` and provides 3 APIs:
//!
//! ### `multi_get`
//! `multi_get` returns the current value for a (unique) set of keys. To keep implementations
//! efficient, the set of keys is an iterator, and results are written back into another parallel
//! iterator. In addition to returning the current values, implementations must also return the
//! _size_ of those values _as they are stored within the implementation_. Implementations are
//! required to chunk large iterators if they need to operate over smaller batches.
//!
//! `multi_get` is implemented directly with `UpsertStateBackend::multi_get`.
//!
//! ### `multi_put`
//! Update or delete values for a set of keys. To keep implementations efficient, the set
//! of updates is an iterator. Implementations are also required to return the difference
//! in values and total size after processing the updates. To simplify this (and because
//! in the `upsert` use case we have this data readily available), the updates are input
//! with the size of the current value (if any) that was returned from a previous `multi_get`.
//! Implementations are required to chunk large iterators if they need to operate over smaller
//! batches.
//!
//! `multi_put` is implemented directly with `UpsertStateBackend::multi_put`.
//!
//! ### `consolidate_chunk`
//!
//! `consolidate_chunk` re-indexes an UPSERT collection based on its _output collection_ (as
//! opposed to its _input `Stream`_). Please see the docs on `consolidate_chunk` and `StateValue`
//! for more information.
//!
//! `consolidate_chunk` is implemented with both `UpsertStateBackend::multi_put` and
//! `UpsertStateBackend::multi_get`.
//!
//! ## Order Keys
//!
//! In practice, the input stream for UPSERT collections includes an _order key_. This is used to
//! sort data with the same key occurring at the same timestamp. This module provides support
//! for serializing and deserializing order keys with their associated data. Being able to ingest
//! data on non-frontier boundaries requires this support.
//!
//! A consequence of this is that tombstones with an order key can be stored within the state.
//! There is currently no support for cleaning these tombstones up, as they are considered rare
//! and small enough.
//!
//! Because `consolidate_chunk` handles data that consolidates correctly, it does not handle
//! order keys.
//!
//! ## A note on state size
//!
//! The `UpsertStateBackend` trait requires implementations to report _relatively accurate_
//! information about how the state size changes over time. Note that it does NOT ask the
//! implementations to give accurate information about actual resource consumption (like disk
//! space, including space amplification); it instead asks only about the size of the values
//! after they have been encoded. For implementations like `RocksDB`, these may be highly
//! accurate (it is literally the encoded size as written to the RocksDB API), and for others,
//! like the `InMemoryHashMap`, they may be rough estimates of actual memory usage. See
//! `StateValue::memory_size` for more information.
//!
//! Note also that after consolidation, additional space may be used if `StateValue` is
//! used.

use std::fmt;
use std::num::Wrapping;
use std::sync::Arc;
use std::time::Instant;

use bincode::Options;
use itertools::Itertools;
use mz_ore::error::ErrorExt;
use mz_repr::{Diff, GlobalId};
use serde::{Serialize, de::DeserializeOwned};

use crate::metrics::upsert::{UpsertMetrics, UpsertSharedMetrics};
use crate::statistics::SourceStatistics;
use crate::upsert::{UpsertKey, UpsertValue};
/// The default set of `bincode` options used for consolidating
/// upsert updates (and writing values to RocksDB).
pub type BincodeOpts = bincode::config::DefaultOptions;

/// Build the default `BincodeOpts`.
pub fn upsert_bincode_opts() -> BincodeOpts {
    // We don't allow trailing bytes, for now,
    // and use varint encoding for space saving.
    bincode::DefaultOptions::new()
}

/// The result type for `multi_get`.
/// The value and size are stored in individual `Option`s so callees
/// can reuse this struct as they overwrite its value, keeping
/// track of the previous metadata. Additionally, values
/// may be `None` for tombstones.
#[derive(Clone)]
pub struct UpsertValueAndSize<T, O> {
    /// The value, if there was one.
    pub value: Option<StateValue<T, O>>,
    /// The size of the original `value` as persisted.
    /// Useful for users keeping track of statistics.
    pub metadata: Option<ValueMetadata<u64>>,
}

impl<T, O> std::fmt::Debug for UpsertValueAndSize<T, O> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UpsertValueAndSize")
            .field("value", &self.value)
            .field("metadata", &self.metadata)
            .finish()
    }
}

impl<T, O> Default for UpsertValueAndSize<T, O> {
    fn default() -> Self {
        Self {
            value: None,
            metadata: None,
        }
    }
}

/// Metadata about an existing value in the upsert state backend, as returned
/// by `multi_get`.
#[derive(Copy, Clone, Debug)]
pub struct ValueMetadata<S> {
    /// The size of the value.
    pub size: S,
    /// If the value is a tombstone.
    pub is_tombstone: bool,
}

/// A value to put in with `multi_put`.
#[derive(Clone, Debug)]
pub struct PutValue<V> {
    /// The new value, or a `None` to indicate a delete.
    pub value: Option<V>,
    /// The metadata of the previous value for this key, if known.
    /// Passed in to efficiently calculate statistics.
    pub previous_value_metadata: Option<ValueMetadata<i64>>,
}

/// A value to put in with a `multi_merge`.
pub struct MergeValue<V> {
    /// The value of the merge operand to write to the backend.
    pub value: V,
    /// The 'diff' of this merge operand value, used to estimate the overall size diff
    /// of the working set after this merge operand is merged by the backend.
    pub diff: Diff,
}

/// `UpsertState` has 2 modes:
/// - Normal operation
/// - Consolidation.
///
/// This struct and its substructs are helpers to simplify the logic that
/// individual `UpsertState` implementations need to do to manage these 2 modes.
///
/// Normal operation is simple: we just store an ordinary `UpsertValue`, and allow the implementer
/// to store it any way they want. During consolidation, the logic is more complex.
/// See the docs on `StateValue::merge_update` for more information.
///
/// Note also that this type is designed to support _partial updates_. All values are
/// associated with an _order key_ `O` that can be used to determine if a value existing in the
/// `UpsertStateBackend` occurred before or after a value being considered for insertion.
///
/// `O` is typically required to be `: Default`, with the default value sorting below all others.
/// During consolidation, values consolidate correctly (as they are actual
/// differential updates with diffs), so order keys are not required.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub enum StateValue<T, O> {
    Consolidating(Consolidating),
    Value(Value<T, O>),
}

impl<T, O> std::fmt::Debug for StateValue<T, O> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StateValue::Consolidating(c) => std::fmt::Display::fmt(c, f),
            StateValue::Value(_) => write!(f, "Value"),
        }
    }
}

/// A totally consolidated value stored within the `UpsertStateBackend`.
///
/// This type contains support for _tombstones_, which contain an _order key_,
/// and for provisional values.
///
/// What is considered finalized and provisional depends on the implementation
/// of the UPSERT operator: it might consider everything that it writes to its
/// state finalized, and assume that what it emits will be written down in the
/// output exactly as presented. Or it might consider everything it writes down
/// provisional, and only consider updates that it _knows_ to be persisted as
/// finalized.
///
/// Provisional values should only be considered while still "working off"
/// updates with the same timestamp at which the provisional update was
/// recorded.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
pub struct Value<T, O> {
    /// The finalized value of a key is the value we know to be correct for the last complete
    /// timestamp that got processed. A finalized value of None means that the key has been deleted
    /// and acts as a tombstone.
    pub finalized: Option<UpsertValue>,
    /// When `Some(_)`, it contains the upsert value that has been processed for a yet-incomplete
    /// timestamp. When None, no provisional update has been emitted yet.
    pub provisional: Option<ProvisionalValue<T, O>>,
}

/// A provisional value emitted for a timestamp. This struct contains enough information to
/// decide whether a subsequent command at the same timestamp should override it.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
pub struct ProvisionalValue<T, O> {
    /// The timestamp at which this provisional value occurred.
    pub timestamp: T,
    /// The order of this upsert command *within* the timestamp. Commands that happen at the same
    /// timestamp with lower order get ignored. Commands with higher order override this one. In
    /// the case of equal order, the value itself is used as a tie breaker.
    pub order: O,
    /// The provisional value. A provisional value of None means that the key has been deleted and
    /// acts as a tombstone.
    pub value: Option<UpsertValue>,
}

/// A value as produced during consolidation.
#[derive(Clone, Default, serde::Serialize, serde::Deserialize, Debug)]
pub struct Consolidating {
    #[serde(with = "serde_bytes")]
    value_xor: Vec<u8>,
    len_sum: Wrapping<i64>,
    checksum_sum: Wrapping<i64>,
    diff_sum: Wrapping<i64>,
}

impl fmt::Display for Consolidating {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("Consolidating")
            .field("len_sum", &self.len_sum)
            .field("checksum_sum", &self.checksum_sum)
            .field("diff_sum", &self.diff_sum)
            .finish_non_exhaustive()
    }
}

impl<T, O> StateValue<T, O> {
    /// A finalized, that is (assumed) persistent, value.
    pub fn finalized_value(value: UpsertValue) -> Self {
        Self::Value(Value {
            finalized: Some(value),
            provisional: None,
        })
    }

    #[allow(unused)]
    /// A tombstoned value.
    pub fn tombstone() -> Self {
        Self::Value(Value {
            finalized: None,
            provisional: None,
        })
    }

    /// Whether the value is a tombstone.
    pub fn is_tombstone(&self) -> bool {
        match self {
            Self::Value(value) => value.finalized.is_none(),
            Self::Consolidating(_) => false,
        }
    }

    /// Pull out the `Value` value for a `StateValue`, after `ensure_decoded` has been called.
    pub fn into_decoded(self) -> Value<T, O> {
        match self {
            Self::Value(value) => value,
            Self::Consolidating(_) => {
                panic!("called `into_decoded` without calling `ensure_decoded`")
            }
        }
    }

    /// The size of a `StateValue`, in memory. This is:
    /// 1. only used in the `InMemoryHashMap` implementation.
    /// 2. an estimate (it only looks at value sizes, and not errors).
    ///
    /// Other implementations may use more accurate accounting.
    #[cfg(test)]
    pub fn memory_size(&self) -> usize {
        use mz_repr::Row;
        use std::mem::size_of;

        let heap_size = match self {
            Self::Consolidating(Consolidating { value_xor, .. }) => value_xor.len(),
            Self::Value(value) => {
                let finalized_heap_size = match value.finalized {
                    Some(Ok(ref row)) => {
                        // `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
                        // subtract it.
                        row.byte_len() - size_of::<Row>()
                    }
                    // Assume errors are rare enough to not move the needle.
                    _ => 0,
                };
                let provisional_heap_size = match value.provisional {
                    Some(ref provisional) => match provisional.value {
                        Some(Ok(ref row)) => {
                            // `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
                            // subtract it.
                            row.byte_len() - size_of::<Row>()
                        }
                        // Assume errors are rare enough to not move the needle.
                        _ => 0,
                    },
                    None => 0,
                };
                finalized_heap_size + provisional_heap_size
            }
        };
        heap_size + size_of::<Self>()
    }
}

impl<T: Eq, O> StateValue<T, O> {
    /// Creates a new provisional value, occurring at some order key, observed
    /// at the given timestamp.
    pub fn new_provisional_value(value: UpsertValue, timestamp: T, order: O) -> Self {
        Self::Value(Value {
            finalized: None,
            provisional: Some(ProvisionalValue {
                value: Some(value),
                timestamp,
                order,
            }),
        })
    }

    /// Creates a provisional value that retains the finalized value in this `StateValue`.
    pub fn into_provisional_value(self, value: UpsertValue, timestamp: T, order: O) -> Self {
        match self {
            StateValue::Value(finalized) => Self::Value(Value {
                finalized: finalized.finalized,
                provisional: Some(ProvisionalValue {
                    value: Some(value),
                    timestamp,
                    order,
                }),
            }),
            StateValue::Consolidating(_) => {
                panic!("called `into_provisional_value` without calling `ensure_decoded`")
            }
        }
    }

    /// Creates a new provisional tombstone occurring at some order key,
    /// observed at the given timestamp.
    pub fn new_provisional_tombstone(timestamp: T, order: O) -> Self {
        Self::Value(Value {
            finalized: None,
            provisional: Some(ProvisionalValue {
                value: None,
                timestamp,
                order,
            }),
        })
    }

    /// Creates a provisional tombstone that retains the finalized value in this `StateValue`.
    ///
    /// We record the current finalized value, so that we can present it when
    /// needed or when trying to read a provisional value at a different
    /// timestamp.
    pub fn into_provisional_tombstone(self, timestamp: T, order: O) -> Self {
        match self {
            StateValue::Value(finalized) => Self::Value(Value {
                finalized: finalized.finalized,
                provisional: Some(ProvisionalValue {
                    value: None,
                    timestamp,
                    order,
                }),
            }),
            StateValue::Consolidating(_) => {
                panic!("called `into_provisional_tombstone` without calling `ensure_decoded`")
            }
        }
    }

    /// Returns the order of a provisional value at the given timestamp, if any.
    pub fn provisional_order(&self, ts: &T) -> Option<&O> {
        match self {
            Self::Value(value) => match &value.provisional {
                Some(p) if &p.timestamp == ts => Some(&p.order),
                _ => None,
            },
            Self::Consolidating(_) => {
                panic!("called `provisional_order` without calling `ensure_decoded`")
            }
        }
    }

    /// Returns the provisional value, if one is present at the given timestamp.
    /// Falls back to the finalized value, or `None` if there is neither.
    pub fn provisional_value_ref(&self, ts: &T) -> Option<&UpsertValue> {
        match self {
            Self::Value(value) => match &value.provisional {
                Some(p) if &p.timestamp == ts => p.value.as_ref(),
                _ => value.finalized.as_ref(),
            },
            Self::Consolidating(_) => {
                panic!("called `provisional_value_ref` without calling `ensure_decoded`")
            }
        }
    }

    /// Returns the finalized value, if one is present.
    pub fn into_finalized_value(self) -> Option<UpsertValue> {
        match self {
            Self::Value(v) => v.finalized,
            Self::Consolidating(_) => {
                panic!("called `into_finalized_value` without calling `ensure_decoded`")
            }
        }
    }
}

impl<T: Eq, O> StateValue<T, O> {
    /// We use a XOR trick in order to accumulate the values without having to store the full
    /// unconsolidated history in memory. For all (value, diff) updates of a key we track:
    /// - diff_sum = SUM(diff)
    /// - checksum_sum = SUM(checksum(bincode(value)) * diff)
    /// - len_sum = SUM(len(bincode(value)) * diff)
    /// - value_xor = XOR(bincode(value))
    ///
    /// ## Return value
    /// Returns a `bool` indicating whether or not the current merged value can be deleted.
    ///
    /// ## Correctness
    ///
    /// The method is correct because a well-formed upsert collection at a given
    /// timestamp will have for each key:
    /// - Zero or one updates of the form (cur_value, +1)
    /// - Zero or more pairs of updates of the form (prev_value, +1), (prev_value, -1)
    ///
    /// We are interested in extracting the cur_value of each key and discarding all prev_values
    /// that might be included in the stream. Since the history of prev_values always comes in
    /// pairs, computing the XOR of those is always going to cancel their effects out. Also,
    /// since XOR is commutative, this property is true independent of the order. The same is
    /// true for the summations of the length and checksum, since the sum will contain the
    /// unrelated values zero times.
    ///
    /// Therefore the accumulators will end up in precisely one of two states:
    /// 1. diff == 0, checksum == 0, value == [0..] => the key is not present
    /// 2. diff == 1, checksum == checksum(cur_value), value == cur_value => the key is present
    ///
    /// ## Robustness
    ///
    /// In the absence of bugs, accumulating the diff and checksum is not required, since we know
    /// that a well-formed collection always satisfies XOR(bincode(values)) == bincode(cur_value).
    /// However bugs may happen, and so storing 16 more bytes per key to have a very high
    /// guarantee that we're not decoding garbage is more than worth it.
    #[allow(clippy::as_conversions)]
    pub fn merge_update(
        &mut self,
        value: UpsertValue,
        diff: mz_repr::Diff,
        bincode_opts: BincodeOpts,
        bincode_buffer: &mut Vec<u8>,
    ) -> bool {
        match self {
            Self::Consolidating(Consolidating {
                value_xor,
                len_sum,
                checksum_sum,
                diff_sum,
            }) => {
                bincode_buffer.clear();
                bincode_opts
                    .serialize_into(&mut *bincode_buffer, &value)
                    .unwrap();
                let len = i64::try_from(bincode_buffer.len()).unwrap();

                *diff_sum += diff.into_inner();
                *len_sum += len.wrapping_mul(diff.into_inner());
                // Truncation is fine (using `as`) as this is just a checksum
                *checksum_sum +=
                    (seahash::hash(&*bincode_buffer) as i64).wrapping_mul(diff.into_inner());

                // XORs of even diffs cancel out, so we only XOR if the diff is odd
                if diff.abs() % Diff::from(2) == Diff::ONE {
                    if value_xor.len() < bincode_buffer.len() {
                        value_xor.resize(bincode_buffer.len(), 0);
                    }
                    // Note that if the new value is _smaller_ than the `value_xor`, and
                    // the values at the end are zeroed out, we can shrink the buffer. This
                    // is extremely sensitive code, so we don't (yet) do that.
                    for (acc, val) in value_xor[0..bincode_buffer.len()]
                        .iter_mut()
                        .zip_eq(bincode_buffer.drain(..))
                    {
                        *acc ^= val;
                    }
                }

                // Returns whether or not the value can be deleted. This allows
                // us to delete values in `UpsertState::consolidate_chunk` (even
                // if they come back later), to minimize space usage.
                diff_sum.0 == 0 && checksum_sum.0 == 0 && value_xor.iter().all(|&x| x == 0)
            }
            StateValue::Value(_value) => {
                // We can turn a Value back into a Consolidating state:
                // `std::mem::take` will leave behind a default value, which
                // happens to be a default `Consolidating` `StateValue`.
                let this = std::mem::take(self);

                let finalized_value = this.into_finalized_value();
                if let Some(finalized_value) = finalized_value {
                    // If we had a value before, merge it into the
                    // now-consolidating state first.
                    let _ =
                        self.merge_update(finalized_value, Diff::ONE, bincode_opts, bincode_buffer);

                    // Then merge the new value in.
                    self.merge_update(value, diff, bincode_opts, bincode_buffer)
                } else {
                    // We didn't have a value before; it might have been a
                    // tombstone. So just merge in the new value.
                    self.merge_update(value, diff, bincode_opts, bincode_buffer)
                }
            }
        }
    }

    /// Merge an existing `StateValue` into this one, using the same method described in
    /// `merge_update`. See the docstring there for more information on correctness and
    /// robustness.
    pub fn merge_update_state(&mut self, other: &Self) {
        match (self, other) {
            (
                Self::Consolidating(Consolidating {
                    value_xor,
                    len_sum,
                    checksum_sum,
                    diff_sum,
                }),
                Self::Consolidating(other_consolidating),
            ) => {
                *diff_sum += other_consolidating.diff_sum;
                *len_sum += other_consolidating.len_sum;
                *checksum_sum += other_consolidating.checksum_sum;
                if other_consolidating.value_xor.len() > value_xor.len() {
                    value_xor.resize(other_consolidating.value_xor.len(), 0);
                }
                for (acc, val) in value_xor[0..other_consolidating.value_xor.len()]
                    .iter_mut()
                    .zip_eq(other_consolidating.value_xor.iter())
                {
                    *acc ^= val;
                }
            }
            _ => panic!("`merge_update_state` called with non-consolidating state"),
        }
    }

    /// During and after consolidation, we assume that values in the `UpsertStateBackend`
    /// implementation can be `Self::Consolidating`, with a `diff_sum` of 1 (or 0, if they have
    /// been deleted). Afterwards, if we need to retract one of these values, we need to assert
    /// that it's in this correct state, then mutate it to its `Value` state, so the `upsert`
    /// operator can use it.
    #[allow(clippy::as_conversions)]
    pub fn ensure_decoded(
        &mut self,
        bincode_opts: BincodeOpts,
        source_id: GlobalId,
        key: Option<&UpsertKey>,
    ) {
        match self {
            StateValue::Consolidating(consolidating) => {
                match consolidating.diff_sum.0 {
                    1 => {
                        let len = usize::try_from(consolidating.len_sum.0)
                            .map_err(|_| {
                                format!(
                                    "len_sum can't be made into a usize, state: {}, {}",
                                    consolidating, source_id,
                                )
                            })
                            .expect("invalid upsert state");
                        let value = &consolidating
                            .value_xor
                            .get(..len)
                            .ok_or_else(|| {
                                format!(
                                    "value_xor is not the same length ({}) as len ({}), state: {}, {}",
                                    consolidating.value_xor.len(),
                                    len,
                                    consolidating,
                                    source_id,
                                )
                            })
                            .expect("invalid upsert state");
                        // Truncation is fine (using `as`) as this is just a checksum
                        assert_eq!(
                            consolidating.checksum_sum.0,
                            // Hash the value, not the full buffer, which may have extra 0's
                            seahash::hash(value) as i64,
                            "invalid upsert state: checksum_sum does not match, state: {}, {}",
                            consolidating,
                            source_id,
                        );
                        *self = Self::finalized_value(bincode_opts.deserialize(value).unwrap());
                    }
                    0 => {
                        assert_eq!(
                            consolidating.len_sum.0, 0,
                            "invalid upsert state: len_sum is non-0, state: {}, {}",
                            consolidating, source_id,
                        );
                        assert_eq!(
                            consolidating.checksum_sum.0, 0,
                            "invalid upsert state: checksum_sum is non-0, state: {}, {}",
                            consolidating, source_id,
                        );
                        assert!(
                            consolidating.value_xor.iter().all(|&x| x == 0),
                            "invalid upsert state: value_xor not all 0s with 0 diff. \
                            Non-zero positions: {:?}, state: {}, {}",
                            consolidating
                                .value_xor
                                .iter()
                                .positions(|&x| x != 0)
                                .collect::<Vec<_>>(),
                            consolidating,
                            source_id,
                        );
                        *self = Self::tombstone();
                    }
                    other => {
                        // If diff_sum is odd, value_xor holds the bincode of a
                        // single value (even XORs cancel out). Try to decode it
                        // so we can log the shape (not contents) for debugging.
                        let value_byte_len = usize::try_from(consolidating.len_sum.0 / other).ok();
                        let decode_ok = value_byte_len
                            .and_then(|l| consolidating.value_xor.get(..l))
                            .and_then(|bytes| bincode_opts.deserialize::<UpsertValue>(bytes).ok())
                            .map(|v| match v {
                                Ok(row) => format!(
                                    "Ok(Row(byte_len={}, col_count={}))",
                                    row.byte_len(),
                                    row.iter().count(),
                                ),
                                Err(_) => "Err(UpsertValueError)".to_string(),
                            });
                        panic!(
                            "invalid upsert state: non 0/1 diff_sum: {}, state: {}, {}, \
                            key: {:?}, value_byte_len: {:?}, decodable: {:?}",
                            other, consolidating, source_id, key, value_byte_len, decode_ok,
                        )
                    }
                }
            }
            StateValue::Value(_) => {}
        }
    }
}
684
685impl<T, O> Default for StateValue<T, O> {
686    fn default() -> Self {
687        Self::Consolidating(Consolidating::default())
688    }
689}
690
691/// Statistics for a single call to `consolidate_chunk`.
692#[derive(Clone, Default, Debug)]
693pub struct SnapshotStats {
694    /// The number of updates processed.
695    pub updates: u64,
696    /// The aggregated number of values inserted or deleted into `state`.
697    pub values_diff: Diff,
698    /// The total aggregated size of values inserted, deleted, or updated in `state`.
699    /// If the current call to `consolidate_chunk` deletes a lot of values,
700    /// or updates values to smaller ones, this can be negative!
701    pub size_diff: i64,
702    /// The number of inserts, i.e., +1 diffs.
703    pub inserts: u64,
704    /// The number of deletes, i.e., -1 diffs.
705    pub deletes: u64,
706}
707
708impl std::ops::AddAssign for SnapshotStats {
709    fn add_assign(&mut self, rhs: Self) {
710        self.updates += rhs.updates;
711        self.values_diff += rhs.values_diff;
712        self.size_diff += rhs.size_diff;
713        self.inserts += rhs.inserts;
714        self.deletes += rhs.deletes;
715    }
716}
717
718/// Statistics for a single call to `multi_merge`.
719#[derive(Clone, Default, Debug)]
720pub struct MergeStats {
721    /// The number of updates written as merge operands to the backend, for the backend
722    /// to process async in the `consolidating_merge_function`.
723    /// Should be equal to the number of inserts + deletes
724    pub written_merge_operands: u64,
725    /// The total size of values provided to `multi_merge`. The backend will write these
726    /// down and then later merge them in the `consolidating_merge_function`.
727    pub size_written: u64,
728    /// The estimated diff of the total size of the working set after the merge operands
729    /// are merged by the backend. This is an estimate since it can't account for the
730    /// size overhead of `StateValue` for values that consolidate to 0 (tombstoned-values).
731    pub size_diff: i64,
732}
733
734/// Statistics for a single call to `multi_put`.
735#[derive(Clone, Default, Debug)]
736pub struct PutStats {
737    /// The number of puts/deletes processed
738    /// Should be equal to the number of inserts + updates + deletes
739    pub processed_puts: u64,
740    /// The aggregated number of non-tombstone values inserted or deleted into `state`.
741    pub values_diff: i64,
742    /// The aggregated number of tombstones inserted or deleted into `state`
743    pub tombstones_diff: i64,
744    /// The total aggregated size of values inserted, deleted, or updated in `state`.
745    /// If the current call to `multi_put` deletes a lot of values,
746    /// or updates values to smaller ones, this can be negative!
747    pub size_diff: i64,
748    /// The number of inserts
749    pub inserts: u64,
750    /// The number of updates
751    pub updates: u64,
752    /// The number of deletes
753    pub deletes: u64,
754}
755
756impl PutStats {
757    /// Adjust the `PutStats` based on the new value and the previous metadata.
758    ///
759    /// The size parameter is separate as its value is backend-dependent. It's optional
760    /// as some backends increase the total size after an entire batch is processed.
761    ///
762    /// This method is provided for implementors of `UpsertStateBackend::multi_put`.
763    pub fn adjust<T, O>(
764        &mut self,
765        new_value: Option<&StateValue<T, O>>,
766        new_size: Option<i64>,
767        previous_metadata: &Option<ValueMetadata<i64>>,
768    ) {
769        self.adjust_size(new_value, new_size, previous_metadata);
770        self.adjust_values(new_value, previous_metadata);
771        self.adjust_tombstone(new_value, previous_metadata);
772    }
773
774    fn adjust_size<T, O>(
775        &mut self,
776        new_value: Option<&StateValue<T, O>>,
777        new_size: Option<i64>,
778        previous_metadata: &Option<ValueMetadata<i64>>,
779    ) {
780        match (&new_value, previous_metadata.as_ref()) {
781            (Some(_), Some(ps)) => {
782                self.size_diff -= ps.size;
783                if let Some(new_size) = new_size {
784                    self.size_diff += new_size;
785                }
786            }
787            (None, Some(ps)) => {
788                self.size_diff -= ps.size;
789            }
790            (Some(_), None) => {
791                if let Some(new_size) = new_size {
792                    self.size_diff += new_size;
793                }
794            }
795            (None, None) => {}
796        }
797    }
798
799    fn adjust_values<T, O>(
800        &mut self,
801        new_value: Option<&StateValue<T, O>>,
802        previous_metadata: &Option<ValueMetadata<i64>>,
803    ) {
804        let truly_new_value = new_value.map_or(false, |v| !v.is_tombstone());
805        let truly_old_value = previous_metadata.map_or(false, |v| !v.is_tombstone);
806
807        match (truly_new_value, truly_old_value) {
808            (false, true) => {
809                self.values_diff -= 1;
810            }
811            (true, false) => {
812                self.values_diff += 1;
813            }
814            _ => {}
815        }
816    }
817
818    fn adjust_tombstone<T, O>(
819        &mut self,
820        new_value: Option<&StateValue<T, O>>,
821        previous_metadata: &Option<ValueMetadata<i64>>,
822    ) {
823        let new_tombstone = new_value.map_or(false, |v| v.is_tombstone());
824        let old_tombstone = previous_metadata.map_or(false, |v| v.is_tombstone);
825
826        match (new_tombstone, old_tombstone) {
827            (false, true) => {
828                self.tombstones_diff -= 1;
829            }
830            (true, false) => {
831                self.tombstones_diff += 1;
832            }
833            _ => {}
834        }
835    }
836}
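The bookkeeping above amounts to a small transition table over (new value, previous value) pairs. Below is a toy model of that logic; `MiniPutStats` and `MiniValue` are illustrative stand-ins, and unlike the real `adjust`, the size here is folded into the value rather than passed separately:

```rust
/// Illustrative stand-ins for the stats and value-metadata types.
#[derive(Default, Debug, PartialEq)]
struct MiniPutStats {
    values_diff: i64,
    tombstones_diff: i64,
    size_diff: i64,
}

#[derive(Clone, Copy)]
struct MiniValue {
    is_tombstone: bool,
    size: i64,
}

impl MiniPutStats {
    fn adjust(&mut self, new: Option<MiniValue>, previous: Option<MiniValue>) {
        // Size: subtract the previous size if a value existed, add the new.
        if let Some(p) = previous {
            self.size_diff -= p.size;
        }
        if let Some(n) = new {
            self.size_diff += n.size;
        }
        // Values: count transitions into/out of "real" (non-tombstone) values.
        let new_real = new.map_or(false, |v| !v.is_tombstone);
        let old_real = previous.map_or(false, |v| !v.is_tombstone);
        match (new_real, old_real) {
            (false, true) => self.values_diff -= 1,
            (true, false) => self.values_diff += 1,
            _ => {}
        }
        // Tombstones: the same transition table, over tombstone-ness.
        let new_tomb = new.map_or(false, |v| v.is_tombstone);
        let old_tomb = previous.map_or(false, |v| v.is_tombstone);
        match (new_tomb, old_tomb) {
            (false, true) => self.tombstones_diff -= 1,
            (true, false) => self.tombstones_diff += 1,
            _ => {}
        }
    }
}
```

For example, a single call that replaces a 10-byte value with a 2-byte tombstone leaves `values_diff` at -1, `tombstones_diff` at 1, and `size_diff` at the net -8.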
837
838/// Statistics for a single call to `multi_get`.
839#[derive(Clone, Default, Debug)]
840pub struct GetStats {
841    /// The number of gets processed
842    pub processed_gets: u64,
843    /// The total size in bytes returned
844    pub processed_gets_size: u64,
845    /// The number of non-empty records returned
846    pub returned_gets: u64,
847}
848
849/// A trait that defines the fundamental primitives required by a state-backing of
850/// `UpsertState`.
851///
852/// Implementors of this trait are blind maps that associate keys and values. They need
853/// not understand the semantics of `StateValue`, tombstones, or anything else related
854/// to a correct `upsert` implementation. The singular exception to this is that they
855/// **must** produce accurate `PutStats` and `GetStats`. The reasoning for this is two-fold:
856/// - efficiency: this avoids additional buffer allocation.
857    /// - value sizes: only the backend implementation understands the size of values as they are recorded within it.
858///
859/// This **must** is not a correctness requirement (we won't panic when emitting statistics), but
860/// rather a requirement to ensure the upsert operator is introspectable.
861#[async_trait::async_trait(?Send)]
862pub trait UpsertStateBackend<T, O>
863where
864    T: 'static,
865    O: 'static,
866{
867    /// Whether this backend supports the `multi_merge` operation.
868    fn supports_merge(&self) -> bool;
869
870    /// Insert or delete for all `puts` keys, prioritizing the last value for
871    /// repeated keys.
872    ///
873    /// The `PutValue` is _guaranteed_ to have an accurate and up-to-date
874    /// record of the metadata for the existing value for the given key (if one existed),
875    /// as reported by a previous call to `multi_get`.
876    ///
877    /// `PutStats` **must** be populated correctly, according to these semantics:
878    /// - `values_diff` must record the difference in number of new non-tombstone values being
879    /// inserted into the backend.
880    /// - `tombstones_diff` must record the difference in number of tombstone values being
881    /// inserted into the backend.
882    /// - `size_diff` must record the change in size for the values being inserted/deleted/updated
883    /// in the backend, regardless of whether the values are tombstones or not.
884    async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
885    where
886        P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>;
887
888    /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
889    ///
890    /// Panics if `gets` and `results_out` are not the same length.
891    async fn multi_get<'r, G, R>(
892        &mut self,
893        gets: G,
894        results_out: R,
895    ) -> Result<GetStats, anyhow::Error>
896    where
897        G: IntoIterator<Item = UpsertKey>,
898        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>;
899
900    /// For each key in `merges` writes a 'merge operand' to the backend. The backend stores these
901    /// merge operands and periodically calls the `consolidating_merge_function` to merge them into
902    /// any existing value for each key. The backend will merge the merge operands in the order
903    /// they are provided, and the merge function will always be run for a given key when a `get`
904    /// operation is performed on that key, or when the backend decides to run the merge based
905    /// on its own internal logic.
906    /// This allows avoiding the read-modify-write method of updating many values to
907    /// improve performance.
908    ///
909    /// The `MergeValue` should include a `diff` field that represents the update diff for the
910    /// value. This is used to estimate the overall size diff of the working set
911    /// after the merge operands are merged by the backend `sum[merges: m](m.diff * m.size)`.
912    ///
913    /// `MergeStats` **must** be populated correctly, according to these semantics:
914    /// - `written_merge_operands` must record the number of merge operands written to the backend.
915    /// - `size_written` must record the total size of values written to the backend.
916    ///     Note that the size of the post-merge values is not known, so this is the size of the
917    ///     values written to the backend as merge operands.
918    /// - `size_diff` must record the estimated diff of the total size of the working set after the
919    ///    merge operands are merged by the backend.
920    async fn multi_merge<P>(&mut self, merges: P) -> Result<MergeStats, anyhow::Error>
921    where
922        P: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>;
923}
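To make the contract concrete, here is a hypothetical, heavily simplified in-memory analogue of the trait: synchronous, with `u64` keys and raw `Vec<u8>` values standing in for `UpsertKey` and `StateValue`, and with the stats bookkeeping reduced to a put counter:

```rust
use std::collections::HashMap;

/// A toy, blind key-value map in the spirit of `UpsertStateBackend`.
#[derive(Default)]
struct MemBackend {
    state: HashMap<u64, Vec<u8>>,
}

impl MemBackend {
    /// `None` deletes; the last value for a repeated key wins simply because
    /// later puts overwrite earlier ones. Returns the number of puts processed.
    fn multi_put(&mut self, puts: impl IntoIterator<Item = (u64, Option<Vec<u8>>)>) -> u64 {
        let mut processed = 0;
        for (k, v) in puts {
            match v {
                Some(v) => {
                    self.state.insert(k, v);
                }
                None => {
                    self.state.remove(&k);
                }
            }
            processed += 1;
        }
        processed
    }

    /// Results are written into a parallel iterator of out-slots, mirroring
    /// the real trait's `results_out` parameter instead of allocating.
    fn multi_get<'r>(
        &self,
        gets: impl IntoIterator<Item = u64>,
        results_out: impl IntoIterator<Item = &'r mut Option<Vec<u8>>>,
    ) {
        for (k, out) in gets.into_iter().zip(results_out) {
            *out = self.state.get(&k).cloned();
        }
    }
}
```

Note how `multi_put` honors last-value-wins by applying puts in order, and how `multi_get` fills caller-provided slots rather than returning a buffer.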
924
925/// A function that merges a set of updates for a key into the existing value
926/// for the key. This is called by the backend implementation when it has
927/// accumulated a set of updates for a key, and needs to merge them into the
928/// existing value for the key.
929///
930/// The function is called with the following arguments:
931/// - The key for which the merge is being performed.
932/// - An iterator over any current value and merge operands queued for the key.
933///
934/// The function should return the new value for the key after merging all the updates.
935pub(crate) fn consolidating_merge_function<T: Eq, O>(
936    _key: UpsertKey,
937    updates: impl Iterator<Item = StateValue<T, O>>,
938) -> StateValue<T, O> {
939    let mut current: StateValue<T, O> = Default::default();
940
941    let mut bincode_buf = Vec::new();
942    for update in updates {
943        match update {
944            StateValue::Consolidating(_) => {
945                current.merge_update_state(&update);
946            }
947            StateValue::Value(_) => {
948                // This branch is more expensive, but we hopefully rarely hit
949                // it.
950                if let Some(finalized_value) = update.into_finalized_value() {
951                    let mut update = StateValue::default();
952                    update.merge_update(
953                        finalized_value,
954                        Diff::ONE,
955                        upsert_bincode_opts(),
956                        &mut bincode_buf,
957                    );
958                    current.merge_update_state(&update);
959                }
960            }
961        }
962    }
963
964    current
965}
966
967/// An `UpsertStateBackend` wrapper that supports consolidating merging, and
968/// reports basic metrics about the usage of the `UpsertStateBackend`.
969pub struct UpsertState<'metrics, S, T, O> {
970    inner: S,
971
972    // The status, start time, and stats about calls to `consolidate_chunk`.
973    pub snapshot_start: Instant,
974    snapshot_stats: SnapshotStats,
975    snapshot_completed: bool,
976
977    // Metrics shared across all workers running the `upsert` operator.
978    metrics: Arc<UpsertSharedMetrics>,
979    // Metrics for a specific worker.
980    worker_metrics: &'metrics UpsertMetrics,
981    // User-facing statistics.
982    stats: SourceStatistics,
983
984    // Bincode options and buffer used in `consolidate_chunk`.
985    bincode_opts: BincodeOpts,
986    bincode_buffer: Vec<u8>,
987
988    // We need to iterate over `updates` in `consolidate_chunk` twice, so we
989    // have a scratch vector for this.
990    consolidate_scratch: Vec<(UpsertKey, UpsertValue, mz_repr::Diff)>,
991    // "mini-upsert" map used in `consolidate_chunk`
992    consolidate_upsert_scratch: indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, O>>,
993    // a scratch vector for calling `multi_get`
994    multi_get_scratch: Vec<UpsertKey>,
995    shrink_upsert_unused_buffers_by_ratio: usize,
996}
997
998impl<'metrics, S, T, O> UpsertState<'metrics, S, T, O> {
999    pub(crate) fn new(
1000        inner: S,
1001        metrics: Arc<UpsertSharedMetrics>,
1002        worker_metrics: &'metrics UpsertMetrics,
1003        stats: SourceStatistics,
1004        shrink_upsert_unused_buffers_by_ratio: usize,
1005    ) -> Self {
1006        Self {
1007            inner,
1008            snapshot_start: Instant::now(),
1009            snapshot_stats: SnapshotStats::default(),
1010            snapshot_completed: false,
1011            metrics,
1012            worker_metrics,
1013            stats,
1014            bincode_opts: upsert_bincode_opts(),
1015            bincode_buffer: Vec::new(),
1016            consolidate_scratch: Vec::new(),
1017            consolidate_upsert_scratch: indexmap::IndexMap::new(),
1018            multi_get_scratch: Vec::new(),
1019            shrink_upsert_unused_buffers_by_ratio,
1020        }
1021    }
1022}
1023
1024impl<S, T, O> UpsertState<'_, S, T, O>
1025where
1026    S: UpsertStateBackend<T, O>,
1027    T: Eq + Clone + Send + Sync + Serialize + 'static,
1028    O: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
1029{
1030    /// Consolidate the following differential updates into the state. Updates
1031    /// provided to this method can be assumed to consolidate into a single
1032    /// value per-key, after all chunks of updates for a given timestamp have
1033    /// been processed.
1034    ///
1035    /// Therefore, after all updates of a given timestamp have been
1036    /// `consolidated`, all values must be in the correct state (as determined
1037    /// by `StateValue::ensure_decoded`).
1038    ///
1039    /// The `completed` boolean communicates whether or not this is the final
1040    /// chunk of updates for the initial "snapshot" from persist.
1041    ///
1042    /// If the backend supports it, this method will use `multi_merge` to
1043    /// consolidate the updates to avoid having to read the existing value for
1044    /// each key first. On some backends (like RocksDB), this can be
1045    /// significantly faster than the read-then-write consolidation strategy.
1046    ///
1047    /// Also note that we use `self.inner.multi_*`, not `self.multi_*`. This is
1048    /// to avoid erroneously changing metric and stats values.
1049    pub async fn consolidate_chunk<U>(
1050        &mut self,
1051        updates: U,
1052        completed: bool,
1053    ) -> Result<(), anyhow::Error>
1054    where
1055        U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1056    {
1057        fail::fail_point!("fail_consolidate_chunk", |_| {
1058            Err(anyhow::anyhow!("Error consolidating values"))
1059        });
1060
1061        if completed && self.snapshot_completed {
1062            panic!("attempted completion of already completed upsert snapshot")
1063        }
1064
1065        let phase = if !self.snapshot_completed {
1066            "rehydration"
1067        } else {
1068            "steady-state"
1069        };
1070
1071        let now = Instant::now();
1072        let batch_size = updates.len();
1073
1074        tracing::trace!(
1075            %phase,
1076            batch_size,
1077            completed,
1078            "consolidate_chunk: processing batch"
1079        );
1080
1081        self.consolidate_scratch.clear();
1082        self.consolidate_upsert_scratch.clear();
1083        self.multi_get_scratch.clear();
1084
1085        // Shrink the scratch vectors if their capacity is significantly larger than the batch size.
1086        if self.shrink_upsert_unused_buffers_by_ratio > 0 {
1087            let reduced_capacity =
1088                self.consolidate_scratch.capacity() / self.shrink_upsert_unused_buffers_by_ratio;
1089            if reduced_capacity > batch_size {
1090                // These vectors have already been cleared above and should be empty here
1091                self.consolidate_scratch.shrink_to(reduced_capacity);
1092                self.consolidate_upsert_scratch.shrink_to(reduced_capacity);
1093                self.multi_get_scratch.shrink_to(reduced_capacity);
1094            }
1095        }
1096
1097        // Depending on if the backend supports multi_merge, call the appropriate method.
1098        let stats = if self.inner.supports_merge() {
1099            self.consolidate_merge_inner(updates).await?
1100        } else {
1101            self.consolidate_read_write_inner(updates).await?
1102        };
1103
1104        // NOTE: These metrics use the term `merge` to refer to the consolidation of values.
1105        // This is because they were introduced before the `multi_merge` operation was added.
1106        self.metrics
1107            .merge_snapshot_latency
1108            .observe(now.elapsed().as_secs_f64());
1109        self.worker_metrics
1110            .merge_snapshot_updates
1111            .inc_by(stats.updates);
1112        self.worker_metrics
1113            .merge_snapshot_inserts
1114            .inc_by(stats.inserts);
1115        self.worker_metrics
1116            .merge_snapshot_deletes
1117            .inc_by(stats.deletes);
1118
1119        self.stats.update_bytes_indexed_by(stats.size_diff);
1120        self.stats
1121            .update_records_indexed_by(stats.values_diff.into_inner());
1122
1123        self.snapshot_stats += stats;
1124
1125        if !self.snapshot_completed {
1126            // Updating the metrics
1127            self.worker_metrics.rehydration_total.set(
1128                self.snapshot_stats
1129                    .values_diff
1130                    .into_inner()
1131                    .try_into()
1132                    .unwrap_or_else(|e: std::num::TryFromIntError| {
1133                        tracing::warn!(
1134                            "rehydration_total metric overflowed or is negative \
1135                        and is inaccurate: {}. Defaulting to 0",
1136                            e.display_with_causes(),
1137                        );
1138
1139                        0
1140                    }),
1141            );
1142            self.worker_metrics
1143                .rehydration_updates
1144                .set(self.snapshot_stats.updates);
1145        }
1146
1147        if completed {
1148            if self.shrink_upsert_unused_buffers_by_ratio > 0 {
1149                // After rehydration is done, these scratch buffers should be empty,
1150                // so shrink them entirely.
1151                self.consolidate_scratch.shrink_to_fit();
1152                self.consolidate_upsert_scratch.shrink_to_fit();
1153                self.multi_get_scratch.shrink_to_fit();
1154            }
1155
1156            self.worker_metrics
1157                .rehydration_latency
1158                .set(self.snapshot_start.elapsed().as_secs_f64());
1159
1160            self.snapshot_completed = true;
1161        }
1162        Ok(())
1163    }
1164
1165    /// Consolidate the updates into the state. This method requires the backend
1166    /// has support for the `multi_merge` operation, and will panic if
1167    /// `self.inner.supports_merge()` was not checked before calling this
1168    /// method. `multi_merge` will write the updates as 'merge operands' to the
1169    /// backend, and then the backend will consolidate those updates with any
1170    /// existing state using the `consolidating_merge_function`.
1171    ///
1172    /// This method can have significant performance benefits over the
1173    /// read-then-write method of `consolidate_read_write_inner`.
1174    async fn consolidate_merge_inner<U>(
1175        &mut self,
1176        updates: U,
1177    ) -> Result<SnapshotStats, anyhow::Error>
1178    where
1179        U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1180    {
1181        let mut updates = updates.into_iter().peekable();
1182
1183        let mut stats = SnapshotStats::default();
1184
1185        if updates.peek().is_some() {
1186            let m_stats = self
1187                .inner
1188                .multi_merge(updates.map(|(k, v, diff)| {
1189                    // Transform into a `StateValue<O>` that can be used by the
1190                    // `consolidating_merge_function` to merge with any existing
1191                    // value for the key.
1192                    let mut val: StateValue<T, O> = Default::default();
1193                    val.merge_update(v, diff, self.bincode_opts, &mut self.bincode_buffer);
1194
1195                    stats.updates += 1;
1196                    if diff.is_positive() {
1197                        stats.inserts += 1;
1198                    } else if diff.is_negative() {
1199                        stats.deletes += 1;
1200                    }
1201
1202                    // To keep track of the overall `values_diff` we can use the sum of diffs which
1203                    // should be equal to the number of non-tombstoned values in the backend.
1204                    // This is a bit misleading as this represents the eventual state after the
1205                    // `consolidating_merge_function` has been called to merge all the updates,
1206                    // and not the state after this `multi_merge` call.
1207                    //
1208                    // This does not accurately report values that have been consolidated to diff == 0, as tracking that
1209                    // per-key is extremely difficult.
1210                    stats.values_diff += diff;
1211
1212                    (k, MergeValue { value: val, diff })
1213                }))
1214                .await?;
1215
1216            stats.size_diff = m_stats.size_diff;
1217        }
1218
1219        Ok(stats)
1220    }
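Stripped of the encoding and stats details, the merge-operand strategy can be sketched as a toy backend that queues diffs per key and folds them lazily on read, much as a RocksDB merge operator would (illustrative types only, with `i64` diffs standing in for encoded `StateValue`s):

```rust
use std::collections::HashMap;

/// A toy backend that accepts merge operands without reading existing state.
#[derive(Default)]
struct MergeBackend {
    operands: HashMap<u64, Vec<i64>>,
}

impl MergeBackend {
    /// Write-only: appends operands, never performs a read-modify-write.
    fn multi_merge(&mut self, merges: impl IntoIterator<Item = (u64, i64)>) {
        for (k, diff) in merges {
            self.operands.entry(k).or_default().push(diff);
        }
    }

    /// The merge function runs on `get`, consolidating all queued operands
    /// for the key into a single value.
    fn get(&mut self, key: u64) -> i64 {
        let consolidated = self.operands.get(&key).map_or(0, |ops| ops.iter().sum());
        self.operands.insert(key, vec![consolidated]);
        consolidated
    }
}
```

The win is that `multi_merge` never reads existing state; consolidation cost is deferred to reads or to the backend's own background merging.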
1221
1222    /// Consolidates the updates into the state. This method reads the existing
1223    /// values for each key, consolidates the updates, and writes the new values
1224    /// back to the state.
1225    async fn consolidate_read_write_inner<U>(
1226        &mut self,
1227        updates: U,
1228    ) -> Result<SnapshotStats, anyhow::Error>
1229    where
1230        U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1231    {
1232        let mut updates = updates.into_iter().peekable();
1233
1234        let mut stats = SnapshotStats::default();
1235
1236        if updates.peek().is_some() {
1237            self.consolidate_scratch.extend(updates);
1238            self.consolidate_upsert_scratch.extend(
1239                self.consolidate_scratch
1240                    .iter()
1241                    .map(|(k, _, _)| (*k, UpsertValueAndSize::default())),
1242            );
1243            self.multi_get_scratch
1244                .extend(self.consolidate_upsert_scratch.iter().map(|(k, _)| *k));
1245            self.inner
1246                .multi_get(
1247                    self.multi_get_scratch.drain(..),
1248                    self.consolidate_upsert_scratch.iter_mut().map(|(_, v)| v),
1249                )
1250                .await?;
1251
1252            for (key, value, diff) in self.consolidate_scratch.drain(..) {
1253                stats.updates += 1;
1254                if diff.is_positive() {
1255                    stats.inserts += 1;
1256                } else if diff.is_negative() {
1257                    stats.deletes += 1;
1258                }
1259
1260                // We rely on the diffs in our input instead of the result of
1261                // multi_put below. This makes sure we report the same stats as
1262                // `consolidate_merge_inner`, regardless of what values
1263                // there were in state before.
1264                stats.values_diff += diff;
1265
1266                let entry = self.consolidate_upsert_scratch.get_mut(&key).unwrap();
1267                let val = entry.value.get_or_insert_with(Default::default);
1268
1269                if val.merge_update(value, diff, self.bincode_opts, &mut self.bincode_buffer) {
1270                    entry.value = None;
1271                }
1272            }
1273
1274            // Note we do 1 `multi_get` and 1 `multi_put` while processing a _batch of updates_.
1275            // Within the batch, we effectively consolidate each key, before persisting that
1276            // consolidated value. Easy!!
1277            let p_stats = self
1278                .inner
1279                .multi_put(self.consolidate_upsert_scratch.drain(..).map(|(k, v)| {
1280                    (
1281                        k,
1282                        PutValue {
1283                            value: v.value,
1284                            previous_value_metadata: v.metadata.map(|v| ValueMetadata {
1285                                size: v.size.try_into().expect("less than i64 size"),
1286                                is_tombstone: v.is_tombstone,
1287                            }),
1288                        },
1289                    )
1290                }))
1291                .await?;
1292
1293            stats.size_diff = p_stats.size_diff;
1294        }
1295
1296        Ok(stats)
1297    }
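The read-then-write path above reduces to three steps: one bulk get for the distinct keys, in-memory consolidation of every diff per key, and one bulk put that deletes keys which consolidated to zero. A minimal sketch, with `i64` counts standing in for encoded values:

```rust
use std::collections::HashMap;

/// Toy read-then-write consolidation: `state` stands in for the backend,
/// `updates` is a batch of (key, diff) pairs.
fn consolidate_read_write(state: &mut HashMap<u64, i64>, updates: &[(u64, i64)]) {
    // 1. One "multi_get": fetch the current value for every distinct key.
    let mut scratch: HashMap<u64, i64> = HashMap::new();
    for (k, _) in updates {
        scratch
            .entry(*k)
            .or_insert_with(|| state.get(k).copied().unwrap_or(0));
    }
    // 2. Consolidate all diffs for each key in memory.
    for (k, diff) in updates {
        *scratch.get_mut(k).unwrap() += diff;
    }
    // 3. One "multi_put": write back, deleting keys that consolidated to 0.
    for (k, v) in scratch {
        if v == 0 {
            state.remove(&k);
        } else {
            state.insert(k, v);
        }
    }
}
```

Within the batch, each key is effectively consolidated before its final value is persisted, matching the "mini-upsert" comment above.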
1298
1299    /// Insert or delete for all `puts` keys, prioritizing the last value for
1300    /// repeated keys.
1301    pub async fn multi_put<P>(
1302        &mut self,
1303        update_per_record_stats: bool,
1304        puts: P,
1305    ) -> Result<(), anyhow::Error>
1306    where
1307        P: IntoIterator<Item = (UpsertKey, PutValue<Value<T, O>>)>,
1308    {
1309        fail::fail_point!("fail_state_multi_put", |_| {
1310            Err(anyhow::anyhow!("Error putting values into state"))
1311        });
1312        let now = Instant::now();
1313        let stats = self
1314            .inner
1315            .multi_put(puts.into_iter().map(|(k, pv)| {
1316                (
1317                    k,
1318                    PutValue {
1319                        value: pv.value.map(StateValue::Value),
1320                        previous_value_metadata: pv.previous_value_metadata,
1321                    },
1322                )
1323            }))
1324            .await?;
1325
1326        self.metrics
1327            .multi_put_latency
1328            .observe(now.elapsed().as_secs_f64());
1329        self.worker_metrics
1330            .multi_put_size
1331            .inc_by(stats.processed_puts);
1332
1333        if update_per_record_stats {
1334            self.worker_metrics.upsert_inserts.inc_by(stats.inserts);
1335            self.worker_metrics.upsert_updates.inc_by(stats.updates);
1336            self.worker_metrics.upsert_deletes.inc_by(stats.deletes);
1337
1338            self.stats.update_bytes_indexed_by(stats.size_diff);
1339            self.stats.update_records_indexed_by(stats.values_diff);
1340            self.stats
1341                .update_envelope_state_tombstones_by(stats.tombstones_diff);
1342        }
1343
1344        Ok(())
1345    }
1346
1347    /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
1348    ///
1349    /// Panics if `gets` and `results_out` are not the same length.
1350    pub async fn multi_get<'r, G, R>(
1351        &mut self,
1352        gets: G,
1353        results_out: R,
1354    ) -> Result<(), anyhow::Error>
1355    where
1356        G: IntoIterator<Item = UpsertKey>,
1357        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
1358        O: 'r,
1359    {
1360        fail::fail_point!("fail_state_multi_get", |_| {
1361            Err(anyhow::anyhow!("Error getting values from state"))
1362        });
1363        let now = Instant::now();
1364        let stats = self.inner.multi_get(gets, results_out).await?;
1365
1366        self.metrics
1367            .multi_get_latency
1368            .observe(now.elapsed().as_secs_f64());
1369        self.worker_metrics
1370            .multi_get_size
1371            .inc_by(stats.processed_gets);
1372        self.worker_metrics
1373            .multi_get_result_count
1374            .inc_by(stats.returned_gets);
1375        self.worker_metrics
1376            .multi_get_result_bytes
1377            .inc_by(stats.processed_gets_size);
1378
1379        Ok(())
1380    }
1381}
1382
1383#[cfg(test)]
1384mod tests {
1385    use mz_repr::Row;
1386
1387    use super::*;
1388    #[mz_ore::test]
1389    fn test_merge_update() {
1390        let mut buf = Vec::new();
1391        let opts = upsert_bincode_opts();
1392
1393        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1394
1395        let small_row = Ok(Row::default());
1396        let longer_row = Ok(Row::pack([mz_repr::Datum::Null]));
1397        s.merge_update(small_row, Diff::ONE, opts, &mut buf);
1398        s.merge_update(longer_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1399        // This clears the retraction of the `longer_row`, but the
1400        // `value_xor` is the length of the `longer_row`. This tests
1401        // that we are tracking checksums correctly.
1402        s.merge_update(longer_row, Diff::ONE, opts, &mut buf);
1403
1404        // Assert that the `Consolidating` value is fully merged.
1405        s.ensure_decoded(opts, GlobalId::User(1), None);
1406    }
1407
1408    // We guard some of our assumptions. Increasing in-memory size of StateValue
1409    // has a direct impact on memory usage of in-memory UPSERT sources.
    #[mz_ore::test]
    fn test_memory_size() {
        let finalized_value: StateValue<(), ()> = StateValue::finalized_value(Ok(Row::default()));
        assert!(
            finalized_value.memory_size() <= 64,
            "memory size is {}",
            finalized_value.memory_size(),
        );

        let provisional_value_with_finalized_value: StateValue<(), ()> =
            finalized_value.into_provisional_value(Ok(Row::default()), (), ());
        assert!(
            provisional_value_with_finalized_value.memory_size() <= 64,
            "memory size is {}",
            provisional_value_with_finalized_value.memory_size(),
        );

        let provisional_value_without_finalized_value: StateValue<(), ()> =
            StateValue::new_provisional_value(Ok(Row::default()), (), ());
        assert!(
            provisional_value_without_finalized_value.memory_size() <= 64,
            "memory size is {}",
            provisional_value_without_finalized_value.memory_size(),
        );

        let mut consolidating_value: StateValue<(), ()> = StateValue::default();
        consolidating_value.merge_update(
            Ok(Row::default()),
            Diff::ONE,
            upsert_bincode_opts(),
            &mut Vec::new(),
        );
        assert!(
            consolidating_value.memory_size() <= 66,
            "memory size is {}",
            consolidating_value.memory_size(),
        );
    }

    #[mz_ore::test]
    #[should_panic(
        expected = "invalid upsert state: len_sum is non-0, state: Consolidating { len_sum: 1"
    )]
    fn test_merge_update_len_0_assert() {
        let mut buf = Vec::new();
        let opts = upsert_bincode_opts();

        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());

        let small_row = Ok(mz_repr::Row::default());
        let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
        s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);

        s.ensure_decoded(opts, GlobalId::User(1), None);
    }

    #[mz_ore::test]
    #[should_panic(
        expected = "invalid upsert state: \"value_xor is not the same length (3) as len (4), state: Consolidating { len_sum: 4"
    )]
    fn test_merge_update_len_too_long_assert() {
        let mut buf = Vec::new();
        let opts = upsert_bincode_opts();

        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());

        let small_row = Ok(mz_repr::Row::default());
        let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
        s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
        s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);

        s.ensure_decoded(opts, GlobalId::User(1), None);
    }

    #[mz_ore::test]
    #[should_panic(expected = "invalid upsert state: checksum_sum does not match")]
    fn test_merge_update_checksum_doesnt_match() {
        let mut buf = Vec::new();
        let opts = upsert_bincode_opts();

        let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());

        // Both rows encode to the same length, so only their checksums differ.
        let row_a = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(2)]));
        let row_b = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(1)]));
        s.merge_update(row_b.clone(), Diff::ONE, opts, &mut buf);
        s.merge_update(row_a.clone(), Diff::MINUS_ONE, opts, &mut buf);
        s.merge_update(row_b.clone(), Diff::ONE, opts, &mut buf);

        s.ensure_decoded(opts, GlobalId::User(1), None);
    }
}