mz_storage/upsert/types.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! # State-management for UPSERT.
11//!
//! This module provides structures for use within an UPSERT
//! operator implementation.
14//!
//! UPSERT is effectively a process which transforms a `Stream<(Key, Option<Data>)>`
16//! into a differential collection, by indexing the data based on the key.
17//!
//! _This module does not implement this transformation_, instead exposing APIs designed
//! for use within an UPSERT operator. There is one exception to this: `consolidate_chunk`
//! implements an efficient upsert-like transformation to re-index a collection using the
//! _output collection_ of an upsert transformation. More on this below.
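//!
//! As a rough illustration (a sketch, not part of this module's API), upserting a stream of
//! keyed updates produces differential output like the following, where the trailing number
//! is the diff:
//!
//! ```text
//! input: (key: 1, Some("a"))    output: (1, "a", +1)
//! input: (key: 1, Some("b"))    output: (1, "a", -1), (1, "b", +1)
//! input: (key: 1, None)         output: (1, "b", -1)
//! ```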
22//!
23//! ## `UpsertState`
24//!
25//! Its primary export is `UpsertState`, which wraps an `UpsertStateBackend` and provides 3 APIs:
26//!
27//! ### `multi_get`
28//! `multi_get` returns the current value for a (unique) set of keys. To keep implementations
29//! efficient, the set of keys is an iterator, and results are written back into another parallel
30//! iterator. In addition to returning the current values, implementations must also return the
31//! _size_ of those values _as they are stored within the implementation_. Implementations are
32//! required to chunk large iterators if they need to operate over smaller batches.
33//!
34//! `multi_get` is implemented directly with `UpsertStateBackend::multi_get`.
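//!
//! A minimal usage sketch (`state` is an `UpsertState` and `keys` is a deduplicated
//! `Vec<UpsertKey>`; both are hypothetical bindings):
//!
//! ```ignore
//! let mut results = vec![UpsertValueAndSize::default(); keys.len()];
//! state.multi_get(keys.iter().cloned(), results.iter_mut()).await?;
//! for result in &results {
//!     if let Some(_value) = &result.value {
//!         // `result.metadata` records the size of the value as stored in the backend.
//!     }
//! }
//! ```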
35//!
36//! ### `multi_put`
37//! Update or delete values for a set of keys. To keep implementations efficient, the set
38//! of updates is an iterator. Implementations are also required to return the difference
39//! in values and total size after processing the updates. To simplify this (and because
//! in the `upsert` use case we have this data readily available), the updates are input
41//! with the size of the current value (if any) that was returned from a previous `multi_get`.
42//! Implementations are required to chunk large iterators if they need to operate over smaller
43//! batches.
44//!
45//! `multi_put` is implemented directly with `UpsertStateBackend::multi_put`.
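//!
//! For example (a sketch only; `state`, `key`, `new_row`, `order`, and `results` are
//! hypothetical bindings), an update that overwrites a previously fetched value passes the
//! previous metadata back in so statistics can be maintained cheaply:
//!
//! ```ignore
//! let previous = &results[0]; // an `UpsertValueAndSize` from an earlier `multi_get`
//! state
//!     .multi_put(
//!         true, // also update per-record statistics
//!         [(
//!             key,
//!             PutValue {
//!                 value: Some(Value::FinalizedValue(Ok(new_row), order)),
//!                 previous_value_metadata: previous.metadata.map(|m| ValueMetadata {
//!                     size: m.size.try_into().expect("fits in an i64"),
//!                     is_tombstone: m.is_tombstone,
//!                 }),
//!             },
//!         )],
//!     )
//!     .await?;
//! ```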
46//!
47//! ### `consolidate_chunk`
48//!
//! `consolidate_chunk` re-indexes an UPSERT collection based on its _output collection_ (as
//! opposed to its _input `Stream`_). Please see the docs on `consolidate_chunk` and `StateValue`
//! for more information.
//!
//! `consolidate_chunk` is implemented with both `UpsertStateBackend::multi_put` and
//! `UpsertStateBackend::multi_get`.
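//!
//! Conceptually (a sketch, not literal API usage), feeding a key's differential output
//! updates through `consolidate_chunk` rebuilds the keyed state:
//!
//! ```text
//! input updates: (key, old_value, -1), (key, new_value, +1)
//! state after consolidation: key -> new_value
//! ```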
55//!
56//! ## Order Keys
57//!
58//! In practice, the input stream for UPSERT collections includes an _order key_. This is used to
//! sort data with the same key occurring at the same timestamp. This module provides support
60//! for serializing and deserializing order keys with their associated data. Being able to ingest
61//! data on non-frontier boundaries requires this support.
62//!
63//! A consequence of this is that tombstones with an order key can be stored within the state.
64//! There is currently no support for cleaning these tombstones up, as they are considered rare and
65//! small enough.
66//!
67//! Because `consolidate_chunk` handles data that consolidates correctly, it does not handle
68//! order keys.
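//!
//! As an illustration (a sketch; the concrete order key type depends on the source, e.g. a
//! Kafka offset), the order key lets late-arriving data for a key be discarded:
//!
//! ```text
//! state:  key -> value @ offset 7
//! input:  (key, value @ offset 5)  -> ignored, because 5 < 7
//! input:  (key, value @ offset 9)  -> overwrites, because 9 > 7
//! ```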
69//!
70//!
71//! ## A note on state size
72//!
//! The `UpsertStateBackend` trait requires that implementations report _relatively accurate_ information about
74//! how the state size changes over time. Note that it does NOT ask the implementations to give
75//! accurate information about actual resource consumption (like disk space including space
76//! amplification), and instead is just asking about the size of the values, after they have been
77//! encoded. For implementations like `RocksDB`, these may be highly accurate (it literally
//! reports the encoded size as written to the RocksDB API), and for others, like the
79//! `InMemoryHashMap`, they may be rough estimates of actual memory usage. See
80//! `StateValue::memory_size` for more information.
81//!
82//! Note also that after consolidation, additional space may be used if `StateValue` is
83//! used.
84//!
85
86use std::fmt;
87use std::num::Wrapping;
88use std::sync::Arc;
89use std::time::Instant;
90
91use bincode::Options;
92use itertools::Itertools;
93use mz_ore::cast::CastFrom;
94use mz_ore::error::ErrorExt;
95use mz_repr::Diff;
96use serde::{Serialize, de::DeserializeOwned};
97
98use crate::metrics::upsert::{UpsertMetrics, UpsertSharedMetrics};
99use crate::statistics::SourceStatistics;
100use crate::upsert::{UpsertKey, UpsertValue};
101
102/// The default set of `bincode` options used for consolidating
103/// upsert updates (and writing values to RocksDB).
104pub type BincodeOpts = bincode::config::DefaultOptions;
105
106/// Build the default `BincodeOpts`.
107pub fn upsert_bincode_opts() -> BincodeOpts {
108 // We don't allow trailing bytes, for now,
109 // and use varint encoding for space saving.
110 bincode::DefaultOptions::new()
111}
112
113/// The result type for `multi_get`.
/// The value and size are stored in individual `Option`s so callers
/// can reuse this struct as they overwrite its contents, keeping
116/// track of the previous metadata. Additionally, values
117/// may be `None` for tombstones.
118#[derive(Clone)]
119pub struct UpsertValueAndSize<T, O> {
120 /// The value, if there was one.
121 pub value: Option<StateValue<T, O>>,
    /// The size of the original `value` as persisted.
    /// Useful for users keeping track of statistics.
124 pub metadata: Option<ValueMetadata<u64>>,
125}
126
127impl<T, O> std::fmt::Debug for UpsertValueAndSize<T, O> {
128 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
129 f.debug_struct("UpsertValueAndSize")
130 .field("value", &self.value)
131 .field("metadata", &self.metadata)
132 .finish()
133 }
134}
135
136impl<T, O> Default for UpsertValueAndSize<T, O> {
137 fn default() -> Self {
138 Self {
139 value: None,
140 metadata: None,
141 }
142 }
143}
144
145/// Metadata about an existing value in the upsert state backend, as returned
146/// by `multi_get`.
147#[derive(Copy, Clone, Debug)]
148pub struct ValueMetadata<S> {
149 /// The size of the value.
150 pub size: S,
151 /// If the value is a tombstone.
152 pub is_tombstone: bool,
153}
154
155/// A value to put in with `multi_put`.
156#[derive(Clone, Debug)]
157pub struct PutValue<V> {
158 /// The new value, or a `None` to indicate a delete.
159 pub value: Option<V>,
    /// The metadata of the previous value for this key, if known.
    /// Passed in to efficiently calculate statistics.
162 pub previous_value_metadata: Option<ValueMetadata<i64>>,
163}
164
165/// A value to put in with a `multi_merge`.
166pub struct MergeValue<V> {
167 /// The value of the merge operand to write to the backend.
168 pub value: V,
169 /// The 'diff' of this merge operand value, used to estimate the overall size diff
170 /// of the working set after this merge operand is merged by the backend.
171 pub diff: Diff,
172}
173
174/// `UpsertState` has 2 modes:
175/// - Normal operation
176/// - Consolidation.
177///
178/// This struct and its substructs are helpers to simplify the logic that
179/// individual `UpsertState` implementations need to do to manage these 2 modes.
180///
181/// Normal operation is simple, we just store an ordinary `UpsertValue`, and allow the implementer
182/// to store it any way they want. During consolidation, the logic is more complex.
183/// See the docs on `StateValue::merge_update` for more information.
184///
185/// Note also that this type is designed to support _partial updates_. All values are
186/// associated with an _order key_ `O` that can be used to determine if a value existing in the
187/// `UpsertStateBackend` occurred before or after a value being considered for insertion.
188///
/// `O` is typically required to be `: Default`, with the default value sorting below all others.
190/// During consolidation, values consolidate correctly (as they are actual
191/// differential updates with diffs), so order keys are not required.
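///
/// As a minimal sketch (mirroring the tests at the bottom of this module), a value that is
/// built up during consolidation starts out `Consolidating` and must be decoded before use:
///
/// ```ignore
/// let mut state: StateValue<(), ()> = StateValue::default();
/// state.merge_update(
///     Ok(mz_repr::Row::default()),
///     mz_repr::Diff::ONE,
///     upsert_bincode_opts(),
///     &mut Vec::new(),
/// );
/// state.ensure_decoded(upsert_bincode_opts());
/// assert!(!state.is_tombstone());
/// ```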
192#[derive(Clone, serde::Serialize, serde::Deserialize)]
193pub enum StateValue<T, O> {
194 Consolidating(Consolidating),
195 Value(Value<T, O>),
196}
197
198impl<T, O> std::fmt::Debug for StateValue<T, O> {
199 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
200 match self {
201 StateValue::Consolidating(_) => write!(f, "Consolidating"),
202 StateValue::Value(_) => write!(f, "Value"),
203 }
204 }
205}
206
207/// A totally consolidated value stored within the `UpsertStateBackend`.
208///
209/// This type contains support for _tombstones_, that contain an _order key_,
210/// and provisional values.
211///
212/// What is considered finalized and provisional depends on the implementation
213/// of the UPSERT operator: it might consider everything that it writes to its
214/// state finalized, and assume that what it emits will be written down in the
215/// output exactly as presented. Or it might consider everything it writes down
216/// provisional, and only consider updates that it _knows_ to be persisted as
217/// finalized.
218///
219/// Provisional values should only be considered while still "working off"
220/// updates with the same timestamp at which the provisional update was
221/// recorded.
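///
/// A small sketch of the intended lifecycle (`ts` is a hypothetical timestamp binding; the
/// operator decides what counts as finalized):
///
/// ```ignore
/// // A value we believe is persisted, at order key 0.
/// let finalized: StateValue<u64, u64> =
///     StateValue::finalized_value(Ok(mz_repr::Row::default()), 0);
/// // While working off updates at timestamp `ts`, it becomes provisional at order key 1.
/// let provisional = finalized.into_provisional_value(Ok(mz_repr::Row::default()), ts, 1);
/// assert!(provisional.provisional_value_ref(&ts).is_some());
/// ```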
222#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
223pub enum Value<T, O> {
224 FinalizedValue(UpsertValue, O),
225 Tombstone(O),
226 ProvisionalValue {
227 // We keep the finalized value around, because the provisional value is
228 // only valid when processing updates at the same timestamp. And at any
229 // point we might still require access to the finalized value.
230 finalized_value: Option<Box<(UpsertValue, O)>>,
231 // A provisional value of `None` is a provisional tombstone.
232 //
233 // WIP: We can also box this, to keep the size of StateValue as it was
234 // previously.
235 provisional_value: (Option<UpsertValue>, T, O),
236 },
237}
238
239/// A value as produced during consolidation.
240#[derive(Clone, Default, serde::Serialize, serde::Deserialize, Debug)]
241pub struct Consolidating {
242 #[serde(with = "serde_bytes")]
243 value_xor: Vec<u8>,
244 len_sum: Wrapping<i64>,
245 checksum_sum: Wrapping<i64>,
246 diff_sum: Wrapping<i64>,
247}
248
249impl fmt::Display for Consolidating {
250 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
251 f.debug_struct("Consolidating")
252 .field("len_sum", &self.len_sum)
253 .field("checksum_sum", &self.checksum_sum)
254 .field("diff_sum", &self.diff_sum)
255 .finish_non_exhaustive()
256 }
257}
258
259impl<T, O> StateValue<T, O> {
    /// A finalized (that is, assumed persistent) value occurring at some order
    /// key.
262 pub fn finalized_value(value: UpsertValue, order: O) -> Self {
263 Self::Value(Value::FinalizedValue(value, order))
264 }
265
266 #[allow(unused)]
267 /// A tombstoned value occurring at some order key.
268 pub fn tombstone(order: O) -> Self {
269 Self::Value(Value::Tombstone(order))
270 }
271
272 /// Whether the value is a tombstone.
273 pub fn is_tombstone(&self) -> bool {
274 match self {
275 Self::Value(Value::Tombstone(_)) => true,
276 _ => false,
277 }
278 }
279
280 /// Pull out the order for the given `Value`, assuming `ensure_decoded` has been called.
281 pub fn order(&self) -> &O {
282 match self {
283 Self::Value(Value::FinalizedValue(_, order)) => order,
284 Self::Value(Value::ProvisionalValue { .. }) => {
285 panic!("order() called on provisional value")
286 }
287 Self::Value(Value::Tombstone(order)) => order,
288 _ => panic!("called `order` without calling `ensure_decoded`"),
289 }
290 }
291
292 /// Pull out the `Value` value for a `StateValue`, after `ensure_decoded` has been called.
293 pub fn into_decoded(self) -> Value<T, O> {
294 match self {
295 Self::Value(value) => value,
            _ => panic!("called `into_decoded` without calling `ensure_decoded`"),
297 }
298 }
299
300 /// The size of a `StateValue`, in memory. This is:
    /// 1. Only used in the `InMemoryHashMap` implementation.
    /// 2. An estimate (it only looks at value sizes, and not errors).
303 ///
304 /// Other implementations may use more accurate accounting.
305 pub fn memory_size(&self) -> u64 {
306 match self {
307 // Similar to `Row::byte_len`, we add the heap size and the size of the value itself.
308 Self::Consolidating(Consolidating { value_xor, .. }) => {
309 u64::cast_from(value_xor.len()) + u64::cast_from(std::mem::size_of::<Self>())
310 }
311 Self::Value(Value::FinalizedValue(Ok(row), ..)) => {
312 // `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
313 // subtract it.
314 u64::cast_from(row.byte_len()) - u64::cast_from(std::mem::size_of::<mz_repr::Row>())
315 // This assumes the size of any `O` instantiation is meaningful (i.e. not a heap
316 // object).
317 + u64::cast_from(std::mem::size_of::<Self>())
318 }
319 Self::Value(Value::ProvisionalValue {
320 finalized_value,
321 provisional_value,
322 }) => {
323 finalized_value.as_ref().map(|v| match v.as_ref() {
324 (Ok(row), _order) =>
325 // The finalized value is boxed, so the size of Row is
326 // not included in the outer size of Self. We therefore
327 // don't subtract it here like for the other branches.
328 u64::cast_from(row.byte_len())
329 // Add the size of the order, because it's also behind
330 // the box.
331 + u64::cast_from(std::mem::size_of::<O>()),
332 // Assume errors are rare enough to not move the needle.
333 (Err(_), _order) => 0,
334 }).unwrap_or(0)
335 +
336 provisional_value.0.as_ref().map(|v| match v{
337 Ok(row) =>
338 // `Row::byte_len` includes the size of `Row`, which is
339 // also in `Self`, so we subtract it.
340 u64::cast_from(row.byte_len()) - u64::cast_from(std::mem::size_of::<mz_repr::Row>()),
341 // The size of order is already included in the outer
342 // size of self.
343 // Assume errors are rare enough to not move the needle.
344 Err(_) => 0,
345 }).unwrap_or(0)
346 // This assumes the size of any `O` instantiation is meaningful (i.e. not a heap
347 // object).
348 + u64::cast_from(std::mem::size_of::<Self>())
349 }
350 Self::Value(Value::Tombstone(_)) => {
351 // This assumes the size of any `O` instantiation is meaningful (i.e. not a heap
352 // object).
353 u64::cast_from(std::mem::size_of::<Self>())
354 }
355 Self::Value(Value::FinalizedValue(Err(_), ..)) => {
356 // Assume errors are rare enough to not move the needle.
357 0
358 }
359 }
360 }
361}
362
363impl<T: Eq, O> StateValue<T, O> {
364 /// Creates a new provisional value, occurring at some order key, observed
365 /// at the given timestamp.
366 pub fn new_provisional_value(
367 provisional_value: UpsertValue,
368 provisional_ts: T,
369 order: O,
370 ) -> Self {
371 Self::Value(Value::ProvisionalValue {
372 finalized_value: None,
373 provisional_value: (Some(provisional_value), provisional_ts, order),
374 })
375 }
376
377 /// Creates a provisional value, that retains the finalized value along with
378 /// its order in this `StateValue`, if any.
379 ///
380 /// We record the finalized value, so that we can present it when needed or
381 /// when trying to read a provisional value at a different timestamp.
382 pub fn into_provisional_value(
383 self,
384 provisional_value: UpsertValue,
385 provisional_ts: T,
386 provisional_order: O,
387 ) -> Self {
388 match self {
389 StateValue::Value(Value::FinalizedValue(value, order)) => {
390 StateValue::Value(Value::ProvisionalValue {
391 finalized_value: Some(Box::new((value, order))),
392 provisional_value: (Some(provisional_value), provisional_ts, provisional_order),
393 })
394 }
395 StateValue::Value(Value::Tombstone(_)) => StateValue::Value(Value::ProvisionalValue {
396 finalized_value: None,
397 provisional_value: (Some(provisional_value), provisional_ts, provisional_order),
398 }),
399 StateValue::Value(Value::ProvisionalValue {
400 finalized_value,
401 provisional_value: _,
402 }) => StateValue::Value(Value::ProvisionalValue {
403 finalized_value,
404 provisional_value: (Some(provisional_value), provisional_ts, provisional_order),
405 }),
406 StateValue::Consolidating(_) => {
407 panic!("called `into_provisional_value` without calling `ensure_decoded`")
408 }
409 }
410 }
411
412 /// Creates a new provisional tombstone occurring at some order key,
413 /// observed at the given timestamp.
414 pub fn new_provisional_tombstone(provisional_ts: T, order: O) -> Self {
415 Self::Value(Value::ProvisionalValue {
416 finalized_value: None,
417 provisional_value: (None, provisional_ts, order),
418 })
419 }
420
421 /// Creates a provisional tombstone, that retains the finalized value along
422 /// with its order in this `StateValue`, if any.
423 ///
424 /// We record the current finalized value, so that we can present it when
425 /// needed or when trying to read a provisional value at a different
426 /// timestamp.
427 pub fn into_provisional_tombstone(self, provisional_ts: T, provisional_order: O) -> Self {
428 match self {
429 StateValue::Value(Value::FinalizedValue(value, order)) => {
430 StateValue::Value(Value::ProvisionalValue {
431 finalized_value: Some(Box::new((value, order))),
432 provisional_value: (None, provisional_ts, provisional_order),
433 })
434 }
435 StateValue::Value(Value::Tombstone(order)) => {
436 StateValue::Value(Value::ProvisionalValue {
437 finalized_value: None,
438 provisional_value: (None, provisional_ts, order),
439 })
440 }
441 StateValue::Value(Value::ProvisionalValue {
442 finalized_value,
443 provisional_value: _,
444 }) => StateValue::Value(Value::ProvisionalValue {
445 finalized_value,
446 provisional_value: (None, provisional_ts, provisional_order),
447 }),
448 StateValue::Consolidating(_) => {
449 panic!("called `into_provisional_tombstone` without calling `ensure_decoded`")
450 }
451 }
452 }
453
454 /// Returns the order of a provisional value at the given timestamp. If that
455 /// doesn't exist, the order of the finalized value.
456 ///
457 /// Returns `None` if none of the above exist.
458 pub fn provisional_order(&self, ts: &T) -> Option<&O> {
459 match self {
460 Self::Value(Value::FinalizedValue(_, order)) => Some(order),
461 Self::Value(Value::Tombstone(order)) => Some(order),
462 Self::Value(Value::ProvisionalValue {
463 finalized_value: _,
464 provisional_value: (_, provisional_ts, provisional_order),
465 }) if provisional_ts == ts => Some(provisional_order),
466 Self::Value(Value::ProvisionalValue {
467 finalized_value,
468 provisional_value: _,
469 }) => finalized_value.as_ref().map(|v| &v.1),
470 Self::Consolidating(_) => {
471 panic!("called `provisional_order` without calling `ensure_decoded`")
472 }
473 }
474 }
475
476 // WIP: We don't need these after all, but leave in for review.
477 ///// Returns the order of this value, if a finalized value is present.
478 //pub fn finalized_order(&self) -> Option<&O> {
479 // match self {
480 // Self::Value(Value::FinalizedValue(_, order)) => Some(order),
481 // Self::Value(Value::Tombstone(order)) => Some(order),
482 // Self::Value(Value::ProvisionalValue {
483 // finalized_value,
484 // provisional_value: _,
485 // }) => finalized_value.as_ref().map(|v| &v.1),
486 // Self::Consolidating(_) => {
487 // panic!("called `finalized_order` without calling `ensure_decoded`")
488 // }
489 // }
490 //}
491
492 ///// Returns the provisional value, if one is present at the given timestamp.
493 ///// Falls back to the finalized value, or `None` if there is neither.
494 //pub fn into_provisional_value(self, ts: &T) -> Option<(UpsertValue, O)> {
495 // match self {
496 // Self::Value(Value::FinalizedValue(value, order)) => Some((value, order)),
497 // Self::Value(Value::Tombstone(_)) => None,
498 // Self::Value(Value::ProvisionalValue {
499 // finalized_value: _,
500 // provisional_value: (provisional_value, provisional_ts, provisional_order),
501 // }) if provisional_ts == *ts => provisional_value.map(|v| (v, provisional_order)),
502 // Self::Value(Value::ProvisionalValue {
503 // finalized_value,
504 // provisional_value: _,
505 // }) => finalized_value.map(|boxed| *boxed),
506 // Self::Consolidating(_) => {
507 // panic!("called `into_provisional_value` without calling `ensure_decoded`")
508 // }
509 // }
510 //}
511
512 /// Returns the provisional value, if one is present at the given timestamp.
513 /// Falls back to the finalized value, or `None` if there is neither.
514 pub fn provisional_value_ref(&self, ts: &T) -> Option<&UpsertValue> {
515 match self {
516 Self::Value(Value::FinalizedValue(value, _order)) => Some(value),
517 Self::Value(Value::Tombstone(_)) => None,
518 Self::Value(Value::ProvisionalValue {
519 finalized_value: _,
520 provisional_value: (provisional_value, provisional_ts, _provisional_order),
521 }) if provisional_ts == ts => provisional_value.as_ref(),
522 Self::Value(Value::ProvisionalValue {
523 finalized_value,
524 provisional_value: _,
525 }) => finalized_value.as_ref().map(|boxed| &boxed.0),
526 Self::Consolidating(_) => {
527 panic!("called `provisional_value_ref` without calling `ensure_decoded`")
528 }
529 }
530 }
531
    /// Returns the finalized value, if one is present.
533 pub fn into_finalized_value(self) -> Option<(UpsertValue, O)> {
534 match self {
535 Self::Value(Value::FinalizedValue(value, order)) => Some((value, order)),
536 Self::Value(Value::Tombstone(_order)) => None,
537 Self::Value(Value::ProvisionalValue {
538 finalized_value,
539 provisional_value: _,
540 }) => finalized_value.map(|boxed| *boxed),
            _ => panic!("called `into_finalized_value` without calling `ensure_decoded`"),
542 }
543 }
544}
545
546impl<T: Eq, O: Default> StateValue<T, O> {
547 /// We use a XOR trick in order to accumulate the values without having to store the full
548 /// unconsolidated history in memory. For all (value, diff) updates of a key we track:
549 /// - diff_sum = SUM(diff)
550 /// - checksum_sum = SUM(checksum(bincode(value)) * diff)
551 /// - len_sum = SUM(len(bincode(value)) * diff)
552 /// - value_xor = XOR(bincode(value))
553 ///
554 /// ## Return value
555 /// Returns a `bool` indicating whether or not the current merged value is able to be deleted.
556 ///
557 /// ## Correctness
558 ///
559 /// The method is correct because a well formed upsert collection at a given
560 /// timestamp will have for each key:
561 /// - Zero or one updates of the form (cur_value, +1)
562 /// - Zero or more pairs of updates of the form (prev_value, +1), (prev_value, -1)
563 ///
564 /// We are interested in extracting the cur_value of each key and discard all prev_values
565 /// that might be included in the stream. Since the history of prev_values always comes in
566 /// pairs, computing the XOR of those is always going to cancel their effects out. Also,
567 /// since XOR is commutative this property is true independent of the order. The same is
568 /// true for the summations of the length and checksum since the sum will contain the
569 /// unrelated values zero times.
570 ///
571 /// Therefore the accumulators will end up precisely in one of two states:
572 /// 1. diff == 0, checksum == 0, value == [0..] => the key is not present
573 /// 2. diff == 1, checksum == checksum(cur_value) value == cur_value => the key is present
574 ///
575 /// ## Robustness
576 ///
    /// In the absence of bugs, accumulating the diff and checksum is not required since we know
578 /// that a well formed collection always satisfies XOR(bincode(values)) == bincode(cur_value).
579 /// However bugs may happen and so storing 16 more bytes per key to have a very high
580 /// guarantee that we're not decoding garbage is more than worth it.
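    ///
    /// As a small worked illustration (bookkeeping only, not additional API), consider a key
    /// whose updates at one timestamp are `(prev, +1)`, `(prev, -1)`, `(cur, +1)`:
    ///
    /// ```text
    /// value_xor    = bincode(prev) XOR bincode(prev) XOR bincode(cur) = bincode(cur)
    /// diff_sum     = (+1) + (-1) + (+1)                               = 1
    /// len_sum      = len(prev) - len(prev) + len(cur)                 = len(cur)
    /// checksum_sum = cksum(prev) - cksum(prev) + cksum(cur)           = cksum(cur)
    /// ```
    ///
    /// which is exactly the second accumulator state described above, from which `cur` can
    /// be decoded.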
582 #[allow(clippy::as_conversions)]
583 pub fn merge_update(
584 &mut self,
585 value: UpsertValue,
586 diff: mz_repr::Diff,
587 bincode_opts: BincodeOpts,
588 bincode_buffer: &mut Vec<u8>,
589 ) -> bool {
590 match self {
591 Self::Consolidating(Consolidating {
592 value_xor,
593 len_sum,
594 checksum_sum,
595 diff_sum,
596 }) => {
597 bincode_buffer.clear();
598 bincode_opts
599 .serialize_into(&mut *bincode_buffer, &value)
600 .unwrap();
601 let len = i64::try_from(bincode_buffer.len()).unwrap();
602
603 *diff_sum += diff.into_inner();
604 *len_sum += len.wrapping_mul(diff.into_inner());
605 // Truncation is fine (using `as`) as this is just a checksum
606 *checksum_sum +=
607 (seahash::hash(&*bincode_buffer) as i64).wrapping_mul(diff.into_inner());
608
609 // XOR of even diffs cancel out, so we only do it if diff is odd
610 if diff.abs() % Diff::from(2) == Diff::ONE {
611 if value_xor.len() < bincode_buffer.len() {
612 value_xor.resize(bincode_buffer.len(), 0);
613 }
614 // Note that if the new value is _smaller_ than the `value_xor`, and
615 // the values at the end are zeroed out, we can shrink the buffer. This
616 // is extremely sensitive code, so we don't (yet) do that.
617 for (acc, val) in value_xor.iter_mut().zip(bincode_buffer.drain(..)) {
618 *acc ^= val;
619 }
620 }
621
622 // Returns whether or not the value can be deleted. This allows
623 // us to delete values in `UpsertState::consolidate_chunk` (even
624 // if they come back later), to minimize space usage.
625 diff_sum.0 == 0 && checksum_sum.0 == 0 && value_xor.iter().all(|&x| x == 0)
626 }
627 StateValue::Value(_value) => {
628 // We can turn a Value back into a Consolidating state:
629 // `std::mem::take` will leave behind a default value, which
630 // happens to be a default `Consolidating` `StateValue`.
631 let this = std::mem::take(self);
632
633 let finalized_value = this.into_finalized_value();
634 if let Some((finalized_value, _order)) = finalized_value {
635 // If we had a value before, merge it into the
636 // now-consolidating state first.
637 let _ =
638 self.merge_update(finalized_value, Diff::ONE, bincode_opts, bincode_buffer);
639
640 // Then merge the new value in.
641 self.merge_update(value, diff, bincode_opts, bincode_buffer)
642 } else {
643 // We didn't have a value before, might have been a
644 // tombstone. So just merge in the new value.
645 self.merge_update(value, diff, bincode_opts, bincode_buffer)
646 }
647 }
648 }
649 }
650
651 /// Merge an existing StateValue into this one, using the same method described in `merge_update`.
652 /// See the docstring above for more information on correctness and robustness.
653 pub fn merge_update_state(&mut self, other: &Self) {
654 match (self, other) {
655 (
656 Self::Consolidating(Consolidating {
657 value_xor,
658 len_sum,
659 checksum_sum,
660 diff_sum,
661 }),
662 Self::Consolidating(other_consolidating),
663 ) => {
664 *diff_sum += other_consolidating.diff_sum;
665 *len_sum += other_consolidating.len_sum;
666 *checksum_sum += other_consolidating.checksum_sum;
667 if other_consolidating.value_xor.len() > value_xor.len() {
668 value_xor.resize(other_consolidating.value_xor.len(), 0);
669 }
670 for (acc, val) in value_xor
671 .iter_mut()
672 .zip(other_consolidating.value_xor.iter())
673 {
674 *acc ^= val;
675 }
676 }
677 _ => panic!("`merge_update_state` called with non-consolidating state"),
678 }
679 }
680
681 /// During and after consolidation, we assume that values in the `UpsertStateBackend` implementation
682 /// can be `Self::Consolidating`, with a `diff_sum` of 1 (or 0, if they have been deleted).
    /// Afterwards, if we need to retract one of these values, we need to assert that it's in this correct state,
684 /// then mutate it to its `Value` state, so the `upsert` operator can use it.
685 #[allow(clippy::as_conversions)]
686 pub fn ensure_decoded(&mut self, bincode_opts: BincodeOpts) {
687 match self {
688 StateValue::Consolidating(consolidating) => {
689 match consolidating.diff_sum.0 {
690 1 => {
691 let len = usize::try_from(consolidating.len_sum.0)
692 .map_err(|_| {
693 format!(
694 "len_sum can't be made into a usize, state: {}",
695 consolidating
696 )
697 })
698 .expect("invalid upsert state");
699 let value = &consolidating
700 .value_xor
701 .get(..len)
702 .ok_or_else(|| {
703 format!(
704 "value_xor is not the same length ({}) as len ({}), state: {}",
705 consolidating.value_xor.len(),
706 len,
707 consolidating
708 )
709 })
710 .expect("invalid upsert state");
711 // Truncation is fine (using `as`) as this is just a checksum
712 assert_eq!(
713 consolidating.checksum_sum.0,
714 // Hash the value, not the full buffer, which may have extra 0's
715 seahash::hash(value) as i64,
716 "invalid upsert state: checksum_sum does not match, state: {}",
717 consolidating
718 );
719 *self = Self::Value(Value::FinalizedValue(
720 bincode_opts.deserialize(value).unwrap(),
721 Default::default(),
722 ));
723 }
724 0 => {
725 assert_eq!(
726 consolidating.len_sum.0, 0,
727 "invalid upsert state: len_sum is non-0, state: {}",
728 consolidating
729 );
730 assert_eq!(
731 consolidating.checksum_sum.0, 0,
732 "invalid upsert state: checksum_sum is non-0, state: {}",
733 consolidating
734 );
735 assert!(
736 consolidating.value_xor.iter().all(|&x| x == 0),
737 "invalid upsert state: value_xor not all 0s with 0 diff. \
738 Non-zero positions: {:?}, state: {}",
739 consolidating
740 .value_xor
741 .iter()
742 .positions(|&x| x != 0)
743 .collect::<Vec<_>>(),
744 consolidating
745 );
746 *self = Self::Value(Value::Tombstone(Default::default()));
747 }
748 other => panic!(
749 "invalid upsert state: non 0/1 diff_sum: {}, state: {}",
750 other, consolidating
751 ),
752 }
753 }
754 _ => {}
755 }
756 }
757}
758
759impl<T, O> Default for StateValue<T, O> {
760 fn default() -> Self {
761 Self::Consolidating(Consolidating::default())
762 }
763}
764
765/// Statistics for a single call to `consolidate_chunk`.
766#[derive(Clone, Default, Debug)]
767pub struct SnapshotStats {
768 /// The number of updates processed.
769 pub updates: u64,
770 /// The aggregated number of values inserted or deleted into `state`.
771 pub values_diff: Diff,
772 /// The total aggregated size of values inserted, deleted, or updated in `state`.
773 /// If the current call to `consolidate_chunk` deletes a lot of values,
774 /// or updates values to smaller ones, this can be negative!
775 pub size_diff: i64,
776 /// The number of inserts i.e. +1 diff
777 pub inserts: u64,
778 /// The number of deletes i.e. -1 diffs
779 pub deletes: u64,
780}
781
782impl std::ops::AddAssign for SnapshotStats {
783 fn add_assign(&mut self, rhs: Self) {
784 self.updates += rhs.updates;
785 self.values_diff += rhs.values_diff;
786 self.size_diff += rhs.size_diff;
787 self.inserts += rhs.inserts;
788 self.deletes += rhs.deletes;
789 }
790}
791
792/// Statistics for a single call to `multi_merge`.
793#[derive(Clone, Default, Debug)]
794pub struct MergeStats {
795 /// The number of updates written as merge operands to the backend, for the backend
796 /// to process async in the `consolidating_merge_function`.
797 /// Should be equal to number of inserts + deletes
798 pub written_merge_operands: u64,
799 /// The total size of values provided to `multi_merge`. The backend will write these
800 /// down and then later merge them in the `consolidating_merge_function`.
801 pub size_written: u64,
802 /// The estimated diff of the total size of the working set after the merge operands
803 /// are merged by the backend. This is an estimate since it can't account for the
804 /// size overhead of `StateValue` for values that consolidate to 0 (tombstoned-values).
805 pub size_diff: i64,
806}
807
808/// Statistics for a single call to `multi_put`.
809#[derive(Clone, Default, Debug)]
810pub struct PutStats {
811 /// The number of puts/deletes processed
812 /// Should be equal to number of inserts + updates + deletes
813 pub processed_puts: u64,
814 /// The aggregated number of non-tombstone values inserted or deleted into `state`.
815 pub values_diff: i64,
816 /// The aggregated number of tombstones inserted or deleted into `state`
817 pub tombstones_diff: i64,
818 /// The total aggregated size of values inserted, deleted, or updated in `state`.
819 /// If the current call to `multi_put` deletes a lot of values,
820 /// or updates values to smaller ones, this can be negative!
821 pub size_diff: i64,
822 /// The number of inserts
823 pub inserts: u64,
824 /// The number of updates
825 pub updates: u64,
826 /// The number of deletes
827 pub deletes: u64,
828}
829
830impl PutStats {
831 /// Adjust the `PutStats` based on the new value and the previous metadata.
832 ///
    /// The size parameter is separate, as its value is backend-dependent. It's optional,
    /// as some backends increase the total size after an entire batch is processed.
835 ///
836 /// This method is provided for implementors of `UpsertStateBackend::multi_put`.
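    ///
    /// For example (a sketch; `put_stats`, `new_value`, `encoded_len`, and
    /// `previous_metadata` are hypothetical bindings inside a backend's `multi_put`):
    ///
    /// ```ignore
    /// // A fresh insert: no previous value existed for this key.
    /// put_stats.adjust(Some(&new_value), Some(encoded_len), &None);
    /// // A delete: the previous metadata was captured by an earlier `multi_get`.
    /// put_stats.adjust::<(), ()>(None, None, &Some(previous_metadata));
    /// ```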
837 pub fn adjust<T, O>(
838 &mut self,
839 new_value: Option<&StateValue<T, O>>,
840 new_size: Option<i64>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        self.adjust_size(new_value, new_size, previous_metadata);
        self.adjust_values(new_value, previous_metadata);
        self.adjust_tombstone(new_value, previous_metadata);
846 }
847
848 fn adjust_size<T, O>(
849 &mut self,
850 new_value: Option<&StateValue<T, O>>,
851 new_size: Option<i64>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        match (&new_value, previous_metadata.as_ref()) {
855 (Some(_), Some(ps)) => {
856 self.size_diff -= ps.size;
857 if let Some(new_size) = new_size {
858 self.size_diff += new_size;
859 }
860 }
861 (None, Some(ps)) => {
862 self.size_diff -= ps.size;
863 }
864 (Some(_), None) => {
865 if let Some(new_size) = new_size {
866 self.size_diff += new_size;
867 }
868 }
869 (None, None) => {}
870 }
871 }
872
873 fn adjust_values<T, O>(
874 &mut self,
875 new_value: Option<&StateValue<T, O>>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        let truly_new_value = new_value.map_or(false, |v| !v.is_tombstone());
        let truly_old_value = previous_metadata.map_or(false, |v| !v.is_tombstone);
880
881 match (truly_new_value, truly_old_value) {
882 (false, true) => {
883 self.values_diff -= 1;
884 }
885 (true, false) => {
886 self.values_diff += 1;
887 }
888 _ => {}
889 }
890 }
891
892 fn adjust_tombstone<T, O>(
893 &mut self,
894 new_value: Option<&StateValue<T, O>>,
        previous_metadata: &Option<ValueMetadata<i64>>,
    ) {
        let new_tombstone = new_value.map_or(false, |v| v.is_tombstone());
        let old_tombstone = previous_metadata.map_or(false, |v| v.is_tombstone);
899
900 match (new_tombstone, old_tombstone) {
901 (false, true) => {
902 self.tombstones_diff -= 1;
903 }
904 (true, false) => {
905 self.tombstones_diff += 1;
906 }
907 _ => {}
908 }
909 }
910}
911
912/// Statistics for a single call to `multi_get`.
913#[derive(Clone, Default, Debug)]
914pub struct GetStats {
915 /// The number of gets processed
916 pub processed_gets: u64,
917 /// The total size in bytes returned
918 pub processed_gets_size: u64,
919 /// The number of non-empty records returned
920 pub returned_gets: u64,
921}
922
923/// A trait that defines the fundamental primitives required by a state-backing of
924/// `UpsertState`.
925///
926/// Implementors of this trait are blind maps that associate keys and values. They need
927/// not understand the semantics of `StateValue`, tombstones, or anything else related
928/// to a correct `upsert` implementation. The singular exception to this is that they
929/// **must** produce accurate `PutStats` and `GetStats`. The reasoning for this is two-fold:
930/// - efficiency: this avoids additional buffer allocation.
931/// - value sizes: only the backend implementation understands the size of values as recorded
932///
933/// This **must** is not a correctness requirement (we won't panic when emitting statistics), but
934/// rather a requirement to ensure the upsert operator is introspectable.
935#[async_trait::async_trait(?Send)]
936pub trait UpsertStateBackend<T, O>
937where
938 T: 'static,
939 O: 'static,
940{
941 /// Whether this backend supports the `multi_merge` operation.
942 fn supports_merge(&self) -> bool;
943
944 /// Insert or delete for all `puts` keys, prioritizing the last value for
945 /// repeated keys.
946 ///
947 /// The `PutValue` is _guaranteed_ to have an accurate and up-to-date
    /// record of the metadata for the existing value for the given key (if one existed),
949 /// as reported by a previous call to `multi_get`.
950 ///
951 /// `PutStats` **must** be populated correctly, according to these semantics:
952 /// - `values_diff` must record the difference in number of new non-tombstone values being
953 /// inserted into the backend.
954 /// - `tombstones_diff` must record the difference in number of tombstone values being
955 /// inserted into the backend.
956 /// - `size_diff` must record the change in size for the values being inserted/deleted/updated
957 /// in the backend, regardless of whether the values are tombstones or not.
958 async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
959 where
960 P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>;
961
962 /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
963 ///
964 /// Panics if `gets` and `results_out` are not the same length.
965 async fn multi_get<'r, G, R>(
966 &mut self,
967 gets: G,
968 results_out: R,
969 ) -> Result<GetStats, anyhow::Error>
970 where
971 G: IntoIterator<Item = UpsertKey>,
972 R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>;
973
974 /// For each key in `merges` writes a 'merge operand' to the backend. The backend stores these
975 /// merge operands and periodically calls the `consolidating_merge_function` to merge them into
976 /// any existing value for each key. The backend will merge the merge operands in the order
977 /// they are provided, and the merge function will always be run for a given key when a `get`
978 /// operation is performed on that key, or when the backend decides to run the merge based
979 /// on its own internal logic.
980 /// This allows avoiding the read-modify-write method of updating many values to
981 /// improve performance.
982 ///
983 /// The `MergeValue` should include a `diff` field that represents the update diff for the
984 /// value. This is used to estimate the overall size diff of the working set
985 /// after the merge operands are merged by the backend `sum[merges: m](m.diff * m.size)`.
986 ///
987 /// `MergeStats` **must** be populated correctly, according to these semantics:
988 /// - `written_merge_operands` must record the number of merge operands written to the backend.
989 /// - `size_written` must record the total size of values written to the backend.
    ///   Note that the size of the post-merge values is not known, so this is the size of the
    ///   values written to the backend as merge operands.
992 /// - `size_diff` must record the estimated diff of the total size of the working set after the
993 /// merge operands are merged by the backend.
994 async fn multi_merge<P>(&mut self, merges: P) -> Result<MergeStats, anyhow::Error>
995 where
996 P: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>;
997}
998
999/// A function that merges a set of updates for a key into the existing value
1000/// for the key. This is called by the backend implementation when it has
1001/// accumulated a set of updates for a key, and needs to merge them into the
1002/// existing value for the key.
1003///
1004/// The function is called with the following arguments:
1005/// - The key for which the merge is being performed.
1006/// - An iterator over any current value and merge operands queued for the key.
1007///
1008/// The function should return the new value for the key after merging all the updates.
1009pub(crate) fn consolidating_merge_function<T, O>(
1010 _key: UpsertKey,
1011 updates: impl Iterator<Item = StateValue<T, O>>,
1012) -> StateValue<T, O>
1013where
1014 O: Default,
1015 T: std::cmp::Eq,
1016{
1017 let mut current: StateValue<T, O> = Default::default();
1018
1019 let mut bincode_buf = Vec::new();
1020 for update in updates {
1021 match update {
1022 StateValue::Consolidating(_) => {
1023 current.merge_update_state(&update);
1024 }
1025 StateValue::Value(_) => {
1026 // This branch is more expensive, but we hopefully rarely hit
1027 // it.
1028 if let Some((finalized_value, _order)) = update.into_finalized_value() {
1029 let mut update = StateValue::default();
1030 update.merge_update(
1031 finalized_value,
1032 Diff::ONE,
1033 upsert_bincode_opts(),
1034 &mut bincode_buf,
1035 );
1036 current.merge_update_state(&update);
1037 }
1038 }
1039 }
1040 }
1041
1042 current
1043}
1044
1045/// An `UpsertStateBackend` wrapper that supports consolidating merging, and
1046/// reports basic metrics about the usage of the `UpsertStateBackend`.
1047pub struct UpsertState<'metrics, S, T, O> {
1048 inner: S,
1049
1050 // The status, start time, and stats about calls to `consolidate_chunk`.
1051 pub snapshot_start: Instant,
1052 snapshot_stats: SnapshotStats,
1053 snapshot_completed: bool,
1054
1055 // Metrics shared across all workers running the `upsert` operator.
1056 metrics: Arc<UpsertSharedMetrics>,
1057 // Metrics for a specific worker.
1058 worker_metrics: &'metrics UpsertMetrics,
1059 // User-facing statistics.
1060 stats: SourceStatistics,
1061
1062 // Bincode options and buffer used in `consolidate_chunk`.
1063 bincode_opts: BincodeOpts,
1064 bincode_buffer: Vec<u8>,
1065
1066 // We need to iterate over `updates` in `consolidate_chunk` twice, so we
1067 // have a scratch vector for this.
1068 consolidate_scratch: Vec<(UpsertKey, UpsertValue, mz_repr::Diff)>,
1069 // "mini-upsert" map used in `consolidate_chunk`
1070 consolidate_upsert_scratch: indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, O>>,
1071 // a scratch vector for calling `multi_get`
1072 multi_get_scratch: Vec<UpsertKey>,
1073 shrink_upsert_unused_buffers_by_ratio: usize,
1074}
1075
1076impl<'metrics, S, T, O> UpsertState<'metrics, S, T, O> {
1077 pub(crate) fn new(
1078 inner: S,
1079 metrics: Arc<UpsertSharedMetrics>,
1080 worker_metrics: &'metrics UpsertMetrics,
1081 stats: SourceStatistics,
1082 shrink_upsert_unused_buffers_by_ratio: usize,
1083 ) -> Self {
1084 Self {
1085 inner,
1086 snapshot_start: Instant::now(),
1087 snapshot_stats: SnapshotStats::default(),
1088 snapshot_completed: false,
1089 metrics,
1090 worker_metrics,
1091 stats,
1092 bincode_opts: upsert_bincode_opts(),
1093 bincode_buffer: Vec::new(),
1094 consolidate_scratch: Vec::new(),
1095 consolidate_upsert_scratch: indexmap::IndexMap::new(),
1096 multi_get_scratch: Vec::new(),
1097 shrink_upsert_unused_buffers_by_ratio,
1098 }
1099 }
1100}
1101
1102impl<S, T, O> UpsertState<'_, S, T, O>
1103where
1104 S: UpsertStateBackend<T, O>,
1105 T: Eq + Clone + Send + Sync + Serialize + 'static,
1106 O: Default + Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
1107{
1108 /// Consolidate the following differential updates into the state. Updates
1109 /// provided to this method can be assumed to consolidate into a single
1110 /// value per-key, after all chunks of updates for a given timestamp have
1111 /// been processed,
1112 ///
1113 /// Therefore, after all updates of a given timestamp have been
1114 /// `consolidated`, all values must be in the correct state (as determined
1115 /// by `StateValue::ensure_decoded`).
1116 ///
1117 /// The `completed` boolean communicates whether or not this is the final
1118 /// chunk of updates for the initial "snapshot" from persist.
1119 ///
1120 /// If the backend supports it, this method will use `multi_merge` to
1121 /// consolidate the updates to avoid having to read the existing value for
1122 /// each key first. On some backends (like RocksDB), this can be
1123 /// significantly faster than the read-then-write consolidation strategy.
1124 ///
1125 /// Also note that we use `self.inner.multi_*`, not `self.multi_*`. This is
1126 /// to avoid erroneously changing metric and stats values.
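    ///
    /// A typical call looks like the following (a sketch; `updates` is a hypothetical
    /// `Vec<(UpsertKey, UpsertValue, Diff)>` drained from the operator's input buffer, and
    /// `last_chunk_of_snapshot` a hypothetical flag):
    ///
    /// ```ignore
    /// state.consolidate_chunk(updates.drain(..), last_chunk_of_snapshot).await?;
    /// ```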
1127 pub async fn consolidate_chunk<U>(
1128 &mut self,
1129 updates: U,
1130 completed: bool,
1131 ) -> Result<(), anyhow::Error>
1132 where
1133 U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1134 {
1135 fail::fail_point!("fail_consolidate_chunk", |_| {
1136 Err(anyhow::anyhow!("Error consolidating values"))
1137 });
1138
1139 if completed && self.snapshot_completed {
1140 panic!("attempted completion of already completed upsert snapshot")
1141 }
1142
1143 let now = Instant::now();
1144 let batch_size = updates.len();
1145
1146 self.consolidate_scratch.clear();
1147 self.consolidate_upsert_scratch.clear();
1148 self.multi_get_scratch.clear();
1149
1150 // Shrinking the scratch vectors if the capacity is significantly more than batch size
1151 if self.shrink_upsert_unused_buffers_by_ratio > 0 {
1152 let reduced_capacity =
1153 self.consolidate_scratch.capacity() / self.shrink_upsert_unused_buffers_by_ratio;
1154 if reduced_capacity > batch_size {
1155 // These vectors have already been cleared above and should be empty here
1156 self.consolidate_scratch.shrink_to(reduced_capacity);
1157 self.consolidate_upsert_scratch.shrink_to(reduced_capacity);
1158 self.multi_get_scratch.shrink_to(reduced_capacity);
1159 }
1160 }
1161
1162 // Depending on if the backend supports multi_merge, call the appropriate method.
1163 // This can change during the lifetime of the `UpsertState` instance (e.g.
1164 // the Autospill backend will switch from in-memory to rocksdb after a certain
1165 // number of updates have been processed and begin supporting multi_merge).
1166 let stats = if self.inner.supports_merge() {
1167 self.consolidate_merge_inner(updates).await?
1168 } else {
1169 self.consolidate_read_write_inner(updates).await?
1170 };
1171
1172 // NOTE: These metrics use the term `merge` to refer to the consolidation of values.
        // This is because they were introduced before the `multi_merge` operation was added.
1174 self.metrics
1175 .merge_snapshot_latency
1176 .observe(now.elapsed().as_secs_f64());
1177 self.worker_metrics
1178 .merge_snapshot_updates
1179 .inc_by(stats.updates);
1180 self.worker_metrics
1181 .merge_snapshot_inserts
1182 .inc_by(stats.inserts);
1183 self.worker_metrics
1184 .merge_snapshot_deletes
1185 .inc_by(stats.deletes);
1186
1187 self.stats.update_bytes_indexed_by(stats.size_diff);
1188 self.stats
1189 .update_records_indexed_by(stats.values_diff.into_inner());
1190
1191 self.snapshot_stats += stats;
1192
1193 if !self.snapshot_completed {
1194 // Updating the metrics
1195 self.worker_metrics.rehydration_total.set(
1196 self.snapshot_stats
1197 .values_diff
1198 .into_inner()
1199 .try_into()
1200 .unwrap_or_else(|e: std::num::TryFromIntError| {
1201 tracing::warn!(
1202 "rehydration_total metric overflowed or is negative \
                            and is inaccurate: {}. Defaulting to 0",
1204 e.display_with_causes(),
1205 );
1206
1207 0
1208 }),
1209 );
1210 self.worker_metrics
1211 .rehydration_updates
1212 .set(self.snapshot_stats.updates);
1213 }
1214
1215 if completed {
1216 if self.shrink_upsert_unused_buffers_by_ratio > 0 {
1217 // After rehydration is done, these scratch buffers should now be empty
1218 // shrinking them entirely
1219 self.consolidate_scratch.shrink_to_fit();
1220 self.consolidate_upsert_scratch.shrink_to_fit();
1221 self.multi_get_scratch.shrink_to_fit();
1222 }
1223
1224 self.worker_metrics
1225 .rehydration_latency
1226 .set(self.snapshot_start.elapsed().as_secs_f64());
1227
1228 self.snapshot_completed = true;
1229 }
1230 Ok(())
1231 }
1232
1233 /// Consolidate the updates into the state. This method requires the backend
1234 /// has support for the `multi_merge` operation, and will panic if
1235 /// `self.inner.supports_merge()` was not checked before calling this
1236 /// method. `multi_merge` will write the updates as 'merge operands' to the
1237 /// backend, and then the backend will consolidate those updates with any
1238 /// existing state using the `consolidating_merge_function`.
1239 ///
1240 /// This method can have significant performance benefits over the
1241 /// read-then-write method of `consolidate_read_write_inner`.
1242 async fn consolidate_merge_inner<U>(
1243 &mut self,
1244 updates: U,
1245 ) -> Result<SnapshotStats, anyhow::Error>
1246 where
1247 U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1248 {
1249 let mut updates = updates.into_iter().peekable();
1250
1251 let mut stats = SnapshotStats::default();
1252
1253 if updates.peek().is_some() {
1254 let m_stats = self
1255 .inner
1256 .multi_merge(updates.map(|(k, v, diff)| {
1257 // Transform into a `StateValue<O>` that can be used by the
1258 // `consolidating_merge_function` to merge with any existing
1259 // value for the key.
1260 let mut val: StateValue<T, O> = Default::default();
1261 val.merge_update(v, diff, self.bincode_opts, &mut self.bincode_buffer);
1262
1263 stats.updates += 1;
1264 if diff.is_positive() {
1265 stats.inserts += 1;
1266 } else if diff.is_negative() {
1267 stats.deletes += 1;
1268 }
1269
1270 // To keep track of the overall `values_diff` we can use the sum of diffs which
1271 // should be equal to the number of non-tombstoned values in the backend.
1272 // This is a bit misleading as this represents the eventual state after the
1273 // `consolidating_merge_function` has been called to merge all the updates,
1274 // and not the state after this `multi_merge` call.
1275 //
1276 // This does not accurately report values that have been consolidated to diff == 0, as tracking that
1277 // per-key is extremely difficult.
1278 stats.values_diff += diff;
1279
1280 (k, MergeValue { value: val, diff })
1281 }))
1282 .await?;
1283
1284 stats.size_diff = m_stats.size_diff;
1285 }
1286
1287 Ok(stats)
1288 }
1289
1290 /// Consolidates the updates into the state. This method reads the existing
1291 /// values for each key, consolidates the updates, and writes the new values
1292 /// back to the state.
1293 async fn consolidate_read_write_inner<U>(
1294 &mut self,
1295 updates: U,
1296 ) -> Result<SnapshotStats, anyhow::Error>
1297 where
1298 U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1299 {
1300 let mut updates = updates.into_iter().peekable();
1301
1302 let mut stats = SnapshotStats::default();
1303
1304 if updates.peek().is_some() {
1305 self.consolidate_scratch.extend(updates);
1306 self.consolidate_upsert_scratch.extend(
1307 self.consolidate_scratch
1308 .iter()
1309 .map(|(k, _, _)| (*k, UpsertValueAndSize::default())),
1310 );
1311 self.multi_get_scratch
1312 .extend(self.consolidate_upsert_scratch.iter().map(|(k, _)| *k));
1313 self.inner
1314 .multi_get(
1315 self.multi_get_scratch.drain(..),
1316 self.consolidate_upsert_scratch.iter_mut().map(|(_, v)| v),
1317 )
1318 .await?;
1319
1320 for (key, value, diff) in self.consolidate_scratch.drain(..) {
1321 stats.updates += 1;
1322 if diff.is_positive() {
1323 stats.inserts += 1;
1324 } else if diff.is_negative() {
1325 stats.deletes += 1;
1326 }
1327
1328 // We rely on the diffs in our input instead of the result of
1329 // multi_put below. This makes sure we report the same stats as
1330 // `consolidate_merge_inner`, regardless of what values
1331 // there were in state before.
1332 stats.values_diff += diff;
1333
1334 let entry = self.consolidate_upsert_scratch.get_mut(&key).unwrap();
1335 let val = entry.value.get_or_insert_with(Default::default);
1336
1337 if val.merge_update(value, diff, self.bincode_opts, &mut self.bincode_buffer) {
1338 entry.value = None;
1339 }
1340 }
1341
1342 // Note we do 1 `multi_get` and 1 `multi_put` while processing a _batch of updates_.
1343 // Within the batch, we effectively consolidate each key, before persisting that
1344 // consolidated value. Easy!!
1345 let p_stats = self
1346 .inner
1347 .multi_put(self.consolidate_upsert_scratch.drain(..).map(|(k, v)| {
1348 (
1349 k,
1350 PutValue {
1351 value: v.value,
1352 previous_value_metadata: v.metadata.map(|v| ValueMetadata {
1353 size: v.size.try_into().expect("less than i64 size"),
1354 is_tombstone: v.is_tombstone,
1355 }),
1356 },
1357 )
1358 }))
1359 .await?;
1360
1361 stats.size_diff = p_stats.size_diff;
1362 }
1363
1364 Ok(stats)
1365 }
1366
1367 /// Insert or delete for all `puts` keys, prioritizing the last value for
1368 /// repeated keys.
1369 pub async fn multi_put<P>(
1370 &mut self,
1371 update_per_record_stats: bool,
1372 puts: P,
1373 ) -> Result<(), anyhow::Error>
1374 where
1375 P: IntoIterator<Item = (UpsertKey, PutValue<Value<T, O>>)>,
1376 {
1377 fail::fail_point!("fail_state_multi_put", |_| {
1378 Err(anyhow::anyhow!("Error putting values into state"))
1379 });
1380 let now = Instant::now();
1381 let stats = self
1382 .inner
1383 .multi_put(puts.into_iter().map(|(k, pv)| {
1384 (
1385 k,
1386 PutValue {
1387 value: pv.value.map(StateValue::Value),
1388 previous_value_metadata: pv.previous_value_metadata,
1389 },
1390 )
1391 }))
1392 .await?;
1393
1394 self.metrics
1395 .multi_put_latency
1396 .observe(now.elapsed().as_secs_f64());
1397 self.worker_metrics
1398 .multi_put_size
1399 .inc_by(stats.processed_puts);
1400
1401 if update_per_record_stats {
1402 self.worker_metrics.upsert_inserts.inc_by(stats.inserts);
1403 self.worker_metrics.upsert_updates.inc_by(stats.updates);
1404 self.worker_metrics.upsert_deletes.inc_by(stats.deletes);
1405
1406 self.stats.update_bytes_indexed_by(stats.size_diff);
1407 self.stats.update_records_indexed_by(stats.values_diff);
1408 self.stats
1409 .update_envelope_state_tombstones_by(stats.tombstones_diff);
1410 }
1411
1412 Ok(())
1413 }
1414
1415 /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
1416 ///
1417 /// Panics if `gets` and `results_out` are not the same length.
1418 pub async fn multi_get<'r, G, R>(
1419 &mut self,
1420 gets: G,
1421 results_out: R,
1422 ) -> Result<(), anyhow::Error>
1423 where
1424 G: IntoIterator<Item = UpsertKey>,
1425 R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
1426 O: 'r,
1427 {
1428 fail::fail_point!("fail_state_multi_get", |_| {
1429 Err(anyhow::anyhow!("Error getting values from state"))
1430 });
1431 let now = Instant::now();
1432 let stats = self.inner.multi_get(gets, results_out).await?;
1433
1434 self.metrics
1435 .multi_get_latency
1436 .observe(now.elapsed().as_secs_f64());
1437 self.worker_metrics
1438 .multi_get_size
1439 .inc_by(stats.processed_gets);
1440 self.worker_metrics
1441 .multi_get_result_count
1442 .inc_by(stats.returned_gets);
1443 self.worker_metrics
1444 .multi_get_result_bytes
1445 .inc_by(stats.processed_gets_size);
1446
1447 Ok(())
1448 }
1449}
1450
1451#[cfg(test)]
1452mod tests {
1453 use mz_repr::Row;
1454
1455 use super::*;
1456 #[mz_ore::test]
1457 fn test_merge_update() {
1458 let mut buf = Vec::new();
1459 let opts = upsert_bincode_opts();
1460
1461 let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1462
1463 let small_row = Ok(Row::default());
1464 let longer_row = Ok(Row::pack([mz_repr::Datum::Null]));
1465 s.merge_update(small_row, Diff::ONE, opts, &mut buf);
1466 s.merge_update(longer_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1467 // This clears the retraction of the `longer_row`, but the
1468 // `value_xor` is the length of the `longer_row`. This tests
1469 // that we are tracking checksums correctly.
1470 s.merge_update(longer_row, Diff::ONE, opts, &mut buf);
1471
1472 // Assert that the `Consolidating` value is fully merged.
1473 s.ensure_decoded(opts);
1474 }
1475
1476 // We guard some of our assumptions. Increasing in-memory size of StateValue
1477 // has a direct impact on memory usage of in-memory UPSERT sources.
1478 #[mz_ore::test]
1479 fn test_memory_size() {
1480 let finalized_value: StateValue<(), ()> =
1481 StateValue::finalized_value(Ok(Row::default()), ());
1482 assert!(
1483 finalized_value.memory_size() <= 88,
1484 "memory size is {}",
1485 finalized_value.memory_size(),
1486 );
1487
1488 let provisional_value_with_finalized_value: StateValue<(), ()> =
1489 finalized_value.into_provisional_value(Ok(Row::default()), (), ());
1490 assert!(
1491 provisional_value_with_finalized_value.memory_size() <= 112,
1492 "memory size is {}",
1493 provisional_value_with_finalized_value.memory_size(),
1494 );
1495
1496 let provisional_value_without_finalized_value: StateValue<(), ()> =
1497 StateValue::new_provisional_value(Ok(Row::default()), (), ());
1498 assert!(
1499 provisional_value_without_finalized_value.memory_size() <= 88,
1500 "memory size is {}",
1501 provisional_value_without_finalized_value.memory_size(),
1502 );
1503
1504 let mut consolidating_value: StateValue<(), ()> = StateValue::default();
1505 consolidating_value.merge_update(
1506 Ok(Row::default()),
1507 Diff::ONE,
1508 upsert_bincode_opts(),
1509 &mut Vec::new(),
1510 );
1511 assert!(
1512 consolidating_value.memory_size() <= 90,
1513 "memory size is {}",
1514 consolidating_value.memory_size(),
1515 );
1516 }
1517
1518 #[mz_ore::test]
1519 #[should_panic(
1520 expected = "invalid upsert state: len_sum is non-0, state: Consolidating { len_sum: 1"
1521 )]
1522 fn test_merge_update_len_0_assert() {
1523 let mut buf = Vec::new();
1524 let opts = upsert_bincode_opts();
1525
1526 let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1527
1528 let small_row = Ok(mz_repr::Row::default());
1529 let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
1530 s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1531 s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1532
1533 s.ensure_decoded(opts);
1534 }
1535
1536 #[mz_ore::test]
1537 #[should_panic(
1538 expected = "invalid upsert state: \"value_xor is not the same length (3) as len (4), state: Consolidating { len_sum: 4"
1539 )]
1540 fn test_merge_update_len_to_long_assert() {
1541 let mut buf = Vec::new();
1542 let opts = upsert_bincode_opts();
1543
1544 let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1545
1546 let small_row = Ok(mz_repr::Row::default());
1547 let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
1548 s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1549 s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1550 s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1551
1552 s.ensure_decoded(opts);
1553 }
1554
1555 #[mz_ore::test]
1556 #[should_panic(expected = "invalid upsert state: checksum_sum does not match")]
1557 fn test_merge_update_checksum_doesnt_match() {
1558 let mut buf = Vec::new();
1559 let opts = upsert_bincode_opts();
1560
1561 let mut s = StateValue::<(), ()>::Consolidating(Consolidating::default());
1562
1563 let small_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(2)]));
1564 let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(1)]));
1565 s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1566 s.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buf);
1567 s.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buf);
1568
1569 s.ensure_decoded(opts);
1570 }
1571}