use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::AsRef;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, Collection};
use futures::StreamExt;
use futures::future::FutureExt;
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::error::ErrorExt;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
use mz_rocksdb::ValueIterator;
use mz_storage_operators::metrics::BackpressureMetrics;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_storage_types::sources::envelope::UpsertEnvelope;
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::{Capability, InputCapability, Operator};
use timely::dataflow::{Scope, ScopeParent, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::storage_state::StorageInstanceContext;
use crate::upsert_continual_feedback;
use autospill::AutoSpillBackend;
use memory::InMemoryHashMap;
use types::{
    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, Value, consolidating_merge_function,
    upsert_bincode_opts,
};

mod autospill;
mod memory;
mod rocksdb;
pub(crate) mod types;

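/// A decoded upsert value: either a `Row`, or the error produced while decoding it.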
pub type UpsertValue = Result<Row, UpsertError>;

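/// The 32-byte SHA-256 hash of an upsert key, used to identify the key in the upsert state.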
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);

impl AsRef<[u8]> for UpsertKey {
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl From<&[u8]> for UpsertKey {
    fn from(bytes: &[u8]) -> Self {
        UpsertKey(bytes.try_into().expect("invalid key length"))
    }
}

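/// The hash function used to map upsert keys. It is important that this is a cryptographic
/// hash, so that there is no meaningful risk of collisions between distinct keys.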
type KeyHash = Sha256;

impl UpsertKey {
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
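            /// A thread-local datum cache used to calculate hashes.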
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
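            /// A thread-local datum cache used to calculate hashes.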
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            let mut key_datums = key_datums.borrow();
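            // Build the slice of key datums to hash. For value-decoding errors the key datums
            // can still be recovered; key-decoding errors hash the raw key bytes instead, and
            // null keys hash as a null datum.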
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}

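/// A `Hasher` that forwards all written bytes to a cryptographic digest. Only `write` is
/// supported: `finish` panics, because the full digest is extracted instead of a `u64`.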
struct DigestHasher<H: Digest>(H);

impl<H: Digest> Hasher for DigestHasher<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.0.update(bytes);
    }

    fn finish(&self) -> u64 {
        panic!("digest wrapper used to produce a hash");
    }
}

use std::convert::Infallible;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;

use self::types::ValueMetadata;

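/// Renders an operator that waits until the `input` frontier has advanced past the given
/// `resume_upper` and then drops `token`, signaling that rehydration of upsert state has
/// finished across all workers.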
pub fn rehydration_finished<G, T>(
    scope: G,
    source_config: &crate::source::RawSourceCreationConfig,
    token: impl std::any::Any + 'static,
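    // The resume upper of the source, i.e. the frontier at which rehydration is complete.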
    resume_upper: Antichain<T>,
    input: &Stream<G, Infallible>,
) where
    G: Scope<Timestamp = T>,
    T: Timestamp,
{
    let worker_id = source_config.worker_id;
    let id = source_config.id;
    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id})"), scope);
    let mut input = builder.new_disconnected_input(input, Pipeline);

    builder.build(move |_capabilities| async move {
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
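        // Drive the input until its frontier has advanced past the resume upper.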
        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
            let Some(event) = input.next().await else {
                break;
            };
            if let AsyncEvent::Progress(upper) = event {
                input_upper = upper;
            }
        }
        tracing::info!(
            %worker_id,
            source_id = %id,
            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
        );
        drop(token);
    });
}

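/// Resumes an upsert computation at `resume_upper`, given as inputs a collection of upsert
/// commands and the collection of the previous output of this operator.
///
/// Returns a tuple of
/// - A collection of the computed upsert,
/// - A health update stream to propagate errors,
/// - A stream reporting progress of snapshot consolidation, and
/// - A token to drop the operator.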
pub(crate) fn upsert<G: Scope, FromTime>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    dataflow_parameters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

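    // Whether or not to partially drain the input buffer to prevent buffering of the
    // _entire_ snapshot.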
    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
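    // If the above is true, the maximum number of timely batches to process at once.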
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

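    // Whether to use RocksDB's native merge operator to consolidate the snapshot, instead of
    // merging in memory before writing to RocksDB.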
    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    let thin_input = upsert_thinning(input);

    if let Some(scratch_directory) = instance_context.scratch_directory.as_ref() {
        let tuning = dataflow_parameters.upsert_rocksdb_tuning_config.clone();

        let allow_auto_spill = storage_configuration
            .parameters
            .upsert_auto_spill_config
            .allow_spilling_to_disk;
        let spill_threshold = storage_configuration
            .parameters
            .upsert_auto_spill_config
            .spill_to_disk_threshold_bytes;

        tracing::info!(
            worker_id = %source_config.worker_id,
            source_id = %source_config.id,
            ?tuning,
            ?storage_configuration.parameters.upsert_auto_spill_config,
            ?rocksdb_use_native_merge_operator,
            "rendering upsert source with rocksdb-backed upsert state"
        );
        let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
        let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);
        let rocksdb_dir = scratch_directory
            .join("storage")
            .join("upsert")
            .join(source_config.id.to_string())
            .join(source_config.worker_id.to_string());

        let env = instance_context.rocksdb_env.clone();

        let rocksdb_in_use_metric = Arc::clone(&upsert_metrics.rocksdb_autospill_in_use);

        let rocksdb_init_fn = move || async move {
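            // When the native merge operator is enabled, hand RocksDB a merge function that
            // consolidates the bincode-encoded state values during compaction.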
            let merge_operator = if rocksdb_use_native_merge_operator {
                Some((
                    "upsert_state_snapshot_merge_v1".to_string(),
                    |a: &[u8],
                     b: ValueIterator<
                        BincodeOpts,
                        StateValue<G::Timestamp, Option<FromTime>>,
                    >| {
                        consolidating_merge_function::<G::Timestamp, Option<FromTime>>(
                            a.into(),
                            b,
                        )
                    },
                ))
            } else {
                None
            };
            rocksdb::RocksDB::new(
                mz_rocksdb::RocksDBInstance::new(
                    &rocksdb_dir,
                    mz_rocksdb::InstanceOptions::new(
                        env,
                        rocksdb_cleanup_tries,
                        merge_operator,
                        upsert_bincode_opts(),
                    ),
                    tuning,
                    rocksdb_shared_metrics,
                    rocksdb_instance_metrics,
                )
                .unwrap(),
            )
        };

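        // If auto-spill is allowed, start off in memory and spill to the RocksDB backend
        // once the in-memory state exceeds the configured threshold.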
        if allow_auto_spill {
            upsert_operator(
                &thin_input,
                upsert_envelope.key_indices,
                resume_upper,
                previous,
                previous_token,
                upsert_metrics,
                source_config,
                move || async move {
                    AutoSpillBackend::new(rocksdb_init_fn, spill_threshold, rocksdb_in_use_metric)
                },
                upsert_config,
                storage_configuration,
                prevent_snapshot_buffering,
                snapshot_buffering_max,
            )
        } else {
            upsert_operator(
                &thin_input,
                upsert_envelope.key_indices,
                resume_upper,
                previous,
                previous_token,
                upsert_metrics,
                source_config,
                rocksdb_init_fn,
                upsert_config,
                storage_configuration,
                prevent_snapshot_buffering,
                snapshot_buffering_max,
            )
        }
    } else {
        tracing::info!(
            worker_id = %source_config.worker_id,
            source_id = %source_config.id,
            "rendering upsert source with memory-backed upsert state",
        );
        upsert_operator(
            &thin_input,
            upsert_envelope.key_indices,
            resume_upper,
            previous,
            previous_token,
            upsert_metrics,
            source_config,
            || async { InMemoryHashMap::default() },
            upsert_config,
            storage_configuration,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}

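// A shim that dispatches to the continual feedback or the classic upsert implementation.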
fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
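    // The continual feedback implementation is used unconditionally; the classic
    // implementation below is kept as a fallback.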
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}

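/// Renders an operator that discards updates that are known to not have an effect on the upsert
/// output: for each key and timestamp, only the update with the greatest `FromTime` is kept.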
fn upsert_thinning<G, K, V, FromTime>(
    input: &Collection<G, (K, V, FromTime), Diff>,
) -> Collection<G, (K, V, FromTime), Diff>
where
    G: Scope,
    G::Timestamp: TotalOrder,
    K: timely::Data + Eq + Ord,
    V: timely::Data,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
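            // A capability suitable for emitting all buffered updates, if any.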
            let mut capability: Option<InputCapability<G::Timestamp>> = None;
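            // A batch of received updates.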
            let mut updates = Vec::new();
            move |input, output| {
                while let Some((cap, data)) = input.next() {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                }
                if let Some(capability) = capability.take() {
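                    // Sort by (key, time, Reverse(from_time)) so that deduplicating by
                    // (key, time) keeps the update with the greatest `FromTime`.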
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}

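/// Helper for `upsert_classic` that stages `data` updates into the `stash`, dropping updates
/// at times that the snapshot already covers.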
fn stage_input<T, FromTime>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
    input_upper: &Antichain<T>,
    resume_upper: &Antichain<T>,
    storage_shrink_upsert_unused_buffers_by_ratio: usize,
) where
    T: PartialOrder,
    FromTime: Ord,
{
    if PartialOrder::less_equal(input_upper, resume_upper) {
        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
    }

    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
        assert!(diff.is_positive(), "invalid upsert input");
        (time, key, Reverse(order), value)
    }));

    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
        if reduced_capacity > stash.len() {
            stash.shrink_to(reduced_capacity);
        }
    }
}

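/// The style of drain we are performing on the stash. `AtTime`-drains are not guaranteed to
/// drain all updates in the stash.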
#[derive(Debug)]
enum DrainStyle<'a, T> {
    ToUpper(&'a Antichain<T>),
    AtTime(T),
}

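/// Helper for `upsert_classic` that drains the ready prefix of the `stash`, consulting and
/// updating the `state` for each key and emitting the resulting output updates.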
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<
        UpsertKey,
        types::UpsertValueAndSize<T, Option<FromTime>>,
    >,
    output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, Option<FromTime>>,
) where
    S: UpsertStateBackend<T, Option<FromTime>>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    stash.sort_unstable();

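    // Find the prefix of the stash that is ready to be drained under the given drain style.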
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

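    // Read the previous value of each affected key out of `state`, recording it in
    // `commands_state`.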
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    multi_get_scratch.clear();
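    // These two iterate in the same order because `commands_state` is an `IndexMap`.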
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

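    // The stash is sorted by (time, key, Reverse(from_time)), so deduplicating by (time, key)
    // keeps, for each key and time, only the command with the greatest `FromTime`.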
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
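    // Apply the deduplicated commands in order. Each command either upserts a new value
    // (retracting the old value, if any) or deletes the key (retracting the old value and
    // leaving a tombstone). Commands whose `FromTime` does not advance past the order already
    // recorded in the state are stale and are skipped.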
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts);
        }

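        // If the order recorded in the state is not less than the command's `FromTime`,
        // the command is stale and must be skipped.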
        let existing_order = existing_value.as_ref().and_then(|cs| cs.order().as_ref());
        if existing_order >= Some(&from_time.0) {
            continue;
        }

        match value {
            Some(value) => {
                if let Some(old_value) = existing_value.replace(StateValue::finalized_value(
                    value.clone(),
                    Some(from_time.0.clone()),
                )) {
                    if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

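                // Record a tombstone so that the order of this deletion is retained for
                // later commands.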
                *existing_value = Some(StateValue::tombstone(Some(from_time.0.clone())));
            }
        }
    }

    match state
        .multi_put(
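            // Also update per-record stats.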
            true,
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}

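/// Configuration for the upsert operator.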
pub(crate) struct UpsertConfig {
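    // If nonzero, unused buffer capacity is shrunk by this ratio; see `stage_input`.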
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}

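/// The classic upsert operator implementation: it first rehydrates its state from the `previous`
/// output collection, then drains staged input updates against that state, emitting insertions
/// and retractions as values change.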
fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

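    // Only upsert errors can be retracted by this operator, so all other errors are dropped
    // from the previous output collection.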
    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(err),
                _ => return None,
            },
            Err(_) => return None,
        };
        Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

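    // An output that just reports progress of the snapshot consolidation process upstream to
    // the persist source, to ensure that backpressure is applied.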
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        &previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

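        // The upsert state, using `Option<FromTime>` as the order key for values.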
        let mut state = UpsertState::<_, _, Option<FromTime>>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics,
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
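        // Read and consolidate the snapshot from the `previous` input until its frontier
        // reaches the `resume_upper`.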
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    if let Some(ts) = snapshot_upper.clone().into_option() {
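                        // As we shut down, we could ostensibly get data from later than the
                        // `resume_upper`, which we ignore above. We don't want our capabilities
                        // to make it past the `resume_upper`.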
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<G>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        drop(events);
        drop(previous_token);
        drop(snapshot_cap);

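        // Exhaust the previous input. It is expected to immediately reach the empty antichain,
        // since we have dropped its token.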
        while let Some(_event) = previous.next().await {}

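        // After snapshotting, the output frontier is exactly the `resume_upper`.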
        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

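        // A reusable buffer of state values, per key. This is an `IndexMap` so that it drains
        // in the same order in which `multi_get_scratch` is filled.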
        let mut commands_state: indexmap::IndexMap<
            _,
            types::UpsertValueAndSize<G::Timestamp, Option<FromTime>>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

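        // Now we can resume consuming the input collection.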
        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        while let Some(event) = input.next().await {
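            // Buffer as many events as possible. This is bounded, as new data can't be
            // produced while we don't drain the buffer.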
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        let event_time = cap.time();
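                        // If the data is at _exactly_ the output frontier, we can preemptively
                        // drain it into the state. This is a load-bearing optimization, required
                        // to avoid buffering the entire source snapshot in the `stash`.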
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
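                        // Ignore progress updates before the `resume_upper`, which is our
                        // initial capability post-snapshotting.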
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

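                        // Disable the partial drain, as this progress statement covers
                        // the `output_cap` time.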
                        partial_drain_time = None;
                        drain_staged_input::<_, G, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

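            // If there were staged events at exactly the capability time, drain them. This is
            // safe because out-of-order updates to the same key that are drained in separate
            // calls to `drain_staged_input` are correctly ordered by their `FromTime`.
            //
            // Note that this may produce more updates in the output collection than the
            // minimum, but because the frontier only advances on `Progress` updates, the
            // collection always accumulates correctly for all keys.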
            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}

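/// An emitter of fatal upsert state errors to the operator's health output.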
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<G> {
    async fn emit(&mut self, context: String, e: anyhow::Error);
}

#[async_trait::async_trait(?Send)]
impl<G: Scope> UpsertErrorEmitter<G>
    for (
        &mut AsyncOutputHandle<
            <G as ScopeParent>::Timestamp,
            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
            Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        >,
        &Capability<<G as ScopeParent>::Timestamp>,
    )
{
    async fn emit(&mut self, context: String, e: anyhow::Error) {
        process_upsert_state_error::<G>(context, e, self.0, self.1).await
    }
}

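/// Emit the given error, and stall until the dataflow is restarted.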
async fn process_upsert_state_error<G: Scope>(
    context: String,
    e: anyhow::Error,
    health_output: &AsyncOutputHandle<
        <G as ScopeParent>::Timestamp,
        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
    >,
    health_cap: &Capability<<G as ScopeParent>::Timestamp>,
) {
    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
    health_output.give(health_cap, (None, update));
    std::future::pending::<()>().await;
    unreachable!("pending future never returns");
}