1use std::fmt::Debug;
65use std::marker::PhantomData;
66use std::sync::Arc;
67
68use differential_dataflow::difference::{IsZero, Semigroup};
69use differential_dataflow::hashable::Hashable;
70use differential_dataflow::lattice::Lattice;
71use differential_dataflow::operators::arrange::agent::TraceAgent;
72use differential_dataflow::trace::implementations::ValSpine;
73use differential_dataflow::trace::implementations::chunker::ContainerChunker;
74use differential_dataflow::trace::implementations::merge_batcher::{
75 MergeBatcher, container::VecMerger,
76};
77use differential_dataflow::trace::{Batcher, Builder, Cursor, Description, TraceReader};
78use differential_dataflow::{AsCollection, VecCollection};
79use mz_repr::{Diff, GlobalId, Row};
80use mz_storage_types::errors::{DataflowError, EnvelopeError};
81use mz_timely_util::builder_async::{
82 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
83};
84use std::convert::Infallible;
85use timely::container::CapacityContainerBuilder;
86use timely::dataflow::StreamVec;
87use timely::dataflow::channels::pact::{Exchange, Pipeline};
88use timely::dataflow::operators::{Capability, CapabilitySet, Exchange as _};
89use timely::order::{PartialOrder, TotalOrder};
90use timely::progress::timestamp::Refines;
91use timely::progress::{Antichain, Timestamp};
92
93use crate::healthcheck::HealthStatusUpdate;
94use crate::metrics::upsert::UpsertMetrics;
95use crate::upsert::UpsertKey;
96use crate::upsert::UpsertValue;
97
/// The "difference" type carried through the upsert batcher. Unlike a
/// numeric count, each diff carries the candidate value for a key along
/// with a source-specific ordering token (`from_time`) that decides which
/// update wins when several updates for the same key and timestamp are
/// consolidated together.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
struct UpsertDiff<O> {
    // Source-side ordering token; the `Semigroup` impl keeps the update
    // with the larger `from_time`.
    from_time: O,
    // The new value for the key, or `None` for a deletion.
    value: Option<UpsertValue>,
}
109
impl<O> IsZero for UpsertDiff<O> {
    /// An upsert diff is never "zero": consolidation must always retain the
    /// winning update for a (key, time) rather than cancelling pairs out,
    /// so we unconditionally report non-zero.
    fn is_zero(&self) -> bool {
        false
    }
}
115
116impl<O: Ord + Clone> Semigroup for UpsertDiff<O> {
117 fn plus_equals(&mut self, rhs: &Self) {
118 if rhs.from_time > self.from_time {
119 *self = rhs.clone();
120 }
121 }
122}
123
/// The batcher used to stage upsert input until the persist frontier
/// catches up. It consolidates `(key, time, UpsertDiff)` triples, so the
/// [`Semigroup`] impl on [`UpsertDiff`] decides which update survives per
/// key and time.
type UpsertBatcher<T, FromTime> = MergeBatcher<
    Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    ContainerChunker<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    VecMerger<UpsertKey, T, UpsertDiff<FromTime>>,
>;
134
135struct CapturingBuilder<D, T>(D, PhantomData<T>);
140
impl<D, T: Timestamp> Builder for CapturingBuilder<D, T> {
    type Input = D;
    type Time = T;
    // The "batch" we build is simply the chain of input containers,
    // returned untouched.
    type Output = Vec<D>;

    fn with_capacity(_keys: usize, _vals: usize, _upds: usize) -> Self {
        // Never called: this builder is only ever used through `seal`.
        unimplemented!()
    }

    fn push(&mut self, _chunk: &mut Self::Input) {
        // Never called: this builder is only ever used through `seal`.
        unimplemented!()
    }

    fn done(self, _description: Description<Self::Time>) -> Self::Output {
        // Never called: this builder is only ever used through `seal`.
        unimplemented!()
    }

    #[inline]
    fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
        // Steal the consolidated chain from the batcher, leaving an empty
        // vector behind.
        std::mem::take(chain)
    }
}
163
/// Renders an upsert operator that turns a stream of `(key, value)` updates
/// into a stream of insertions and retractions, consulting the (fed-back)
/// `persist_input` collection as the authoritative previous state for each
/// key instead of a local state store.
///
/// Input updates at a timestamp `ts` are stashed in a batcher until the
/// persist feedback frontier has caught up to `ts`, at which point the
/// previous value for each key can be read from the arranged feedback and
/// the correct retraction/insertion pair emitted.
///
/// Returns the upserted output collection, a health stream, a data-less
/// stream whose frontier reflects snapshot/rehydration progress, and a
/// shutdown button for the async operator.
#[allow(clippy::disallowed_methods)]
pub fn upsert_inner<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + differential_dataflow::lattice::Lattice + Sync,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
{
    // Re-key the persist feedback by `UpsertKey`. Only plain rows and
    // upsert envelope errors participate; all other errors are dropped
    // since they were never produced by this operator.
    let persist_keyed = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                EnvelopeError::Flat(_) => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    // Co-locate feedback updates with the input updates for the same key
    // (same hash function as the input exchange below), and record
    // indexed-state statistics as updates stream past.
    let persist_keyed = persist_keyed
        .inner
        .exchange(move |((key, _), _, _)| UpsertKey::hashed(key))
        .as_collection()
        .inspect(move |((_, row), _, diff)| {
            source_config
                .source_statistics
                .update_records_indexed_by(diff.into_inner());
            source_config.source_statistics.update_bytes_indexed_by(
                row.as_ref().map_or(0, |r| r.byte_len().try_into().unwrap()) * diff.into_inner(),
            );
        });
    // Arrange the feedback by key; this trace is what we consult for the
    // previous value of each key.
    let persist_arranged = persist_keyed.arrange_by_key();
    let mut persist_trace = persist_arranged.trace.clone();

    // Probe the arrangement stream so the async operator below can cheaply
    // observe how far the persist feedback has progressed.
    use timely::dataflow::operators::Probe;
    let (persist_probe, _persist_probe_stream) = persist_arranged.stream.probe();

    let mut builder = AsyncOperatorBuilder::new("Upsert V2".to_string(), input.scope());

    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();
    // The snapshot output never carries data (`Infallible`); only its
    // frontier is meaningful.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
    let (_health_output, health_stream) = builder
        .new_output::<CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>>();

    let mut input = builder.new_input_for(
        input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    // Disconnected input: used only to wake the operator when the persist
    // feedback makes progress; its frontier does not hold back the outputs.
    let mut persist_wakeup = builder.new_disconnected_input(_persist_probe_stream, Pipeline);

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    // NOTE(review): this handle is currently unused — presumably kept for
    // parity with other upsert implementations; confirm before removing.
    let _ = upsert_shared_metrics;

    let shutdown_button = builder.build(move |caps| async move {
        // Keep the upstream persist operators alive for as long as we run.
        let _persist_token = persist_token;

        let [output_cap, snapshot_cap, _health_cap]: [_; 3] = caps.try_into().unwrap();
        // Output capabilities are derived from input data capabilities; no
        // standing output capability is needed.
        drop(output_cap);
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        // True until the persist feedback has caught up to `resume_upper`.
        let mut hydrating = true;

        // Stash of input updates awaiting the persist frontier.
        let mut batcher: UpsertBatcher<T, FromTime> = Batcher::new(None, 0);
        let mut push_buffer: Vec<(UpsertKey, T, UpsertDiff<FromTime>)> = Vec::new();
        // Capability at (or before) the earliest time stashed in `batcher`.
        let mut stash_cap: Option<Capability<T>> = None;
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        let mut output_updates = vec![];
        let snapshot_start = std::time::Instant::now();
        let mut prev_persist_upper = Antichain::from_elem(Timestamp::minimum());

        // Counters reported to metrics once rehydration completes.
        let mut rehydration_total: u64 = 0;
        let mut rehydration_updates: u64 = 0;

        loop {
            // Wait until either new input arrives or the persist frontier
            // advances; drain all wakeup events so we don't spin.
            tokio::select! {
                _ = input.ready() => {}
                _ = persist_wakeup.ready() => {
                    while persist_wakeup.next_sync().is_some() {}
                }
            }

            // Ingest all currently available input events.
            while let Some(event) = input.next_sync() {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut pushed_any = false;
                        for ((key, value, from_time), ts, diff) in data {
                            assert!(diff.is_positive(), "invalid upsert input");
                            // While still at or before the resume frontier,
                            // skip updates that were already written out by
                            // a previous incarnation of this dataflow.
                            if PartialOrder::less_equal(&input_upper, &resume_upper)
                                && !resume_upper.less_equal(&ts)
                            {
                                continue;
                            }
                            push_buffer.push((key, ts, UpsertDiff { from_time, value }));
                            pushed_any = true;
                        }
                        if pushed_any {
                            // Retain the earliest capability seen so all
                            // stashed updates can still be emitted later.
                            stash_cap = Some(match stash_cap {
                                Some(prev) if cap.time() < prev.time() => cap,
                                Some(prev) => prev,
                                None => cap,
                            });
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        // Ignore frontier progress strictly before the
                        // resume frontier; it concerns replayed data.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }
                        input_upper = upper;
                    }
                }
            }

            // Move freshly received updates into the consolidating batcher.
            if !push_buffer.is_empty() {
                batcher.push_container(&mut push_buffer);
            }

            let persist_upper = persist_probe.with_frontier(|f| f.to_owned());

            if persist_upper != prev_persist_upper {
                // Rehydration is finished once the feedback frontier has
                // reached the resume frontier.
                let last_rehydration_chunk =
                    hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                if last_rehydration_chunk {
                    hydrating = false;
                    upsert_metrics
                        .rehydration_latency
                        .set(snapshot_start.elapsed().as_secs_f64());
                    upsert_metrics.rehydration_total.set(rehydration_total);
                    upsert_metrics.rehydration_updates.set(rehydration_updates);
                    tracing::info!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        "upsert finished rehydration",
                    );
                    // Release the snapshot capability entirely; the
                    // snapshot frontier advances to the empty antichain.
                    snapshot_cap.downgrade(&[]);
                }

                // Report snapshot progress; this is a no-op after the
                // capability set has been emptied above, hence `let _`.
                let _ = snapshot_cap.try_downgrade(persist_upper.iter());

                // Allow the feedback trace to compact up to the frontier;
                // we never read below it.
                persist_trace.set_logical_compaction(persist_upper.borrow());
                persist_trace.set_physical_compaction(persist_upper.borrow());

                prev_persist_upper = persist_upper.clone();
            }

            // Process stashed updates once (a) the persist feedback covers
            // everything strictly before the stash capability's time and
            // (b) input has advanced beyond the persist frontier, so there
            // is something to process.
            if let Some(cap) = stash_cap.as_mut()
                && !persist_upper.less_than(cap.time())
                && PartialOrder::less_than(&persist_upper, &input_upper)
            {
                // Seal everything up to the input frontier and capture the
                // consolidated chain via `CapturingBuilder`.
                let sealed = batcher.seal::<CapturingBuilder<_, _>>(input_upper.clone());
                let remaining_frontier = batcher.frontier().to_owned();

                // Updates whose time the feedback has not caught up to yet
                // get re-stashed below.
                let mut ineligible = Vec::new();
                let drain_stats = drain_sealed_input(
                    sealed,
                    &mut ineligible,
                    &mut output_updates,
                    &persist_upper,
                    &mut persist_trace,
                    &source_config.worker_id,
                    &source_config.id,
                );

                upsert_metrics.multi_get_size.inc_by(drain_stats.eligible);
                upsert_metrics
                    .multi_get_result_count
                    .inc_by(drain_stats.result_count);
                upsert_metrics
                    .multi_put_size
                    .inc_by(drain_stats.output_count);
                upsert_metrics.upsert_inserts.inc_by(drain_stats.inserts);
                upsert_metrics.upsert_updates.inc_by(drain_stats.updates);
                upsert_metrics.upsert_deletes.inc_by(drain_stats.deletes);

                if hydrating {
                    rehydration_total += drain_stats.inserts;
                    rehydration_updates += drain_stats.eligible;
                }

                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(cap, (update, ts, diff));
                }

                // Re-stash ineligible updates and keep the capability at
                // the earliest time we may still emit.
                let min_ineligible_ts = ineligible.iter().map(|(_, ts, _)| ts).min().cloned();
                if !ineligible.is_empty() {
                    batcher.push_container(&mut ineligible);
                }

                let has_remaining = !remaining_frontier.is_empty() || min_ineligible_ts.is_some();
                if has_remaining {
                    let min_ts = match (
                        remaining_frontier.elements().first(),
                        min_ineligible_ts.as_ref(),
                    ) {
                        (Some(a), Some(b)) => std::cmp::min(a, b).clone(),
                        (Some(a), None) => a.clone(),
                        (None, Some(b)) => b.clone(),
                        (None, None) => unreachable!(),
                    };
                    cap.downgrade(&min_ts);
                } else {
                    // Nothing stashed anymore; drop the capability so
                    // downstream frontiers can advance.
                    stash_cap = None;
                }
            }

            // The input is complete; shut down.
            if input_upper.is_empty() {
                break;
            }
        }
    });

    (
        // Re-wrap upsert errors into the `DataflowError` type callers
        // expect.
        output
            .as_collection()
            .map(|result: UpsertValue| match result {
                Ok(ok) => Ok(ok),
                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
            }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
511
/// Statistics gathered by [`drain_sealed_input`], used to update metrics.
struct DrainStats {
    // Number of sealed updates that were eligible for processing.
    eligible: u64,
    // How many eligible keys had a previous value in the persist trace.
    result_count: u64,
    // Upserts for keys with no previous value.
    inserts: u64,
    // Upserts that replaced an existing value.
    updates: u64,
    // Deletions that removed an existing value.
    deletes: u64,
    // Total output updates (retractions plus insertions) produced.
    output_count: u64,
}
527
/// Processes the sealed chain of stashed upsert updates against the
/// arranged persist feedback `trace`.
///
/// Each update is either *eligible* — its timestamp `ts` satisfies both
/// `!persist_upper.less_than(ts)` (the trace already reflects everything
/// strictly before `ts`) and `persist_upper.less_equal(ts)` (`ts` itself has
/// not yet been absorbed into the feedback) — or it is pushed onto
/// `ineligible` for the caller to re-stash. For each eligible update the
/// previous value of the key is looked up in the trace and the appropriate
/// retraction/insertion updates are appended to `output`.
///
/// Returns counters describing the work performed, for metrics.
fn drain_sealed_input<T, FromTime>(
    sealed: Vec<Vec<(UpsertKey, T, UpsertDiff<FromTime>)>>,
    ineligible: &mut Vec<(UpsertKey, T, UpsertDiff<FromTime>)>,
    output: &mut Vec<(UpsertValue, T, Diff)>,
    persist_upper: &Antichain<T>,
    trace: &mut TraceAgent<ValSpine<UpsertKey, UpsertValue, T, Diff>>,
    worker_id: &usize,
    source_id: &GlobalId,
) -> DrainStats
where
    T: TotalOrder + Lattice + timely::ExchangeData + Timestamp + Clone + Debug + Ord + Sync,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
{
    // Partition the sealed updates into eligible and ineligible ones (see
    // the doc comment for the eligibility condition).
    let mut eligible = Vec::new();
    for chunk in sealed {
        for entry in chunk {
            let (_, ref ts, _) = entry;
            if !persist_upper.less_than(ts) && persist_upper.less_equal(ts) {
                eligible.push(entry);
            } else {
                ineligible.push(entry);
            }
        }
    }

    tracing::debug!(
        worker_id = %worker_id,
        source_id = %source_id,
        ineligible = ineligible.len(),
        eligible = eligible.len(),
        "draining stash",
    );

    let eligible_count = u64::try_from(eligible.len()).expect("eligible count overflows u64");

    // Fast path: nothing to do, avoid acquiring a trace cursor.
    if eligible.is_empty() {
        return DrainStats {
            eligible: 0,
            result_count: 0,
            inserts: 0,
            updates: 0,
            deletes: 0,
            output_count: 0,
        };
    }

    let output_before = output.len();
    let mut result_count: u64 = 0;
    let mut inserts: u64 = 0;
    let mut updates: u64 = 0;
    let mut deletes: u64 = 0;

    // NOTE: eligible updates come out of the batcher consolidated, i.e.
    // sorted by key, so seeking the cursor forward per key works.
    let (mut cursor, storage) = trace.cursor();

    for (key, ts, upsert_diff) in eligible {
        cursor.seek_key(&storage, &key);
        // Determine the current value of the key in the feedback trace: the
        // (at most one) value whose consolidated count is positive.
        let old_value = if cursor.get_key(&storage) == Some(&key) {
            let mut result = None;
            while let Some(val) = cursor.get_val(&storage) {
                let mut count = Diff::ZERO;
                cursor.map_times(&storage, |_time, diff| {
                    count += diff.clone();
                });
                if count.is_positive() {
                    assert!(
                        count == 1.into(),
                        "unexpected multiple entries for the same key in persist trace"
                    );
                    result = Some(val.clone());
                }
                cursor.step_val(&storage);
            }
            result
        } else {
            None
        };

        if old_value.is_some() {
            result_count += 1;
        }

        match upsert_diff.value {
            Some(new_val) => {
                // Upsert: retract the previous value (if any) and insert
                // the new one.
                if let Some(old_val) = old_value {
                    output.push((old_val, ts.clone(), Diff::MINUS_ONE));
                    updates += 1;
                } else {
                    inserts += 1;
                }
                output.push((new_val, ts, Diff::ONE));
            }
            None => {
                // Delete: retract the previous value; a delete of an
                // absent key produces no output.
                if let Some(old_val) = old_value {
                    output.push((old_val, ts, Diff::MINUS_ONE));
                    deletes += 1;
                }
            }
        }
    }

    let output_count =
        u64::try_from(output.len() - output_before).expect("output count overflows u64");

    DrainStats {
        eligible: eligible_count,
        result_count,
        inserts,
        updates,
        deletes,
        output_count,
    }
}
650
#[cfg(test)]
mod test {
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, Timestamp as MzTimestamp};
    use mz_storage_operators::persist_source::Subtime;
    use mz_storage_types::sources::SourceEnvelope;
    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input};
    use timely::progress::Timestamp;

    use crate::metrics::StorageMetrics;
    use crate::metrics::upsert::UpsertMetricDefs;
    use crate::source::SourceExportCreationConfig;
    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};

    use super::*;

    /// Timestamp type used in the tests: an `(MzTimestamp, Subtime)` pair.
    type Ts = (MzTimestamp, Subtime);

    /// Builds a test timestamp at `ts` with the minimum subtime.
    fn new_ts(ts: u64) -> Ts {
        (MzTimestamp::new(ts), Subtime::minimum())
    }

    /// Builds an `UpsertKey` from a single int64 key column.
    fn key(k: i64) -> UpsertKey {
        UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(k)])))
    }

    /// Builds a two-column `(key, value)` row.
    fn row(k: i64, v: i64) -> Row {
        Row::pack_slice(&[Datum::Int64(k), Datum::Int64(v)])
    }

    /// Drives `upsert_inner` on a single worker. `$input` feeds the source
    /// stream, `$persist` feeds the persist feedback stream, and `$worker`
    /// steps the dataflow. Evaluates to the consolidated captured output.
    macro_rules! upsert_test {
        (|$input:ident, $persist:ident, $worker:ident| $body:block) => {{
            let output_handle = timely::execute_directly(move |$worker| {
                let (mut $input, mut $persist, output_handle) = $worker
                    .dataflow::<MzTimestamp, _, _>(|scope| {
                        scope.scoped::<Ts, _, _>("upsert", |scope| {
                            let (input_handle, input) = scope.new_input();
                            let (persist_handle, persist_input) = scope.new_input();
                            let source_id = GlobalId::User(0);

                            let reg = MetricsRegistry::new();
                            // BUGFIX: these three `register_with` calls had
                            // their `&reg` arguments mangled into the `®`
                            // character by HTML-entity decoding.
                            let upsert_defs = UpsertMetricDefs::register_with(&reg);
                            let upsert_metrics =
                                UpsertMetrics::new(&upsert_defs, source_id, 0, None);

                            let reg2 = MetricsRegistry::new();
                            let storage_metrics = StorageMetrics::register_with(&reg2);

                            let reg3 = MetricsRegistry::new();
                            let stats_defs =
                                SourceStatisticsMetricDefs::register_with(&reg3);
                            let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                                source_arity: 2,
                                style: UpsertStyle::Default(KeyEnvelope::Flattened),
                                key_indices: vec![0],
                            });
                            let source_statistics = SourceStatistics::new(
                                source_id, 0, &stats_defs, source_id, &ShardId::new(),
                                envelope, Antichain::from_elem(Timestamp::minimum()),
                            );
                            let source_config = SourceExportCreationConfig {
                                id: source_id,
                                worker_id: 0,
                                metrics: storage_metrics,
                                source_statistics,
                            };

                            let (output, _, _, button) = upsert_inner(
                                input.as_collection(),
                                vec![0],
                                Antichain::from_elem(Timestamp::minimum()),
                                persist_input.as_collection(),
                                None,
                                upsert_metrics,
                                source_config,
                            );
                            // Keep the operator alive for the whole test run.
                            std::mem::forget(button);
                            (input_handle, persist_handle, output.inner.capture())
                        })
                    });

                $body

                output_handle
            });

            let mut actual: Vec<_> = output_handle
                .extract()
                .into_iter()
                .flat_map(|(_cap, container)| container)
                .collect();
            differential_dataflow::consolidation::consolidate_updates(&mut actual);
            actual
        }};
    }

    /// Reproducer for the bug tracked as gh-9160: updates for one key at
    /// several timestamps, delivered across batches while the persist
    /// feedback lags, must still produce a single correct retract/insert
    /// pair at the final time.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        let actual = upsert_test!(|input, persist, worker| {
            let key0 = key(0);
            let key1 = key(1);
            let value1 = row(0, 0);
            let value3 = row(0, 1);
            let value4 = row(0, 2);

            input.send(((key0, Some(Ok(value1.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();

            persist.send((Ok(value1), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send_batch(&mut vec![
                ((key1, None, 2), new_ts(2), Diff::ONE),
                ((key0, Some(Ok(value3)), 3), new_ts(3), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            input.send_batch(&mut vec![(
                (key0, Some(Ok(value4)), 4),
                new_ts(3),
                Diff::ONE,
            )]);
            input.advance_to(new_ts(4));
            worker.step();

            persist.advance_to(new_ts(3));
            worker.step();
        });

        let value1 = row(0, 0);
        let value4 = row(0, 2);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Two keys updated in one batch whose hash/sort order differs from
    /// their arrival order must both resolve against their own prior state.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn out_of_order_keys_across_timestamps() {
        let actual = upsert_test!(|input, persist, worker| {
            let key_high = key(99);
            let key_low = key(1);
            let val_a = row(99, 1);
            let val_b = row(1, 2);

            input.send(((key_high, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((key_low, Some(Ok(val_b.clone())), 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.send((Ok(val_b.clone()), new_ts(1), Diff::ONE));
            persist.advance_to(new_ts(2));
            worker.step();

            let val_a2 = row(99, 10);
            let val_b2 = row(1, 20);
            input.send_batch(&mut vec![
                (
                    (key_high, Some(Ok(val_a2.clone())), 3),
                    new_ts(2),
                    Diff::ONE,
                ),
                ((key_low, Some(Ok(val_b2.clone())), 4), new_ts(2), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            worker.step();
            persist.advance_to(new_ts(3));
            worker.step();
        });

        let val_a = row(99, 1);
        let val_b = row(1, 2);
        let val_a2 = row(99, 10);
        let val_b2 = row(1, 20);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val_b.clone()), new_ts(1), Diff::ONE),
            (Ok(val_b), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_b2), new_ts(2), Diff::ONE),
            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
            (Ok(val_a), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_a2), new_ts(2), Diff::ONE),
        ];
        // Output order across keys is unspecified; compare as sets.
        let mut actual_sorted = actual;
        let mut expected_sorted = expected;
        actual_sorted.sort();
        expected_sorted.sort();
        assert_eq!(actual_sorted, expected_sorted);
    }

    /// State that exists only in the persist feedback (rehydration) must be
    /// retracted when the key is subsequently updated.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn rehydration_then_update() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(42);
            let old_val = row(42, 100);
            let new_val = row(42, 200);

            persist.send((Ok(old_val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(new_val)), 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let old_val = row(42, 100);
        let new_val = row(42, 200);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(old_val), new_ts(1), Diff::MINUS_ONE),
            (Ok(new_val), new_ts(1), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Deleting an existing key emits a retraction of its current value.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn delete_existing_key() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(7);
            let val = row(7, 77);

            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let val = row(7, 77);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val.clone()), new_ts(0), Diff::ONE),
            (Ok(val), new_ts(1), Diff::MINUS_ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Rehydration state delivered as several (partially cancelling)
    /// persist updates must consolidate to the surviving value.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn multi_batch_rehydration() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(5);
            let old_val = row(5, 10);
            let new_val = row(5, 20);
            let updated_val = row(5, 30);

            persist.send((Ok(old_val.clone()), new_ts(0), Diff::ONE));
            persist.send((Ok(old_val), new_ts(0), Diff::MINUS_ONE));
            persist.send((Ok(new_val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(updated_val)), 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let new_val = row(5, 20);
        let updated_val = row(5, 30);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(new_val), new_ts(1), Diff::MINUS_ONE),
            (Ok(updated_val), new_ts(1), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Deleting a key that was never inserted produces no output.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn delete_nonexistent_key() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(99);

            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        assert!(actual.is_empty(), "expected empty output, got: {actual:?}");
    }

    /// Insert, delete, then re-insert a key: the second insert must behave
    /// like a fresh insert (no retraction).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn reinsert_after_delete() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(3);
            let val_a = row(3, 10);
            let val_b = row(3, 20);

            input.send(((k, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.send((Ok(val_a), new_ts(1), Diff::MINUS_ONE));
            persist.advance_to(new_ts(2));
            worker.step();

            input.send(((k, Some(Ok(val_b.clone())), 3), new_ts(2), Diff::ONE));
            input.advance_to(new_ts(3));
            worker.step();
            persist.advance_to(new_ts(3));
            worker.step();
        });

        let val_a = row(3, 10);
        let val_b = row(3, 20);
        let mut expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
            (Ok(val_a), new_ts(1), Diff::MINUS_ONE),
            (Ok(val_b), new_ts(2), Diff::ONE),
        ];
        expected.sort();
        let mut actual = actual;
        actual.sort();
        assert_eq!(actual, expected);
    }

    /// Writing the same value again still retracts and re-inserts; after
    /// consolidation the pair cancels, leaving only the original insert.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn idempotent_update() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(11);
            let val = row(11, 50);

            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(val.clone())), 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let val = row(11, 50);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> =
            vec![(Ok(val), new_ts(0), Diff::ONE)];
        assert_eq!(actual, expected);
    }
}