1use std::cell::RefCell;
11use std::cmp::Reverse;
12use std::convert::AsRef;
13use std::fmt::Debug;
14use std::hash::{Hash, Hasher};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use differential_dataflow::hashable::Hashable;
19use differential_dataflow::{AsCollection, Collection};
20use futures::StreamExt;
21use futures::future::FutureExt;
22use indexmap::map::Entry;
23use itertools::Itertools;
24use mz_ore::error::ErrorExt;
25use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
26use mz_rocksdb::ValueIterator;
27use mz_storage_operators::metrics::BackpressureMetrics;
28use mz_storage_types::configuration::StorageConfiguration;
29use mz_storage_types::dyncfgs;
30use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
31use mz_storage_types::sources::envelope::UpsertEnvelope;
32use mz_timely_util::builder_async::{
33 AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
34 PressOnDropButton,
35};
36use serde::{Deserialize, Serialize};
37use sha2::{Digest, Sha256};
38use timely::dataflow::channels::pact::Exchange;
39use timely::dataflow::channels::pushers::Tee;
40use timely::dataflow::operators::{Capability, InputCapability, Operator};
41use timely::dataflow::{Scope, ScopeParent, Stream};
42use timely::order::{PartialOrder, TotalOrder};
43use timely::progress::timestamp::Refines;
44use timely::progress::{Antichain, Timestamp};
45
46use crate::healthcheck::HealthStatusUpdate;
47use crate::metrics::upsert::UpsertMetrics;
48use crate::storage_state::StorageInstanceContext;
49use crate::upsert_continual_feedback;
50use types::{
51 BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
52 upsert_bincode_opts,
53};
54
55#[cfg(test)]
56pub mod memory;
57pub(crate) mod rocksdb;
58pub(crate) mod types;
60
61pub type UpsertValue = Result<Row, Box<UpsertError>>;
62
63#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
64pub struct UpsertKey([u8; 32]);
65
66impl Debug for UpsertKey {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 write!(f, "0x")?;
69 for byte in self.0 {
70 write!(f, "{:02x}", byte)?;
71 }
72 Ok(())
73 }
74}
75
76impl AsRef<[u8]> for UpsertKey {
77 #[inline(always)]
78 fn as_ref(&self) -> &[u8] {
82 &self.0
83 }
84}
85
86impl From<&[u8]> for UpsertKey {
87 fn from(bytes: &[u8]) -> Self {
88 UpsertKey(bytes.try_into().expect("invalid key length"))
89 }
90}
91
/// Digest used by `UpsertKey::from_iter` to hash key datums (or a key error) into the
/// 32 bytes of an `UpsertKey`.
type KeyHash = Sha256;
97
impl UpsertKey {
    /// Computes the key for an update from its decoded key row, or from the error that was
    /// produced while decoding the key.
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    /// Computes the key for a full value row by projecting the columns at `key_indices`.
    ///
    /// An `Err` value is hashed exactly as `from_key` would hash the same error.
    /// NOTE(review): this appears intended to agree with `from_key` on the corresponding
    /// key row, so that values read back from state map to the same key — confirm with callers.
    ///
    /// # Panics
    ///
    /// Indexing panics if an entry of `key_indices` is out of bounds for the unpacked row.
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            // Per-thread scratch buffer for unpacking value rows, reused across calls
            // (presumably to avoid allocating on every call).
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            // Project the key columns out of the unpacked value; errors pass through untouched.
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    /// Hashes a key into an `UpsertKey` using `KeyHash` fed through `DigestHasher`.
    ///
    /// The hashed value has type `Result<&[Datum], Datum>`:
    /// - `Ok(key datums)` for an `Ok` iterator;
    /// - `Ok(err.for_key datums)` for `UpsertError::Value`, so a value error hashes like a
    ///   successfully decoded key with those datums;
    /// - `Err(Datum::Bytes(raw))` for `UpsertError::KeyDecode`;
    /// - `Err(Datum::Null)` for `UpsertError::NullKey`.
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            // Per-thread scratch buffer for collecting key datums, reused across calls.
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // NOTE(review): `DatumVec::borrow` is assumed to hand out an empty buffer that is
            // cleared again when the borrow is dropped — confirm in `mz_repr::DatumVec`.
            let mut key_datums = key_datums.borrow();
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}
151
152struct DigestHasher<H: Digest>(H);
153
154impl<H: Digest> Hasher for DigestHasher<H> {
155 fn write(&mut self, bytes: &[u8]) {
156 self.0.update(bytes);
157 }
158
159 fn finish(&self) -> u64 {
160 panic!("digest wrapper used to produce a hash");
161 }
162}
163
164use std::convert::Infallible;
165use timely::container::CapacityContainerBuilder;
166use timely::dataflow::channels::pact::Pipeline;
167
168use self::types::ValueMetadata;
169
170pub fn rehydration_finished<G, T>(
174 scope: G,
175 source_config: &crate::source::RawSourceCreationConfig,
176 token: impl std::any::Any + 'static,
178 resume_upper: Antichain<T>,
179 input: &Stream<G, Infallible>,
180) where
181 G: Scope<Timestamp = T>,
182 T: Timestamp,
183{
184 let worker_id = source_config.worker_id;
185 let id = source_config.id;
186 let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id}"), scope);
187 let mut input = builder.new_disconnected_input(input, Pipeline);
188
189 builder.build(move |_capabilities| async move {
190 let mut input_upper = Antichain::from_elem(Timestamp::minimum());
191 while !PartialOrder::less_equal(&resume_upper, &input_upper) {
193 let Some(event) = input.next().await else {
194 break;
195 };
196 if let AsyncEvent::Progress(upper) = event {
197 input_upper = upper;
198 }
199 }
200 tracing::info!(
201 %worker_id,
202 source_id = %id,
203 "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
204 );
205 drop(token);
206 });
207}
208
209pub(crate) fn upsert<G: Scope, FromTime>(
215 input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
216 upsert_envelope: UpsertEnvelope,
217 resume_upper: Antichain<G::Timestamp>,
218 previous: Collection<G, Result<Row, DataflowError>, Diff>,
219 previous_token: Option<Vec<PressOnDropButton>>,
220 source_config: crate::source::SourceExportCreationConfig,
221 instance_context: &StorageInstanceContext,
222 storage_configuration: &StorageConfiguration,
223 dataflow_paramters: &crate::internal_control::DataflowParameters,
224 backpressure_metrics: Option<BackpressureMetrics>,
225) -> (
226 Collection<G, Result<Row, DataflowError>, Diff>,
227 Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
228 Stream<G, Infallible>,
229 PressOnDropButton,
230)
231where
232 G::Timestamp: TotalOrder + Sync,
233 G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
234 FromTime: Timestamp + Sync,
235{
236 let upsert_metrics = source_config.metrics.get_upsert_metrics(
237 source_config.id,
238 source_config.worker_id,
239 backpressure_metrics,
240 );
241
242 let rocksdb_cleanup_tries =
243 dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());
244
245 let prevent_snapshot_buffering =
248 dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
249 let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
251 .get(storage_configuration.config_set());
252
253 let rocksdb_use_native_merge_operator =
256 dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());
257
258 let upsert_config = UpsertConfig {
259 shrink_upsert_unused_buffers_by_ratio: storage_configuration
260 .parameters
261 .shrink_upsert_unused_buffers_by_ratio,
262 };
263
264 let thin_input = upsert_thinning(input);
265
266 let tuning = dataflow_paramters.upsert_rocksdb_tuning_config.clone();
267
268 let rocksdb_dir = instance_context
273 .scratch_directory
274 .clone()
275 .unwrap_or_else(|| PathBuf::from("/tmp"))
276 .join("storage")
277 .join("upsert")
278 .join(source_config.id.to_string())
279 .join(source_config.worker_id.to_string());
280
281 tracing::info!(
282 worker_id = %source_config.worker_id,
283 source_id = %source_config.id,
284 ?rocksdb_dir,
285 ?tuning,
286 ?rocksdb_use_native_merge_operator,
287 "rendering upsert source"
288 );
289
290 let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
291 let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);
292
293 let env = instance_context.rocksdb_env.clone();
294
295 let rocksdb_init_fn = move || async move {
297 let merge_operator = if rocksdb_use_native_merge_operator {
298 Some((
299 "upsert_state_snapshot_merge_v1".to_string(),
300 |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<G::Timestamp, FromTime>>| {
301 consolidating_merge_function::<G::Timestamp, FromTime>(a.into(), b)
302 },
303 ))
304 } else {
305 None
306 };
307 rocksdb::RocksDB::new(
308 mz_rocksdb::RocksDBInstance::new(
309 &rocksdb_dir,
310 mz_rocksdb::InstanceOptions::new(
311 env,
312 rocksdb_cleanup_tries,
313 merge_operator,
314 upsert_bincode_opts(),
317 ),
318 tuning,
319 rocksdb_shared_metrics,
320 rocksdb_instance_metrics,
321 )
322 .unwrap(),
323 )
324 };
325
326 upsert_operator(
327 &thin_input,
328 upsert_envelope.key_indices,
329 resume_upper,
330 previous,
331 previous_token,
332 upsert_metrics,
333 source_config,
334 rocksdb_init_fn,
335 upsert_config,
336 storage_configuration,
337 prevent_snapshot_buffering,
338 snapshot_buffering_max,
339 )
340}
341
342fn upsert_operator<G: Scope, FromTime, F, Fut, US>(
345 input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
346 key_indices: Vec<usize>,
347 resume_upper: Antichain<G::Timestamp>,
348 persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
349 persist_token: Option<Vec<PressOnDropButton>>,
350 upsert_metrics: UpsertMetrics,
351 source_config: crate::source::SourceExportCreationConfig,
352 state: F,
353 upsert_config: UpsertConfig,
354 _storage_configuration: &StorageConfiguration,
355 prevent_snapshot_buffering: bool,
356 snapshot_buffering_max: Option<usize>,
357) -> (
358 Collection<G, Result<Row, DataflowError>, Diff>,
359 Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
360 Stream<G, Infallible>,
361 PressOnDropButton,
362)
363where
364 G::Timestamp: TotalOrder + Sync,
365 G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
366 F: FnOnce() -> Fut + 'static,
367 Fut: std::future::Future<Output = US>,
368 US: UpsertStateBackend<G::Timestamp, FromTime>,
369 FromTime: Debug + timely::ExchangeData + Ord + Sync,
370{
371 let use_continual_feedback_upsert = true;
375
376 tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");
377
378 if use_continual_feedback_upsert {
379 upsert_continual_feedback::upsert_inner(
380 input,
381 key_indices,
382 resume_upper,
383 persist_input,
384 persist_token,
385 upsert_metrics,
386 source_config,
387 state,
388 upsert_config,
389 prevent_snapshot_buffering,
390 snapshot_buffering_max,
391 )
392 } else {
393 upsert_classic(
394 input,
395 key_indices,
396 resume_upper,
397 persist_input,
398 persist_token,
399 upsert_metrics,
400 source_config,
401 state,
402 upsert_config,
403 prevent_snapshot_buffering,
404 snapshot_buffering_max,
405 )
406 }
407}
408
409fn upsert_thinning<G, K, V, FromTime>(
414 input: &Collection<G, (K, V, FromTime), Diff>,
415) -> Collection<G, (K, V, FromTime), Diff>
416where
417 G: Scope,
418 G::Timestamp: TotalOrder,
419 K: timely::Data + Eq + Ord,
420 V: timely::Data,
421 FromTime: Timestamp,
422{
423 input
424 .inner
425 .unary(Pipeline, "UpsertThinning", |_, _| {
426 let mut capability: Option<InputCapability<G::Timestamp>> = None;
428 let mut updates = Vec::new();
430 move |input, output| {
431 while let Some((cap, data)) = input.next() {
432 assert!(
433 data.iter().all(|(_, _, diff)| diff.is_positive()),
434 "invalid upsert input"
435 );
436 updates.append(data);
437 match capability.as_mut() {
438 Some(capability) => {
439 if cap.time() <= capability.time() {
440 *capability = cap;
441 }
442 }
443 None => capability = Some(cap),
444 }
445 }
446 if let Some(capability) = capability.take() {
447 updates.sort_unstable_by(|a, b| {
450 let ((key1, _, from_time1), time1, _) = a;
451 let ((key2, _, from_time2), time2, _) = b;
452 Ord::cmp(
453 &(key1, time1, Reverse(from_time1)),
454 &(key2, time2, Reverse(from_time2)),
455 )
456 });
457 let mut session = output.session(&capability);
458 session.give_iterator(updates.drain(..).dedup_by(|a, b| {
459 let ((key1, _, _), time1, _) = a;
460 let ((key2, _, _), time2, _) = b;
461 (key1, time1) == (key2, time2)
462 }))
463 }
464 }
465 })
466 .as_collection()
467}
468
469fn stage_input<T, FromTime>(
472 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
473 data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
474 input_upper: &Antichain<T>,
475 resume_upper: &Antichain<T>,
476 storage_shrink_upsert_unused_buffers_by_ratio: usize,
477) where
478 T: PartialOrder,
479 FromTime: Ord,
480{
481 if PartialOrder::less_equal(input_upper, resume_upper) {
482 data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
483 }
484
485 stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
486 assert!(diff.is_positive(), "invalid upsert input");
487 (time, key, Reverse(order), value)
488 }));
489
490 if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
491 let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
492 if reduced_capacity > stash.len() {
493 stash.shrink_to(reduced_capacity);
494 }
495 }
496}
497
498#[derive(Debug)]
501enum DrainStyle<'a, T> {
502 ToUpper(&'a Antichain<T>),
503 AtTime(T),
504}
505
/// Applies the staged updates selected by `drain_style` to `state` and appends the resulting
/// output changes to `output_updates`.
///
/// Steps:
/// 1. Sort `stash` by `(time, key, Reverse(from_time), value)` and find the prefix selected by
///    `drain_style`.
/// 2. Fetch the current state of every distinct key in that prefix into `commands_state`.
/// 3. For each `(time, key)`, keep only the update with the greatest `from_time`. Apply it to
///    the fetched value, emitting a `-1` for the previous finalized value (if any) and a `+1`
///    for a new value. A `None` value instead stores a tombstone.
/// 4. Write every touched key back to `state`.
///
/// `commands_state` and `multi_get_scratch` are caller-owned scratch buffers; both are
/// cleared or drained here. Failures reading or writing state are reported through
/// `error_emitter`.
///
/// # Panics
///
/// Panics if a previous value's size does not fit the `ValueMetadata::size` type
/// ("less than i64 size").
async fn drain_staged_input<S, G, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    G: Scope,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Ord + Sync,
    E: UpsertErrorEmitter<G>,
{
    // Sorting by time first makes the drained set a prefix of the stash.
    stash.sort_unstable();

    // Number of leading stash entries selected by `drain_style`.
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // One entry per distinct key in the drained prefix, in first-seen order.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    // Fetch current state for those keys. The key list and `values_mut()` are produced from
    // the same `IndexMap`, so they are in the same order.
    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // Within each `(time, key)` run, the first element has the greatest `from_time` (thanks to
    // `Reverse` in the sort). `dedup_by` keeps that first element.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    while let Some((ts, key, from_time, value)) = commands.next() {
        // Every drained key was inserted into `commands_state` above.
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id);
        }

        // Skip updates whose `from_time` does not exceed the order already recorded for this
        // key. NOTE(review): see `StateValue::provisional_order` for what "order" means here.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            continue;
        }

        match value {
            Some(value) => {
                // Replace the stored value; retract the previous finalized value if there was one.
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            None => {
                // Deletion: retract the previous finalized value (if any) and store a tombstone.
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    // Write back every touched key, together with the metadata fetched for its previous value.
    match state
        .multi_put(
            // NOTE(review): flag semantics are defined by `UpsertState::multi_put`
            // (presumably whether per-update stats are maintained) — confirm.
            true, commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}
655
/// Tuning knobs for the upsert operator that are read once when it is rendered.
///
/// Derives `Debug` so the configuration can be logged, and `Clone`/`Copy` because it is a
/// plain bag of scalars.
#[derive(Debug, Clone, Copy)]
pub(crate) struct UpsertConfig {
    /// Ratio used to shrink over-allocated buffers. For example, `stage_input` shrinks its
    /// stash to `capacity / ratio` when that still exceeds its length. `0` disables shrinking
    /// there.
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}
661
/// Classic upsert implementation. `upsert_operator` selects it only when
/// `use_continual_feedback_upsert` is false.
///
/// Phases:
/// 1. **Rehydration:** reads `previous` (filtered to upsert-envelope errors and rows, and
///    keyed by `key_indices`) up to `resume_upper`, and consolidates it into `state`.
///    `previous_token` and the snapshot capability are dropped once done.
/// 2. **Steady state:** stages `input` updates and drains them into `state` as the input
///    frontier advances. Resulting retractions/insertions are emitted on the output.
///
/// If `prevent_snapshot_buffering` is set, updates at the output capability's current time are
/// also drained eagerly after each batch of events. `snapshot_buffering_max` caps how many
/// ready events are handled per batch.
///
/// Returns the output collection, a health-status stream, an `Infallible` stream whose
/// capability is held until rehydration completes, and a shutdown button.
fn upsert_classic<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    previous: Collection<G, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // Keep only rows and upsert-envelope errors from `previous`, keyed like the input.
    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                _ => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // Carries no data; only its capability/frontier is meaningful.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    // Both inputs are exchanged by key hash, so a key's updates and its prior state land on
    // the same worker.
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        &previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        // Capabilities for the three outputs, in creation order.
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

        let mut state = UpsertState::<_, _, FromTime>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
        // Rehydration: consolidate `previous` into state until its frontier reaches
        // `resume_upper`. Updates at or beyond `resume_upper` are ignored.
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            // The second argument tells the state whether this is the final chunk of the
            // snapshot.
            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    // Track rehydration progress on both outputs, but never past `resume_upper`.
                    if let Some(ts) = snapshot_upper.clone().into_option() {
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<G>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        // Rehydration is done: release the buffer, the token for `previous`, and the snapshot
        // capability.
        drop(events);
        drop(previous_token);
        drop(snapshot_cap);

        // Drain whatever `previous` still delivers until it is exhausted.
        while let Some(_event) = previous.next().await {}

        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

        // Scratch buffers reused across every call to `drain_staged_input`.
        let mut commands_state: indexmap::IndexMap<
            _,
            types::UpsertValueAndSize<G::Timestamp, FromTime>,
        > = indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        while let Some(event) = input.next().await {
            // Process this event plus any others that are immediately ready, without awaiting.
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        // Data at the output capability's time can be drained right away
                        // when snapshot buffering is disabled.
                        let event_time = cap.time();
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
                        // Nothing can be drained until the input reaches `resume_upper`.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

                        // A full drain subsumes any pending partial drain.
                        partial_drain_time = None;
                        drain_staged_input::<_, G, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                            &source_config,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                // Bound how many ready events are handled before the partial drain below.
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        // Re-wrap boxed upsert errors as dataflow errors.
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
945
946#[async_trait::async_trait(?Send)]
947pub(crate) trait UpsertErrorEmitter<G> {
948 async fn emit(&mut self, context: String, e: anyhow::Error);
949}
950
951#[async_trait::async_trait(?Send)]
952impl<G: Scope> UpsertErrorEmitter<G>
953 for (
954 &mut AsyncOutputHandle<
955 <G as ScopeParent>::Timestamp,
956 CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
957 Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
958 >,
959 &Capability<<G as ScopeParent>::Timestamp>,
960 )
961{
962 async fn emit(&mut self, context: String, e: anyhow::Error) {
963 process_upsert_state_error::<G>(context, e, self.0, self.1).await
964 }
965}
966
967async fn process_upsert_state_error<G: Scope>(
969 context: String,
970 e: anyhow::Error,
971 health_output: &AsyncOutputHandle<
972 <G as ScopeParent>::Timestamp,
973 CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
974 Tee<<G as ScopeParent>::Timestamp, Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
975 >,
976 health_cap: &Capability<<G as ScopeParent>::Timestamp>,
977) {
978 let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
979 health_output.give(health_cap, (None, update));
980 std::future::pending::<()>().await;
981 unreachable!("pending future never returns");
982}