mz_storage/
upsert_continual_feedback.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of feedback UPSERT operator and associated helpers. See
11//! [`upsert_inner`] for a description of how the operator works and why.
12
13use std::cmp::Reverse;
14use std::fmt::Debug;
15use std::sync::Arc;
16
17use differential_dataflow::hashable::Hashable;
18use differential_dataflow::{AsCollection, Collection};
19use indexmap::map::Entry;
20use itertools::Itertools;
21use mz_repr::{Diff, GlobalId, Row};
22use mz_storage_types::errors::{DataflowError, EnvelopeError};
23use mz_timely_util::builder_async::{
24    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
25};
26use std::convert::Infallible;
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::channels::pact::Exchange;
29use timely::dataflow::operators::{Capability, CapabilitySet};
30use timely::dataflow::{Scope, Stream};
31use timely::order::{PartialOrder, TotalOrder};
32use timely::progress::timestamp::Refines;
33use timely::progress::{Antichain, Timestamp};
34
35use crate::healthcheck::HealthStatusUpdate;
36use crate::metrics::upsert::UpsertMetrics;
37use crate::upsert::UpsertConfig;
38use crate::upsert::UpsertErrorEmitter;
39use crate::upsert::UpsertKey;
40use crate::upsert::UpsertValue;
41use crate::upsert::types::UpsertValueAndSize;
42use crate::upsert::types::{self as upsert_types, ValueMetadata};
43use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend};
44
45/// An operator that transforms an input stream of upserts (updates to key-value
46/// pairs), which represents an imaginary key-value state, into a differential
47/// collection. It keeps an internal map-like state which keeps the latest value
48/// for each key, such that it can emit the retractions and additions implied by
49/// a new update for a given key.
50///
51/// This operator is intended to be used in an ingestion pipeline that reads
52/// from an external source, and the output of this operator is eventually
53/// written to persist.
54///
55/// The operator has two inputs: a) the source input, of upserts, and b) a
56/// persist input that feeds back the upsert state to the operator. Below, there
57/// is a section for each input that describes how and why we process updates
58/// from each input.
59///
60/// An important property of this operator is that it does _not_ update the
61/// map-like state that it keeps for translating the stream of upserts into a
62/// differential collection when it processes source input. It _only_ updates
63/// the map-like state based on updates from the persist (feedback) input. We do
64/// this because the operator is expected to be used in cases where there are
65/// multiple concurrent instances of the same ingestion pipeline, and the
66/// different instances might see different input because of concurrency and
67/// non-determinism. All instances of the upsert operator must produce output
68/// that is consistent with the current state of the output (that all instances
69/// produce "collaboratively"). This global state is what the operator
70/// continually learns about via updates from the persist input.
71///
72/// ## Processing the Source Input
73///
74/// Updates on the source input are stashed/staged until they can be processed.
75/// Whether or not an update can be processed depends both on the upper frontier
76/// of the source input and on the upper frontier of the persist input:
77///
78///  - Input updates are only processed once their timestamp is "done", that is
79///  the input upper is no longer `less_equal` their timestamp.
80///
81///  - Input updates are only processed once they are at the persist upper, that
82///  is we have emitted and written down updates for all previous times and we
83///  have updated our map-like state to the latest global state of the output of
84///  the ingestion pipeline. We know this is the case when the persist upper is
85///  no longer `less_than` their timestamp.
86///
87/// As an optimization, we allow processing input updates when they are right at
88/// the input frontier. This is called _partial emission_ because we are
89/// emitting updates that might be retracted when processing more updates from
90/// the same timestamp. In order to be able to process these updates we keep
91/// _provisional values_ in our upsert state. These will be overwritten when we
92/// get the final upsert values on the persist input.
93///
94/// ## Processing the Persist Input
95///
96/// We continually ingest updates from the persist input into our state using
97/// `UpsertState::consolidate_chunk`. We might be ingesting updates from the
98/// initial snapshot (when starting the operator) that are not consolidated or
99/// we might be ingesting updates from a partial emission (see above). In either
100/// case, our input might not be consolidated and `consolidate_chunk` is able to
101/// handle that.
102pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
103    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
104    key_indices: Vec<usize>,
105    resume_upper: Antichain<G::Timestamp>,
106    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
107    mut persist_token: Option<Vec<PressOnDropButton>>,
108    upsert_metrics: UpsertMetrics,
109    source_config: crate::source::SourceExportCreationConfig,
110    state_fn: F,
111    upsert_config: UpsertConfig,
112    prevent_snapshot_buffering: bool,
113    snapshot_buffering_max: Option<usize>,
114) -> (
115    Collection<G, Result<Row, DataflowError>, Diff>,
116    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
117    Stream<G, Infallible>,
118    PressOnDropButton,
119)
120where
121    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
122    F: FnOnce() -> Fut + 'static,
123    Fut: std::future::Future<Output = US>,
124    US: UpsertStateBackend<G::Timestamp, FromTime>,
125    FromTime: Debug + timely::ExchangeData + Ord + Sync,
126{
127    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());
128
129    // We only care about UpsertValueError since this is the only error that we can retract
130    let persist_input = persist_input.flat_map(move |result| {
131        let value = match result {
132            Ok(ok) => Ok(ok),
133            Err(DataflowError::EnvelopeError(err)) => match *err {
134                EnvelopeError::Upsert(err) => Err(Box::new(err)),
135                _ => return None,
136            },
137            Err(_) => return None,
138        };
139        let value_ref = match value {
140            Ok(ref row) => Ok(row),
141            Err(ref err) => Err(&**err),
142        };
143        Some((UpsertKey::from_value(value_ref, &key_indices), value))
144    });
145    let (output_handle, output) = builder.new_output();
146
147    // An output that just reports progress of the snapshot consolidation process upstream to the
148    // persist source to ensure that backpressure is applied
149    let (_snapshot_handle, snapshot_stream) =
150        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
151
152    let (mut health_output, health_stream) = builder.new_output();
153    let mut input = builder.new_input_for(
154        &input.inner,
155        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
156        &output_handle,
157    );
158
159    let mut persist_input = builder.new_disconnected_input(
160        &persist_input.inner,
161        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
162    );
163
164    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
165
166    let shutdown_button = builder.build(move |caps| async move {
167        let [output_cap, snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
168        drop(output_cap);
169        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);
170
171        let mut state = UpsertState::<_, G::Timestamp, FromTime>::new(
172            state_fn().await,
173            upsert_shared_metrics,
174            &upsert_metrics,
175            source_config.source_statistics.clone(),
176            upsert_config.shrink_upsert_unused_buffers_by_ratio,
177        );
178
179        // True while we're still reading the initial "snapshot" (a whole bunch
180        // of updates, all at the same initial timestamp) from our persist
181        // input or while we're reading the initial snapshot from the upstream
182        // source.
183        let mut hydrating = true;
184
185        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
186        // and have a consistent iteration order.
187        let mut commands_state: indexmap::IndexMap<_, upsert_types::UpsertValueAndSize<G::Timestamp, FromTime>> =
188            indexmap::IndexMap::new();
189        let mut multi_get_scratch = Vec::new();
190
191        // For stashing source input while it's not eligible for processing.
192        let mut stash = vec![];
193        // A capability suitable for emitting any updates based on stash. No capability is held
194        // when the stash is empty.
195        let mut stash_cap: Option<Capability<G::Timestamp>> = None;
196        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
197        let mut partial_drain_time = None;
198
199        // For our persist/feedback input, both of these.
200        let mut persist_stash = vec![];
201        let mut persist_upper = Antichain::from_elem(Timestamp::minimum());
202
203        // We keep track of the largest timestamp seen on the persist input so
204        // that we can block processing source input while that timestamp is
205        // beyond the persist frontier. While ingesting updates of a timestamp,
206        // our upsert state is in a consolidating state, and trying to read it
207        // at that time would yield a panic.
208        //
209        // NOTE(aljoscha): You would think that it cannot happen that we even
210        // attempt to process source updates while the state is in a
211        // consolidating state, because we always wait until the persist
212        // frontier "catches up" with the timestamp of the source input. If
213        // there is only this here UPSERT operator and no concurrent instances,
214        // this is true. But with concurrent instances it can happen that an
215        // operator that is faster than us makes it so updates get written to
216        // persist. And we would then be ingesting them.
217        let mut largest_seen_persist_ts: Option<G::Timestamp> = None;
218
219        // A buffer for our output.
220        let mut output_updates = vec![];
221
222        let mut error_emitter = (&mut health_output, &health_cap);
223
224
225        loop {
226            tokio::select! {
227                _ = persist_input.ready() => {
228                    // Read away as much input as we can.
229                    while let Some(persist_event) = persist_input.next_sync() {
230                        match persist_event {
231                            AsyncEvent::Data(time, data) => {
232                                tracing::trace!(
233                                    worker_id = %source_config.worker_id,
234                                    source_id = %source_config.id,
235                                    time=?time,
236                                    updates=%data.len(),
237                                    "received persist data");
238
239                                persist_stash.extend(data.into_iter().map(|((key, value), ts, diff)| {
240                                    largest_seen_persist_ts = std::cmp::max(largest_seen_persist_ts.clone(), Some(ts.clone()));
241                                    (key, value, ts, diff)
242                                }));
243                            }
244                            AsyncEvent::Progress(upper) => {
245                                tracing::trace!(
246                                    worker_id = %source_config.worker_id,
247                                    source_id = %source_config.id,
248                                    ?upper,
249                                    "received persist progress");
250                                persist_upper = upper;
251                            }
252                        }
253                    }
254
255                    let last_rehydration_chunk =
256                        hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);
257
258                    tracing::debug!(
259                        worker_id = %source_config.worker_id,
260                        source_id = %source_config.id,
261                        persist_stash = %persist_stash.len(),
262                        %hydrating,
263                        %last_rehydration_chunk,
264                        ?resume_upper,
265                        ?persist_upper,
266                        "ingesting persist snapshot chunk");
267
268                    let persist_stash_iter = persist_stash
269                        .drain(..)
270                        .map(|(key, val, _ts, diff)| (key, val, diff));
271
272                    match state
273                        .consolidate_chunk(
274                            persist_stash_iter,
275                            last_rehydration_chunk,
276                        )
277                        .await
278                    {
279                        Ok(_) => {}
280                        Err(e) => {
281                            // Make sure our persist source can shut down.
282                            persist_token.take();
283                            snapshot_cap.downgrade(&[]);
284                            UpsertErrorEmitter::<G>::emit(
285                                &mut error_emitter,
286                                "Failed to rehydrate state".to_string(),
287                                e,
288                            )
289                            .await;
290                        }
291                    }
292
293                    tracing::debug!(
294                        worker_id = %source_config.worker_id,
295                        source_id = %source_config.id,
296                        ?resume_upper,
297                        ?persist_upper,
298                        "downgrading snapshot cap",
299                    );
300
301                    // Only downgrade this _after_ ingesting the data, because
302                    // that can actually take quite some time, and we don't want
303                    // to announce that we're done ingesting the initial
304                    // snapshot too early.
305                    //
306                    // When we finish ingesting our initial persist snapshot,
307                    // during "re-hydration", we downgrade this to the empty
308                    // frontier, so we need to be lenient to this failing from
309                    // then on.
310                    let _ = snapshot_cap.try_downgrade(persist_upper.iter());
311
312
313
314                    if last_rehydration_chunk {
315                        hydrating = false;
316
317                        tracing::info!(
318                            worker_id = %source_config.worker_id,
319                            source_id = %source_config.id,
320                            "upsert source finished rehydration",
321                        );
322
323                        snapshot_cap.downgrade(&[]);
324                    }
325
326                }
327                _ = input.ready() => {
328                    let mut events_processed = 0;
329                    while let Some(event) = input.next_sync() {
330                        match event {
331                            AsyncEvent::Data(cap, mut data) => {
332                                tracing::trace!(
333                                    worker_id = %source_config.worker_id,
334                                    source_id = %source_config.id,
335                                    time=?cap.time(),
336                                    updates=%data.len(),
337                                    "received data");
338
339                                let event_time = cap.time().clone();
340
341                                stage_input(
342                                    &mut stash,
343                                    &mut data,
344                                    &input_upper,
345                                    &resume_upper,
346                                );
347                                if !stash.is_empty() {
348                                    // Update the stashed capability to the minimum
349                                    stash_cap = match stash_cap {
350                                        Some(stash_cap) => {
351                                            if cap.time() < stash_cap.time() {
352                                                Some(cap)
353                                            } else {
354                                                Some(stash_cap)
355                                            }
356                                        }
357                                        None => Some(cap)
358                                    };
359                                }
360
361                                if prevent_snapshot_buffering && input_upper.as_option() == Some(&event_time) {
362                                    tracing::debug!(
363                                        worker_id = %source_config.worker_id,
364                                        source_id = %source_config.id,
365                                        ?event_time,
366                                        ?resume_upper,
367                                        ?input_upper,
368                                        "allowing partial drain");
369                                    partial_drain_time = Some(event_time.clone());
370                                } else {
371                                    tracing::debug!(
372                                        worker_id = %source_config.worker_id,
373                                        source_id = %source_config.id,
374                                        %prevent_snapshot_buffering,
375                                        ?event_time,
376                                        ?resume_upper,
377                                        ?input_upper,
378                                        "not allowing partial drain");
379                                }
380                            }
381                            AsyncEvent::Progress(upper) => {
382                                tracing::trace!(
383                                    worker_id = %source_config.worker_id,
384                                    source_id = %source_config.id,
385                                    ?upper,
386                                    "received progress");
387
388                                // Ignore progress updates before the `resume_upper`, which is our initial
389                                // capability post-snapshotting.
390                                if PartialOrder::less_than(&upper, &resume_upper) {
391                                    tracing::trace!(
392                                        worker_id = %source_config.worker_id,
393                                        source_id = %source_config.id,
394                                        ?upper,
395                                        ?resume_upper,
396                                        "ignoring progress updates before resume_upper");
397                                    continue;
398                                }
399
400                                // Disable partial drain, because this progress
401                                // update has moved the frontier. We might allow
402                                // it again once we receive data right at the
403                                // frontier again.
404                                partial_drain_time = None;
405                                input_upper = upper;
406                            }
407                        }
408
409                        events_processed += 1;
410                        if let Some(max) = snapshot_buffering_max {
411                            if events_processed >= max {
412                                break;
413                            }
414                        }
415                    }
416                }
417            };
418
419            // While we have partially ingested updates of a timestamp our state
420            // is in an inconsistent/consolidating state and accessing it would
421            // panic.
422            if let Some(largest_seen_persist_ts) = largest_seen_persist_ts.as_ref() {
423                let largest_seen_outer_persist_ts = largest_seen_persist_ts.clone().to_outer();
424                let outer_persist_upper = persist_upper.iter().map(|ts| ts.clone().to_outer());
425                let outer_persist_upper = Antichain::from_iter(outer_persist_upper);
426                if outer_persist_upper.less_equal(&largest_seen_outer_persist_ts) {
427                    continue;
428                }
429            }
430
431            // We try and drain from our stash every time we go through the
432            // loop. More of our stash can become eligible for draining both
433            // when the source-input frontier advances or when the persist
434            // frontier advances.
435            if !stash.is_empty() {
436                let cap = stash_cap.as_mut().expect("missing capability for non-empty stash");
437
438                tracing::trace!(
439                    worker_id = %source_config.worker_id,
440                    source_id = %source_config.id,
441                    ?cap,
442                    ?stash,
443                    "stashed updates");
444
445                let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
446                    &mut stash,
447                    &mut commands_state,
448                    &mut output_updates,
449                    &mut multi_get_scratch,
450                    DrainStyle::ToUpper{input_upper: &input_upper, persist_upper: &persist_upper},
451                    &mut error_emitter,
452                    &mut state,
453                    &source_config,
454                )
455                .await;
456
457                tracing::trace!(
458                    worker_id = %source_config.worker_id,
459                    source_id = %source_config.id,
460                    output_updates = %output_updates.len(),
461                    "output updates for complete timestamp");
462
463                for (update, ts, diff) in output_updates.drain(..) {
464                    output_handle.give(cap, (update, ts, diff));
465                }
466
467                if !stash.is_empty() {
468                    let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
469                    cap.downgrade(&min_remaining_time);
470                } else {
471                    stash_cap = None;
472                }
473            }
474
475
476            if input_upper.is_empty() {
477                tracing::debug!(
478                    worker_id = %source_config.worker_id,
479                    source_id = %source_config.id,
480                    "input exhausted, shutting down");
481                break;
482            };
483
484            // If there were staged events that occurred at the capability time, drain
485            // them. This is safe because out-of-order updates to the same key that are
486            // drained in separate calls to `drain_staged_input` are correctly ordered by
487            // their `FromTime` in `drain_staged_input`.
488            //
489            // Note also that this may result in more updates in the output collection than
490            // the minimum. However, because the frontier only advances on `Progress` updates,
491            // the collection always accumulates correctly for all keys.
492            if let Some(partial_drain_time) = &partial_drain_time {
493                if !stash.is_empty() {
494                    let cap = stash_cap.as_mut().expect("missing capability for non-empty stash");
495
496                    tracing::trace!(
497                        worker_id = %source_config.worker_id,
498                        source_id = %source_config.id,
499                        ?cap,
500                        ?stash,
501                        "stashed updates");
502
503                    let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
504                        &mut stash,
505                        &mut commands_state,
506                        &mut output_updates,
507                        &mut multi_get_scratch,
508                        DrainStyle::AtTime{
509                            time: partial_drain_time.clone(),
510                            persist_upper: &persist_upper
511                        },
512                        &mut error_emitter,
513                        &mut state,
514                        &source_config,
515                    )
516                    .await;
517
518                    tracing::trace!(
519                        worker_id = %source_config.worker_id,
520                        source_id = %source_config.id,
521                        output_updates = %output_updates.len(),
522                        "output updates for partial timestamp");
523
524                    for (update, ts, diff) in output_updates.drain(..) {
525                        output_handle.give(cap, (update, ts, diff));
526                    }
527
528                    if !stash.is_empty() {
529                        let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
530                        cap.downgrade(&min_remaining_time);
531                    } else {
532                        stash_cap = None;
533                    }
534                }
535            }
536        }
537    });
538
539    (
540        output.as_collection().map(|result| match result {
541            Ok(ok) => Ok(ok),
542            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
543        }),
544        health_stream,
545        snapshot_stream,
546        shutdown_button.press_on_drop(),
547    )
548}
549
550/// Helper method for [`upsert_inner`] used to stage `data` updates
551/// from the input/source timely edge.
552#[allow(clippy::disallowed_types)]
553fn stage_input<T, FromTime>(
554    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
555    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
556    input_upper: &Antichain<T>,
557    resume_upper: &Antichain<T>,
558) where
559    T: PartialOrder + timely::progress::Timestamp,
560    FromTime: Ord,
561{
562    if PartialOrder::less_equal(input_upper, resume_upper) {
563        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
564    }
565
566    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
567        assert!(diff.is_positive(), "invalid upsert input");
568        (time, key, Reverse(order), value)
569    }));
570}
571
/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
///
/// In both styles, a stashed update is only drained once the persist upper is no
/// longer `less_than` its time, i.e. once "previous" times on the persist
/// (feedback) input are complete.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain every stashed update whose time is complete on the source input,
    /// i.e. `input_upper` is no longer `less_equal` to it.
    ToUpper {
        /// Upper frontier of the source input.
        input_upper: &'a Antichain<T>,
        /// Upper frontier of the persist (feedback) input.
        persist_upper: &'a Antichain<T>,
    },
    /// Partial drain: drain stashed updates at or before `time`, even if that
    /// time is not yet complete on the source input (e.g. while taking the
    /// source snapshot).
    AtTime {
        /// Stashed updates with a time `<=` this are eligible.
        time: T,
        /// Upper frontier of the persist (feedback) input.
        persist_upper: &'a Antichain<T>,
    },
}
586
/// Helper method for [`upsert_inner`] used to drain the updates in `stash` that
/// are eligible for processing under the given `drain_style`.
589///
590/// Returns the minimum observed time across the updates that remain in the
591/// stash or `None` if none are left.
592///
593/// ## Correctness
594///
595/// It is safe to call this function multiple times with the same `persist_upper` provided that the
596/// drain style is `AtTime`, which updates the state such that past actions are remembered and can
597/// be undone in subsequent calls.
598///
599/// It is *not* safe to call this function more than once with the same `persist_upper` and a
600/// `ToUpper` drain style. Doing so causes all calls except the first one to base their work on
601/// stale state, since in this drain style no modifications to the state are made.
602async fn drain_staged_input<S, G, T, FromTime, E>(
603    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
604    commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, FromTime>>,
605    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
606    multi_get_scratch: &mut Vec<UpsertKey>,
607    drain_style: DrainStyle<'_, T>,
608    error_emitter: &mut E,
609    state: &mut UpsertState<'_, S, T, FromTime>,
610    source_config: &crate::source::SourceExportCreationConfig,
611) -> Option<T>
612where
613    S: UpsertStateBackend<T, FromTime>,
614    G: Scope,
615    T: TotalOrder + timely::ExchangeData + Debug + Ord + Sync,
616    FromTime: timely::ExchangeData + Ord + Sync,
617    E: UpsertErrorEmitter<G>,
618{
619    let mut min_remaining_time = Antichain::new();
620
621    let mut eligible_updates = stash
622        .extract_if(.., |(ts, _, _, _)| {
623            let eligible = match &drain_style {
624                DrainStyle::ToUpper {
625                    input_upper,
626                    persist_upper,
627                } => {
628                    // We make sure that a) we only process updates when we know their
629                    // timestamp is complete, that is there will be no more updates for
630                    // that timestamp, and b) that "previous" times in the persist
631                    // input are complete. The latter makes sure that we emit updates
632                    // for the next timestamp that are consistent with the global state
633                    // in the output persist shard, which also serves as a persistent
634                    // copy of our in-memory/on-disk upsert state.
635                    !input_upper.less_equal(ts) && !persist_upper.less_than(ts)
636                }
637                DrainStyle::AtTime {
638                    time,
639                    persist_upper,
640                } => {
641                    // Even when emitting partial updates, we still need to wait
642                    // until "previous" times in the persist input are complete.
643                    *ts <= *time && !persist_upper.less_than(ts)
644                }
645            };
646
647            if !eligible {
648                min_remaining_time.insert(ts.clone());
649            }
650
651            eligible
652        })
653        .filter(|(ts, _, _, _)| {
654            let persist_upper = match &drain_style {
655                DrainStyle::ToUpper {
656                    input_upper: _,
657                    persist_upper,
658                } => persist_upper,
659                DrainStyle::AtTime {
660                    time: _,
661                    persist_upper,
662                } => persist_upper,
663            };
664
665            // Any update that is "in the past" of the persist upper is not
666            // relevant anymore. We _can_ emit changes for it, but the
667            // downstream persist_sink would filter these updates out because
668            // the shard upper is already further ahead.
669            //
670            // Plus, our upsert state is up-to-date to the persist_upper, so we
671            // wouldn't be able to emit correct retractions for incoming
672            // commands whose `ts` is in the past of that.
673            let relevant = persist_upper.less_equal(ts);
674            relevant
675        })
676        .collect_vec();
677
678    tracing::debug!(
679        worker_id = %source_config.worker_id,
680        source_id = %source_config.id,
681        ?drain_style,
682        remaining = %stash.len(),
683        eligible = eligible_updates.len(),
684        "draining stash");
685
686    // Sort the eligible updates by (key, time, Reverse(from_time)) so that
687    // deduping by (key, time) gives the latest change for that key.
688    eligible_updates.sort_unstable_by(|a, b| {
689        let (ts1, key1, from_ts1, val1) = a;
690        let (ts2, key2, from_ts2, val2) = b;
691        Ord::cmp(&(ts1, key1, from_ts1, val1), &(ts2, key2, from_ts2, val2))
692    });
693
694    // Read the previous values _per key_ out of `state`, recording it
695    // along with the value with the _latest timestamp for that key_.
696    commands_state.clear();
697    for (_, key, _, _) in eligible_updates.iter() {
698        commands_state.entry(*key).or_default();
699    }
700
701    // These iterators iterate in the same order because `commands_state`
702    // is an `IndexMap`.
703    multi_get_scratch.clear();
704    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
705    match state
706        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
707        .await
708    {
709        Ok(_) => {}
710        Err(e) => {
711            error_emitter
712                .emit("Failed to fetch records from state".to_string(), e)
713                .await;
714        }
715    }
716
717    // From the prefix that can be emitted we can deduplicate based on (ts, key) in
718    // order to only process the command with the maximum order within the (ts,
719    // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.;
720    let mut commands = eligible_updates.into_iter().dedup_by(|a, b| {
721        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
722        a_ts == b_ts && a_key == b_key
723    });
724
725    let bincode_opts = upsert_types::upsert_bincode_opts();
726    // Upsert the values into `commands_state`, by recording the latest
727    // value (or deletion). These will be synced at the end to the `state`.
728    //
729    // Note that we are effectively doing "mini-upsert" here, using
730    // `command_state`. This "mini-upsert" is seeded with data from `state`, using
731    // a single `multi_get` above, and the final state is written out into
732    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
733    // implementations, and reduces the number of reads and write we need to do.
734    //
735    // This "mini-upsert" technique is actually useful in `UpsertState`'s
736    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
737    // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
738    while let Some((ts, key, from_time, value)) = commands.next() {
739        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
740            command_state
741        } else {
742            panic!("key missing from commands_state");
743        };
744
745        let existing_state_cell = &mut command_state.get_mut().value;
746
747        if let Some(cs) = existing_state_cell.as_mut() {
748            cs.ensure_decoded(bincode_opts, source_config.id);
749        }
750
751        // Skip this command if its order key is below the one in the upsert state.
752        // Note that the existing order key may be `None` if the existing value
753        // is from snapshotting, which always sorts below new values/deletes.
754        let existing_order = existing_state_cell
755            .as_ref()
756            .and_then(|cs| cs.provisional_order(&ts));
757        if existing_order >= Some(&from_time.0) {
758            // Skip this update. If no later updates adjust this key, then we just
759            // end up writing the same value back to state. If there
760            // is nothing in the state, `existing_order` is `None`, and this
761            // does not occur.
762            continue;
763        }
764
765        match value {
766            Some(value) => {
767                if let Some(old_value) = existing_state_cell.as_ref() {
768                    if let Some(old_value) = old_value.provisional_value_ref(&ts) {
769                        output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
770                    }
771                }
772
773                match &drain_style {
774                    DrainStyle::AtTime { .. } => {
775                        let existing_value = existing_state_cell.take();
776
777                        let new_value = match existing_value {
778                            Some(existing_value) => existing_value.clone().into_provisional_value(
779                                value.clone(),
780                                ts.clone(),
781                                from_time.0.clone(),
782                            ),
783                            None => StateValue::new_provisional_value(
784                                value.clone(),
785                                ts.clone(),
786                                from_time.0.clone(),
787                            ),
788                        };
789
790                        existing_state_cell.replace(new_value);
791                    }
792                    DrainStyle::ToUpper { .. } => {
793                        // Not writing down provisional values, or anything.
794                    }
795                };
796
797                output_updates.push((value, ts, Diff::ONE));
798            }
799            None => {
800                if let Some(old_value) = existing_state_cell.as_ref() {
801                    if let Some(old_value) = old_value.provisional_value_ref(&ts) {
802                        output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
803                    }
804                }
805
806                match &drain_style {
807                    DrainStyle::AtTime { .. } => {
808                        let existing_value = existing_state_cell.take();
809
810                        let new_value = match existing_value {
811                            Some(existing_value) => existing_value
812                                .into_provisional_tombstone(ts.clone(), from_time.0.clone()),
813                            None => StateValue::new_provisional_tombstone(
814                                ts.clone(),
815                                from_time.0.clone(),
816                            ),
817                        };
818
819                        existing_state_cell.replace(new_value);
820                    }
821                    DrainStyle::ToUpper { .. } => {
822                        // Not writing down provisional values, or anything.
823                    }
824                }
825            }
826        }
827    }
828
829    match &drain_style {
830        DrainStyle::AtTime { .. } => {
831            match state
832                .multi_put(
833                    // We don't want to update per-record stats, like size of
834                    // records indexed or count of records indexed.
835                    //
836                    // We only add provisional values and these will be
837                    // overwritten once we receive updates for state from the
838                    // persist input. And the merge functionality cannot know
839                    // what was in state before merging, so it cannot correctly
840                    // retract/update stats added here.
841                    //
842                    // Mostly, the merge functionality can't update those stats
843                    // because merging happens in a function that we pass to
844                    // rocksdb which doesn't have access to any external
845                    // context. And in general, with rocksdb we do blind writes
846                    // rather than inspect what was there before when
847                    // updating/inserting.
848                    false,
849                    commands_state.drain(..).map(|(k, cv)| {
850                        (
851                            k,
852                            upsert_types::PutValue {
853                                value: cv.value.map(|cv| cv.into_decoded()),
854                                previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
855                                    size: v.size.try_into().expect("less than i64 size"),
856                                    is_tombstone: v.is_tombstone,
857                                }),
858                            },
859                        )
860                    }),
861                )
862                .await
863            {
864                Ok(_) => {}
865                Err(e) => {
866                    error_emitter
867                        .emit("Failed to update records in state".to_string(), e)
868                        .await;
869                }
870            }
871        }
872        style => {
873            tracing::trace!(
874                worker_id = %source_config.worker_id,
875                source_id = %source_config.id,
876                "not doing state update for drain style {:?}", style);
877        }
878    }
879
880    min_remaining_time.into_option()
881}
882
#[cfg(test)]
mod test {
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, Timestamp as MzTimestamp};
    use mz_rocksdb::{RocksDBConfig, ValueIterator};
    use mz_storage_operators::persist_source::Subtime;
    use mz_storage_types::sources::SourceEnvelope;
    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
    use rocksdb::Env;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input, Probe};
    use timely::progress::Timestamp;

    use crate::metrics::StorageMetrics;
    use crate::metrics::upsert::UpsertMetricDefs;
    use crate::source::SourceExportCreationConfig;
    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};
    use crate::upsert::memory::InMemoryHashMap;
    use crate::upsert::types::{BincodeOpts, consolidating_merge_function, upsert_bincode_opts};

    use super::*;

    // Regression test for a double retraction: two upsert commands for the same key that are
    // reclocked to the same timestamp (msg3 and msg4 below) but arrive under different input
    // capabilities must be processed together, so that the consolidated output retracts the
    // previous value of the key exactly once. Uses the in-memory upsert state.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        // Helper to wrap timestamps in the appropriate types
        let new_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());

        let output_handle = timely::execute_directly(move |worker| {
            let (mut input_handle, mut persist_handle, output_handle) = worker
                .dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);

                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        let (output, _, _, button) = upsert_inner(
                            &input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            || async { InMemoryHashMap::default() },
                            upsert_config,
                            true,
                            None,
                        );
                        // NOTE(review): the button is forgotten rather than dropped, presumably
                        // because dropping a `PressOnDropButton` would shut the operator down.
                        std::mem::forget(button);

                        (input_handle, persist_handle, output.inner.capture())
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input will contain records for two keys, 0 and 1.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));
            let key1 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(1)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            //  1. {offset=1, key=0, value=0}    @ mz_time = 0
            //  2. {offset=2, key=1, value=NULL} @ mz_time = 2  // <- deletion of unrelated key. Causes the operator
            //                                                  //    to maintain the associated cap to time 2
            //  3. {offset=3, key=0, value=1}    @ mz_time = 3
            //  4. {offset=4, key=0, value=2}    @ mz_time = 3  // <- messages 3 and 4 are reclocked to time 3
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let value3 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(1)]);
            let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
            let msg1 = (key0, Some(Ok(value1.clone())), 1);
            let msg2 = (key1, None, 2);
            let msg3 = (key0, Some(Ok(value3)), 3);
            let msg4 = (key0, Some(Ok(value4)), 4);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send((msg1, new_ts(0), Diff::ONE));
            input_handle.advance_to(new_ts(2));
            worker.step();

            // We assume this worker successfully compare-and-appends the update to the shard, so
            // we send it back through the persist_input.
            persist_handle.send((Ok(value1), new_ts(0), Diff::ONE));
            persist_handle.advance_to(new_ts(1));
            worker.step();

            // Then, messages 2 and 3 are sent as one batch with capability = 2
            input_handle.send_batch(&mut vec![
                (msg2, new_ts(2), Diff::ONE),
                (msg3, new_ts(3), Diff::ONE),
            ]);
            // Advance our capability to 3
            input_handle.advance_to(new_ts(3));
            // Message 4 is sent with capability 3
            input_handle.send_batch(&mut vec![(msg4, new_ts(3), Diff::ONE)]);
            // Advance our capability to 4
            input_handle.advance_to(new_ts(4));
            // We now step the worker so that the pending data is received. This causes the
            // operator to store internally the following map from capabilities to updates:
            // cap=2 => [ msg2, msg3 ]
            // cap=3 => [ msg4 ]
            worker.step();

            // We now assume that another replica raced us and processed msg2 at time 2, which in
            // this test is a no-op (it deletes a key that was never inserted), so the persist
            // frontier advances to time 3 without new data.
            persist_handle.advance_to(new_ts(3));
            // We now step this worker again, which will notice that the persist upper is {3}.
            // Before the fix this test guards, the operator would then process msg3 and msg4
            // *separately*, producing a double retraction of key 0's value.
            worker.step();

            output_handle
        });

        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0 which has the value 0
        // at timestamp 0 and the value 2 at timestamp 3
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual_output, expected_output);
    }

    // Regression test for out-of-order upsert commands that are reclocked to the same timestamp
    // (msg3 and msg4 below, both at time 2). After processing the command with the higher offset
    // (msg4), the operator must remember that offset for the key so that the later-arriving
    // command with the lower offset (msg3) is skipped. Uses a RocksDB-backed upsert state.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9540_repro() {
        // Helper to wrap timestamps in the appropriate types
        let mz_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());

        let rocksdb_dir = tempfile::tempdir().unwrap();
        let output_handle = timely::execute_directly(move |worker| {
            let (mut input_handle, mut persist_handle, output_probe, output_handle) =
                worker.dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);
                        let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
                        let rocksdb_instance_metrics =
                            Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        // A closure that will initialize and return a configured RocksDB instance
                        let rocksdb_init_fn = move || async move {
                            let merge_operator = Some((
                                "upsert_state_snapshot_merge_v1".to_string(),
                                |a: &[u8],
                                 b: ValueIterator<
                                    BincodeOpts,
                                    StateValue<(MzTimestamp, Subtime), u64>,
                                >| {
                                    consolidating_merge_function::<(MzTimestamp, Subtime), u64>(
                                        a.into(),
                                        b,
                                    )
                                },
                            ));
                            let rocksdb_cleanup_tries = 5;
                            let tuning = RocksDBConfig::new(Default::default(), None);
                            crate::upsert::rocksdb::RocksDB::new(
                                mz_rocksdb::RocksDBInstance::new(
                                    rocksdb_dir.path(),
                                    mz_rocksdb::InstanceOptions::new(
                                        Env::mem_env().unwrap(),
                                        rocksdb_cleanup_tries,
                                        merge_operator,
                                        // For now, just use the same config as the one used for
                                        // merging snapshots.
                                        upsert_bincode_opts(),
                                    ),
                                    tuning,
                                    rocksdb_shared_metrics,
                                    rocksdb_instance_metrics,
                                )
                                .unwrap(),
                            )
                        };

                        let (output, _, _, button) = upsert_inner(
                            &input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            rocksdb_init_fn,
                            upsert_config,
                            true,
                            None,
                        );
                        // NOTE(review): the button is forgotten rather than dropped, presumably
                        // because dropping a `PressOnDropButton` would shut the operator down.
                        std::mem::forget(button);

                        (
                            input_handle,
                            persist_handle,
                            output.inner.probe(),
                            output.inner.capture(),
                        )
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input only contains records for key 0.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            //  1. {offset=1, key=0, value=0}    @ mz_time = 0
            //  2. {offset=2, key=0, value=NULL} @ mz_time = 1
            //  3. {offset=3, key=0, value=0}    @ mz_time = 2
            //  4. {offset=4, key=0, value=NULL} @ mz_time = 2  // <- messages 3 and 4 are *BOTH* reclocked to time 2
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let msg1 = ((key0, Some(Ok(value1.clone())), 1), mz_ts(0), Diff::ONE);
            let msg2 = ((key0, None, 2), mz_ts(1), Diff::ONE);
            let msg3 = ((key0, Some(Ok(value1.clone())), 3), mz_ts(2), Diff::ONE);
            let msg4 = ((key0, None, 4), mz_ts(2), Diff::ONE);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send(msg1);
            input_handle.advance_to(mz_ts(1));
            while output_probe.less_than(&mz_ts(1)) {
                worker.step_or_park(None);
            }
            // Feed back the produced output..
            persist_handle.send((Ok(value1.clone()), mz_ts(0), Diff::ONE));
            persist_handle.advance_to(mz_ts(1));
            // ..and send the next upsert command that deletes the key.
            input_handle.send(msg2);
            input_handle.advance_to(mz_ts(2));
            while output_probe.less_than(&mz_ts(2)) {
                worker.step_or_park(None);
            }

            // Feed back the produced output..
            persist_handle.send((Ok(value1), mz_ts(1), Diff::MINUS_ONE));
            persist_handle.advance_to(mz_ts(2));
            // ..and send the next *out of order* upsert command that deletes the key. Here msg4
            // happens at offset 4 and the operator should remember that.
            input_handle.send(msg4);
            input_handle.flush();
            // Run the worker for enough steps to process these events. We can't guide the
            // execution with the probe here since the frontier does not advance, only provisional
            // updates are produced.
            for _ in 0..5 {
                worker.step();
            }

            // Send the missing message. Before the fix this test guards, this confused the
            // operator because it lost track that for key 0 it had already seen a command for
            // offset 4; msg3 (offset 3) must therefore be skipped.
            input_handle.send(msg3);
            input_handle.flush();
            input_handle.advance_to(mz_ts(3));

            output_handle
        });

        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0: it has the value 0
        // at timestamp 0 and is deleted at timestamp 1. Nothing changes at timestamp 2: msg4
        // deletes an already-deleted key and msg3 is skipped because its offset is below msg4's.
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), mz_ts(0), Diff::ONE),
            (Ok(value1), mz_ts(1), Diff::MINUS_ONE),
        ];
        assert_eq!(actual_output, expected_output);
    }
}