// mz_storage/upsert_continual_feedback.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of feedback UPSERT operator and associated helpers. See
//! [`upsert_inner`] for a description of how the operator works and why.
use std::cmp::Reverse;
use std::convert::Infallible;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, VecCollection};
use indexmap::map::Entry;
use itertools::Itertools;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::{DataflowError, EnvelopeError};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::StreamVec;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::{Capability, CapabilitySet};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::upsert::UpsertConfig;
use crate::upsert::UpsertErrorEmitter;
use crate::upsert::UpsertKey;
use crate::upsert::UpsertValue;
use crate::upsert::types::UpsertValueAndSize;
use crate::upsert::types::{self as upsert_types, ValueMetadata};
use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend};
44
45/// An operator that transforms an input stream of upserts (updates to key-value
46/// pairs), which represents an imaginary key-value state, into a differential
47/// collection. It keeps an internal map-like state which keeps the latest value
48/// for each key, such that it can emit the retractions and additions implied by
49/// a new update for a given key.
50///
51/// This operator is intended to be used in an ingestion pipeline that reads
52/// from an external source, and the output of this operator is eventually
53/// written to persist.
54///
55/// The operator has two inputs: a) the source input, of upserts, and b) a
56/// persist input that feeds back the upsert state to the operator. Below, there
57/// is a section for each input that describes how and why we process updates
58/// from each input.
59///
60/// An important property of this operator is that it does _not_ update the
61/// map-like state that it keeps for translating the stream of upserts into a
62/// differential collection when it processes source input. It _only_ updates
63/// the map-like state based on updates from the persist (feedback) input. We do
64/// this because the operator is expected to be used in cases where there are
65/// multiple concurrent instances of the same ingestion pipeline, and the
66/// different instances might see different input because of concurrency and
67/// non-determinism. All instances of the upsert operator must produce output
68/// that is consistent with the current state of the output (that all instances
69/// produce "collaboratively"). This global state is what the operator
70/// continually learns about via updates from the persist input.
71///
72/// ## Processing the Source Input
73///
74/// Updates on the source input are stashed/staged until they can be processed.
75/// Whether or not an update can be processed depends both on the upper frontier
76/// of the source input and on the upper frontier of the persist input:
77///
78/// - Input updates are only processed once their timestamp is "done", that is
79/// the input upper is no longer `less_equal` their timestamp.
80///
81/// - Input updates are only processed once they are at the persist upper, that
82/// is we have emitted and written down updates for all previous times and we
83/// have updated our map-like state to the latest global state of the output of
84/// the ingestion pipeline. We know this is the case when the persist upper is
85/// no longer `less_than` their timestamp.
86///
87/// As an optimization, we allow processing input updates when they are right at
88/// the input frontier. This is called _partial emission_ because we are
89/// emitting updates that might be retracted when processing more updates from
90/// the same timestamp. In order to be able to process these updates we keep
91/// _provisional values_ in our upsert state. These will be overwritten when we
92/// get the final upsert values on the persist input.
93///
94/// ## Processing the Persist Input
95///
96/// We continually ingest updates from the persist input into our state using
97/// `UpsertState::consolidate_chunk`. We might be ingesting updates from the
98/// initial snapshot (when starting the operator) that are not consolidated or
99/// we might be ingesting updates from a partial emission (see above). In either
100/// case, our input might not be consolidated and `consolidate_chunk` is able to
101/// handle that.
102pub fn upsert_inner<'scope, T, FromTime, F, Fut, US>(
103 input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
104 key_indices: Vec<usize>,
105 resume_upper: Antichain<T>,
106 persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
107 mut persist_token: Option<Vec<PressOnDropButton>>,
108 upsert_metrics: UpsertMetrics,
109 source_config: crate::source::SourceExportCreationConfig,
110 state_fn: F,
111 upsert_config: UpsertConfig,
112 prevent_snapshot_buffering: bool,
113 snapshot_buffering_max: Option<usize>,
114) -> (
115 VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
116 StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
117 StreamVec<'scope, T, Infallible>,
118 PressOnDropButton,
119)
120where
121 T: Timestamp + Refines<mz_repr::Timestamp> + TotalOrder + Sync,
122 F: FnOnce() -> Fut + 'static,
123 Fut: std::future::Future<Output = US>,
124 US: UpsertStateBackend<T, FromTime>,
125 FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
126{
127 let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());
128
129 // We only care about UpsertValueError since this is the only error that we can retract
130 let persist_input = persist_input.flat_map(move |result| {
131 let value = match result {
132 Ok(ok) => Ok(ok),
133 Err(DataflowError::EnvelopeError(err)) => match *err {
134 EnvelopeError::Upsert(err) => Err(Box::new(err)),
135 EnvelopeError::Flat(_) => return None,
136 },
137 Err(_) => return None,
138 };
139 let value_ref = match value {
140 Ok(ref row) => Ok(row),
141 Err(ref err) => Err(&**err),
142 };
143 Some((UpsertKey::from_value(value_ref, &key_indices), value))
144 });
145 let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
146
147 // An output that just reports progress of the snapshot consolidation process upstream to the
148 // persist source to ensure that backpressure is applied
149 let (_snapshot_handle, snapshot_stream) =
150 builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
151
152 let (mut health_output, health_stream) = builder.new_output();
153 let mut input = builder.new_input_for(
154 input.inner,
155 Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
156 &output_handle,
157 );
158
159 let mut persist_input = builder.new_disconnected_input(
160 persist_input.inner,
161 Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
162 );
163
164 let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
165
166 let shutdown_button = builder.build(move |caps| async move {
167 let [output_cap, snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
168 drop(output_cap);
169 let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);
170
171 let mut state = UpsertState::<_, T, FromTime>::new(
172 state_fn().await,
173 upsert_shared_metrics,
174 &upsert_metrics,
175 source_config.source_statistics.clone(),
176 upsert_config.shrink_upsert_unused_buffers_by_ratio,
177 );
178
179 // True while we're still reading the initial "snapshot" (a whole bunch
180 // of updates, all at the same initial timestamp) from our persist
181 // input or while we're reading the initial snapshot from the upstream
182 // source.
183 let mut hydrating = true;
184
185 // A re-usable buffer of changes, per key. This is an `IndexMap`
186 // because it has to be `drain`-able and have a consistent iteration
187 // order.
188 let mut commands_state: indexmap::IndexMap<
189 _,
190 upsert_types::UpsertValueAndSize<T, FromTime>,
191 > = indexmap::IndexMap::new();
192 let mut multi_get_scratch = Vec::new();
193
194 // For stashing source input while it's not eligible for processing.
195 let mut stash = vec![];
196 // A capability suitable for emitting any updates based on stash. No capability is held
197 // when the stash is empty.
198 let mut stash_cap: Option<Capability<T>> = None;
199 let mut input_upper = Antichain::from_elem(Timestamp::minimum());
200 let mut partial_drain_time = None;
201
202 // For our persist/feedback input, both of these.
203 let mut persist_stash = vec![];
204 let mut persist_upper = Antichain::from_elem(Timestamp::minimum());
205
206 // We keep track of the largest timestamp seen on the persist input so
207 // that we can block processing source input while that timestamp is
208 // beyond the persist frontier. While ingesting updates of a timestamp,
209 // our upsert state is in a consolidating state, and trying to read it
210 // at that time would yield a panic.
211 //
212 // NOTE(aljoscha): You would think that it cannot happen that we even
213 // attempt to process source updates while the state is in a
214 // consolidating state, because we always wait until the persist
215 // frontier "catches up" with the timestamp of the source input. If
216 // there is only this here UPSERT operator and no concurrent instances,
217 // this is true. But with concurrent instances it can happen that an
218 // operator that is faster than us makes it so updates get written to
219 // persist. And we would then be ingesting them.
220 let mut largest_seen_persist_ts: Option<T> = None;
221
222 // A buffer for our output.
223 let mut output_updates = vec![];
224
225 let mut error_emitter = (&mut health_output, &health_cap);
226
227 loop {
228 tokio::select! {
229 _ = persist_input.ready() => {
230 // Read away as much input as we can.
231 while let Some(persist_event) = persist_input.next_sync() {
232 match persist_event {
233 AsyncEvent::Data(time, data) => {
234 tracing::trace!(
235 worker_id = %source_config.worker_id,
236 source_id = %source_config.id,
237 time=?time,
238 updates=%data.len(),
239 "received persist data");
240
241 persist_stash.extend(data.into_iter().map(
242 |((key, value), ts, diff)| {
243 largest_seen_persist_ts =
244 std::cmp::max(
245 largest_seen_persist_ts
246 .clone(),
247 Some(ts.clone()),
248 );
249 (key, value, ts, diff)
250 },
251 ));
252 }
253 AsyncEvent::Progress(upper) => {
254 tracing::trace!(
255 worker_id = %source_config.worker_id,
256 source_id = %source_config.id,
257 ?upper,
258 "received persist progress");
259 persist_upper = upper;
260 }
261 }
262 }
263
264 let last_rehydration_chunk =
265 hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);
266
267 tracing::debug!(
268 worker_id = %source_config.worker_id,
269 source_id = %source_config.id,
270 persist_stash = %persist_stash.len(),
271 %hydrating,
272 %last_rehydration_chunk,
273 ?resume_upper,
274 ?persist_upper,
275 "ingesting persist snapshot chunk");
276
277 // Log any (key, ts) pairs in this batch that have a suspicious
278 // net diff, to help diagnose how diff_sum corruption enters the
279 // system.
280 //
281 // Consolidating by key alone is too noisy during hydration,
282 // because a single batch can legitimately contain multiple
283 // timestamps for the same key. The suspicious shape for this
284 // bug is multiple net updates for the same key at one logical
285 // timestamp.
286 {
287 let mut key_ts_diffs: Vec<(
288 (UpsertKey, T),
289 mz_repr::Diff
290 )> = persist_stash
291 .iter()
292 .map(|(key, _val, ts, diff)| ((*key, ts.clone()), *diff))
293 .collect();
294 differential_dataflow::consolidation::consolidate(&mut key_ts_diffs);
295 for ((key, ts), net_diff) in &key_ts_diffs {
296 if net_diff.into_inner() > 1 || net_diff.into_inner() < -1 {
297 tracing::warn!(
298 worker_id = %source_config.worker_id,
299 source_id = %source_config.id,
300 ?key,
301 ?ts,
302 net_diff = net_diff.into_inner(),
303 %hydrating,
304 ?persist_upper,
305 "persist feedback batch has (key, ts) with suspicious net diff \
306 (expected -1, 0, or 1 after per-(key, ts) consolidation)"
307 );
308 }
309 }
310 }
311
312 let persist_stash_iter = persist_stash
313 .drain(..)
314 .map(|(key, val, _ts, diff)| (key, val, diff));
315
316 match state
317 .consolidate_chunk(
318 persist_stash_iter,
319 last_rehydration_chunk,
320 )
321 .await
322 {
323 Ok(_) => {}
324 Err(e) => {
325 // Make sure our persist source can shut down.
326 persist_token.take();
327 snapshot_cap.downgrade(&[]);
328 UpsertErrorEmitter::<T>::emit(
329 &mut error_emitter,
330 "Failed to rehydrate state".to_string(),
331 e,
332 )
333 .await;
334 }
335 }
336
337 tracing::debug!(
338 worker_id = %source_config.worker_id,
339 source_id = %source_config.id,
340 ?resume_upper,
341 ?persist_upper,
342 "downgrading snapshot cap",
343 );
344
345 // Only downgrade this _after_ ingesting the data, because
346 // that can actually take quite some time, and we don't want
347 // to announce that we're done ingesting the initial
348 // snapshot too early.
349 //
350 // When we finish ingesting our initial persist snapshot,
351 // during "re-hydration", we downgrade this to the empty
352 // frontier, so we need to be lenient to this failing from
353 // then on.
354 let _ = snapshot_cap.try_downgrade(persist_upper.iter());
355
356
357
358 if last_rehydration_chunk {
359 hydrating = false;
360
361 tracing::info!(
362 worker_id = %source_config.worker_id,
363 source_id = %source_config.id,
364 "upsert source finished rehydration",
365 );
366
367 snapshot_cap.downgrade(&[]);
368 }
369
370 }
371 _ = input.ready() => {
372 let mut events_processed = 0;
373 while let Some(event) = input.next_sync() {
374 match event {
375 AsyncEvent::Data(cap, mut data) => {
376 tracing::trace!(
377 worker_id = %source_config.worker_id,
378 source_id = %source_config.id,
379 time=?cap.time(),
380 updates=%data.len(),
381 "received data");
382
383 let event_time = cap.time().clone();
384
385 stage_input(
386 &mut stash,
387 &mut data,
388 &input_upper,
389 &resume_upper,
390 );
391 if !stash.is_empty() {
392 // Update the stashed capability to the minimum
393 stash_cap = match stash_cap {
394 Some(stash_cap) => {
395 if cap.time() < stash_cap.time() {
396 Some(cap)
397 } else {
398 Some(stash_cap)
399 }
400 }
401 None => Some(cap)
402 };
403 }
404
405 if prevent_snapshot_buffering
406 && input_upper.as_option()
407 == Some(&event_time)
408 {
409 tracing::debug!(
410 worker_id = %source_config.worker_id,
411 source_id = %source_config.id,
412 ?event_time,
413 ?resume_upper,
414 ?input_upper,
415 "allowing partial drain");
416 partial_drain_time = Some(event_time.clone());
417 } else {
418 tracing::debug!(
419 worker_id = %source_config.worker_id,
420 source_id = %source_config.id,
421 %prevent_snapshot_buffering,
422 ?event_time,
423 ?resume_upper,
424 ?input_upper,
425 "not allowing partial drain");
426 }
427 }
428 AsyncEvent::Progress(upper) => {
429 tracing::trace!(
430 worker_id = %source_config.worker_id,
431 source_id = %source_config.id,
432 ?upper,
433 "received progress");
434
435 // Ignore progress updates before the `resume_upper`, which is our initial
436 // capability post-snapshotting.
437 if PartialOrder::less_than(&upper, &resume_upper) {
438 tracing::trace!(
439 worker_id = %source_config.worker_id,
440 source_id = %source_config.id,
441 ?upper,
442 ?resume_upper,
443 "ignoring progress updates before resume_upper");
444 continue;
445 }
446
447 // Disable partial drain, because this progress
448 // update has moved the frontier. We might allow
449 // it again once we receive data right at the
450 // frontier again.
451 partial_drain_time = None;
452 input_upper = upper;
453 }
454 }
455
456 events_processed += 1;
457 if let Some(max) = snapshot_buffering_max {
458 if events_processed >= max {
459 break;
460 }
461 }
462 }
463 }
464 };
465
466 // While we have partially ingested updates of a timestamp our state
467 // is in an inconsistent/consolidating state and accessing it would
468 // panic.
469 if let Some(largest_seen_persist_ts) = largest_seen_persist_ts.as_ref() {
470 let largest_seen_outer_persist_ts = largest_seen_persist_ts.clone().to_outer();
471 let outer_persist_upper = persist_upper.iter().map(|ts| ts.clone().to_outer());
472 let outer_persist_upper = Antichain::from_iter(outer_persist_upper);
473 if outer_persist_upper.less_equal(&largest_seen_outer_persist_ts) {
474 continue;
475 }
476 }
477
478 // We try and drain from our stash every time we go through the
479 // loop. More of our stash can become eligible for draining both
480 // when the source-input frontier advances or when the persist
481 // frontier advances.
482 if !stash.is_empty() {
483 let cap = stash_cap
484 .as_mut()
485 .expect("missing capability for non-empty stash");
486
487 tracing::trace!(
488 worker_id = %source_config.worker_id,
489 source_id = %source_config.id,
490 ?cap,
491 ?stash,
492 "stashed updates");
493
494 let mut min_remaining_time = drain_staged_input::<_, _, _, _>(
495 &mut stash,
496 &mut commands_state,
497 &mut output_updates,
498 &mut multi_get_scratch,
499 DrainStyle::ToUpper {
500 input_upper: &input_upper,
501 persist_upper: &persist_upper,
502 },
503 &mut error_emitter,
504 &mut state,
505 &source_config,
506 )
507 .await;
508
509 tracing::trace!(
510 worker_id = %source_config.worker_id,
511 source_id = %source_config.id,
512 output_updates = %output_updates.len(),
513 "output updates for complete timestamp");
514
515 for (update, ts, diff) in output_updates.drain(..) {
516 output_handle.give(cap, (update, ts, diff));
517 }
518
519 if !stash.is_empty() {
520 let min_remaining_time = min_remaining_time
521 .take()
522 .expect("we still have updates left");
523 cap.downgrade(&min_remaining_time);
524 } else {
525 stash_cap = None;
526 }
527 }
528
529 if input_upper.is_empty() {
530 tracing::debug!(
531 worker_id = %source_config.worker_id,
532 source_id = %source_config.id,
533 "input exhausted, shutting down");
534 break;
535 };
536
537 // If there were staged events that occurred at the capability time, drain
538 // them. This is safe because out-of-order updates to the same key that are
539 // drained in separate calls to `drain_staged_input` are correctly ordered by
540 // their `FromTime` in `drain_staged_input`.
541 //
542 // Note also that this may result in more updates in the output collection than
543 // the minimum. However, because the frontier only advances on `Progress` updates,
544 // the collection always accumulates correctly for all keys.
545 if let Some(partial_drain_time) = &partial_drain_time {
546 if !stash.is_empty() {
547 let cap = stash_cap
548 .as_mut()
549 .expect("missing capability for non-empty stash");
550
551 tracing::trace!(
552 worker_id = %source_config.worker_id,
553 source_id = %source_config.id,
554 ?cap,
555 ?stash,
556 "stashed updates");
557
558 let mut min_remaining_time = drain_staged_input::<_, _, _, _>(
559 &mut stash,
560 &mut commands_state,
561 &mut output_updates,
562 &mut multi_get_scratch,
563 DrainStyle::AtTime {
564 time: partial_drain_time.clone(),
565 persist_upper: &persist_upper,
566 },
567 &mut error_emitter,
568 &mut state,
569 &source_config,
570 )
571 .await;
572
573 tracing::trace!(
574 worker_id = %source_config.worker_id,
575 source_id = %source_config.id,
576 output_updates = %output_updates.len(),
577 "output updates for partial timestamp");
578
579 for (update, ts, diff) in output_updates.drain(..) {
580 output_handle.give(cap, (update, ts, diff));
581 }
582
583 if !stash.is_empty() {
584 let min_remaining_time = min_remaining_time
585 .take()
586 .expect("we still have updates left");
587 cap.downgrade(&min_remaining_time);
588 } else {
589 stash_cap = None;
590 }
591 }
592 }
593 }
594 });
595
596 (
597 output
598 .as_collection()
599 .map(|result: UpsertValue| match result {
600 Ok(ok) => Ok(ok),
601 Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
602 }),
603 health_stream,
604 snapshot_stream,
605 shutdown_button.press_on_drop(),
606 )
607}
608
609/// Helper method for [`upsert_inner`] used to stage `data` updates
610/// from the input/source timely edge.
611#[allow(clippy::disallowed_types)]
612fn stage_input<T, FromTime>(
613 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
614 data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
615 input_upper: &Antichain<T>,
616 resume_upper: &Antichain<T>,
617) where
618 T: PartialOrder + timely::progress::Timestamp,
619 FromTime: Ord,
620{
621 if PartialOrder::less_equal(input_upper, resume_upper) {
622 data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
623 }
624
625 stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
626 assert!(diff.is_positive(), "invalid upsert input");
627 (time, key, Reverse(order), value)
628 }));
629}
630
631/// The style of drain we are performing on the stash. `AtTime`-drains cannot
632/// assume that all values have been seen, and must leave tombstones behind for deleted values.
633#[derive(Debug)]
634enum DrainStyle<'a, T> {
635 ToUpper {
636 input_upper: &'a Antichain<T>,
637 persist_upper: &'a Antichain<T>,
638 },
639 // For partial draining when taking the source snapshot.
640 AtTime {
641 time: T,
642 persist_upper: &'a Antichain<T>,
643 },
644}
645
646/// Helper method for [`upsert_inner`] used to stage `data` updates
647/// from the input timely edge.
648///
649/// Returns the minimum observed time across the updates that remain in the
650/// stash or `None` if none are left.
651///
652/// ## Correctness
653///
654/// It is safe to call this function multiple times with the same `persist_upper` provided that the
655/// drain style is `AtTime`, which updates the state such that past actions are remembered and can
656/// be undone in subsequent calls.
657///
658/// It is *not* safe to call this function more than once with the same `persist_upper` and a
659/// `ToUpper` drain style. Doing so causes all calls except the first one to base their work on
660/// stale state, since in this drain style no modifications to the state are made.
661async fn drain_staged_input<S, T, FromTime, E>(
662 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
663 commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, FromTime>>,
664 output_updates: &mut Vec<(UpsertValue, T, Diff)>,
665 multi_get_scratch: &mut Vec<UpsertKey>,
666 drain_style: DrainStyle<'_, T>,
667 error_emitter: &mut E,
668 state: &mut UpsertState<'_, S, T, FromTime>,
669 source_config: &crate::source::SourceExportCreationConfig,
670) -> Option<T>
671where
672 S: UpsertStateBackend<T, FromTime>,
673 T: Timestamp + TotalOrder + timely::ExchangeData + Clone + Debug + Ord + Sync,
674 FromTime: timely::ExchangeData + Clone + Ord + Sync,
675 E: UpsertErrorEmitter<T>,
676{
677 let mut min_remaining_time = Antichain::new();
678
679 let mut eligible_updates = stash
680 .extract_if(.., |(ts, _, _, _)| {
681 let eligible = match &drain_style {
682 DrainStyle::ToUpper {
683 input_upper,
684 persist_upper,
685 } => {
686 // We make sure that a) we only process updates when we know their
687 // timestamp is complete, that is there will be no more updates for
688 // that timestamp, and b) that "previous" times in the persist
689 // input are complete. The latter makes sure that we emit updates
690 // for the next timestamp that are consistent with the global state
691 // in the output persist shard, which also serves as a persistent
692 // copy of our in-memory/on-disk upsert state.
693 !input_upper.less_equal(ts) && !persist_upper.less_than(ts)
694 }
695 DrainStyle::AtTime {
696 time,
697 persist_upper,
698 } => {
699 // Even when emitting partial updates, we still need to wait
700 // until "previous" times in the persist input are complete.
701 *ts <= *time && !persist_upper.less_than(ts)
702 }
703 };
704
705 if !eligible {
706 min_remaining_time.insert(ts.clone());
707 }
708
709 eligible
710 })
711 .filter(|(ts, _, _, _)| {
712 let persist_upper = match &drain_style {
713 DrainStyle::ToUpper {
714 input_upper: _,
715 persist_upper,
716 } => persist_upper,
717 DrainStyle::AtTime {
718 time: _,
719 persist_upper,
720 } => persist_upper,
721 };
722
723 // Any update that is "in the past" of the persist upper is not
724 // relevant anymore. We _can_ emit changes for it, but the
725 // downstream persist_sink would filter these updates out because
726 // the shard upper is already further ahead.
727 //
728 // Plus, our upsert state is up-to-date to the persist_upper, so we
729 // wouldn't be able to emit correct retractions for incoming
730 // commands whose `ts` is in the past of that.
731 let relevant = persist_upper.less_equal(ts);
732 relevant
733 })
734 .collect_vec();
735
736 tracing::debug!(
737 worker_id = %source_config.worker_id,
738 source_id = %source_config.id,
739 ?drain_style,
740 remaining = %stash.len(),
741 eligible = eligible_updates.len(),
742 "draining stash");
743
744 // Sort the eligible updates by (key, time, Reverse(from_time)) so that
745 // deduping by (key, time) gives the latest change for that key.
746 eligible_updates.sort_unstable_by(|a, b| {
747 let (ts1, key1, from_ts1, val1) = a;
748 let (ts2, key2, from_ts2, val2) = b;
749 Ord::cmp(&(ts1, key1, from_ts1, val1), &(ts2, key2, from_ts2, val2))
750 });
751
752 // Read the previous values _per key_ out of `state`, recording it
753 // along with the value with the _latest timestamp for that key_.
754 commands_state.clear();
755 for (_, key, _, _) in eligible_updates.iter() {
756 commands_state.entry(*key).or_default();
757 }
758
759 // These iterators iterate in the same order because `commands_state`
760 // is an `IndexMap`.
761 multi_get_scratch.clear();
762 multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
763 match state
764 .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
765 .await
766 {
767 Ok(_) => {}
768 Err(e) => {
769 error_emitter
770 .emit("Failed to fetch records from state".to_string(), e)
771 .await;
772 }
773 }
774
775 // From the prefix that can be emitted we can deduplicate based on (ts, key) in
776 // order to only process the command with the maximum order within the (ts,
777 // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.;
778 let mut commands = eligible_updates.into_iter().dedup_by(|a, b| {
779 let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
780 a_ts == b_ts && a_key == b_key
781 });
782
783 let bincode_opts = upsert_types::upsert_bincode_opts();
784 // Upsert the values into `commands_state`, by recording the latest
785 // value (or deletion). These will be synced at the end to the `state`.
786 //
787 // Note that we are effectively doing "mini-upsert" here, using
788 // `command_state`. This "mini-upsert" is seeded with data from `state`, using
789 // a single `multi_get` above, and the final state is written out into
790 // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
791 // implementations, and reduces the number of reads and write we need to do.
792 //
793 // This "mini-upsert" technique is actually useful in `UpsertState`'s
794 // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
795 // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
796 while let Some((ts, key, from_time, value)) = commands.next() {
797 let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
798 command_state
799 } else {
800 panic!("key missing from commands_state");
801 };
802
803 let existing_state_cell = &mut command_state.get_mut().value;
804
805 if let Some(cs) = existing_state_cell.as_mut() {
806 cs.ensure_decoded(bincode_opts, source_config.id, Some(&key));
807 }
808
809 // Skip this command if its order key is below the one in the upsert state.
810 // Note that the existing order key may be `None` if the existing value
811 // is from snapshotting, which always sorts below new values/deletes.
812 let existing_order = existing_state_cell
813 .as_ref()
814 .and_then(|cs| cs.provisional_order(&ts));
815 if existing_order >= Some(&from_time.0) {
816 // Skip this update. If no later updates adjust this key, then we just
817 // end up writing the same value back to state. If there
818 // is nothing in the state, `existing_order` is `None`, and this
819 // does not occur.
820 continue;
821 }
822
823 match value {
824 Some(value) => {
825 if let Some(old_value) = existing_state_cell.as_ref() {
826 if let Some(old_value) = old_value.provisional_value_ref(&ts) {
827 output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
828 }
829 }
830
831 match &drain_style {
832 DrainStyle::AtTime { .. } => {
833 let existing_value = existing_state_cell.take();
834
835 let new_value = match existing_value {
836 Some(existing_value) => existing_value.clone().into_provisional_value(
837 value.clone(),
838 ts.clone(),
839 from_time.0.clone(),
840 ),
841 None => StateValue::new_provisional_value(
842 value.clone(),
843 ts.clone(),
844 from_time.0.clone(),
845 ),
846 };
847
848 existing_state_cell.replace(new_value);
849 }
850 DrainStyle::ToUpper { .. } => {
851 // Not writing down provisional values, or anything.
852 }
853 };
854
855 output_updates.push((value, ts, Diff::ONE));
856 }
857 None => {
858 if let Some(old_value) = existing_state_cell.as_ref() {
859 if let Some(old_value) = old_value.provisional_value_ref(&ts) {
860 output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
861 }
862 }
863
864 match &drain_style {
865 DrainStyle::AtTime { .. } => {
866 let existing_value = existing_state_cell.take();
867
868 let new_value = match existing_value {
869 Some(existing_value) => existing_value
870 .into_provisional_tombstone(ts.clone(), from_time.0.clone()),
871 None => StateValue::new_provisional_tombstone(
872 ts.clone(),
873 from_time.0.clone(),
874 ),
875 };
876
877 existing_state_cell.replace(new_value);
878 }
879 DrainStyle::ToUpper { .. } => {
880 // Not writing down provisional values, or anything.
881 }
882 }
883 }
884 }
885 }
886
887 match &drain_style {
888 DrainStyle::AtTime { .. } => {
889 match state
890 .multi_put(
891 // We don't want to update per-record stats, like size of
892 // records indexed or count of records indexed.
893 //
894 // We only add provisional values and these will be
895 // overwritten once we receive updates for state from the
896 // persist input. And the merge functionality cannot know
897 // what was in state before merging, so it cannot correctly
898 // retract/update stats added here.
899 //
900 // Mostly, the merge functionality can't update those stats
901 // because merging happens in a function that we pass to
902 // rocksdb which doesn't have access to any external
903 // context. And in general, with rocksdb we do blind writes
904 // rather than inspect what was there before when
905 // updating/inserting.
906 false,
907 commands_state.drain(..).map(|(k, cv)| {
908 (
909 k,
910 upsert_types::PutValue {
911 value: cv.value.map(|cv| cv.into_decoded()),
912 previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
913 size: v.size.try_into().expect("less than i64 size"),
914 is_tombstone: v.is_tombstone,
915 }),
916 },
917 )
918 }),
919 )
920 .await
921 {
922 Ok(_) => {}
923 Err(e) => {
924 error_emitter
925 .emit("Failed to update records in state".to_string(), e)
926 .await;
927 }
928 }
929 }
930 style @ DrainStyle::ToUpper { .. } => {
931 tracing::trace!(
932 worker_id = %source_config.worker_id,
933 source_id = %source_config.id,
934 "not doing state update for drain style {:?}", style);
935 }
936 }
937
938 min_remaining_time.into_option()
939}
940
941#[cfg(test)]
942mod test {
943 use std::sync::mpsc;
944
945 use mz_ore::metrics::MetricsRegistry;
946 use mz_persist_types::ShardId;
947 use mz_repr::{Datum, Timestamp as MzTimestamp};
948 use mz_rocksdb::{RocksDBConfig, ValueIterator};
949 use mz_storage_operators::persist_source::Subtime;
950 use mz_storage_types::sources::SourceEnvelope;
951 use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
952 use rocksdb::Env;
953 use timely::dataflow::operators::capture::Extract;
954 use timely::dataflow::operators::{Capture, Input, Probe};
955 use timely::progress::Timestamp;
956
957 use crate::metrics::StorageMetrics;
958 use crate::metrics::upsert::UpsertMetricDefs;
959 use crate::source::SourceExportCreationConfig;
960 use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};
961 use crate::upsert::memory::InMemoryHashMap;
962 use crate::upsert::types::{BincodeOpts, consolidating_merge_function, upsert_bincode_opts};
963
964 use super::*;
965
    /// Regression test reproducing the double-retraction bug tracked as
    /// gh-9160 (per the test name): when updates for the same key sit buffered
    /// under *different* capabilities and the persist frontier then advances
    /// past their shared timestamp, the operator used to process them
    /// separately and emit a double retraction for that key. The step-by-step
    /// comments below walk through the exact interleaving that triggers this.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        // Helper to wrap timestamps in the appropriate types.
        let new_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());

        let output_handle = timely::execute_directly(move |worker| {
            let (mut input_handle, mut persist_handle, output_handle) = worker
                .dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure-enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);

                        // Each metrics family gets a throwaway registry; only the metrics
                        // objects themselves are used below.
                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        // Run the operator under test with a plain in-memory state backend.
                        let (output, _, _, button) = upsert_inner(
                            input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            || async { InMemoryHashMap::default() },
                            upsert_config,
                            true,
                            None,
                        );
                        // Leak the drop-button so the operator is not shut down when the
                        // button goes out of scope.
                        std::mem::forget(button);

                        (input_handle, persist_handle, output.inner.capture())
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input will contain records for two keys, 0 and 1.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));
            let key1 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(1)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            // 1. {offset=1, key=0, value=0} @ mz_time = 0
            // 2. {offset=2, key=1, value=NULL} @ mz_time = 2 // <- deletion of unrelated key. Causes the
            //                                                //    operator to maintain the associated cap
            //                                                //    at time 2
            // 3. {offset=3, key=0, value=1} @ mz_time = 3
            // 4. {offset=4, key=0, value=2} @ mz_time = 3 // <- messages 3 and 4 are both reclocked to time 3
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let value3 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(1)]);
            let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
            let msg1 = (key0, Some(Ok(value1.clone())), 1);
            let msg2 = (key1, None, 2);
            let msg3 = (key0, Some(Ok(value3)), 3);
            let msg4 = (key0, Some(Ok(value4)), 4);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send((msg1, new_ts(0), Diff::ONE));
            input_handle.advance_to(new_ts(2));
            worker.step();

            // We assume this worker successfully CAAs (compare-and-appends) the update to the
            // shard, so we send it back through the persist_input.
            persist_handle.send((Ok(value1), new_ts(0), Diff::ONE));
            persist_handle.advance_to(new_ts(1));
            worker.step();

            // Then, messages 2 and 3 are sent as one batch with capability = 2
            input_handle.send_batch(&mut vec![
                (msg2, new_ts(2), Diff::ONE),
                (msg3, new_ts(3), Diff::ONE),
            ]);
            // Advance our capability to 3
            input_handle.advance_to(new_ts(3));
            // Message 4 is sent with capability 3
            input_handle.send_batch(&mut vec![(msg4, new_ts(3), Diff::ONE)]);
            // Advance our capability to 4
            input_handle.advance_to(new_ts(4));
            // We now step the worker so that the pending data is received. This causes the
            // operator to store internally the following map from capabilities to updates:
            // cap=2 => [ msg2, msg3 ]
            // cap=3 => [ msg4 ]
            worker.step();

            // We now assume that another replica raced us and processed msg1 at time 2, which in
            // this test is a no-op so the persist frontier advances to time 3 without new data.
            persist_handle.advance_to(new_ts(3));
            // We now step this worker again, which will notice that the persist upper is {3} and
            // will attempt to process msg3 and msg4 *separately*, causing it to produce a double
            // retraction.
            worker.step();

            output_handle
        });

        // Gather everything the dataflow emitted and consolidate, so that a
        // spurious double retraction cannot cancel out and hide.
        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0, which has the value 0
        // at timestamp 0 and the value 2 at timestamp 3.
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual_output, expected_output);
    }
1116
    /// Regression test reproducing gh-9540 (per the test name): after
    /// receiving an *out of order* command (offset 4 arriving before offset 3)
    /// for the same key and timestamp, the operator lost track of the fact
    /// that it had already seen offset 4, so the late arrival of offset 3 was
    /// not skipped as it should have been. The step-by-step comments below
    /// walk through the exact interleaving. This variant runs against the
    /// RocksDB state backend (with an in-memory `Env`).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9540_repro() {
        // Helper to wrap timestamps in the appropriate types.
        let mz_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());
        // Channel through which the dataflow hands back the RocksDB core-loop
        // join handle, so we can join it at the end of the test.
        let (tx, rx) = mpsc::channel::<std::thread::JoinHandle<()>>();

        let rocksdb_dir = tempfile::tempdir().unwrap();
        let output_handle = timely::execute_directly(move |worker| {
            let tx = tx.clone();
            let (mut input_handle, mut persist_handle, output_probe, output_handle) =
                worker.dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure-enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);
                        let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
                        let rocksdb_instance_metrics =
                            Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

                        // Each metrics family gets a throwaway registry; only the metrics
                        // objects themselves are used below.
                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        // A closure that will initialize and return a configured RocksDB instance.
                        let rocksdb_init_fn = move || async move {
                            let merge_operator = Some((
                                "upsert_state_snapshot_merge_v1".to_string(),
                                |a: &[u8],
                                 b: ValueIterator<
                                    BincodeOpts,
                                    StateValue<(MzTimestamp, Subtime), u64>,
                                >| {
                                    consolidating_merge_function::<(MzTimestamp, Subtime), u64>(
                                        a.into(),
                                        b,
                                    )
                                },
                            ));
                            let rocksdb_cleanup_tries = 5;
                            let tuning = RocksDBConfig::new(Default::default(), None);
                            let mut rocksdb_inst = mz_rocksdb::RocksDBInstance::new(
                                rocksdb_dir.path(),
                                mz_rocksdb::InstanceOptions::new(
                                    Env::mem_env().unwrap(),
                                    rocksdb_cleanup_tries,
                                    merge_operator,
                                    // For now, just use the same config as the one used for
                                    // merging snapshots.
                                    upsert_bincode_opts(),
                                ),
                                tuning,
                                rocksdb_shared_metrics,
                                rocksdb_instance_metrics,
                            )
                            .unwrap();

                            // Hand the core-loop join handle out of the dataflow so the
                            // test can join it once everything is shut down.
                            let handle = rocksdb_inst.take_core_loop_handle().expect("join handle");
                            tx.send(handle).expect("sent joinhandle");
                            crate::upsert::rocksdb::RocksDB::new(rocksdb_inst)
                        };

                        // Run the operator under test with the RocksDB state backend.
                        let (output, _, _, button) = upsert_inner(
                            input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            rocksdb_init_fn,
                            upsert_config,
                            true,
                            None,
                        );
                        // Leak the drop-button so the operator is not shut down when the
                        // button goes out of scope.
                        std::mem::forget(button);

                        let (probe, stream) = output.inner.probe();
                        (input_handle, persist_handle, probe, stream.capture())
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input will contain records for a single key, 0.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            // 1. {offset=1, key=0, value=0} @ mz_time = 0
            // 2. {offset=2, key=0, value=NULL} @ mz_time = 1
            // 3. {offset=3, key=0, value=0} @ mz_time = 2
            // 4. {offset=4, key=0, value=NULL} @ mz_time = 2 // <- messages 3 and 4 are *BOTH* reclocked to time 2
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let msg1 = ((key0, Some(Ok(value1.clone())), 1), mz_ts(0), Diff::ONE);
            let msg2 = ((key0, None, 2), mz_ts(1), Diff::ONE);
            let msg3 = ((key0, Some(Ok(value1.clone())), 3), mz_ts(2), Diff::ONE);
            let msg4 = ((key0, None, 4), mz_ts(2), Diff::ONE);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send(msg1);
            input_handle.advance_to(mz_ts(1));
            while output_probe.less_than(&mz_ts(1)) {
                worker.step_or_park(None);
            }
            // Feedback the produced output..
            persist_handle.send((Ok(value1.clone()), mz_ts(0), Diff::ONE));
            persist_handle.advance_to(mz_ts(1));
            // ..and send the next upsert command that deletes the key.
            input_handle.send(msg2);
            input_handle.advance_to(mz_ts(2));
            while output_probe.less_than(&mz_ts(2)) {
                worker.step_or_park(None);
            }

            // Feedback the produced output..
            persist_handle.send((Ok(value1), mz_ts(1), Diff::MINUS_ONE));
            persist_handle.advance_to(mz_ts(2));
            // ..and send the next *out of order* upsert command that deletes the key. Here msg4
            // happens at offset 4 and the operator should remember that.
            input_handle.send(msg4);
            input_handle.flush();
            // Run the worker for enough steps to process these events. We can't guide the
            // execution with the probe here since the frontier does not advance, only provisional
            // updates are produced.
            for _ in 0..5 {
                worker.step();
            }

            // Send the missing message that will now confuse the operator because it has lost
            // track that for key 0 it has already seen a command for offset 4, and therefore msg3
            // should be skipped.
            input_handle.send(msg3);
            input_handle.flush();
            input_handle.advance_to(mz_ts(3));

            output_handle
        });

        // Gather everything the dataflow emitted and consolidate before comparing.
        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0: the value 0 is
        // inserted at timestamp 0 and retracted again at timestamp 1. In particular, the late
        // msg3 must not re-introduce the value.
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), mz_ts(0), Diff::ONE),
            (Ok(value1), mz_ts(1), Diff::MINUS_ONE),
        ];
        assert_eq!(actual_output, expected_output);

        // Join the RocksDB core-loop thread(s); `recv` returns Err once all
        // senders inside the (now dropped) dataflow are gone, ending the loop.
        while let Ok(handle) = rx.recv() {
            handle.join().expect("threads completed successfully");
        }
    }
1311}