use std::cell::RefCell;
use std::cmp::Reverse;
use std::convert::AsRef;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::path::PathBuf;
use std::sync::Arc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::{AsCollection, VecCollection};
use futures::StreamExt;
use futures::future::FutureExt;
use indexmap::map::Entry;
use itertools::Itertools;
use mz_ore::error::ErrorExt;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
use mz_rocksdb::ValueIterator;
use mz_storage_operators::metrics::BackpressureMetrics;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::dyncfgs;
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
use mz_storage_types::sources::envelope::UpsertEnvelope;
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::{Capability, InputCapability, Operator};
use timely::dataflow::{Scope, ScopeParent, Stream};
use timely::order::{PartialOrder, TotalOrder};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};

use crate::healthcheck::HealthStatusUpdate;
use crate::metrics::upsert::UpsertMetrics;
use crate::storage_state::StorageInstanceContext;
use crate::upsert_continual_feedback;
use types::{
    BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
    upsert_bincode_opts,
};

#[cfg(test)]
pub mod memory;
pub(crate) mod rocksdb;
pub(crate) mod types;

pub type UpsertValue = Result<Row, Box<UpsertError>>;

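/// A hash of the upsert key, stored in the upsert state in place of the key
/// itself.
///
/// Keys are hashed to a fixed-size SHA-256 digest, so state size does not
/// depend on the width of the key columns. A minimal sketch of the invariant
/// this type maintains (`key_row`, `value_row`, and `key_indices` are
/// hypothetical): hashing a key directly and extracting the same key columns
/// from a value must agree.
///
/// ```ignore
/// let from_key = UpsertKey::from_key(Ok(&key_row));
/// let from_value = UpsertKey::from_value(Ok(&value_row), &key_indices);
/// assert_eq!(from_key, from_value);
/// ```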
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct UpsertKey([u8; 32]);

impl Debug for UpsertKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "0x")?;
        for byte in self.0 {
            write!(f, "{:02x}", byte)?;
        }
        Ok(())
    }
}

impl AsRef<[u8]> for UpsertKey {
    #[inline(always)]
    fn as_ref(&self) -> &[u8] {
        &self.0
    }
}

impl From<&[u8]> for UpsertKey {
    fn from(bytes: &[u8]) -> Self {
        UpsertKey(bytes.try_into().expect("invalid key length"))
    }
}

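/// The hash function used to map upsert keys to a fixed-size digest.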
type KeyHash = Sha256;

impl UpsertKey {
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

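    /// Hashes the key columns of a value, as selected by `key_indices`.
    ///
    /// A minimal sketch of the intent (the column layout is hypothetical):
    /// for a row `(id, name, email)` with `key_indices = &[0]`, only the `id`
    /// datum is fed into the hash, so any two rows that agree on `id` map to
    /// the same `UpsertKey`.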
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // Borrowing the `DatumVec` gives us a temporary buffer to store
            // datums in; the buffer is emptied again when the borrow drops.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}

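/// Adapts a [`Digest`] (here SHA-256) to the standard [`Hasher`] interface,
/// so that `Hash` impls can be fed into the digest. Only `write` is
/// supported; the result is read out via `finalize`, never via `finish`.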
struct DigestHasher<H: Digest>(H);

impl<H: Digest> Hasher for DigestHasher<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.0.update(bytes);
    }

    fn finish(&self) -> u64 {
        panic!("digest wrapper used to produce a hash");
    }
}

use std::convert::Infallible;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;

use self::types::ValueMetadata;

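/// Waits until the `input` frontier (of an otherwise empty stream) has
/// advanced past the `resume_upper`, then emits a log line and drops the
/// given `token`.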
pub fn rehydration_finished<G, T>(
    scope: G,
    source_config: &crate::source::RawSourceCreationConfig,
    token: impl std::any::Any + 'static,
    resume_upper: Antichain<T>,
    input: &Stream<G, Infallible>,
) where
    G: Scope<Timestamp = T>,
    T: Timestamp,
{
    let worker_id = source_config.worker_id;
    let id = source_config.id;
    let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id})"), scope);
    let mut input = builder.new_disconnected_input(input, Pipeline);

    builder.build(move |_capabilities| async move {
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        // Wait until the input frontier has advanced past the resume upper.
        while !PartialOrder::less_equal(&resume_upper, &input_upper) {
            let Some(event) = input.next().await else {
                break;
            };
            if let AsyncEvent::Progress(upper) = event {
                input_upper = upper;
            }
        }
        tracing::info!(
            %worker_id,
            source_id = %id,
            "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
        );
        drop(token);
    });
}

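/// Renders an operator that transforms a collection of upsert commands into
/// a collection of the currently upserted values.
///
/// State is kept in RocksDB, rooted at the instance's scratch directory.
/// Before processing new input, the operator rehydrates its state from the
/// `previous` collection (the persisted output of this source), holding
/// `previous_token` until rehydration is complete. Also returns a health
/// stream, a snapshot-progress stream, and a shutdown button.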
pub(crate) fn upsert<G: Scope, FromTime>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<G::Timestamp>,
    previous: VecCollection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    instance_context: &StorageInstanceContext,
    storage_configuration: &StorageConfiguration,
    dataflow_parameters: &crate::internal_control::DataflowParameters,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    FromTime: Timestamp + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    let rocksdb_cleanup_tries =
        dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());

    let prevent_snapshot_buffering =
        dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
    let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
        .get(storage_configuration.config_set());

    let rocksdb_use_native_merge_operator =
        dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());

    let upsert_config = UpsertConfig {
        shrink_upsert_unused_buffers_by_ratio: storage_configuration
            .parameters
            .shrink_upsert_unused_buffers_by_ratio,
    };

    let thin_input = upsert_thinning(input);

    let tuning = dataflow_parameters.upsert_rocksdb_tuning_config.clone();

    let rocksdb_dir = instance_context
        .scratch_directory
        .clone()
        .unwrap_or_else(|| PathBuf::from("/tmp"))
        .join("storage")
        .join("upsert")
        .join(source_config.id.to_string())
        .join(source_config.worker_id.to_string());

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        ?rocksdb_dir,
        ?tuning,
        ?rocksdb_use_native_merge_operator,
        "rendering upsert source"
    );

    let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
    let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

    let env = instance_context.rocksdb_env.clone();

    let rocksdb_init_fn = move || async move {
        let merge_operator = if rocksdb_use_native_merge_operator {
            Some((
                "upsert_state_snapshot_merge_v1".to_string(),
                |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<G::Timestamp, FromTime>>| {
                    consolidating_merge_function::<G::Timestamp, FromTime>(a.into(), b)
                },
            ))
        } else {
            None
        };
        rocksdb::RocksDB::new(
            mz_rocksdb::RocksDBInstance::new(
                &rocksdb_dir,
                mz_rocksdb::InstanceOptions::new(
                    env,
                    rocksdb_cleanup_tries,
                    merge_operator,
                    upsert_bincode_opts(),
                ),
                tuning,
                rocksdb_shared_metrics,
                rocksdb_instance_metrics,
            )
            .unwrap(),
        )
    };

    upsert_operator(
        &thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
        rocksdb_init_fn,
        upsert_config,
        storage_configuration,
        prevent_snapshot_buffering,
        snapshot_buffering_max,
    )
}

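/// Renders the upsert operator itself, dispatching between the continual
/// feedback implementation (currently always used) and the classic
/// implementation further below.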
fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: VecCollection<G, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    // The continual feedback implementation is the only one in active use;
    // the classic implementation is kept as an alternative.
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}

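/// Thins the input down to at most one update per key and timestamp: only
/// the update with the greatest `FromTime` survives. For example
/// (illustrative values):
///
/// ```text
/// input:  (k1, v1, from_time: 2) @ t=5
///         (k1, v2, from_time: 1) @ t=5
/// output: (k1, v1, from_time: 2) @ t=5
/// ```
///
/// This matches upsert semantics, where within one timestamp a later command
/// (by `FromTime`) supersedes earlier ones for the same key.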
fn upsert_thinning<G, K, V, FromTime>(
    input: &VecCollection<G, (K, V, FromTime), Diff>,
) -> VecCollection<G, (K, V, FromTime), Diff>
where
    G: Scope,
    G::Timestamp: TotalOrder,
    K: timely::Data + Eq + Ord,
    V: timely::Data,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // A capability suitable for emitting all updates in `updates`.
            let mut capability: Option<InputCapability<G::Timestamp>> = None;
            // A batch of received updates.
            let mut updates = Vec::new();
            move |input, output| {
                input.for_each(|cap, data| {
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    match capability.as_mut() {
                        Some(capability) => {
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                });
                if let Some(capability) = capability.take() {
                    // Sort by key, timestamp, and `FromTime` in reverse, so
                    // the latest update for each key and time comes first.
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    // Deduplication keeps the first update per key and time,
                    // i.e. the one with the greatest `FromTime`.
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}

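/// Merges `data` into `stash`, reshaping each update into the
/// `(time, key, Reverse(from_time), value)` layout that `drain_staged_input`
/// sorts on. While the input is still replaying the snapshot (its upper not
/// yet beyond the `resume_upper`), updates before the `resume_upper` are
/// dropped. Optionally shrinks the stash's allocation when most of its
/// capacity is unused.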
fn stage_input<T, FromTime>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
    input_upper: &Antichain<T>,
    resume_upper: &Antichain<T>,
    storage_shrink_upsert_unused_buffers_by_ratio: usize,
) where
    T: PartialOrder,
    FromTime: Ord,
{
    if PartialOrder::less_equal(input_upper, resume_upper) {
        data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
    }

    stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
        assert!(diff.is_positive(), "invalid upsert input");
        (time, key, Reverse(order), value)
    }));

    // Shrink the stash if a sizable fraction of its capacity is unused.
    if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
        let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
        if reduced_capacity > stash.len() {
            stash.shrink_to(reduced_capacity);
        }
    }
}

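/// How to select the prefix of the stash to drain: everything strictly
/// before an upper, or everything at or before a specific time. For example
/// (illustrative values), with staged times `[1, 2, 3]`, `ToUpper` of the
/// frontier `{3}` drains times `1` and `2` but not `3`, while `AtTime(2)`
/// drains times `1` and `2`, including the time itself.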
#[derive(Debug)]
enum DrainStyle<'a, T> {
    ToUpper(&'a Antichain<T>),
    AtTime(T),
}

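/// Drains the chosen prefix of the stash: fetches the current state for all
/// affected keys in one `multi_get`, applies the staged commands in
/// `(time, key, FromTime)` order while emitting retractions and insertions
/// into `output_updates`, and finally writes the new values (or tombstones)
/// back with one `multi_put`.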
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    stash.sort_unstable();

    // Find the prefix of the stash that can be drained.
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Fetch the previous values for all keys in the prefix.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // The sort above places the greatest `FromTime` first within each key and
    // time, so deduplication keeps only that latest command.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    while let Some((ts, key, from_time, value)) = commands.next() {
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id);
        }

        // Skip this command if a command with a greater or equal `FromTime`
        // has already been applied at this timestamp.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            continue;
        }

        match value {
            Some(value) => {
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                // Record a tombstone for the deleted key.
                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    // Write the updated values back to the state backend.
    match state
        .multi_put(
            true,
            commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}

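/// Configuration for the upsert operator.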
pub(crate) struct UpsertConfig {
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}

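/// The classic upsert implementation: first rehydrates state from the
/// `previous` collection up to the `resume_upper`, then processes input
/// batches, draining staged commands whenever the input frontier advances.
/// When `prevent_snapshot_buffering` is set, batches at the current output
/// time may be partially drained early, to bound memory use while processing
/// large snapshots.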
fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    previous: VecCollection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                _ => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // An output that reports progress of the snapshot consolidation process
    // and is closed once the snapshot has been fully processed.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        &previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

        let mut state = UpsertState::<_, _, FromTime>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
        // Read and consolidate the previous output until we reach the
        // `resume_upper`.
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    if let Some(ts) = snapshot_upper.clone().into_option() {
                        // The snapshot is not yet complete; downgrade the
                        // capabilities to report the progress we made.
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<G>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        drop(events);
        drop(previous_token);
        // Dropping the capability closes the snapshot output, signaling that
        // rehydration is complete.
        drop(snapshot_cap);

        // Continue to drain the previous input, even though we no longer use
        // its updates, so it does not hold back progress.
        while let Some(_event) = previous.next().await {}

        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

        // Reusable buffers for the state values of keys that are currently
        // being processed.
        let mut commands_state: indexmap::IndexMap<
            _,
            types::UpsertValueAndSize<G::Timestamp, FromTime>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        while let Some(event) = input.next().await {
            // Buffer as many events as are already available, without
            // awaiting.
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        // If this batch is at the current output time, we may
                        // partially drain it below, to avoid buffering the
                        // entire snapshot in memory.
                        let event_time = cap.time();
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
                        // Ignore progress updates before the `resume_upper`,
                        // which is our initial capability.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

                        // A progress event lets us drain the staged input
                        // completely, so a partial drain is no longer needed.
                        partial_drain_time = None;
                        drain_staged_input::<_, G, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                            &source_config,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

            // Drain the commands that are ready at the current output time,
            // even though the input frontier has not advanced past it yet.
            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}

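/// Emits errors encountered while interacting with the upsert state backend.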
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<G> {
    async fn emit(&mut self, context: String, e: anyhow::Error);
}

#[async_trait::async_trait(?Send)]
impl<G: Scope> UpsertErrorEmitter<G>
    for (
        &mut AsyncOutputHandle<
            <G as ScopeParent>::Timestamp,
            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        >,
        &Capability<<G as ScopeParent>::Timestamp>,
    )
{
    async fn emit(&mut self, context: String, e: anyhow::Error) {
        process_upsert_state_error::<G>(context, e, self.0, self.1).await
    }
}

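/// Emits a halting [`HealthStatusUpdate`] describing the given error and
/// then pends forever: state errors are not recoverable within the dataflow,
/// so the operator stalls, leaving recovery to whatever reacts to the health
/// update.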
async fn process_upsert_state_error<G: Scope>(
    context: String,
    e: anyhow::Error,
    health_output: &AsyncOutputHandle<
        <G as ScopeParent>::Timestamp,
        CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
    >,
    health_cap: &Capability<<G as ScopeParent>::Timestamp>,
) {
    let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
    health_output.give(health_cap, (None, update));
    std::future::pending::<()>().await;
    unreachable!("pending future never returns");
}