Skip to main content

mz_storage/
upsert_continual_feedback_v2.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the feedback UPSERT operator.
11//!
12//! # Architecture
13//!
14//! The operator converts a stream of upsert commands `(key, Option<value>)` into
15//! a differential collection of `(key, value)` pairs, using a feedback loop
16//! through persist to maintain the "previous value" state needed for computing
17//! retractions.
18//!
19//! ## Dataflow topology
20//!
21//! ```text
22//!   Source input ──► ┌──────────┐ ──► Output ──► Persist
23//!                    │  Upsert  │
24//!   Persist read ──► └──────────┘
25//!       ▲                                           │
26//!       └───────────── feedback ────────────────────┘
27//! ```
28//!
29//! ## Operator loop (each iteration)
30//!
31//! 1. **Ingest source data.** Read upsert commands from the source input,
32//!    wrap each in an [`UpsertDiff`] (carrying a columnar order key projected
33//!    from `FromTime` via [`UpsertSourceTime`] for dedup), and push into the
34//!    source-stash batcher. The batcher is a paged columnar merge batcher: it
35//!    consolidates entries for the same `(key, time)` via the `UpsertDiff`
36//!    Semigroup — keeping the update with the highest order key (latest source
37//!    offset) — through amortized geometric merging as data is pushed in, and
38//!    pages cold chains out of RSS through the pager. This bounds resident
39//!    memory to O(unique key-time pairs) even during large source snapshots.
40//!
41//! 2. **Read persist frontier.** Check the probe on the persist arrangement
42//!    to learn which times have been committed. When the persist frontier
43//!    reaches the resume upper, rehydration is complete.
44//!
45//! 3. **Seal & drain.** Call `batcher.seal(input_upper)` to extract all
46//!    source-finalized entries as sorted, consolidated `Column` chunks. Each
47//!    entry is classified:
48//!    - **Eligible** (at the persist frontier): the persist trace has the
49//!      correct "before" state for this time. Look up the old value via a
50//!      cursor, emit a retraction if present, and emit the new value.
51//!    - **Ineligible** (between persist and input frontiers): persist hasn't
52//!      caught up yet. Push back into the batcher for the next iteration.
53//!    - **Already persisted** (below the persist frontier): some writer has
54//!      already advanced the shard past this time, so it is dropped. See
55//!      [`drain_sealed_input`] for why re-stashing it would strand the data
56//!      and pin the output frontier below the shard upper.
57//!
58//! 4. **Capability management.** Downgrade the output capability to the
59//!    minimum time of any remaining buffered data (in the batcher or pushed
60//!    back as ineligible). Drop the capability entirely when the batcher is
61//!    empty.
62//!
63//! ## Eligibility condition (total order)
64//!
65//! For a total-order timestamp with `input_upper = {i}` and
66//! `persist_upper = {p}`, an entry at time `ts` is eligible when
67//! `ts == p < i` — the source has finalized it and persist is exactly at
68//! that time, so the trace cursor returns the correct prior state. An entry
69//! with `p < ts` is ineligible (persist hasn't caught up), and one with
70//! `ts < p` is already persisted and dropped.
71
72use std::fmt::Debug;
73
74use columnar::Index as _;
75use differential_dataflow::difference::{IsZero, Semigroup};
76use differential_dataflow::hashable::Hashable;
77use differential_dataflow::lattice::Lattice;
78use differential_dataflow::logging::Logger;
79use differential_dataflow::operators::arrange::agent::TraceAgent;
80use differential_dataflow::operators::arrange::arrangement::arrange_core;
81use differential_dataflow::trace::{Batcher, Cursor, Description, TraceReader};
82use differential_dataflow::{AsCollection, VecCollection};
83use mz_repr::{Datum, Diff, GlobalId, Row};
84use mz_row_spine::{ValRowColPagedBuilder, ValRowSpine};
85use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
86use mz_timely_util::builder_async::{
87    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
88    PressOnDropButton,
89};
90use mz_timely_util::columnar::batcher::ColumnChunker;
91use mz_timely_util::columnar::builder::ColumnBuilder;
92use mz_timely_util::columnar::merge_batcher::ColumnMergeBatcher;
93use mz_timely_util::columnar::{Col2ValPagedBatcher, Column};
94use mz_timely_util::containers::stack::FueledBuilder;
95use std::convert::Infallible;
96use timely::container::{CapacityContainerBuilder, PushInto};
97use timely::dataflow::StreamVec;
98use timely::dataflow::channels::pact::{Exchange, Pipeline};
99use timely::dataflow::operators::generic::Operator;
100use timely::dataflow::operators::{Capability, CapabilitySet, Exchange as _};
101use timely::order::{PartialOrder, TotalOrder};
102use timely::progress::frontier::AntichainRef;
103use timely::progress::timestamp::Refines;
104use timely::progress::{Antichain, Timestamp};
105
106use crate::healthcheck::HealthStatusUpdate;
107use crate::metrics::upsert::UpsertMetrics;
108use crate::upsert::UpsertKey;
109use crate::upsert::UpsertSourceTime;
110use crate::upsert::UpsertValue;
111
112/// The persist-feedback arrangement's batcher, wrapping [`Col2ValPagedBatcher`]
113/// only to capture the storage upsert-stash pager at construction.
114///
115/// `arrange_core` builds its batcher via [`Batcher::new`], which has no pager
116/// hook, so a plain `Col2ValPagedBatcher` falls back to the process-global
117/// (compute) pager — meaning the feedback arrangement's spill would be gated by
118/// compute's `enable_column_paged_batcher_spill` rather than storage's
119/// `enable_upsert_paged_spill`. Injecting `upsert_stash_pager::pager()` in `new`
120/// puts the feedback arrangement under the same flag as the source stash. Every
121/// other method delegates to the inner batcher unchanged.
122struct UpsertFeedbackBatcher<T: columnar::Columnar>(Col2ValPagedBatcher<UpsertKey, Row, T, Diff>);
123
124impl<T> Batcher for UpsertFeedbackBatcher<T>
125where
126    T: Timestamp + columnar::Columnar + Default + PartialOrder,
127    for<'a> columnar::Ref<'a, T>: Copy + Ord,
128{
129    type Output = Column<((UpsertKey, Row), T, Diff)>;
130    type Time = T;
131
132    fn new(logger: Option<Logger>, operator_id: usize) -> Self {
133        let mut batcher =
134            <Col2ValPagedBatcher<UpsertKey, Row, T, Diff> as Batcher>::new(logger, operator_id);
135        batcher.set_pager(crate::upsert::upsert_stash_pager::pager());
136        Self(batcher)
137    }
138
139    fn seal(&mut self, upper: Antichain<T>) -> (Vec<Self::Output>, Description<T>) {
140        self.0.seal(upper)
141    }
142
143    fn frontier(&mut self) -> AntichainRef<'_, T> {
144        self.0.frontier()
145    }
146}
147
148impl<T> PushInto<Column<((UpsertKey, Row), T, Diff)>> for UpsertFeedbackBatcher<T>
149where
150    T: Timestamp + columnar::Columnar + Default + PartialOrder,
151    for<'a> columnar::Ref<'a, T>: Copy + Ord,
152{
153    fn push_into(&mut self, chunk: Column<((UpsertKey, Row), T, Diff)>) {
154        self.0.push_into(chunk)
155    }
156}
157
158// The source stash carries the upsert payload in a custom diff type so the
159// merge batcher consolidates by (key, time), keeping the update with the
160// highest `FromTime` (latest source offset) per group. The diff is `Columnar`
161// so the paged merge batcher can store it in a `Column` and page it out of RSS.
162//
163// The value is a tag-encoded `Row` (see `upsert_value_to_row`) rather than an
164// `UpsertValue`: folding both the `Ok` and `Err` arms into one `Row` lets the
165// value share a single columnar byte container, and `Row` already implements
166// `Columnar`. `None` is a deletion tombstone.
167
168// Derive ordering on the generated `UpsertDiffReference` too: the paged merge
169// batcher requires `Ref: Ord` to sort the `(key, time, diff)` columns it
170// consolidates. The derived order (by `from_time`, then `value`) is fine —
171// "max FromTime wins" can tie only between equal `from_time`s, and a source
172// never emits two distinct values for the same `(key, time, from_time)`, so
173// the consolidated result doesn't depend on the fold order of equal
174// `(key, time)` runs.
175#[derive(Clone, Debug, Default, columnar::Columnar)]
176#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))]
177struct UpsertDiff<O> {
178    from_time: O,
179    value: Option<Row>,
180}
181
182impl<O> IsZero for UpsertDiff<O> {
183    fn is_zero(&self) -> bool {
184        false
185    }
186}
187
188impl<O: Ord + Clone> Semigroup for UpsertDiff<O> {
189    fn plus_equals(&mut self, rhs: &Self) {
190        if rhs.from_time > self.from_time {
191            *self = rhs.clone();
192        }
193    }
194}
195
196// Accumulate a borrowed columnar reference: the paged merge batcher consolidates
197// `Column`-resident diffs through this path on every fold of an equal
198// `(key, time)` run. Materialize only the order key to decide the "max FromTime
199// wins" comparison — copying the value `Row` out of the column solely when `rhs`
200// wins. Losing folds (the common case for a repeatedly-updated key) then pay no
201// `Row` copy at all.
202impl<'a, O> Semigroup<columnar::Ref<'a, UpsertDiff<O>>> for UpsertDiff<O>
203where
204    O: columnar::Columnar + Ord + Clone,
205{
206    fn plus_equals(&mut self, rhs: &columnar::Ref<'a, UpsertDiff<O>>) {
207        let rhs_from_time = <O as columnar::Columnar>::into_owned(rhs.from_time);
208        if rhs_from_time > self.from_time {
209            self.from_time = rhs_from_time;
210            self.value = <Option<Row> as columnar::Columnar>::into_owned(rhs.value);
211        }
212    }
213}
214
215/// Consolidate `updates` through `chunker` into `Column` chunks and push them
216/// into `batcher`, emptying `updates` (keeping its capacity). The chunker
217/// readies a fully-consolidated chunk per `push_into`, so the `extract` loop
218/// drains everything it produced.
219fn flush_to_batcher<T, O>(
220    updates: &mut Vec<UpsertUpdate<T, O>>,
221    chunker: &mut UpsertChunker<T, O>,
222    batcher: &mut UpsertBatcher<T, O>,
223) where
224    T: columnar::Columnar + Default + Clone + PartialOrder,
225    for<'a> columnar::Ref<'a, T>: Copy + Ord,
226    O: columnar::Columnar + Default + Ord + Clone,
227    for<'a> columnar::Ref<'a, O>: Ord,
228{
229    use timely::container::{ContainerBuilder as _, PushInto as _};
230    if updates.is_empty() {
231        return;
232    }
233    let mut raw: Column<UpsertUpdate<T, O>> = Default::default();
234    for update in updates.drain(..) {
235        raw.push_into(&update);
236    }
237    chunker.push_into(&mut raw);
238    while let Some(chunk) = chunker.extract() {
239        batcher.push_into(std::mem::take(chunk));
240    }
241}
242
// The source stash uses the paged columnar merge batcher. Data is pushed in
// unsorted; the batcher maintains geometrically-sized sorted chains and
// consolidates via the UpsertDiff Semigroup automatically. Unlike DD's
// in-memory `VecMerger`, this batcher stores each chain entry as a `Column`
// routed through the storage-owned upsert-stash pager (installed with
// `set_pager` where the batcher is created), so the not-yet-eligible backlog
// (the snapshot / persist-lag window) pages out of RSS instead of growing it.
249
250/// One source-stash update: a key, its dataflow time, and the payload diff.
251/// `O` is the columnar order key projected from the source `FromTime` (see
252/// [`UpsertSourceTime`]).
253type UpsertUpdate<T, O> = (UpsertKey, T, UpsertDiff<O>);
254
255type UpsertBatcher<T, O> = ColumnMergeBatcher<UpsertKey, T, UpsertDiff<O>>;
256
257/// The chunker that sorts and consolidates raw input into the `Column` chunks
258/// [`UpsertBatcher`] consumes.
259type UpsertChunker<T, O> = ColumnChunker<UpsertUpdate<T, O>>;
260
261/// The operator's data-output handle. A fueled `Vec` builder so the drain can
262/// `give_fueled` each emitted update and yield to timely under large snapshot
263/// drains instead of monopolizing the worker.
264type UpsertOutputHandle<T> =
265    AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<(UpsertValue, T, Diff)>>>>;
266
267// The persist-feedback arrangement uses a `ValRowSpine<UpsertKey, _, _>`: keys
268// land in a columnation arena (`UpsertKey` is `[u8; 32]` + `Copy`, so it uses
269// `CopyRegion`), and values are stored as packed `Row` bytes in a
270// `DatumContainer`. `UpsertValue` is `Result<Row, Box<UpsertError>>`, so we
271// still need to fold both arms into a single `Row` with a leading tag column
272// so they share the value container.
273
274/// Encode an [`UpsertValue`] as a `Row` with a leading tag column so both `Ok`
275/// and `Err` payloads round-trip through `Row` byte storage.
276fn upsert_value_to_row(value: &UpsertValue) -> Row {
277    let mut row = Row::default();
278    let mut packer = row.packer();
279    match value {
280        Ok(ok) => {
281            packer.push(Datum::UInt8(0));
282            packer.extend(ok.iter());
283        }
284        Err(err) => {
285            packer.push(Datum::UInt8(1));
286            let bytes =
287                bincode::serialize(err.as_ref()).expect("UpsertError is serializable via bincode");
288            packer.push(Datum::Bytes(&bytes));
289        }
290    }
291    row
292}
293
294/// Heap-size estimate for an emitted [`UpsertValue`], used to drive
295/// `give_fueled` yielding on the output edge.
296fn upsert_value_byte_len(value: &UpsertValue) -> usize {
297    match value {
298        Ok(row) => row.byte_len(),
299        Err(err) => std::mem::size_of_val(err.as_ref()),
300    }
301}
302
303/// Decode an [`UpsertValue`] produced by [`upsert_value_to_row`] from any datum
304/// iterator — a `ValRowSpine` cursor's `DatumSeq` or a stashed `Row`'s `iter`.
305fn decode_upsert_value<'a>(mut iter: impl Iterator<Item = Datum<'a>>) -> UpsertValue {
306    let tag = match iter.next() {
307        Some(Datum::UInt8(tag)) => tag,
308        other => panic!("upsert value missing UInt8 tag, got {:?}", other),
309    };
310    match tag {
311        0 => {
312            let mut row = Row::default();
313            row.packer().extend(iter);
314            Ok(row)
315        }
316        1 => {
317            let bytes = match iter.next() {
318                Some(Datum::Bytes(b)) => b,
319                other => panic!("upsert error tag missing Bytes payload, got {:?}", other),
320            };
321            let err: UpsertError =
322                bincode::deserialize(bytes).expect("UpsertError bincode round-trip");
323            Err(Box::new(err))
324        }
325        tag => panic!("unknown upsert value tag {tag}"),
326    }
327}
328
329/// Transforms a stream of upserts (key-value updates) into a differential
330/// collection.
331///
332/// Persist feedback is arranged into a differential trace (DD manages the
333/// spine lifecycle). Source input is stashed with a custom `UpsertDiff`
334/// Semigroup that deduplicates by keeping the highest FromTime per (key, time).
335///
336/// Has two inputs:
337///   1. **Source input** — upsert commands from the external source.
338///   2. **Persist input** — feedback of the operator's own output, read back
339///      from persist.  Arranged into a trace for cursor-based lookups.
340#[allow(clippy::disallowed_methods)]
341pub fn upsert_inner<'scope, T, FromTime>(
342    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
343    key_indices: Vec<usize>,
344    resume_upper: Antichain<T>,
345    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
346    persist_token: Option<Vec<PressOnDropButton>>,
347    upsert_metrics: UpsertMetrics,
348    source_config: crate::source::SourceExportCreationConfig,
349) -> (
350    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
351    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
352    StreamVec<'scope, T, Infallible>,
353    PressOnDropButton,
354)
355where
356    T: Timestamp + TotalOrder + Sync,
357    T: Refines<mz_repr::Timestamp> + differential_dataflow::lattice::Lattice,
358    T: columnation::Columnation,
359    T: columnar::Columnar + Default,
360    for<'a> columnar::Ref<'a, T>: Copy + Ord,
361    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
362    FromTime: UpsertSourceTime,
363{
364    // Arrange persist feedback.
365    // Extract (UpsertKey, UpsertValue) from the persist feedback collection
366    // and arrange it. DD manages the spine, batching, and compaction.
367    let persist_keyed = persist_input.flat_map(move |result| {
368        let value = match result {
369            Ok(ok) => Ok(ok),
370            Err(DataflowError::EnvelopeError(err)) => match *err {
371                EnvelopeError::Upsert(err) => Err(Box::new(err)),
372                EnvelopeError::Flat(_) => return None,
373            },
374            Err(_) => return None,
375        };
376        let value_ref = match value {
377            Ok(ref row) => Ok(row),
378            Err(ref err) => Err(&**err),
379        };
380        Some((UpsertKey::from_value(value_ref, &key_indices), value))
381    });
382    let persist_keyed = persist_keyed
383        .inner
384        // The arrangement already implicitly exchanges by key, so this is redundant, but we want to
385        // do it earlier so that we can inspect the stream properly for source statistics.
386        .exchange(move |((key, _), _, _)| UpsertKey::hashed(key))
387        .as_collection()
388        .inspect(move |((_, row), _, diff)| {
389            source_config
390                .source_statistics
391                .update_records_indexed_by(diff.into_inner());
392            source_config.source_statistics.update_bytes_indexed_by(
393                row.as_ref().map_or(0, |r| r.byte_len().try_into().unwrap()) * diff.into_inner(),
394            );
395        });
396    // Encode (UpsertKey, UpsertValue) → (UpsertKey, Row) into `Column`
397    // containers so the feedback arrangement uses the paged columnar path: the
398    // batcher routes its spine input through the process-global pager, paging
399    // cold feedback chains out of RSS, while `ValRowSpine` keeps keys in a
400    // columnation arena (UpsertKey is fixed-size [u8; 32]) and values as packed
401    // `Row` bytes in a `DatumContainer`. Built with `Pipeline` so we keep the
402    // locality established by the `UpsertKey::hashed` exchange above.
403    let encoded = persist_keyed
404        .inner
405        .unary::<ColumnBuilder<((UpsertKey, Row), T, Diff)>, _, _, _>(
406            Pipeline,
407            "Persist feedback encode",
408            |_, _| {
409                move |input, output| {
410                    input.for_each(|time, data| {
411                        let mut session = output.session_with_builder(&time);
412                        for ((key, value), ts, diff) in data.drain(..) {
413                            let row = upsert_value_to_row(&value);
414                            session.give(((&key, &row), &ts, &diff));
415                        }
416                    });
417                }
418            },
419        );
420    let persist_arranged = arrange_core::<
421        _,
422        _,
423        ColumnChunker<((UpsertKey, Row), T, Diff)>,
424        UpsertFeedbackBatcher<T>,
425        ValRowColPagedBuilder<UpsertKey, T, Diff>,
426        ValRowSpine<UpsertKey, T, Diff>,
427    >(encoded, Pipeline, "Persist feedback");
428    let mut persist_trace = persist_arranged.trace.clone();
429
430    // Probe the persist arrangement's stream for frontier tracking.
431    // This replaces receiving the batch stream as an input — we just
432    // read the probe frontier to know when persist has caught up.
433    use timely::dataflow::operators::Probe;
434    let (persist_probe, _persist_probe_stream) = persist_arranged.stream.probe();
435
436    // Build the async processing operator.
437    let mut builder = AsyncOperatorBuilder::new("Upsert V2".to_string(), input.scope());
438
439    let (output_handle, output) = builder
440        .new_output::<FueledBuilder<CapacityContainerBuilder<Vec<(UpsertValue, T, Diff)>>>>();
441    let (_snapshot_handle, snapshot_stream) =
442        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
443    let (_health_output, health_stream) = builder
444        .new_output::<CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>>();
445
446    let mut input = builder.new_input_for(
447        input.inner,
448        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
449        &output_handle,
450    );
451
452    // We still need the persist stream as an input so the operator wakes
453    // when the persist arrangement produces batches (frontier advances).
454    // We read the actual frontier from the probe though.
455    let mut persist_wakeup = builder.new_disconnected_input(_persist_probe_stream, Pipeline);
456
457    let shutdown_button = builder.build(move |caps| async move {
458        // Hold the persist source tokens for the operator's lifetime so the
459        // feedback shard stays open until shutdown.
460        let _persist_token = persist_token;
461
462        let [output_cap, snapshot_cap, _health_cap]: [_; 3] = caps.try_into().unwrap();
463        drop(output_cap);
464        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);
465
466        let mut hydrating = true;
467
468        // Source stash backed by the paged columnar merge batcher. The batcher
469        // maintains geometrically-sized sorted chains and consolidates via the
470        // UpsertDiff Semigroup as data is pushed in, bounding memory to
471        // O(unique key-time pairs) even during large initial snapshots, and
472        // pages cold chains out of RSS through the pager.
473        //
474        // The pager is storage-owned (configured by `UpdateConfiguration` from
475        // storage's own dyncfgs), distinct from the compute column-paged
476        // batcher's process-global pager. Captured once here: backend / budget /
477        // codec tunes take effect live (the policy is reconfigured in place),
478        // but flipping the enable flag takes effect on dataflows created after
479        // the change. While disabled, the pager keeps every chunk resident.
480        let mut batcher: UpsertBatcher<T, FromTime::Order> = Batcher::new(None, 0);
481        batcher.set_pager(crate::upsert::upsert_stash_pager::pager());
482        // The chunker sorts and consolidates raw input into the `Column` chunks
483        // the batcher consumes.
484        let mut chunker: UpsertChunker<T, FromTime::Order> = Default::default();
485        // Scratch buffer for accumulating source events before flushing to
486        // the batcher. Drained on each iteration via the chunker.
487        let mut push_buffer: Vec<UpsertUpdate<T, FromTime::Order>> = Vec::new();
488
489        // Capability held at the minimum time of any buffered data. When
490        // Some, the operator may still produce output; when None, the
491        // batcher is empty.
492        let mut stash_cap: Option<Capability<T>> = None;
493        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
494
495        let snapshot_start = std::time::Instant::now();
496        let mut prev_persist_upper = Antichain::from_elem(Timestamp::minimum());
497
498        // Accumulators for rehydration metrics, set as gauges when rehydration completes.
499        let mut rehydration_total: u64 = 0;
500        let mut rehydration_updates: u64 = 0;
501
502        // Main operator loop. Each iteration performs four steps:
503        //   Step 1: Ingest source data into the batcher.
504        //   Step 2: Read the persist frontier and update rehydration state.
505        //   Step 3: Seal the batcher, drain eligible entries, push back the rest.
506        //   Step 4: Manage the output capability.
507        loop {
508            // Block until woken by source input or a persist frontier advance.
509            tokio::select! {
510                _ = input.ready() => {}
511                _ = persist_wakeup.ready() => {
512                    while persist_wakeup.next_sync().is_some() {}
513                }
514            }
515
516            // Step 1: Ingest source data.
517            // Read all available source events, wrap each value in an
518            // UpsertDiff (carrying FromTime for dedup), and buffer them.
519            // Events before the resume_upper are dropped (already persisted).
520            while let Some(event) = input.next_sync() {
521                match event {
522                    AsyncEvent::Data(cap, data) => {
523                        let mut pushed_any = false;
524                        for ((key, value, from_time), ts, diff) in data {
525                            assert!(diff.is_positive(), "invalid upsert input");
526                            if PartialOrder::less_equal(&input_upper, &resume_upper)
527                                && !resume_upper.less_equal(&ts)
528                            {
529                                continue;
530                            }
531                            let value = value.as_ref().map(upsert_value_to_row);
532                            let from_time = from_time.upsert_order();
533                            push_buffer.push((key, ts, UpsertDiff { from_time, value }));
534                            pushed_any = true;
535                        }
536                        // Track the minimum capability across all buffered data
537                        // so we can emit output at the correct times.
538                        if pushed_any {
539                            stash_cap = Some(match stash_cap {
540                                Some(prev) if cap.time() < prev.time() => cap,
541                                Some(prev) => prev,
542                                None => cap,
543                            });
544                        }
545                    }
546                    AsyncEvent::Progress(upper) => {
547                        if PartialOrder::less_than(&upper, &resume_upper) {
548                            continue;
549                        }
550                        input_upper = upper;
551                    }
552                }
553            }
554
555            // Flush buffered events through the chunker into the batcher. This
556            // triggers the chunker + geometric chain merging, which consolidates
557            // entries for the same (key, time) via the UpsertDiff Semigroup.
558            flush_to_batcher(&mut push_buffer, &mut chunker, &mut batcher);
559
560            // Step 2: Read persist frontier.
561            // The persist probe tells us which output times have been
562            // committed back through the feedback loop. This determines:
563            //   - Whether rehydration is complete (persist >= resume_upper).
564            //   - Which source entries are eligible for processing (their
565            //     time must equal persist_upper so the trace cursor returns
566            //     the correct prior state).
567            //   - How far to compact the persist trace.
568            let persist_upper = persist_probe.with_frontier(|f| f.to_owned());
569
570            if persist_upper != prev_persist_upper {
571                let last_rehydration_chunk =
572                    hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);
573
574                if last_rehydration_chunk {
575                    hydrating = false;
576                    upsert_metrics
577                        .rehydration_latency
578                        .set(snapshot_start.elapsed().as_secs_f64());
579                    upsert_metrics.rehydration_total.set(rehydration_total);
580                    upsert_metrics.rehydration_updates.set(rehydration_updates);
581                    tracing::info!(
582                        worker_id = %source_config.worker_id,
583                        source_id = %source_config.id,
584                        "upsert finished rehydration",
585                    );
586                    snapshot_cap.downgrade(&[]);
587                }
588
589                let _ = snapshot_cap.try_downgrade(persist_upper.iter());
590
591                // Compact the trace so the spine can merge old batches.
592                persist_trace.set_logical_compaction(persist_upper.borrow());
593                persist_trace.set_physical_compaction(persist_upper.borrow());
594
595                prev_persist_upper = persist_upper.clone();
596            }
597
598            // Step 3: Seal & drain.
599            // Seal the batcher at input_upper to extract all source-finalized
600            // entries as sorted, consolidated chunks. The seal merges all
601            // internal chains (O(N) linear merge of sorted data) and splits
602            // by time: entries at ts < input_upper are extracted, the rest
603            // stay in the batcher.
604            //
605            // Extracted entries are partitioned into:
606            //   - Eligible (ts == persist_upper): processed now via cursor
607            //     lookup on the persist trace.
608            //   - Ineligible (persist_upper < ts < input_upper): persist
609            //     hasn't caught up yet; pushed back into the batcher.
610            //
611            // We skip the seal entirely unless an eligible entry is at all
612            // possible. `seal` performs an O(N) merge of all chains
613            // regardless of how much it extracts, so calling it when nothing
614            // can be processed makes the operator quadratic in the number of
615            // wakeups (a real pathology during upstream snapshots and during
616            // rehydration when the source races ahead of persist).
617            //
618            // For an entry at `ts` to be eligible we need
619            // `ts == persist_upper && ts < input_upper`. The necessary
620            // preconditions, expressible without scanning the batcher:
621            //   1. `cap.time() <= persist_upper`. Since `cap.time()` is
622            //      maintained as a lower bound on `min(ts in batcher)`, if
623            //      `cap.time() > persist_upper` then every buffered ts is
624            //      strictly above persist_upper and none can equal it.
625            //   2. `persist_upper < input_upper`. Otherwise no `ts` that
626            //      satisfies `ts == persist_upper` can also satisfy
627            //      `ts < input_upper`.
628            //
629            // This naturally covers both the post-hydration source-snapshot
630            // case (cap == persist == input → condition 2 fails) and the
631            // rehydration-with-source-ahead case (cap > persist → condition
632            // 1 fails). It also no-ops correctly when persist has shut down
633            // (empty persist_upper makes condition 2 vacuously false).
634            if let Some(cap) = stash_cap.as_mut()
635                && !persist_upper.less_than(cap.time())
636                && PartialOrder::less_than(&persist_upper, &input_upper)
637            {
638                // Step 1 already consolidated `push_buffer` through the chunker
639                // (which readies a complete chunk per `push_into`), so the
640                // chunker holds nothing pending here and we can seal directly.
641                let (sealed, _description) = batcher.seal(input_upper.clone());
642                // Frontier of data remaining in the batcher (ts >= input_upper).
643                let remaining_frontier = batcher.frontier().to_owned();
644
645                let mut ineligible = Vec::new();
646                // `drain_sealed_input` emits eligible output directly through
647                // `output_handle` (fueled), so there is no intermediate output
648                // buffer to drain afterward.
649                let drain_stats = drain_sealed_input(
650                    sealed,
651                    &mut ineligible,
652                    &output_handle,
653                    &*cap,
654                    &persist_upper,
655                    &mut persist_trace,
656                    &source_config.worker_id,
657                    &source_config.id,
658                )
659                .await;
660
661                upsert_metrics.multi_get_size.inc_by(drain_stats.eligible);
662                upsert_metrics
663                    .multi_get_result_count
664                    .inc_by(drain_stats.result_count);
665                upsert_metrics
666                    .multi_put_size
667                    .inc_by(drain_stats.output_count);
668                upsert_metrics.upsert_inserts.inc_by(drain_stats.inserts);
669                upsert_metrics.upsert_updates.inc_by(drain_stats.updates);
670                upsert_metrics.upsert_deletes.inc_by(drain_stats.deletes);
671
672                if hydrating {
673                    rehydration_total += drain_stats.inserts;
674                    rehydration_updates += drain_stats.eligible;
675                }
676
677                // Step 4: Capability management.
678                // Downgrade the output capability to the minimum time of any
679                // remaining data: either entries still in the batcher (above
680                // input_upper) or ineligible entries being pushed back.
681                let min_ineligible_ts = ineligible.iter().map(|(_, ts, _)| ts).min().cloned();
682                flush_to_batcher(&mut ineligible, &mut chunker, &mut batcher);
683
684                let has_remaining = !remaining_frontier.is_empty() || min_ineligible_ts.is_some();
685                if has_remaining {
686                    let min_ts = match (
687                        remaining_frontier.elements().first(),
688                        min_ineligible_ts.as_ref(),
689                    ) {
690                        (Some(a), Some(b)) => std::cmp::min(a, b).clone(),
691                        (Some(a), None) => a.clone(),
692                        (None, Some(b)) => b.clone(),
693                        (None, None) => unreachable!(),
694                    };
695                    cap.downgrade(&min_ts);
696                } else {
697                    // Batcher is completely empty — drop the capability so
698                    // downstream operators can make progress.
699                    stash_cap = None;
700                }
701            }
702
703            if input_upper.is_empty() {
704                break;
705            }
706        }
707    });
708
709    (
710        output
711            .as_collection()
712            .map(|result: UpsertValue| match result {
713                Ok(ok) => Ok(ok),
714                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
715            }),
716        health_stream,
717        snapshot_stream,
718        shutdown_button.press_on_drop(),
719    )
720}
721
/// Counts from a single call to [`drain_sealed_input`], used to update metrics.
///
/// All counters start at zero (see the `Default` impl) and only ever grow
/// during one drain; the struct is plain data, so it is cheap to copy, compare
/// and print when debugging.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
struct DrainStats {
    /// Number of entries looked up in the persist trace (cursor seeks).
    eligible: u64,
    /// Number of cursor lookups that found an existing value.
    result_count: u64,
    /// New value written with no prior value (insert).
    inserts: u64,
    /// New value written over an existing value (update).
    updates: u64,
    /// Tombstone (None) applied to an existing value (delete).
    deletes: u64,
    /// Total output records emitted (retractions + insertions).
    output_count: u64,
}
737
738/// Process sealed chunks from the batcher, classifying each entry by its
739/// timestamp relative to `persist_upper`: entries at the frontier are eligible
740/// for processing now (cursor lookup + output), entries above it are returned
741/// in `ineligible` for re-stashing, and entries below it are already persisted
742/// and dropped (see the body for why).
743///
744/// The sealed chunks are already sorted and consolidated by the MergeBatcher,
745/// so the trace cursor walks forward through keys in order — seeks amortize.
746async fn drain_sealed_input<T, O>(
747    sealed: Vec<Column<UpsertUpdate<T, O>>>,
748    ineligible: &mut Vec<UpsertUpdate<T, O>>,
749    output_handle: &UpsertOutputHandle<T>,
750    output_cap: &Capability<T>,
751    persist_upper: &Antichain<T>,
752    trace: &mut TraceAgent<ValRowSpine<UpsertKey, T, Diff>>,
753    worker_id: &usize,
754    source_id: &GlobalId,
755) -> DrainStats
756where
757    T: TotalOrder + Lattice + timely::ExchangeData + Timestamp + Clone + Debug + Ord + Sync,
758    T: columnation::Columnation + columnar::Columnar,
759    O: columnar::Columnar,
760{
761    // Classify each entry by its timestamp relative to `persist_upper`:
762    //
763    //   * `ts == persist_upper`: eligible for processing now.
764    //   * `ts >  persist_upper`: not yet processable; re-stashed (ineligible)
765    //     until the feedback frontier catches up to it.
766    //   * `ts <  persist_upper`: already persisted by some writer and not
767    //     relevant anymore. We DROP it. The downstream persist_sink would
768    //     filter such updates out anyway since the shard upper is further
769    //     ahead, and our state is already up-to-date to `persist_upper` so we
770    //     could not emit correct retractions for it. Re-stashing it would
771    //     strand the data forever (`persist_upper` only advances, so
772    //     `ts == persist_upper` can never again hold) and pin the operator's
773    //     output frontier below the shard upper. This mirrors v1's
774    //     `relevant = persist_upper.less_equal(ts)`.
775    // Walk the sealed chunks by reference rather than collecting the eligible
776    // set into an owned Vec. The chunks are globally sorted (the seal merges
777    // all chains into one run), so the cursor seeks still walk forward and
778    // amortize, and eligible values are emitted straight from the column's
779    // `RowRef` with no owned `UpsertDiff` copy. Only the re-stashed ineligible
780    // set is materialized.
781    let mut eligible_count: u64 = 0;
782    let mut result_count: u64 = 0;
783    let mut output_count: u64 = 0;
784    let mut inserts: u64 = 0;
785    let mut updates: u64 = 0;
786    let mut deletes: u64 = 0;
787
788    let (mut cursor, storage) = trace.cursor();
789
790    for chunk in &sealed {
791        for (key, ts, diff) in chunk.borrow().into_index_iter() {
792            let ts = <T as columnar::Columnar>::into_owned(ts);
793            if !persist_upper.less_equal(&ts) {
794                // ts < persist_upper: drop.
795                continue;
796            }
797            if persist_upper.less_than(&ts) {
798                // ts > persist_upper: re-stash for later (owned).
799                ineligible.push((
800                    *key,
801                    ts,
802                    <UpsertDiff<O> as columnar::Columnar>::into_owned(diff),
803                ));
804                continue;
805            }
806
807            // ts == persist_upper: eligible. Look up the prior value for this
808            // key in the persist trace and emit the retraction / insertion. The
809            // spine stores keys in a columnation arena, so we seek by the
810            // column's borrowed `&UpsertKey` directly.
811            eligible_count += 1;
812            cursor.seek_key(&storage, key);
813            let old_value = match cursor.get_key(&storage) {
814                Some(found) if found == key => {
815                    let mut result = None;
816                    while let Some(val) = cursor.get_val(&storage) {
817                        let mut count = Diff::ZERO;
818                        cursor.map_times(&storage, |_time, d| {
819                            count += d.clone();
820                        });
821                        if count.is_positive() {
822                            assert!(
823                                count == 1.into(),
824                                "unexpected multiple entries for the same key in persist trace"
825                            );
826                            assert!(
827                                result.is_none(),
828                                "unexpected multiple values for the same key in persist trace"
829                            );
830                            result = Some(decode_upsert_value(val));
831                        }
832                        cursor.step_val(&storage);
833                    }
834                    result
835                }
836                _ => None,
837            };
838
839            if old_value.is_some() {
840                result_count += 1;
841            }
842
843            match diff.value {
844                Some(row) => {
845                    if let Some(old_val) = old_value {
846                        let size = upsert_value_byte_len(&old_val);
847                        output_handle
848                            .give_fueled(output_cap, (old_val, ts.clone(), Diff::MINUS_ONE), size)
849                            .await;
850                        output_count += 1;
851                        updates += 1;
852                    } else {
853                        inserts += 1;
854                    }
855                    let new_val = decode_upsert_value(row.iter());
856                    let size = upsert_value_byte_len(&new_val);
857                    output_handle
858                        .give_fueled(output_cap, (new_val, ts, Diff::ONE), size)
859                        .await;
860                    output_count += 1;
861                }
862                None => {
863                    if let Some(old_val) = old_value {
864                        let size = upsert_value_byte_len(&old_val);
865                        output_handle
866                            .give_fueled(output_cap, (old_val, ts, Diff::MINUS_ONE), size)
867                            .await;
868                        output_count += 1;
869                        deletes += 1;
870                    }
871                }
872            }
873        }
874    }
875
876    tracing::debug!(
877        worker_id = %worker_id,
878        source_id = %source_id,
879        ineligible = ineligible.len(),
880        eligible = eligible_count,
881        "drained stash",
882    );
883
884    DrainStats {
885        eligible: eligible_count,
886        result_count,
887        inserts,
888        updates,
889        deletes,
890        output_count,
891    }
892}
893
894#[cfg(test)]
895mod test {
896    use mz_ore::metrics::MetricsRegistry;
897    use mz_persist_types::ShardId;
898    use mz_repr::{Datum, Timestamp as MzTimestamp};
899    use mz_storage_operators::persist_source::Subtime;
900    use mz_storage_types::sources::SourceEnvelope;
901    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
902    use timely::dataflow::operators::capture::Extract;
903    use timely::dataflow::operators::{Capture, Input};
904    use timely::progress::Timestamp;
905
906    use crate::metrics::StorageMetrics;
907    use crate::metrics::upsert::UpsertMetricDefs;
908    use crate::source::SourceExportCreationConfig;
909    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};
910
911    use super::*;
912
913    // The tests drive the operator with a plain integer `FromTime` standing in
914    // for a Kafka offset; project it to itself so dedup orders by it directly.
915    impl UpsertSourceTime for i32 {
916        type Order = i32;
917        fn upsert_order(&self) -> i32 {
918            *self
919        }
920    }
921
922    type Ts = (MzTimestamp, Subtime);
923
924    fn new_ts(ts: u64) -> Ts {
925        (MzTimestamp::new(ts), Subtime::minimum())
926    }
927
928    fn key(k: i64) -> UpsertKey {
929        UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(k)])))
930    }
931
932    fn row(k: i64, v: i64) -> Row {
933        Row::pack_slice(&[Datum::Int64(k), Datum::Int64(v)])
934    }
935
936    macro_rules! upsert_test {
937        (|$input:ident, $persist:ident, $worker:ident| $body:block) => {{
938            let output_handle = timely::execute_directly(move |$worker| {
939                let (mut $input, mut $persist, output_handle) = $worker
940                    .dataflow::<MzTimestamp, _, _>(|scope| {
941                        scope.scoped::<Ts, _, _>("upsert", |scope| {
942                            let (input_handle, input) = scope.new_input();
943                            let (persist_handle, persist_input) = scope.new_input();
944                            let source_id = GlobalId::User(0);
945
946                            let reg = MetricsRegistry::new();
947                            let upsert_defs = UpsertMetricDefs::register_with(&reg);
948                            let upsert_metrics =
949                                UpsertMetrics::new(&upsert_defs, source_id, 0, None);
950
951                            let reg2 = MetricsRegistry::new();
952                            let storage_metrics = StorageMetrics::register_with(&reg2);
953
954                            let reg3 = MetricsRegistry::new();
955                            let stats_defs =
956                                SourceStatisticsMetricDefs::register_with(&reg3);
957                            let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
958                                source_arity: 2,
959                                style: UpsertStyle::Default(KeyEnvelope::Flattened),
960                                key_indices: vec![0],
961                            });
962                            let source_statistics = SourceStatistics::new(
963                                source_id, 0, &stats_defs, source_id, &ShardId::new(),
964                                envelope, Antichain::from_elem(Timestamp::minimum()),
965                            );
966                            let source_config = SourceExportCreationConfig {
967                                id: source_id,
968                                worker_id: 0,
969                                metrics: storage_metrics,
970                                source_statistics,
971                            };
972
973                            let (output, _, _, button) = upsert_inner(
974                                input.as_collection(),
975                                vec![0],
976                                Antichain::from_elem(Timestamp::minimum()),
977                                persist_input.as_collection(),
978                                None,
979                                upsert_metrics,
980                                source_config,
981                            );
982                            std::mem::forget(button);
983                            (input_handle, persist_handle, output.inner.capture())
984                        })
985                    });
986
987                $body
988
989                output_handle
990            });
991
992            let mut actual: Vec<_> = output_handle
993                .extract()
994                .into_iter()
995                .flat_map(|(_cap, container)| container)
996                .collect();
997            differential_dataflow::consolidation::consolidate_updates(&mut actual);
998            actual
999        }};
1000    }
1001
1002    #[mz_ore::test]
1003    #[cfg_attr(miri, ignore)]
1004    fn gh_9160_repro() {
1005        let actual = upsert_test!(|input, persist, worker| {
1006            let key0 = key(0);
1007            let key1 = key(1);
1008            let value1 = row(0, 0);
1009            let value3 = row(0, 1);
1010            let value4 = row(0, 2);
1011
1012            input.send(((key0, Some(Ok(value1.clone())), 1), new_ts(0), Diff::ONE));
1013            input.advance_to(new_ts(2));
1014            worker.step();
1015
1016            persist.send((Ok(value1), new_ts(0), Diff::ONE));
1017            persist.advance_to(new_ts(1));
1018            worker.step();
1019
1020            input.send_batch(&mut vec![
1021                ((key1, None, 2), new_ts(2), Diff::ONE),
1022                ((key0, Some(Ok(value3)), 3), new_ts(3), Diff::ONE),
1023            ]);
1024            input.advance_to(new_ts(3));
1025            input.send_batch(&mut vec![(
1026                (key0, Some(Ok(value4)), 4),
1027                new_ts(3),
1028                Diff::ONE,
1029            )]);
1030            input.advance_to(new_ts(4));
1031            worker.step();
1032
1033            persist.advance_to(new_ts(3));
1034            worker.step();
1035        });
1036
1037        let value1 = row(0, 0);
1038        let value4 = row(0, 2);
1039        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
1040            (Ok(value1.clone()), new_ts(0), Diff::ONE),
1041            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
1042            (Ok(value4), new_ts(3), Diff::ONE),
1043        ];
1044        assert_eq!(actual, expected);
1045    }
1046
1047    #[mz_ore::test]
1048    #[cfg_attr(miri, ignore)]
1049    fn out_of_order_keys_across_timestamps() {
1050        let actual = upsert_test!(|input, persist, worker| {
1051            let key_high = key(99);
1052            let key_low = key(1);
1053            let val_a = row(99, 1);
1054            let val_b = row(1, 2);
1055
1056            input.send(((key_high, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
1057            input.advance_to(new_ts(1));
1058            worker.step();
1059            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
1060            persist.advance_to(new_ts(1));
1061            worker.step();
1062
1063            input.send(((key_low, Some(Ok(val_b.clone())), 2), new_ts(1), Diff::ONE));
1064            input.advance_to(new_ts(2));
1065            worker.step();
1066            persist.send((Ok(val_b.clone()), new_ts(1), Diff::ONE));
1067            persist.advance_to(new_ts(2));
1068            worker.step();
1069
1070            let val_a2 = row(99, 10);
1071            let val_b2 = row(1, 20);
1072            input.send_batch(&mut vec![
1073                (
1074                    (key_high, Some(Ok(val_a2.clone())), 3),
1075                    new_ts(2),
1076                    Diff::ONE,
1077                ),
1078                ((key_low, Some(Ok(val_b2.clone())), 4), new_ts(2), Diff::ONE),
1079            ]);
1080            input.advance_to(new_ts(3));
1081            worker.step();
1082            persist.advance_to(new_ts(3));
1083            worker.step();
1084        });
1085
1086        let val_a = row(99, 1);
1087        let val_b = row(1, 2);
1088        let val_a2 = row(99, 10);
1089        let val_b2 = row(1, 20);
1090        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
1091            (Ok(val_b.clone()), new_ts(1), Diff::ONE),
1092            (Ok(val_b), new_ts(2), Diff::MINUS_ONE),
1093            (Ok(val_b2), new_ts(2), Diff::ONE),
1094            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
1095            (Ok(val_a), new_ts(2), Diff::MINUS_ONE),
1096            (Ok(val_a2), new_ts(2), Diff::ONE),
1097        ];
1098        let mut actual_sorted = actual;
1099        let mut expected_sorted = expected;
1100        actual_sorted.sort();
1101        expected_sorted.sort();
1102        assert_eq!(actual_sorted, expected_sorted);
1103    }
1104
1105    #[mz_ore::test]
1106    #[cfg_attr(miri, ignore)]
1107    fn rehydration_then_update() {
1108        let actual = upsert_test!(|input, persist, worker| {
1109            let k = key(42);
1110            let old_val = row(42, 100);
1111            let new_val = row(42, 200);
1112
1113            persist.send((Ok(old_val), new_ts(0), Diff::ONE));
1114            persist.advance_to(new_ts(1));
1115            worker.step();
1116
1117            input.send(((k, Some(Ok(new_val)), 1), new_ts(1), Diff::ONE));
1118            input.advance_to(new_ts(2));
1119            worker.step();
1120            persist.advance_to(new_ts(2));
1121            worker.step();
1122        });
1123
1124        let old_val = row(42, 100);
1125        let new_val = row(42, 200);
1126        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
1127            (Ok(old_val), new_ts(1), Diff::MINUS_ONE),
1128            (Ok(new_val), new_ts(1), Diff::ONE),
1129        ];
1130        assert_eq!(actual, expected);
1131    }
1132
1133    #[mz_ore::test]
1134    #[cfg_attr(miri, ignore)]
1135    fn delete_existing_key() {
1136        let actual = upsert_test!(|input, persist, worker| {
1137            let k = key(7);
1138            let val = row(7, 77);
1139
1140            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
1141            input.advance_to(new_ts(1));
1142            worker.step();
1143            persist.send((Ok(val), new_ts(0), Diff::ONE));
1144            persist.advance_to(new_ts(1));
1145            worker.step();
1146
1147            input.send(((k, None, 2), new_ts(1), Diff::ONE));
1148            input.advance_to(new_ts(2));
1149            worker.step();
1150            persist.advance_to(new_ts(2));
1151            worker.step();
1152        });
1153
1154        let val = row(7, 77);
1155        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
1156            (Ok(val.clone()), new_ts(0), Diff::ONE),
1157            (Ok(val), new_ts(1), Diff::MINUS_ONE),
1158        ];
1159        assert_eq!(actual, expected);
1160    }
1161
1162    #[mz_ore::test]
1163    #[cfg_attr(miri, ignore)]
1164    fn multi_batch_rehydration() {
1165        let actual = upsert_test!(|input, persist, worker| {
1166            let k = key(5);
1167            let old_val = row(5, 10);
1168            let new_val = row(5, 20);
1169            let updated_val = row(5, 30);
1170
1171            persist.send((Ok(old_val.clone()), new_ts(0), Diff::ONE));
1172            persist.send((Ok(old_val), new_ts(0), Diff::MINUS_ONE));
1173            persist.send((Ok(new_val), new_ts(0), Diff::ONE));
1174            persist.advance_to(new_ts(1));
1175            worker.step();
1176
1177            input.send(((k, Some(Ok(updated_val)), 1), new_ts(1), Diff::ONE));
1178            input.advance_to(new_ts(2));
1179            worker.step();
1180            persist.advance_to(new_ts(2));
1181            worker.step();
1182        });
1183
1184        let new_val = row(5, 20);
1185        let updated_val = row(5, 30);
1186        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
1187            (Ok(new_val), new_ts(1), Diff::MINUS_ONE),
1188            (Ok(updated_val), new_ts(1), Diff::ONE),
1189        ];
1190        assert_eq!(actual, expected);
1191    }
1192
1193    #[mz_ore::test]
1194    #[cfg_attr(miri, ignore)]
1195    fn delete_nonexistent_key() {
1196        let actual = upsert_test!(|input, persist, worker| {
1197            let k = key(99);
1198
1199            persist.advance_to(new_ts(1));
1200            worker.step();
1201
1202            input.send(((k, None, 1), new_ts(1), Diff::ONE));
1203            input.advance_to(new_ts(2));
1204            worker.step();
1205            persist.advance_to(new_ts(2));
1206            worker.step();
1207        });
1208
1209        assert!(actual.is_empty(), "expected empty output, got: {actual:?}");
1210    }
1211
1212    #[mz_ore::test]
1213    #[cfg_attr(miri, ignore)]
1214    fn reinsert_after_delete() {
1215        let actual = upsert_test!(|input, persist, worker| {
1216            let k = key(3);
1217            let val_a = row(3, 10);
1218            let val_b = row(3, 20);
1219
1220            input.send(((k, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
1221            input.advance_to(new_ts(1));
1222            worker.step();
1223            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
1224            persist.advance_to(new_ts(1));
1225            worker.step();
1226
1227            input.send(((k, None, 2), new_ts(1), Diff::ONE));
1228            input.advance_to(new_ts(2));
1229            worker.step();
1230            persist.send((Ok(val_a), new_ts(1), Diff::MINUS_ONE));
1231            persist.advance_to(new_ts(2));
1232            worker.step();
1233
1234            input.send(((k, Some(Ok(val_b.clone())), 3), new_ts(2), Diff::ONE));
1235            input.advance_to(new_ts(3));
1236            worker.step();
1237            persist.advance_to(new_ts(3));
1238            worker.step();
1239        });
1240
1241        let val_a = row(3, 10);
1242        let val_b = row(3, 20);
1243        let mut expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
1244            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
1245            (Ok(val_a), new_ts(1), Diff::MINUS_ONE),
1246            (Ok(val_b), new_ts(2), Diff::ONE),
1247        ];
1248        expected.sort();
1249        let mut actual = actual;
1250        actual.sort();
1251        assert_eq!(actual, expected);
1252    }
1253
1254    #[mz_ore::test]
1255    #[cfg_attr(miri, ignore)]
1256    fn idempotent_update() {
1257        let actual = upsert_test!(|input, persist, worker| {
1258            let k = key(11);
1259            let val = row(11, 50);
1260
1261            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
1262            input.advance_to(new_ts(1));
1263            worker.step();
1264            persist.send((Ok(val.clone()), new_ts(0), Diff::ONE));
1265            persist.advance_to(new_ts(1));
1266            worker.step();
1267
1268            input.send(((k, Some(Ok(val.clone())), 2), new_ts(1), Diff::ONE));
1269            input.advance_to(new_ts(2));
1270            worker.step();
1271            persist.advance_to(new_ts(2));
1272            worker.step();
1273        });
1274
1275        let val = row(11, 50);
1276        let expected: Vec<(Result<Row, DataflowError>, _, _)> =
1277            vec![(Ok(val), new_ts(0), Diff::ONE)];
1278        assert_eq!(actual, expected);
1279    }
1280
1281    /// Operator-level repro of the 0dt read-only-handoff stranding bug.
1282    ///
1283    /// Models a lagging replacement generation: the external (old) writer has
1284    /// already advanced the shard — and therefore the feedback `persist_upper`
1285    /// — to `T = 10`, while the operator itself has emitted nothing. The
1286    /// lagging replacement now produces source data at timestamps BELOW that
1287    /// upper (`ts = 5, 7`), i.e. data the external writer has already persisted.
1288    ///
1289    /// `drain_sealed_input` DROPS such already-persisted data (it satisfies
1290    /// neither `ts == persist_upper` nor `ts > persist_upper`), mirroring v1's
1291    /// `relevant = persist_upper.less_equal(ts)`. Were it instead re-stashed,
1292    /// the data would be stranded forever — `persist_upper` only advances, so
1293    /// `ts == persist_upper` could never again hold — and `min_ineligible_ts`
1294    /// would pin the operator's output capability at `ts = 5`, BELOW the shard
1295    /// upper, where it would stay for good. Dropping it lets the frontier
1296    /// advance freely.
1297    #[mz_ore::test]
1298    #[cfg_attr(miri, ignore)]
1299    fn lagging_replacement_below_upper_strands_data() {
1300        let (frontier, emitted) = run_below_upper_scenario_v2();
1301
1302        // The below-upper data is discarded (no output) and the output frontier
1303        // is not pinned below the shard upper (10); it advances to the input
1304        // upper (11), matching v1's behavior.
1305        assert!(
1306            emitted.is_empty(),
1307            "below-upper data should be dropped, not emitted; got {emitted:?}"
1308        );
1309        assert_eq!(
1310            frontier,
1311            vec![new_ts(11)],
1312            "v2 output frontier should advance to the input upper, not pin below \
1313             persist_upper"
1314        );
1315        assert!(
1316            frontier[0] >= new_ts(10),
1317            "v2 output frontier {frontier:?} should reach at least persist_upper (10)"
1318        );
1319    }
1320
1321    /// Shared driver for the lagging-replacement scenario against v2. Returns
1322    /// `(output_frontier, consolidated_emitted_updates)`.
1323    fn run_below_upper_scenario_v2() -> (Vec<Ts>, Vec<(Result<Row, DataflowError>, Ts, Diff)>) {
1324        use timely::dataflow::operators::Probe;
1325
1326        let (frontier, capture) = timely::execute_directly(move |worker| {
1327            let (mut input, mut persist, probe, capture) =
1328                worker.dataflow::<MzTimestamp, _, _>(|scope| {
1329                    scope.scoped::<Ts, _, _>("upsert", |scope| {
1330                        let (input_handle, input) = scope.new_input();
1331                        let (persist_handle, persist_input) = scope.new_input();
1332                        let source_id = GlobalId::User(0);
1333
1334                        let reg = MetricsRegistry::new();
1335                        let upsert_defs = UpsertMetricDefs::register_with(&reg);
1336                        let upsert_metrics = UpsertMetrics::new(&upsert_defs, source_id, 0, None);
1337
1338                        let reg2 = MetricsRegistry::new();
1339                        let storage_metrics = StorageMetrics::register_with(&reg2);
1340
1341                        let reg3 = MetricsRegistry::new();
1342                        let stats_defs = SourceStatisticsMetricDefs::register_with(&reg3);
1343                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
1344                            source_arity: 2,
1345                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
1346                            key_indices: vec![0],
1347                        });
1348                        let source_statistics = SourceStatistics::new(
1349                            source_id,
1350                            0,
1351                            &stats_defs,
1352                            source_id,
1353                            &ShardId::new(),
1354                            envelope,
1355                            Antichain::from_elem(Timestamp::minimum()),
1356                        );
1357                        let source_config = SourceExportCreationConfig {
1358                            id: source_id,
1359                            worker_id: 0,
1360                            metrics: storage_metrics,
1361                            source_statistics,
1362                        };
1363
1364                        let (output, _, _, button) = upsert_inner(
1365                            input.as_collection(),
1366                            vec![0],
1367                            Antichain::from_elem(Timestamp::minimum()),
1368                            persist_input.as_collection(),
1369                            None,
1370                            upsert_metrics,
1371                            source_config,
1372                        );
1373                        std::mem::forget(button);
1374                        let (probe, stream) = output.inner.probe();
1375                        (input_handle, persist_handle, probe, stream.capture())
1376                    })
1377                });
1378
1379            // The external writer has advanced the shard (feedback persist_upper)
1380            // to T = 10 WITHOUT the operator emitting anything itself.
1381            persist.advance_to(new_ts(10));
1382            for _ in 0..20 {
1383                worker.step();
1384            }
1385
1386            // The lagging replacement produces source data at ts BELOW the
1387            // current persist_upper (5 and 7 while persist_upper = 10).
1388            input.send(((key(0), Some(Ok(row(0, 1))), 1), new_ts(5), Diff::ONE));
1389            input.send(((key(1), Some(Ok(row(1, 2))), 2), new_ts(7), Diff::ONE));
1390            input.advance_to(new_ts(11));
1391            for _ in 0..20 {
1392                worker.step();
1393            }
1394
1395            (probe.with_frontier(|f| f.to_vec()), capture)
1396        });
1397
1398        let mut emitted: Vec<_> = capture
1399            .extract()
1400            .into_iter()
1401            .flat_map(|(_cap, c)| c)
1402            .collect();
1403        differential_dataflow::consolidation::consolidate_updates(&mut emitted);
1404        (frontier, emitted)
1405    }
1406}