mz_storage/upsert_continual_feedback.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of feedback UPSERT operator and associated helpers. See
//! [`upsert_inner`] for a description of how the operator works and why.

use std::cmp::Reverse;
// We don't care about the order, but we do want drain().
#[allow(clippy::disallowed_types)]
use std::collections::HashMap;
use std::convert::Infallible;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, Collection};
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::vec::VecExt;
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::{Capability, CapabilitySet};
use timely::dataflow::{Scope, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::upsert::{UpsertConfig, UpsertErrorEmitter, UpsertKey, UpsertValue};
use crate::upsert::types::UpsertValueAndSize;
use crate::upsert::types::{self as upsert_types, ValueMetadata};
use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend};

/// An operator that transforms an input stream of upserts (updates to key-value
/// pairs), which represents an imaginary key-value state, into a differential
/// collection. It maintains an internal map-like state that holds the latest
/// value for each key, such that it can emit the retractions and additions
/// implied by a new update for a given key.
///
/// This operator is intended to be used in an ingestion pipeline that reads
/// from an external source, and the output of this operator is eventually
/// written to persist.
///
/// The operator has two inputs: a) the source input, of upserts, and b) a
/// persist input that feeds back the upsert state to the operator. Below, there
/// is a section for each input that describes how and why we process updates
/// from each input.
///
/// An important property of this operator is that it does _not_ update the
/// map-like state that it keeps for translating the stream of upserts into a
/// differential collection when it processes source input. It _only_ updates
/// the map-like state based on updates from the persist (feedback) input. We do
/// this because the operator is expected to be used in cases where there are
/// multiple concurrent instances of the same ingestion pipeline, and the
/// different instances might see different input because of concurrency and
/// non-determinism. All instances of the upsert operator must produce output
/// that is consistent with the current state of the output (that all instances
/// produce "collaboratively"). This global state is what the operator
/// continually learns about via updates from the persist input.
///
/// ## Processing the Source Input
///
/// Updates on the source input are stashed/staged until they can be processed.
/// Whether or not an update can be processed depends both on the upper frontier
/// of the source input and on the upper frontier of the persist input:
///
///  - Input updates are only processed once their timestamp is "done", that is
///    the input upper is no longer `less_equal` their timestamp.
///
///  - Input updates are only processed once they are at the persist upper, that
///    is we have emitted and written down updates for all previous times and we
///    have updated our map-like state to the latest global state of the output
///    of the ingestion pipeline. We know this is the case when the persist
///    upper is no longer `less_than` their timestamp.
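///
/// For example (a sketch): an update at `ts = 5` becomes eligible once the
/// input upper is `[6]` (so `!input_upper.less_equal(&5)` holds) and the
/// persist upper has advanced to at least `[5]` (so
/// `!persist_upper.less_than(&5)` holds).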
///
/// As an optimization, we allow processing input updates when they are right at
/// the input frontier. This is called _partial emission_ because we are
/// emitting updates that might be retracted when processing more updates from
/// the same timestamp. In order to be able to process these updates we keep
/// _provisional values_ in our upsert state. These will be overwritten when we
/// get the final upsert values on the persist input.
///
/// ## Processing the Persist Input
///
/// We continually ingest updates from the persist input into our state using
/// `UpsertState::consolidate_chunk`. We might be ingesting updates from the
/// initial snapshot (when starting the operator) that are not consolidated or
/// we might be ingesting updates from a partial emission (see above). In either
/// case, our input might not be consolidated and `consolidate_chunk` is able to
/// handle that.
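///
/// ## Example (schematic)
///
/// A sketch of the operator's input/output behavior for a single key `k`,
/// ignoring `FromTime` ordering details:
///
/// ```ignore
/// // source input (upserts)        output (differential collection)
/// // (k, Some(v1)) at t=1     =>   (v1, 1, +1)
/// // (k, Some(v2)) at t=2     =>   (v1, 2, -1), (v2, 2, +1)
/// // (k, None)     at t=3     =>   (v2, 3, -1)
/// ```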
pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
    mut persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state_fn: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // We only care about upsert errors (`UpsertError`), since those are the
    // only errors that we can retract below.
    let persist_input = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(err),
                _ => return None,
            },
            Err(_) => return None,
        };
        Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // An output that just reports progress of the snapshot consolidation process upstream to
    // the persist source, to ensure that backpressure is applied.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut persist_input = builder.new_disconnected_input(
        &persist_input.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);

    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        // The order key of the `UpsertState` is `Option<FromTime>`, which implements `Default`
        // (as required for `consolidate_chunk`), with slightly more efficient serialization
        // than a default `Partitioned`.
        let mut state = UpsertState::<_, G::Timestamp, Option<FromTime>>::new(
            state_fn().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );

        // True while we're still reading the initial "snapshot" (a whole bunch
        // of updates, all at the same initial timestamp) from our persist
        // input or while we're reading the initial snapshot from the upstream
        // source.
        let mut hydrating = true;

        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be
        // `drain`-able and have a consistent iteration order.
        let mut commands_state: indexmap::IndexMap<
            _,
            upsert_types::UpsertValueAndSize<G::Timestamp, Option<FromTime>>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        // For stashing source input while it's not eligible for processing.
        // We don't care about the order, but we do want drain().
        #[allow(clippy::disallowed_types)]
        let mut stash = HashMap::new();
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        let mut partial_drain_time = None;

        // Stash and upper frontier for our persist/feedback input.
        let mut persist_stash = vec![];
        let mut persist_upper = Antichain::from_elem(Timestamp::minimum());

        // We keep track of the largest timestamp seen on the persist input so
        // that we can block processing source input while that timestamp is
        // beyond the persist frontier. While ingesting updates of a timestamp,
        // our upsert state is in a consolidating state, and trying to read it
        // at that time would yield a panic.
        //
        // NOTE(aljoscha): You would think that it cannot happen that we even
        // attempt to process source updates while the state is in a
        // consolidating state, because we always wait until the persist
        // frontier "catches up" with the timestamp of the source input. If
        // there is only this here UPSERT operator and no concurrent instances,
        // this is true. But with concurrent instances it can happen that an
        // operator that is faster than us makes it so updates get written to
        // persist. And we would then be ingesting them.
        let mut largest_seen_persist_ts: Option<G::Timestamp> = None;
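
        // For example (a sketch): if we have already ingested persist updates
        // at `t = 7` while the persist upper is still `[7]`, the state may be
        // mid-consolidation for `t = 7`; the guard in the main loop below
        // skips processing source updates until the persist upper has moved
        // past `7`.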

        // A buffer for our output.
        let mut output_updates = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        loop {
            tokio::select! {
                _ = persist_input.ready() => {
                    // Read away as much input as we can.
                    while let Some(persist_event) = persist_input.next_sync() {
                        match persist_event {
                            AsyncEvent::Data(time, data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?time,
                                    updates=%data.len(),
                                    "received persist data");

                                persist_stash.extend(data.into_iter().map(|((key, value), ts, diff)| {
                                    largest_seen_persist_ts =
                                        std::cmp::max(largest_seen_persist_ts.clone(), Some(ts.clone()));
                                    (key, value, ts, diff)
                                }));
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received persist progress");
                                persist_upper = upper;
                            }
                        }
                    }

                    let last_rehydration_chunk =
                        hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        persist_stash = %persist_stash.len(),
                        %hydrating,
                        %last_rehydration_chunk,
                        ?resume_upper,
                        ?persist_upper,
                        "ingesting persist snapshot chunk");

                    let persist_stash_iter = persist_stash
                        .drain(..)
                        .map(|(key, val, _ts, diff)| (key, val, diff));

                    match state
                        .consolidate_chunk(persist_stash_iter, last_rehydration_chunk)
                        .await
                    {
                        Ok(_) => {}
                        Err(e) => {
                            // Make sure our persist source can shut down.
                            persist_token.take();
                            snapshot_cap.downgrade(&[]);
                            UpsertErrorEmitter::<G>::emit(
                                &mut error_emitter,
                                "Failed to rehydrate state".to_string(),
                                e,
                            )
                            .await;
                        }
                    }

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?resume_upper,
                        ?persist_upper,
                        "downgrading snapshot cap",
                    );

                    // Only downgrade this _after_ ingesting the data, because
                    // that can actually take quite some time, and we don't want
                    // to announce that we're done ingesting the initial
                    // snapshot too early.
                    //
                    // When we finish ingesting our initial persist snapshot,
                    // during "re-hydration", we downgrade this to the empty
                    // frontier, so we need to be lenient to this failing from
                    // then on.
                    let _ = snapshot_cap.try_downgrade(persist_upper.iter());

                    if last_rehydration_chunk {
                        hydrating = false;

                        tracing::info!(
                            worker_id = %source_config.worker_id,
                            source_id = %source_config.id,
                            "upsert source finished rehydration",
                        );

                        snapshot_cap.downgrade(&[]);
                    }
                }
                _ = input.ready() => {
                    let mut events_processed = 0;
                    while let Some(event) = input.next_sync() {
                        match event {
                            AsyncEvent::Data(cap, mut data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?cap.time(),
                                    updates=%data.len(),
                                    "received data");

                                let event_time = cap.time().clone();

                                stage_input(
                                    &mut stash,
                                    cap,
                                    &mut data,
                                    &input_upper,
                                    &resume_upper,
                                );

                                if prevent_snapshot_buffering && output_cap.time() == &event_time {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?event_time,
                                        ?resume_upper,
                                        ?output_cap,
                                        "allowing partial drain");
                                    partial_drain_time = Some(event_time.clone());
                                } else {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        %prevent_snapshot_buffering,
                                        ?event_time,
                                        ?resume_upper,
                                        ?output_cap,
                                        "not allowing partial drain");
                                }
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received progress");

                                // Ignore progress updates before the `resume_upper`, which is our
                                // initial capability post-snapshotting.
                                if PartialOrder::less_than(&upper, &resume_upper) {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?upper,
                                        ?resume_upper,
                                        "ignoring progress updates before resume_upper");
                                    continue;
                                }

                                // Disable partial drain, because this progress
                                // update has moved the frontier. We might allow
                                // it again once we receive data right at the
                                // frontier again.
                                partial_drain_time = None;

                                if let Some(ts) = upper.as_option() {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?ts,
                                        "downgrading output capability");
                                    let _ = output_cap.try_downgrade(ts);
                                }
                                input_upper = upper;
                            }
                        }

                        events_processed += 1;
                        if let Some(max) = snapshot_buffering_max {
                            if events_processed >= max {
                                break;
                            }
                        }
                    }
                }
            };

            // While we have partially ingested updates of a timestamp, our
            // state is in an inconsistent/consolidating state and accessing it
            // would panic.
            if let Some(largest_seen_persist_ts) = largest_seen_persist_ts.as_ref() {
                let largest_seen_outer_persist_ts = largest_seen_persist_ts.clone().to_outer();
                let outer_persist_upper = persist_upper.iter().map(|ts| ts.clone().to_outer());
                let outer_persist_upper = Antichain::from_iter(outer_persist_upper);
                if outer_persist_upper.less_equal(&largest_seen_outer_persist_ts) {
                    continue;
                }
            }

            // We try and drain from our stash every time we go through the
            // loop. More of our stash can become eligible for draining either
            // when the source-input frontier advances or when the persist
            // frontier advances.
            //
            // We can't easily iterate through the cap -> updates mappings and
            // downgrade the cap at the same time, so we drain them out and
            // re-insert them into the map at their (possibly downgraded) cap.
            let stashed_work = stash.drain().collect_vec();
            for (mut cap, mut updates) in stashed_work.into_iter() {
                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    ?cap,
                    ?updates,
                    "stashed updates");

                let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                    &mut updates,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::ToUpper {
                        input_upper: &input_upper,
                        persist_upper: &persist_upper,
                    },
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    output_updates = %output_updates.len(),
                    "output updates for complete timestamp");

                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(&cap, (update, ts, diff));
                }

                if !updates.is_empty() {
                    let min_remaining_time =
                        min_remaining_time.take().expect("we still have updates left");
                    cap.downgrade(&min_remaining_time);

                    // Stash them back in, being careful because we might have
                    // to merge them with other updates that we already have for
                    // that timestamp.
                    stash
                        .entry(cap)
                        .and_modify(|existing_updates| existing_updates.append(&mut updates))
                        .or_insert_with(|| updates);
                }
            }

            if input_upper.is_empty() {
                tracing::debug!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    "input exhausted, shutting down");
                break;
            }

            // If there were staged events that occurred at the capability time, drain
            // them. This is safe because out-of-order updates to the same key that are
            // drained in separate calls to `drain_staged_input` are correctly ordered by
            // their `FromTime` in `drain_staged_input`.
            //
            // Note also that this may result in more updates in the output collection than
            // the minimum. However, because the frontier only advances on `Progress` updates,
            // the collection always accumulates correctly for all keys.
            if let Some(partial_drain_time) = &partial_drain_time {
                let stashed_work = stash.drain().collect_vec();
                for (mut cap, mut updates) in stashed_work.into_iter() {
                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?cap,
                        ?updates,
                        "stashed updates");

                    let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                        &mut updates,
                        &mut commands_state,
                        &mut output_updates,
                        &mut multi_get_scratch,
                        DrainStyle::AtTime {
                            time: partial_drain_time.clone(),
                            persist_upper: &persist_upper,
                        },
                        &mut error_emitter,
                        &mut state,
                        &source_config,
                    )
                    .await;

                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        output_updates = %output_updates.len(),
                        "output updates for partial timestamp");

                    for (update, ts, diff) in output_updates.drain(..) {
                        output_handle.give(&cap, (update, ts, diff));
                    }

                    if !updates.is_empty() {
                        let min_remaining_time =
                            min_remaining_time.take().expect("we still have updates left");
                        cap.downgrade(&min_remaining_time);

                        // Stash them back in, being careful because we might have
                        // to merge them with other updates that we already have for
                        // that timestamp.
                        stash
                            .entry(cap)
                            .and_modify(|existing_updates| existing_updates.append(&mut updates))
                            .or_insert_with(|| updates);
                    }
                }
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}

/// Helper method for [`upsert_inner`] used to stage `data` updates
/// from the input/source timely edge.
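///
/// A sketch of the behavior: while we are still re-reading the initial
/// snapshot (`input_upper <= resume_upper`), updates at times not beyond the
/// `resume_upper` are dropped, since the output already reflects them;
/// everything else is appended to the stash entry for the given capability.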
#[allow(clippy::disallowed_types)]
fn stage_input<T, FromTime>(
    stash: &mut HashMap<Capability<T>, Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>>,
    cap: Capability<T>,
    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
    input_upper: &Antichain<T>,
    resume_upper: &Antichain<T>,
) where
    T: PartialOrder + timely::progress::Timestamp,
    FromTime: Ord,
{
    if PartialOrder::less_equal(input_upper, resume_upper) {
        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
    }

    let stash_for_timestamp = stash.entry(cap).or_default();

    stash_for_timestamp.extend(data.drain(..).map(|((key, value, order), time, diff)| {
        assert!(diff.is_positive(), "invalid upsert input");
        (time, key, Reverse(order), value)
    }));
}

/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for
/// deleted values.
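///
/// A sketch of how the two styles are used above: `ToUpper` drains updates
/// whose timestamps are closed by the input frontier, while `AtTime` is the
/// partial-emission path that drains updates right at the frontier and records
/// provisional values/tombstones in the upsert state.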
#[derive(Debug)]
enum DrainStyle<'a, T> {
    ToUpper {
        input_upper: &'a Antichain<T>,
        persist_upper: &'a Antichain<T>,
    },
    // For partial draining when taking the source snapshot.
    AtTime {
        time: T,
        persist_upper: &'a Antichain<T>,
    },
}

/// Helper method for [`upsert_inner`] used to drain stashed updates, apply
/// them against the upsert state, and produce the implied output updates.
///
/// Returns the minimum observed time across the updates that remain in the
/// stash or `None` if none are left.
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, Option<FromTime>>>,
    output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, Option<FromTime>>,
    source_config: &crate::source::SourceExportCreationConfig,
) -> Option<T>
where
    S: UpsertStateBackend<T, Option<FromTime>>,
    G: Scope,
    T: TotalOrder + timely::ExchangeData + Debug + Ord + Sync,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    let mut min_remaining_time = Antichain::new();

    let mut eligible_updates = stash
        .drain_filter_swapping(|(ts, _, _, _)| {
            let eligible = match &drain_style {
                DrainStyle::ToUpper {
                    input_upper,
                    persist_upper,
                } => {
                    // We make sure that a) we only process updates when we know their
                    // timestamp is complete, that is there will be no more updates for
                    // that timestamp, and b) that "previous" times in the persist
                    // input are complete. The latter makes sure that we emit updates
                    // for the next timestamp that are consistent with the global state
                    // in the output persist shard, which also serves as a persistent
                    // copy of our in-memory/on-disk upsert state.
                    !input_upper.less_equal(ts) && !persist_upper.less_than(ts)
                }
                DrainStyle::AtTime {
                    time,
                    persist_upper,
                } => {
                    // Even when emitting partial updates, we still need to wait
                    // until "previous" times in the persist input are complete.
                    *ts <= *time && !persist_upper.less_than(ts)
                }
            };

            if !eligible {
                min_remaining_time.insert(ts.clone());
            }

            eligible
        })
        .filter(|(ts, _, _, _)| {
            let persist_upper = match &drain_style {
                DrainStyle::ToUpper {
                    input_upper: _,
                    persist_upper,
                } => persist_upper,
                DrainStyle::AtTime {
                    time: _,
                    persist_upper,
                } => persist_upper,
            };

            // Any update that is "in the past" of the persist upper is not
            // relevant anymore. We _can_ emit changes for it, but the
            // downstream persist_sink would filter these updates out because
            // the shard upper is already further ahead.
            //
            // Plus, our upsert state is up-to-date to the persist_upper, so we
            // wouldn't be able to emit correct retractions for incoming
            // commands whose `ts` is in the past of that.
            persist_upper.less_equal(ts)
        })
        .collect_vec();

    tracing::debug!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?drain_style,
        remaining = %stash.len(),
        eligible = eligible_updates.len(),
        "draining stash");

    // Sort the eligible updates by (time, key, Reverse(from_time)) so that,
    // after deduping by (time, key) below, we keep the latest change for each
    // key at each time.
    eligible_updates.sort_unstable_by(|a, b| {
        let (ts1, key1, from_ts1, val1) = a;
        let (ts2, key2, from_ts2, val2) = b;
        Ord::cmp(&(ts1, key1, from_ts1, val1), &(ts2, key2, from_ts2, val2))
    });

    // Read the previous values _per key_ out of `state`, recording each
    // alongside the value with the _latest timestamp for that key_.
    commands_state.clear();
    for (_, key, _, _) in eligible_updates.iter() {
        commands_state.entry(*key).or_default();
    }

    // These iterators iterate in the same order because `commands_state`
    // is an `IndexMap`.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix that can be emitted we can deduplicate based on (ts, key)
    // in order to only process the command with the maximum order within the
    // (ts, key) group. This is achieved by wrapping the order in
    // `Reverse(FromTime)` above.
    let mut commands = eligible_updates.into_iter().dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = upsert_types::upsert_bincode_opts();
    // Upsert the values into `commands_state`, by recording the latest
    // value (or deletion). These will be synced at the end to the `state`.
    //
    // Note that we are effectively doing "mini-upsert" here, using
    // `commands_state`. This "mini-upsert" is seeded with data from `state`,
    // using a single `multi_get` above, and the final state is written out into
    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
    // implementations, and reduces the number of reads and writes we need to
    // do.
    //
    // This "mini-upsert" technique is also useful in `UpsertState`'s
    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets
    // and puts on the `UpsertStateBackend` implementations. In some sense, it's
    // "upsert all the way down".
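    //
    // Schematically (a sketch):
    //
    //   multi_get:  state -> commands_state    (seed per-key previous values)
    //   fold:       sorted, deduped commands applied against commands_state
    //   multi_put:  commands_state -> state    (only for `AtTime` drains)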
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_state_cell = &mut command_state.get_mut().value;

        if let Some(cs) = existing_state_cell.as_mut() {
            cs.ensure_decoded(bincode_opts);
        }

        // Skip this command if its order key is below the one in the upsert state.
        // Note that the existing order key may be `None` if the existing value
        // is from snapshotting, which always sorts below new values/deletes.
        let existing_order = existing_state_cell
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts).map_or(None, Option::as_ref));
        if existing_order >= Some(&from_time.0) {
            // Skip this update. If no later updates adjust this key, then we just
            // end up writing the same value back to state. If there
            // is nothing in the state, `existing_order` is `None`, and this
            // does not occur.
            continue;
        }

        match value {
            Some(value) => {
                if let Some(old_value) = existing_state_cell.as_ref() {
                    if let Some(old_value) = old_value.provisional_value_ref(&ts) {
                        output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
                    }
                }

                match &drain_style {
                    DrainStyle::AtTime { .. } => {
                        let existing_value = existing_state_cell.take();

                        let new_value = match existing_value {
                            Some(existing_value) => existing_value.clone().into_provisional_value(
                                value.clone(),
                                ts.clone(),
                                Some(from_time.0.clone()),
                            ),
                            None => StateValue::new_provisional_value(
                                value.clone(),
                                ts.clone(),
                                Some(from_time.0.clone()),
                            ),
                        };

                        existing_state_cell.replace(new_value);
                    }
                    DrainStyle::ToUpper { .. } => {
                        // Not writing down provisional values, or anything.
                    }
                };

                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                if let Some(old_value) = existing_state_cell.as_ref() {
                    if let Some(old_value) = old_value.provisional_value_ref(&ts) {
                        output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
                    }
                }

                match &drain_style {
                    DrainStyle::AtTime { .. } => {
                        let existing_value = existing_state_cell.take();

                        let new_value = match existing_value {
                            Some(existing_value) => existing_value
                                .into_provisional_tombstone(ts.clone(), Some(from_time.0.clone())),
                            None => StateValue::new_provisional_tombstone(
                                ts.clone(),
                                Some(from_time.0.clone()),
                            ),
                        };

                        existing_state_cell.replace(new_value);
                    }
                    DrainStyle::ToUpper { .. } => {
                        // Not writing down provisional values, or anything.
                    }
                }
            }
        }
    }

    match &drain_style {
        DrainStyle::AtTime { .. } => {
            match state
                .multi_put(
                    // We don't want to update per-record stats, like size of
                    // records indexed or count of records indexed.
                    //
                    // We only add provisional values and these will be
                    // overwritten once we receive updates for state from the
                    // persist input. And the merge functionality cannot know
                    // what was in state before merging, so it cannot correctly
                    // retract/update stats added here.
                    //
                    // Mostly, the merge functionality can't update those stats
                    // because merging happens in a function that we pass to
                    // rocksdb which doesn't have access to any external
                    // context. And in general, with rocksdb we do blind writes
                    // rather than inspect what was there before when
                    // updating/inserting.
                    false,
                    commands_state.drain(..).map(|(k, cv)| {
                        (
                            k,
                            upsert_types::PutValue {
                                value: cv.value.map(|cv| cv.into_decoded()),
                                previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                                    size: v.size.try_into().expect("less than i64 size"),
                                    is_tombstone: v.is_tombstone,
                                }),
                            },
                        )
                    }),
                )
                .await
            {
                Ok(_) => {}
                Err(e) => {
                    error_emitter
                        .emit("Failed to update records in state".to_string(), e)
                        .await;
                }
            }
        }
        style => {
            tracing::trace!(
                worker_id = %source_config.worker_id,
                source_id = %source_config.id,
                "not doing state update for drain style {:?}", style);
        }
    }

    min_remaining_time.into_option()
}