1use std::fmt::Debug;
65use std::marker::PhantomData;
66use std::sync::Arc;
67
68use differential_dataflow::difference::{IsZero, Semigroup};
69use differential_dataflow::hashable::Hashable;
70use differential_dataflow::lattice::Lattice;
71use differential_dataflow::operators::arrange::agent::TraceAgent;
72use differential_dataflow::trace::implementations::ValSpine;
73use differential_dataflow::trace::implementations::chunker::ContainerChunker;
74use differential_dataflow::trace::implementations::merge_batcher::{
75 MergeBatcher, container::VecMerger,
76};
77use differential_dataflow::trace::{Batcher, Builder, Cursor, Description, TraceReader};
78use differential_dataflow::{AsCollection, VecCollection};
79use mz_repr::{Diff, GlobalId, Row};
80use mz_storage_types::errors::{DataflowError, EnvelopeError};
81use mz_timely_util::builder_async::{
82 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
83};
84use std::convert::Infallible;
85use timely::container::CapacityContainerBuilder;
86use timely::dataflow::StreamVec;
87use timely::dataflow::channels::pact::{Exchange, Pipeline};
88use timely::dataflow::operators::{Capability, CapabilitySet, Exchange as _};
89use timely::order::{PartialOrder, TotalOrder};
90use timely::progress::timestamp::Refines;
91use timely::progress::{Antichain, Timestamp};
92
93use crate::healthcheck::HealthStatusUpdate;
94use crate::metrics::upsert::UpsertMetrics;
95use crate::upsert::UpsertKey;
96use crate::upsert::UpsertValue;
97
/// The "difference" type for upsert commands staged in the batcher.
///
/// Rather than a numeric difference, each update carries the candidate new
/// value (`None` encodes a delete) tagged with the source-native time it was
/// emitted at, so that consolidation can collapse multiple commands for the
/// same `(key, time)` down to the most recent one (see the `Semigroup` impl
/// below).
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct UpsertDiff<O> {
    /// The source-native time of this command; used to pick the winner when
    /// commands for one key are consolidated together.
    from_time: O,
    /// The new value for the key, or `None` for a delete.
    value: Option<UpsertValue>,
}
109
impl<O> IsZero for UpsertDiff<O> {
    /// An upsert command never accumulates to "nothing": even when two
    /// commands for the same key combine, the latest one must be retained,
    /// so consolidation may never drop an `UpsertDiff`.
    fn is_zero(&self) -> bool {
        false
    }
}
115
116impl<O: Ord + Clone> Semigroup for UpsertDiff<O> {
117 fn plus_equals(&mut self, rhs: &Self) {
118 if rhs.from_time > self.from_time {
119 *self = rhs.clone();
120 }
121 }
122}
123
/// The batcher used to stage upsert commands until both the input frontier
/// and the persist frontier allow them to be processed. `MergeBatcher` keeps
/// staged updates sorted and consolidated, which — via the `Semigroup` impl
/// on `UpsertDiff` — retains only the most recent command per `(key, time)`.
type UpsertBatcher<T, FromTime> = MergeBatcher<
    Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    ContainerChunker<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    VecMerger<UpsertKey, T, UpsertDiff<FromTime>>,
>;
134
/// A `Builder` that does not actually build a trace batch: its `seal`
/// implementation simply hands back the sorted, consolidated chain of
/// containers produced by the batcher. Used to extract ready updates from an
/// `UpsertBatcher` without constructing a real batch.
struct CapturingBuilder<D, T>(D, PhantomData<T>);
140
141impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
142 type Input = D;
143 type Time = T;
144 type Output = Vec<D>;
145
146 fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
147 unimplemented!()
148 }
149
150 fn push(&mut self, _chunk: &mut Self::Input) {
151 unimplemented!()
152 }
153
154 fn done(self, _description: Description<Self::Time>) -> Self::Output {
155 unimplemented!()
156 }
157
158 #[inline]
159 fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
160 std::mem::take(chain)
161 }
162}
163
#[allow(clippy::disallowed_methods)]
/// Renders an UPSERT operator that uses the source's own persisted output,
/// fed back in as `persist_input`, as its state store: previous values are
/// looked up in an arrangement of the persisted collection instead of a
/// dedicated in-memory or on-disk map.
///
/// * `input`: upsert commands `(key, Some(value) | None, from_time)`; a
///   `None` value encodes a delete. All input diffs must be positive.
/// * `key_indices`: the row columns forming the upsert key, used to
///   re-derive keys from rows read back from persist.
/// * `resume_upper`: commands at times not beyond this frontier are assumed
///   to already be reflected in persist and are dropped.
/// * `persist_input` / `persist_token`: the persisted output collection and
///   the tokens keeping its reading operators alive.
///
/// Returns the upsert output collection, a health stream, a stream whose
/// frontier (it carries no data) reports snapshot/rehydration progress, and
/// a button that shuts the operator down when pressed.
pub fn upsert_inner<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + differential_dataflow::lattice::Lattice + Sync,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
{
    // Re-key the persisted output so it can serve as upsert state. Only `Ok`
    // rows and upsert envelope errors can have a key re-derived from them;
    // all other errors are dropped here and do not participate in lookups.
    let persist_keyed = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                EnvelopeError::Flat(_) => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    // Route state by key hash so each worker holds state for exactly the
    // keys whose commands it processes (the input uses the same exchange
    // below), and account for indexed records/bytes in source statistics.
    let persist_keyed = persist_keyed
        .inner
        .exchange(move |((key, _), _, _)| UpsertKey::hashed(key))
        .as_collection()
        .inspect(move |((_, row), _, diff)| {
            source_config
                .source_statistics
                .update_records_indexed_by(diff.into_inner());
            source_config.source_statistics.update_bytes_indexed_by(
                row.as_ref().map_or(0, |r| r.byte_len().try_into().unwrap()) * diff.into_inner(),
            );
        });
    let persist_arranged = persist_keyed.arrange_by_key();
    let mut persist_trace = persist_arranged.trace.clone();

    use timely::dataflow::operators::Probe;
    // Probe the arrangement stream so the async operator below can observe
    // how far the persisted state has advanced.
    let (persist_probe, _persist_probe_stream) = persist_arranged.stream.probe();

    let mut builder = AsyncOperatorBuilder::new("Upsert V2".to_string(), input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    // The snapshot stream carries no data (`Infallible`); only its frontier
    // is meaningful, reporting rehydration progress.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
    let (_health_output, health_stream) = builder
        .new_output::<CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>>();

    // Commands are routed by key hash, matching the exchange applied to the
    // state collection above.
    let mut input = builder.new_input_for(
        input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    // Disconnected input used purely as a wakeup signal: the operator gets
    // rescheduled whenever the persist frontier advances.
    let mut persist_wakeup = builder.new_disconnected_input(_persist_probe_stream, Pipeline);

    // NOTE(review): this clone is immediately discarded and has no effect;
    // it looks like a leftover — confirm before removing.
    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let _ = upsert_shared_metrics;

    let shutdown_button = builder.build(move |caps| async move {
        // Keep the persist reading operators alive for as long as we run.
        let _persist_token = persist_token;

        let [output_cap, snapshot_cap, _health_cap]: [_; 3] = caps.try_into().unwrap();
        // Output capabilities are managed via `stash_cap`, derived from the
        // capabilities of incoming data, rather than this initial one.
        drop(output_cap);
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        // True until the persist frontier first reaches `resume_upper`,
        // i.e. while pre-existing state is still being read back.
        let mut hydrating = true;

        // Staged commands waiting for both the input frontier and the
        // persist frontier to permit processing.
        let mut batcher: UpsertBatcher<T, FromTime> = Batcher::new(None, 0);
        let mut push_buffer: Vec<(UpsertKey, T, UpsertDiff<FromTime>)> = Vec::new();
        // Minimum capability over all stashed commands; `None` when nothing
        // is stashed.
        let mut stash_cap: Option<Capability<T>> = None;
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        let mut output_updates = vec![];
        let snapshot_start = std::time::Instant::now();
        let mut prev_persist_upper = Antichain::from_elem(Timestamp::minimum());

        // Rehydration counters, reported to metrics once hydration finishes.
        let mut rehydration_total: u64 = 0;
        let mut rehydration_updates: u64 = 0;

        loop {
            // Wait until new input arrives or the persist frontier moves;
            // the wakeup stream's contents themselves are irrelevant.
            tokio::select! {
                _ = input.ready() => {}
                _ = persist_wakeup.ready() => {
                    while persist_wakeup.next_sync().is_some() {}
                }
            }

            // Ingest all currently available input events.
            while let Some(event) = input.next_sync() {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut pushed_any = false;
                        for ((key, value, from_time), ts, diff) in data {
                            assert!(diff.is_positive(), "invalid upsert input");
                            // While resuming, drop commands at times not
                            // beyond `resume_upper`: they are already
                            // reflected in the persisted output.
                            if PartialOrder::less_equal(&input_upper, &resume_upper)
                                && !resume_upper.less_equal(&ts)
                            {
                                continue;
                            }
                            push_buffer.push((key, ts, UpsertDiff { from_time, value }));
                            pushed_any = true;
                        }
                        if pushed_any {
                            // Retain the smallest capability seen so output
                            // can later be emitted for every stashed time.
                            stash_cap = Some(match stash_cap {
                                Some(prev) if cap.time() < prev.time() => cap,
                                Some(prev) => prev,
                                None => cap,
                            });
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        // Ignore frontier movement still strictly below the
                        // resume frontier; processing starts at resume time.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }
                        input_upper = upper;
                    }
                }
            }

            if !push_buffer.is_empty() {
                batcher.push_container(&mut push_buffer);
            }

            let persist_upper = persist_probe.with_frontier(|f| f.to_owned());

            if persist_upper != prev_persist_upper {
                // Hydration completes once the persisted state has caught up
                // to the resume frontier.
                let last_rehydration_chunk =
                    hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                if last_rehydration_chunk {
                    hydrating = false;
                    upsert_metrics
                        .rehydration_latency
                        .set(snapshot_start.elapsed().as_secs_f64());
                    upsert_metrics.rehydration_total.set(rehydration_total);
                    upsert_metrics.rehydration_updates.set(rehydration_updates);
                    tracing::info!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        "upsert finished rehydration",
                    );
                    // Release the snapshot capability entirely: the snapshot
                    // frontier becomes empty, signalling completion.
                    snapshot_cap.downgrade(&[]);
                }

                // Report progress through the snapshot frontier; a no-op
                // once the capability set was emptied above.
                let _ = snapshot_cap.try_downgrade(persist_upper.iter());

                // State reads happen exactly at the persist frontier (see
                // `drain_sealed_input`), so the trace can be compacted up to
                // it.
                persist_trace.set_logical_compaction(persist_upper.borrow());
                persist_trace.set_physical_compaction(persist_upper.borrow());

                prev_persist_upper = persist_upper.clone();
            }

            if stash_cap.is_some() {
                let cap = stash_cap.as_mut().unwrap();

                // Extract all stashed commands at times not beyond
                // `input_upper`; `CapturingBuilder` hands back the
                // consolidated chain instead of building a real batch.
                let sealed = batcher.seal::<CapturingBuilder<_, _>>(input_upper.clone());
                let remaining_frontier = batcher.frontier().to_owned();

                // Commands whose time the persist frontier has not reached
                // yet come back here and are re-stashed below.
                let mut ineligible = Vec::new();
                let drain_stats = drain_sealed_input(
                    sealed,
                    &mut ineligible,
                    &mut output_updates,
                    &persist_upper,
                    &mut persist_trace,
                    &source_config.worker_id,
                    &source_config.id,
                );

                upsert_metrics.multi_get_size.inc_by(drain_stats.eligible);
                upsert_metrics
                    .multi_get_result_count
                    .inc_by(drain_stats.result_count);
                upsert_metrics
                    .multi_put_size
                    .inc_by(drain_stats.output_count);
                upsert_metrics.upsert_inserts.inc_by(drain_stats.inserts);
                upsert_metrics.upsert_updates.inc_by(drain_stats.updates);
                upsert_metrics.upsert_deletes.inc_by(drain_stats.deletes);

                if hydrating {
                    rehydration_total += drain_stats.inserts;
                    rehydration_updates += drain_stats.eligible;
                }

                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(cap, (update, ts, diff));
                }

                let min_ineligible_ts = ineligible.iter().map(|(_, ts, _)| ts).min().cloned();
                if !ineligible.is_empty() {
                    batcher.push_container(&mut ineligible);
                }

                // Downgrade the stash capability to the earliest time still
                // pending (left in the batcher or re-stashed), or drop it
                // when nothing remains.
                let has_remaining = !remaining_frontier.is_empty() || min_ineligible_ts.is_some();
                if has_remaining {
                    let min_ts = match (
                        remaining_frontier.elements().first(),
                        min_ineligible_ts.as_ref(),
                    ) {
                        (Some(a), Some(b)) => std::cmp::min(a, b).clone(),
                        (Some(a), None) => a.clone(),
                        (None, Some(b)) => b.clone(),
                        (None, None) => unreachable!(),
                    };
                    cap.downgrade(&min_ts);
                } else {
                    stash_cap = None;
                }
            }

            // Input exhausted: shut down.
            if input_upper.is_empty() {
                break;
            }
        }
    });

    (
        // Convert upsert errors back into `DataflowError`s for the output
        // collection.
        output
            .as_collection()
            .map(|result: UpsertValue| match result {
                Ok(ok) => Ok(ok),
                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
            }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
486
/// Counters describing a single call to [`drain_sealed_input`], used to feed
/// the upsert metrics.
struct DrainStats {
    /// Commands whose time had reached the persist frontier and were
    /// processed.
    eligible: u64,
    /// Eligible commands for which a previous value was found in the trace.
    result_count: u64,
    /// Commands that inserted a value for a previously absent key.
    inserts: u64,
    /// Commands that replaced an existing value.
    updates: u64,
    /// Commands that deleted an existing value.
    deletes: u64,
    /// Total output updates (insertions plus retractions) emitted.
    output_count: u64,
}
502
/// Drains sealed upsert commands against the persisted state in `trace`.
///
/// Partitions the `sealed` chain of containers into commands that can be
/// processed now ("eligible") and commands that must wait ("ineligible",
/// appended to the caller's buffer for re-stashing). Eligibility is
/// `!persist_upper.less_than(ts) && persist_upper.less_equal(ts)`; with the
/// `TotalOrder` bound on `T` and a singleton frontier this holds exactly
/// when `ts` equals the persist frontier element, i.e. when the trace
/// contains the state for precisely the times before `ts`.
///
/// For each eligible command the key's previous value is looked up in
/// `trace`; a retraction is appended to `output` when the value changes or
/// is deleted, and an insertion for the new value when one is provided.
/// Returns counters for metrics reporting.
fn drain_sealed_input<T, FromTime>(
    sealed: Vec<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    ineligible: &mut Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    output: &mut Vec<(UpsertValue, T, Diff)>,
    persist_upper: &Antichain<T>,
    trace: &mut TraceAgent<ValSpine<UpsertKey, UpsertValue, T, Diff>>,
    worker_id: &usize,
    source_id: &GlobalId,
) -> DrainStats
where
    T: TotalOrder + Lattice + timely::ExchangeData + Timestamp + Clone + Debug + Ord + Sync,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
{
    // Partition into eligible (time exactly at the persist frontier) and
    // ineligible (everything else; note an empty frontier makes all entries
    // ineligible).
    let mut eligible = Vec::new();
    for chunk in sealed {
        for entry in chunk {
            let (_, ref ts, _) = entry;
            if !persist_upper.less_than(ts) && persist_upper.less_equal(ts) {
                eligible.push(entry);
            } else {
                ineligible.push(entry);
            }
        }
    }

    tracing::debug!(
        worker_id = %worker_id,
        source_id = %source_id,
        ineligible = ineligible.len(),
        eligible = eligible.len(),
        "draining stash",
    );

    let eligible_count = u64::try_from(eligible.len()).expect("eligible count overflows u64");

    // Nothing to process: skip acquiring a trace cursor entirely.
    if eligible.is_empty() {
        return DrainStats {
            eligible: 0,
            result_count: 0,
            inserts: 0,
            updates: 0,
            deletes: 0,
            output_count: 0,
        };
    }

    let output_before = output.len();
    let mut result_count: u64 = 0;
    let mut inserts: u64 = 0;
    let mut updates: u64 = 0;
    let mut deletes: u64 = 0;

    let (mut cursor, storage) = trace.cursor();

    for (key, ts, upsert_diff) in eligible {
        cursor.seek_key(&storage, &key);
        // Find the key's current value, if any. Summing diffs over all times
        // yields each value's current multiplicity (the trace only contains
        // times before `persist_upper`, which equals `ts` here): 1 for the
        // live value, 0 for values that were retracted again.
        let old_value = if cursor.get_key(&storage) == Some(&key) {
            let mut result = None;
            while let Some(val) = cursor.get_val(&storage) {
                let mut count = Diff::ZERO;
                cursor.map_times(&storage, |_time, diff| {
                    count += diff.clone();
                });
                if count.is_positive() {
                    assert!(
                        count == 1.into(),
                        "unexpected multiple entries for the same key in persist trace"
                    );
                    result = Some(val.clone());
                }
                cursor.step_val(&storage);
            }
            result
        } else {
            None
        };

        if old_value.is_some() {
            result_count += 1;
        }

        match upsert_diff.value {
            Some(new_val) => {
                // Upsert: retract the previous value (if any), then insert
                // the new one.
                if let Some(old_val) = old_value {
                    output.push((old_val, ts.clone(), Diff::MINUS_ONE));
                    updates += 1;
                } else {
                    inserts += 1;
                }
                output.push((new_val, ts, Diff::ONE));
            }
            None => {
                // Delete: retract the previous value; deleting an absent key
                // is a no-op.
                if let Some(old_val) = old_value {
                    output.push((old_val, ts, Diff::MINUS_ONE));
                    deletes += 1;
                }
            }
        }
    }

    let output_count =
        u64::try_from(output.len() - output_before).expect("output count overflows u64");

    DrainStats {
        eligible: eligible_count,
        result_count,
        inserts,
        updates,
        deletes,
        output_count,
    }
}
625
// Unit tests driving `upsert_inner` through a single-worker timely dataflow,
// feeding both the command input and the fed-back "persisted" state by hand
// and asserting on the captured, consolidated output.
#[cfg(test)]
mod test {
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, Timestamp as MzTimestamp};
    use mz_storage_operators::persist_source::Subtime;
    use mz_storage_types::sources::SourceEnvelope;
    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input};
    use timely::progress::Timestamp;

    use crate::metrics::StorageMetrics;
    use crate::metrics::upsert::UpsertMetricDefs;
    use crate::source::SourceExportCreationConfig;
    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};

    use super::*;

    /// The operator timestamp used in tests: an `mz` timestamp refined with
    /// a subtime, as in the production dataflow.
    type Ts = (MzTimestamp, Subtime);

    /// Constructs a test timestamp at the minimum subtime.
    fn new_ts(ts: u64) -> Ts {
        (MzTimestamp::new(ts), Subtime::minimum())
    }

    /// An upsert key over a single int64 column.
    fn key(k: i64) -> UpsertKey {
        UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(k)])))
    }

    /// A two-column `(key, value)` row.
    fn row(k: i64, v: i64) -> Row {
        Row::pack_slice(&[Datum::Int64(k), Datum::Int64(v)])
    }

    /// Builds a single-worker dataflow around `upsert_inner`, hands the test
    /// body handles for the command input (`$input`) and the fed-back
    /// persisted state (`$persist`) plus the `$worker`, captures the
    /// operator output, and evaluates to the consolidated output updates.
    macro_rules! upsert_test {
        (|$input:ident, $persist:ident, $worker:ident| $body:block) => {{
            let output_handle = timely::execute_directly(move |$worker| {
                let (mut $input, mut $persist, output_handle) = $worker
                    .dataflow::<MzTimestamp, _, _>(|scope| {
                        scope.scoped::<Ts, _, _>("upsert", |scope| {
                            let (input_handle, input) = scope.new_input();
                            let (persist_handle, persist_input) = scope.new_input();
                            let source_id = GlobalId::User(0);

                            // Fresh registries so each test gets isolated
                            // metrics.
                            let reg = MetricsRegistry::new();
                            let upsert_defs = UpsertMetricDefs::register_with(&reg);
                            let upsert_metrics =
                                UpsertMetrics::new(&upsert_defs, source_id, 0, None);

                            let reg2 = MetricsRegistry::new();
                            let storage_metrics = StorageMetrics::register_with(&reg2);

                            let reg3 = MetricsRegistry::new();
                            let stats_defs =
                                SourceStatisticsMetricDefs::register_with(&reg3);
                            let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                                source_arity: 2,
                                style: UpsertStyle::Default(KeyEnvelope::Flattened),
                                key_indices: vec![0],
                            });
                            let source_statistics = SourceStatistics::new(
                                source_id, 0, &stats_defs, source_id, &ShardId::new(),
                                envelope, Antichain::from_elem(Timestamp::minimum()),
                            );
                            let source_config = SourceExportCreationConfig {
                                id: source_id,
                                worker_id: 0,
                                metrics: storage_metrics,
                                source_statistics,
                            };

                            let (output, _, _, button) = upsert_inner(
                                input.as_collection(),
                                vec![0],
                                Antichain::from_elem(Timestamp::minimum()),
                                persist_input.as_collection(),
                                None,
                                upsert_metrics,
                                source_config,
                            );
                            // Keep the operator alive for the duration of
                            // the test.
                            std::mem::forget(button);
                            (input_handle, persist_handle, output.inner.capture())
                        })
                    });

                $body

                output_handle
            });

            let mut actual: Vec<_> = output_handle
                .extract()
                .into_iter()
                .flat_map(|(_cap, container)| container)
                .collect();
            differential_dataflow::consolidation::consolidate_updates(&mut actual);
            actual
        }};
    }

    /// Regression test: multiple input batches whose times straddle the
    /// persist frontier must not be lost or double-applied; only the final
    /// command for the key at each time should take effect.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        let actual = upsert_test!(|input, persist, worker| {
            let key0 = key(0);
            let key1 = key(1);
            let value1 = row(0, 0);
            let value3 = row(0, 1);
            let value4 = row(0, 2);

            input.send(((key0, Some(Ok(value1.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();

            persist.send((Ok(value1), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send_batch(&mut vec![
                ((key1, None, 2), new_ts(2), Diff::ONE),
                ((key0, Some(Ok(value3)), 3), new_ts(3), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            input.send_batch(&mut vec![(
                (key0, Some(Ok(value4)), 4),
                new_ts(3),
                Diff::ONE,
            )]);
            input.advance_to(new_ts(4));
            worker.step();

            persist.advance_to(new_ts(3));
            worker.step();
        });

        let value1 = row(0, 0);
        let value4 = row(0, 2);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Updates for two distinct keys arriving out of hash order within one
    /// batch must each replace their own previous value.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn out_of_order_keys_across_timestamps() {
        let actual = upsert_test!(|input, persist, worker| {
            let key_high = key(99);
            let key_low = key(1);
            let val_a = row(99, 1);
            let val_b = row(1, 2);

            input.send(((key_high, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((key_low, Some(Ok(val_b.clone())), 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.send((Ok(val_b.clone()), new_ts(1), Diff::ONE));
            persist.advance_to(new_ts(2));
            worker.step();

            let val_a2 = row(99, 10);
            let val_b2 = row(1, 20);
            input.send_batch(&mut vec![
                (
                    (key_high, Some(Ok(val_a2.clone())), 3),
                    new_ts(2),
                    Diff::ONE,
                ),
                ((key_low, Some(Ok(val_b2.clone())), 4), new_ts(2), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            worker.step();
            persist.advance_to(new_ts(3));
            worker.step();
        });

        let val_a = row(99, 1);
        let val_b = row(1, 2);
        let val_a2 = row(99, 10);
        let val_b2 = row(1, 20);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val_b.clone()), new_ts(1), Diff::ONE),
            (Ok(val_b), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_b2), new_ts(2), Diff::ONE),
            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
            (Ok(val_a), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_a2), new_ts(2), Diff::ONE),
        ];
        // Output order across keys is not specified; compare as sets.
        let mut actual_sorted = actual;
        let mut expected_sorted = expected;
        actual_sorted.sort();
        expected_sorted.sort();
        assert_eq!(actual_sorted, expected_sorted);
    }

    /// State that exists only in persist (rehydration) must be retracted
    /// when a new command for the same key arrives.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn rehydration_then_update() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(42);
            let old_val = row(42, 100);
            let new_val = row(42, 200);

            persist.send((Ok(old_val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(new_val)), 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let old_val = row(42, 100);
        let new_val = row(42, 200);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(old_val), new_ts(1), Diff::MINUS_ONE),
            (Ok(new_val), new_ts(1), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// A `None` command for an existing key emits exactly one retraction.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn delete_existing_key() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(7);
            let val = row(7, 77);

            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let val = row(7, 77);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val.clone()), new_ts(0), Diff::ONE),
            (Ok(val), new_ts(1), Diff::MINUS_ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Persist state containing retractions at the same time must
    /// consolidate to the surviving value before it is used as old state.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn multi_batch_rehydration() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(5);
            let old_val = row(5, 10);
            let new_val = row(5, 20);
            let updated_val = row(5, 30);

            persist.send((Ok(old_val.clone()), new_ts(0), Diff::ONE));
            persist.send((Ok(old_val), new_ts(0), Diff::MINUS_ONE));
            persist.send((Ok(new_val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(updated_val)), 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let new_val = row(5, 20);
        let updated_val = row(5, 30);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(new_val), new_ts(1), Diff::MINUS_ONE),
            (Ok(updated_val), new_ts(1), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Deleting a key that was never inserted must produce no output.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn delete_nonexistent_key() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(99);

            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        assert!(actual.is_empty(), "expected empty output, got: {actual:?}");
    }

    /// After a delete (reflected in persist as a retraction), a subsequent
    /// insert for the same key must not emit a retraction.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn reinsert_after_delete() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(3);
            let val_a = row(3, 10);
            let val_b = row(3, 20);

            input.send(((k, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.send((Ok(val_a), new_ts(1), Diff::MINUS_ONE));
            persist.advance_to(new_ts(2));
            worker.step();

            input.send(((k, Some(Ok(val_b.clone())), 3), new_ts(2), Diff::ONE));
            input.advance_to(new_ts(3));
            worker.step();
            persist.advance_to(new_ts(3));
            worker.step();
        });

        let val_a = row(3, 10);
        let val_b = row(3, 20);
        let mut expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
            (Ok(val_a), new_ts(1), Diff::MINUS_ONE),
            (Ok(val_b), new_ts(2), Diff::ONE),
        ];
        expected.sort();
        let mut actual = actual;
        actual.sort();
        assert_eq!(actual, expected);
    }

    /// Re-sending the current value retracts and re-inserts it, which
    /// consolidates away: only the original insert remains visible.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn idempotent_update() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(11);
            let val = row(11, 50);

            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(val.clone())), 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let val = row(11, 50);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> =
            vec![(Ok(val), new_ts(0), Diff::ONE)];
        assert_eq!(actual, expected);
    }
}