1use std::cell::RefCell;
11use std::cmp::Reverse;
12use std::convert::AsRef;
13use std::fmt::Debug;
14use std::hash::{Hash, Hasher};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use differential_dataflow::hashable::Hashable;
19use differential_dataflow::{AsCollection, VecCollection};
20use futures::StreamExt;
21use futures::future::FutureExt;
22use indexmap::map::Entry;
23use itertools::Itertools;
24use mz_ore::error::ErrorExt;
25use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
26use mz_rocksdb::ValueIterator;
27use mz_sql_server_util::cdc::Lsn;
28use mz_storage_operators::metrics::BackpressureMetrics;
29use mz_storage_types::configuration::StorageConfiguration;
30use mz_storage_types::dyncfgs;
31use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
32use mz_storage_types::sources::MzOffset;
33use mz_storage_types::sources::envelope::UpsertEnvelope;
34use mz_storage_types::sources::kafka::{KafkaTimestamp, RangeBound};
35use mz_storage_types::sources::mysql::GtidPartition;
36use mz_timely_util::builder_async::{
37 AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
38 PressOnDropButton,
39};
40use serde::{Deserialize, Serialize};
41use sha2::{Digest, Sha256};
42use timely::dataflow::channels::pact::Exchange;
43use timely::dataflow::operators::{Capability, InputCapability, Operator};
44use timely::dataflow::{Scope, StreamVec};
45use timely::order::{PartialOrder, TotalOrder};
46use timely::progress::timestamp::Refines;
47use timely::progress::{Antichain, Timestamp};
48
49use crate::healthcheck::HealthStatusUpdate;
50use crate::metrics::upsert::UpsertMetrics;
51use crate::storage_state::StorageInstanceContext;
52use crate::{upsert_continual_feedback, upsert_continual_feedback_v2};
53use types::{
54 BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
55 upsert_bincode_opts,
56};
57
58#[cfg(test)]
59pub mod memory;
60pub(crate) mod rocksdb;
61pub(crate) mod types;
63
/// The value half of an upsert command: either a decoded row, or a boxed
/// [`UpsertError`] describing why a row could not be produced.
pub type UpsertValue = Result<Row, Box<UpsertError>>;

/// A fixed-size, 32-byte key that identifies an upsert record.
///
/// The bytes are the digest produced by [`UpsertKey::from_iter`], which feeds
/// the key datums (or the error that stands in for them) through [`KeyHash`].
/// `#[repr(transparent)]` plus the `bytemuck` derives allow slices of keys to
/// be reinterpreted as raw bytes (the `columnar_upsert_key` module relies on
/// this via `bytemuck::cast_slice`).
#[derive(
    Copy,
    Clone,
    Hash,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize,
    bytemuck::AnyBitPattern,
    bytemuck::NoUninit
)]
#[repr(transparent)]
pub struct UpsertKey([u8; 32]);

// `UpsertKey` is plain `Copy` data, so columnation can store it in a `CopyRegion`.
impl columnation::Columnation for UpsertKey {
    type InnerRegion = columnation::CopyRegion<UpsertKey>;
}
111 #[inline(always)]
112 fn push(&mut self, item: D) {
113 self.0.push(item)
114 }
115 }
116 impl<T: columnar::Clear> columnar::Clear for UpsertKeys<T> {
117 #[inline(always)]
118 fn clear(&mut self) {
119 self.0.clear()
120 }
121 }
122 impl<T: columnar::Len> columnar::Len for UpsertKeys<T> {
123 #[inline(always)]
124 fn len(&self) -> usize {
125 self.0.len()
126 }
127 }
128 impl<'a> columnar::Index for UpsertKeys<&'a [UpsertKey]> {
129 type Ref = &'a UpsertKey;
130
131 #[inline(always)]
132 fn get(&self, index: usize) -> Self::Ref {
133 &self.0[index]
134 }
135 }
136
137 impl Columnar for UpsertKey {
138 #[inline(always)]
139 fn into_owned<'a>(other: columnar::Ref<'a, Self>) -> Self {
140 *other
141 }
142 type Container = UpsertKeys<Vec<UpsertKey>>;
143 #[inline(always)]
144 fn reborrow<'b, 'a: 'b>(thing: columnar::Ref<'a, Self>) -> columnar::Ref<'b, Self>
145 where
146 Self: 'a,
147 {
148 thing
149 }
150 }
151
152 impl columnar::Borrow for UpsertKeys<Vec<UpsertKey>> {
153 type Ref<'a> = &'a UpsertKey;
154 type Borrowed<'a>
155 = UpsertKeys<&'a [UpsertKey]>
156 where
157 Self: 'a;
158 #[inline(always)]
159 fn borrow<'a>(&'a self) -> Self::Borrowed<'a> {
160 UpsertKeys(self.0.as_slice())
161 }
162 #[inline(always)]
163 fn reborrow<'b, 'a: 'b>(item: Self::Borrowed<'a>) -> Self::Borrowed<'b>
164 where
165 Self: 'a,
166 {
167 UpsertKeys(item.0)
168 }
169 #[inline(always)]
170 fn reborrow_ref<'b, 'a: 'b>(item: Self::Ref<'a>) -> Self::Ref<'b>
171 where
172 Self: 'a,
173 {
174 item
175 }
176 }
177
178 impl columnar::Container for UpsertKeys<Vec<UpsertKey>> {
179 #[inline(always)]
180 fn extend_from_self(&mut self, other: Self::Borrowed<'_>, range: Range<usize>) {
181 self.0.extend_from_self(other.0, range)
182 }
183 #[inline(always)]
184 fn reserve_for<'a, I>(&mut self, selves: I)
185 where
186 Self: 'a,
187 I: Iterator<Item = Self::Borrowed<'a>> + Clone,
188 {
189 self.0.reserve_for(selves.map(|s| s.0));
190 }
191 }
192
193 impl<'a> columnar::AsBytes<'a> for UpsertKeys<&'a [UpsertKey]> {
194 const SLICE_COUNT: usize = 1;
195 #[inline(always)]
196 fn get_byte_slice(&self, index: usize) -> (u64, &'a [u8]) {
197 debug_assert!(index < Self::SLICE_COUNT);
198 (
199 u64::cast_from(align_of::<UpsertKey>()),
200 bytemuck::cast_slice(self.0),
201 )
202 }
203 #[inline(always)]
204 fn as_bytes(&self) -> impl Iterator<Item = (u64, &'a [u8])> {
205 std::iter::once((
206 u64::cast_from(align_of::<UpsertKey>()),
207 bytemuck::cast_slice(self.0),
208 ))
209 }
210 }
211 impl<'a> columnar::FromBytes<'a> for UpsertKeys<&'a [UpsertKey]> {
212 const SLICE_COUNT: usize = 1;
213 #[inline(always)]
214 fn from_bytes(bytes: &mut impl Iterator<Item = &'a [u8]>) -> Self {
215 UpsertKeys(bytemuck::cast_slice(
216 bytes.next().expect("Iterator exhausted prematurely"),
217 ))
218 }
219 }
220}
221
/// Source timestamp types that can be projected onto an ordering value
/// ([`UpsertSourceTime::Order`]) for the upsert stash.
///
/// The `where` clause requires borrowed columnar references to the order type
/// to be `Ord`. Presumably this lets orders be compared while still stored
/// columnarly, without first materializing owned values — confirm at use sites.
pub trait UpsertSourceTime
where
    for<'a> columnar::Ref<'a, Self::Order>: Ord,
{
    /// The projected ordering value; must be columnar, cloneable, defaultable,
    /// totally ordered and thread-safe.
    type Order: columnar::Columnar + Clone + Default + Ord + Send + Sync + 'static;
    /// Projects `self` onto its [`UpsertSourceTime::Order`].
    fn upsert_order(&self) -> Self::Order;
}
247
248impl UpsertSourceTime for KafkaTimestamp {
249 type Order = (i64, u64);
255 fn upsert_order(&self) -> (i64, u64) {
256 let partition = match self.interval().lower {
257 RangeBound::NegInfinity => i64::MIN,
258 RangeBound::Elem(p, _) => i64::from(p),
259 RangeBound::PosInfinity => i64::MAX,
260 };
261 (partition, self.timestamp().offset)
262 }
263}
264
265impl UpsertSourceTime for MzOffset {
271 type Order = u64;
272 fn upsert_order(&self) -> u64 {
273 self.offset
274 }
275}
276
/// Implements [`UpsertSourceTime`] with a unit `Order` for each listed type.
///
/// These are source timestamp types for which the upsert stash is not rendered.
/// Calling `upsert_order` on any of them panics via `unreachable!`, naming the
/// offending type.
macro_rules! upsert_source_time_unit {
    ($($ty:ty),+ $(,)?) => {$(
        impl UpsertSourceTime for $ty {
            type Order = ();
            fn upsert_order(&self) {
                unreachable!(
                    "upsert source stash is not rendered for this source, but \
                    {} reached the projection",
                    std::any::type_name::<Self>(),
                )
            }
        }
    )+};
}
// Per the panic message above, the stash is never rendered for MySQL GTID
// partitions or SQL Server LSNs.
upsert_source_time_unit!(GtidPartition, Lsn);
298
299pub mod upsert_stash_pager {
311 use std::sync::{LazyLock, RwLock};
312
313 use mz_timely_util::column_pager::{ColumnPager, shared_pager};
314
315 static PAGER: LazyLock<RwLock<ColumnPager>> =
318 LazyLock::new(|| RwLock::new(ColumnPager::disabled()));
319
320 pub fn set_enabled(enabled: bool) {
324 *PAGER.write().expect("upsert stash pager poisoned") = shared_pager(enabled);
325 }
326
327 pub fn pager() -> ColumnPager {
329 PAGER.read().expect("upsert stash pager poisoned").clone()
330 }
331}
332
333impl Debug for UpsertKey {
334 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
335 write!(f, "0x")?;
336 for byte in self.0 {
337 write!(f, "{:02x}", byte)?;
338 }
339 Ok(())
340 }
341}
342
343impl AsRef<[u8]> for UpsertKey {
344 #[inline(always)]
345 fn as_ref(&self) -> &[u8] {
349 &self.0
350 }
351}
352
353impl From<&[u8]> for UpsertKey {
354 fn from(bytes: &[u8]) -> Self {
355 UpsertKey(bytes.try_into().expect("invalid key length"))
356 }
357}
358
/// Digest used to derive [`UpsertKey`] bytes; its 32-byte output fills the key.
type KeyHash = Sha256;
364
impl UpsertKey {
    /// Derives the key for an upsert command from its decoded key row, or from
    /// the error that replaced it.
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    /// Derives the key from a full value row by projecting `key_indices` out of it.
    ///
    /// Errors are forwarded unchanged to [`UpsertKey::from_iter`].
    ///
    /// # Panics
    ///
    /// Panics if any index in `key_indices` is out of range for the row.
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        // Per-thread scratch buffer, reused across calls to avoid reallocating.
        thread_local! {
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    /// Hashes a key (or the error standing in for it) with [`KeyHash`] into an
    /// [`UpsertKey`].
    ///
    /// `UpsertError::Value` errors hash their `for_key` datums exactly like an
    /// `Ok` key with the same datums, so they collide with (i.e. target) that
    /// key. `KeyDecode` errors hash their raw bytes, and `NullKey` hashes
    /// `Datum::Null`; both are wrapped in `Err` so they do not collide with `Ok` keys.
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        // Per-thread scratch buffer for collecting the key datums.
        thread_local! {
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // `DatumVec::borrow` hands out a temporary buffer; presumably it is
            // cleared when this borrow is dropped (see the `DatumVec` docs).
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            // Feed the structured `Hash` encoding of `key` into the digest.
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}
418
419struct DigestHasher<H: Digest>(H);
420
421impl<H: Digest> Hasher for DigestHasher<H> {
422 fn write(&mut self, bytes: &[u8]) {
423 self.0.update(bytes);
424 }
425
426 fn finish(&self) -> u64 {
427 panic!("digest wrapper used to produce a hash");
428 }
429}
430
431use std::convert::Infallible;
432use timely::container::CapacityContainerBuilder;
433use timely::dataflow::channels::pact::Pipeline;
434
435use self::types::ValueMetadata;
436
437pub fn rehydration_finished<'scope, T: Timestamp>(
441 scope: Scope<'scope, T>,
442 source_config: &crate::source::RawSourceCreationConfig,
443 token: impl std::any::Any + 'static,
445 resume_upper: Antichain<T>,
446 input: StreamVec<'scope, T, Infallible>,
447) {
448 let worker_id = source_config.worker_id;
449 let id = source_config.id;
450 let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id}"), scope);
451 let mut input = builder.new_disconnected_input(input, Pipeline);
452
453 builder.build(move |_capabilities| async move {
454 let mut input_upper = Antichain::from_elem(Timestamp::minimum());
455 while !PartialOrder::less_equal(&resume_upper, &input_upper) {
457 let Some(event) = input.next().await else {
458 break;
459 };
460 if let AsyncEvent::Progress(upper) = event {
461 input_upper = upper;
462 }
463 }
464 tracing::info!(
465 %worker_id,
466 source_id = %id,
467 "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
468 );
469 drop(token);
470 });
471}
472
473pub(crate) fn upsert<'scope, T, FromTime>(
479 input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
480 upsert_envelope: UpsertEnvelope,
481 resume_upper: Antichain<T>,
482 previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
483 previous_token: Option<Vec<PressOnDropButton>>,
484 source_config: crate::source::SourceExportCreationConfig,
485 instance_context: &StorageInstanceContext,
486 storage_configuration: &StorageConfiguration,
487 dataflow_paramters: &crate::internal_control::DataflowParameters,
488 backpressure_metrics: Option<BackpressureMetrics>,
489) -> (
490 VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
491 StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
492 StreamVec<'scope, T, Infallible>,
493 PressOnDropButton,
494)
495where
496 T: Timestamp + TotalOrder + Sync,
497 T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
498 FromTime: Timestamp + Clone + Sync,
499{
500 let upsert_metrics = source_config.metrics.get_upsert_metrics(
501 source_config.id,
502 source_config.worker_id,
503 backpressure_metrics,
504 );
505
506 let rocksdb_cleanup_tries =
507 dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());
508
509 let prevent_snapshot_buffering =
512 dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
513 let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
515 .get(storage_configuration.config_set());
516
517 let rocksdb_use_native_merge_operator =
520 dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());
521
522 let upsert_config = UpsertConfig {
523 shrink_upsert_unused_buffers_by_ratio: storage_configuration
524 .parameters
525 .shrink_upsert_unused_buffers_by_ratio,
526 };
527
528 let thin_input = upsert_thinning(input);
529
530 let tuning = dataflow_paramters.upsert_rocksdb_tuning_config.clone();
531
532 let rocksdb_dir = instance_context
537 .scratch_directory
538 .clone()
539 .unwrap_or_else(|| PathBuf::from("/tmp"))
540 .join("storage")
541 .join("upsert")
542 .join(source_config.id.to_string())
543 .join(source_config.worker_id.to_string());
544
545 tracing::info!(
546 worker_id = %source_config.worker_id,
547 source_id = %source_config.id,
548 ?rocksdb_dir,
549 ?tuning,
550 ?rocksdb_use_native_merge_operator,
551 "rendering upsert source"
552 );
553
554 let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
555 let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);
556
557 let env = instance_context.rocksdb_env.clone();
558
559 let rocksdb_init_fn = move || async move {
561 let merge_operator = if rocksdb_use_native_merge_operator {
562 Some((
563 "upsert_state_snapshot_merge_v1".to_string(),
564 |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<T, FromTime>>| {
565 consolidating_merge_function::<T, FromTime>(a.into(), b)
566 },
567 ))
568 } else {
569 None
570 };
571 rocksdb::RocksDB::new(
572 mz_rocksdb::RocksDBInstance::new(
573 &rocksdb_dir,
574 mz_rocksdb::InstanceOptions::new(
575 env,
576 rocksdb_cleanup_tries,
577 merge_operator,
578 upsert_bincode_opts(),
581 ),
582 tuning,
583 rocksdb_shared_metrics,
584 rocksdb_instance_metrics,
585 )
586 .unwrap(),
587 )
588 };
589
590 upsert_operator(
591 thin_input,
592 upsert_envelope.key_indices,
593 resume_upper,
594 previous,
595 previous_token,
596 upsert_metrics,
597 source_config,
598 rocksdb_init_fn,
599 upsert_config,
600 storage_configuration,
601 prevent_snapshot_buffering,
602 snapshot_buffering_max,
603 )
604}
605
606pub(crate) fn upsert_v2<'scope, T, FromTime>(
613 input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
614 upsert_envelope: UpsertEnvelope,
615 resume_upper: Antichain<T>,
616 previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
617 previous_token: Option<Vec<PressOnDropButton>>,
618 source_config: crate::source::SourceExportCreationConfig,
619 backpressure_metrics: Option<BackpressureMetrics>,
620) -> (
621 VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
622 StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
623 StreamVec<'scope, T, Infallible>,
624 PressOnDropButton,
625)
626where
627 T: Timestamp + TotalOrder + Sync,
628 T: Refines<mz_repr::Timestamp> + differential_dataflow::lattice::Lattice,
629 T: columnation::Columnation,
630 T: columnar::Columnar + Default,
631 for<'a> columnar::Ref<'a, T>: Copy + Ord,
632 FromTime: Timestamp + Clone + Sync,
633 FromTime: UpsertSourceTime,
634{
635 let upsert_metrics = source_config.metrics.get_upsert_metrics(
636 source_config.id,
637 source_config.worker_id,
638 backpressure_metrics,
639 );
640
641 let thin_input = upsert_thinning(input);
642
643 tracing::info!(
644 worker_id = %source_config.worker_id,
645 source_id = %source_config.id,
646 "rendering upsert source (btreemap backend)"
647 );
648
649 upsert_continual_feedback_v2::upsert_inner(
650 thin_input,
651 upsert_envelope.key_indices,
652 resume_upper,
653 previous,
654 previous_token,
655 upsert_metrics,
656 source_config,
657 )
658}
659
660fn upsert_operator<'scope, T, FromTime, F, Fut, US>(
663 input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
664 key_indices: Vec<usize>,
665 resume_upper: Antichain<T>,
666 persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
667 persist_token: Option<Vec<PressOnDropButton>>,
668 upsert_metrics: UpsertMetrics,
669 source_config: crate::source::SourceExportCreationConfig,
670 state: F,
671 upsert_config: UpsertConfig,
672 _storage_configuration: &StorageConfiguration,
673 prevent_snapshot_buffering: bool,
674 snapshot_buffering_max: Option<usize>,
675) -> (
676 VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
677 StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
678 StreamVec<'scope, T, Infallible>,
679 PressOnDropButton,
680)
681where
682 T: Timestamp + TotalOrder + Sync,
683 T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
684 F: FnOnce() -> Fut + 'static,
685 Fut: std::future::Future<Output = US>,
686 US: UpsertStateBackend<T, FromTime>,
687 FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
688{
689 let use_continual_feedback_upsert = true;
693
694 tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");
695
696 if use_continual_feedback_upsert {
697 upsert_continual_feedback::upsert_inner(
698 input,
699 key_indices,
700 resume_upper,
701 persist_input,
702 persist_token,
703 upsert_metrics,
704 source_config,
705 state,
706 upsert_config,
707 prevent_snapshot_buffering,
708 snapshot_buffering_max,
709 )
710 } else {
711 upsert_classic(
712 input,
713 key_indices,
714 resume_upper,
715 persist_input,
716 persist_token,
717 upsert_metrics,
718 source_config,
719 state,
720 upsert_config,
721 prevent_snapshot_buffering,
722 snapshot_buffering_max,
723 )
724 }
725}
726
/// Thins a stream of upsert commands before it reaches an upsert operator.
///
/// Within each activation, for every `(key, time)` pair only the command with
/// the greatest `FromTime` survives. All surviving updates are emitted at the
/// earliest input capability seen during that activation.
///
/// # Panics
///
/// Panics with "invalid upsert input" if any update has a non-positive diff.
fn upsert_thinning<'scope, T, K, V, FromTime>(
    input: VecCollection<'scope, T, (K, V, FromTime), Diff>,
) -> VecCollection<'scope, T, (K, V, FromTime), Diff>
where
    T: Timestamp + TotalOrder,
    K: timely::ExchangeData + Clone + Eq + Ord,
    V: timely::ExchangeData + Clone,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // Earliest capability received during the current activation.
            let mut capability: Option<InputCapability<T>> = None;
            // Buffer of received updates; drained (not reallocated) on each activation.
            let mut updates = Vec::new();
            move |input, output| {
                input.for_each(|cap, data| {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    // Keep whichever capability has the smaller (or equal) time.
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                });
                if let Some(capability) = capability.take() {
                    // Sort by key, then time, then `FromTime` descending, so that
                    // the newest command for each `(key, time)` comes first.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    // `dedup_by` keeps the first element of each run with an equal
                    // `(key, time)`, i.e. the one with the greatest `FromTime`.
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}
785
786fn stage_input<T, FromTime>(
789 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
790 data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
791 input_upper: &Antichain<T>,
792 resume_upper: &Antichain<T>,
793 storage_shrink_upsert_unused_buffers_by_ratio: usize,
794) where
795 T: PartialOrder,
796 FromTime: Ord,
797{
798 if PartialOrder::less_equal(input_upper, resume_upper) {
799 data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
800 }
801
802 stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
803 assert!(diff.is_positive(), "invalid upsert input");
804 (time, key, Reverse(order), value)
805 }));
806
807 if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
808 let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
809 if reduced_capacity > stash.len() {
810 stash.shrink_to(reduced_capacity);
811 }
812 }
813}
814
/// Selects which prefix of the sorted stash `drain_staged_input` processes.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain every staged update whose time is not beyond the frontier, i.e.
    /// times `t` for which `upper.less_equal(t)` is false.
    ToUpper(&'a Antichain<T>),
    /// Drain every staged update whose time is `<=` the given time.
    AtTime(T),
}
822
/// Applies the staged commands selected by `drain_style` to `state`, and pushes
/// the resulting output diffs into `output_updates`.
///
/// Steps:
/// 1. Sort `stash` and find the prefix selected by `drain_style`.
/// 2. Fetch the current state of every distinct key in that prefix with
///    `multi_get`, using `commands_state` and `multi_get_scratch` as scratch.
/// 3. For each `(time, key)`, keep only the first staged command. The stash is
///    sorted by `Reverse(from_time)`, so that is the one with the greatest
///    `FromTime`. Then:
///    - skip it if its `FromTime` is `<=` the value's `provisional_order(&ts)`;
///    - otherwise retract the previously finalized value (if any) and either
///      insert the new value or store a tombstone for a delete.
/// 4. Write every touched key back to `state` with `multi_put`.
///
/// Staged updates outside the drained prefix remain in `stash`. Failures from
/// `state` are reported through `error_emitter`.
///
/// # Panics
///
/// Panics if a drained key is missing from `commands_state` (should be
/// impossible, since step 2 inserts every drained key).
async fn drain_staged_input<S, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
    E: UpsertErrorEmitter<T>,
{
    stash.sort_unstable();

    // The stash is sorted by time first, so the drainable updates form a prefix.
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // One entry per distinct key in the drained prefix, in first-seen order.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    // Look up the existing state for those keys; keys and value slots are in the
    // same (IndexMap insertion) order.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // Keep only the first (newest `FromTime`) command per `(time, key)`.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id, Some(&key));
        }

        // Skip commands that are not newer than what the state already reflects
        // for this time. With no existing value this is `None`, which compares
        // less than any `Some`, so the command is never skipped.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            continue;
        }

        match value {
            Some(value) => {
                // Upsert: retract the previously finalized value, then insert the new one.
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                // Delete: retract the previously finalized value and leave a tombstone.
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    // Persist every touched key, passing along the metadata of the value that was
    // previously stored so the state can account for size changes.
    match state
        .multi_put(
            true, // NOTE(review): meaning of this flag is defined by `multi_put` — confirm there.
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}
971
/// Tuning parameters for the upsert operator.
pub(crate) struct UpsertConfig {
    /// Buffer-shrinking ratio.
    ///
    /// `stage_input` shrinks its stash to `capacity / ratio` whenever that still
    /// exceeds the stash's length, and `0` disables this shrinking. The value is
    /// also passed to `UpsertState::new`.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}
977
/// The original (non-continual-feedback) upsert operator.
///
/// It runs in two phases:
/// 1. **Rehydration.** Reads `previous` (this operator's earlier output) until
///    its frontier reaches `resume_upper`, loading every update before
///    `resume_upper` into `state`.
/// 2. **Processing.** Stages commands from `input` and drains them into state
///    as the input frontier advances, emitting retractions and insertions.
///
/// Returns:
/// - the output collection, with upsert errors re-wrapped as `DataflowError`s;
/// - the health-status stream;
/// - a data-less snapshot stream, whose capability is released once rehydration
///   completes;
/// - a shutdown button.
fn upsert_classic<'scope, T, FromTime, F, Fut, US>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<T, FromTime>,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // Re-key the previous output. Only `Ok` rows and upsert envelope errors
    // participate in rehydration; every other error is dropped.
    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                EnvelopeError::Flat(_) => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // The snapshot output never carries data; only its frontier is meaningful.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    // Both inputs are exchanged by key hash, so every update for a given key is
    // routed to the same worker.
    let mut input = builder.new_input_for(
        input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        // Capabilities arrive in output-creation order: output, snapshot, health.
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

        let mut state = UpsertState::<_, _, FromTime>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        // Emitting through this pair reports a halting health update and never
        // returns (see `process_upsert_state_error`).
        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
        // Phase 1: rehydrate state from `previous` until its frontier reaches
        // `resume_upper`.
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        // Only updates strictly before `resume_upper` are rehydrated.
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            // The second argument signals whether this is the final chunk
            // (the snapshot frontier has reached `resume_upper`).
            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    // Follow the snapshot frontier with our capabilities, but never
                    // to or past `resume_upper`.
                    if let Some(ts) = snapshot_upper.clone().into_option() {
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<T>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        // Rehydration is done: release its buffers, tokens and the snapshot capability.
        drop(events);
        drop(previous_token);
        drop(snapshot_cap);

        // Consume whatever remains of `previous` until it is exhausted; its
        // contents are no longer used. Presumably the upstream shuts down once
        // `previous_token` is dropped — confirm.
        while let Some(_event) = previous.next().await {}

        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

        // Scratch structures reused by every call to `drain_staged_input`.
        let mut commands_state: indexmap::IndexMap<_, types::UpsertValueAndSize<T, FromTime>> =
            indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        // Phase 2: process input commands.
        while let Some(event) = input.next().await {
            // Handle the awaited event plus every event that is immediately ready.
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        // When snapshot buffering is disabled, data arriving exactly at
                        // the output capability's time is drained without waiting for
                        // the frontier to pass it.
                        let event_time = cap.time();
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
                        // Ignore progress before `resume_upper`, where our output
                        // capability already sits after rehydration.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

                        // A full drain to `upper` subsumes any pending partial drain.
                        partial_drain_time = None;
                        drain_staged_input::<_, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                            &source_config,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                // Optionally bound how many events are handled before yielding.
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
1259
/// Sink for errors raised while reading or writing upsert state.
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<T> {
    /// Reports `e`, annotated with `context`.
    async fn emit(&mut self, context: String, e: anyhow::Error);
}
1264
/// Emits errors on a health-status output at the paired capability.
///
/// Delegates to [`process_upsert_state_error`], whose future never completes,
/// so `emit` never returns.
#[async_trait::async_trait(?Send)]
impl<T: Timestamp> UpsertErrorEmitter<T>
    for (
        &mut AsyncOutputHandle<
            T,
            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        >,
        &Capability<T>,
    )
{
    async fn emit(&mut self, context: String, e: anyhow::Error) {
        process_upsert_state_error::<T>(context, e, self.0, self.1).await
    }
}
1279
1280async fn process_upsert_state_error<T: Timestamp>(
1282 context: String,
1283 e: anyhow::Error,
1284 health_output: &AsyncOutputHandle<
1285 T,
1286 CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1287 >,
1288 health_cap: &Capability<T>,
1289) {
1290 let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
1291 health_output.give(health_cap, (None, update));
1292 std::future::pending::<()>().await;
1293 unreachable!("pending future never returns");
1294}