// mz_storage/upsert_continual_feedback.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of feedback UPSERT operator and associated helpers. See
11//! [`upsert_inner`] for a description of how the operator works and why.
12
13use std::cmp::Reverse;
14use std::fmt::Debug;
15use std::sync::Arc;
16
17use differential_dataflow::hashable::Hashable;
18use differential_dataflow::{AsCollection, VecCollection};
19use indexmap::map::Entry;
20use itertools::Itertools;
21use mz_repr::{Diff, GlobalId, Row};
22use mz_storage_types::errors::{DataflowError, EnvelopeError};
23use mz_timely_util::builder_async::{
24    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
25};
26use std::convert::Infallible;
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::channels::pact::Exchange;
29use timely::dataflow::operators::{Capability, CapabilitySet};
30use timely::dataflow::{Scope, StreamVec};
31use timely::order::{PartialOrder, TotalOrder};
32use timely::progress::timestamp::Refines;
33use timely::progress::{Antichain, Timestamp};
34
35use crate::healthcheck::HealthStatusUpdate;
36use crate::metrics::upsert::UpsertMetrics;
37use crate::upsert::UpsertConfig;
38use crate::upsert::UpsertErrorEmitter;
39use crate::upsert::UpsertKey;
40use crate::upsert::UpsertValue;
41use crate::upsert::types::UpsertValueAndSize;
42use crate::upsert::types::{self as upsert_types, ValueMetadata};
43use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend};
44
45/// An operator that transforms an input stream of upserts (updates to key-value
46/// pairs), which represents an imaginary key-value state, into a differential
47/// collection. It keeps an internal map-like state which keeps the latest value
48/// for each key, such that it can emit the retractions and additions implied by
49/// a new update for a given key.
50///
51/// This operator is intended to be used in an ingestion pipeline that reads
52/// from an external source, and the output of this operator is eventually
53/// written to persist.
54///
55/// The operator has two inputs: a) the source input, of upserts, and b) a
56/// persist input that feeds back the upsert state to the operator. Below, there
57/// is a section for each input that describes how and why we process updates
58/// from each input.
59///
60/// An important property of this operator is that it does _not_ update the
61/// map-like state that it keeps for translating the stream of upserts into a
62/// differential collection when it processes source input. It _only_ updates
63/// the map-like state based on updates from the persist (feedback) input. We do
64/// this because the operator is expected to be used in cases where there are
65/// multiple concurrent instances of the same ingestion pipeline, and the
66/// different instances might see different input because of concurrency and
67/// non-determinism. All instances of the upsert operator must produce output
68/// that is consistent with the current state of the output (that all instances
69/// produce "collaboratively"). This global state is what the operator
70/// continually learns about via updates from the persist input.
71///
72/// ## Processing the Source Input
73///
74/// Updates on the source input are stashed/staged until they can be processed.
75/// Whether or not an update can be processed depends both on the upper frontier
76/// of the source input and on the upper frontier of the persist input:
77///
78///  - Input updates are only processed once their timestamp is "done", that is
79///  the input upper is no longer `less_equal` their timestamp.
80///
81///  - Input updates are only processed once they are at the persist upper, that
82///  is we have emitted and written down updates for all previous times and we
83///  have updated our map-like state to the latest global state of the output of
84///  the ingestion pipeline. We know this is the case when the persist upper is
85///  no longer `less_than` their timestamp.
86///
87/// As an optimization, we allow processing input updates when they are right at
88/// the input frontier. This is called _partial emission_ because we are
89/// emitting updates that might be retracted when processing more updates from
90/// the same timestamp. In order to be able to process these updates we keep
91/// _provisional values_ in our upsert state. These will be overwritten when we
92/// get the final upsert values on the persist input.
93///
94/// ## Processing the Persist Input
95///
96/// We continually ingest updates from the persist input into our state using
97/// `UpsertState::consolidate_chunk`. We might be ingesting updates from the
98/// initial snapshot (when starting the operator) that are not consolidated or
99/// we might be ingesting updates from a partial emission (see above). In either
100/// case, our input might not be consolidated and `consolidate_chunk` is able to
101/// handle that.
pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
    input: VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: VecCollection<G, Result<Row, DataflowError>, Diff>,
    mut persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state_fn: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    StreamVec<G, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
{
    // Async operator that hosts the upsert state machine.
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // We only care about UpsertValueError since this is the only error that we can retract
    let persist_input = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                EnvelopeError::Flat(_) => return None,
            },
            Err(_) => return None,
        };
        // Re-derive the upsert key from the value so that feedback updates can
        // be routed/merged with source updates for the same key.
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();

    // An output that just reports progress of the snapshot consolidation process upstream to the
    // persist source to ensure that backpressure is applied
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    // Both inputs are exchanged by upsert-key hash, so all updates for a given
    // key are handled by the same worker.
    let mut input = builder.new_input_for(
        input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut persist_input = builder.new_disconnected_input(
        persist_input.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);

    let shutdown_button = builder.build(move |caps| async move {
        let [output_cap, snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
        // Output is emitted using capabilities retained from input batches
        // (see `stash_cap` below), so the initial output capability is unused.
        drop(output_cap);
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        let mut state = UpsertState::<_, G::Timestamp, FromTime>::new(
            state_fn().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );

        // True while we're still reading the initial "snapshot" (a whole bunch
        // of updates, all at the same initial timestamp) from our persist
        // input or while we're reading the initial snapshot from the upstream
        // source.
        let mut hydrating = true;

        // A re-usable buffer of changes, per key. This is an `IndexMap`
        // because it has to be `drain`-able and have a consistent iteration
        // order.
        let mut commands_state: indexmap::IndexMap<
            _,
            upsert_types::UpsertValueAndSize<G::Timestamp, FromTime>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        // For stashing source input while it's not eligible for processing.
        let mut stash = vec![];
        // A capability suitable for emitting any updates based on stash. No capability is held
        // when the stash is empty.
        let mut stash_cap: Option<Capability<G::Timestamp>> = None;
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        // When set, we may partially drain stashed updates right at this time
        // (partial emission), even though the input frontier has not yet
        // advanced beyond it.
        let mut partial_drain_time = None;

        // For our persist/feedback input, both of these.
        let mut persist_stash = vec![];
        let mut persist_upper = Antichain::from_elem(Timestamp::minimum());

        // We keep track of the largest timestamp seen on the persist input so
        // that we can block processing source input while that timestamp is
        // beyond the persist frontier. While ingesting updates of a timestamp,
        // our upsert state is in a consolidating state, and trying to read it
        // at that time would yield a panic.
        //
        // NOTE(aljoscha): You would think that it cannot happen that we even
        // attempt to process source updates while the state is in a
        // consolidating state, because we always wait until the persist
        // frontier "catches up" with the timestamp of the source input. If
        // there is only this here UPSERT operator and no concurrent instances,
        // this is true. But with concurrent instances it can happen that an
        // operator that is faster than us makes it so updates get written to
        // persist. And we would then be ingesting them.
        let mut largest_seen_persist_ts: Option<G::Timestamp> = None;

        // A buffer for our output.
        let mut output_updates = vec![];

        // This pair implements `UpsertErrorEmitter`: fatal errors are surfaced
        // as health status updates (see the `emit` call below).
        let mut error_emitter = (&mut health_output, &health_cap);

        loop {
            tokio::select! {
                _ = persist_input.ready() => {
                    // Read away as much input as we can.
                    while let Some(persist_event) = persist_input.next_sync() {
                        match persist_event {
                            AsyncEvent::Data(time, data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?time,
                                    updates=%data.len(),
                                    "received persist data");

                                persist_stash.extend(data.into_iter().map(
                                    |((key, value), ts, diff)| {
                                        largest_seen_persist_ts =
                                            std::cmp::max(
                                                largest_seen_persist_ts
                                                    .clone(),
                                                Some(ts.clone()),
                                            );
                                        (key, value, ts, diff)
                                    },
                                ));
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received persist progress");
                                persist_upper = upper;
                            }
                        }
                    }

                    let last_rehydration_chunk =
                        hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        persist_stash = %persist_stash.len(),
                        %hydrating,
                        %last_rehydration_chunk,
                        ?resume_upper,
                        ?persist_upper,
                        "ingesting persist snapshot chunk");

                    // Log any (key, ts) pairs in this batch that have a suspicious
                    // net diff, to help diagnose how diff_sum corruption enters the
                    // system.
                    //
                    // Consolidating by key alone is too noisy during hydration,
                    // because a single batch can legitimately contain multiple
                    // timestamps for the same key. The suspicious shape for this
                    // bug is multiple net updates for the same key at one logical
                    // timestamp.
                    {
                        let mut key_ts_diffs: Vec<(
                            (UpsertKey, G::Timestamp),
                            mz_repr::Diff
                        )> = persist_stash
                            .iter()
                            .map(|(key, _val, ts, diff)| ((*key, ts.clone()), *diff))
                            .collect();
                        differential_dataflow::consolidation::consolidate(&mut key_ts_diffs);
                        for ((key, ts), net_diff) in &key_ts_diffs {
                            // After consolidation, a well-formed upsert stream
                            // nets out to -1, 0, or 1 per (key, ts).
                            if net_diff.into_inner() > 1 || net_diff.into_inner() < -1 {
                                tracing::warn!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?key,
                                    ?ts,
                                    net_diff = net_diff.into_inner(),
                                    %hydrating,
                                    ?persist_upper,
                                    "persist feedback batch has (key, ts) with suspicious net diff \
                                    (expected -1, 0, or 1 after per-(key, ts) consolidation)"
                                );
                            }
                        }
                    }

                    let persist_stash_iter = persist_stash
                        .drain(..)
                        .map(|(key, val, _ts, diff)| (key, val, diff));

                    match state
                        .consolidate_chunk(
                            persist_stash_iter,
                            last_rehydration_chunk,
                        )
                        .await
                    {
                        Ok(_) => {}
                        Err(e) => {
                            // Make sure our persist source can shut down.
                            persist_token.take();
                            snapshot_cap.downgrade(&[]);
                            UpsertErrorEmitter::<G>::emit(
                                &mut error_emitter,
                                "Failed to rehydrate state".to_string(),
                                e,
                            )
                            .await;
                        }
                    }

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?resume_upper,
                        ?persist_upper,
                        "downgrading snapshot cap",
                    );

                    // Only downgrade this _after_ ingesting the data, because
                    // that can actually take quite some time, and we don't want
                    // to announce that we're done ingesting the initial
                    // snapshot too early.
                    //
                    // When we finish ingesting our initial persist snapshot,
                    // during "re-hydration", we downgrade this to the empty
                    // frontier, so we need to be lenient to this failing from
                    // then on.
                    let _ = snapshot_cap.try_downgrade(persist_upper.iter());



                    if last_rehydration_chunk {
                        hydrating = false;

                        tracing::info!(
                            worker_id = %source_config.worker_id,
                            source_id = %source_config.id,
                            "upsert source finished rehydration",
                        );

                        snapshot_cap.downgrade(&[]);
                    }

                }
                _ = input.ready() => {
                    let mut events_processed = 0;
                    while let Some(event) = input.next_sync() {
                        match event {
                            AsyncEvent::Data(cap, mut data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?cap.time(),
                                    updates=%data.len(),
                                    "received data");

                                let event_time = cap.time().clone();

                                stage_input(
                                    &mut stash,
                                    &mut data,
                                    &input_upper,
                                    &resume_upper,
                                );
                                if !stash.is_empty() {
                                    // Update the stashed capability to the minimum
                                    stash_cap = match stash_cap {
                                        Some(stash_cap) => {
                                            if cap.time() < stash_cap.time() {
                                                Some(cap)
                                            } else {
                                                Some(stash_cap)
                                            }
                                        }
                                        None => Some(cap)
                                    };
                                }

                                if prevent_snapshot_buffering
                                    && input_upper.as_option()
                                        == Some(&event_time)
                                {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?event_time,
                                        ?resume_upper,
                                        ?input_upper,
                                        "allowing partial drain");
                                    partial_drain_time = Some(event_time.clone());
                                } else {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        %prevent_snapshot_buffering,
                                        ?event_time,
                                        ?resume_upper,
                                        ?input_upper,
                                        "not allowing partial drain");
                                }
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received progress");

                                // Ignore progress updates before the `resume_upper`, which is our initial
                                // capability post-snapshotting.
                                if PartialOrder::less_than(&upper, &resume_upper) {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?upper,
                                        ?resume_upper,
                                        "ignoring progress updates before resume_upper");
                                    continue;
                                }

                                // Disable partial drain, because this progress
                                // update has moved the frontier. We might allow
                                // it again once we receive data right at the
                                // frontier again.
                                partial_drain_time = None;
                                input_upper = upper;
                            }
                        }

                        events_processed += 1;
                        if let Some(max) = snapshot_buffering_max {
                            if events_processed >= max {
                                break;
                            }
                        }
                    }
                }
            };

            // While we have partially ingested updates of a timestamp our state
            // is in an inconsistent/consolidating state and accessing it would
            // panic.
            if let Some(largest_seen_persist_ts) = largest_seen_persist_ts.as_ref() {
                let largest_seen_outer_persist_ts = largest_seen_persist_ts.clone().to_outer();
                let outer_persist_upper = persist_upper.iter().map(|ts| ts.clone().to_outer());
                let outer_persist_upper = Antichain::from_iter(outer_persist_upper);
                if outer_persist_upper.less_equal(&largest_seen_outer_persist_ts) {
                    continue;
                }
            }

            // We try and drain from our stash every time we go through the
            // loop. More of our stash can become eligible for draining both
            // when the source-input frontier advances or when the persist
            // frontier advances.
            if !stash.is_empty() {
                let cap = stash_cap
                    .as_mut()
                    .expect("missing capability for non-empty stash");

                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    ?cap,
                    ?stash,
                    "stashed updates");

                let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::ToUpper {
                        input_upper: &input_upper,
                        persist_upper: &persist_upper,
                    },
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    output_updates = %output_updates.len(),
                    "output updates for complete timestamp");

                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(cap, (update, ts, diff));
                }

                if !stash.is_empty() {
                    // Some updates remain; downgrade the held capability to the
                    // minimum time still in the stash so progress can be made.
                    let min_remaining_time = min_remaining_time
                        .take()
                        .expect("we still have updates left");
                    cap.downgrade(&min_remaining_time);
                } else {
                    stash_cap = None;
                }
            }

            if input_upper.is_empty() {
                tracing::debug!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    "input exhausted, shutting down");
                break;
            };

            // If there were staged events that occurred at the capability time, drain
            // them. This is safe because out-of-order updates to the same key that are
            // drained in separate calls to `drain_staged_input` are correctly ordered by
            // their `FromTime` in `drain_staged_input`.
            //
            // Note also that this may result in more updates in the output collection than
            // the minimum. However, because the frontier only advances on `Progress` updates,
            // the collection always accumulates correctly for all keys.
            if let Some(partial_drain_time) = &partial_drain_time {
                if !stash.is_empty() {
                    let cap = stash_cap
                        .as_mut()
                        .expect("missing capability for non-empty stash");

                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?cap,
                        ?stash,
                        "stashed updates");

                    let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                        &mut stash,
                        &mut commands_state,
                        &mut output_updates,
                        &mut multi_get_scratch,
                        DrainStyle::AtTime {
                            time: partial_drain_time.clone(),
                            persist_upper: &persist_upper,
                        },
                        &mut error_emitter,
                        &mut state,
                        &source_config,
                    )
                    .await;

                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        output_updates = %output_updates.len(),
                        "output updates for partial timestamp");

                    for (update, ts, diff) in output_updates.drain(..) {
                        output_handle.give(cap, (update, ts, diff));
                    }

                    if !stash.is_empty() {
                        let min_remaining_time = min_remaining_time
                            .take()
                            .expect("we still have updates left");
                        cap.downgrade(&min_remaining_time);
                    } else {
                        stash_cap = None;
                    }
                }
            }
        }
    });

    (
        output
            .as_collection()
            // Box the upsert error back up into the `DataflowError` type that
            // downstream consumers expect.
            .map(|result: UpsertValue| match result {
                Ok(ok) => Ok(ok),
                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
            }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
608
609/// Helper method for [`upsert_inner`] used to stage `data` updates
610/// from the input/source timely edge.
611#[allow(clippy::disallowed_types)]
612fn stage_input<T, FromTime>(
613    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
614    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
615    input_upper: &Antichain<T>,
616    resume_upper: &Antichain<T>,
617) where
618    T: PartialOrder + timely::progress::Timestamp,
619    FromTime: Ord,
620{
621    if PartialOrder::less_equal(input_upper, resume_upper) {
622        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
623    }
624
625    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
626        assert!(diff.is_positive(), "invalid upsert input");
627        (time, key, Reverse(order), value)
628    }));
629}
630
/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain all stashed updates whose timestamps are complete, i.e. not
    /// beyond `input_upper`, gated on the persist frontier having caught up.
    ToUpper {
        // Frontier of the source input.
        input_upper: &'a Antichain<T>,
        // Frontier of the persist (feedback) input.
        persist_upper: &'a Antichain<T>,
    },
    // For partial draining when taking the source snapshot.
    /// Drain only updates at exactly `time`, which may still receive more
    /// updates later (partial emission of provisional values).
    AtTime {
        // The single, possibly still incomplete, timestamp to drain.
        time: T,
        // Frontier of the persist (feedback) input.
        persist_upper: &'a Antichain<T>,
    },
}
645
/// Helper method for [`upsert_inner`] used to drain updates that were
/// previously staged (by [`stage_input`]) from the input timely edge,
/// performing the upsert logic against `state` and appending the resulting
/// differential updates to `output_updates`.
///
/// `commands_state` and `multi_get_scratch` are caller-provided scratch
/// space; they are cleared before use and reused across calls.
///
/// Returns the minimum observed time across the updates that remain in the
/// stash or `None` if none are left.
///
/// ## Correctness
///
/// It is safe to call this function multiple times with the same `persist_upper` provided that the
/// drain style is `AtTime`, which updates the state such that past actions are remembered and can
/// be undone in subsequent calls.
///
/// It is *not* safe to call this function more than once with the same `persist_upper` and a
/// `ToUpper` drain style. Doing so causes all calls except the first one to base their work on
/// stale state, since in this drain style no modifications to the state are made.
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) -> Option<T>
where
    S: UpsertStateBackend<T, FromTime>,
    G: Scope,
    T: TotalOrder + timely::ExchangeData + Clone + Debug + Ord + Sync,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    // Collects the times of updates that stay in the stash. Because `T` is
    // totally ordered, this antichain holds at most one element: the minimum
    // remaining time, which we return below.
    let mut min_remaining_time = Antichain::new();

    let mut eligible_updates = stash
        .extract_if(.., |(ts, _, _, _)| {
            let eligible = match &drain_style {
                DrainStyle::ToUpper {
                    input_upper,
                    persist_upper,
                } => {
                    // We make sure that a) we only process updates when we know their
                    // timestamp is complete, that is there will be no more updates for
                    // that timestamp, and b) that "previous" times in the persist
                    // input are complete. The latter makes sure that we emit updates
                    // for the next timestamp that are consistent with the global state
                    // in the output persist shard, which also serves as a persistent
                    // copy of our in-memory/on-disk upsert state.
                    !input_upper.less_equal(ts) && !persist_upper.less_than(ts)
                }
                DrainStyle::AtTime {
                    time,
                    persist_upper,
                } => {
                    // Even when emitting partial updates, we still need to wait
                    // until "previous" times in the persist input are complete.
                    *ts <= *time && !persist_upper.less_than(ts)
                }
            };

            if !eligible {
                min_remaining_time.insert(ts.clone());
            }

            eligible
        })
        .filter(|(ts, _, _, _)| {
            let persist_upper = match &drain_style {
                DrainStyle::ToUpper {
                    input_upper: _,
                    persist_upper,
                } => persist_upper,
                DrainStyle::AtTime {
                    time: _,
                    persist_upper,
                } => persist_upper,
            };

            // Any update that is "in the past" of the persist upper is not
            // relevant anymore. We _can_ emit changes for it, but the
            // downstream persist_sink would filter these updates out because
            // the shard upper is already further ahead.
            //
            // Plus, our upsert state is up-to-date to the persist_upper, so we
            // wouldn't be able to emit correct retractions for incoming
            // commands whose `ts` is in the past of that.
            let relevant = persist_upper.less_equal(ts);
            relevant
        })
        .collect_vec();

    tracing::debug!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?drain_style,
        remaining = %stash.len(),
        eligible = eligible_updates.len(),
        "draining stash");

    // Sort the eligible updates by (key, time, Reverse(from_time)) so that
    // deduping by (key, time) gives the latest change for that key.
    eligible_updates.sort_unstable_by(|a, b| {
        let (ts1, key1, from_ts1, val1) = a;
        let (ts2, key2, from_ts2, val2) = b;
        Ord::cmp(&(ts1, key1, from_ts1, val1), &(ts2, key2, from_ts2, val2))
    });

    // Read the previous values _per key_ out of `state`, recording it
    // along with the value with the _latest timestamp for that key_.
    commands_state.clear();
    for (_, key, _, _) in eligible_updates.iter() {
        commands_state.entry(*key).or_default();
    }

    // These iterators iterate in the same order because `commands_state`
    // is an `IndexMap`.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix that can be emitted we can deduplicate based on (ts, key) in
    // order to only process the command with the maximum order within the (ts,
    // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.
    let mut commands = eligible_updates.into_iter().dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = upsert_types::upsert_bincode_opts();
    // Upsert the values into `commands_state`, by recording the latest
    // value (or deletion). These will be synced at the end to the `state`.
    //
    // Note that we are effectively doing "mini-upsert" here, using
    // `command_state`. This "mini-upsert" is seeded with data from `state`, using
    // a single `multi_get` above, and the final state is written out into
    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
    // implementations, and reduces the number of reads and write we need to do.
    //
    // This "mini-upsert" technique is actually useful in `UpsertState`'s
    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
    // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
    while let Some((ts, key, from_time, value)) = commands.next() {
        // Every eligible key was inserted into `commands_state` above, so a
        // missing entry here would be a bug in this function.
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_state_cell = &mut command_state.get_mut().value;

        if let Some(cs) = existing_state_cell.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id, Some(&key));
        }

        // Skip this command if its order key is below the one in the upsert state.
        // Note that the existing order key may be `None` if the existing value
        // is from snapshotting, which always sorts below new values/deletes.
        let existing_order = existing_state_cell
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            // Skip this update. If no later updates adjust this key, then we just
            // end up writing the same value back to state. If there
            // is nothing in the state, `existing_order` is `None`, and this
            // does not occur.
            continue;
        }

        match value {
            Some(value) => {
                // Retract the provisional value this new value replaces, if any.
                if let Some(old_value) = existing_state_cell.as_ref() {
                    if let Some(old_value) = old_value.provisional_value_ref(&ts) {
                        output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
                    }
                }

                match &drain_style {
                    DrainStyle::AtTime { .. } => {
                        let existing_value = existing_state_cell.take();

                        let new_value = match existing_value {
                            Some(existing_value) => existing_value.clone().into_provisional_value(
                                value.clone(),
                                ts.clone(),
                                from_time.0.clone(),
                            ),
                            None => StateValue::new_provisional_value(
                                value.clone(),
                                ts.clone(),
                                from_time.0.clone(),
                            ),
                        };

                        existing_state_cell.replace(new_value);
                    }
                    DrainStyle::ToUpper { .. } => {
                        // Not writing down provisional values, or anything.
                    }
                };

                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                // A deletion: retract the provisional value, if any, and (in
                // `AtTime` style) remember a provisional tombstone.
                if let Some(old_value) = existing_state_cell.as_ref() {
                    if let Some(old_value) = old_value.provisional_value_ref(&ts) {
                        output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
                    }
                }

                match &drain_style {
                    DrainStyle::AtTime { .. } => {
                        let existing_value = existing_state_cell.take();

                        let new_value = match existing_value {
                            Some(existing_value) => existing_value
                                .into_provisional_tombstone(ts.clone(), from_time.0.clone()),
                            None => StateValue::new_provisional_tombstone(
                                ts.clone(),
                                from_time.0.clone(),
                            ),
                        };

                        existing_state_cell.replace(new_value);
                    }
                    DrainStyle::ToUpper { .. } => {
                        // Not writing down provisional values, or anything.
                    }
                }
            }
        }
    }

    match &drain_style {
        DrainStyle::AtTime { .. } => {
            match state
                .multi_put(
                    // We don't want to update per-record stats, like size of
                    // records indexed or count of records indexed.
                    //
                    // We only add provisional values and these will be
                    // overwritten once we receive updates for state from the
                    // persist input. And the merge functionality cannot know
                    // what was in state before merging, so it cannot correctly
                    // retract/update stats added here.
                    //
                    // Mostly, the merge functionality can't update those stats
                    // because merging happens in a function that we pass to
                    // rocksdb which doesn't have access to any external
                    // context. And in general, with rocksdb we do blind writes
                    // rather than inspect what was there before when
                    // updating/inserting.
                    false,
                    commands_state.drain(..).map(|(k, cv)| {
                        (
                            k,
                            upsert_types::PutValue {
                                value: cv.value.map(|cv| cv.into_decoded()),
                                previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                                    size: v.size.try_into().expect("less than i64 size"),
                                    is_tombstone: v.is_tombstone,
                                }),
                            },
                        )
                    }),
                )
                .await
            {
                Ok(_) => {}
                Err(e) => {
                    error_emitter
                        .emit("Failed to update records in state".to_string(), e)
                        .await;
                }
            }
        }
        style @ DrainStyle::ToUpper { .. } => {
            tracing::trace!(
                worker_id = %source_config.worker_id,
                source_id = %source_config.id,
                "not doing state update for drain style {:?}", style);
        }
    }

    min_remaining_time.into_option()
}
941
#[cfg(test)]
mod test {
    use std::sync::mpsc;

    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, Timestamp as MzTimestamp};
    use mz_rocksdb::{RocksDBConfig, ValueIterator};
    use mz_storage_operators::persist_source::Subtime;
    use mz_storage_types::sources::SourceEnvelope;
    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
    use rocksdb::Env;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input, Probe};
    use timely::progress::Timestamp;

    use crate::metrics::StorageMetrics;
    use crate::metrics::upsert::UpsertMetricDefs;
    use crate::source::SourceExportCreationConfig;
    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};
    use crate::upsert::memory::InMemoryHashMap;
    use crate::upsert::types::{BincodeOpts, consolidating_merge_function, upsert_bincode_opts};

    use super::*;

    /// Regression test: when an input batch spans multiple timestamps and the
    /// persist frontier jumps ahead, the operator must not process the
    /// updates of a single timestamp separately and emit a double retraction
    /// for the same key (see the step-by-step narrative below).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        // Helper to wrap timestamps in the appropriate types
        let new_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());

        let output_handle = timely::execute_directly(move |worker| {
            let (mut input_handle, mut persist_handle, output_handle) = worker
                .dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure-enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);

                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        let (output, _, _, button) = upsert_inner(
                            input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            || async { InMemoryHashMap::default() },
                            upsert_config,
                            true,
                            None,
                        );
                        // Keep the operator alive for the duration of the test.
                        std::mem::forget(button);

                        (input_handle, persist_handle, output.inner.capture())
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input will contain records for two keys, 0 and 1.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));
            let key1 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(1)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            //  1. {offset=1, key=0, value=0}    @ mz_time = 0
            //  2. {offset=2, key=1, value=NULL} @ mz_time = 2  // <- deletion of unrelated key. Causes the operator
            //                                                  //    to maintain the associated cap to time 2
            //  3. {offset=3, key=0, value=1}    @ mz_time = 3
            //  4. {offset=4, key=0, value=2}    @ mz_time = 3  // <- messages 2 and 3 are reclocked to time 3
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let value3 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(1)]);
            let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
            let msg1 = (key0, Some(Ok(value1.clone())), 1);
            let msg2 = (key1, None, 2);
            let msg3 = (key0, Some(Ok(value3)), 3);
            let msg4 = (key0, Some(Ok(value4)), 4);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send((msg1, new_ts(0), Diff::ONE));
            input_handle.advance_to(new_ts(2));
            worker.step();

            // We assume this worker successfully CAAs the update to the shard so we send it back
            // through the persist_input
            persist_handle.send((Ok(value1), new_ts(0), Diff::ONE));
            persist_handle.advance_to(new_ts(1));
            worker.step();

            // Then, messages 2 and 3 are sent as one batch with capability = 2
            input_handle.send_batch(&mut vec![
                (msg2, new_ts(2), Diff::ONE),
                (msg3, new_ts(3), Diff::ONE),
            ]);
            // Advance our capability to 3
            input_handle.advance_to(new_ts(3));
            // Message 4 is sent with capability 3
            input_handle.send_batch(&mut vec![(msg4, new_ts(3), Diff::ONE)]);
            // Advance our capability to 4
            input_handle.advance_to(new_ts(4));
            // We now step the worker so that the pending data is received. This causes the
            // operator to store internally the following map from capabilities to updates:
            // cap=2 => [ msg2, msg3 ]
            // cap=3 => [ msg4 ]
            worker.step();

            // We now assume that another replica raced us and processed msg1 at time 2, which in
            // this test is a no-op so the persist frontier advances to time 3 without new data.
            persist_handle.advance_to(new_ts(3));
            // We now step this worker again, which will notice that the persist upper is {3} and
            // will attempt to process msg3 and msg4 *separately*, causing it to produce a double
            // retraction.
            worker.step();

            output_handle
        });

        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0 which has the value 0
        // at timestamp 0 and the value 2 at timestamp 3
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual_output, expected_output);
    }

    /// Regression test: out-of-order delivery of upsert commands must not
    /// make the operator forget the highest offset it has already processed
    /// for a key, which would cause a stale command to be applied (see the
    /// step-by-step narrative below). Uses the RocksDB-backed state.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9540_repro() {
        // Helper to wrap timestamps in the appropriate types
        let mz_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());
        let (tx, rx) = mpsc::channel::<std::thread::JoinHandle<()>>();

        let rocksdb_dir = tempfile::tempdir().unwrap();
        let output_handle = timely::execute_directly(move |worker| {
            let tx = tx.clone();
            let (mut input_handle, mut persist_handle, output_probe, output_handle) =
                worker.dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure-enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);
                        let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
                        let rocksdb_instance_metrics =
                            Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        // A closure that will initialize and return a configured RocksDB instance
                        let rocksdb_init_fn = move || async move {
                            let merge_operator = Some((
                                "upsert_state_snapshot_merge_v1".to_string(),
                                |a: &[u8],
                                 b: ValueIterator<
                                    BincodeOpts,
                                    StateValue<(MzTimestamp, Subtime), u64>,
                                >| {
                                    consolidating_merge_function::<(MzTimestamp, Subtime), u64>(
                                        a.into(),
                                        b,
                                    )
                                },
                            ));
                            let rocksdb_cleanup_tries = 5;
                            let tuning = RocksDBConfig::new(Default::default(), None);
                            let mut rocksdb_inst = mz_rocksdb::RocksDBInstance::new(
                                rocksdb_dir.path(),
                                mz_rocksdb::InstanceOptions::new(
                                    Env::mem_env().unwrap(),
                                    rocksdb_cleanup_tries,
                                    merge_operator,
                                    // For now, just use the same config as the one used for
                                    // merging snapshots.
                                    upsert_bincode_opts(),
                                ),
                                tuning,
                                rocksdb_shared_metrics,
                                rocksdb_instance_metrics,
                            )
                            .unwrap();

                            // Hand the core-loop join handle to the test body so it can wait
                            // for a clean shutdown at the end.
                            let handle = rocksdb_inst.take_core_loop_handle().expect("join handle");
                            tx.send(handle).expect("sent joinhandle");
                            crate::upsert::rocksdb::RocksDB::new(rocksdb_inst)
                        };

                        let (output, _, _, button) = upsert_inner(
                            input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            rocksdb_init_fn,
                            upsert_config,
                            true,
                            None,
                        );
                        // Keep the operator alive for the duration of the test.
                        std::mem::forget(button);

                        let (probe, stream) = output.inner.probe();
                        (input_handle, persist_handle, probe, stream.capture())
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input will contain records for two keys, 0 and 1.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            //  1. {offset=1, key=0, value=0}    @ mz_time = 0
            //  2. {offset=2, key=0, value=NULL} @ mz_time = 1
            //  3. {offset=3, key=0, value=0}    @ mz_time = 2
            //  4. {offset=4, key=0, value=NULL} @ mz_time = 2  // <- messages 3 and 4 are *BOTH* reclocked to time 2
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let msg1 = ((key0, Some(Ok(value1.clone())), 1), mz_ts(0), Diff::ONE);
            let msg2 = ((key0, None, 2), mz_ts(1), Diff::ONE);
            let msg3 = ((key0, Some(Ok(value1.clone())), 3), mz_ts(2), Diff::ONE);
            let msg4 = ((key0, None, 4), mz_ts(2), Diff::ONE);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send(msg1);
            input_handle.advance_to(mz_ts(1));
            while output_probe.less_than(&mz_ts(1)) {
                worker.step_or_park(None);
            }
            // Feedback the produced output..
            persist_handle.send((Ok(value1.clone()), mz_ts(0), Diff::ONE));
            persist_handle.advance_to(mz_ts(1));
            // ..and send the next upsert command that deletes the key.
            input_handle.send(msg2);
            input_handle.advance_to(mz_ts(2));
            while output_probe.less_than(&mz_ts(2)) {
                worker.step_or_park(None);
            }

            // Feedback the produced output..
            persist_handle.send((Ok(value1), mz_ts(1), Diff::MINUS_ONE));
            persist_handle.advance_to(mz_ts(2));
            // ..and send the next *out of order* upsert command that deletes the key. Here msg4
            // happens at offset 4 and the operator should remember that.
            input_handle.send(msg4);
            input_handle.flush();
            // Run the worker for enough steps to process these events. We can't guide the
            // execution with the probe here since the frontier does not advance, only provisional
            // updates are produced.
            for _ in 0..5 {
                worker.step();
            }

            // Send the missing message that will now confuse the operator because it has lost
            // track that for key 0 it has already seen a command for offset 4, and therefore msg3
            // should be skipped.
            input_handle.send(msg3);
            input_handle.flush();
            input_handle.advance_to(mz_ts(3));

            output_handle
        });

        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0: the value 0 is
        // inserted at timestamp 0 and retracted at timestamp 1. The stale msg3 must not
        // re-introduce the value, since msg4 (a later offset) already deleted the key.
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), mz_ts(0), Diff::ONE),
            (Ok(value1), mz_ts(1), Diff::MINUS_ONE),
        ];
        assert_eq!(actual_output, expected_output);

        // Wait for the RocksDB core loop thread(s) to shut down cleanly.
        while let Ok(handle) = rx.recv() {
            handle.join().expect("threads completed successfully");
        }
    }
}