1use std::fmt::Debug;
73
74use columnar::Index as _;
75use differential_dataflow::difference::{IsZero, Semigroup};
76use differential_dataflow::hashable::Hashable;
77use differential_dataflow::lattice::Lattice;
78use differential_dataflow::logging::Logger;
79use differential_dataflow::operators::arrange::agent::TraceAgent;
80use differential_dataflow::operators::arrange::arrangement::arrange_core;
81use differential_dataflow::trace::{Batcher, Cursor, Description, TraceReader};
82use differential_dataflow::{AsCollection, VecCollection};
83use mz_repr::{Datum, Diff, GlobalId, Row};
84use mz_row_spine::{ValRowColPagedBuilder, ValRowSpine};
85use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
86use mz_timely_util::builder_async::{
87 AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
88 PressOnDropButton,
89};
90use mz_timely_util::columnar::batcher::ColumnChunker;
91use mz_timely_util::columnar::builder::ColumnBuilder;
92use mz_timely_util::columnar::merge_batcher::ColumnMergeBatcher;
93use mz_timely_util::columnar::{Col2ValPagedBatcher, Column};
94use mz_timely_util::containers::stack::FueledBuilder;
95use std::convert::Infallible;
96use timely::container::{CapacityContainerBuilder, PushInto};
97use timely::dataflow::StreamVec;
98use timely::dataflow::channels::pact::{Exchange, Pipeline};
99use timely::dataflow::operators::generic::Operator;
100use timely::dataflow::operators::{Capability, CapabilitySet, Exchange as _};
101use timely::order::{PartialOrder, TotalOrder};
102use timely::progress::frontier::AntichainRef;
103use timely::progress::timestamp::Refines;
104use timely::progress::{Antichain, Timestamp};
105
106use crate::healthcheck::HealthStatusUpdate;
107use crate::metrics::upsert::UpsertMetrics;
108use crate::upsert::UpsertKey;
109use crate::upsert::UpsertSourceTime;
110use crate::upsert::UpsertValue;
111
/// Batcher used for the persist-feedback arrangement.
///
/// A newtype around [`Col2ValPagedBatcher`] keyed by [`UpsertKey`] with [`Row`]
/// values; its `Batcher::new` configures the wrapped batcher with the upsert
/// stash pager, and all other operations are forwarded unchanged.
struct UpsertFeedbackBatcher<T: columnar::Columnar>(Col2ValPagedBatcher<UpsertKey, Row, T, Diff>);
123
124impl<T> Batcher for UpsertFeedbackBatcher<T>
125where
126 T: Timestamp + columnar::Columnar + Default + PartialOrder,
127 for<'a> columnar::Ref<'a, T>: Copy + Ord,
128{
129 type Output = Column<((UpsertKey, Row), T, Diff)>;
130 type Time = T;
131
132 fn new(logger: Option<Logger>, operator_id: usize) -> Self {
133 let mut batcher =
134 <Col2ValPagedBatcher<UpsertKey, Row, T, Diff> as Batcher>::new(logger, operator_id);
135 batcher.set_pager(crate::upsert::upsert_stash_pager::pager());
136 Self(batcher)
137 }
138
139 fn seal(&mut self, upper: Antichain<T>) -> (Vec<Self::Output>, Description<T>) {
140 self.0.seal(upper)
141 }
142
143 fn frontier(&mut self) -> AntichainRef<'_, T> {
144 self.0.frontier()
145 }
146}
147
148impl<T> PushInto<Column<((UpsertKey, Row), T, Diff)>> for UpsertFeedbackBatcher<T>
149where
150 T: Timestamp + columnar::Columnar + Default + PartialOrder,
151 for<'a> columnar::Ref<'a, T>: Copy + Ord,
152{
153 fn push_into(&mut self, chunk: Column<((UpsertKey, Row), T, Diff)>) {
154 self.0.push_into(chunk)
155 }
156}
157
/// "Difference" carried by stashed upsert updates.
///
/// Rather than a count, it holds the value an update assigns to its key, tagged
/// with the source order (`from_time`) used by the `Semigroup` impls to decide
/// which of several updates for the same key and timestamp survives.
#[derive(Clone, Debug, Default, columnar::Columnar)]
#[columnar(derive(PartialEq, Eq, PartialOrd, Ord))]
struct UpsertDiff<O> {
    // Source order of the update (from `UpsertSourceTime::upsert_order`).
    from_time: O,
    // Encoded value (see `upsert_value_to_row`), or `None` for a delete.
    value: Option<Row>,
}

/// An `UpsertDiff` is never zero: combining two diffs selects one of them rather
/// than cancelling, so consolidation never discards an entry as zero.
impl<O> IsZero for UpsertDiff<O> {
    fn is_zero(&self) -> bool {
        false
    }
}
187
188impl<O: Ord + Clone> Semigroup for UpsertDiff<O> {
189 fn plus_equals(&mut self, rhs: &Self) {
190 if rhs.from_time > self.from_time {
191 *self = rhs.clone();
192 }
193 }
194}
195
196impl<'a, O> Semigroup<columnar::Ref<'a, UpsertDiff<O>>> for UpsertDiff<O>
203where
204 O: columnar::Columnar + Ord + Clone,
205{
206 fn plus_equals(&mut self, rhs: &columnar::Ref<'a, UpsertDiff<O>>) {
207 let rhs_from_time = <O as columnar::Columnar>::into_owned(rhs.from_time);
208 if rhs_from_time > self.from_time {
209 self.from_time = rhs_from_time;
210 self.value = <Option<Row> as columnar::Columnar>::into_owned(rhs.value);
211 }
212 }
213}
214
215fn flush_to_batcher<T, O>(
220 updates: &mut Vec<UpsertUpdate<T, O>>,
221 chunker: &mut UpsertChunker<T, O>,
222 batcher: &mut UpsertBatcher<T, O>,
223) where
224 T: columnar::Columnar + Default + Clone + PartialOrder,
225 for<'a> columnar::Ref<'a, T>: Copy + Ord,
226 O: columnar::Columnar + Default + Ord + Clone,
227 for<'a> columnar::Ref<'a, O>: Ord,
228{
229 use timely::container::{ContainerBuilder as _, PushInto as _};
230 if updates.is_empty() {
231 return;
232 }
233 let mut raw: Column<UpsertUpdate<T, O>> = Default::default();
234 for update in updates.drain(..) {
235 raw.push_into(&update);
236 }
237 chunker.push_into(&mut raw);
238 while let Some(chunk) = chunker.extract() {
239 batcher.push_into(std::mem::take(chunk));
240 }
241}
242
/// One stashed upsert input update: the key, the update's timestamp, and an
/// [`UpsertDiff`] holding the encoded value together with its source order.
type UpsertUpdate<T, O> = (UpsertKey, T, UpsertDiff<O>);

/// Merge batcher that stashes upsert input until it can be resolved against the
/// persist feedback arrangement.
type UpsertBatcher<T, O> = ColumnMergeBatcher<UpsertKey, T, UpsertDiff<O>>;

/// Chunker used to turn buffered [`UpsertUpdate`]s into [`Column`] chunks for an
/// [`UpsertBatcher`] (see `flush_to_batcher`).
type UpsertChunker<T, O> = ColumnChunker<UpsertUpdate<T, O>>;

/// Output handle of the upsert operator: `(UpsertValue, T, Diff)` updates emitted
/// through a fueled capacity builder.
type UpsertOutputHandle<T> =
    AsyncOutputHandle<T, FueledBuilder<CapacityContainerBuilder<Vec<(UpsertValue, T, Diff)>>>>;
266
267fn upsert_value_to_row(value: &UpsertValue) -> Row {
277 let mut row = Row::default();
278 let mut packer = row.packer();
279 match value {
280 Ok(ok) => {
281 packer.push(Datum::UInt8(0));
282 packer.extend(ok.iter());
283 }
284 Err(err) => {
285 packer.push(Datum::UInt8(1));
286 let bytes =
287 bincode::serialize(err.as_ref()).expect("UpsertError is serializable via bincode");
288 packer.push(Datum::Bytes(&bytes));
289 }
290 }
291 row
292}
293
294fn upsert_value_byte_len(value: &UpsertValue) -> usize {
297 match value {
298 Ok(row) => row.byte_len(),
299 Err(err) => std::mem::size_of_val(err.as_ref()),
300 }
301}
302
303fn decode_upsert_value<'a>(mut iter: impl Iterator<Item = Datum<'a>>) -> UpsertValue {
306 let tag = match iter.next() {
307 Some(Datum::UInt8(tag)) => tag,
308 other => panic!("upsert value missing UInt8 tag, got {:?}", other),
309 };
310 match tag {
311 0 => {
312 let mut row = Row::default();
313 row.packer().extend(iter);
314 Ok(row)
315 }
316 1 => {
317 let bytes = match iter.next() {
318 Some(Datum::Bytes(b)) => b,
319 other => panic!("upsert error tag missing Bytes payload, got {:?}", other),
320 };
321 let err: UpsertError =
322 bincode::deserialize(bytes).expect("UpsertError bincode round-trip");
323 Err(Box::new(err))
324 }
325 tag => panic!("unknown upsert value tag {tag}"),
326 }
327}
328
/// Upsert operator that resolves keyed input updates against a feedback
/// arrangement of the persist shard holding this source's output.
///
/// Input updates are buffered in a stash (a columnar merge batcher). Once persist
/// has caught up to the earliest stashed time and the input frontier is strictly
/// beyond the persist upper, every stashed update at exactly the persist upper is
/// looked up in the persist arrangement and turned into a retraction of the
/// previous value (if any) plus an insertion of the new value (unless it is a
/// delete). Stashed updates before the persist upper are dropped without output;
/// updates after it remain stashed. For a key and timestamp, the update with the
/// largest `FromTime::upsert_order()` wins (see `UpsertDiff`).
///
/// Arguments:
/// * `input`: `(key, value, from_time)` updates; a `None` value is a delete. All
///   diffs must be positive (asserted).
/// * `key_indices`: columns of a persisted row that form its key.
/// * `resume_upper`: while the input frontier has not passed it, input updates
///   at times not beyond it are skipped, and input progress below it is ignored.
/// * `persist_input`: contents of the output persist shard, fed back.
/// * `persist_token`: held for as long as the operator runs.
///
/// Returns the output collection; a health stream (this operator never emits to
/// it); a snapshot stream whose frontier follows the persist upper and is
/// released entirely once the persist upper reaches `resume_upper`; and a
/// shutdown button.
#[allow(clippy::disallowed_methods)]
pub fn upsert_inner<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + differential_dataflow::lattice::Lattice,
    T: columnation::Columnation,
    T: columnar::Columnar + Default,
    for<'a> columnar::Ref<'a, T>: Copy + Ord,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
    FromTime: UpsertSourceTime,
{
    // Key each persisted record by its `UpsertKey`. Only `Ok` rows and upsert
    // envelope errors are kept; any other error is dropped because it is not
    // produced by this operator's output mapping below.
    let persist_keyed = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                EnvelopeError::Flat(_) => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    // Route persisted records by key hash, matching the exchange of `input`
    // below, so each worker sees the persisted state for the keys it resolves.
    // Also account indexed records/bytes in the source statistics.
    let persist_keyed = persist_keyed
        .inner
        .exchange(move |((key, _), _, _)| UpsertKey::hashed(key))
        .as_collection()
        .inspect(move |((_, row), _, diff)| {
            source_config
                .source_statistics
                .update_records_indexed_by(diff.into_inner());
            source_config.source_statistics.update_bytes_indexed_by(
                row.as_ref().map_or(0, |r| r.byte_len().try_into().unwrap()) * diff.into_inner(),
            );
        });
    // Encode each value (Ok row or error) as a tagged `Row` so it can be stored
    // in a columnar `(key, Row)` arrangement.
    let encoded = persist_keyed
        .inner
        .unary::<ColumnBuilder<((UpsertKey, Row), T, Diff)>, _, _, _>(
            Pipeline,
            "Persist feedback encode",
            |_, _| {
                move |input, output| {
                    input.for_each(|time, data| {
                        let mut session = output.session_with_builder(&time);
                        for ((key, value), ts, diff) in data.drain(..) {
                            let row = upsert_value_to_row(&value);
                            session.give(((&key, &row), &ts, &diff));
                        }
                    });
                }
            },
        );
    // Arrange the persisted state by key; the trace is read directly by
    // `drain_sealed_input` to find each key's previous value.
    let persist_arranged = arrange_core::<
        _,
        _,
        ColumnChunker<((UpsertKey, Row), T, Diff)>,
        UpsertFeedbackBatcher<T>,
        ValRowColPagedBuilder<UpsertKey, T, Diff>,
        ValRowSpine<UpsertKey, T, Diff>,
    >(encoded, Pipeline, "Persist feedback");
    let mut persist_trace = persist_arranged.trace.clone();

    use timely::dataflow::operators::Probe;
    // The probe's frontier is used as the "persist upper" below.
    let (persist_probe, _persist_probe_stream) = persist_arranged.stream.probe();

    let mut builder = AsyncOperatorBuilder::new("Upsert V2".to_string(), input.scope());

    let (output_handle, output) = builder
        .new_output::<FueledBuilder<CapacityContainerBuilder<Vec<(UpsertValue, T, Diff)>>>>();
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();
    let (_health_output, health_stream) = builder
        .new_output::<CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>>();

    // Upsert input, exchanged by the same key hash as the persist feedback and
    // connected to the data output, so its capabilities can be used to emit.
    let mut input = builder.new_input_for(
        input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    // Disconnected input over the probed arrangement stream; it exists only to
    // wake the operator when the persist upper may have advanced.
    let mut persist_wakeup = builder.new_disconnected_input(_persist_probe_stream, Pipeline);

    let shutdown_button = builder.build(move |caps| async move {
        let _persist_token = persist_token;

        // The initial output capability is dropped immediately: output is only
        // emitted under capabilities retained from input data (`stash_cap`).
        let [output_cap, snapshot_cap, _health_cap]: [_; 3] = caps.try_into().unwrap();
        drop(output_cap);
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        // True until the persist upper first reaches `resume_upper`.
        let mut hydrating = true;

        // The stash of pending input updates, with its input buffer and chunker.
        let mut batcher: UpsertBatcher<T, FromTime::Order> = Batcher::new(None, 0);
        batcher.set_pager(crate::upsert::upsert_stash_pager::pager());
        let mut chunker: UpsertChunker<T, FromTime::Order> = Default::default();
        let mut push_buffer: Vec<UpsertUpdate<T, FromTime::Order>> = Vec::new();

        // Earliest capability among stashed data; `None` when the stash is empty.
        let mut stash_cap: Option<Capability<T>> = None;
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        let snapshot_start = std::time::Instant::now();
        let mut prev_persist_upper = Antichain::from_elem(Timestamp::minimum());

        let mut rehydration_total: u64 = 0;
        let mut rehydration_updates: u64 = 0;

        loop {
            // Wait for new input, or for the persist arrangement to make progress.
            tokio::select! {
                _ = input.ready() => {}
                _ = persist_wakeup.ready() => {
                    while persist_wakeup.next_sync().is_some() {}
                }
            }

            // Buffer all available input and track the input frontier.
            while let Some(event) = input.next_sync() {
                match event {
                    AsyncEvent::Data(cap, data) => {
                        let mut pushed_any = false;
                        for ((key, value, from_time), ts, diff) in data {
                            assert!(diff.is_positive(), "invalid upsert input");
                            // While the input has not passed `resume_upper`, skip
                            // updates at times not beyond it.
                            if PartialOrder::less_equal(&input_upper, &resume_upper)
                                && !resume_upper.less_equal(&ts)
                            {
                                continue;
                            }
                            let value = value.as_ref().map(upsert_value_to_row);
                            let from_time = from_time.upsert_order();
                            push_buffer.push((key, ts, UpsertDiff { from_time, value }));
                            pushed_any = true;
                        }
                        // Retain the earlier of this capability and the current one.
                        if pushed_any {
                            stash_cap = Some(match stash_cap {
                                Some(prev) if cap.time() < prev.time() => cap,
                                Some(prev) => prev,
                                None => cap,
                            });
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        // Input progress below `resume_upper` is ignored.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }
                        input_upper = upper;
                    }
                }
            }

            flush_to_batcher(&mut push_buffer, &mut chunker, &mut batcher);

            let persist_upper = persist_probe.with_frontier(|f| f.to_owned());

            if persist_upper != prev_persist_upper {
                // Hydration is complete once the persist upper reaches `resume_upper`.
                let last_rehydration_chunk =
                    hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                if last_rehydration_chunk {
                    hydrating = false;
                    upsert_metrics
                        .rehydration_latency
                        .set(snapshot_start.elapsed().as_secs_f64());
                    upsert_metrics.rehydration_total.set(rehydration_total);
                    upsert_metrics.rehydration_updates.set(rehydration_updates);
                    tracing::info!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        "upsert finished rehydration",
                    );
                    snapshot_cap.downgrade(&[]);
                }

                // Has no effect once the snapshot capability was released above.
                let _ = snapshot_cap.try_downgrade(persist_upper.iter());

                // The trace is only read at the current persist upper, so it can
                // be compacted up to it.
                persist_trace.set_logical_compaction(persist_upper.borrow());
                persist_trace.set_physical_compaction(persist_upper.borrow());

                prev_persist_upper = persist_upper.clone();
            }

            // Drain once persist has caught up to the earliest stashed time and
            // the input is complete through the persist upper (input_upper is
            // strictly beyond it), so every update at the persist upper is stashed.
            if let Some(cap) = stash_cap.as_mut()
                && !persist_upper.less_than(cap.time())
                && PartialOrder::less_than(&persist_upper, &input_upper)
            {
                let (sealed, _description) = batcher.seal(input_upper.clone());
                // Lower bound of the data left in the batcher after sealing.
                let remaining_frontier = batcher.frontier().to_owned();

                let mut ineligible = Vec::new();
                let drain_stats = drain_sealed_input(
                    sealed,
                    &mut ineligible,
                    &output_handle,
                    &*cap,
                    &persist_upper,
                    &mut persist_trace,
                    &source_config.worker_id,
                    &source_config.id,
                )
                .await;

                upsert_metrics.multi_get_size.inc_by(drain_stats.eligible);
                upsert_metrics
                    .multi_get_result_count
                    .inc_by(drain_stats.result_count);
                upsert_metrics
                    .multi_put_size
                    .inc_by(drain_stats.output_count);
                upsert_metrics.upsert_inserts.inc_by(drain_stats.inserts);
                upsert_metrics.upsert_updates.inc_by(drain_stats.updates);
                upsert_metrics.upsert_deletes.inc_by(drain_stats.deletes);

                // NOTE(review): these only accumulate drains that happen while
                // `hydrating` is still true. The hydration check above runs first
                // in each iteration, so confirm this matches the intended meaning
                // of the rehydration metrics.
                if hydrating {
                    rehydration_total += drain_stats.inserts;
                    rehydration_updates += drain_stats.eligible;
                }

                // Updates beyond the persist upper go back into the stash.
                let min_ineligible_ts = ineligible.iter().map(|(_, ts, _)| ts).min().cloned();
                flush_to_batcher(&mut ineligible, &mut chunker, &mut batcher);

                // Hold the capability at the earliest time still stashed, or
                // release it when nothing remains.
                let has_remaining = !remaining_frontier.is_empty() || min_ineligible_ts.is_some();
                if has_remaining {
                    let min_ts = match (
                        remaining_frontier.elements().first(),
                        min_ineligible_ts.as_ref(),
                    ) {
                        (Some(a), Some(b)) => std::cmp::min(a, b).clone(),
                        (Some(a), None) => a.clone(),
                        (None, Some(b)) => b.clone(),
                        (None, None) => unreachable!(),
                    };
                    cap.downgrade(&min_ts);
                } else {
                    stash_cap = None;
                }
            }

            // NOTE(review): updates still stashed when the input frontier becomes
            // empty are not emitted; confirm that is acceptable on shutdown.
            if input_upper.is_empty() {
                break;
            }
        }
    });

    (
        // Convert upsert errors back into `DataflowError`s.
        output
            .as_collection()
            .map(|result: UpsertValue| match result {
                Ok(ok) => Ok(ok),
                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
            }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
721
/// Counters from one call to `drain_sealed_input`, used to update upsert metrics.
struct DrainStats {
    /// Updates at exactly the persist upper that were resolved against the trace.
    eligible: u64,
    /// Eligible updates for which a previous value was found in the trace.
    result_count: u64,
    /// Non-delete updates for keys without a previous value.
    inserts: u64,
    /// Non-delete updates that replaced a previous value.
    updates: u64,
    /// Deletes that retracted a previous value.
    deletes: u64,
    /// Total number of updates emitted to the output.
    output_count: u64,
}
737
738async fn drain_sealed_input<T, O>(
747 sealed: Vec<Column<UpsertUpdate<T, O>>>,
748 ineligible: &mut Vec<UpsertUpdate<T, O>>,
749 output_handle: &UpsertOutputHandle<T>,
750 output_cap: &Capability<T>,
751 persist_upper: &Antichain<T>,
752 trace: &mut TraceAgent<ValRowSpine<UpsertKey, T, Diff>>,
753 worker_id: &usize,
754 source_id: &GlobalId,
755) -> DrainStats
756where
757 T: TotalOrder + Lattice + timely::ExchangeData + Timestamp + Clone + Debug + Ord + Sync,
758 T: columnation::Columnation + columnar::Columnar,
759 O: columnar::Columnar,
760{
761 let mut eligible_count: u64 = 0;
782 let mut result_count: u64 = 0;
783 let mut output_count: u64 = 0;
784 let mut inserts: u64 = 0;
785 let mut updates: u64 = 0;
786 let mut deletes: u64 = 0;
787
788 let (mut cursor, storage) = trace.cursor();
789
790 for chunk in &sealed {
791 for (key, ts, diff) in chunk.borrow().into_index_iter() {
792 let ts = <T as columnar::Columnar>::into_owned(ts);
793 if !persist_upper.less_equal(&ts) {
794 continue;
796 }
797 if persist_upper.less_than(&ts) {
798 ineligible.push((
800 *key,
801 ts,
802 <UpsertDiff<O> as columnar::Columnar>::into_owned(diff),
803 ));
804 continue;
805 }
806
807 eligible_count += 1;
812 cursor.seek_key(&storage, key);
813 let old_value = match cursor.get_key(&storage) {
814 Some(found) if found == key => {
815 let mut result = None;
816 while let Some(val) = cursor.get_val(&storage) {
817 let mut count = Diff::ZERO;
818 cursor.map_times(&storage, |_time, d| {
819 count += d.clone();
820 });
821 if count.is_positive() {
822 assert!(
823 count == 1.into(),
824 "unexpected multiple entries for the same key in persist trace"
825 );
826 assert!(
827 result.is_none(),
828 "unexpected multiple values for the same key in persist trace"
829 );
830 result = Some(decode_upsert_value(val));
831 }
832 cursor.step_val(&storage);
833 }
834 result
835 }
836 _ => None,
837 };
838
839 if old_value.is_some() {
840 result_count += 1;
841 }
842
843 match diff.value {
844 Some(row) => {
845 if let Some(old_val) = old_value {
846 let size = upsert_value_byte_len(&old_val);
847 output_handle
848 .give_fueled(output_cap, (old_val, ts.clone(), Diff::MINUS_ONE), size)
849 .await;
850 output_count += 1;
851 updates += 1;
852 } else {
853 inserts += 1;
854 }
855 let new_val = decode_upsert_value(row.iter());
856 let size = upsert_value_byte_len(&new_val);
857 output_handle
858 .give_fueled(output_cap, (new_val, ts, Diff::ONE), size)
859 .await;
860 output_count += 1;
861 }
862 None => {
863 if let Some(old_val) = old_value {
864 let size = upsert_value_byte_len(&old_val);
865 output_handle
866 .give_fueled(output_cap, (old_val, ts, Diff::MINUS_ONE), size)
867 .await;
868 output_count += 1;
869 deletes += 1;
870 }
871 }
872 }
873 }
874 }
875
876 tracing::debug!(
877 worker_id = %worker_id,
878 source_id = %source_id,
879 ineligible = ineligible.len(),
880 eligible = eligible_count,
881 "drained stash",
882 );
883
884 DrainStats {
885 eligible: eligible_count,
886 result_count,
887 inserts,
888 updates,
889 deletes,
890 output_count,
891 }
892}
893
#[cfg(test)]
mod test {
    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, Timestamp as MzTimestamp};
    use mz_storage_operators::persist_source::Subtime;
    use mz_storage_types::sources::SourceEnvelope;
    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input};
    use timely::progress::Timestamp;

    use crate::metrics::StorageMetrics;
    use crate::metrics::upsert::UpsertMetricDefs;
    use crate::source::SourceExportCreationConfig;
    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};

    use super::*;

    // Tests use plain integers as the source's `FromTime`; the upsert order is
    // the integer itself.
    impl UpsertSourceTime for i32 {
        type Order = i32;
        fn upsert_order(&self) -> i32 {
            *self
        }
    }

    type Ts = (MzTimestamp, Subtime);

    /// Test timestamp `ts` with the minimum subtime.
    fn new_ts(ts: u64) -> Ts {
        (MzTimestamp::new(ts), Subtime::minimum())
    }

    /// Upsert key for the single-column `Int64` key `k`.
    fn key(k: i64) -> UpsertKey {
        UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(k)])))
    }

    /// Two-column value row `[k, v]`; column 0 is the key (`key_indices = [0]`).
    fn row(k: i64, v: i64) -> Row {
        Row::pack_slice(&[Datum::Int64(k), Datum::Int64(v)])
    }

    /// Builds a single-worker dataflow around `upsert_inner` with `resume_upper`
    /// at the minimum timestamp, runs `$body` with the upsert input handle
    /// (`$input`), the persist feedback input handle (`$persist`) and the worker,
    /// and evaluates to the captured output, consolidated.
    macro_rules! upsert_test {
        (|$input:ident, $persist:ident, $worker:ident| $body:block) => {{
            let output_handle = timely::execute_directly(move |$worker| {
                let (mut $input, mut $persist, output_handle) = $worker
                    .dataflow::<MzTimestamp, _, _>(|scope| {
                        scope.scoped::<Ts, _, _>("upsert", |scope| {
                            let (input_handle, input) = scope.new_input();
                            let (persist_handle, persist_input) = scope.new_input();
                            let source_id = GlobalId::User(0);

                            let reg = MetricsRegistry::new();
                            let upsert_defs = UpsertMetricDefs::register_with(&reg);
                            let upsert_metrics =
                                UpsertMetrics::new(&upsert_defs, source_id, 0, None);

                            let reg2 = MetricsRegistry::new();
                            let storage_metrics = StorageMetrics::register_with(&reg2);

                            let reg3 = MetricsRegistry::new();
                            let stats_defs =
                                SourceStatisticsMetricDefs::register_with(&reg3);
                            let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                                source_arity: 2,
                                style: UpsertStyle::Default(KeyEnvelope::Flattened),
                                key_indices: vec![0],
                            });
                            let source_statistics = SourceStatistics::new(
                                source_id,
                                0,
                                &stats_defs,
                                source_id,
                                &ShardId::new(),
                                envelope,
                                Antichain::from_elem(Timestamp::minimum()),
                            );
                            let source_config = SourceExportCreationConfig {
                                id: source_id,
                                worker_id: 0,
                                metrics: storage_metrics,
                                source_statistics,
                            };

                            let (output, _, _, button) = upsert_inner(
                                input.as_collection(),
                                vec![0],
                                Antichain::from_elem(Timestamp::minimum()),
                                persist_input.as_collection(),
                                None,
                                upsert_metrics,
                                source_config,
                            );
                            std::mem::forget(button);
                            (input_handle, persist_handle, output.inner.capture())
                        })
                    });

                $body

                output_handle
            });

            let mut actual: Vec<_> = output_handle
                .extract()
                .into_iter()
                .flat_map(|(_cap, container)| container)
                .collect();
            differential_dataflow::consolidation::consolidate_updates(&mut actual);
            actual
        }};
    }

    /// Two updates for the same key at the same timestamp: the one with the
    /// larger source order (value4, from_time 4) wins; deleting an absent key
    /// (key1) produces nothing.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        let actual = upsert_test!(|input, persist, worker| {
            let key0 = key(0);
            let key1 = key(1);
            let value1 = row(0, 0);
            let value3 = row(0, 1);
            let value4 = row(0, 2);

            input.send(((key0, Some(Ok(value1.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();

            persist.send((Ok(value1), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send_batch(&mut vec![
                ((key1, None, 2), new_ts(2), Diff::ONE),
                ((key0, Some(Ok(value3)), 3), new_ts(3), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            input.send_batch(&mut vec![(
                (key0, Some(Ok(value4)), 4),
                new_ts(3),
                Diff::ONE,
            )]);
            input.advance_to(new_ts(4));
            worker.step();

            persist.advance_to(new_ts(3));
            worker.step();
        });

        let value1 = row(0, 0);
        let value4 = row(0, 2);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Keys first written at different timestamps (high key before low key) are
    /// both updated correctly when they later change at the same timestamp.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn out_of_order_keys_across_timestamps() {
        let actual = upsert_test!(|input, persist, worker| {
            let key_high = key(99);
            let key_low = key(1);
            let val_a = row(99, 1);
            let val_b = row(1, 2);

            input.send(((key_high, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((key_low, Some(Ok(val_b.clone())), 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.send((Ok(val_b.clone()), new_ts(1), Diff::ONE));
            persist.advance_to(new_ts(2));
            worker.step();

            let val_a2 = row(99, 10);
            let val_b2 = row(1, 20);
            input.send_batch(&mut vec![
                (
                    (key_high, Some(Ok(val_a2.clone())), 3),
                    new_ts(2),
                    Diff::ONE,
                ),
                ((key_low, Some(Ok(val_b2.clone())), 4), new_ts(2), Diff::ONE),
            ]);
            input.advance_to(new_ts(3));
            worker.step();
            persist.advance_to(new_ts(3));
            worker.step();
        });

        let val_a = row(99, 1);
        let val_b = row(1, 2);
        let val_a2 = row(99, 10);
        let val_b2 = row(1, 20);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val_b.clone()), new_ts(1), Diff::ONE),
            (Ok(val_b), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_b2), new_ts(2), Diff::ONE),
            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
            (Ok(val_a), new_ts(2), Diff::MINUS_ONE),
            (Ok(val_a2), new_ts(2), Diff::ONE),
        ];
        let mut actual_sorted = actual;
        let mut expected_sorted = expected;
        actual_sorted.sort();
        expected_sorted.sort();
        assert_eq!(actual_sorted, expected_sorted);
    }

    /// A value present only in persist (written before this operator's input)
    /// is retracted when the key is updated.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn rehydration_then_update() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(42);
            let old_val = row(42, 100);
            let new_val = row(42, 200);

            persist.send((Ok(old_val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(new_val)), 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let old_val = row(42, 100);
        let new_val = row(42, 200);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(old_val), new_ts(1), Diff::MINUS_ONE),
            (Ok(new_val), new_ts(1), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// A delete of an existing key retracts its current value.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn delete_existing_key() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(7);
            let val = row(7, 77);

            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let val = row(7, 77);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val.clone()), new_ts(0), Diff::ONE),
            (Ok(val), new_ts(1), Diff::MINUS_ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Persisted history that inserts, retracts and re-inserts a key within one
    /// timestamp leaves only the last value live in the trace.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn multi_batch_rehydration() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(5);
            let old_val = row(5, 10);
            let new_val = row(5, 20);
            let updated_val = row(5, 30);

            persist.send((Ok(old_val.clone()), new_ts(0), Diff::ONE));
            persist.send((Ok(old_val), new_ts(0), Diff::MINUS_ONE));
            persist.send((Ok(new_val), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(updated_val)), 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let new_val = row(5, 20);
        let updated_val = row(5, 30);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(new_val), new_ts(1), Diff::MINUS_ONE),
            (Ok(updated_val), new_ts(1), Diff::ONE),
        ];
        assert_eq!(actual, expected);
    }

    /// Deleting a key that has no value produces no output.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn delete_nonexistent_key() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(99);

            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 1), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        assert!(actual.is_empty(), "expected empty output, got: {actual:?}");
    }

    /// After a delete has been fed back through persist, writing the key again
    /// is a plain insert.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn reinsert_after_delete() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(3);
            let val_a = row(3, 10);
            let val_b = row(3, 20);

            input.send(((k, Some(Ok(val_a.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val_a.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, None, 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.send((Ok(val_a), new_ts(1), Diff::MINUS_ONE));
            persist.advance_to(new_ts(2));
            worker.step();

            input.send(((k, Some(Ok(val_b.clone())), 3), new_ts(2), Diff::ONE));
            input.advance_to(new_ts(3));
            worker.step();
            persist.advance_to(new_ts(3));
            worker.step();
        });

        let val_a = row(3, 10);
        let val_b = row(3, 20);
        let mut expected: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(val_a.clone()), new_ts(0), Diff::ONE),
            (Ok(val_a), new_ts(1), Diff::MINUS_ONE),
            (Ok(val_b), new_ts(2), Diff::ONE),
        ];
        expected.sort();
        let mut actual = actual;
        actual.sort();
        assert_eq!(actual, expected);
    }

    /// Re-writing a key with its current value consolidates to no change.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn idempotent_update() {
        let actual = upsert_test!(|input, persist, worker| {
            let k = key(11);
            let val = row(11, 50);

            input.send(((k, Some(Ok(val.clone())), 1), new_ts(0), Diff::ONE));
            input.advance_to(new_ts(1));
            worker.step();
            persist.send((Ok(val.clone()), new_ts(0), Diff::ONE));
            persist.advance_to(new_ts(1));
            worker.step();

            input.send(((k, Some(Ok(val.clone())), 2), new_ts(1), Diff::ONE));
            input.advance_to(new_ts(2));
            worker.step();
            persist.advance_to(new_ts(2));
            worker.step();
        });

        let val = row(11, 50);
        let expected: Vec<(Result<Row, DataflowError>, _, _)> =
            vec![(Ok(val), new_ts(0), Diff::ONE)];
        assert_eq!(actual, expected);
    }

    /// Input arriving at times below an already-advanced persist upper (10) is
    /// dropped rather than emitted, and the output frontier still advances to the
    /// input upper (11).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn lagging_replacement_below_upper_strands_data() {
        let (frontier, emitted) = run_below_upper_scenario_v2();

        assert!(
            emitted.is_empty(),
            "below-upper data should be dropped, not emitted; got {emitted:?}"
        );
        assert_eq!(
            frontier,
            vec![new_ts(11)],
            "v2 output frontier should advance to the input upper, not pin below \
             persist_upper"
        );
        assert!(
            frontier[0] >= new_ts(10),
            "v2 output frontier {frontier:?} should reach at least persist_upper (10)"
        );
    }

    /// Advances persist to 10, then sends input at times 5 and 7 and advances the
    /// input to 11. Returns the output frontier observed through a probe and the
    /// consolidated captured output.
    fn run_below_upper_scenario_v2() -> (Vec<Ts>, Vec<(Result<Row, DataflowError>, Ts, Diff)>) {
        use timely::dataflow::operators::Probe;

        let (frontier, capture) = timely::execute_directly(move |worker| {
            let (mut input, mut persist, probe, capture) =
                worker.dataflow::<MzTimestamp, _, _>(|scope| {
                    scope.scoped::<Ts, _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let source_id = GlobalId::User(0);

                        let reg = MetricsRegistry::new();
                        let upsert_defs = UpsertMetricDefs::register_with(&reg);
                        let upsert_metrics = UpsertMetrics::new(&upsert_defs, source_id, 0, None);

                        let reg2 = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&reg2);

                        let reg3 = MetricsRegistry::new();
                        let stats_defs = SourceStatisticsMetricDefs::register_with(&reg3);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &stats_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );
                        let source_config = SourceExportCreationConfig {
                            id: source_id,
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        let (output, _, _, button) = upsert_inner(
                            input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                        );
                        std::mem::forget(button);
                        let (probe, stream) = output.inner.probe();
                        (input_handle, persist_handle, probe, stream.capture())
                    })
                });

            persist.advance_to(new_ts(10));
            for _ in 0..20 {
                worker.step();
            }

            input.send(((key(0), Some(Ok(row(0, 1))), 1), new_ts(5), Diff::ONE));
            input.send(((key(1), Some(Ok(row(1, 2))), 2), new_ts(7), Diff::ONE));
            input.advance_to(new_ts(11));
            for _ in 0..20 {
                worker.step();
            }

            (probe.with_frontier(|f| f.to_vec()), capture)
        });

        let mut emitted: Vec<_> = capture
            .extract()
            .into_iter()
            .flat_map(|(_cap, c)| c)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut emitted);
        (frontier, emitted)
    }
}