// mz_storage/upsert_continual_feedback_v2.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Implementation of the feedback UPSERT operator.
//!
//! # Architecture
//!
//! The operator converts a stream of upsert commands `(key, Option<value>)` into
//! a differential collection of `(key, value)` pairs, using a feedback loop
//! through persist to maintain the "previous value" state needed for computing
//! retractions.
//!
//! ## Dataflow topology
//!
//! ```text
//!   Source input ──► ┌──────────┐ ──► Output ──► Persist
//!                    │  Upsert  │
//!   Persist read ──► └──────────┘
//!       ▲                                           │
//!       └───────────── feedback ────────────────────┘
//! ```
//!
//! ## Operator loop (each iteration)
//!
//! 1. **Ingest source data.** Read upsert commands from the source input,
//!    wrap each in an [`UpsertDiff`] (carrying `from_time` for dedup), and
//!    push into the [`MergeBatcher`]. The batcher consolidates entries for the
//!    same `(key, time)` using the `UpsertDiff` Semigroup, which keeps the
//!    update with the highest `FromTime` (latest Kafka offset). This happens
//!    via amortized geometric merging as data is pushed in, bounding memory
//!    to O(unique key-time pairs) even during large Kafka snapshots.
//!
//! 2. **Read persist frontier.** Check the probe on the persist arrangement
//!    to learn which times have been committed. When the persist frontier
//!    reaches the resume upper, rehydration is complete.
//!
//! 3. **Seal & drain.** Call `batcher.seal(input_upper)` to extract all
//!    source-finalized entries as sorted, consolidated chunks. Each entry is
//!    classified:
//!    - **Eligible** (at the persist frontier): the persist trace has the
//!      correct "before" state for this time. Look up the old value via a
//!      cursor, emit a retraction if present, and emit the new value.
//!    - **Ineligible** (between persist and input frontiers): persist hasn't
//!      caught up yet. Push back into the batcher for the next iteration.
//!
//! 4. **Capability management.** Downgrade the output capability to the
//!    minimum time of any remaining buffered data (in the batcher or pushed
//!    back as ineligible). Drop the capability entirely when the batcher is
//!    empty.
//!
//! ## Eligibility condition (total order)
//!
//! For a total-order timestamp with `input_upper = {i}` and
//! `persist_upper = {p}`, an entry at time `ts` is eligible when
//! `ts == p < i` — the source has finalized it and persist is exactly at
//! that time, so the trace cursor returns the correct prior state.
64use std::fmt::Debug;
65use std::marker::PhantomData;
66use std::sync::Arc;
67
68use differential_dataflow::difference::{IsZero, Semigroup};
69use differential_dataflow::hashable::Hashable;
70use differential_dataflow::lattice::Lattice;
71use differential_dataflow::operators::arrange::agent::TraceAgent;
72use differential_dataflow::trace::implementations::ValSpine;
73use differential_dataflow::trace::implementations::chunker::ContainerChunker;
74use differential_dataflow::trace::implementations::merge_batcher::{
75    MergeBatcher, container::VecMerger,
76};
77use differential_dataflow::trace::{Batcher, Builder, Cursor, Description, TraceReader};
78use differential_dataflow::{AsCollection, VecCollection};
79use mz_repr::{Diff, GlobalId, Row};
80use mz_storage_types::errors::{DataflowError, EnvelopeError};
81use mz_timely_util::builder_async::{
82    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
83};
84use std::convert::Infallible;
85use timely::container::CapacityContainerBuilder;
86use timely::dataflow::StreamVec;
87use timely::dataflow::channels::pact::{Exchange, Pipeline};
88use timely::dataflow::operators::{Capability, CapabilitySet, Exchange as _};
89use timely::order::{PartialOrder, TotalOrder};
90use timely::progress::timestamp::Refines;
91use timely::progress::{Antichain, Timestamp};
92
93use crate::healthcheck::HealthStatusUpdate;
94use crate::metrics::upsert::UpsertMetrics;
95use crate::upsert::UpsertKey;
96use crate::upsert::UpsertValue;
97
// ── Source stash diff type ───────────────────────────────────────────────────
// The source stash uses a custom diff type that carries the upsert payload.
// The Semigroup implementation does "max FromTime wins" — when two updates for
// the same (key, time) are consolidated, the one with the higher FromTime
// (latest Kafka offset) is kept.

/// The "diff" stored in the source stash batcher.
///
/// Rather than a numeric difference, this carries the upsert payload itself
/// plus the source-native timestamp used for deduplication during
/// consolidation.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct UpsertDiff<O> {
    // Source-native time (e.g. Kafka offset) of this update; consolidation
    // keeps the entry with the highest `from_time`.
    from_time: O,
    // The new value for the key, or `None` for a tombstone (delete).
    value: Option<UpsertValue>,
}
109
/// Deliberately never "zero": upsert commands must not cancel out during
/// consolidation. Even a tombstone (`value: None`) carries information (it
/// deletes the previous value), so every `(key, time)` entry is retained and
/// dedup happens purely through the `Semigroup` "max `FromTime` wins" rule.
impl<O> IsZero for UpsertDiff<O> {
    fn is_zero(&self) -> bool {
        false
    }
}
115
116impl<O: Ord + Clone> Semigroup for UpsertDiff<O> {
117    fn plus_equals(&mut self, rhs: &Self) {
118        if rhs.from_time > self.from_time {
119            *self = rhs.clone();
120        }
121    }
122}
123
// ── MergeBatcher type alias ──────────────────────────────────────────────────
// The source stash uses DD's MergeBatcher for amortized consolidation.
// Data is pushed in unsorted; the batcher maintains geometrically-sized sorted
// chains and consolidates via the UpsertDiff Semigroup automatically.

/// Batcher over `(UpsertKey, T, UpsertDiff<FromTime>)` triples, chunked into
/// `Vec` containers and merged with a [`VecMerger`].
type UpsertBatcher<T, FromTime> = MergeBatcher<
    Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    ContainerChunker<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    VecMerger<UpsertKey, T, UpsertDiff<FromTime>>,
>;
134
/// A minimal [`Builder`] that captures sealed chains without copying.
///
/// Used with [`MergeBatcher::seal`] to extract sorted, consolidated chunks
/// directly as `Vec<Vec<...>>`.
///
/// The struct itself is never instantiated; only the associated `seal`
/// function of its [`Builder`] impl is exercised.
struct CapturingBuilder<D, T>(D, PhantomData<T>);
140
141impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
142    type Input = D;
143    type Time = T;
144    type Output = Vec<D>;
145
146    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
147        unimplemented!()
148    }
149
150    fn push(&mut self, _chunk: &mut Self::Input) {
151        unimplemented!()
152    }
153
154    fn done(self, _description: Description<Self::Time>) -> Self::Output {
155        unimplemented!()
156    }
157
158    #[inline]
159    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
160        std::mem::take(chain)
161    }
162}
163
/// Transforms a stream of upserts (key-value updates) into a differential
/// collection.
///
/// Persist feedback is arranged into a differential trace (DD manages the
/// spine lifecycle). Source input is stashed with a custom `UpsertDiff`
/// Semigroup that deduplicates by keeping the highest FromTime per (key, time).
///
/// Has two inputs:
///   1. **Source input** — upsert commands from the external source.
///   2. **Persist input** — feedback of the operator's own output, read back
///      from persist.  Arranged into a trace for cursor-based lookups.
///
/// Returns the output collection, a health-status stream, an (empty-payload)
/// snapshot-progress stream whose frontier tracks rehydration, and the
/// operator's shutdown button.
#[allow(clippy::disallowed_methods)]
pub fn upsert_inner<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + differential_dataflow::lattice::Lattice + Sync,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
{
    // ── Arrange persist feedback ────────────────────────────────────────
    // Extract (UpsertKey, UpsertValue) from the persist feedback collection
    // and arrange it. DD manages the spine, batching, and compaction.
    let persist_keyed = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                // Non-upsert errors are not part of the upsert state and are
                // dropped from the feedback collection.
                EnvelopeError::Flat(_) => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let persist_keyed = persist_keyed
        .inner
        // The arrangement already implicitly exchanges by key, so this is redundant, but we want to
        // do it earlier so that we can inspect the stream properly for source statistics.
        .exchange(move |((key, _), _, _)| UpsertKey::hashed(key))
        .as_collection()
        .inspect(move |((_, row), _, diff)| {
            source_config
                .source_statistics
                .update_records_indexed_by(diff.into_inner());
            source_config.source_statistics.update_bytes_indexed_by(
                row.as_ref().map_or(0, |r| r.byte_len().try_into().unwrap()) * diff.into_inner(),
            );
        });
    let persist_arranged = persist_keyed.arrange_by_key();
    let mut persist_trace = persist_arranged.trace.clone();

    // Probe the persist arrangement's stream for frontier tracking.
    // This replaces receiving the batch stream as an input — we just
    // read the probe frontier to know when persist has caught up.
    use timely::dataflow::operators::Probe;
    let (persist_probe, _persist_probe_stream) = persist_arranged.stream.probe();

    // ── Build the async processing operator ─────────────────────────────
    let mut builder = AsyncOperatorBuilder::new("Upsert V2".to_string(), input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
    let (_health_output, health_stream) = builder
        .new_output::<CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>>();

    let mut input = builder.new_input_for(
        input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    // We still need the persist stream as an input so the operator wakes
    // when the persist arrangement produces batches (frontier advances).
    // We read the actual frontier from the probe though.
    let mut persist_wakeup = builder.new_disconnected_input(_persist_probe_stream, Pipeline);

    // NOTE(review): cloned but unused below — presumably retained for parity
    // with the v1 operator's metrics wiring; confirm before removing.
    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let _ = upsert_shared_metrics;

    let shutdown_button = builder.build(move |caps| async move {
        // Keep the persist source alive for as long as the operator runs.
        let _persist_token = persist_token;

        let [output_cap, snapshot_cap, _health_cap]: [_; 3] = caps.try_into().unwrap();
        // The initial output capability is not needed: all output is emitted
        // using capabilities obtained from source data events (`stash_cap`).
        drop(output_cap);
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        // True until the persist frontier first reaches `resume_upper`.
        let mut hydrating = true;

        // Source stash backed by DD's MergeBatcher. The batcher maintains
        // geometrically-sized sorted chains and consolidates via the
        // UpsertDiff Semigroup as data is pushed in, bounding memory to
        // O(unique key-time pairs) even during large initial snapshots.
        let mut batcher: UpsertBatcher<T, FromTime> = Batcher::new(None, 0);
        // Scratch buffer for accumulating source events before flushing to
        // the batcher. Drained on each iteration via `push_container`.
        let mut push_buffer: Vec<(UpsertKey, T, UpsertDiff<FromTime>)> = Vec::new();
        // Capability held at the minimum time of any buffered data. When
        // Some, the operator may still produce output; when None, the
        // batcher is empty.
        let mut stash_cap: Option<Capability<T>> = None;
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        // Reused buffer of output records produced by each drain call.
        let mut output_updates = vec![];
        let snapshot_start = std::time::Instant::now();
        let mut prev_persist_upper = Antichain::from_elem(Timestamp::minimum());

        // Accumulators for rehydration metrics, set as gauges when rehydration completes.
        let mut rehydration_total: u64 = 0;
        let mut rehydration_updates: u64 = 0;

        // ──────────────────────────────────────────────────────────────────
        // Main operator loop. Each iteration performs four steps:
        //   Step 1: Ingest source data into the batcher.
        //   Step 2: Read the persist frontier and update rehydration state.
        //   Step 3: Seal the batcher, drain eligible entries, push back the rest.
        //   Step 4: Manage the output capability.
        // ──────────────────────────────────────────────────────────────────
        loop {
            // Block until woken by source input or a persist frontier advance.
            tokio::select! {
                _ = input.ready() => {}
                _ = persist_wakeup.ready() => {
                    // The wakeup input's payload is irrelevant; drain it.
                    while persist_wakeup.next_sync().is_some() {}
                }
            }

            // ── Step 1: Ingest source data ────────────────────────────────
            // Read all available source events, wrap each value in an
            // UpsertDiff (carrying FromTime for dedup), and buffer them.
            // Events before the resume_upper are dropped (already persisted).
            while let Some(event) = input.next_sync() {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut pushed_any = false;
                        for ((key, value, from_time), ts, diff) in data {
                            assert!(diff.is_positive(), "invalid upsert input");
                            if PartialOrder::less_equal(&input_upper, &resume_upper)
                                && !resume_upper.less_equal(&ts)
                            {
                                continue;
                            }
                            push_buffer.push((key, ts, UpsertDiff { from_time, value }));
                            pushed_any = true;
                        }
                        // Track the minimum capability across all buffered data
                        // so we can emit output at the correct times.
                        if pushed_any {
                            stash_cap = Some(match stash_cap {
                                Some(prev) if cap.time() < prev.time() => cap,
                                Some(prev) => prev,
                                None => cap,
                            });
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        // Ignore progress below the resume point; the stash
                        // only tracks post-resume times.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }
                        input_upper = upper;
                    }
                }
            }

            // Flush buffered events into the batcher. This triggers the
            // chunker + geometric chain merging, which consolidates entries
            // for the same (key, time) via the UpsertDiff Semigroup.
            if !push_buffer.is_empty() {
                batcher.push_container(&mut push_buffer);
            }

            // ── Step 2: Read persist frontier ─────────────────────────────
            // The persist probe tells us which output times have been
            // committed back through the feedback loop. This determines:
            //   - Whether rehydration is complete (persist >= resume_upper).
            //   - Which source entries are eligible for processing (their
            //     time must equal persist_upper so the trace cursor returns
            //     the correct prior state).
            //   - How far to compact the persist trace.
            let persist_upper = persist_probe.with_frontier(|f| f.to_owned());

            if persist_upper != prev_persist_upper {
                let last_rehydration_chunk =
                    hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                if last_rehydration_chunk {
                    hydrating = false;
                    upsert_metrics
                        .rehydration_latency
                        .set(snapshot_start.elapsed().as_secs_f64());
                    upsert_metrics.rehydration_total.set(rehydration_total);
                    upsert_metrics.rehydration_updates.set(rehydration_updates);
                    tracing::info!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        "upsert finished rehydration",
                    );
                    // Drop the snapshot capability: downstream observers see
                    // the snapshot stream's frontier become empty.
                    snapshot_cap.downgrade(&[]);
                }

                // After the set is emptied above, this fails and is ignored.
                let _ = snapshot_cap.try_downgrade(persist_upper.iter());

                // Compact the trace so the spine can merge old batches.
                persist_trace.set_logical_compaction(persist_upper.borrow());
                persist_trace.set_physical_compaction(persist_upper.borrow());

                prev_persist_upper = persist_upper.clone();
            }

            // ── Step 3: Seal & drain ──────────────────────────────────────
            // Seal the batcher at input_upper to extract all source-finalized
            // entries as sorted, consolidated chunks. The seal merges all
            // internal chains (O(N) linear merge of sorted data) and splits
            // by time: entries at ts < input_upper are extracted, the rest
            // stay in the batcher.
            //
            // Extracted entries are partitioned into:
            //   - Eligible (ts == persist_upper): processed now via cursor
            //     lookup on the persist trace.
            //   - Ineligible (persist_upper < ts < input_upper): persist
            //     hasn't caught up yet; pushed back into the batcher.
            //
            // We skip the seal entirely unless an eligible entry is at all
            // possible. `seal` performs an O(N) merge of all chains
            // regardless of how much it extracts, so calling it when nothing
            // can be processed makes the operator quadratic in the number of
            // wakeups (a real pathology during upstream snapshots and during
            // rehydration when the source races ahead of persist).
            //
            // For an entry at `ts` to be eligible we need
            // `ts == persist_upper && ts < input_upper`. The necessary
            // preconditions, expressible without scanning the batcher:
            //   1. `cap.time() <= persist_upper`. Since `cap.time()` is
            //      maintained as a lower bound on `min(ts in batcher)`, if
            //      `cap.time() > persist_upper` then every buffered ts is
            //      strictly above persist_upper and none can equal it.
            //   2. `persist_upper < input_upper`. Otherwise no `ts` that
            //      satisfies `ts == persist_upper` can also satisfy
            //      `ts < input_upper`.
            //
            // This naturally covers both the post-hydration source-snapshot
            // case (cap == persist == input → condition 2 fails) and the
            // rehydration-with-source-ahead case (cap > persist → condition
            // 1 fails). It also no-ops correctly when persist has shut down
            // (empty persist_upper makes condition 2 vacuously false).
            if let Some(cap) = stash_cap.as_mut()
                && !persist_upper.less_than(cap.time())
                && PartialOrder::less_than(&persist_upper, &input_upper)
            {
                let sealed = batcher.seal::<CapturingBuilder<_, _>>(input_upper.clone());
                // Frontier of data remaining in the batcher (ts >= input_upper).
                let remaining_frontier = batcher.frontier().to_owned();

                let mut ineligible = Vec::new();
                let drain_stats = drain_sealed_input(
                    sealed,
                    &mut ineligible,
                    &mut output_updates,
                    &persist_upper,
                    &mut persist_trace,
                    &source_config.worker_id,
                    &source_config.id,
                );

                upsert_metrics.multi_get_size.inc_by(drain_stats.eligible);
                upsert_metrics
                    .multi_get_result_count
                    .inc_by(drain_stats.result_count);
                upsert_metrics
                    .multi_put_size
                    .inc_by(drain_stats.output_count);
                upsert_metrics.upsert_inserts.inc_by(drain_stats.inserts);
                upsert_metrics.upsert_updates.inc_by(drain_stats.updates);
                upsert_metrics.upsert_deletes.inc_by(drain_stats.deletes);

                if hydrating {
                    rehydration_total += drain_stats.inserts;
                    rehydration_updates += drain_stats.eligible;
                }

                // Emit output: retractions of old values and insertions of
                // new values, all at the eligible timestamp.
                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(cap, (update, ts, diff));
                }

                // ── Step 4: Capability management ─────────────────────────
                // Downgrade the output capability to the minimum time of any
                // remaining data: either entries still in the batcher (above
                // input_upper) or ineligible entries being pushed back.
                let min_ineligible_ts = ineligible.iter().map(|(_, ts, _)| ts).min().cloned();
                if !ineligible.is_empty() {
                    batcher.push_container(&mut ineligible);
                }

                let has_remaining = !remaining_frontier.is_empty() || min_ineligible_ts.is_some();
                if has_remaining {
                    let min_ts = match (
                        remaining_frontier.elements().first(),
                        min_ineligible_ts.as_ref(),
                    ) {
                        (Some(a), Some(b)) => std::cmp::min(a, b).clone(),
                        (Some(a), None) => a.clone(),
                        (None, Some(b)) => b.clone(),
                        (None, None) => unreachable!(),
                    };
                    cap.downgrade(&min_ts);
                } else {
                    // Batcher is completely empty — drop the capability so
                    // downstream operators can make progress.
                    stash_cap = None;
                }
            }

            // The source has shut down; no further input can arrive.
            if input_upper.is_empty() {
                break;
            }
        }
    });

    (
        output
            .as_collection()
            .map(|result: UpsertValue| match result {
                Ok(ok) => Ok(ok),
                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
            }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
511
/// Counts from a single call to [`drain_sealed_input`], used to update metrics.
///
/// All counters cover only that one call; callers accumulate across calls.
struct DrainStats {
    /// Number of entries looked up in the persist trace (cursor seeks).
    eligible: u64,
    /// Number of cursor lookups that found an existing value.
    result_count: u64,
    /// New value written with no prior value (insert).
    inserts: u64,
    /// New value written over an existing value (update).
    updates: u64,
    /// Tombstone (None) applied to an existing value (delete).
    deletes: u64,
    /// Total output records emitted (retractions + insertions).
    output_count: u64,
}
527
/// Process sealed chunks from the batcher. Entries at the persist frontier are
/// eligible for processing (cursor lookup + output); all others are returned
/// in `ineligible` for re-stashing.
///
/// The sealed chunks are already sorted and consolidated by the MergeBatcher.
///
/// Retractions of old values and insertions of new values are appended to
/// `output`; the returned [`DrainStats`] summarizes the work for metrics.
fn drain_sealed_input<T, FromTime>(
    sealed: Vec<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    ineligible: &mut Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    output: &mut Vec<(UpsertValue, T, Diff)>,
    persist_upper: &Antichain<T>,
    trace: &mut TraceAgent<ValSpine<UpsertKey, UpsertValue, T, Diff>>,
    worker_id: &usize,
    source_id: &GlobalId,
) -> DrainStats
where
    T: TotalOrder + Lattice + timely::ExchangeData + Timestamp + Clone + Debug + Ord + Sync,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
{
    // Separate eligible (at persist frontier) from ineligible.
    let mut eligible = Vec::new();
    for chunk in sealed {
        for entry in chunk {
            let (_, ref ts, _) = entry;
            // For a total order this pins ts to exactly the persist frontier:
            // not beyond it (`!less_than`) and not before it (`less_equal`).
            // An empty persist_upper makes both arms reject everything.
            if !persist_upper.less_than(ts) && persist_upper.less_equal(ts) {
                eligible.push(entry);
            } else {
                ineligible.push(entry);
            }
        }
    }

    tracing::debug!(
        worker_id = %worker_id,
        source_id = %source_id,
        ineligible = ineligible.len(),
        eligible = eligible.len(),
        "draining stash",
    );

    let eligible_count = u64::try_from(eligible.len()).expect("eligible count overflows u64");

    // Nothing to look up: return all-zero stats without opening a cursor.
    if eligible.is_empty() {
        return DrainStats {
            eligible: 0,
            result_count: 0,
            inserts: 0,
            updates: 0,
            deletes: 0,
            output_count: 0,
        };
    }

    let output_before = output.len();
    let mut result_count: u64 = 0;
    let mut inserts: u64 = 0;
    let mut updates: u64 = 0;
    let mut deletes: u64 = 0;

    // Eligible entries are sorted by (key, time) from the batcher.
    // The trace cursor moves forward through keys, matching this order.
    let (mut cursor, storage) = trace.cursor();

    for (key, ts, upsert_diff) in eligible {
        // Look up the current value for this key in the persist trace.
        // For ValSpine with Vector layout, Key<'a> = &'a UpsertKey.
        cursor.seek_key(&storage, &key);
        let old_value = if cursor.get_key(&storage) == Some(&key) {
            let mut result = None;
            // Accumulate the multiplicity of each value; upsert semantics
            // guarantee at most one value per key with a net count of +1.
            while let Some(val) = cursor.get_val(&storage) {
                let mut count = Diff::ZERO;
                cursor.map_times(&storage, |_time, diff| {
                    count += diff.clone();
                });
                if count.is_positive() {
                    assert!(
                        count == 1.into(),
                        "unexpected multiple entries for the same key in persist trace"
                    );
                    result = Some(val.clone());
                }
                cursor.step_val(&storage);
            }
            result
        } else {
            None
        };

        if old_value.is_some() {
            result_count += 1;
        }

        match upsert_diff.value {
            Some(new_val) => {
                // Retract the previous value (if any), then insert the new one.
                if let Some(old_val) = old_value {
                    output.push((old_val, ts.clone(), Diff::MINUS_ONE));
                    updates += 1;
                } else {
                    inserts += 1;
                }
                output.push((new_val, ts, Diff::ONE));
            }
            None => {
                // Tombstone: retract the previous value if one exists;
                // a tombstone for an absent key emits nothing.
                if let Some(old_val) = old_value {
                    output.push((old_val, ts, Diff::MINUS_ONE));
                    deletes += 1;
                }
            }
        }
    }

    let output_count =
        u64::try_from(output.len() - output_before).expect("output count overflows u64");

    DrainStats {
        eligible: eligible_count,
        result_count,
        inserts,
        updates,
        deletes,
        output_count,
    }
}
650
651#[cfg(test)]
652mod test {
653    use mz_ore::metrics::MetricsRegistry;
654    use mz_persist_types::ShardId;
655    use mz_repr::{Datum, Timestamp as MzTimestamp};
656    use mz_storage_operators::persist_source::Subtime;
657    use mz_storage_types::sources::SourceEnvelope;
658    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
659    use timely::dataflow::operators::capture::Extract;
660    use timely::dataflow::operators::{Capture, Input};
661    use timely::progress::Timestamp;
662
663    use crate::metrics::StorageMetrics;
664    use crate::metrics::upsert::UpsertMetricDefs;
665    use crate::source::SourceExportCreationConfig;
666    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};
667
668    use super::*;
669
670    type Ts = (MzTimestamp, Subtime);
671
672    fn new_ts(ts: u64) -> Ts {
673        (MzTimestamp::new(ts), Subtime::minimum())
674    }
675
676    fn key(k: i64) -> UpsertKey {
677        UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(k)])))
678    }
679
680    fn row(k: i64, v: i64) -> Row {
681        Row::pack_slice(&[Datum::Int64(k), Datum::Int64(v)])
682    }
683
    /// Runs an upsert dataflow test on a single in-process timely worker.
    ///
    /// Expands to a block that:
    /// 1. builds a two-input dataflow — `$input` carries upsert commands
    ///    `(key, Option<value>, from_time)` and `$persist` simulates the
    ///    persist feedback stream — wired into `upsert_inner`,
    /// 2. runs `$body` with mutable handles to both inputs plus `$worker`,
    ///    so the test controls the exact interleaving of
    ///    `send`/`advance_to`/`step` calls, and
    /// 3. captures the operator's output, consolidates it, and evaluates to
    ///    the resulting `Vec` of `(data, time, diff)` updates.
    macro_rules! upsert_test {
        (|$input:ident, $persist:ident, $worker:ident| $body:block) => {{
            let output_handle = timely::execute_directly(move |$worker| {
                let (mut $input, mut $persist, output_handle) = $worker
                    .dataflow::<MzTimestamp, _, _>(|scope| {
                        // The operator runs in a refined scope over `Ts`
                        // (outer timestamp + `Subtime`), mirroring how it is
                        // embedded under persist_source in production.
                        scope.scoped::<Ts, _, _>("upsert", |scope| {
                            let (input_handle, input) = scope.new_input();
                            let (persist_handle, persist_input) = scope.new_input();
                            let source_id = GlobalId::User(0);

                            // Throwaway metrics registries: each family of
                            // metric defs gets its own registry so definitions
                            // never collide across tests.
                            let reg = MetricsRegistry::new();
                            let upsert_defs = UpsertMetricDefs::register_with(&reg);
                            let upsert_metrics =
                                UpsertMetrics::new(&upsert_defs, source_id, 0, None);

                            let reg2 = MetricsRegistry::new();
                            let storage_metrics = StorageMetrics::register_with(&reg2);

                            let reg3 = MetricsRegistry::new();
                            let stats_defs =
                                SourceStatisticsMetricDefs::register_with(&reg3);
                            // Minimal upsert envelope: a 2-column source whose
                            // key is column 0.
                            let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                                source_arity: 2,
                                style: UpsertStyle::Default(KeyEnvelope::Flattened),
                                key_indices: vec![0],
                            });
                            let source_statistics = SourceStatistics::new(
                                source_id, 0, &stats_defs, source_id, &ShardId::new(),
                                envelope, Antichain::from_elem(Timestamp::minimum()),
                            );
                            let source_config = SourceExportCreationConfig {
                                id: source_id,
                                worker_id: 0,
                                metrics: storage_metrics,
                                source_statistics,
                            };

                            // NOTE(review): `vec![0]` and the minimum-frontier
                            // antichain presumably correspond to the key
                            // indices and the resume upper — confirm against
                            // `upsert_inner`'s signature.
                            let (output, _, _, button) = upsert_inner(
                                input.as_collection(),
                                vec![0],
                                Antichain::from_elem(Timestamp::minimum()),
                                persist_input.as_collection(),
                                None,
                                upsert_metrics,
                                source_config,
                            );
                            // Leak the shutdown button so the operator is not
                            // shut down when the button would otherwise drop
                            // mid-test.
                            std::mem::forget(button);
                            (input_handle, persist_handle, output.inner.capture())
                        })
                    });

                $body

                output_handle
            });

            // Flatten the captured output and consolidate so tests can assert
            // against a canonical `(data, time, diff)` list.
            let mut actual: Vec<_> = output_handle
                .extract()
                .into_iter()
                .flat_map(|(_cap, container)| container)
                .collect();
            differential_dataflow::consolidation::consolidate_updates(&mut actual);
            actual
        }};
    }
749
    /// Regression test — named after GH issue 9160 (presumably in the
    /// Materialize issue tracker; confirm the exact reference).
    ///
    /// `key0` is updated twice at ts 3 across two separate input batches,
    /// while a delete for the never-inserted `key1` arrives at ts 2. The
    /// consolidated output must retract `value1` exactly once at ts 3 and
    /// emit only the final update (`value4`), with no output for `key1`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        let actual = upsert_test!(|input, persist, worker| {
            let key0 = key(0);
            let key1 = key(1);
            let value1 = row(0, 0);
            let value3 = row(0, 1);
            let value4 = row(0, 2);

            // Insert key0=value1 at ts 0, then echo it back through persist
            // so the operator sees it as committed previous state.
            input.send(((key0, Some(Ok(value1.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();

            persist.send((Ok(value1), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            // Batch 1: delete a key that never existed (ts 2) plus an update
            // for key0 at ts 3 with from_time 3.
            input.send_batch(&mut vec![
                ((key1, None, 2), new_ts(2), Diff::ONE),
                ((key0, Some(Ok(value3)), 3), new_ts(3), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            // Batch 2: another update for key0 at the same ts 3 but with a
            // higher from_time (4), which must win during consolidation.
            input.send_batch(&mut vec![(
                (key0, Some(Ok(value4)), 4),
                new_ts(3),
                Diff::ONE,
            )]);
            input.advance_to(new_ts(4));
            worker.step();

            persist.advance_to(new_ts(3));
            worker.step();
        });

        let value1 = row(0, 0);
        let value4 = row(0, 2);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }
794
    /// Two distinct keys are updated in the same batch; the operator must
    /// match each key to its own previous value regardless of the order the
    /// entries appear in the batch. Output order across keys is not asserted:
    /// both sides are sorted before comparison.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn out_of_order_keys_across_timestamps() {
        let actual = upsert_test!(|input, persist, worker| {
            let key_high = key(99);
            let key_low = key(1);
            let val_a = row(99, 1);
            let val_b = row(1, 2);

            // Insert key_high at ts 0 and feed it back through persist.
            input.send(((key_high, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            // Insert key_low at ts 1 and feed it back through persist.
            input.send(((key_low, Some(Ok(val_b.clone())), 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.send((Ok(val_b.clone()), new_ts(1), Diff::ONE));
            persist.advance_to(new_ts(2));
            worker.step();

            // Update both keys at ts 2 in a single batch.
            let val_a2 = row(99, 10);
            let val_b2 = row(1, 20);
            input.send_batch(&mut vec![
                (
                    (key_high, Some(Ok(val_a2.clone())), 3),
                    new_ts(2),
                    Diff::ONE,
                ),
                ((key_low, Some(Ok(val_b2.clone())), 4), new_ts(2), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            worker.step();
            persist.advance_to(new_ts(3));
            worker.step();
        });

        let val_a = row(99, 1);
        let val_b = row(1, 2);
        let val_a2 = row(99, 10);
        let val_b2 = row(1, 20);
        // Each key sees: original insert, retraction at ts 2, replacement at
        // ts 2.
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val_b.clone()), new_ts(1), Diff::ONE),
            (Ok(val_b), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_b2), new_ts(2), Diff::ONE),
            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
            (Ok(val_a), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_a2), new_ts(2), Diff::ONE),
        ];
        let mut actual_sorted = actual;
        let mut expected_sorted = expected;
        actual_sorted.sort();
        expected_sorted.sort();
        assert_eq!(actual_sorted, expected_sorted);
    }
852
853    #[mz_ore::test]
854    #[cfg_attr(miri, ignore)]
855    fn rehydration_then_update() {
856        let actual = upsert_test!(|input, persist, worker| {
857            let k = key(42);
858            let old_val = row(42, 100);
859            let new_val = row(42, 200);
860
861            persist.send((Ok(old_val), new_ts(0), Diff::ONE));
862            persist.advance_to(new_ts(1));
863            worker.step();
864
865            input.send(((k, Some(Ok(new_val)), 1), new_ts(1), Diff::ONE));
866            input.advance_to(new_ts(2));
867            worker.step();
868            persist.advance_to(new_ts(2));
869            worker.step();
870        });
871
872        let old_val = row(42, 100);
873        let new_val = row(42, 200);
874        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
875            (Ok(old_val), new_ts(1), Diff::MINUS_ONE),
876            (Ok(new_val), new_ts(1), Diff::ONE),
877        ];
878        assert_eq!(actual, expected);
879    }
880
881    #[mz_ore::test]
882    #[cfg_attr(miri, ignore)]
883    fn delete_existing_key() {
884        let actual = upsert_test!(|input, persist, worker| {
885            let k = key(7);
886            let val = row(7, 77);
887
888            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
889            input.advance_to(new_ts(1));
890            worker.step();
891            persist.send((Ok(val), new_ts(0), Diff::ONE));
892            persist.advance_to(new_ts(1));
893            worker.step();
894
895            input.send(((k, None, 2), new_ts(1), Diff::ONE));
896            input.advance_to(new_ts(2));
897            worker.step();
898            persist.advance_to(new_ts(2));
899            worker.step();
900        });
901
902        let val = row(7, 77);
903        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
904            (Ok(val.clone()), new_ts(0), Diff::ONE),
905            (Ok(val), new_ts(1), Diff::MINUS_ONE),
906        ];
907        assert_eq!(actual, expected);
908    }
909
    /// Rehydration input that is not pre-consolidated: persist delivers an
    /// insert, its retraction, and a replacement for the same key at the same
    /// timestamp. The operator must consolidate these down to `new_val` as
    /// the key's previous state before applying the ts-1 source update.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn multi_batch_rehydration() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(5);
            let old_val = row(5, 10);
            let new_val = row(5, 20);
            let updated_val = row(5, 30);

            // Unconsolidated persist state: +old_val, -old_val, +new_val,
            // all at ts 0. Net state for the key is new_val.
            persist.send((Ok(old_val.clone()), new_ts(0), Diff::ONE));
            persist.send((Ok(old_val), new_ts(0), Diff::MINUS_ONE));
            persist.send((Ok(new_val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            // Source update at ts 1 must retract the *net* previous value.
            input.send(((k, Some(Ok(updated_val)), 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let new_val = row(5, 20);
        let updated_val = row(5, 30);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(new_val), new_ts(1), Diff::MINUS_ONE),
            (Ok(updated_val), new_ts(1), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }
940
941    #[mz_ore::test]
942    #[cfg_attr(miri, ignore)]
943    fn delete_nonexistent_key() {
944        let actual = upsert_test!(|input, persist, worker| {
945            let k = key(99);
946
947            persist.advance_to(new_ts(1));
948            worker.step();
949
950            input.send(((k, None, 1), new_ts(1), Diff::ONE));
951            input.advance_to(new_ts(2));
952            worker.step();
953            persist.advance_to(new_ts(2));
954            worker.step();
955        });
956
957        assert!(actual.is_empty(), "expected empty output, got: {actual:?}");
958    }
959
960    #[mz_ore::test]
961    #[cfg_attr(miri, ignore)]
962    fn reinsert_after_delete() {
963        let actual = upsert_test!(|input, persist, worker| {
964            let k = key(3);
965            let val_a = row(3, 10);
966            let val_b = row(3, 20);
967
968            input.send(((k, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
969            input.advance_to(new_ts(1));
970            worker.step();
971            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
972            persist.advance_to(new_ts(1));
973            worker.step();
974
975            input.send(((k, None, 2), new_ts(1), Diff::ONE));
976            input.advance_to(new_ts(2));
977            worker.step();
978            persist.send((Ok(val_a), new_ts(1), Diff::MINUS_ONE));
979            persist.advance_to(new_ts(2));
980            worker.step();
981
982            input.send(((k, Some(Ok(val_b.clone())), 3), new_ts(2), Diff::ONE));
983            input.advance_to(new_ts(3));
984            worker.step();
985            persist.advance_to(new_ts(3));
986            worker.step();
987        });
988
989        let val_a = row(3, 10);
990        let val_b = row(3, 20);
991        let mut expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
992            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
993            (Ok(val_a), new_ts(1), Diff::MINUS_ONE),
994            (Ok(val_b), new_ts(2), Diff::ONE),
995        ];
996        expected.sort();
997        let mut actual = actual;
998        actual.sort();
999        assert_eq!(actual, expected);
1000    }
1001
1002    #[mz_ore::test]
1003    #[cfg_attr(miri, ignore)]
1004    fn idempotent_update() {
1005        let actual = upsert_test!(|input, persist, worker| {
1006            let k = key(11);
1007            let val = row(11, 50);
1008
1009            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
1010            input.advance_to(new_ts(1));
1011            worker.step();
1012            persist.send((Ok(val.clone()), new_ts(0), Diff::ONE));
1013            persist.advance_to(new_ts(1));
1014            worker.step();
1015
1016            input.send(((k, Some(Ok(val.clone())), 2), new_ts(1), Diff::ONE));
1017            input.advance_to(new_ts(2));
1018            worker.step();
1019            persist.advance_to(new_ts(2));
1020            worker.step();
1021        });
1022
1023        let val = row(11, 50);
1024        let expected: Vec<(Result<Row, DataflowError>, _, _)> =
1025            vec![(Ok(val), new_ts(0), Diff::ONE)];
1026        assert_eq!(actual, expected);
1027    }
1028}