// mz_storage/upsert/types.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! # State-management for UPSERT.
11//!
//! This module provides structures for use within an UPSERT
//! operator implementation.
14//!
//! UPSERT is effectively a process which transforms a `Stream<(Key, Option<Data>)>`
16//! into a differential collection, by indexing the data based on the key.
17//!
//! This module does not implement this transformation, instead exposing APIs designed
//! for use within an UPSERT operator. There is one exception to this: `consolidate_chunk`
//! implements an efficient upsert-like transformation to re-index a collection using the
//! _output collection_ of an upsert transformation. More on this below.
22//!
23//! ## `UpsertState`
24//!
25//! Its primary export is `UpsertState`, which wraps an `UpsertStateBackend` and provides 3 APIs:
26//!
27//! ### `multi_get`
28//! `multi_get` returns the current value for a (unique) set of keys. To keep implementations
29//! efficient, the set of keys is an iterator, and results are written back into another parallel
30//! iterator. In addition to returning the current values, implementations must also return the
31//! _size_ of those values _as they are stored within the implementation_. Implementations are
32//! required to chunk large iterators if they need to operate over smaller batches.
33//!
34//! `multi_get` is implemented directly with `UpsertStateBackend::multi_get`.
35//!
36//! ### `multi_put`
37//! Update or delete values for a set of keys. To keep implementations efficient, the set
38//! of updates is an iterator. Implementations are also required to return the difference
39//! in values and total size after processing the updates. To simplify this (and because
40//! in the `upsert` usecase we have this data readily available), the updates are input
41//! with the size of the current value (if any) that was returned from a previous `multi_get`.
42//! Implementations are required to chunk large iterators if they need to operate over smaller
43//! batches.
44//!
45//! `multi_put` is implemented directly with `UpsertStateBackend::multi_put`.
46//!
47//! ### `consolidate_chunk`
48//!
//! `consolidate_chunk` re-indexes an UPSERT collection based on its _output collection_ (as
//! opposed to its _input `Stream`_). Please see the docs on `consolidate_chunk` and `StateValue`
//! for more information.
52//!
53//! `consolidate_chunk` is implemented with both `UpsertStateBackend::multi_put` and
54//! `UpsertStateBackend::multi_get`
55//!
56//! ## Order Keys
57//!
58//! In practice, the input stream for UPSERT collections includes an _order key_. This is used to
59//! sort data with the same key occurring in the same timestamp. This module provides support
60//! for serializing and deserializing order keys with their associated data. Being able to ingest
61//! data on non-frontier boundaries requires this support.
62//!
63//! A consequence of this is that tombstones with an order key can be stored within the state.
64//! There is currently no support for cleaning these tombstones up, as they are considered rare and
65//! small enough.
66//!
67//! Because `consolidate_chunk` handles data that consolidates correctly, it does not handle
68//! order keys.
69//!
70//!
71//! ## A note on state size
72//!
73//! The `UpsertStateBackend` trait requires implementations report _relatively accurate_ information about
74//! how the state size changes over time. Note that it does NOT ask the implementations to give
75//! accurate information about actual resource consumption (like disk space including space
76//! amplification), and instead is just asking about the size of the values, after they have been
//! encoded. For implementations like `RocksDB`, these may be highly accurate (it literally
//! reports the encoded size as written to the RocksDB API), and for others, like the
//! `InMemoryHashMap`, they may be rough estimates of actual memory usage. See
80//! `StateValue::memory_size` for more information.
81//!
82//! Note also that after consolidation, additional space may be used if `StateValue` is
83//! used.
84//!
85
86use std::fmt;
87use std::num::Wrapping;
88use std::sync::Arc;
89use std::time::Instant;
90
91use bincode::Options;
92use itertools::Itertools;
93use mz_ore::error::ErrorExt;
94use mz_repr::{Diff, GlobalId};
95use serde::{Serialize, de::DeserializeOwned};
96
97use crate::metrics::upsert::{UpsertMetrics, UpsertSharedMetrics};
98use crate::statistics::SourceStatistics;
99use crate::upsert::{UpsertKey, UpsertValue};
100
/// The default set of `bincode` options used for consolidating
/// upsert updates (and writing values to RocksDB).
///
/// See `upsert_bincode_opts` for the concrete configuration.
pub type BincodeOpts = bincode::config::DefaultOptions;
104
/// Build the default `BincodeOpts`.
pub fn upsert_bincode_opts() -> BincodeOpts {
    // `DefaultOptions` disallows trailing bytes (for now) and uses varint
    // encoding, which saves space for the values we store.
    bincode::DefaultOptions::new()
}
111
/// The result type for `multi_get`.
/// The value and size are stored in individual `Option`s so callees
/// can reuse this value as they overwrite this value, keeping
/// track of the previous metadata. Additionally, values
/// may be `None` for tombstones.
#[derive(Clone)]
pub struct UpsertValueAndSize<T, O> {
    /// The value, if there was one.
    pub value: Option<StateValue<T, O>>,
    /// The size of the original `value` as persisted in the backend.
    /// Useful for users keeping track of statistics.
    pub metadata: Option<ValueMetadata<u64>>,
}
125
126impl<T, O> std::fmt::Debug for UpsertValueAndSize<T, O> {
127 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
128 f.debug_struct("UpsertValueAndSize")
129 .field("value", &self.value)
130 .field("metadata", &self.metadata)
131 .finish()
132 }
133}
134
135impl<T, O> Default for UpsertValueAndSize<T, O> {
136 fn default() -> Self {
137 Self {
138 value: None,
139 metadata: None,
140 }
141 }
142}
143
/// Metadata about an existing value in the upsert state backend, as returned
/// by `multi_get`.
///
/// Generic over the size type `S`, because different call sites track sizes
/// with different integer types (`u64` in `UpsertValueAndSize`, `i64` in
/// `PutValue`).
#[derive(Copy, Clone, Debug)]
pub struct ValueMetadata<S> {
    /// The size of the value, as reported by the backend.
    pub size: S,
    /// Whether the stored value is a tombstone.
    pub is_tombstone: bool,
}
153
/// A value to put in with `multi_put`.
#[derive(Clone, Debug)]
pub struct PutValue<V> {
    /// The new value, or a `None` to indicate a delete.
    pub value: Option<V>,
    /// The metadata of the previous value for this key, if known.
    /// Passed in to efficiently calculate statistics.
    pub previous_value_metadata: Option<ValueMetadata<i64>>,
}
163
/// A value to put in with a `multi_merge`.
///
/// Unlike `PutValue`, no previous metadata is carried: merge backends combine
/// operands asynchronously, so only a size-diff estimate is possible.
pub struct MergeValue<V> {
    /// The value of the merge operand to write to the backend.
    pub value: V,
    /// The 'diff' of this merge operand value, used to estimate the overall size diff
    /// of the working set after this merge operand is merged by the backend.
    pub diff: Diff,
}
172
/// `UpsertState` has 2 modes:
/// - Normal operation
/// - Consolidation.
///
/// This struct and its substructs are helpers to simplify the logic that
/// individual `UpsertState` implementations need to do to manage these 2 modes.
///
/// Normal operation is simple, we just store an ordinary `UpsertValue`, and allow the implementer
/// to store it any way they want. During consolidation, the logic is more complex.
/// See the docs on `StateValue::merge_update` for more information.
///
/// Note also that this type is designed to support _partial updates_. All values are
/// associated with an _order key_ `O` that can be used to determine if a value existing in the
/// `UpsertStateBackend` occurred before or after a value being considered for insertion.
///
/// `O` is typically required to be `: Default`, with the default value sorting below all others.
/// During consolidation, values consolidate correctly (as they are actual
/// differential updates with diffs), so order keys are not required.
#[derive(Clone, serde::Serialize, serde::Deserialize)]
pub enum StateValue<T, O> {
    /// A not-yet-decoded accumulator used during consolidation; see
    /// `StateValue::merge_update`.
    Consolidating(Consolidating),
    /// A fully decoded value (possibly a tombstone), used during normal
    /// operation.
    Value(Value<T, O>),
}
196
197impl<T, O> std::fmt::Debug for StateValue<T, O> {
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 match self {
200 StateValue::Consolidating(c) => std::fmt::Display::fmt(c, f),
201 StateValue::Value(_) => write!(f, "Value"),
202 }
203 }
204}
205
/// A totally consolidated value stored within the `UpsertStateBackend`.
///
/// This type contains support for _tombstones_, that contain an _order key_,
/// and provisional values.
///
/// What is considered finalized and provisional depends on the implementation
/// of the UPSERT operator: it might consider everything that it writes to its
/// state finalized, and assume that what it emits will be written down in the
/// output exactly as presented. Or it might consider everything it writes down
/// provisional, and only consider updates that it _knows_ to be persisted as
/// finalized.
///
/// Provisional values should only be considered while still "working off"
/// updates with the same timestamp at which the provisional update was
/// recorded.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
pub struct Value<T, O> {
    /// The finalized value of a key is the value we know to be correct for the last complete
    /// timestamp that got processed. A finalized value of `None` means that the key has been
    /// deleted and acts as a tombstone.
    pub finalized: Option<UpsertValue>,
    /// When `Some(_)`, it contains the upsert value that has been processed for a yet-incomplete
    /// timestamp. When `None`, no provisional update has been emitted yet.
    pub provisional: Option<ProvisionalValue<T, O>>,
}
231
/// A provisional value emitted for a timestamp. This struct contains enough
/// information (the timestamp and the order key) to decide whether this value
/// should be kept or superseded by another command at the same timestamp.
#[derive(Clone, serde::Serialize, serde::Deserialize, Debug)]
pub struct ProvisionalValue<T, O> {
    /// The timestamp at which this provisional value occurred.
    pub timestamp: T,
    /// The order of this upsert command *within* the timestamp. Commands that happen at the same
    /// timestamp with lower order get ignored. Commands with higher order override this one. If
    /// there is a case of equal order then the value itself is used as a tie breaker.
    pub order: O,
    /// The provisional value. A provisional value of `None` means that the key has been deleted
    /// and acts as a tombstone.
    pub value: Option<UpsertValue>,
}
245
/// A value as produced during consolidation.
///
/// Accumulates `(value, diff)` updates via XOR and wrapping sums; see the docs
/// on `StateValue::merge_update` for how these fields are maintained and why
/// this is correct.
#[derive(Clone, Default, serde::Serialize, serde::Deserialize, Debug)]
pub struct Consolidating {
    /// XOR of the bincode-encoded bytes of all accumulated values.
    #[serde(with = "serde_bytes")]
    value_xor: Vec<u8>,
    /// SUM(len(bincode(value)) * diff), wrapping.
    len_sum: Wrapping<i64>,
    /// SUM(checksum(bincode(value)) * diff), wrapping.
    checksum_sum: Wrapping<i64>,
    /// SUM(diff), wrapping.
    diff_sum: Wrapping<i64>,
}
255
256impl fmt::Display for Consolidating {
257 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
258 f.debug_struct("Consolidating")
259 .field("len_sum", &self.len_sum)
260 .field("checksum_sum", &self.checksum_sum)
261 .field("diff_sum", &self.diff_sum)
262 .finish_non_exhaustive()
263 }
264}
265
266impl<T, O> StateValue<T, O> {
267 /// A finalized, that is (assumed) persistent, value.
268 pub fn finalized_value(value: UpsertValue) -> Self {
269 Self::Value(Value {
270 finalized: Some(value),
271 provisional: None,
272 })
273 }
274
275 #[allow(unused)]
276 /// A tombstoned value.
277 pub fn tombstone() -> Self {
278 Self::Value(Value {
279 finalized: None,
280 provisional: None,
281 })
282 }
283
284 /// Whether the value is a tombstone.
285 pub fn is_tombstone(&self) -> bool {
286 match self {
287 Self::Value(value) => value.finalized.is_none(),
288 _ => false,
289 }
290 }
291
292 /// Pull out the `Value` value for a `StateValue`, after `ensure_decoded` has been called.
293 pub fn into_decoded(self) -> Value<T, O> {
294 match self {
295 Self::Value(value) => value,
296 _ => panic!("called `into_decoded without calling `ensure_decoded`"),
297 }
298 }
299
300 /// The size of a `StateValue`, in memory. This is:
301 /// 1. only used in the `InMemoryHashMap` implementation.
302 /// 2. An estimate (it only looks at value sizes, and not errors)
303 ///
304 /// Other implementations may use more accurate accounting.
305 #[cfg(test)]
306 pub fn memory_size(&self) -> usize {
307 use mz_repr::Row;
308 use std::mem::size_of;
309
310 let heap_size = match self {
311 Self::Consolidating(Consolidating { value_xor, .. }) => value_xor.len(),
312 Self::Value(value) => {
313 let finalized_heap_size = match value.finalized {
314 Some(Ok(ref row)) => {
315 // `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
316 // subtract it.
317 row.byte_len() - size_of::<Row>()
318 }
319 // Assume errors are rare enough to not move the needle.
320 _ => 0,
321 };
322 let provisional_heap_size = match value.provisional {
323 Some(ref provisional) => match provisional.value {
324 Some(Ok(ref row)) => {
325 // `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
326 // subtract it.
327 row.byte_len() - size_of::<Row>()
328 }
329 // Assume errors are rare enough to not move the needle.
330 _ => 0,
331 },
332 None => 0,
333 };
334 finalized_heap_size + provisional_heap_size
335 }
336 };
337 heap_size + size_of::<Self>()
338 }
339}
340
341impl<T: Eq, O> StateValue<T, O> {
342 /// Creates a new provisional value, occurring at some order key, observed
343 /// at the given timestamp.
344 pub fn new_provisional_value(value: UpsertValue, timestamp: T, order: O) -> Self {
345 Self::Value(Value {
346 finalized: None,
347 provisional: Some(ProvisionalValue {
348 value: Some(value),
349 timestamp,
350 order,
351 }),
352 })
353 }
354
355 /// Creates a provisional value, that retains the finalized value in this `StateValue`.
356 pub fn into_provisional_value(self, value: UpsertValue, timestamp: T, order: O) -> Self {
357 match self {
358 StateValue::Value(finalized) => Self::Value(Value {
359 finalized: finalized.finalized,
360 provisional: Some(ProvisionalValue {
361 value: Some(value),
362 timestamp,
363 order,
364 }),
365 }),
366 StateValue::Consolidating(_) => {
367 panic!("called `into_provisional_value` without calling `ensure_decoded`")
368 }
369 }
370 }
371
372 /// Creates a new provisional tombstone occurring at some order key,
373 /// observed at the given timestamp.
374 pub fn new_provisional_tombstone(timestamp: T, order: O) -> Self {
375 Self::Value(Value {
376 finalized: None,
377 provisional: Some(ProvisionalValue {
378 value: None,
379 timestamp,
380 order,
381 }),
382 })
383 }
384
385 /// Creates a provisional tombstone, that retains the finalized value in this `StateValue`.
386 ///
387 /// We record the current finalized value, so that we can present it when
388 /// needed or when trying to read a provisional value at a different
389 /// timestamp.
390 pub fn into_provisional_tombstone(self, timestamp: T, order: O) -> Self {
391 match self {
392 StateValue::Value(finalized) => Self::Value(Value {
393 finalized: finalized.finalized,
394 provisional: Some(ProvisionalValue {
395 value: None,
396 timestamp,
397 order,
398 }),
399 }),
400 StateValue::Consolidating(_) => {
401 panic!("called `into_provisional_tombstone` without calling `ensure_decoded`")
402 }
403 }
404 }
405
406 /// Returns the order of a provisional value at the given timestamp, if any.
407 pub fn provisional_order(&self, ts: &T) -> Option<&O> {
408 match self {
409 Self::Value(value) => match &value.provisional {
410 Some(p) if &p.timestamp == ts => Some(&p.order),
411 _ => None,
412 },
413 Self::Consolidating(_) => {
414 panic!("called `provisional_order` without calling `ensure_decoded`")
415 }
416 }
417 }
418
419 /// Returns the provisional value, if one is present at the given timestamp.
420 /// Falls back to the finalized value, or `None` if there is neither.
421 pub fn provisional_value_ref(&self, ts: &T) -> Option<&UpsertValue> {
422 match self {
423 Self::Value(value) => match &value.provisional {
424 Some(p) if &p.timestamp == ts => p.value.as_ref(),
425 _ => value.finalized.as_ref(),
426 },
427 Self::Consolidating(_) => {
428 panic!("called `provisional_value_ref` without calling `ensure_decoded`")
429 }
430 }
431 }
432
433 /// Returns the the finalized value, if one is present.
434 pub fn into_finalized_value(self) -> Option<UpsertValue> {
435 match self {
436 Self::Value(v) => v.finalized,
437 _ => panic!("called `into_finalized_value` without calling `ensure_decoded`"),
438 }
439 }
440}
441
impl<T: Eq, O> StateValue<T, O> {
    /// We use a XOR trick in order to accumulate the values without having to store the full
    /// unconsolidated history in memory. For all (value, diff) updates of a key we track:
    /// - diff_sum = SUM(diff)
    /// - checksum_sum = SUM(checksum(bincode(value)) * diff)
    /// - len_sum = SUM(len(bincode(value)) * diff)
    /// - value_xor = XOR(bincode(value))
    ///
    /// ## Return value
    /// Returns a `bool` indicating whether or not the current merged value is able to be deleted.
    ///
    /// ## Correctness
    ///
    /// The method is correct because a well formed upsert collection at a given
    /// timestamp will have for each key:
    /// - Zero or one updates of the form (cur_value, +1)
    /// - Zero or more pairs of updates of the form (prev_value, +1), (prev_value, -1)
    ///
    /// We are interested in extracting the cur_value of each key and discard all prev_values
    /// that might be included in the stream. Since the history of prev_values always comes in
    /// pairs, computing the XOR of those is always going to cancel their effects out. Also,
    /// since XOR is commutative this property is true independent of the order. The same is
    /// true for the summations of the length and checksum since the sum will contain the
    /// unrelated values zero times.
    ///
    /// Therefore the accumulators will end up precisely in one of two states:
    /// 1. diff == 0, checksum == 0, value == [0..] => the key is not present
    /// 2. diff == 1, checksum == checksum(cur_value), value == cur_value => the key is present
    ///
    /// ## Robustness
    ///
    /// In the absence of bugs, accumulating the diff and checksum is not required since we know
    /// that a well formed collection always satisfies XOR(bincode(values)) == bincode(cur_value).
    /// However bugs may happen and so storing 16 more bytes per key to have a very high
    /// guarantee that we're not decoding garbage is more than worth it.
    #[allow(clippy::as_conversions)]
    pub fn merge_update(
        &mut self,
        value: UpsertValue,
        diff: mz_repr::Diff,
        bincode_opts: BincodeOpts,
        bincode_buffer: &mut Vec<u8>,
    ) -> bool {
        match self {
            Self::Consolidating(Consolidating {
                value_xor,
                len_sum,
                checksum_sum,
                diff_sum,
            }) => {
                // Encode the value into the (reused) scratch buffer.
                bincode_buffer.clear();
                bincode_opts
                    .serialize_into(&mut *bincode_buffer, &value)
                    .unwrap();
                let len = i64::try_from(bincode_buffer.len()).unwrap();

                *diff_sum += diff.into_inner();
                *len_sum += len.wrapping_mul(diff.into_inner());
                // Truncation is fine (using `as`) as this is just a checksum
                *checksum_sum +=
                    (seahash::hash(&*bincode_buffer) as i64).wrapping_mul(diff.into_inner());

                // XOR of even diffs cancel out, so we only do it if diff is odd
                if diff.abs() % Diff::from(2) == Diff::ONE {
                    if value_xor.len() < bincode_buffer.len() {
                        value_xor.resize(bincode_buffer.len(), 0);
                    }
                    // Note that if the new value is _smaller_ than the `value_xor`, and
                    // the values at the end are zeroed out, we can shrink the buffer. This
                    // is extremely sensitive code, so we don't (yet) do that.
                    for (acc, val) in value_xor[0..bincode_buffer.len()]
                        .iter_mut()
                        .zip_eq(bincode_buffer.drain(..))
                    {
                        *acc ^= val;
                    }
                }

                // Returns whether or not the value can be deleted. This allows
                // us to delete values in `UpsertState::consolidate_chunk` (even
                // if they come back later), to minimize space usage.
                diff_sum.0 == 0 && checksum_sum.0 == 0 && value_xor.iter().all(|&x| x == 0)
            }
            StateValue::Value(_value) => {
                // We can turn a Value back into a Consolidating state:
                // `std::mem::take` will leave behind a default value, which
                // happens to be a default `Consolidating` `StateValue`.
                let this = std::mem::take(self);

                // Note: any provisional part of the previous `Value` is
                // discarded here; only the finalized value participates in
                // consolidation.
                let finalized_value = this.into_finalized_value();
                if let Some(finalized_value) = finalized_value {
                    // If we had a value before, merge it into the
                    // now-consolidating state first.
                    let _ =
                        self.merge_update(finalized_value, Diff::ONE, bincode_opts, bincode_buffer);

                    // Then merge the new value in.
                    self.merge_update(value, diff, bincode_opts, bincode_buffer)
                } else {
                    // We didn't have a value before, might have been a
                    // tombstone. So just merge in the new value.
                    self.merge_update(value, diff, bincode_opts, bincode_buffer)
                }
            }
        }
    }

    /// Merge an existing StateValue into this one, using the same method described in `merge_update`.
    /// See the docstring above for more information on correctness and robustness.
    ///
    /// Both `self` and `other` must be in the `Consolidating` state; panics
    /// otherwise.
    pub fn merge_update_state(&mut self, other: &Self) {
        match (self, other) {
            (
                Self::Consolidating(Consolidating {
                    value_xor,
                    len_sum,
                    checksum_sum,
                    diff_sum,
                }),
                Self::Consolidating(other_consolidating),
            ) => {
                // Wrapping sums combine linearly; the XOR buffer is widened
                // to the larger of the two before combining.
                *diff_sum += other_consolidating.diff_sum;
                *len_sum += other_consolidating.len_sum;
                *checksum_sum += other_consolidating.checksum_sum;
                if other_consolidating.value_xor.len() > value_xor.len() {
                    value_xor.resize(other_consolidating.value_xor.len(), 0);
                }
                for (acc, val) in value_xor[0..other_consolidating.value_xor.len()]
                    .iter_mut()
                    .zip_eq(other_consolidating.value_xor.iter())
                {
                    *acc ^= val;
                }
            }
            _ => panic!("`merge_update_state` called with non-consolidating state"),
        }
    }

    /// During and after consolidation, we assume that values in the `UpsertStateBackend` implementation
    /// can be `Self::Consolidating`, with a `diff_sum` of 1 (or 0, if they have been deleted).
    /// Afterwards, if we need to retract one of these values, we need to assert that it's in this correct state,
    /// then mutate it to its `Value` state, so the `upsert` operator can use it.
    ///
    /// Panics (with diagnostic context including `source_id` and, if provided,
    /// `key`) when the accumulated state is internally inconsistent.
    #[allow(clippy::as_conversions)]
    pub fn ensure_decoded(
        &mut self,
        bincode_opts: BincodeOpts,
        source_id: GlobalId,
        key: Option<&UpsertKey>,
    ) {
        match self {
            StateValue::Consolidating(consolidating) => {
                match consolidating.diff_sum.0 {
                    // Exactly one value accumulated: `value_xor` holds its
                    // bincode encoding, validated against len/checksum.
                    1 => {
                        let len = usize::try_from(consolidating.len_sum.0)
                            .map_err(|_| {
                                format!(
                                    "len_sum can't be made into a usize, state: {}, {}",
                                    consolidating, source_id,
                                )
                            })
                            .expect("invalid upsert state");
                        let value = &consolidating
                            .value_xor
                            .get(..len)
                            .ok_or_else(|| {
                                format!(
                                    "value_xor is not the same length ({}) as len ({}), state: {}, {}",
                                    consolidating.value_xor.len(),
                                    len,
                                    consolidating,
                                    source_id,
                                )
                            })
                            .expect("invalid upsert state");
                        // Truncation is fine (using `as`) as this is just a checksum
                        assert_eq!(
                            consolidating.checksum_sum.0,
                            // Hash the value, not the full buffer, which may have extra 0's
                            seahash::hash(value) as i64,
                            "invalid upsert state: checksum_sum does not match, state: {}, {}",
                            consolidating,
                            source_id,
                        );
                        *self = Self::finalized_value(bincode_opts.deserialize(value).unwrap());
                    }
                    // Everything cancelled out: the key is absent, so all
                    // accumulators must be zero. Becomes a tombstone.
                    0 => {
                        assert_eq!(
                            consolidating.len_sum.0, 0,
                            "invalid upsert state: len_sum is non-0, state: {}, {}",
                            consolidating, source_id,
                        );
                        assert_eq!(
                            consolidating.checksum_sum.0, 0,
                            "invalid upsert state: checksum_sum is non-0, state: {}, {}",
                            consolidating, source_id,
                        );
                        assert!(
                            consolidating.value_xor.iter().all(|&x| x == 0),
                            "invalid upsert state: value_xor not all 0s with 0 diff. \
                            Non-zero positions: {:?}, state: {}, {}",
                            consolidating
                                .value_xor
                                .iter()
                                .positions(|&x| x != 0)
                                .collect::<Vec<_>>(),
                            consolidating,
                            source_id,
                        );
                        *self = Self::tombstone();
                    }
                    // Any other diff_sum is a bug in the input collection.
                    other => {
                        // If diff_sum is odd, value_xor holds the bincode of a
                        // single value (even XORs cancel out). Try to decode it
                        // so we can log the shape (not contents) for debugging.
                        let value_byte_len = usize::try_from(consolidating.len_sum.0 / other).ok();
                        let decode_ok = value_byte_len
                            .and_then(|l| consolidating.value_xor.get(..l))
                            .and_then(|bytes| bincode_opts.deserialize::<UpsertValue>(bytes).ok())
                            .map(|v| match v {
                                Ok(row) => format!(
                                    "Ok(Row(byte_len={}, col_count={}))",
                                    row.byte_len(),
                                    row.iter().count(),
                                ),
                                Err(_) => "Err(UpsertValueError)".to_string(),
                            });
                        panic!(
                            "invalid upsert state: non 0/1 diff_sum: {}, state: {}, {}, \
                            key: {:?}, value_byte_len: {:?}, decodable: {:?}",
                            other, consolidating, source_id, key, value_byte_len, decode_ok,
                        )
                    }
                }
            }
            // Already decoded (`Value`): nothing to do.
            _ => {}
        }
    }
}
680
681impl<T, O> Default for StateValue<T, O> {
682 fn default() -> Self {
683 Self::Consolidating(Consolidating::default())
684 }
685}
686
/// Statistics for a single call to `consolidate_chunk`.
#[derive(Clone, Default, Debug)]
pub struct SnapshotStats {
    /// The number of updates processed.
    pub updates: u64,
    /// The aggregated number of values inserted or deleted into `state`.
    pub values_diff: Diff,
    /// The total aggregated size of values inserted, deleted, or updated in `state`.
    /// If the current call to `consolidate_chunk` deletes a lot of values,
    /// or updates values to smaller ones, this can be negative!
    pub size_diff: i64,
    /// The number of inserts, i.e. +1 diffs.
    pub inserts: u64,
    /// The number of deletes, i.e. -1 diffs.
    pub deletes: u64,
}
703
704impl std::ops::AddAssign for SnapshotStats {
705 fn add_assign(&mut self, rhs: Self) {
706 self.updates += rhs.updates;
707 self.values_diff += rhs.values_diff;
708 self.size_diff += rhs.size_diff;
709 self.inserts += rhs.inserts;
710 self.deletes += rhs.deletes;
711 }
712}
713
/// Statistics for a single call to `multi_merge`.
#[derive(Clone, Default, Debug)]
pub struct MergeStats {
    /// The number of updates written as merge operands to the backend, for the backend
    /// to process async in the `consolidating_merge_function`.
    /// Should be equal to the number of inserts + deletes.
    pub written_merge_operands: u64,
    /// The total size of values provided to `multi_merge`. The backend will write these
    /// down and then later merge them in the `consolidating_merge_function`.
    pub size_written: u64,
    /// The estimated diff of the total size of the working set after the merge operands
    /// are merged by the backend. This is an estimate since it can't account for the
    /// size overhead of `StateValue` for values that consolidate to 0 (tombstoned-values).
    pub size_diff: i64,
}
729
/// Statistics for a single call to `multi_put`.
#[derive(Clone, Default, Debug)]
pub struct PutStats {
    /// The number of puts/deletes processed.
    /// Should be equal to the number of inserts + updates + deletes.
    pub processed_puts: u64,
    /// The aggregated number of non-tombstone values inserted or deleted into `state`.
    pub values_diff: i64,
    /// The aggregated number of tombstones inserted or deleted into `state`.
    pub tombstones_diff: i64,
    /// The total aggregated size of values inserted, deleted, or updated in `state`.
    /// If the current call to `multi_put` deletes a lot of values,
    /// or updates values to smaller ones, this can be negative!
    pub size_diff: i64,
    /// The number of inserts.
    pub inserts: u64,
    /// The number of updates.
    pub updates: u64,
    /// The number of deletes.
    pub deletes: u64,
}
751
752impl PutStats {
753 /// Adjust the `PutStats` based on the new value and the previous metadata.
754 ///
755 /// The size parameter is separate as its value is backend-dependent. Its optional
756 /// as some backends increase the total size after an entire batch is processed.
757 ///
758 /// This method is provided for implementors of `UpsertStateBackend::multi_put`.
759 pub fn adjust<T, O>(
760 &mut self,
761 new_value: Option<&StateValue<T, O>>,
762 new_size: Option<i64>,
763 previous_metdata: &Option<ValueMetadata<i64>>,
764 ) {
765 self.adjust_size(new_value, new_size, previous_metdata);
766 self.adjust_values(new_value, previous_metdata);
767 self.adjust_tombstone(new_value, previous_metdata);
768 }
769
770 fn adjust_size<T, O>(
771 &mut self,
772 new_value: Option<&StateValue<T, O>>,
773 new_size: Option<i64>,
774 previous_metdata: &Option<ValueMetadata<i64>>,
775 ) {
776 match (&new_value, previous_metdata.as_ref()) {
777 (Some(_), Some(ps)) => {
778 self.size_diff -= ps.size;
779 if let Some(new_size) = new_size {
780 self.size_diff += new_size;
781 }
782 }
783 (None, Some(ps)) => {
784 self.size_diff -= ps.size;
785 }
786 (Some(_), None) => {
787 if let Some(new_size) = new_size {
788 self.size_diff += new_size;
789 }
790 }
791 (None, None) => {}
792 }
793 }
794
795 fn adjust_values<T, O>(
796 &mut self,
797 new_value: Option<&StateValue<T, O>>,
798 previous_metdata: &Option<ValueMetadata<i64>>,
799 ) {
800 let truly_new_value = new_value.map_or(false, |v| !v.is_tombstone());
801 let truly_old_value = previous_metdata.map_or(false, |v| !v.is_tombstone);
802
803 match (truly_new_value, truly_old_value) {
804 (false, true) => {
805 self.values_diff -= 1;
806 }
807 (true, false) => {
808 self.values_diff += 1;
809 }
810 _ => {}
811 }
812 }
813
814 fn adjust_tombstone<T, O>(
815 &mut self,
816 new_value: Option<&StateValue<T, O>>,
817 previous_metdata: &Option<ValueMetadata<i64>>,
818 ) {
819 let new_tombstone = new_value.map_or(false, |v| v.is_tombstone());
820 let old_tombstone = previous_metdata.map_or(false, |v| v.is_tombstone);
821
822 match (new_tombstone, old_tombstone) {
823 (false, true) => {
824 self.tombstones_diff -= 1;
825 }
826 (true, false) => {
827 self.tombstones_diff += 1;
828 }
829 _ => {}
830 }
831 }
832}
833
/// Statistics for a single call to `multi_get`.
#[derive(Clone, Default, Debug)]
pub struct GetStats {
    /// The number of gets processed.
    pub processed_gets: u64,
    /// The total size in bytes returned.
    pub processed_gets_size: u64,
    /// The number of non-empty records returned.
    pub returned_gets: u64,
}
844
/// A trait that defines the fundamental primitives required by a state-backing of
/// `UpsertState`.
///
/// Implementors of this trait are blind maps that associate keys and values. They need
/// not understand the semantics of `StateValue`, tombstones, or anything else related
/// to a correct `upsert` implementation. The singular exception to this is that they
/// **must** produce accurate `PutStats` and `GetStats`. The reasoning for this is two-fold:
/// - efficiency: this avoids additional buffer allocation.
/// - value sizes: only the backend implementation understands the size of values as recorded
///   within it, so only it can report accurate sizes.
///
/// This **must** is not a correctness requirement (we won't panic when emitting statistics), but
/// rather a requirement to ensure the upsert operator is introspectable.
#[async_trait::async_trait(?Send)]
pub trait UpsertStateBackend<T, O>
where
    T: 'static,
    O: 'static,
{
    /// Whether this backend supports the `multi_merge` operation.
    ///
    /// When this returns `false`, callers must use a read-then-write strategy
    /// (`multi_get` followed by `multi_put`) instead of `multi_merge`.
    fn supports_merge(&self) -> bool;

    /// Insert or delete for all `puts` keys, prioritizing the last value for
    /// repeated keys.
    ///
    /// The `PutValue` is _guaranteed_ to have an accurate and up-to-date
    /// record of the metadata for existing value for the given key (if one existed),
    /// as reported by a previous call to `multi_get`.
    ///
    /// `PutStats` **must** be populated correctly, according to these semantics:
    /// - `values_diff` must record the difference in number of new non-tombstone values being
    /// inserted into the backend.
    /// - `tombstones_diff` must record the difference in number of tombstone values being
    /// inserted into the backend.
    /// - `size_diff` must record the change in size for the values being inserted/deleted/updated
    /// in the backend, regardless of whether the values are tombstones or not.
    async fn multi_put<P>(&mut self, puts: P) -> Result<PutStats, anyhow::Error>
    where
        P: IntoIterator<Item = (UpsertKey, PutValue<StateValue<T, O>>)>;

    /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
    ///
    /// Panics if `gets` and `results_out` are not the same length.
    async fn multi_get<'r, G, R>(
        &mut self,
        gets: G,
        results_out: R,
    ) -> Result<GetStats, anyhow::Error>
    where
        G: IntoIterator<Item = UpsertKey>,
        R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>;

    /// For each key in `merges` writes a 'merge operand' to the backend. The backend stores these
    /// merge operands and periodically calls the `consolidating_merge_function` to merge them into
    /// any existing value for each key. The backend will merge the merge operands in the order
    /// they are provided, and the merge function will always be run for a given key when a `get`
    /// operation is performed on that key, or when the backend decides to run the merge based
    /// on its own internal logic.
    /// This allows avoiding the read-modify-write method of updating many values to
    /// improve performance.
    ///
    /// The `MergeValue` should include a `diff` field that represents the update diff for the
    /// value. This is used to estimate the overall size diff of the working set
    /// after the merge operands are merged by the backend `sum[merges: m](m.diff * m.size)`.
    ///
    /// `MergeStats` **must** be populated correctly, according to these semantics:
    /// - `written_merge_operands` must record the number of merge operands written to the backend.
    /// - `size_written` must record the total size of values written to the backend.
    /// Note that the size of the post-merge values are not known, so this is the size of the
    /// values written to the backend as merge operands.
    /// - `size_diff` must record the estimated diff of the total size of the working set after the
    /// merge operands are merged by the backend.
    async fn multi_merge<P>(&mut self, merges: P) -> Result<MergeStats, anyhow::Error>
    where
        P: IntoIterator<Item = (UpsertKey, MergeValue<StateValue<T, O>>)>;
}
920
921/// A function that merges a set of updates for a key into the existing value
922/// for the key. This is called by the backend implementation when it has
923/// accumulated a set of updates for a key, and needs to merge them into the
924/// existing value for the key.
925///
926/// The function is called with the following arguments:
927/// - The key for which the merge is being performed.
928/// - An iterator over any current value and merge operands queued for the key.
929///
930/// The function should return the new value for the key after merging all the updates.
931pub(crate) fn consolidating_merge_function<T: Eq, O>(
932 _key: UpsertKey,
933 updates: impl Iterator<Item = StateValue<T, O>>,
934) -> StateValue<T, O> {
935 let mut current: StateValue<T, O> = Default::default();
936
937 let mut bincode_buf = Vec::new();
938 for update in updates {
939 match update {
940 StateValue::Consolidating(_) => {
941 current.merge_update_state(&update);
942 }
943 StateValue::Value(_) => {
944 // This branch is more expensive, but we hopefully rarely hit
945 // it.
946 if let Some(finalized_value) = update.into_finalized_value() {
947 let mut update = StateValue::default();
948 update.merge_update(
949 finalized_value,
950 Diff::ONE,
951 upsert_bincode_opts(),
952 &mut bincode_buf,
953 );
954 current.merge_update_state(&update);
955 }
956 }
957 }
958 }
959
960 current
961}
962
/// An `UpsertStateBackend` wrapper that supports consolidating merging, and
/// reports basic metrics about the usage of the `UpsertStateBackend`.
pub struct UpsertState<'metrics, S, T, O> {
    // The wrapped `UpsertStateBackend` implementation.
    inner: S,

    // The status, start time, and stats about calls to `consolidate_chunk`.
    pub snapshot_start: Instant,
    snapshot_stats: SnapshotStats,
    snapshot_completed: bool,

    // Metrics shared across all workers running the `upsert` operator.
    metrics: Arc<UpsertSharedMetrics>,
    // Metrics for a specific worker.
    worker_metrics: &'metrics UpsertMetrics,
    // User-facing statistics.
    stats: SourceStatistics,

    // Bincode options and buffer used in `consolidate_chunk`.
    bincode_opts: BincodeOpts,
    bincode_buffer: Vec<u8>,

    // We need to iterate over `updates` in `consolidate_chunk` twice, so we
    // have a scratch vector for this.
    consolidate_scratch: Vec<(UpsertKey, UpsertValue, mz_repr::Diff)>,
    // "mini-upsert" map used in `consolidate_chunk`
    consolidate_upsert_scratch: indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, O>>,
    // a scratch vector for calling `multi_get`
    multi_get_scratch: Vec<UpsertKey>,
    // If non-zero, the scratch buffers above are shrunk during
    // `consolidate_chunk` once their capacity exceeds `batch_size * ratio`,
    // bounding idle memory usage.
    shrink_upsert_unused_buffers_by_ratio: usize,
}
993
994impl<'metrics, S, T, O> UpsertState<'metrics, S, T, O> {
995 pub(crate) fn new(
996 inner: S,
997 metrics: Arc<UpsertSharedMetrics>,
998 worker_metrics: &'metrics UpsertMetrics,
999 stats: SourceStatistics,
1000 shrink_upsert_unused_buffers_by_ratio: usize,
1001 ) -> Self {
1002 Self {
1003 inner,
1004 snapshot_start: Instant::now(),
1005 snapshot_stats: SnapshotStats::default(),
1006 snapshot_completed: false,
1007 metrics,
1008 worker_metrics,
1009 stats,
1010 bincode_opts: upsert_bincode_opts(),
1011 bincode_buffer: Vec::new(),
1012 consolidate_scratch: Vec::new(),
1013 consolidate_upsert_scratch: indexmap::IndexMap::new(),
1014 multi_get_scratch: Vec::new(),
1015 shrink_upsert_unused_buffers_by_ratio,
1016 }
1017 }
1018}
1019
1020impl<S, T, O> UpsertState<'_, S, T, O>
1021where
1022 S: UpsertStateBackend<T, O>,
1023 T: Eq + Clone + Send + Sync + Serialize + 'static,
1024 O: Clone + Send + Sync + Serialize + DeserializeOwned + 'static,
1025{
1026 /// Consolidate the following differential updates into the state. Updates
1027 /// provided to this method can be assumed to consolidate into a single
1028 /// value per-key, after all chunks of updates for a given timestamp have
1029 /// been processed,
1030 ///
1031 /// Therefore, after all updates of a given timestamp have been
1032 /// `consolidated`, all values must be in the correct state (as determined
1033 /// by `StateValue::ensure_decoded`).
1034 ///
1035 /// The `completed` boolean communicates whether or not this is the final
1036 /// chunk of updates for the initial "snapshot" from persist.
1037 ///
1038 /// If the backend supports it, this method will use `multi_merge` to
1039 /// consolidate the updates to avoid having to read the existing value for
1040 /// each key first. On some backends (like RocksDB), this can be
1041 /// significantly faster than the read-then-write consolidation strategy.
1042 ///
1043 /// Also note that we use `self.inner.multi_*`, not `self.multi_*`. This is
1044 /// to avoid erroneously changing metric and stats values.
1045 pub async fn consolidate_chunk<U>(
1046 &mut self,
1047 updates: U,
1048 completed: bool,
1049 ) -> Result<(), anyhow::Error>
1050 where
1051 U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1052 {
1053 fail::fail_point!("fail_consolidate_chunk", |_| {
1054 Err(anyhow::anyhow!("Error consolidating values"))
1055 });
1056
1057 if completed && self.snapshot_completed {
1058 panic!("attempted completion of already completed upsert snapshot")
1059 }
1060
1061 let phase = if !self.snapshot_completed {
1062 "rehydration"
1063 } else {
1064 "steady-state"
1065 };
1066
1067 let now = Instant::now();
1068 let batch_size = updates.len();
1069
1070 tracing::trace!(
1071 %phase,
1072 batch_size,
1073 completed,
1074 "consolidate_chunk: processing batch"
1075 );
1076
1077 self.consolidate_scratch.clear();
1078 self.consolidate_upsert_scratch.clear();
1079 self.multi_get_scratch.clear();
1080
1081 // Shrinking the scratch vectors if the capacity is significantly more than batch size
1082 if self.shrink_upsert_unused_buffers_by_ratio > 0 {
1083 let reduced_capacity =
1084 self.consolidate_scratch.capacity() / self.shrink_upsert_unused_buffers_by_ratio;
1085 if reduced_capacity > batch_size {
1086 // These vectors have already been cleared above and should be empty here
1087 self.consolidate_scratch.shrink_to(reduced_capacity);
1088 self.consolidate_upsert_scratch.shrink_to(reduced_capacity);
1089 self.multi_get_scratch.shrink_to(reduced_capacity);
1090 }
1091 }
1092
1093 // Depending on if the backend supports multi_merge, call the appropriate method.
1094 let stats = if self.inner.supports_merge() {
1095 self.consolidate_merge_inner(updates).await?
1096 } else {
1097 self.consolidate_read_write_inner(updates).await?
1098 };
1099
1100 // NOTE: These metrics use the term `merge` to refer to the consolidation of values.
1101 // This is because they were introduced before we the `multi_merge` operation was added.
1102 self.metrics
1103 .merge_snapshot_latency
1104 .observe(now.elapsed().as_secs_f64());
1105 self.worker_metrics
1106 .merge_snapshot_updates
1107 .inc_by(stats.updates);
1108 self.worker_metrics
1109 .merge_snapshot_inserts
1110 .inc_by(stats.inserts);
1111 self.worker_metrics
1112 .merge_snapshot_deletes
1113 .inc_by(stats.deletes);
1114
1115 self.stats.update_bytes_indexed_by(stats.size_diff);
1116 self.stats
1117 .update_records_indexed_by(stats.values_diff.into_inner());
1118
1119 self.snapshot_stats += stats;
1120
1121 if !self.snapshot_completed {
1122 // Updating the metrics
1123 self.worker_metrics.rehydration_total.set(
1124 self.snapshot_stats
1125 .values_diff
1126 .into_inner()
1127 .try_into()
1128 .unwrap_or_else(|e: std::num::TryFromIntError| {
1129 tracing::warn!(
1130 "rehydration_total metric overflowed or is negative \
1131 and is innacurate: {}. Defaulting to 0",
1132 e.display_with_causes(),
1133 );
1134
1135 0
1136 }),
1137 );
1138 self.worker_metrics
1139 .rehydration_updates
1140 .set(self.snapshot_stats.updates);
1141 }
1142
1143 if completed {
1144 if self.shrink_upsert_unused_buffers_by_ratio > 0 {
1145 // After rehydration is done, these scratch buffers should now be empty
1146 // shrinking them entirely
1147 self.consolidate_scratch.shrink_to_fit();
1148 self.consolidate_upsert_scratch.shrink_to_fit();
1149 self.multi_get_scratch.shrink_to_fit();
1150 }
1151
1152 self.worker_metrics
1153 .rehydration_latency
1154 .set(self.snapshot_start.elapsed().as_secs_f64());
1155
1156 self.snapshot_completed = true;
1157 }
1158 Ok(())
1159 }
1160
1161 /// Consolidate the updates into the state. This method requires the backend
1162 /// has support for the `multi_merge` operation, and will panic if
1163 /// `self.inner.supports_merge()` was not checked before calling this
1164 /// method. `multi_merge` will write the updates as 'merge operands' to the
1165 /// backend, and then the backend will consolidate those updates with any
1166 /// existing state using the `consolidating_merge_function`.
1167 ///
1168 /// This method can have significant performance benefits over the
1169 /// read-then-write method of `consolidate_read_write_inner`.
1170 async fn consolidate_merge_inner<U>(
1171 &mut self,
1172 updates: U,
1173 ) -> Result<SnapshotStats, anyhow::Error>
1174 where
1175 U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1176 {
1177 let mut updates = updates.into_iter().peekable();
1178
1179 let mut stats = SnapshotStats::default();
1180
1181 if updates.peek().is_some() {
1182 let m_stats = self
1183 .inner
1184 .multi_merge(updates.map(|(k, v, diff)| {
1185 // Transform into a `StateValue<O>` that can be used by the
1186 // `consolidating_merge_function` to merge with any existing
1187 // value for the key.
1188 let mut val: StateValue<T, O> = Default::default();
1189 val.merge_update(v, diff, self.bincode_opts, &mut self.bincode_buffer);
1190
1191 stats.updates += 1;
1192 if diff.is_positive() {
1193 stats.inserts += 1;
1194 } else if diff.is_negative() {
1195 stats.deletes += 1;
1196 }
1197
1198 // To keep track of the overall `values_diff` we can use the sum of diffs which
1199 // should be equal to the number of non-tombstoned values in the backend.
1200 // This is a bit misleading as this represents the eventual state after the
1201 // `consolidating_merge_function` has been called to merge all the updates,
1202 // and not the state after this `multi_merge` call.
1203 //
1204 // This does not accurately report values that have been consolidated to diff == 0, as tracking that
1205 // per-key is extremely difficult.
1206 stats.values_diff += diff;
1207
1208 (k, MergeValue { value: val, diff })
1209 }))
1210 .await?;
1211
1212 stats.size_diff = m_stats.size_diff;
1213 }
1214
1215 Ok(stats)
1216 }
1217
1218 /// Consolidates the updates into the state. This method reads the existing
1219 /// values for each key, consolidates the updates, and writes the new values
1220 /// back to the state.
1221 async fn consolidate_read_write_inner<U>(
1222 &mut self,
1223 updates: U,
1224 ) -> Result<SnapshotStats, anyhow::Error>
1225 where
1226 U: IntoIterator<Item = (UpsertKey, UpsertValue, mz_repr::Diff)> + ExactSizeIterator,
1227 {
1228 let mut updates = updates.into_iter().peekable();
1229
1230 let mut stats = SnapshotStats::default();
1231
1232 if updates.peek().is_some() {
1233 self.consolidate_scratch.extend(updates);
1234 self.consolidate_upsert_scratch.extend(
1235 self.consolidate_scratch
1236 .iter()
1237 .map(|(k, _, _)| (*k, UpsertValueAndSize::default())),
1238 );
1239 self.multi_get_scratch
1240 .extend(self.consolidate_upsert_scratch.iter().map(|(k, _)| *k));
1241 self.inner
1242 .multi_get(
1243 self.multi_get_scratch.drain(..),
1244 self.consolidate_upsert_scratch.iter_mut().map(|(_, v)| v),
1245 )
1246 .await?;
1247
1248 for (key, value, diff) in self.consolidate_scratch.drain(..) {
1249 stats.updates += 1;
1250 if diff.is_positive() {
1251 stats.inserts += 1;
1252 } else if diff.is_negative() {
1253 stats.deletes += 1;
1254 }
1255
1256 // We rely on the diffs in our input instead of the result of
1257 // multi_put below. This makes sure we report the same stats as
1258 // `consolidate_merge_inner`, regardless of what values
1259 // there were in state before.
1260 stats.values_diff += diff;
1261
1262 let entry = self.consolidate_upsert_scratch.get_mut(&key).unwrap();
1263 let val = entry.value.get_or_insert_with(Default::default);
1264
1265 if val.merge_update(value, diff, self.bincode_opts, &mut self.bincode_buffer) {
1266 entry.value = None;
1267 }
1268 }
1269
1270 // Note we do 1 `multi_get` and 1 `multi_put` while processing a _batch of updates_.
1271 // Within the batch, we effectively consolidate each key, before persisting that
1272 // consolidated value. Easy!!
1273 let p_stats = self
1274 .inner
1275 .multi_put(self.consolidate_upsert_scratch.drain(..).map(|(k, v)| {
1276 (
1277 k,
1278 PutValue {
1279 value: v.value,
1280 previous_value_metadata: v.metadata.map(|v| ValueMetadata {
1281 size: v.size.try_into().expect("less than i64 size"),
1282 is_tombstone: v.is_tombstone,
1283 }),
1284 },
1285 )
1286 }))
1287 .await?;
1288
1289 stats.size_diff = p_stats.size_diff;
1290 }
1291
1292 Ok(stats)
1293 }
1294
1295 /// Insert or delete for all `puts` keys, prioritizing the last value for
1296 /// repeated keys.
1297 pub async fn multi_put<P>(
1298 &mut self,
1299 update_per_record_stats: bool,
1300 puts: P,
1301 ) -> Result<(), anyhow::Error>
1302 where
1303 P: IntoIterator<Item = (UpsertKey, PutValue<Value<T, O>>)>,
1304 {
1305 fail::fail_point!("fail_state_multi_put", |_| {
1306 Err(anyhow::anyhow!("Error putting values into state"))
1307 });
1308 let now = Instant::now();
1309 let stats = self
1310 .inner
1311 .multi_put(puts.into_iter().map(|(k, pv)| {
1312 (
1313 k,
1314 PutValue {
1315 value: pv.value.map(StateValue::Value),
1316 previous_value_metadata: pv.previous_value_metadata,
1317 },
1318 )
1319 }))
1320 .await?;
1321
1322 self.metrics
1323 .multi_put_latency
1324 .observe(now.elapsed().as_secs_f64());
1325 self.worker_metrics
1326 .multi_put_size
1327 .inc_by(stats.processed_puts);
1328
1329 if update_per_record_stats {
1330 self.worker_metrics.upsert_inserts.inc_by(stats.inserts);
1331 self.worker_metrics.upsert_updates.inc_by(stats.updates);
1332 self.worker_metrics.upsert_deletes.inc_by(stats.deletes);
1333
1334 self.stats.update_bytes_indexed_by(stats.size_diff);
1335 self.stats.update_records_indexed_by(stats.values_diff);
1336 self.stats
1337 .update_envelope_state_tombstones_by(stats.tombstones_diff);
1338 }
1339
1340 Ok(())
1341 }
1342
1343 /// Get the `gets` keys, which must be unique, placing the results in `results_out`.
1344 ///
1345 /// Panics if `gets` and `results_out` are not the same length.
1346 pub async fn multi_get<'r, G, R>(
1347 &mut self,
1348 gets: G,
1349 results_out: R,
1350 ) -> Result<(), anyhow::Error>
1351 where
1352 G: IntoIterator<Item = UpsertKey>,
1353 R: IntoIterator<Item = &'r mut UpsertValueAndSize<T, O>>,
1354 O: 'r,
1355 {
1356 fail::fail_point!("fail_state_multi_get", |_| {
1357 Err(anyhow::anyhow!("Error getting values from state"))
1358 });
1359 let now = Instant::now();
1360 let stats = self.inner.multi_get(gets, results_out).await?;
1361
1362 self.metrics
1363 .multi_get_latency
1364 .observe(now.elapsed().as_secs_f64());
1365 self.worker_metrics
1366 .multi_get_size
1367 .inc_by(stats.processed_gets);
1368 self.worker_metrics
1369 .multi_get_result_count
1370 .inc_by(stats.returned_gets);
1371 self.worker_metrics
1372 .multi_get_result_bytes
1373 .inc_by(stats.processed_gets_size);
1374
1375 Ok(())
1376 }
1377}
1378
#[cfg(test)]
mod tests {
    use mz_repr::Row;

    use super::*;

    /// A balanced sequence of updates — including a retraction and
    /// re-insertion of a row whose length differs from the first insert —
    /// must leave a `Consolidating` value that decodes cleanly. This
    /// exercises the checksum tracking in `merge_update`.
    #[mz_ore::test]
    fn test_merge_update() {
        let mut buffer = Vec::new();
        let opts = upsert_bincode_opts();

        let mut state = StateValue::<(), ()>::Consolidating(Consolidating::default());

        let short_row = Ok(Row::default());
        let long_row = Ok(Row::pack([mz_repr::Datum::Null]));
        state.merge_update(short_row, Diff::ONE, opts, &mut buffer);
        state.merge_update(long_row.clone(), Diff::MINUS_ONE, opts, &mut buffer);
        // Re-inserting `long_row` clears its retraction, but the `value_xor`
        // is the length of `long_row`: this verifies checksums are tracked
        // correctly.
        state.merge_update(long_row, Diff::ONE, opts, &mut buffer);

        // The `Consolidating` value is fully merged, so decoding must succeed.
        state.ensure_decoded(opts, GlobalId::User(1), None);
    }

    // Guard our assumptions about the in-memory size of `StateValue`:
    // growing it directly increases the memory usage of in-memory UPSERT
    // sources.
    #[mz_ore::test]
    fn test_memory_size() {
        let finalized: StateValue<(), ()> = StateValue::finalized_value(Ok(Row::default()));
        assert!(
            finalized.memory_size() <= 64,
            "memory size is {}",
            finalized.memory_size(),
        );

        let provisional_with_finalized: StateValue<(), ()> =
            finalized.into_provisional_value(Ok(Row::default()), (), ());
        assert!(
            provisional_with_finalized.memory_size() <= 64,
            "memory size is {}",
            provisional_with_finalized.memory_size(),
        );

        let provisional_without_finalized: StateValue<(), ()> =
            StateValue::new_provisional_value(Ok(Row::default()), (), ());
        assert!(
            provisional_without_finalized.memory_size() <= 64,
            "memory size is {}",
            provisional_without_finalized.memory_size(),
        );

        let mut consolidating: StateValue<(), ()> = StateValue::default();
        consolidating.merge_update(
            Ok(Row::default()),
            Diff::ONE,
            upsert_bincode_opts(),
            &mut Vec::new(),
        );
        assert!(
            consolidating.memory_size() <= 66,
            "memory size is {}",
            consolidating.memory_size(),
        );
    }

    /// Retracting a row shorter than the one inserted leaves `len_sum`
    /// non-zero, which decoding must reject.
    #[mz_ore::test]
    #[should_panic(
        expected = "invalid upsert state: len_sum is non-0, state: Consolidating { len_sum: 1"
    )]
    fn test_merge_update_len_0_assert() {
        let mut buffer = Vec::new();
        let opts = upsert_bincode_opts();

        let mut state = StateValue::<(), ()>::Consolidating(Consolidating::default());

        let short_row = Ok(mz_repr::Row::default());
        let long_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
        state.merge_update(long_row.clone(), Diff::ONE, opts, &mut buffer);
        state.merge_update(short_row.clone(), Diff::MINUS_ONE, opts, &mut buffer);

        state.ensure_decoded(opts, GlobalId::User(1), None);
    }

    /// A mismatch between the accumulated `len_sum` and the actual
    /// `value_xor` length must be rejected on decode.
    #[mz_ore::test]
    #[should_panic(
        expected = "invalid upsert state: \"value_xor is not the same length (3) as len (4), state: Consolidating { len_sum: 4"
    )]
    fn test_merge_update_len_to_long_assert() {
        let mut buffer = Vec::new();
        let opts = upsert_bincode_opts();

        let mut state = StateValue::<(), ()>::Consolidating(Consolidating::default());

        let short_row = Ok(mz_repr::Row::default());
        let long_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Null]));
        state.merge_update(long_row.clone(), Diff::ONE, opts, &mut buffer);
        state.merge_update(short_row.clone(), Diff::MINUS_ONE, opts, &mut buffer);
        state.merge_update(long_row.clone(), Diff::ONE, opts, &mut buffer);

        state.ensure_decoded(opts, GlobalId::User(1), None);
    }

    /// Rows of equal length but different content leave the checksum sum
    /// inconsistent, which decoding must reject.
    #[mz_ore::test]
    #[should_panic(expected = "invalid upsert state: checksum_sum does not match")]
    fn test_merge_update_checksum_doesnt_match() {
        let mut buffer = Vec::new();
        let opts = upsert_bincode_opts();

        let mut state = StateValue::<(), ()>::Consolidating(Consolidating::default());

        let small_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(2)]));
        let longer_row = Ok(mz_repr::Row::pack([mz_repr::Datum::Int64(1)]));
        state.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buffer);
        state.merge_update(small_row.clone(), Diff::MINUS_ONE, opts, &mut buffer);
        state.merge_update(longer_row.clone(), Diff::ONE, opts, &mut buffer);

        state.ensure_decoded(opts, GlobalId::User(1), None);
    }
}
1498}