use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::AsRef;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, VecCollection};
use futures::StreamExt;
use futures::future::FutureExt;
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::error::ErrorExt;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
use mz_rocksdb::ValueIterator;
use mz_storage_operators::metrics::BackpressureMetrics;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_storage_types::sources::envelope::UpsertEnvelope;
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::{Capability, InputCapability, Operator};
use timely::dataflow::{Scope, ScopeParent, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::storage_state::StorageInstanceContext;
use crate::upsert_continual_feedback;
use self::types::{
    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
    upsert_bincode_opts,
};

pub mod memory;
pub(crate) mod rocksdb;
pub(crate) mod types;

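/// The value produced by the upsert decoding stage: either a decoded row or
/// the error encountered while decoding it.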
pub type UpsertValue = Result<Row, Box<UpsertError>>;

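/// The fixed-width identifier under which upsert state is stored: a SHA-256
/// digest of the upsert key (or, for keys that failed to decode, of the raw
/// error payload), making collisions between distinct keys cryptographically
/// unlikely.
///
/// A minimal usage sketch (not a doctest; assumes `Row` and `Datum` from
/// `mz_repr`):
///
/// ```ignore
/// let row = Row::pack_slice(&[Datum::Int64(1)]);
/// let key = UpsertKey::from_key(Ok(&row));
/// ```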
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);

impl Debug for UpsertKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "0x")?;
        for byte in self.0 {
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}

impl AsRef<[u8]> for UpsertKey {
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl From<&[u8]> for UpsertKey {
    fn from(bytes: &[u8]) -> Self {
        UpsertKey(bytes.try_into().expect("invalid key length"))
    }
}

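/// The hash function used to map upsert keys. It is important that this hash
/// is a cryptographic hash, so that there is no risk of collisions: a
/// collision would silently merge the state of two distinct keys and produce
/// wrong output.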
type KeyHash = Sha256;

impl UpsertKey {
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes.
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            /// A thread-local datum cache used to calculate hashes.
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the `DatumVec` yields a scratch buffer of datums that
            // is automatically cleared when the borrow is dropped at the end
            // of this closure.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}

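/// An adapter that routes `Hash` writes into a cryptographic digest. Only
/// `write` is supported; `finish` panics, since the digest is consumed via
/// `finalize` instead.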
struct DigestHasher<H: Digest>(H);

impl<H: Digest> Hasher for DigestHasher<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.0.update(bytes);
    }

    fn finish(&self) -> u64 {
        panic!("digest wrapper used to produce a hash");
    }
}

use std::convert::Infallible;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;

use self::types::ValueMetadata;

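/// Waits until the frontier of `input` has passed `resume_upper` on all
/// workers (i.e. rehydration has finished everywhere), then logs a message
/// and drops the given `token`.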
pub fn rehydration_finished<G, T>(
    scope: G,
    source_config: &crate::source::RawSourceCreationConfig,
    token: impl std::any::Any + 'static,
    resume_upper: Antichain<T>,
    input: &Stream<G, Infallible>,
) where
    G: Scope<Timestamp = T>,
    T: Timestamp,
{
    let worker_id = source_config.worker_id;
    let id = source_config.id;
    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id})"), scope);
    let mut input = builder.new_disconnected_input(input, Pipeline);

    builder.build(move |_capabilities| async move {
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
            // The input stream carries no data (`Infallible`), so we only
            // ever observe progress events here.
            let Some(event) = input.next().await else {
                break;
            };
            if let AsyncEvent::Progress(upper) = event {
                input_upper = upper;
            }
        }
        tracing::info!(
            %worker_id,
            source_id = %id,
            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
        );
        drop(token);
    });
}

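/// Resumes an upsert computation at `resume_upper`, given a collection of
/// upsert commands and the previous output of this operator.
///
/// Returns a tuple of
/// - the collection of upsert output,
/// - a stream of health status updates,
/// - a stream whose frontier signals snapshotting progress, and
/// - a button that shuts the operator down when pressed (or dropped).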
pub(crate) fn upsert<G: Scope, FromTime>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<G::Timestamp>,
    previous: VecCollection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    dataflow_parameters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    // Whether to eagerly drain input at the current time instead of buffering
    // an entire snapshot.
    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    // How many input batches to process at once while snapshotting.
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    // Whether to use RocksDB's native merge operator to consolidate snapshot
    // updates.
    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    let thin_input = upsert_thinning(input);

    let tuning = dataflow_parameters.upsert_rocksdb_tuning_config.clone();

    // Build a per-source, per-worker path for the RocksDB instance inside the
    // scratch directory.
    let rocksdb_dir = instance_context
        .scratch_directory
        .clone()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join("storage")
        .join("upsert")
        .join(source_config.id.to_string())
        .join(source_config.worker_id.to_string());

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?rocksdb_dir,
        ?tuning,
        ?rocksdb_use_native_merge_operator,
        "rendering upsert source"
    );

    let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
    let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

    let env = instance_context.rocksdb_env.clone();

    let rocksdb_init_fn = move || async move {
        let merge_operator = if rocksdb_use_native_merge_operator {
            Some((
                "upsert_state_snapshot_merge_v1".to_string(),
                |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<G::Timestamp, FromTime>>| {
                    consolidating_merge_function::<G::Timestamp, FromTime>(a.into(), b)
                },
            ))
        } else {
            None
        };
        rocksdb::RocksDB::new(
            mz_rocksdb::RocksDBInstance::new(
                &rocksdb_dir,
                mz_rocksdb::InstanceOptions::new(
                    env,
                    rocksdb_cleanup_tries,
                    merge_operator,
                    // For now, use the same bincode options as the ones used
                    // for merging snapshots.
                    upsert_bincode_opts(),
                ),
                tuning,
                rocksdb_shared_metrics,
                rocksdb_instance_metrics,
            )
            .await
            .unwrap(),
        )
    };

    upsert_operator(
        &thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
        rocksdb_init_fn,
        upsert_config,
        storage_configuration,
        prevent_snapshot_buffering,
        snapshot_buffering_max,
    )
}

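/// Renders the selected upsert operator implementation, dispatching between
/// the continual feedback implementation and the classic one.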
fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: VecCollection<G, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    // We currently always use the continual feedback implementation.
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}

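/// Removes updates that cannot affect the outcome of the upsert computation:
/// for every `(key, time)` pair only the update with the greatest `FromTime`
/// is kept, since it supersedes all earlier ones.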
fn upsert_thinning<G, K, V, FromTime>(
    input: &VecCollection<G, (K, V, FromTime), Diff>,
) -> VecCollection<G, (K, V, FromTime), Diff>
where
    G: Scope,
    G::Timestamp: TotalOrder,
    K: timely::Data + Eq + Ord,
    V: timely::Data,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // A capability suitable for emitting all buffered updates.
            let mut capability: Option<InputCapability<G::Timestamp>> = None;
            // A batch of received updates.
            let mut updates = Vec::new();
            move |input, output| {
                input.for_each(|cap, data| {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    // Hold on to the earliest capability we have seen.
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                });
                if let Some(capability) = capability.take() {
                    // Sort by (key, time) with *descending* `FromTime`, so the
                    // first update of each (key, time) group is the winner.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}

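/// Helper for `upsert_classic` that stages a batch of input updates in
/// `stash`. While the input frontier has not yet passed the `resume_upper`,
/// updates at earlier times are dropped, since they are already reflected in
/// the rehydrated state.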
fn stage_input<T, FromTime>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
    input_upper: &Antichain<T>,
    resume_upper: &Antichain<T>,
    storage_shrink_upsert_unused_buffers_by_ratio: usize,
) where
    T: PartialOrder,
    FromTime: Ord,
{
    if PartialOrder::less_equal(input_upper, resume_upper) {
        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
    }

    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
        assert!(diff.is_positive(), "invalid upsert input");
        (time, key, Reverse(order), value)
    }));

    // Shrink the stash if a shrink ratio is configured and most of its
    // capacity is unused.
    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
        if reduced_capacity > stash.len() {
            stash.shrink_to(reduced_capacity);
        }
    }
}

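/// The style of drain performed on the stash: either drain everything that is
/// not beyond a frontier, or drain everything up to and including a specific
/// time (a partial drain used while processing a snapshot).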
#[derive(Debug)]
enum DrainStyle<'a, T> {
    ToUpper(&'a Antichain<T>),
    AtTime(T),
}

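/// Helper for `upsert_classic` that drains the prefix of `stash` selected by
/// `drain_style`, applies those commands to the upsert `state`, and appends
/// the resulting retractions and insertions to `output_updates`.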
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    stash.sort_unstable();

    // Find the prefix that we can drain.
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Collect the set of keys we are about to operate on.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    // Fetch the previous values for all keys in this batch.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // The stash is sorted by (time, key, Reverse(from_time)), so deduplicating
    // by (time, key) keeps only the command with the greatest `FromTime` for
    // each pair.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id);
        }

        // Skip this command if its order is not beyond the order of the
        // existing provisional value at this timestamp.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            continue;
        }

        match value {
            Some(value) => {
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                // Record a tombstone so later commands know the value was deleted.
                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    // Write the updated values back to the state.
    match state
        .multi_put(
            // Do update per-update stats.
            true,
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}

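/// Configuration for the upsert operator.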
pub(crate) struct UpsertConfig {
    /// Ratio by which to shrink unused buffer capacity; 0 disables shrinking.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}

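/// The classic upsert operator implementation. It first rehydrates its state
/// from the `previous` output of this operator, then applies batches of input
/// commands against that state, emitting insertions and retractions.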
fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    previous: VecCollection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // Only upsert errors can appear in (and be retracted from) the output, so
    // filter everything else out of the previous output.
    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                _ => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // An output whose frontier tracks the snapshotting progress of this
    // operator; it never carries data.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        &previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

        let mut state = UpsertState::<_, _, FromTime>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
        // Rehydrate the state from the previous output until its frontier
        // passes the resume upper.
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    if let Some(ts) = snapshot_upper.clone().into_option() {
                        // As we shut down, we could ostensibly see data from
                        // beyond the `resume_upper`, which we ignore above. We
                        // don't want our capabilities to advance beyond the
                        // `resume_upper`.
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<G>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        drop(events);
        drop(previous_token);
        drop(snapshot_cap);

        // Exhaust the previous input. It is expected to immediately reach the
        // empty antichain, since we have dropped its token.
        while let Some(_event) = previous.next().await {}

        // After rehydration, our output capability is exactly the resume upper.
        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

        // A reusable buffer of state, per key, for the current batch.
        let mut commands_state: indexmap::IndexMap<
            _,
            types::UpsertValueAndSize<G::Timestamp, FromTime>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        // Updates to emit at the end of a batch.
        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        while let Some(event) = input.next().await {
            // Buffer as many already-available events as possible, to batch
            // state operations.
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        // If our capability has not advanced past this event's
                        // time, we may partially drain the stash at that time
                        // below, instead of buffering the entire snapshot.
                        let event_time = cap.time();
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
                        // Ignore progress updates before the `resume_upper`,
                        // which is our initial capability post-snapshotting.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

                        // A progress event lets us do a full drain up to the
                        // new upper, so a partial drain is unnecessary.
                        partial_drain_time = None;
                        drain_staged_input::<_, G, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                            &source_config,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

            // Attempt to drain the stash at the current time, so that updates
            // for an in-progress snapshot become visible without buffering the
            // entire snapshot in memory.
            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}

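/// An emitter for errors encountered while operating on upsert state.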
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<G> {
    async fn emit(&mut self, context: String, e: anyhow::Error);
}

#[async_trait::async_trait(?Send)]
impl<G: Scope> UpsertErrorEmitter<G>
    for (
        &mut AsyncOutputHandle<
            <G as ScopeParent>::Timestamp,
            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        >,
        &Capability<<G as ScopeParent>::Timestamp>,
    )
{
    async fn emit(&mut self, context: String, e: anyhow::Error) {
        process_upsert_state_error::<G>(context, e, self.0, self.1).await
    }
}

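/// Emits the given error as a halting health status update and then stalls
/// forever; the dataflow is expected to be torn down and restarted.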
async fn process_upsert_state_error<G: Scope>(
    context: String,
    e: anyhow::Error,
    health_output: &AsyncOutputHandle<
        <G as ScopeParent>::Timestamp,
        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
    >,
    health_cap: &Capability<<G as ScopeParent>::Timestamp>,
) {
    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
    health_output.give(health_cap, (None, update));
    std::future::pending::<()>().await;
    unreachable!("pending future never returns");
}