use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::{AsRef, Infallible};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, Collection};
use futures::StreamExt;
use futures::future::FutureExt;
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::error::ErrorExt;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
use mz_rocksdb::ValueIterator;
use mz_storage_operators::metrics::BackpressureMetrics;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_storage_types::sources::envelope::UpsertEnvelope;
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::{Capability, InputCapability, Operator};
use timely::dataflow::{Scope, ScopeParent, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::storage_state::StorageInstanceContext;
use crate::upsert_continual_feedback;
use autospill::AutoSpillBackend;
use memory::InMemoryHashMap;
use types::{
    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, Value, ValueMetadata,
    consolidating_merge_function, upsert_bincode_opts,
};

mod autospill;
mod memory;
mod rocksdb;
pub(crate) mod types;

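/// The value of an upsert operation: a successfully decoded `Row`, or the
/// `UpsertError` produced while decoding it.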
pub type UpsertValue = Result<Row, UpsertError>;

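/// A 256-bit identifier for an upsert key, computed by hashing the key's
/// datums. Using a fixed-width digest instead of the key `Row` itself keeps
/// state entries small and makes keys cheap to compare and exchange between
/// workers.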
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);

impl AsRef<[u8]> for UpsertKey {
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl From<&[u8]> for UpsertKey {
    fn from(bytes: &[u8]) -> Self {
        UpsertKey(bytes.try_into().expect("invalid key length"))
    }
}

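/// The hash function used to map upsert keys to `UpsertKey` values. A
/// cryptographic hash is used so that the probability of collisions between
/// distinct keys is negligible, even for adversarial input.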
type KeyHash = Sha256;

impl UpsertKey {
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

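    /// Computes the key hash for a value row by extracting the datums at
    /// `key_indices`. A thread-local `DatumVec` is used as a scratch buffer
    /// to avoid allocating on every call.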
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

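    /// Hashes an iterator of key datums (or the error that stands in for
    /// them) into an `UpsertKey`.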
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the `DatumVec` (not the `RefCell`) gives us a scratch
            // buffer that is cleared when the borrow is dropped.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}

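/// An adapter that lets a `std::hash::Hash` implementation drive a
/// cryptographic `Digest`. Only `write` is supported; the `u64`-producing
/// `finish` is never called.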
struct DigestHasher<H: Digest>(H);

impl<H: Digest> Hasher for DigestHasher<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.0.update(bytes);
    }

    fn finish(&self) -> u64 {
        panic!("DigestHasher only feeds bytes to the digest; finish is unsupported");
    }
}

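/// Emits a log line once the frontier of `input` passes `resume_upper`,
/// i.e. once upsert rehydration has finished across all workers, and then
/// drops the provided `token`.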
pub fn rehydration_finished<G, T>(
    scope: G,
    source_config: &crate::source::RawSourceCreationConfig,
    token: impl std::any::Any + 'static,
    resume_upper: Antichain<T>,
    input: &Stream<G, Infallible>,
) where
    G: Scope<Timestamp = T>,
    T: Timestamp,
{
    let worker_id = source_config.worker_id;
    let id = source_config.id;
    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id})"), scope);
    let mut input = builder.new_disconnected_input(input, Pipeline);

    builder.build(move |_capabilities| async move {
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        // Wait until the input frontier has advanced at least to the resume
        // upper, at which point rehydration is complete.
        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
            let Some(event) = input.next().await else {
                break;
            };
            if let AsyncEvent::Progress(upper) = event {
                input_upper = upper;
            }
        }
        tracing::info!(
            %worker_id,
            source_id = %id,
            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
        );
        drop(token);
    });
}

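/// Renders the upsert operator for a source, maintaining the latest value for
/// each key and emitting retractions for superseded values. The `previous`
/// collection is the operator's own prior output and is used to rehydrate
/// state up to `resume_upper`. State is backed by RocksDB when a scratch
/// directory is available (optionally starting in memory and spilling to disk
/// via `AutoSpillBackend`), and by an in-memory hash map otherwise.
///
/// Returns the upsert output collection, a health stream, a snapshot-progress
/// stream, and a token that shuts the operator down when dropped.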
pub(crate) fn upsert<G: Scope, FromTime>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    dataflow_parameters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    let thin_input = upsert_thinning(input);

    if let Some(scratch_directory) = instance_context.scratch_directory.as_ref() {
        let tuning = dataflow_parameters.upsert_rocksdb_tuning_config.clone();

        let allow_auto_spill = storage_configuration
            .parameters
            .upsert_auto_spill_config
            .allow_spilling_to_disk;
        let spill_threshold = storage_configuration
            .parameters
            .upsert_auto_spill_config
            .spill_to_disk_threshold_bytes;

        tracing::info!(
            worker_id = %source_config.worker_id,
            source_id = %source_config.id,
            ?tuning,
            ?storage_configuration.parameters.upsert_auto_spill_config,
            ?rocksdb_use_native_merge_operator,
            "rendering upsert source with rocksdb-backed upsert state"
        );
        let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
        let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);
        let rocksdb_dir = scratch_directory
            .join("storage")
            .join("upsert")
            .join(source_config.id.to_string())
            .join(source_config.worker_id.to_string());

        let env = instance_context.rocksdb_env.clone();

        let rocksdb_in_use_metric = Arc::clone(&upsert_metrics.rocksdb_autospill_in_use);

        let rocksdb_init_fn = move || async move {
            let merge_operator = if rocksdb_use_native_merge_operator {
                Some((
                    "upsert_state_snapshot_merge_v1".to_string(),
                    |a: &[u8],
                     b: ValueIterator<
                        BincodeOpts,
                        StateValue<G::Timestamp, Option<FromTime>>,
                    >| {
                        consolidating_merge_function::<G::Timestamp, Option<FromTime>>(
                            a.into(),
                            b,
                        )
                    },
                ))
            } else {
                None
            };
            rocksdb::RocksDB::new(
                mz_rocksdb::RocksDBInstance::new(
                    &rocksdb_dir,
                    mz_rocksdb::InstanceOptions::new(
                        env,
                        rocksdb_cleanup_tries,
                        merge_operator,
                        upsert_bincode_opts(),
                    ),
                    tuning,
                    rocksdb_shared_metrics,
                    rocksdb_instance_metrics,
                )
                .await
                .unwrap(),
            )
        };

        if allow_auto_spill {
            // Start in memory and spill to RocksDB once the state grows past
            // `spill_threshold` bytes.
            upsert_operator(
                &thin_input,
                upsert_envelope.key_indices,
                resume_upper,
                previous,
                previous_token,
                upsert_metrics,
                source_config,
                move || async move {
                    AutoSpillBackend::new(rocksdb_init_fn, spill_threshold, rocksdb_in_use_metric)
                },
                upsert_config,
                storage_configuration,
                prevent_snapshot_buffering,
                snapshot_buffering_max,
            )
        } else {
            upsert_operator(
                &thin_input,
                upsert_envelope.key_indices,
                resume_upper,
                previous,
                previous_token,
                upsert_metrics,
                source_config,
                rocksdb_init_fn,
                upsert_config,
                storage_configuration,
                prevent_snapshot_buffering,
                snapshot_buffering_max,
            )
        }
    } else {
        tracing::info!(
            worker_id = %source_config.worker_id,
            source_id = %source_config.id,
            "rendering upsert source with memory-backed upsert state",
        );
        upsert_operator(
            &thin_input,
            upsert_envelope.key_indices,
            resume_upper,
            previous,
            previous_token,
            upsert_metrics,
            source_config,
            || async { InMemoryHashMap::default() },
            upsert_config,
            storage_configuration,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}

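/// Dispatches to an upsert implementation, parameterized by the state backend
/// constructor `state`.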
fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    // The continual-feedback implementation is currently always used; the
    // classic implementation below is retained as a fallback.
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}

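/// Thins the input by retaining, for each key and timestamp, only the update
/// with the greatest `FromTime`. Under upsert semantics the latest update
/// supersedes the earlier ones, so the dropped updates could never affect the
/// output.
///
/// For example (illustrative, not a doctest):
///
/// ```text
/// input:  (k, v1, from_time: 1) @ t=5
///         (k, v2, from_time: 2) @ t=5
/// output: (k, v2, from_time: 2) @ t=5
/// ```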
fn upsert_thinning<G, K, V, FromTime>(
    input: &Collection<G, (K, V, FromTime), Diff>,
) -> Collection<G, (K, V, FromTime), Diff>
where
    G: Scope,
    G::Timestamp: TotalOrder,
    K: timely::Data + Eq + Ord,
    V: timely::Data,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // A capability for the lowest input timestamp seen so far.
            let mut capability: Option<InputCapability<G::Timestamp>> = None;
            let mut updates = Vec::new();
            move |input, output| {
                while let Some((cap, data)) = input.next() {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                }
                if let Some(capability) = capability.take() {
                    // Sort by (key, time) with the greatest `FromTime` first,
                    // so that deduplication below keeps that update.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}

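/// Moves `data` into `stash`, reordering each update into `(time, key,
/// Reverse(from_time), value)` form so that a later sort groups updates the
/// way draining expects. While the input upper is not yet beyond the resume
/// upper, updates strictly before the resume upper are dropped, since state
/// rehydration already accounts for them.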
fn stage_input<T, FromTime>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
    input_upper: &Antichain<T>,
    resume_upper: &Antichain<T>,
    storage_shrink_upsert_unused_buffers_by_ratio: usize,
) where
    T: PartialOrder,
    FromTime: Ord,
{
    if PartialOrder::less_equal(input_upper, resume_upper) {
        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
    }

    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
        assert!(diff.is_positive(), "invalid upsert input");
        (time, key, Reverse(order), value)
    }));

    // Shrink the stash if a large fraction of its capacity is unused.
    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
        if reduced_capacity > stash.len() {
            stash.shrink_to(reduced_capacity);
        }
    }
}

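/// How much of the stash to drain: everything not beyond the given upper, or
/// everything up to and including a specific time.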
#[derive(Debug)]
enum DrainStyle<'a, T> {
    ToUpper(&'a Antichain<T>),
    AtTime(T),
}

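/// Drains the portion of `stash` selected by `drain_style`: fetches the
/// current state for the affected keys, applies each staged command in
/// `(time, key, Reverse(from_time))` order, appends the resulting retractions
/// and insertions to `output_updates`, and writes the new values back to
/// `state`. `commands_state` and `multi_get_scratch` are caller-provided
/// buffers that are reused across calls.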
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<
        UpsertKey,
        types::UpsertValueAndSize<T, Option<FromTime>>,
    >,
    output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, Option<FromTime>>,
) where
    S: UpsertStateBackend<T, Option<FromTime>>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    stash.sort_unstable();

    // Find the prefix of the stash that we can process now.
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Fetch the current state for every key in the prefix.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // From the prefix, keep only the latest command (by `FromTime`) for each
    // (key, time) pair; the sort above placed that command first in each run.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts);
        }

        // Skip commands that are not beyond the order of the current value:
        // such updates arrived out of order (by `FromTime`) and are already
        // superseded by the stored value.
        let existing_order = existing_value.as_ref().and_then(|cs| cs.order().as_ref());
        if existing_order >= Some(&from_time.0) {
            continue;
        }

        match value {
            Some(value) => {
                if let Some(old_value) = existing_value.replace(StateValue::finalized_value(
                    value.clone(),
                    Some(from_time.0.clone()),
                )) {
                    if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Value::FinalizedValue(old_value, _) = old_value.into_decoded() {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                // Record a tombstone so that later commands can still compare
                // themselves against this command's order.
                *existing_value = Some(StateValue::tombstone(Some(from_time.0.clone())));
            }
        }
    }

    // Write the updated values back to the state backend.
    match state
        .multi_put(
            true,
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}

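/// Configuration for the upsert operator.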
pub(crate) struct UpsertConfig {
    /// Shrink the staging buffer to `capacity / ratio` when that still
    /// exceeds its length; `0` disables shrinking. See `stage_input`.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}

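/// The classic upsert implementation: rehydrates state from the `previous`
/// (persist) input up to `resume_upper`, then processes the upsert input,
/// maintaining one value per key in `state` and emitting insertions and
/// retractions as values change.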
fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // Only upsert errors are relevant for rehydration; other dataflow errors
    // cannot have been produced by this operator.
    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(err),
                _ => return None,
            },
            Err(_) => return None,
        };
        Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // An output whose frontier tracks snapshot progress; it never carries
    // data.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        &previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

        let mut state = UpsertState::<_, _, Option<FromTime>>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics,
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
        // Rehydrate the state from the previous input until its frontier
        // passes the resume upper.
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    if let Some(ts) = snapshot_upper.clone().into_option() {
                        // Report snapshot progress while we are still
                        // strictly before the resume upper.
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<G>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        drop(events);
        drop(previous_token);
        drop(snapshot_cap);

        // Exhaust the previous input: it is no longer needed, but it must be
        // drained so the operator does not hold it back.
        while let Some(_event) = previous.next().await {}

        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

        // Reusable buffers for draining staged input.
        let mut commands_state: indexmap::IndexMap<
            _,
            types::UpsertValueAndSize<G::Timestamp, Option<FromTime>>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        while let Some(event) = input.next().await {
            // Eagerly process whatever events are already available, up to
            // `snapshot_buffering_max`, without yielding.
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        // If this batch is at the output frontier and we are
                        // preventing snapshot buffering, plan to drain it at
                        // the end of this burst rather than waiting for a
                        // progress update.
                        let event_time = cap.time();
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
                        // Ignore progress updates before the `resume_upper`,
                        // which is our initial capability.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

                        // A progress update subsumes any pending partial
                        // drain at this time.
                        partial_drain_time = None;
                        drain_staged_input::<_, G, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

            // Drain at the current time if requested above, so that updates
            // at the snapshot time are not buffered indefinitely.
            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}

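/// A sink for unrecoverable upsert state errors.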
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<G> {
    async fn emit(&mut self, context: String, e: anyhow::Error);
}

#[async_trait::async_trait(?Send)]
impl<G: Scope> UpsertErrorEmitter<G>
    for (
        &mut AsyncOutputHandle<
            <G as ScopeParent>::Timestamp,
            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
            Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        >,
        &Capability<<G as ScopeParent>::Timestamp>,
    )
{
    async fn emit(&mut self, context: String, e: anyhow::Error) {
        process_upsert_state_error::<G>(context, e, self.0, self.1).await
    }
}

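/// Emits a halting health status update for the given error and then parks
/// the operator forever, since the error is not recoverable.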
async fn process_upsert_state_error<G: Scope>(
    context: String,
    e: anyhow::Error,
    health_output: &AsyncOutputHandle<
        <G as ScopeParent>::Timestamp,
        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
    >,
    health_cap: &Capability<<G as ScopeParent>::Timestamp>,
) {
    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
    health_output.give(health_cap, (None, update));
    std::future::pending::<()>().await;
    unreachable!("pending future never returns");
}