// mz_storage/upsert_continual_feedback_v2.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of the feedback UPSERT operator.
11//!
12//! # Architecture
13//!
14//! The operator converts a stream of upsert commands `(key, Option<value>)` into
15//! a differential collection of `(key, value)` pairs, using a feedback loop
16//! through persist to maintain the "previous value" state needed for computing
17//! retractions.
18//!
19//! ## Dataflow topology
20//!
21//! ```text
22//!   Source input ──► ┌──────────┐ ──► Output ──► Persist
23//!                    │  Upsert  │
24//!   Persist read ──► └──────────┘
25//!       ▲                                           │
26//!       └───────────── feedback ────────────────────┘
27//! ```
28//!
29//! ## Operator loop (each iteration)
30//!
31//! 1. **Ingest source data.** Read upsert commands from the source input,
32//!    wrap each in an [`UpsertDiff`] (carrying `from_time` for dedup), and
33//!    push into the [`MergeBatcher`]. The batcher consolidates entries for the
34//!    same `(key, time)` using the `UpsertDiff` Semigroup, which keeps the
35//!    update with the highest `FromTime` (latest Kafka offset). This happens
36//!    via amortized geometric merging as data is pushed in, bounding memory
37//!    to O(unique key-time pairs) even during large Kafka snapshots.
38//!
39//! 2. **Read persist frontier.** Check the probe on the persist arrangement
40//!    to learn which times have been committed. When the persist frontier
41//!    reaches the resume upper, rehydration is complete.
42//!
43//! 3. **Seal & drain.** Call `batcher.seal(input_upper)` to extract all
44//!    source-finalized entries as sorted, consolidated chunks. Each entry is
45//!    classified:
46//!    - **Eligible** (at the persist frontier): the persist trace has the
47//!      correct "before" state for this time. Look up the old value via a
48//!      cursor, emit a retraction if present, and emit the new value.
49//!    - **Ineligible** (between persist and input frontiers): persist hasn't
50//!      caught up yet. Push back into the batcher for the next iteration.
51//!
52//! 4. **Capability management.** Downgrade the output capability to the
53//!    minimum time of any remaining buffered data (in the batcher or pushed
54//!    back as ineligible). Drop the capability entirely when the batcher is
55//!    empty.
56//!
57//! ## Eligibility condition (total order)
58//!
59//! For a total-order timestamp with `input_upper = {i}` and
60//! `persist_upper = {p}`, an entry at time `ts` is eligible when
61//! `ts == p < i` — the source has finalized it and persist is exactly at
62//! that time, so the trace cursor returns the correct prior state.
63
use std::convert::Infallible;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;

use differential_dataflow::difference::{IsZero, Semigroup};
use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::arrange::agent::TraceAgent;
use differential_dataflow::trace::implementations::ValSpine;
use differential_dataflow::trace::implementations::chunker::ContainerChunker;
use differential_dataflow::trace::implementations::merge_batcher::{
    MergeBatcher, container::VecMerger,
};
use differential_dataflow::trace::{Batcher, Builder, Cursor, Description, TraceReader};
use differential_dataflow::{AsCollection, VecCollection};
use mz_repr::{Diff, GlobalId, Row};
use mz_storage_types::errors::{DataflowError, EnvelopeError};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::StreamVec;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{Capability, CapabilitySet, Exchange as _};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::upsert::UpsertKey;
use crate::upsert::UpsertValue;
97
98// ── Source stash diff type ───────────────────────────────────────────────────
99// The source stash uses a custom diff type that carries the upsert payload.
100// The Semigroup implementation does "max FromTime wins" — when two updates for
101// the same (key, time) are consolidated, the one with the higher FromTime
102// (latest Kafka offset) is kept.
103
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct UpsertDiff<O> {
    /// Source-assigned ordering token (e.g. a Kafka offset) of the command
    /// that produced this update. The `Semigroup` impl uses it to keep only
    /// the latest command per `(key, time)` during consolidation.
    from_time: O,
    /// The new value for the key, or `None` for a tombstone (deletion).
    value: Option<UpsertValue>,
}
109
impl<O> IsZero for UpsertDiff<O> {
    /// Always `false`: an upsert command must never be dropped by
    /// consolidation — it may only be *overwritten* by a command with a
    /// higher `from_time` (see the `Semigroup` impl). Reporting "zero"
    /// would let the batcher discard entries.
    fn is_zero(&self) -> bool {
        false
    }
}
115
116impl<O: Ord + Clone> Semigroup for UpsertDiff<O> {
117    fn plus_equals(&mut self, rhs: &Self) {
118        if rhs.from_time > self.from_time {
119            *self = rhs.clone();
120        }
121    }
122}
123
124// ── MergeBatcher type alias ──────────────────────────────────────────────────
125// The source stash uses DD's MergeBatcher for amortized consolidation.
126// Data is pushed in unsorted; the batcher maintains geometrically-sized sorted
127// chains and consolidates via the UpsertDiff Semigroup automatically.
128
/// Batcher backing the source stash.
///
/// Data is pushed in unsorted; the [`MergeBatcher`] maintains geometrically
/// sized sorted chains and consolidates entries for the same `(key, time)`
/// via the [`UpsertDiff`] Semigroup as data is pushed in.
type UpsertBatcher<T, FromTime> = MergeBatcher<
    Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    ContainerChunker<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    VecMerger<UpsertKey, T, UpsertDiff<FromTime>>,
>;
134
/// A minimal [`Builder`] that captures sealed chains without copying.
///
/// Used with [`MergeBatcher::seal`] to extract sorted, consolidated chunks
/// directly as `Vec<Vec<...>>`.
///
/// No value of this type is ever constructed at runtime; only the
/// associated function `seal` is used (the instance methods in the impl
/// are `unimplemented!` stubs).
struct CapturingBuilder<D, T>(D, PhantomData<T>);
140
impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
    type Input = D;
    type Time = T;
    type Output = Vec<D>;

    // The instance-building path (`with_capacity` → `push` → `done`) is
    // never exercised for this builder — [`MergeBatcher::seal`] only calls
    // the associated `seal` function below — so these methods are stubbed.
    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        unimplemented!()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        unimplemented!()
    }

    fn done(self, _description: Description<Self::Time>) -> Self::Output {
        unimplemented!()
    }

    /// Hand the sealed chain back as-is: the chunks are already sorted and
    /// consolidated by the batcher, so we move them out without copying.
    #[inline]
    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        std::mem::take(chain)
    }
}
163
164/// Transforms a stream of upserts (key-value updates) into a differential
165/// collection.
166///
167/// Persist feedback is arranged into a differential trace (DD manages the
168/// spine lifecycle). Source input is stashed with a custom `UpsertDiff`
169/// Semigroup that deduplicates by keeping the highest FromTime per (key, time).
170///
171/// Has two inputs:
172///   1. **Source input** — upsert commands from the external source.
173///   2. **Persist input** — feedback of the operator's own output, read back
174///      from persist.  Arranged into a trace for cursor-based lookups.
175#[allow(clippy::disallowed_methods)]
176pub fn upsert_inner<'scope, T, FromTime>(
177    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
178    key_indices: Vec<usize>,
179    resume_upper: Antichain<T>,
180    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
181    persist_token: Option<Vec<PressOnDropButton>>,
182    upsert_metrics: UpsertMetrics,
183    source_config: crate::source::SourceExportCreationConfig,
184) -> (
185    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
186    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
187    StreamVec<'scope, T, Infallible>,
188    PressOnDropButton,
189)
190where
191    T: Timestamp + TotalOrder + Sync,
192    T: Refines<mz_repr::Timestamp> + TotalOrder + differential_dataflow::lattice::Lattice + Sync,
193    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
194{
195    // ── Arrange persist feedback ────────────────────────────────────────
196    // Extract (UpsertKey, UpsertValue) from the persist feedback collection
197    // and arrange it. DD manages the spine, batching, and compaction.
198    let persist_keyed = persist_input.flat_map(move |result| {
199        let value = match result {
200            Ok(ok) => Ok(ok),
201            Err(DataflowError::EnvelopeError(err)) => match *err {
202                EnvelopeError::Upsert(err) => Err(Box::new(err)),
203                EnvelopeError::Flat(_) => return None,
204            },
205            Err(_) => return None,
206        };
207        let value_ref = match value {
208            Ok(ref row) => Ok(row),
209            Err(ref err) => Err(&**err),
210        };
211        Some((UpsertKey::from_value(value_ref, &key_indices), value))
212    });
213    let persist_keyed = persist_keyed
214        .inner
215        // The arrangement already implicitly exchanges by key, so this is redundant, but we want to
216        // do it earlier so that we can inspect the stream properly for source statistics.
217        .exchange(move |((key, _), _, _)| UpsertKey::hashed(key))
218        .as_collection()
219        .inspect(move |((_, row), _, diff)| {
220            source_config
221                .source_statistics
222                .update_records_indexed_by(diff.into_inner());
223            source_config.source_statistics.update_bytes_indexed_by(
224                row.as_ref().map_or(0, |r| r.byte_len().try_into().unwrap()) * diff.into_inner(),
225            );
226        });
227    let persist_arranged = persist_keyed.arrange_by_key();
228    let mut persist_trace = persist_arranged.trace.clone();
229
230    // Probe the persist arrangement's stream for frontier tracking.
231    // This replaces receiving the batch stream as an input — we just
232    // read the probe frontier to know when persist has caught up.
233    use timely::dataflow::operators::Probe;
234    let (persist_probe, _persist_probe_stream) = persist_arranged.stream.probe();
235
236    // ── Build the async processing operator ─────────────────────────────
237    let mut builder = AsyncOperatorBuilder::new("Upsert V2".to_string(), input.scope());
238
239    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
240    let (_snapshot_handle, snapshot_stream) =
241        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
242    let (_health_output, health_stream) = builder
243        .new_output::<CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>>();
244
245    let mut input = builder.new_input_for(
246        input.inner,
247        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
248        &output_handle,
249    );
250
251    // We still need the persist stream as an input so the operator wakes
252    // when the persist arrangement produces batches (frontier advances).
253    // We read the actual frontier from the probe though.
254    let mut persist_wakeup = builder.new_disconnected_input(_persist_probe_stream, Pipeline);
255
256    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
257    let _ = upsert_shared_metrics;
258
259    let shutdown_button = builder.build(move |caps| async move {
260        let _persist_token = persist_token;
261
262        let [output_cap, snapshot_cap, _health_cap]: [_; 3] = caps.try_into().unwrap();
263        drop(output_cap);
264        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);
265
266        let mut hydrating = true;
267
268        // Source stash backed by DD's MergeBatcher. The batcher maintains
269        // geometrically-sized sorted chains and consolidates via the
270        // UpsertDiff Semigroup as data is pushed in, bounding memory to
271        // O(unique key-time pairs) even during large initial snapshots.
272        let mut batcher: UpsertBatcher<T, FromTime> = Batcher::new(None, 0);
273        // Scratch buffer for accumulating source events before flushing to
274        // the batcher. Drained on each iteration via `push_container`.
275        let mut push_buffer: Vec<(UpsertKey, T, UpsertDiff<FromTime>)> = Vec::new();
276        // Capability held at the minimum time of any buffered data. When
277        // Some, the operator may still produce output; when None, the
278        // batcher is empty.
279        let mut stash_cap: Option<Capability<T>> = None;
280        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
281
282        let mut output_updates = vec![];
283        let snapshot_start = std::time::Instant::now();
284        let mut prev_persist_upper = Antichain::from_elem(Timestamp::minimum());
285
286        // Accumulators for rehydration metrics, set as gauges when rehydration completes.
287        let mut rehydration_total: u64 = 0;
288        let mut rehydration_updates: u64 = 0;
289
290        // ──────────────────────────────────────────────────────────────────
291        // Main operator loop. Each iteration performs four steps:
292        //   Step 1: Ingest source data into the batcher.
293        //   Step 2: Read the persist frontier and update rehydration state.
294        //   Step 3: Seal the batcher, drain eligible entries, push back the rest.
295        //   Step 4: Manage the output capability.
296        // ──────────────────────────────────────────────────────────────────
297        loop {
298            // Block until woken by source input or a persist frontier advance.
299            tokio::select! {
300                _ = input.ready() => {}
301                _ = persist_wakeup.ready() => {
302                    while persist_wakeup.next_sync().is_some() {}
303                }
304            }
305
306            // ── Step 1: Ingest source data ────────────────────────────────
307            // Read all available source events, wrap each value in an
308            // UpsertDiff (carrying FromTime for dedup), and buffer them.
309            // Events before the resume_upper are dropped (already persisted).
310            while let Some(event) = input.next_sync() {
311                match event {
312                    AsyncEvent::Data(cap, data) => {
313                        let mut pushed_any = false;
314                        for ((key, value, from_time), ts, diff) in data {
315                            assert!(diff.is_positive(), "invalid upsert input");
316                            if PartialOrder::less_equal(&input_upper, &resume_upper)
317                                && !resume_upper.less_equal(&ts)
318                            {
319                                continue;
320                            }
321                            push_buffer.push((key, ts, UpsertDiff { from_time, value }));
322                            pushed_any = true;
323                        }
324                        // Track the minimum capability across all buffered data
325                        // so we can emit output at the correct times.
326                        if pushed_any {
327                            stash_cap = Some(match stash_cap {
328                                Some(prev) if cap.time() < prev.time() => cap,
329                                Some(prev) => prev,
330                                None => cap,
331                            });
332                        }
333                    }
334                    AsyncEvent::Progress(upper) => {
335                        if PartialOrder::less_than(&upper, &resume_upper) {
336                            continue;
337                        }
338                        input_upper = upper;
339                    }
340                }
341            }
342
343            // Flush buffered events into the batcher. This triggers the
344            // chunker + geometric chain merging, which consolidates entries
345            // for the same (key, time) via the UpsertDiff Semigroup.
346            if !push_buffer.is_empty() {
347                batcher.push_container(&mut push_buffer);
348            }
349
350            // ── Step 2: Read persist frontier ─────────────────────────────
351            // The persist probe tells us which output times have been
352            // committed back through the feedback loop. This determines:
353            //   - Whether rehydration is complete (persist >= resume_upper).
354            //   - Which source entries are eligible for processing (their
355            //     time must equal persist_upper so the trace cursor returns
356            //     the correct prior state).
357            //   - How far to compact the persist trace.
358            let persist_upper = persist_probe.with_frontier(|f| f.to_owned());
359
360            if persist_upper != prev_persist_upper {
361                let last_rehydration_chunk =
362                    hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);
363
364                if last_rehydration_chunk {
365                    hydrating = false;
366                    upsert_metrics
367                        .rehydration_latency
368                        .set(snapshot_start.elapsed().as_secs_f64());
369                    upsert_metrics.rehydration_total.set(rehydration_total);
370                    upsert_metrics.rehydration_updates.set(rehydration_updates);
371                    tracing::info!(
372                        worker_id = %source_config.worker_id,
373                        source_id = %source_config.id,
374                        "upsert finished rehydration",
375                    );
376                    snapshot_cap.downgrade(&[]);
377                }
378
379                let _ = snapshot_cap.try_downgrade(persist_upper.iter());
380
381                // Compact the trace so the spine can merge old batches.
382                persist_trace.set_logical_compaction(persist_upper.borrow());
383                persist_trace.set_physical_compaction(persist_upper.borrow());
384
385                prev_persist_upper = persist_upper.clone();
386            }
387
388            // ── Step 3: Seal & drain ──────────────────────────────────────
389            // Seal the batcher at input_upper to extract all source-finalized
390            // entries as sorted, consolidated chunks. The seal merges all
391            // internal chains (O(N) linear merge of sorted data) and splits
392            // by time: entries at ts < input_upper are extracted, the rest
393            // stay in the batcher.
394            //
395            // Extracted entries are partitioned into:
396            //   - Eligible (ts == persist_upper): processed now via cursor
397            //     lookup on the persist trace.
398            //   - Ineligible (persist_upper < ts < input_upper): persist
399            //     hasn't caught up yet; pushed back into the batcher.
400            if stash_cap.is_some() {
401                let cap = stash_cap.as_mut().unwrap();
402
403                let sealed = batcher.seal::<CapturingBuilder<_, _>>(input_upper.clone());
404                // Frontier of data remaining in the batcher (ts >= input_upper).
405                let remaining_frontier = batcher.frontier().to_owned();
406
407                let mut ineligible = Vec::new();
408                let drain_stats = drain_sealed_input(
409                    sealed,
410                    &mut ineligible,
411                    &mut output_updates,
412                    &persist_upper,
413                    &mut persist_trace,
414                    &source_config.worker_id,
415                    &source_config.id,
416                );
417
418                upsert_metrics.multi_get_size.inc_by(drain_stats.eligible);
419                upsert_metrics
420                    .multi_get_result_count
421                    .inc_by(drain_stats.result_count);
422                upsert_metrics
423                    .multi_put_size
424                    .inc_by(drain_stats.output_count);
425                upsert_metrics.upsert_inserts.inc_by(drain_stats.inserts);
426                upsert_metrics.upsert_updates.inc_by(drain_stats.updates);
427                upsert_metrics.upsert_deletes.inc_by(drain_stats.deletes);
428
429                if hydrating {
430                    rehydration_total += drain_stats.inserts;
431                    rehydration_updates += drain_stats.eligible;
432                }
433
434                // Emit output: retractions of old values and insertions of
435                // new values, all at the eligible timestamp.
436                for (update, ts, diff) in output_updates.drain(..) {
437                    output_handle.give(cap, (update, ts, diff));
438                }
439
440                // ── Step 4: Capability management ─────────────────────────
441                // Downgrade the output capability to the minimum time of any
442                // remaining data: either entries still in the batcher (above
443                // input_upper) or ineligible entries being pushed back.
444                let min_ineligible_ts = ineligible.iter().map(|(_, ts, _)| ts).min().cloned();
445                if !ineligible.is_empty() {
446                    batcher.push_container(&mut ineligible);
447                }
448
449                let has_remaining = !remaining_frontier.is_empty() || min_ineligible_ts.is_some();
450                if has_remaining {
451                    let min_ts = match (
452                        remaining_frontier.elements().first(),
453                        min_ineligible_ts.as_ref(),
454                    ) {
455                        (Some(a), Some(b)) => std::cmp::min(a, b).clone(),
456                        (Some(a), None) => a.clone(),
457                        (None, Some(b)) => b.clone(),
458                        (None, None) => unreachable!(),
459                    };
460                    cap.downgrade(&min_ts);
461                } else {
462                    // Batcher is completely empty — drop the capability so
463                    // downstream operators can make progress.
464                    stash_cap = None;
465                }
466            }
467
468            if input_upper.is_empty() {
469                break;
470            }
471        }
472    });
473
474    (
475        output
476            .as_collection()
477            .map(|result: UpsertValue| match result {
478                Ok(ok) => Ok(ok),
479                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
480            }),
481        health_stream,
482        snapshot_stream,
483        shutdown_button.press_on_drop(),
484    )
485}
486
/// Counts from a single call to [`drain_sealed_input`], used to update metrics.
///
/// All counts are per-call deltas; the caller accumulates them into the
/// corresponding `UpsertMetrics` counters.
struct DrainStats {
    /// Number of entries looked up in the persist trace (cursor seeks).
    eligible: u64,
    /// Number of cursor lookups that found an existing value.
    result_count: u64,
    /// New value written with no prior value (insert).
    inserts: u64,
    /// New value written over an existing value (update).
    updates: u64,
    /// Tombstone (None) applied to an existing value (delete).
    deletes: u64,
    /// Total output records emitted (retractions + insertions).
    output_count: u64,
}
502
/// Process sealed chunks from the batcher. Entries at the persist frontier are
/// eligible for processing (cursor lookup + output); all others are returned
/// in `ineligible` for re-stashing.
///
/// The sealed chunks are already sorted and consolidated by the MergeBatcher.
///
/// # Arguments
///
/// * `sealed` — chunks extracted by `MergeBatcher::seal`, sorted by
///   `(key, time)` and consolidated.
/// * `ineligible` — out-parameter receiving entries whose time is not at the
///   persist frontier; the caller pushes these back into the batcher.
/// * `output` — out-parameter receiving `(value, time, diff)` updates:
///   retractions of old values and insertions of new ones.
/// * `persist_upper` — current frontier of the persist feedback arrangement.
/// * `trace` — the persist arrangement's trace, used to look up each key's
///   prior value.
/// * `worker_id`, `source_id` — used only for logging.
fn drain_sealed_input<T, FromTime>(
    sealed: Vec<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    ineligible: &mut Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    output: &mut Vec<(UpsertValue, T, Diff)>,
    persist_upper: &Antichain<T>,
    trace: &mut TraceAgent<ValSpine<UpsertKey, UpsertValue, T, Diff>>,
    worker_id: &usize,
    source_id: &GlobalId,
) -> DrainStats
where
    T: TotalOrder + Lattice + timely::ExchangeData + Timestamp + Clone + Debug + Ord + Sync,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
{
    // Separate eligible (at persist frontier) from ineligible.
    let mut eligible = Vec::new();
    for chunk in sealed {
        for entry in chunk {
            let (_, ref ts, _) = entry;
            // `ts >= persist_upper` (less_equal) and `ts <= persist_upper`
            // (not less_than): for a total order this means ts is exactly at
            // the persist frontier. Note an empty `persist_upper` makes
            // every entry ineligible (less_equal is false for all ts).
            if !persist_upper.less_than(ts) && persist_upper.less_equal(ts) {
                eligible.push(entry);
            } else {
                ineligible.push(entry);
            }
        }
    }

    tracing::debug!(
        worker_id = %worker_id,
        source_id = %source_id,
        ineligible = ineligible.len(),
        eligible = eligible.len(),
        "draining stash",
    );

    let eligible_count = u64::try_from(eligible.len()).expect("eligible count overflows u64");

    // Nothing to look up: return all-zero stats without touching the trace.
    if eligible.is_empty() {
        return DrainStats {
            eligible: 0,
            result_count: 0,
            inserts: 0,
            updates: 0,
            deletes: 0,
            output_count: 0,
        };
    }

    let output_before = output.len();
    let mut result_count: u64 = 0;
    let mut inserts: u64 = 0;
    let mut updates: u64 = 0;
    let mut deletes: u64 = 0;

    // Eligible entries are sorted by (key, time) from the batcher.
    // The trace cursor moves forward through keys, matching this order.
    let (mut cursor, storage) = trace.cursor();

    for (key, ts, upsert_diff) in eligible {
        // Look up the current value for this key in the persist trace.
        // For ValSpine with Vector layout, Key<'a> = &'a UpsertKey.
        cursor.seek_key(&storage, &key);
        let old_value = if cursor.get_key(&storage) == Some(&key) {
            let mut result = None;
            while let Some(val) = cursor.get_val(&storage) {
                // Sum this value's diffs across all times. The caller
                // compacts the trace to `persist_upper`, so the sum is the
                // value's multiplicity as of the frontier: 1 means "current
                // value", 0 or negative means retracted.
                let mut count = Diff::ZERO;
                cursor.map_times(&storage, |_time, diff| {
                    count += diff.clone();
                });
                if count.is_positive() {
                    // Upsert semantics guarantee at most one live value per
                    // key, each with multiplicity exactly one.
                    assert!(
                        count == 1.into(),
                        "unexpected multiple entries for the same key in persist trace"
                    );
                    result = Some(val.clone());
                }
                cursor.step_val(&storage);
            }
            result
        } else {
            None
        };

        if old_value.is_some() {
            result_count += 1;
        }

        // Apply the upsert command: retract the old value (if any) and,
        // for a non-tombstone, insert the new value at the same time.
        match upsert_diff.value {
            Some(new_val) => {
                if let Some(old_val) = old_value {
                    output.push((old_val, ts.clone(), Diff::MINUS_ONE));
                    updates += 1;
                } else {
                    inserts += 1;
                }
                output.push((new_val, ts, Diff::ONE));
            }
            None => {
                // Tombstone: emit a retraction only if the key had a value;
                // deleting an absent key is a no-op.
                if let Some(old_val) = old_value {
                    output.push((old_val, ts, Diff::MINUS_ONE));
                    deletes += 1;
                }
            }
        }
    }

    let output_count =
        u64::try_from(output.len() - output_before).expect("output count overflows u64");

    DrainStats {
        eligible: eligible_count,
        result_count,
        inserts,
        updates,
        deletes,
        output_count,
    }
}
625
626#[cfg(test)]
627mod test {
628    use mz_ore::metrics::MetricsRegistry;
629    use mz_persist_types::ShardId;
630    use mz_repr::{Datum, Timestamp as MzTimestamp};
631    use mz_storage_operators::persist_source::Subtime;
632    use mz_storage_types::sources::SourceEnvelope;
633    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
634    use timely::dataflow::operators::capture::Extract;
635    use timely::dataflow::operators::{Capture, Input};
636    use timely::progress::Timestamp;
637
638    use crate::metrics::StorageMetrics;
639    use crate::metrics::upsert::UpsertMetricDefs;
640    use crate::source::SourceExportCreationConfig;
641    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};
642
643    use super::*;
644
    /// Timestamp type used by the tests: a Materialize timestamp refined
    /// with a `Subtime` component, matching the operator's inner scope.
    type Ts = (MzTimestamp, Subtime);

    /// Construct a test timestamp at `ts` with the minimum subtime.
    fn new_ts(ts: u64) -> Ts {
        (MzTimestamp::new(ts), Subtime::minimum())
    }
650
    /// Build an `UpsertKey` from a single-column `Int64` row, mirroring how
    /// the operator derives keys with `key_indices = [0]`.
    fn key(k: i64) -> UpsertKey {
        UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(k)])))
    }
654
    /// Build a two-column `(key, value)` row as emitted by the upsert source.
    fn row(k: i64, v: i64) -> Row {
        Row::pack_slice(&[Datum::Int64(k), Datum::Int64(v)])
    }
658
    /// Drive `upsert_inner` in a directly-executed timely dataflow and
    /// collect its output.
    ///
    /// Expands to a block that builds the dataflow (binding the source input
    /// handle, the persist feedback handle, and the worker to the given
    /// identifiers), runs `$body` to feed data and step the worker, then
    /// extracts and consolidates the captured output updates.
    macro_rules! upsert_test {
        (|$input:ident, $persist:ident, $worker:ident| $body:block) => {{
            let output_handle = timely::execute_directly(move |$worker| {
                let (mut $input, mut $persist, output_handle) = $worker
                    .dataflow::<MzTimestamp, _, _>(|scope| {
                        // Inner scope refines MzTimestamp with a Subtime
                        // component (`Ts`), matching the operator's timestamp.
                        scope.scoped::<Ts, _, _>("upsert", |scope| {
                            let (input_handle, input) = scope.new_input();
                            let (persist_handle, persist_input) = scope.new_input();
                            let source_id = GlobalId::User(0);

                            // Fresh registries per metric family so the test
                            // does not collide with other registrations.
                            let reg = MetricsRegistry::new();
                            let upsert_defs = UpsertMetricDefs::register_with(&reg);
                            let upsert_metrics =
                                UpsertMetrics::new(&upsert_defs, source_id, 0, None);

                            let reg2 = MetricsRegistry::new();
                            let storage_metrics = StorageMetrics::register_with(&reg2);

                            let reg3 = MetricsRegistry::new();
                            let stats_defs =
                                SourceStatisticsMetricDefs::register_with(&reg3);
                            let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                                source_arity: 2,
                                style: UpsertStyle::Default(KeyEnvelope::Flattened),
                                key_indices: vec![0],
                            });
                            let source_statistics = SourceStatistics::new(
                                source_id, 0, &stats_defs, source_id, &ShardId::new(),
                                envelope, Antichain::from_elem(Timestamp::minimum()),
                            );
                            let source_config = SourceExportCreationConfig {
                                id: source_id,
                                worker_id: 0,
                                metrics: storage_metrics,
                                source_statistics,
                            };

                            let (output, _, _, button) = upsert_inner(
                                input.as_collection(),
                                vec![0],
                                Antichain::from_elem(Timestamp::minimum()),
                                persist_input.as_collection(),
                                None,
                                upsert_metrics,
                                source_config,
                            );
                            // Deliberately leak the shutdown button so the
                            // operator is not torn down while the test body
                            // still steps the worker.
                            std::mem::forget(button);
                            (input_handle, persist_handle, output.inner.capture())
                        })
                    });

                // Caller-provided driver: feeds inputs and steps the worker.
                $body

                output_handle
            });

            // Flatten the captured stream and consolidate so tests can
            // compare against a canonical (datum, time, diff) list.
            let mut actual: Vec<_> = output_handle
                .extract()
                .into_iter()
                .flat_map(|(_cap, container)| container)
                .collect();
            differential_dataflow::consolidation::consolidate_updates(&mut actual);
            actual
        }};
    }
724
725    #[mz_ore::test]
726    #[cfg_attr(miri, ignore)]
727    fn gh_9160_repro() {
728        let actual = upsert_test!(|input, persist, worker| {
729            let key0 = key(0);
730            let key1 = key(1);
731            let value1 = row(0, 0);
732            let value3 = row(0, 1);
733            let value4 = row(0, 2);
734
735            input.send(((key0, Some(Ok(value1.clone())), 1), new_ts(0), Diff::ONE));
736            input.advance_to(new_ts(2));
737            worker.step();
738
739            persist.send((Ok(value1), new_ts(0), Diff::ONE));
740            persist.advance_to(new_ts(1));
741            worker.step();
742
743            input.send_batch(&mut vec![
744                ((key1, None, 2), new_ts(2), Diff::ONE),
745                ((key0, Some(Ok(value3)), 3), new_ts(3), Diff::ONE),
746            ]);
747            input.advance_to(new_ts(3));
748            input.send_batch(&mut vec![(
749                (key0, Some(Ok(value4)), 4),
750                new_ts(3),
751                Diff::ONE,
752            )]);
753            input.advance_to(new_ts(4));
754            worker.step();
755
756            persist.advance_to(new_ts(3));
757            worker.step();
758        });
759
760        let value1 = row(0, 0);
761        let value4 = row(0, 2);
762        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
763            (Ok(value1.clone()), new_ts(0), Diff::ONE),
764            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
765            (Ok(value4), new_ts(3), Diff::ONE),
766        ];
767        assert_eq!(actual, expected);
768    }
769
770    #[mz_ore::test]
771    #[cfg_attr(miri, ignore)]
772    fn out_of_order_keys_across_timestamps() {
773        let actual = upsert_test!(|input, persist, worker| {
774            let key_high = key(99);
775            let key_low = key(1);
776            let val_a = row(99, 1);
777            let val_b = row(1, 2);
778
779            input.send(((key_high, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
780            input.advance_to(new_ts(1));
781            worker.step();
782            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
783            persist.advance_to(new_ts(1));
784            worker.step();
785
786            input.send(((key_low, Some(Ok(val_b.clone())), 2), new_ts(1), Diff::ONE));
787            input.advance_to(new_ts(2));
788            worker.step();
789            persist.send((Ok(val_b.clone()), new_ts(1), Diff::ONE));
790            persist.advance_to(new_ts(2));
791            worker.step();
792
793            let val_a2 = row(99, 10);
794            let val_b2 = row(1, 20);
795            input.send_batch(&mut vec![
796                (
797                    (key_high, Some(Ok(val_a2.clone())), 3),
798                    new_ts(2),
799                    Diff::ONE,
800                ),
801                ((key_low, Some(Ok(val_b2.clone())), 4), new_ts(2), Diff::ONE),
802            ]);
803            input.advance_to(new_ts(3));
804            worker.step();
805            persist.advance_to(new_ts(3));
806            worker.step();
807        });
808
809        let val_a = row(99, 1);
810        let val_b = row(1, 2);
811        let val_a2 = row(99, 10);
812        let val_b2 = row(1, 20);
813        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
814            (Ok(val_b.clone()), new_ts(1), Diff::ONE),
815            (Ok(val_b), new_ts(2), Diff::MINUS_ONE),
816            (Ok(val_b2), new_ts(2), Diff::ONE),
817            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
818            (Ok(val_a), new_ts(2), Diff::MINUS_ONE),
819            (Ok(val_a2), new_ts(2), Diff::ONE),
820        ];
821        let mut actual_sorted = actual;
822        let mut expected_sorted = expected;
823        actual_sorted.sort();
824        expected_sorted.sort();
825        assert_eq!(actual_sorted, expected_sorted);
826    }
827
828    #[mz_ore::test]
829    #[cfg_attr(miri, ignore)]
830    fn rehydration_then_update() {
831        let actual = upsert_test!(|input, persist, worker| {
832            let k = key(42);
833            let old_val = row(42, 100);
834            let new_val = row(42, 200);
835
836            persist.send((Ok(old_val), new_ts(0), Diff::ONE));
837            persist.advance_to(new_ts(1));
838            worker.step();
839
840            input.send(((k, Some(Ok(new_val)), 1), new_ts(1), Diff::ONE));
841            input.advance_to(new_ts(2));
842            worker.step();
843            persist.advance_to(new_ts(2));
844            worker.step();
845        });
846
847        let old_val = row(42, 100);
848        let new_val = row(42, 200);
849        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
850            (Ok(old_val), new_ts(1), Diff::MINUS_ONE),
851            (Ok(new_val), new_ts(1), Diff::ONE),
852        ];
853        assert_eq!(actual, expected);
854    }
855
856    #[mz_ore::test]
857    #[cfg_attr(miri, ignore)]
858    fn delete_existing_key() {
859        let actual = upsert_test!(|input, persist, worker| {
860            let k = key(7);
861            let val = row(7, 77);
862
863            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
864            input.advance_to(new_ts(1));
865            worker.step();
866            persist.send((Ok(val), new_ts(0), Diff::ONE));
867            persist.advance_to(new_ts(1));
868            worker.step();
869
870            input.send(((k, None, 2), new_ts(1), Diff::ONE));
871            input.advance_to(new_ts(2));
872            worker.step();
873            persist.advance_to(new_ts(2));
874            worker.step();
875        });
876
877        let val = row(7, 77);
878        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
879            (Ok(val.clone()), new_ts(0), Diff::ONE),
880            (Ok(val), new_ts(1), Diff::MINUS_ONE),
881        ];
882        assert_eq!(actual, expected);
883    }
884
885    #[mz_ore::test]
886    #[cfg_attr(miri, ignore)]
887    fn multi_batch_rehydration() {
888        let actual = upsert_test!(|input, persist, worker| {
889            let k = key(5);
890            let old_val = row(5, 10);
891            let new_val = row(5, 20);
892            let updated_val = row(5, 30);
893
894            persist.send((Ok(old_val.clone()), new_ts(0), Diff::ONE));
895            persist.send((Ok(old_val), new_ts(0), Diff::MINUS_ONE));
896            persist.send((Ok(new_val), new_ts(0), Diff::ONE));
897            persist.advance_to(new_ts(1));
898            worker.step();
899
900            input.send(((k, Some(Ok(updated_val)), 1), new_ts(1), Diff::ONE));
901            input.advance_to(new_ts(2));
902            worker.step();
903            persist.advance_to(new_ts(2));
904            worker.step();
905        });
906
907        let new_val = row(5, 20);
908        let updated_val = row(5, 30);
909        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
910            (Ok(new_val), new_ts(1), Diff::MINUS_ONE),
911            (Ok(updated_val), new_ts(1), Diff::ONE),
912        ];
913        assert_eq!(actual, expected);
914    }
915
916    #[mz_ore::test]
917    #[cfg_attr(miri, ignore)]
918    fn delete_nonexistent_key() {
919        let actual = upsert_test!(|input, persist, worker| {
920            let k = key(99);
921
922            persist.advance_to(new_ts(1));
923            worker.step();
924
925            input.send(((k, None, 1), new_ts(1), Diff::ONE));
926            input.advance_to(new_ts(2));
927            worker.step();
928            persist.advance_to(new_ts(2));
929            worker.step();
930        });
931
932        assert!(actual.is_empty(), "expected empty output, got: {actual:?}");
933    }
934
935    #[mz_ore::test]
936    #[cfg_attr(miri, ignore)]
937    fn reinsert_after_delete() {
938        let actual = upsert_test!(|input, persist, worker| {
939            let k = key(3);
940            let val_a = row(3, 10);
941            let val_b = row(3, 20);
942
943            input.send(((k, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
944            input.advance_to(new_ts(1));
945            worker.step();
946            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
947            persist.advance_to(new_ts(1));
948            worker.step();
949
950            input.send(((k, None, 2), new_ts(1), Diff::ONE));
951            input.advance_to(new_ts(2));
952            worker.step();
953            persist.send((Ok(val_a), new_ts(1), Diff::MINUS_ONE));
954            persist.advance_to(new_ts(2));
955            worker.step();
956
957            input.send(((k, Some(Ok(val_b.clone())), 3), new_ts(2), Diff::ONE));
958            input.advance_to(new_ts(3));
959            worker.step();
960            persist.advance_to(new_ts(3));
961            worker.step();
962        });
963
964        let val_a = row(3, 10);
965        let val_b = row(3, 20);
966        let mut expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
967            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
968            (Ok(val_a), new_ts(1), Diff::MINUS_ONE),
969            (Ok(val_b), new_ts(2), Diff::ONE),
970        ];
971        expected.sort();
972        let mut actual = actual;
973        actual.sort();
974        assert_eq!(actual, expected);
975    }
976
977    #[mz_ore::test]
978    #[cfg_attr(miri, ignore)]
979    fn idempotent_update() {
980        let actual = upsert_test!(|input, persist, worker| {
981            let k = key(11);
982            let val = row(11, 50);
983
984            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
985            input.advance_to(new_ts(1));
986            worker.step();
987            persist.send((Ok(val.clone()), new_ts(0), Diff::ONE));
988            persist.advance_to(new_ts(1));
989            worker.step();
990
991            input.send(((k, Some(Ok(val.clone())), 2), new_ts(1), Diff::ONE));
992            input.advance_to(new_ts(2));
993            worker.step();
994            persist.advance_to(new_ts(2));
995            worker.step();
996        });
997
998        let val = row(11, 50);
999        let expected: Vec<(Result<Row, DataflowError>, _, _)> =
1000            vec![(Ok(val), new_ts(0), Diff::ONE)];
1001        assert_eq!(actual, expected);
1002    }
1003}