// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of feedback UPSERT operator and associated helpers. See
//! [`upsert_inner`] for a description of how the operator works and why.

use std::cmp::Reverse;
// We don't care about the order, but we do want drain().
#[allow(clippy::disallowed_types)]
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, Collection};
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::vec::VecExt;
use mz_repr::{Diff, Row};
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use std::convert::Infallible;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::{Capability, CapabilitySet};
use timely::dataflow::{Scope, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::render::sources::OutputIndex;
use crate::upsert::types::UpsertValueAndSize;
use crate::upsert::types::{self as upsert_types, ValueMetadata};
use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend};
use crate::upsert::UpsertConfig;
use crate::upsert::UpsertErrorEmitter;
use crate::upsert::UpsertKey;
use crate::upsert::UpsertValue;

/// An operator that transforms an input stream of upserts (updates to key-value
/// pairs), which represents an imaginary key-value state, into a differential
/// collection. It maintains an internal map-like state that holds the latest
/// value for each key, so that it can emit the retractions and additions
/// implied by a new update for a given key.
///
/// This operator is intended to be used in an ingestion pipeline that reads
/// from an external source, and the output of this operator is eventually
/// written to persist.
///
/// The operator has two inputs: a) the source input, of upserts, and b) a
/// persist input that feeds back the upsert state to the operator. Below, there
/// is a section for each input that describes how and why we process updates
/// from each input.
///
/// An important property of this operator is that it does _not_ update the
/// map-like state that it keeps for translating the stream of upserts into a
/// differential collection when it processes source input. It _only_ updates
/// the map-like state based on updates from the persist (feedback) input. We do
/// this because the operator is expected to be used in cases where there are
/// multiple concurrent instances of the same ingestion pipeline, and the
/// different instances might see different input because of concurrency and
/// non-determinism. All instances of the upsert operator must produce output
/// that is consistent with the current state of the output (that all instances
/// produce "collaboratively"). This global state is what the operator
/// continually learns about via updates from the persist input.
///
/// ## Processing the Source Input
///
/// Updates on the source input are stashed/staged until they can be processed.
/// Whether or not an update can be processed depends both on the upper frontier
/// of the source input and on the upper frontier of the persist input:
///
///  - Input updates are only processed once their timestamp is "done", that is
///  the input upper is no longer `less_equal` their timestamp.
///
///  - Input updates are only processed once they are at the persist upper, that
///  is we have emitted and written down updates for all previous times and we
///  have updated our map-like state to the latest global state of the output of
///  the ingestion pipeline. We know this is the case when the persist upper is
///  no longer `less_than` their timestamp.
///
/// As an optimization, we allow processing input updates when they are right at
/// the input frontier. This is called _partial emission_ because we are
/// emitting updates that might be retracted when processing more updates from
/// the same timestamp. In order to be able to process these updates we keep
/// _provisional values_ in our upsert state. These will be overwritten when we
/// get the final upsert values on the persist input.
///
/// ## Processing the Persist Input
///
/// We continually ingest updates from the persist input into our state using
/// `UpsertState::consolidate_chunk`. We might be ingesting updates from the
/// initial snapshot (when starting the operator) that are not consolidated or
/// we might be ingesting updates from a partial emission (see above). In either
/// case, our input might not be consolidated and `consolidate_chunk` is able to
/// handle that.
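///
/// ## Example
///
/// For illustration, here is a hypothetical sequence of source upserts for a
/// single key `k` (values and timestamps are made up) together with the
/// differential updates that this operator eventually emits for them:
///
/// ```text
/// source input           emitted output
/// ---------------------  ---------------------------
/// (k, Some(v1)) @ t1     (v1, t1, +1)
/// (k, Some(v2)) @ t2     (v1, t2, -1), (v2, t2, +1)
/// (k, None)     @ t3     (v2, t3, -1)
/// ```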
pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
    mut persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state_fn: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (OutputIndex, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // We only care about upsert errors (`EnvelopeError::Upsert`), since these are the only
    // errors that we can retract.
    let persist_input = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(err),
                _ => return None,
            },
            Err(_) => return None,
        };
        Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // An output that just reports progress of the snapshot consolidation process upstream to the
    // persist source to ensure that backpressure is applied
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut persist_input = builder.new_disconnected_input(
        &persist_input.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);

    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        // The order key of the `UpsertState` is `Option<FromTime>`, which implements `Default`
        // (as required for `consolidate_chunk`), with slightly more efficient serialization
        // than a default `Partitioned`.

        let mut state = UpsertState::<_, G::Timestamp, Option<FromTime>>::new(
            state_fn().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );

        // True while we're still reading the initial "snapshot" (a whole bunch
        // of updates, all at the same initial timestamp) from our persist
        // input or while we're reading the initial snapshot from the upstream
        // source.
        let mut hydrating = true;

        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
        // and have a consistent iteration order.
        let mut commands_state: indexmap::IndexMap<_, upsert_types::UpsertValueAndSize<G::Timestamp, Option<FromTime>>> =
            indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        // For stashing source input while it's not eligible for processing.
        // We don't care about the order, but we do want drain().
        #[allow(clippy::disallowed_types)]
        let mut stash = HashMap::new();
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
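        // When set, we are allowed to partially drain stashed updates at
        // exactly this time, even though the input frontier has not yet
        // advanced past it (partial emission, see the operator docs above).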
        let mut partial_drain_time = None;

        // For our persist/feedback input: a stash for updates and its upper frontier.
        let mut persist_stash = vec![];
        let mut persist_upper = Antichain::from_elem(Timestamp::minimum());

        // We keep track of the largest timestamp seen on the persist input so
        // that we can block processing source input while that timestamp is
        // beyond the persist frontier. While ingesting updates of a timestamp,
        // our upsert state is in a consolidating state, and trying to read it
        // at that time would yield a panic.
        //
        // NOTE(aljoscha): You would think that it cannot happen that we even
        // attempt to process source updates while the state is in a
        // consolidating state, because we always wait until the persist
        // frontier "catches up" with the timestamp of the source input. If
        // there is only this here UPSERT operator and no concurrent instances,
        // this is true. But with concurrent instances it can happen that an
        // operator that is faster than us makes it so updates get written to
        // persist. And we would then be ingesting them.
        let mut largest_seen_persist_ts: Option<G::Timestamp> = None;

        // A buffer for our output.
        let mut output_updates = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);


        loop {
            tokio::select! {
                _ = persist_input.ready() => {
                    // Read away as much input as we can.
                    while let Some(persist_event) = persist_input.next_sync() {
                        match persist_event {
                            AsyncEvent::Data(time, data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?time,
                                    updates=%data.len(),
                                    "received persist data");

                                persist_stash.extend(data.into_iter().map(|((key, value), ts, diff)| {
                                    largest_seen_persist_ts = std::cmp::max(largest_seen_persist_ts.clone(), Some(ts.clone()));
                                    (key, value, ts, diff)
                                }));
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received persist progress");
                                persist_upper = upper;
                            }
                        }
                    }

                    // When we finish ingesting our initial persist snapshot,
                    // during "re-hydration", we downgrade this capability to
                    // the empty frontier, so we have to tolerate this
                    // downgrade failing from then on.
                    let _ = snapshot_cap.try_downgrade(persist_upper.iter());

                    let last_rehydration_chunk =
                        hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        persist_stash = %persist_stash.len(),
                        %last_rehydration_chunk,
                        ?resume_upper,
                        ?persist_upper,
                        "ingesting persist snapshot chunk");

                    let persist_stash_iter = persist_stash
                        .drain(..)
                        .map(|(key, val, _ts, diff)| (key, val, diff));

                    match state
                        .consolidate_chunk(
                            persist_stash_iter,
                            last_rehydration_chunk,
                        )
                        .await
                    {
                        Ok(_) => {}
                        Err(e) => {
                            // Make sure our persist source can shut down.
                            persist_token.take();
                            snapshot_cap.downgrade(&[]);
                            UpsertErrorEmitter::<G>::emit(
                                &mut error_emitter,
                                "Failed to rehydrate state".to_string(),
                                e,
                            )
                            .await;
                        }
                    }


                    if last_rehydration_chunk {
                        hydrating = false;

                        tracing::info!(
                            worker_id = %source_config.worker_id,
                            source_id = %source_config.id,
                            "upsert source finished rehydration",
                        );

                        snapshot_cap.downgrade(&[]);
                    }

                }
                _ = input.ready() => {
                    let mut events_processed = 0;
                    while let Some(event) = input.next_sync() {
                        match event {
                            AsyncEvent::Data(cap, mut data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?cap.time(),
                                    updates=%data.len(),
                                    "received data");

                                let event_time = cap.time().clone();

                                stage_input(
                                    &mut stash,
                                    cap,
                                    &mut data,
                                    &input_upper,
                                    &resume_upper,
                                );

                                if prevent_snapshot_buffering && output_cap.time() == &event_time {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?event_time,
                                        ?resume_upper,
                                        ?output_cap,
                                        "allowing partial drain");
                                    partial_drain_time = Some(event_time.clone());
                                } else {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        %prevent_snapshot_buffering,
                                        ?event_time,
                                        ?resume_upper,
                                        ?output_cap,
                                        "not allowing partial drain");
                                }
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received progress");

                                // Ignore progress updates before the `resume_upper`, which is our initial
                                // capability post-snapshotting.
                                if PartialOrder::less_than(&upper, &resume_upper) {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?upper,
                                        ?resume_upper,
                                        "ignoring progress updates before resume_upper");
                                    continue;
                                }

                                // Disable partial drain, because this progress
                                // update has moved the frontier. We might allow
                                // it again once we receive data right at the
                                // frontier again.
                                partial_drain_time = None;


                                if let Some(ts) = upper.as_option() {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?ts,
                                        "downgrading output capability");
                                    let _ = output_cap.try_downgrade(ts);
                                }
                                input_upper = upper;
                            }
                        }

                        events_processed += 1;
                        if let Some(max) = snapshot_buffering_max {
                            if events_processed >= max {
                                break;
                            }
                        }
                    }
                }
            };

            // While we have partially ingested updates of a timestamp our state
            // is in an inconsistent/consolidating state and accessing it would
            // panic.
            if let Some(largest_seen_persist_ts) = largest_seen_persist_ts.as_ref() {
                if persist_upper.less_equal(largest_seen_persist_ts) {
                    continue;
                }
            }

            // We try and drain from our stash every time we go through the
            // loop. More of our stash can become eligible for draining both
            // when the source-input frontier advances or when the persist
            // frontier advances.

            // We can't easily iterate through the cap -> updates mappings and
            // downgrade the cap at the same time, so we drain them out and
            // re-insert them into the map at their (possibly downgraded) cap.


            let stashed_work = stash.drain().collect_vec();
            for (mut cap, mut updates) in stashed_work.into_iter() {
                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    ?cap,
                    ?updates,
                    "stashed updates");

                let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                    &mut updates,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::ToUpper{input_upper: &input_upper, persist_upper: &persist_upper},
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    output_updates = %output_updates.len(),
                    "output updates for complete timestamp");

                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(&cap, (update, ts, diff));
                }

                if !updates.is_empty() {
                    let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
                    cap.downgrade(&min_remaining_time);

                    // Stash them back in, being careful because we might have
                    // to merge them with other updates that we already have for
                    // that timestamp.
                    stash.entry(cap)
                        .and_modify(|existing_updates| existing_updates.append(&mut updates))
                        .or_insert_with(|| updates);

                }
            }


            if input_upper.is_empty() {
                tracing::debug!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    "input exhausted, shutting down");
                break;
            };

            // If there were staged events that occurred at the capability time, drain
            // them. This is safe because out-of-order updates to the same key that are
            // drained in separate calls to `drain_staged_input` are correctly ordered by
            // their `FromTime` in `drain_staged_input`.
            //
            // Note also that this may result in more updates in the output collection than
            // the minimum. However, because the frontier only advances on `Progress` updates,
            // the collection always accumulates correctly for all keys.
            if let Some(partial_drain_time) = &partial_drain_time {

                let stashed_work = stash.drain().collect_vec();
                for (mut cap, mut updates) in stashed_work.into_iter() {
                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?cap,
                        ?updates,
                        "stashed updates");

                    let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                        &mut updates,
                        &mut commands_state,
                        &mut output_updates,
                        &mut multi_get_scratch,
                        DrainStyle::AtTime{
                            time: partial_drain_time.clone(),
                            persist_upper: &persist_upper
                        },
                        &mut error_emitter,
                        &mut state,
                        &source_config,
                    )
                    .await;

                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        output_updates = %output_updates.len(),
                        "output updates for partial timestamp");

                    for (update, ts, diff) in output_updates.drain(..) {
                        output_handle.give(&cap, (update, ts, diff));
                    }

                    if !updates.is_empty() {
                        let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
                        cap.downgrade(&min_remaining_time);

                        // Stash them back in, being careful because we might have
                        // to merge them with other updates that we already have for
                        // that timestamp.
                        stash.entry(cap)
                            .and_modify(|existing_updates| existing_updates.append(&mut updates))
                            .or_insert_with(|| updates);

                    }
                }
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}

/// Helper method for [`upsert_inner`] used to stage `data` updates
/// from the input/source timely edge.
#[allow(clippy::disallowed_types)]
fn stage_input<T, FromTime>(
    stash: &mut HashMap<Capability<T>, Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>>,
    cap: Capability<T>,
    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
    input_upper: &Antichain<T>,
    resume_upper: &Antichain<T>,
) where
    T: PartialOrder + timely::progress::Timestamp,
    FromTime: Ord,
{
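    // Until the input frontier has advanced past the `resume_upper`, drop any
    // updates whose times are not at or beyond the `resume_upper`: the output
    // we are resuming from should already reflect them.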
    if PartialOrder::less_equal(input_upper, resume_upper) {
        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
    }

    let stash_for_timestamp = stash.entry(cap).or_default();

    stash_for_timestamp.extend(data.drain(..).map(|((key, value, order), time, diff)| {
        assert!(diff > 0, "invalid upsert input");
        (time, key, Reverse(order), value)
    }));
}

/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
#[derive(Debug)]
enum DrainStyle<'a, T> {
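    // Drain all stashed updates whose timestamps are complete on the source
    // input and for which the persist input has caught up.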
    ToUpper {
        input_upper: &'a Antichain<T>,
        persist_upper: &'a Antichain<T>,
    },
    // For partial draining when taking the source snapshot.
    AtTime {
        time: T,
        persist_upper: &'a Antichain<T>,
    },
}

/// Helper method for [`upsert_inner`] that drains stashed updates which are
/// eligible for processing under the given [`DrainStyle`], appending the
/// implied changes to `output_updates`.
///
/// Returns the minimum observed time across the updates that remain in the
/// stash or `None` if none are left.
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, Option<FromTime>>>,
    output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, Option<FromTime>>,
    source_config: &crate::source::SourceExportCreationConfig,
) -> Option<T>
where
    S: UpsertStateBackend<T, Option<FromTime>>,
    G: Scope,
    T: TotalOrder + timely::ExchangeData + Debug + Ord + Sync,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    let mut min_remaining_time = Antichain::new();

    let mut eligible_updates = stash
        .drain_filter_swapping(|(ts, _, _, _)| {
            let eligible = match &drain_style {
                DrainStyle::ToUpper {
                    input_upper,
                    persist_upper,
                } => {
                    // We make sure that a) we only process updates when we know their
                    // timestamp is complete, that is there will be no more updates for
                    // that timestamp, and b) that "previous" times in the persist
                    // input are complete. The latter makes sure that we emit updates
                    // for the next timestamp that are consistent with the global state
                    // in the output persist shard, which also serves as a persistent
                    // copy of our in-memory/on-disk upsert state.
                    !input_upper.less_equal(ts) && !persist_upper.less_than(ts)
                }
                DrainStyle::AtTime {
                    time,
                    persist_upper,
                } => {
                    // Even when emitting partial updates, we still need to wait
                    // until "previous" times in the persist input are complete.
                    *ts <= *time && !persist_upper.less_than(ts)
                }
            };

            if !eligible {
                min_remaining_time.insert(ts.clone());
            }

            eligible
        })
        .collect_vec();

    tracing::debug!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?drain_style,
        remaining = %stash.len(),
        eligible = eligible_updates.len(),
        "draining stash");

    // Sort the eligible updates by (time, key, Reverse(from_time)) so that
    // deduping by (time, key) below keeps the update with the largest
    // `FromTime` for each key and time.
    eligible_updates.sort_unstable_by(|a, b| {
        let (ts1, key1, from_ts1, val1) = a;
        let (ts2, key2, from_ts2, val2) = b;
        Ord::cmp(&(ts1, key1, from_ts1, val1), &(ts2, key2, from_ts2, val2))
    });
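    // For example (hypothetical values), two updates for the same (time, key)
    // end up adjacent, with the one carrying the larger `FromTime` first
    // because of the `Reverse` wrapper, so the dedup further below keeps it:
    //
    //   (t1, k, Reverse(o2), Some(v2))   // kept by dedup
    //   (t1, k, Reverse(o1), Some(v1))   // dropped by dedup, since o1 < o2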

    // Read the previous value _per key_ out of `state` into `commands_state`,
    // so that the eligible updates below can be applied against it.
    commands_state.clear();
    for (_, key, _, _) in eligible_updates.iter() {
        commands_state.entry(*key).or_default();
    }

    // These iterators iterate in the same order because `commands_state`
    // is an `IndexMap`.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix that can be emitted we deduplicate based on (ts, key) in
    // order to only process the command with the maximum order within each (ts,
    // key) group. This is achieved by wrapping the order in `Reverse(FromTime)` above.
    let mut commands = eligible_updates.into_iter().dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = upsert_types::upsert_bincode_opts();
    // Upsert the values into `commands_state` by recording the latest
    // value (or deletion). These will be synced to `state` at the end.
    //
    // Note that we are effectively doing a "mini-upsert" here, using
    // `commands_state`. This "mini-upsert" is seeded with data from `state`, using
    // a single `multi_get` above, and the final state is written out into
    // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
    // implementations, and reduces the number of reads and writes we need to do.
    //
    // This "mini-upsert" technique is actually useful in `UpsertState`'s
    // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
    // the `UpsertStateBackend` implementations. In some sense, it's "upsert all the way down".
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_state_cell = &mut command_state.get_mut().value;

        if let Some(cs) = existing_state_cell.as_mut() {
            cs.ensure_decoded(bincode_opts);
        }

        // Skip this command if its order key is below the one in the upsert state.
        // Note that the existing order key may be `None` if the existing value
        // is from snapshotting, which always sorts below new values/deletes.
        let existing_order = existing_state_cell
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts).map_or(None, Option::as_ref));
        if existing_order >= Some(&from_time.0) {
            // Skip this update. If no later updates adjust this key, then we just
            // end up writing the same value back to state. If there
            // is nothing in the state, `existing_order` is `None`, and this
            // does not occur.
            continue;
        }

        match value {
            Some(value) => {
                let existing_value = existing_state_cell.take();

                let old_value = if let Some(old_value) = existing_value.as_ref() {
                    old_value.provisional_value_ref(&ts)
                } else {
                    None
                };

                if let Some(old_value) = old_value {
                    output_updates.push((old_value.clone(), ts.clone(), -1));
                }

                match &drain_style {
                    DrainStyle::AtTime { .. } => {
                        let new_value = match existing_value {
                            Some(existing_value) => existing_value.into_provisional_value(
                                value.clone(),
                                ts.clone(),
                                Some(from_time.0.clone()),
                            ),
                            None => StateValue::new_provisional_value(
                                value.clone(),
                                ts.clone(),
                                Some(from_time.0.clone()),
                            ),
                        };
                        existing_state_cell.replace(new_value);
                    }
                    DrainStyle::ToUpper { .. } => {
                        // Not writing down provisional values, or anything.
                    }
                };

                output_updates.push((value, ts, 1));
            }
            None => {
                let existing_value = existing_state_cell.take();

                let old_value = if let Some(old_value) = existing_value.as_ref() {
                    old_value.provisional_value_ref(&ts)
                } else {
                    None
                };
                if let Some(old_value) = old_value {
                    output_updates.push((old_value.clone(), ts.clone(), -1));
                }

                match &drain_style {
                    DrainStyle::AtTime { .. } => {
                        let new_value = match existing_value {
                            Some(existing_value) => existing_value
                                .into_provisional_tombstone(ts.clone(), Some(from_time.0.clone())),
                            None => StateValue::new_provisional_tombstone(
                                ts.clone(),
                                Some(from_time.0.clone()),
                            ),
                        };
                        existing_state_cell.replace(new_value);
                    }
                    DrainStyle::ToUpper { .. } => {
                        // Not writing down provisional values, or anything.
                    }
                }
            }
        }
    }

    match &drain_style {
        DrainStyle::AtTime { .. } => {
            match state
                .multi_put(
                    // We don't want to update per-record stats, like size of
                    // records indexed or count of records indexed.
                    //
                    // We only add provisional values and these will be
                    // overwritten once we receive updates for state from the
                    // persist input. And the merge functionality cannot know
                    // what was in state before merging, so it cannot correctly
                    // retract/update stats added here.
                    //
                    // Mostly, the merge functionality can't update those stats
                    // because merging happens in a function that we pass to
                    // rocksdb which doesn't have access to any external
                    // context. And in general, with rocksdb we do blind writes
                    // rather than inspect what was there before when
                    // updating/inserting.
                    false,
                    commands_state.drain(..).map(|(k, cv)| {
                        (
                            k,
                            upsert_types::PutValue {
                                value: cv.value.map(|cv| cv.into_decoded()),
                                previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                                    size: v.size.try_into().expect("less than i64 size"),
                                    is_tombstone: v.is_tombstone,
                                }),
                            },
                        )
                    }),
                )
                .await
            {
                Ok(_) => {}
                Err(e) => {
                    error_emitter
                        .emit("Failed to update records in state".to_string(), e)
                        .await;
                }
            }
        }
        style => {
            tracing::trace!(
                worker_id = %source_config.worker_id,
                source_id = %source_config.id,
                "not doing state update for drain style {:?}", style);
        }
    }

    min_remaining_time.into_option()
}