1use std::cell::RefCell;
11use std::cmp::Reverse;
12use std::convert::AsRef;
13use std::fmt::Debug;
14use std::hash::{Hash, Hasher};
15use std::path::PathBuf;
16use std::sync::Arc;
17
18use differential_dataflow::hashable::Hashable;
19use differential_dataflow::{AsCollection, VecCollection};
20use futures::StreamExt;
21use futures::future::FutureExt;
22use indexmap::map::Entry;
23use itertools::Itertools;
24use mz_ore::error::ErrorExt;
25use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row};
26use mz_rocksdb::ValueIterator;
27use mz_storage_operators::metrics::BackpressureMetrics;
28use mz_storage_types::configuration::StorageConfiguration;
29use mz_storage_types::dyncfgs;
30use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
31use mz_storage_types::sources::envelope::UpsertEnvelope;
32use mz_timely_util::builder_async::{
33 AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
34 PressOnDropButton,
35};
36use serde::{Deserialize, Serialize};
37use sha2::{Digest, Sha256};
38use timely::dataflow::channels::pact::Exchange;
39use timely::dataflow::operators::{Capability, InputCapability, Operator};
40use timely::dataflow::{Scope, StreamVec};
41use timely::order::{PartialOrder, TotalOrder};
42use timely::progress::timestamp::Refines;
43use timely::progress::{Antichain, Timestamp};
44
45use crate::healthcheck::HealthStatusUpdate;
46use crate::metrics::upsert::UpsertMetrics;
47use crate::storage_state::StorageInstanceContext;
48use crate::{upsert_continual_feedback, upsert_continual_feedback_v2};
49use types::{
50 BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
51 upsert_bincode_opts,
52};
53
54#[cfg(test)]
55pub mod memory;
56pub(crate) mod rocksdb;
57pub(crate) mod types;
59
/// The value side of an upsert update: either a successfully decoded `Row` or
/// the (boxed) error produced while decoding it.
pub type UpsertValue = Result<Row, Box<UpsertError>>;

/// A fixed-size (SHA-256) digest identifying an upsert key.
///
/// Upsert state is keyed by this digest rather than by the decoded key `Row`
/// itself, which gives a compact, hashable, totally ordered identifier that is
/// also usable as a byte key for state backends (see `AsRef<[u8]>` below).
#[derive(
    Copy,
    Clone,
    Hash,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
pub struct UpsertKey([u8; 32]);
74
75impl Debug for UpsertKey {
76 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
77 write!(f, "0x")?;
78 for byte in self.0 {
79 write!(f, "{:02x}", byte)?;
80 }
81 Ok(())
82 }
83}
84
85impl AsRef<[u8]> for UpsertKey {
86 #[inline(always)]
87 fn as_ref(&self) -> &[u8] {
91 &self.0
92 }
93}
94
95impl From<&[u8]> for UpsertKey {
96 fn from(bytes: &[u8]) -> Self {
97 UpsertKey(bytes.try_into().expect("invalid key length"))
98 }
99}
100
/// The hash function used to digest key datums into an `UpsertKey`.
type KeyHash = Sha256;
106
impl UpsertKey {
    /// Hashes a decoded key row (or the error produced while decoding it)
    /// into an `UpsertKey`.
    pub fn from_key(key: Result<&Row, &UpsertError>) -> Self {
        Self::from_iter(key.map(|r| r.iter()))
    }

    /// Hashes the key of a decoded *value* row by projecting out the datums
    /// at `key_indices`.
    pub fn from_value(value: Result<&Row, &UpsertError>, key_indices: &[usize]) -> Self {
        thread_local! {
            // Reused scratch space for unpacking the value row, to avoid
            // allocating on every call.
            static VALUE_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        VALUE_DATUMS.with(|value_datums| {
            let mut value_datums = value_datums.borrow_mut();
            let value = value.map(|v| value_datums.borrow_with(v));
            let key = match value {
                Ok(ref datums) => Ok(key_indices.iter().map(|&idx| datums[idx])),
                Err(err) => Err(err),
            };
            Self::from_iter(key)
        })
    }

    /// Hashes an iterator of key datums (or the error that prevented decoding
    /// a key) into an `UpsertKey`. Errors are folded into the digest too, so
    /// distinct failures still map to stable, distinct keys.
    pub fn from_iter<'a, 'b>(
        key: Result<impl Iterator<Item = Datum<'a>> + 'b, &UpsertError>,
    ) -> Self {
        thread_local! {
            // Reused scratch space for collecting the key datums.
            static KEY_DATUMS: RefCell<DatumVec> = RefCell::new(DatumVec::new());
        }
        KEY_DATUMS.with(|key_datums| {
            let mut key_datums = key_datums.borrow_mut();
            // NOTE(review): `DatumVec::borrow` appears to hand out an empty,
            // reusable datum buffer — confirm against `mz_repr::DatumVec`.
            let mut key_datums = key_datums.borrow();
            // Either a slice of key datums, or a single stand-in datum when
            // the key itself never decoded.
            let key: Result<&[Datum], Datum> = match key {
                Ok(key) => {
                    for datum in key {
                        key_datums.push(datum);
                    }
                    Ok(&*key_datums)
                }
                Err(UpsertError::Value(err)) => {
                    // Value-decoding errors still carry the decoded key.
                    key_datums.extend(err.for_key.iter());
                    Ok(&*key_datums)
                }
                Err(UpsertError::KeyDecode(err)) => Err(Datum::Bytes(&err.raw)),
                Err(UpsertError::NullKey(_)) => Err(Datum::Null),
            };
            let mut hasher = DigestHasher(KeyHash::new());
            key.hash(&mut hasher);
            Self(hasher.0.finalize().into())
        })
    }
}
160
/// Adapts a cryptographic `Digest` to the `std::hash::Hasher` interface so
/// values can be fed into the digest through their `Hash` impls.
struct DigestHasher<H: Digest>(H);

impl<H: Digest> Hasher for DigestHasher<H> {
    fn write(&mut self, bytes: &[u8]) {
        self.0.update(bytes);
    }

    /// Unsupported: this wrapper only accumulates bytes; the result is read
    /// via `Digest::finalize`, never as a `u64`.
    fn finish(&self) -> u64 {
        panic!("digest wrapper used to produce a hash");
    }
}
172
173use std::convert::Infallible;
174use timely::container::CapacityContainerBuilder;
175use timely::dataflow::channels::pact::Pipeline;
176
177use self::types::ValueMetadata;
178
179pub fn rehydration_finished<'scope, T: Timestamp>(
183 scope: Scope<'scope, T>,
184 source_config: &crate::source::RawSourceCreationConfig,
185 token: impl std::any::Any + 'static,
187 resume_upper: Antichain<T>,
188 input: StreamVec<'scope, T, Infallible>,
189) {
190 let worker_id = source_config.worker_id;
191 let id = source_config.id;
192 let mut builder = AsyncOperatorBuilder::new(format!("rehydration_finished({id}"), scope);
193 let mut input = builder.new_disconnected_input(input, Pipeline);
194
195 builder.build(move |_capabilities| async move {
196 let mut input_upper = Antichain::from_elem(Timestamp::minimum());
197 while !PartialOrder::less_equal(&resume_upper, &input_upper) {
199 let Some(event) = input.next().await else {
200 break;
201 };
202 if let AsyncEvent::Progress(upper) = event {
203 input_upper = upper;
204 }
205 }
206 tracing::info!(
207 %worker_id,
208 source_id = %id,
209 "upsert source has downgraded past the resume upper ({resume_upper:?}) across all workers",
210 );
211 drop(token);
212 });
213}
214
215pub(crate) fn upsert<'scope, T, FromTime>(
221 input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
222 upsert_envelope: UpsertEnvelope,
223 resume_upper: Antichain<T>,
224 previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
225 previous_token: Option<Vec<PressOnDropButton>>,
226 source_config: crate::source::SourceExportCreationConfig,
227 instance_context: &StorageInstanceContext,
228 storage_configuration: &StorageConfiguration,
229 dataflow_paramters: &crate::internal_control::DataflowParameters,
230 backpressure_metrics: Option<BackpressureMetrics>,
231) -> (
232 VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
233 StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
234 StreamVec<'scope, T, Infallible>,
235 PressOnDropButton,
236)
237where
238 T: Timestamp + TotalOrder + Sync,
239 T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
240 FromTime: Timestamp + Clone + Sync,
241{
242 let upsert_metrics = source_config.metrics.get_upsert_metrics(
243 source_config.id,
244 source_config.worker_id,
245 backpressure_metrics,
246 );
247
248 let rocksdb_cleanup_tries =
249 dyncfgs::STORAGE_ROCKSDB_CLEANUP_TRIES.get(storage_configuration.config_set());
250
251 let prevent_snapshot_buffering =
254 dyncfgs::STORAGE_UPSERT_PREVENT_SNAPSHOT_BUFFERING.get(storage_configuration.config_set());
255 let snapshot_buffering_max = dyncfgs::STORAGE_UPSERT_MAX_SNAPSHOT_BATCH_BUFFERING
257 .get(storage_configuration.config_set());
258
259 let rocksdb_use_native_merge_operator =
262 dyncfgs::STORAGE_ROCKSDB_USE_MERGE_OPERATOR.get(storage_configuration.config_set());
263
264 let upsert_config = UpsertConfig {
265 shrink_upsert_unused_buffers_by_ratio: storage_configuration
266 .parameters
267 .shrink_upsert_unused_buffers_by_ratio,
268 };
269
270 let thin_input = upsert_thinning(input);
271
272 let tuning = dataflow_paramters.upsert_rocksdb_tuning_config.clone();
273
274 let rocksdb_dir = instance_context
279 .scratch_directory
280 .clone()
281 .unwrap_or_else(|| PathBuf::from("/tmp"))
282 .join("storage")
283 .join("upsert")
284 .join(source_config.id.to_string())
285 .join(source_config.worker_id.to_string());
286
287 tracing::info!(
288 worker_id = %source_config.worker_id,
289 source_id = %source_config.id,
290 ?rocksdb_dir,
291 ?tuning,
292 ?rocksdb_use_native_merge_operator,
293 "rendering upsert source"
294 );
295
296 let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
297 let rocksdb_instance_metrics = Arc::clone(&upsert_metrics.rocksdb_instance_metrics);
298
299 let env = instance_context.rocksdb_env.clone();
300
301 let rocksdb_init_fn = move || async move {
303 let merge_operator = if rocksdb_use_native_merge_operator {
304 Some((
305 "upsert_state_snapshot_merge_v1".to_string(),
306 |a: &[u8], b: ValueIterator<BincodeOpts, StateValue<T, FromTime>>| {
307 consolidating_merge_function::<T, FromTime>(a.into(), b)
308 },
309 ))
310 } else {
311 None
312 };
313 rocksdb::RocksDB::new(
314 mz_rocksdb::RocksDBInstance::new(
315 &rocksdb_dir,
316 mz_rocksdb::InstanceOptions::new(
317 env,
318 rocksdb_cleanup_tries,
319 merge_operator,
320 upsert_bincode_opts(),
323 ),
324 tuning,
325 rocksdb_shared_metrics,
326 rocksdb_instance_metrics,
327 )
328 .unwrap(),
329 )
330 };
331
332 upsert_operator(
333 thin_input,
334 upsert_envelope.key_indices,
335 resume_upper,
336 previous,
337 previous_token,
338 upsert_metrics,
339 source_config,
340 rocksdb_init_fn,
341 upsert_config,
342 storage_configuration,
343 prevent_snapshot_buffering,
344 snapshot_buffering_max,
345 )
346}
347
/// Renders an upsert operator backed by in-memory state (the
/// `upsert_continual_feedback_v2` implementation), as opposed to [`upsert`],
/// which uses RocksDB.
///
/// Returns the upserted collection, a health-status stream, a
/// snapshot-progress stream, and a shutdown button.
pub(crate) fn upsert_v2<'scope, T, FromTime>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    upsert_envelope: UpsertEnvelope,
    resume_upper: Antichain<T>,
    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    source_config: crate::source::SourceExportCreationConfig,
    backpressure_metrics: Option<BackpressureMetrics>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + differential_dataflow::lattice::Lattice + Sync,
    FromTime: Timestamp + Clone + Sync,
{
    let upsert_metrics = source_config.metrics.get_upsert_metrics(
        source_config.id,
        source_config.worker_id,
        backpressure_metrics,
    );

    // Drop all but the latest update per (key, time) before handing the
    // stream to the upsert implementation.
    let thin_input = upsert_thinning(input);

    tracing::info!(
        worker_id = %source_config.worker_id,
        source_id = %source_config.id,
        "rendering upsert source (btreemap backend)"
    );

    upsert_continual_feedback_v2::upsert_inner(
        thin_input,
        upsert_envelope.key_indices,
        resume_upper,
        previous,
        previous_token,
        upsert_metrics,
        source_config,
    )
}
397
/// Dispatches to a concrete upsert implementation over the state backend
/// constructed by `state`.
///
/// Currently hard-wired to the "continual feedback" implementation; the
/// `upsert_classic` branch below is dead code kept available behind the flag.
fn upsert_operator<'scope, T, FromTime, F, Fut, US>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    persist_input: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    _storage_configuration: &StorageConfiguration,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    T: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<T, FromTime>,
    FromTime: Debug + timely::ExchangeData + Clone + Ord + Sync,
{
    // NOTE(review): hard-coded selector; presumably retained so the classic
    // implementation can be re-enabled without re-plumbing — confirm.
    let use_continual_feedback_upsert = true;

    tracing::info!(id = %source_config.id, %use_continual_feedback_upsert, "upsert operator implementation");

    if use_continual_feedback_upsert {
        upsert_continual_feedback::upsert_inner(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    } else {
        upsert_classic(
            input,
            key_indices,
            resume_upper,
            persist_input,
            persist_token,
            upsert_metrics,
            source_config,
            state,
            upsert_config,
            prevent_snapshot_buffering,
            snapshot_buffering_max,
        )
    }
}
464
/// Thins the upsert input down to at most one update per `(key, time)` pair,
/// keeping the one with the largest `FromTime`: later upserts for the same
/// key at the same timestamp supersede earlier ones, so only the winner needs
/// to be processed downstream.
fn upsert_thinning<'scope, T, K, V, FromTime>(
    input: VecCollection<'scope, T, (K, V, FromTime), Diff>,
) -> VecCollection<'scope, T, (K, V, FromTime), Diff>
where
    T: Timestamp + TotalOrder,
    K: timely::ExchangeData + Clone + Eq + Ord,
    V: timely::ExchangeData + Clone,
    FromTime: Timestamp,
{
    input
        .inner
        .unary(Pipeline, "UpsertThinning", |_, _| {
            // Capability tracking the minimum input time seen so far; all
            // buffered updates are emitted at this single capability.
            let mut capability: Option<InputCapability<T>> = None;
            let mut updates = Vec::new();
            move |input, output| {
                input.for_each(|cap, data| {
                    // Upsert inputs are insert-only; retractions are invalid.
                    assert!(
                        data.iter().all(|(_, _, diff)| diff.is_positive()),
                        "invalid upsert input"
                    );
                    updates.append(data);
                    match capability.as_mut() {
                        Some(capability) => {
                            // Keep the capability for the earliest time.
                            if cap.time() <= capability.time() {
                                *capability = cap;
                            }
                        }
                        None => capability = Some(cap),
                    }
                });
                if let Some(capability) = capability.take() {
                    // Sort so updates with the same (key, time) are adjacent,
                    // with the largest FromTime first (Reverse ordering).
                    updates.sort_unstable_by(|a, b| {
                        let ((key1, _, from_time1), time1, _) = a;
                        let ((key2, _, from_time2), time2, _) = b;
                        Ord::cmp(
                            &(key1, time1, Reverse(from_time1)),
                            &(key2, time2, Reverse(from_time2)),
                        )
                    });
                    let mut session = output.session(&capability);
                    // `dedup_by` keeps the first of each run, i.e. the update
                    // with the largest FromTime per (key, time).
                    session.give_iterator(updates.drain(..).dedup_by(|a, b| {
                        let ((key1, _, _), time1, _) = a;
                        let ((key2, _, _), time2, _) = b;
                        (key1, time1) == (key2, time2)
                    }))
                }
            }
        })
        .as_collection()
}
523
524fn stage_input<T, FromTime>(
527 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
528 data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
529 input_upper: &Antichain<T>,
530 resume_upper: &Antichain<T>,
531 storage_shrink_upsert_unused_buffers_by_ratio: usize,
532) where
533 T: PartialOrder,
534 FromTime: Ord,
535{
536 if PartialOrder::less_equal(input_upper, resume_upper) {
537 data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
538 }
539
540 stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
541 assert!(diff.is_positive(), "invalid upsert input");
542 (time, key, Reverse(order), value)
543 }));
544
545 if storage_shrink_upsert_unused_buffers_by_ratio > 0 {
546 let reduced_capacity = stash.capacity() / storage_shrink_upsert_unused_buffers_by_ratio;
547 if reduced_capacity > stash.len() {
548 stash.shrink_to(reduced_capacity);
549 }
550 }
551}
552
/// Controls how much of the staged stash `drain_staged_input` processes.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain every staged update whose time is not beyond the frontier.
    ToUpper(&'a Antichain<T>),
    /// Drain every staged update at or before the given time.
    AtTime(T),
}
560
/// Drains the eligible prefix of `stash` (selected by `drain_style`) against
/// `state`, pushing resulting output (retractions and insertions) into
/// `output_updates` and writing the new values back to the state backend.
///
/// `commands_state` and `multi_get_scratch` are caller-owned scratch buffers,
/// reused across calls to avoid reallocation. Fatal state errors are routed
/// through `error_emitter`.
async fn drain_staged_input<S, T, FromTime, E>(
    stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
    commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
    output_updates: &mut Vec<(UpsertValue, T, Diff)>,
    multi_get_scratch: &mut Vec<UpsertKey>,
    drain_style: DrainStyle<'_, T>,
    error_emitter: &mut E,
    state: &mut UpsertState<'_, S, T, FromTime>,
    source_config: &crate::source::SourceExportCreationConfig,
) where
    S: UpsertStateBackend<T, FromTime>,
    T: PartialOrder + Ord + Clone + Send + Sync + Serialize + Debug + 'static,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
    E: UpsertErrorEmitter<T>,
{
    // Sort by (time, key, Reverse(from_time), value) so the eligible prefix
    // is contiguous and duplicates per (time, key) are adjacent with the
    // largest FromTime first.
    stash.sort_unstable();

    // Index of the first update NOT eligible for draining.
    let idx = stash.partition_point(|(ts, _, _, _)| match &drain_style {
        DrainStyle::ToUpper(upper) => !upper.less_equal(ts),
        DrainStyle::AtTime(time) => ts <= time,
    });

    tracing::trace!(?drain_style, updates = idx, "draining stash in upsert");

    // Pre-populate an entry per distinct key so we can fetch their current
    // state in one batched `multi_get`.
    commands_state.clear();
    for (_, key, _, _) in stash.iter().take(idx) {
        commands_state.entry(*key).or_default();
    }

    multi_get_scratch.clear();
    multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
    match state
        .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to fetch records from state".to_string(), e)
                .await;
        }
    }

    // Collapse duplicates per (time, key); `dedup_by` keeps the first of each
    // run, which (by the sort above) carries the largest FromTime.
    let mut commands = stash.drain(..idx).dedup_by(|a, b| {
        let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
        a_ts == b_ts && a_key == b_key
    });

    let bincode_opts = types::upsert_bincode_opts();
    while let Some((ts, key, from_time, value)) = commands.next() {
        // The entry must exist: it was created in the pre-population pass.
        let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
            command_state
        } else {
            panic!("key missing from commands_state");
        };

        let existing_value = &mut command_state.get_mut().value;

        if let Some(cs) = existing_value.as_mut() {
            cs.ensure_decoded(bincode_opts, source_config.id, Some(&key));
        }

        // Skip commands that are not newer (by FromTime) than what the state
        // already holds for this timestamp.
        let existing_order = existing_value
            .as_ref()
            .and_then(|cs| cs.provisional_order(&ts));
        if existing_order >= Some(&from_time.0) {
            continue;
        }

        match value {
            // Upsert: retract the previous finalized value (if any), then
            // emit the new one.
            Some(value) => {
                if let Some(old_value) =
                    existing_value.replace(StateValue::finalized_value(value.clone()))
                {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
                    }
                }
                output_updates.push((value, ts, Diff::ONE));
            }
            // Delete: retract the previous finalized value (if any) and leave
            // a tombstone in the state.
            None => {
                if let Some(old_value) = existing_value.take() {
                    if let Some(old_value) = old_value.into_decoded().finalized {
                        output_updates.push((old_value, ts, Diff::MINUS_ONE));
                    }
                }

                *existing_value = Some(StateValue::tombstone());
            }
        }
    }

    // Write all updated values back to the state backend in one batch,
    // carrying the previous value metadata for statistics bookkeeping.
    match state
        .multi_put(
            true, commands_state.drain(..).map(|(k, cv)| {
                (
                    k,
                    types::PutValue {
                        value: cv.value.map(|cv| cv.into_decoded()),
                        previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
                            size: v.size.try_into().expect("less than i64 size"),
                            is_tombstone: v.is_tombstone,
                        }),
                    },
                )
            }),
        )
        .await
    {
        Ok(_) => {}
        Err(e) => {
            error_emitter
                .emit("Failed to update records in state".to_string(), e)
                .await;
        }
    }
}
709
/// Configuration knobs for upsert operator implementations.
pub(crate) struct UpsertConfig {
    // When greater than 0, reusable buffers whose length has fallen below
    // `capacity / ratio` are shrunk (see `stage_input`).
    pub shrink_upsert_unused_buffers_by_ratio: usize,
}
715
/// The "classic" upsert implementation: first rehydrates upsert state from
/// `previous` (the persisted output read back in) up to `resume_upper`, then
/// processes live `input` updates, staging and draining them against the
/// state backend produced by `state`.
///
/// Returns the upserted collection, a health-status stream, a
/// snapshot-progress stream (its frontier tracks rehydration progress), and a
/// shutdown button.
fn upsert_classic<'scope, T, FromTime, F, Fut, US>(
    input: VecCollection<'scope, T, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<T>,
    previous: VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    previous_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<'scope, T, Result<Row, DataflowError>, Diff>,
    StreamVec<'scope, T, (Option<GlobalId>, HealthStatusUpdate)>,
    StreamVec<'scope, T, Infallible>,
    PressOnDropButton,
)
where
    T: Timestamp + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<T, FromTime>,
    FromTime: timely::ExchangeData + Clone + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // Reduce `previous` to (key, value) pairs: keep rows and upsert errors,
    // drop every other error kind (they did not originate from upsert).
    let previous = previous.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                EnvelopeError::Flat(_) => return None,
            },
            Err(_) => return None,
        };
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // Never carries data; only its frontier (rehydration progress) matters.
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    // Both inputs are exchanged by key hash so each key lives on one worker.
    let mut input = builder.new_input_for(
        input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut previous = builder.new_input_for(
        previous.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);
    let shutdown_button = builder.build(move |caps| async move {
        let [mut output_cap, mut snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();

        // Construct the state backend now that the operator is running.
        let mut state = UpsertState::<_, _, FromTime>::new(
            state().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );
        let mut events = vec![];
        let mut snapshot_upper = Antichain::from_elem(Timestamp::minimum());

        let mut stash = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        tracing::info!(
            ?resume_upper,
            ?snapshot_upper,
            "timely-{} upsert source {} starting rehydration",
            source_config.worker_id,
            source_config.id
        );
        // Phase 1: rehydrate state from `previous` until its frontier passes
        // the resume upper.
        while !PartialOrder::less_equal(&resume_upper, &snapshot_upper) {
            previous.ready().await;
            while let Some(event) = previous.next_sync() {
                match event {
                    AsyncEvent::Data(_cap, data) => {
                        // Only updates strictly before the resume upper are
                        // part of the snapshot.
                        events.extend(data.into_iter().filter_map(|((key, value), ts, diff)| {
                            if !resume_upper.less_equal(&ts) {
                                Some((key, value, diff))
                            } else {
                                None
                            }
                        }))
                    }
                    AsyncEvent::Progress(upper) => {
                        snapshot_upper = upper;
                    }
                };
            }

            // Fold the batch into the state; the flag marks whether this is
            // the final chunk of the snapshot.
            match state
                .consolidate_chunk(
                    events.drain(..),
                    PartialOrder::less_equal(&resume_upper, &snapshot_upper),
                )
                .await
            {
                Ok(_) => {
                    // Advertise rehydration progress on both outputs while we
                    // are still behind the resume upper.
                    if let Some(ts) = snapshot_upper.clone().into_option() {
                        if !resume_upper.less_equal(&ts) {
                            snapshot_cap.downgrade(&ts);
                            output_cap.downgrade(&ts);
                        }
                    }
                }
                Err(e) => {
                    UpsertErrorEmitter::<T>::emit(
                        &mut error_emitter,
                        "Failed to rehydrate state".to_string(),
                        e,
                    )
                    .await;
                }
            }
        }

        drop(events);
        drop(previous_token);
        // Dropping the snapshot capability lets its frontier advance to the
        // empty antichain, signaling rehydration is done on this worker.
        drop(snapshot_cap);

        // Drain whatever remains of `previous`; its contents no longer matter.
        while let Some(_event) = previous.next().await {}

        if let Some(ts) = resume_upper.as_option() {
            output_cap.downgrade(ts);
        }

        tracing::info!(
            "timely-{} upsert source {} finished rehydration",
            source_config.worker_id,
            source_config.id
        );

        // Scratch buffers reused by every `drain_staged_input` call.
        let mut commands_state: indexmap::IndexMap<_, types::UpsertValueAndSize<T, FromTime>> =
            indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        let mut output_updates = vec![];
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());

        // Phase 2: process live input.
        while let Some(event) = input.next().await {
            // Greedily pull every event that is already available, so we can
            // process them as one batch.
            let events = [event]
                .into_iter()
                .chain(std::iter::from_fn(|| input.next().now_or_never().flatten()))
                .enumerate();

            let mut partial_drain_time = None;
            for (i, event) in events {
                match event {
                    AsyncEvent::Data(cap, mut data) => {
                        tracing::trace!(
                            time=?cap.time(),
                            updates=%data.len(),
                            "received data in upsert"
                        );
                        stage_input(
                            &mut stash,
                            &mut data,
                            &input_upper,
                            &resume_upper,
                            upsert_config.shrink_upsert_unused_buffers_by_ratio,
                        );

                        // If enabled, schedule an eager partial drain at the
                        // current output time so large snapshots do not pile
                        // up in the stash.
                        let event_time = cap.time();
                        if prevent_snapshot_buffering && output_cap.time() == event_time {
                            partial_drain_time = Some(event_time.clone());
                        }
                    }
                    AsyncEvent::Progress(upper) => {
                        tracing::trace!(?upper, "received progress in upsert");
                        // Frontiers still behind the resume upper carry no
                        // new information.
                        if PartialOrder::less_than(&upper, &resume_upper) {
                            continue;
                        }

                        // A full drain supersedes any pending partial drain.
                        partial_drain_time = None;
                        drain_staged_input::<_, _, _, _>(
                            &mut stash,
                            &mut commands_state,
                            &mut output_updates,
                            &mut multi_get_scratch,
                            DrainStyle::ToUpper(&upper),
                            &mut error_emitter,
                            &mut state,
                            &source_config,
                        )
                        .await;

                        output_handle.give_container(&output_cap, &mut output_updates);

                        if let Some(ts) = upper.as_option() {
                            output_cap.downgrade(ts);
                        }
                        input_upper = upper;
                    }
                }
                // Cap the number of events processed per batch, if configured.
                let events_processed = i + 1;
                if let Some(max) = snapshot_buffering_max {
                    if events_processed >= max {
                        break;
                    }
                }
            }

            // Perform the scheduled partial drain, if any survived the batch.
            if let Some(partial_drain_time) = partial_drain_time {
                drain_staged_input::<_, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::AtTime(partial_drain_time),
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                output_handle.give_container(&output_cap, &mut output_updates);
            }
        }
    });

    (
        // Re-wrap upsert errors into the `DataflowError` shape callers expect.
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
997
/// A sink for fatal upsert state errors.
#[async_trait::async_trait(?Send)]
pub(crate) trait UpsertErrorEmitter<T> {
    /// Reports `e`, annotated with `context`. The provided implementation
    /// routes the error to the health output and never returns.
    async fn emit(&mut self, context: String, e: anyhow::Error);
}
1002
/// Emits errors onto the operator's health output stream.
#[async_trait::async_trait(?Send)]
impl<T: Timestamp> UpsertErrorEmitter<T>
    for (
        &mut AsyncOutputHandle<
            T,
            CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
        >,
        &Capability<T>,
    )
{
    async fn emit(&mut self, context: String, e: anyhow::Error) {
        // Delegates to the shared halting-path helper; this never returns.
        process_upsert_state_error::<T>(context, e, self.0, self.1).await
    }
}
1017
1018async fn process_upsert_state_error<T: Timestamp>(
1020 context: String,
1021 e: anyhow::Error,
1022 health_output: &AsyncOutputHandle<
1023 T,
1024 CapacityContainerBuilder<Vec<(Option<GlobalId>, HealthStatusUpdate)>>,
1025 >,
1026 health_cap: &Capability<T>,
1027) {
1028 let update = HealthStatusUpdate::halting(e.context(context).to_string_with_causes(), None);
1029 health_output.give(health_cap, (None, update));
1030 std::future::pending::<()>().await;
1031 unreachable!("pending future never returns");
1032}