mz_storage/upsert_continual_feedback.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of feedback UPSERT operator and associated helpers. See
11//! [`upsert_inner`] for a description of how the operator works and why.
12
13use std::cmp::Reverse;
14use std::fmt::Debug;
15use std::sync::Arc;
16
17use differential_dataflow::hashable::Hashable;
18use differential_dataflow::{AsCollection, VecCollection};
19use indexmap::map::Entry;
20use itertools::Itertools;
21use mz_repr::{Diff, GlobalId, Row};
22use mz_storage_types::errors::{DataflowError, EnvelopeError};
23use mz_timely_util::builder_async::{
24 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
25};
26use std::convert::Infallible;
27use timely::container::CapacityContainerBuilder;
28use timely::dataflow::channels::pact::Exchange;
29use timely::dataflow::operators::{Capability, CapabilitySet};
30use timely::dataflow::{Scope, Stream};
31use timely::order::{PartialOrder, TotalOrder};
32use timely::progress::timestamp::Refines;
33use timely::progress::{Antichain, Timestamp};
34
35use crate::healthcheck::HealthStatusUpdate;
36use crate::metrics::upsert::UpsertMetrics;
37use crate::upsert::UpsertConfig;
38use crate::upsert::UpsertErrorEmitter;
39use crate::upsert::UpsertKey;
40use crate::upsert::UpsertValue;
41use crate::upsert::types::UpsertValueAndSize;
42use crate::upsert::types::{self as upsert_types, ValueMetadata};
43use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend};
44
/// An operator that transforms an input stream of upserts (updates to key-value
/// pairs), which represents an imaginary key-value state, into a differential
/// collection. It keeps an internal map-like state which keeps the latest value
/// for each key, such that it can emit the retractions and additions implied by
/// a new update for a given key.
///
/// This operator is intended to be used in an ingestion pipeline that reads
/// from an external source, and the output of this operator is eventually
/// written to persist.
///
/// The operator has two inputs: a) the source input, of upserts, and b) a
/// persist input that feeds back the upsert state to the operator. Below, there
/// is a section for each input that describes how and why we process updates
/// from each input.
///
/// An important property of this operator is that it does _not_ update the
/// map-like state that it keeps for translating the stream of upserts into a
/// differential collection when it processes source input (apart from the
/// _provisional values_ written during partial emission, see below). It _only_
/// updates the map-like state based on updates from the persist (feedback)
/// input. We do this because the operator is expected to be used in cases where
/// there are multiple concurrent instances of the same ingestion pipeline, and
/// the different instances might see different input because of concurrency and
/// non-determinism. All instances of the upsert operator must produce output
/// that is consistent with the current state of the output (that all instances
/// produce "collaboratively"). This global state is what the operator
/// continually learns about via updates from the persist input.
///
/// ## Processing the Source Input
///
/// Updates on the source input are stashed/staged until they can be processed.
/// Whether or not an update can be processed depends both on the upper frontier
/// of the source input and on the upper frontier of the persist input:
///
/// - Input updates are only processed once their timestamp is "done", that is
///   the input upper is no longer `less_equal` their timestamp.
///
/// - Input updates are only processed once they are at the persist upper, that
///   is we have emitted and written down updates for all previous times and we
///   have updated our map-like state to the latest global state of the output of
///   the ingestion pipeline. We know this is the case when the persist upper is
///   no longer `less_than` their timestamp.
///
/// As an optimization, we allow processing input updates when they are right at
/// the input frontier. This is called _partial emission_ because we are
/// emitting updates that might be retracted when processing more updates from
/// the same timestamp. In order to be able to process these updates we keep
/// _provisional values_ in our upsert state. These will be overwritten when we
/// get the final upsert values on the persist input. Partial emission is only
/// enabled when `prevent_snapshot_buffering` is set.
///
/// ## Processing the Persist Input
///
/// We continually ingest updates from the persist input into our state using
/// `UpsertState::consolidate_chunk`. We might be ingesting updates from the
/// initial snapshot (when starting the operator) that are not consolidated or
/// we might be ingesting updates from a partial emission (see above). In either
/// case, our input might not be consolidated and `consolidate_chunk` is able to
/// handle that.
///
/// ## Parameters
///
/// - `input`: the source input of `(key, value, from_time)` upserts, where a
///   `None` value is a deletion. Only positive diffs are accepted.
/// - `key_indices`: the column indices used to derive the `UpsertKey` of rows
///   arriving on `persist_input`.
/// - `resume_upper`: the frontier we resume from. Rehydration is finished once
///   the persist upper reaches it. Source input and source progress from before
///   it are ignored.
/// - `persist_input`: the feedback of our output, as read back from persist.
///   Only `Ok` rows and upsert envelope errors are kept; all other errors are
///   dropped, because they cannot be retracted by this operator.
/// - `persist_token`: dropped if rehydrating the state fails, so that the
///   persist source can shut down.
/// - `state_fn`: asynchronously constructs the backend for the upsert state.
/// - `upsert_config`: provides `shrink_upsert_unused_buffers_by_ratio` for the
///   state.
/// - `prevent_snapshot_buffering`: allow partial emission of source updates
///   that are right at the source input frontier.
/// - `snapshot_buffering_max`: if set, the maximum number of source input
///   events read per wakeup before the operator attempts to drain its stash.
///
/// ## Returns
///
/// A tuple of:
/// 1. the output collection, with upsert errors converted back into
///    `DataflowError`s,
/// 2. a stream of health status updates,
/// 3. a stream that carries no data and whose frontier reports progress of
///    consolidating the persist snapshot (used to apply backpressure upstream),
/// 4. a button that shuts down the operator when dropped.
pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
    input: &VecCollection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: VecCollection<G, Result<Row, DataflowError>, Diff>,
    mut persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state_fn: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    VecCollection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, FromTime>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // We only care about UpsertValueError since this is the only error that we can retract
    let persist_input = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(Box::new(err)),
                _ => return None,
            },
            Err(_) => return None,
        };
        // Derive the key from a borrow so that `value` can be moved into the
        // result alongside it.
        let value_ref = match value {
            Ok(ref row) => Ok(row),
            Err(ref err) => Err(&**err),
        };
        Some((UpsertKey::from_value(value_ref, &key_indices), value))
    });
    let (output_handle, output) = builder.new_output::<CapacityContainerBuilder<_>>();

    // An output that just reports progress of the snapshot consolidation process upstream to the
    // persist source to ensure that backpressure is applied
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    // Both inputs are exchanged by `UpsertKey::hashed`, so a worker receives
    // source and persist updates for the same keys. The source input is
    // connected to `output_handle`: output is emitted at capabilities retained
    // from it.
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    // The persist input only feeds our state, so it is disconnected from all
    // outputs.
    let mut persist_input = builder.new_disconnected_input(
        &persist_input.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);

    let shutdown_button = builder.build(move |caps| async move {
        let [output_cap, snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
        // We never emit at the initial output capability; all output is given
        // at `stash_cap`, which is derived from source input capabilities.
        drop(output_cap);
        // A `CapabilitySet` so that we can follow the persist upper and
        // eventually downgrade to the empty frontier.
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        let mut state = UpsertState::<_, G::Timestamp, FromTime>::new(
            state_fn().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );

        // True while we're still reading the initial "snapshot" (a whole bunch
        // of updates, all at the same initial timestamp) from our persist
        // input or while we're reading the initial snapshot from the upstream
        // source.
        let mut hydrating = true;

        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
        // and have a consistent iteration order.
        let mut commands_state: indexmap::IndexMap<_, upsert_types::UpsertValueAndSize<G::Timestamp, FromTime>> =
            indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        // For stashing source input while it's not eligible for processing.
        let mut stash = vec![];
        // A capability suitable for emitting any updates based on stash. No capability is held
        // when the stash is empty.
        let mut stash_cap: Option<Capability<G::Timestamp>> = None;
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        // `Some(time)` while we may partially drain source updates at `time`,
        // which is right at the source input frontier.
        let mut partial_drain_time = None;

        // For our persist/feedback input, both of these.
        let mut persist_stash = vec![];
        let mut persist_upper = Antichain::from_elem(Timestamp::minimum());

        // We keep track of the largest timestamp seen on the persist input so
        // that we can block processing source input while that timestamp is
        // beyond the persist frontier. While ingesting updates of a timestamp,
        // our upsert state is in a consolidating state, and trying to read it
        // at that time would yield a panic.
        //
        // NOTE(aljoscha): You would think that it cannot happen that we even
        // attempt to process source updates while the state is in a
        // consolidating state, because we always wait until the persist
        // frontier "catches up" with the timestamp of the source input. If
        // there is only this here UPSERT operator and no concurrent instances,
        // this is true. But with concurrent instances it can happen that an
        // operator that is faster than us makes it so updates get written to
        // persist. And we would then be ingesting them.
        let mut largest_seen_persist_ts: Option<G::Timestamp> = None;

        // A buffer for our output.
        let mut output_updates = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);

        loop {
            // Wait for either input to become ready, then process whatever is
            // available on that input before trying to drain the stash below.
            tokio::select! {
                _ = persist_input.ready() => {
                    // Read away as much input as we can.
                    while let Some(persist_event) = persist_input.next_sync() {
                        match persist_event {
                            AsyncEvent::Data(time, data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?time,
                                    updates=%data.len(),
                                    "received persist data");

                                persist_stash.extend(data.into_iter().map(|((key, value), ts, diff)| {
                                    largest_seen_persist_ts = std::cmp::max(largest_seen_persist_ts.clone(), Some(ts.clone()));
                                    (key, value, ts, diff)
                                }));
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received persist progress");
                                persist_upper = upper;
                            }
                        }
                    }

                    // The persist upper has reached `resume_upper`, so this is
                    // the final chunk of the initial persist snapshot.
                    let last_rehydration_chunk =
                        hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        persist_stash = %persist_stash.len(),
                        %hydrating,
                        %last_rehydration_chunk,
                        ?resume_upper,
                        ?persist_upper,
                        "ingesting persist snapshot chunk");

                    let persist_stash_iter = persist_stash
                        .drain(..)
                        .map(|(key, val, _ts, diff)| (key, val, diff));

                    match state
                        .consolidate_chunk(
                            persist_stash_iter,
                            last_rehydration_chunk,
                        )
                        .await
                    {
                        Ok(_) => {}
                        Err(e) => {
                            // Make sure our persist source can shut down.
                            persist_token.take();
                            snapshot_cap.downgrade(&[]);
                            UpsertErrorEmitter::<G>::emit(
                                &mut error_emitter,
                                "Failed to rehydrate state".to_string(),
                                e,
                            )
                            .await;
                        }
                    }

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?resume_upper,
                        ?persist_upper,
                        "downgrading snapshot cap",
                    );

                    // Only downgrade this _after_ ingesting the data, because
                    // that can actually take quite some time, and we don't want
                    // to announce that we're done ingesting the initial
                    // snapshot too early.
                    //
                    // When we finish ingesting our initial persist snapshot,
                    // during "re-hydration", we downgrade this to the empty
                    // frontier, so we need to be lenient to this failing from
                    // then on.
                    let _ = snapshot_cap.try_downgrade(persist_upper.iter());

                    if last_rehydration_chunk {
                        hydrating = false;

                        tracing::info!(
                            worker_id = %source_config.worker_id,
                            source_id = %source_config.id,
                            "upsert source finished rehydration",
                        );

                        snapshot_cap.downgrade(&[]);
                    }

                }
                _ = input.ready() => {
                    let mut events_processed = 0;
                    while let Some(event) = input.next_sync() {
                        match event {
                            AsyncEvent::Data(cap, mut data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?cap.time(),
                                    updates=%data.len(),
                                    "received data");

                                let event_time = cap.time().clone();

                                stage_input(
                                    &mut stash,
                                    &mut data,
                                    &input_upper,
                                    &resume_upper,
                                );
                                if !stash.is_empty() {
                                    // Update the stashed capability to the minimum
                                    stash_cap = match stash_cap {
                                        Some(stash_cap) => {
                                            if cap.time() < stash_cap.time() {
                                                Some(cap)
                                            } else {
                                                Some(stash_cap)
                                            }
                                        }
                                        None => Some(cap)
                                    };
                                }

                                // Data right at the source input frontier may
                                // be partially drained, if enabled.
                                if prevent_snapshot_buffering && input_upper.as_option() == Some(&event_time) {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?event_time,
                                        ?resume_upper,
                                        ?input_upper,
                                        "allowing partial drain");
                                    partial_drain_time = Some(event_time.clone());
                                } else {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        %prevent_snapshot_buffering,
                                        ?event_time,
                                        ?resume_upper,
                                        ?input_upper,
                                        "not allowing partial drain");
                                }
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received progress");

                                // Ignore progress updates before the `resume_upper`, which is our initial
                                // capability post-snapshotting.
                                if PartialOrder::less_than(&upper, &resume_upper) {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?upper,
                                        ?resume_upper,
                                        "ignoring progress updates before resume_upper");
                                    continue;
                                }

                                // Disable partial drain, because this progress
                                // update has moved the frontier. We might allow
                                // it again once we receive data right at the
                                // frontier again.
                                partial_drain_time = None;
                                input_upper = upper;
                            }
                        }

                        // Bound the number of events handled per wakeup, so
                        // that we get to draining the stash in between.
                        events_processed += 1;
                        if let Some(max) = snapshot_buffering_max {
                            if events_processed >= max {
                                break;
                            }
                        }
                    }
                }
            };

            // While we have partially ingested updates of a timestamp our state
            // is in an inconsistent/consolidating state and accessing it would
            // panic.
            if let Some(largest_seen_persist_ts) = largest_seen_persist_ts.as_ref() {
                let largest_seen_outer_persist_ts = largest_seen_persist_ts.clone().to_outer();
                let outer_persist_upper = persist_upper.iter().map(|ts| ts.clone().to_outer());
                let outer_persist_upper = Antichain::from_iter(outer_persist_upper);
                if outer_persist_upper.less_equal(&largest_seen_outer_persist_ts) {
                    continue;
                }
            }

            // We try and drain from our stash every time we go through the
            // loop. More of our stash can become eligible for draining both
            // when the source-input frontier advances or when the persist
            // frontier advances.
            if !stash.is_empty() {
                let cap = stash_cap.as_mut().expect("missing capability for non-empty stash");

                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    ?cap,
                    ?stash,
                    "stashed updates");

                let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                    &mut stash,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::ToUpper{input_upper: &input_upper, persist_upper: &persist_upper},
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    output_updates = %output_updates.len(),
                    "output updates for complete timestamp");

                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(cap, (update, ts, diff));
                }

                // Keep the capability no lower than needed for what remains
                // stashed, and release it once the stash is empty.
                if !stash.is_empty() {
                    let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
                    cap.downgrade(&min_remaining_time);
                } else {
                    stash_cap = None;
                }
            }

            if input_upper.is_empty() {
                tracing::debug!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    "input exhausted, shutting down");
                break;
            };

            // If there were staged events that occurred at the capability time, drain
            // them. This is safe because out-of-order updates to the same key that are
            // drained in separate calls to `drain_staged_input` are correctly ordered by
            // their `FromTime` in `drain_staged_input`.
            //
            // Note also that this may result in more updates in the output collection than
            // the minimum. However, because the frontier only advances on `Progress` updates,
            // the collection always accumulates correctly for all keys.
            if let Some(partial_drain_time) = &partial_drain_time {
                if !stash.is_empty() {
                    let cap = stash_cap.as_mut().expect("missing capability for non-empty stash");

                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?cap,
                        ?stash,
                        "stashed updates");

                    let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                        &mut stash,
                        &mut commands_state,
                        &mut output_updates,
                        &mut multi_get_scratch,
                        DrainStyle::AtTime{
                            time: partial_drain_time.clone(),
                            persist_upper: &persist_upper
                        },
                        &mut error_emitter,
                        &mut state,
                        &source_config,
                    )
                    .await;

                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        output_updates = %output_updates.len(),
                        "output updates for partial timestamp");

                    for (update, ts, diff) in output_updates.drain(..) {
                        output_handle.give(cap, (update, ts, diff));
                    }

                    if !stash.is_empty() {
                        let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
                        cap.downgrade(&min_remaining_time);
                    } else {
                        stash_cap = None;
                    }
                }
            }
        }
    });

    (
        output
            .as_collection()
            // Convert upsert errors back into the `DataflowError` shape that
            // the rest of the pipeline expects.
            .map(|result: UpsertValue| match result {
                Ok(ok) => Ok(ok),
                Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
            }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
551
552/// Helper method for [`upsert_inner`] used to stage `data` updates
553/// from the input/source timely edge.
554#[allow(clippy::disallowed_types)]
555fn stage_input<T, FromTime>(
556 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
557 data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
558 input_upper: &Antichain<T>,
559 resume_upper: &Antichain<T>,
560) where
561 T: PartialOrder + timely::progress::Timestamp,
562 FromTime: Ord,
563{
564 if PartialOrder::less_equal(input_upper, resume_upper) {
565 data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
566 }
567
568 stash.extend(data.drain(..).map(|((key, value, order), time, diff)| {
569 assert!(diff.is_positive(), "invalid upsert input");
570 (time, key, Reverse(order), value)
571 }));
572}
573
/// The style of drain we are performing on the stash in
/// [`drain_staged_input`].
///
/// `AtTime`-drains cannot assume that all values for a timestamp have been
/// seen, so they write provisional values (and provisional tombstones for
/// deleted values) into the upsert state, which lets later drains retract what
/// was emitted. `ToUpper`-drains do not modify the upsert state.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain updates whose timestamp is complete on the source input
    /// (`input_upper` is no longer `less_equal` to it) and that are not beyond
    /// `persist_upper`.
    ToUpper {
        input_upper: &'a Antichain<T>,
        persist_upper: &'a Antichain<T>,
    },
    /// For partial draining when taking the source snapshot: drain updates at
    /// or before `time` that are not beyond `persist_upper`, even though more
    /// updates for `time` may still arrive.
    AtTime {
        time: T,
        persist_upper: &'a Antichain<T>,
    },
}
588
589/// Helper method for [`upsert_inner`] used to stage `data` updates
590/// from the input timely edge.
591///
592/// Returns the minimum observed time across the updates that remain in the
593/// stash or `None` if none are left.
594///
595/// ## Correctness
596///
597/// It is safe to call this function multiple times with the same `persist_upper` provided that the
598/// drain style is `AtTime`, which updates the state such that past actions are remembered and can
599/// be undone in subsequent calls.
600///
601/// It is *not* safe to call this function more than once with the same `persist_upper` and a
602/// `ToUpper` drain style. Doing so causes all calls except the first one to base their work on
603/// stale state, since in this drain style no modifications to the state are made.
604async fn drain_staged_input<S, G, T, FromTime, E>(
605 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
606 commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, FromTime>>,
607 output_updates: &mut Vec<(UpsertValue, T, Diff)>,
608 multi_get_scratch: &mut Vec<UpsertKey>,
609 drain_style: DrainStyle<'_, T>,
610 error_emitter: &mut E,
611 state: &mut UpsertState<'_, S, T, FromTime>,
612 source_config: &crate::source::SourceExportCreationConfig,
613) -> Option<T>
614where
615 S: UpsertStateBackend<T, FromTime>,
616 G: Scope,
617 T: TotalOrder + timely::ExchangeData + Debug + Ord + Sync,
618 FromTime: timely::ExchangeData + Ord + Sync,
619 E: UpsertErrorEmitter<G>,
620{
621 let mut min_remaining_time = Antichain::new();
622
623 let mut eligible_updates = stash
624 .extract_if(.., |(ts, _, _, _)| {
625 let eligible = match &drain_style {
626 DrainStyle::ToUpper {
627 input_upper,
628 persist_upper,
629 } => {
630 // We make sure that a) we only process updates when we know their
631 // timestamp is complete, that is there will be no more updates for
632 // that timestamp, and b) that "previous" times in the persist
633 // input are complete. The latter makes sure that we emit updates
634 // for the next timestamp that are consistent with the global state
635 // in the output persist shard, which also serves as a persistent
636 // copy of our in-memory/on-disk upsert state.
637 !input_upper.less_equal(ts) && !persist_upper.less_than(ts)
638 }
639 DrainStyle::AtTime {
640 time,
641 persist_upper,
642 } => {
643 // Even when emitting partial updates, we still need to wait
644 // until "previous" times in the persist input are complete.
645 *ts <= *time && !persist_upper.less_than(ts)
646 }
647 };
648
649 if !eligible {
650 min_remaining_time.insert(ts.clone());
651 }
652
653 eligible
654 })
655 .filter(|(ts, _, _, _)| {
656 let persist_upper = match &drain_style {
657 DrainStyle::ToUpper {
658 input_upper: _,
659 persist_upper,
660 } => persist_upper,
661 DrainStyle::AtTime {
662 time: _,
663 persist_upper,
664 } => persist_upper,
665 };
666
667 // Any update that is "in the past" of the persist upper is not
668 // relevant anymore. We _can_ emit changes for it, but the
669 // downstream persist_sink would filter these updates out because
670 // the shard upper is already further ahead.
671 //
672 // Plus, our upsert state is up-to-date to the persist_upper, so we
673 // wouldn't be able to emit correct retractions for incoming
674 // commands whose `ts` is in the past of that.
675 let relevant = persist_upper.less_equal(ts);
676 relevant
677 })
678 .collect_vec();
679
680 tracing::debug!(
681 worker_id = %source_config.worker_id,
682 source_id = %source_config.id,
683 ?drain_style,
684 remaining = %stash.len(),
685 eligible = eligible_updates.len(),
686 "draining stash");
687
688 // Sort the eligible updates by (key, time, Reverse(from_time)) so that
689 // deduping by (key, time) gives the latest change for that key.
690 eligible_updates.sort_unstable_by(|a, b| {
691 let (ts1, key1, from_ts1, val1) = a;
692 let (ts2, key2, from_ts2, val2) = b;
693 Ord::cmp(&(ts1, key1, from_ts1, val1), &(ts2, key2, from_ts2, val2))
694 });
695
696 // Read the previous values _per key_ out of `state`, recording it
697 // along with the value with the _latest timestamp for that key_.
698 commands_state.clear();
699 for (_, key, _, _) in eligible_updates.iter() {
700 commands_state.entry(*key).or_default();
701 }
702
703 // These iterators iterate in the same order because `commands_state`
704 // is an `IndexMap`.
705 multi_get_scratch.clear();
706 multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
707 match state
708 .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
709 .await
710 {
711 Ok(_) => {}
712 Err(e) => {
713 error_emitter
714 .emit("Failed to fetch records from state".to_string(), e)
715 .await;
716 }
717 }
718
719 // From the prefix that can be emitted we can deduplicate based on (ts, key) in
720 // order to only process the command with the maximum order within the (ts,
721 // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.;
722 let mut commands = eligible_updates.into_iter().dedup_by(|a, b| {
723 let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
724 a_ts == b_ts && a_key == b_key
725 });
726
727 let bincode_opts = upsert_types::upsert_bincode_opts();
728 // Upsert the values into `commands_state`, by recording the latest
729 // value (or deletion). These will be synced at the end to the `state`.
730 //
731 // Note that we are effectively doing "mini-upsert" here, using
732 // `command_state`. This "mini-upsert" is seeded with data from `state`, using
733 // a single `multi_get` above, and the final state is written out into
734 // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
735 // implementations, and reduces the number of reads and write we need to do.
736 //
737 // This "mini-upsert" technique is actually useful in `UpsertState`'s
738 // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
739 // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
740 while let Some((ts, key, from_time, value)) = commands.next() {
741 let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
742 command_state
743 } else {
744 panic!("key missing from commands_state");
745 };
746
747 let existing_state_cell = &mut command_state.get_mut().value;
748
749 if let Some(cs) = existing_state_cell.as_mut() {
750 cs.ensure_decoded(bincode_opts, source_config.id);
751 }
752
753 // Skip this command if its order key is below the one in the upsert state.
754 // Note that the existing order key may be `None` if the existing value
755 // is from snapshotting, which always sorts below new values/deletes.
756 let existing_order = existing_state_cell
757 .as_ref()
758 .and_then(|cs| cs.provisional_order(&ts));
759 if existing_order >= Some(&from_time.0) {
760 // Skip this update. If no later updates adjust this key, then we just
761 // end up writing the same value back to state. If there
762 // is nothing in the state, `existing_order` is `None`, and this
763 // does not occur.
764 continue;
765 }
766
767 match value {
768 Some(value) => {
769 if let Some(old_value) = existing_state_cell.as_ref() {
770 if let Some(old_value) = old_value.provisional_value_ref(&ts) {
771 output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
772 }
773 }
774
775 match &drain_style {
776 DrainStyle::AtTime { .. } => {
777 let existing_value = existing_state_cell.take();
778
779 let new_value = match existing_value {
780 Some(existing_value) => existing_value.clone().into_provisional_value(
781 value.clone(),
782 ts.clone(),
783 from_time.0.clone(),
784 ),
785 None => StateValue::new_provisional_value(
786 value.clone(),
787 ts.clone(),
788 from_time.0.clone(),
789 ),
790 };
791
792 existing_state_cell.replace(new_value);
793 }
794 DrainStyle::ToUpper { .. } => {
795 // Not writing down provisional values, or anything.
796 }
797 };
798
799 output_updates.push((value, ts, Diff::ONE));
800 }
801 None => {
802 if let Some(old_value) = existing_state_cell.as_ref() {
803 if let Some(old_value) = old_value.provisional_value_ref(&ts) {
804 output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
805 }
806 }
807
808 match &drain_style {
809 DrainStyle::AtTime { .. } => {
810 let existing_value = existing_state_cell.take();
811
812 let new_value = match existing_value {
813 Some(existing_value) => existing_value
814 .into_provisional_tombstone(ts.clone(), from_time.0.clone()),
815 None => StateValue::new_provisional_tombstone(
816 ts.clone(),
817 from_time.0.clone(),
818 ),
819 };
820
821 existing_state_cell.replace(new_value);
822 }
823 DrainStyle::ToUpper { .. } => {
824 // Not writing down provisional values, or anything.
825 }
826 }
827 }
828 }
829 }
830
831 match &drain_style {
832 DrainStyle::AtTime { .. } => {
833 match state
834 .multi_put(
835 // We don't want to update per-record stats, like size of
836 // records indexed or count of records indexed.
837 //
838 // We only add provisional values and these will be
839 // overwritten once we receive updates for state from the
840 // persist input. And the merge functionality cannot know
841 // what was in state before merging, so it cannot correctly
842 // retract/update stats added here.
843 //
844 // Mostly, the merge functionality can't update those stats
845 // because merging happens in a function that we pass to
846 // rocksdb which doesn't have access to any external
847 // context. And in general, with rocksdb we do blind writes
848 // rather than inspect what was there before when
849 // updating/inserting.
850 false,
851 commands_state.drain(..).map(|(k, cv)| {
852 (
853 k,
854 upsert_types::PutValue {
855 value: cv.value.map(|cv| cv.into_decoded()),
856 previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
857 size: v.size.try_into().expect("less than i64 size"),
858 is_tombstone: v.is_tombstone,
859 }),
860 },
861 )
862 }),
863 )
864 .await
865 {
866 Ok(_) => {}
867 Err(e) => {
868 error_emitter
869 .emit("Failed to update records in state".to_string(), e)
870 .await;
871 }
872 }
873 }
874 style => {
875 tracing::trace!(
876 worker_id = %source_config.worker_id,
877 source_id = %source_config.id,
878 "not doing state update for drain style {:?}", style);
879 }
880 }
881
882 min_remaining_time.into_option()
883}
884
#[cfg(test)]
mod test {
    // Regression tests for `upsert_inner`. Each test drives the operator by hand through a
    // timely input for upsert commands and a second input standing in for the persist
    // feedback, then checks the consolidated output.

    use std::sync::mpsc;

    use mz_ore::metrics::MetricsRegistry;
    use mz_persist_types::ShardId;
    use mz_repr::{Datum, Timestamp as MzTimestamp};
    use mz_rocksdb::{RocksDBConfig, ValueIterator};
    use mz_storage_operators::persist_source::Subtime;
    use mz_storage_types::sources::SourceEnvelope;
    use mz_storage_types::sources::envelope::{KeyEnvelope, UpsertEnvelope, UpsertStyle};
    use rocksdb::Env;
    use timely::dataflow::operators::capture::Extract;
    use timely::dataflow::operators::{Capture, Input, Probe};
    use timely::progress::Timestamp;

    use crate::metrics::StorageMetrics;
    use crate::metrics::upsert::UpsertMetricDefs;
    use crate::source::SourceExportCreationConfig;
    use crate::statistics::{SourceStatistics, SourceStatisticsMetricDefs};
    use crate::upsert::memory::InMemoryHashMap;
    use crate::upsert::types::{BincodeOpts, consolidating_merge_function, upsert_bincode_opts};

    use super::*;

    // Regression test (named after issue 9160) for commands that share a timestamp but arrive
    // in separate input batches, using the in-memory upsert state.
    //
    // msg3 and msg4 both update key 0 at time 3. msg3 is delivered in a batch held at
    // capability 2 (together with msg2), while msg4 is delivered under capability 3. Once the
    // persist frontier reaches 3 the operator must apply both together, so the consolidated
    // output contains exactly one retraction of key 0's old value.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9160_repro() {
        // Helper to wrap timestamps in the appropriate types
        let new_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());

        let output_handle = timely::execute_directly(move |worker| {
            let (mut input_handle, mut persist_handle, output_handle) = worker
                .dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure-enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);

                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        let (output, _, _, button) = upsert_inner(
                            &input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            || async { InMemoryHashMap::default() },
                            upsert_config,
                            true,
                            None,
                        );
                        // NOTE(review): presumably `button` is a `PressOnDropButton` whose drop
                        // would shut the operator down; forgetting it keeps the operator alive
                        // for the whole test. Confirm against `upsert_inner`'s return type.
                        std::mem::forget(button);

                        (input_handle, persist_handle, output.inner.capture())
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input will contain records for two keys, 0 and 1.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));
            let key1 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(1)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            //  1. {offset=1, key=0, value=0}    @ mz_time = 0
            //  2. {offset=2, key=1, value=NULL} @ mz_time = 2 // <- deletion of unrelated key. Causes the operator
            //                                                 //    to maintain the associated cap to time 2
            //  3. {offset=3, key=0, value=1}    @ mz_time = 3
            //  4. {offset=4, key=0, value=2}    @ mz_time = 3 // <- messages 3 and 4 are reclocked to time 3
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let value3 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(1)]);
            let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
            let msg1 = (key0, Some(Ok(value1.clone())), 1);
            let msg2 = (key1, None, 2);
            let msg3 = (key0, Some(Ok(value3)), 3);
            let msg4 = (key0, Some(Ok(value4)), 4);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send((msg1, new_ts(0), Diff::ONE));
            input_handle.advance_to(new_ts(2));
            worker.step();

            // We assume this worker successfully CAAs the update to the shard, so we send it back
            // through the persist_input.
            persist_handle.send((Ok(value1), new_ts(0), Diff::ONE));
            persist_handle.advance_to(new_ts(1));
            worker.step();

            // Then, messages 2 and 3 are sent as one batch with capability = 2
            input_handle.send_batch(&mut vec![
                (msg2, new_ts(2), Diff::ONE),
                (msg3, new_ts(3), Diff::ONE),
            ]);
            // Advance our capability to 3
            input_handle.advance_to(new_ts(3));
            // Message 4 is sent with capability 3
            input_handle.send_batch(&mut vec![(msg4, new_ts(3), Diff::ONE)]);
            // Advance our capability to 4
            input_handle.advance_to(new_ts(4));
            // We now step the worker so that the pending data is received. This causes the
            // operator to store internally the following map from capabilities to updates:
            //   cap=2 => [ msg2, msg3 ]
            //   cap=3 => [ msg4 ]
            worker.step();

            // We now assume that another replica raced us and processed msg2 (the only message at
            // time 2). Deleting the absent key 1 is a no-op, so the persist frontier advances to
            // time 3 without new data.
            persist_handle.advance_to(new_ts(3));
            // We now step this worker again, which will notice that the persist upper is {3} and
            // will attempt to process msg3 and msg4. Processing them *separately* would produce a
            // double retraction of key 0's old value; the assertion below checks that it does not.
            worker.step();

            output_handle
        });

        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0 which has the value 0
        // at timestamp 0 and the value 2 at timestamp 3
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let value4 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(2)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), new_ts(0), Diff::ONE),
            (Ok(value1), new_ts(3), Diff::MINUS_ONE),
            (Ok(value4), new_ts(3), Diff::ONE),
        ];
        assert_eq!(actual_output, expected_output);
    }

    // Regression test (named after issue 9540) for out-of-order commands on a single key,
    // using a RocksDB-backed upsert state.
    //
    // msg3 (offset 3) and msg4 (offset 4) for key 0 are both reclocked to time 2, but msg4 is
    // delivered first. The operator must remember that it has already seen offset 4 for key 0
    // and skip msg3, so the consolidated output has no updates at time 2.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn gh_9540_repro() {
        // Helper to wrap timestamps in the appropriate types
        let mz_ts = |ts| (MzTimestamp::new(ts), Subtime::minimum());
        // Channel over which the RocksDB init closure hands back the join handle of the RocksDB
        // core loop thread, so the test can join it once the dataflow is done.
        let (tx, rx) = mpsc::channel::<std::thread::JoinHandle<()>>();

        let rocksdb_dir = tempfile::tempdir().unwrap();
        let output_handle = timely::execute_directly(move |worker| {
            let tx = tx.clone();
            let (mut input_handle, mut persist_handle, output_probe, output_handle) =
                worker.dataflow::<MzTimestamp, _, _>(|scope| {
                    // Enter a subscope since the upsert operator expects to work in a
                    // backpressure-enabled scope.
                    scope.scoped::<(MzTimestamp, Subtime), _, _>("upsert", |scope| {
                        let (input_handle, input) = scope.new_input();
                        let (persist_handle, persist_input) = scope.new_input();
                        let upsert_config = UpsertConfig {
                            shrink_upsert_unused_buffers_by_ratio: 0,
                        };
                        let source_id = GlobalId::User(0);
                        let metrics_registry = MetricsRegistry::new();
                        let upsert_metrics_defs =
                            UpsertMetricDefs::register_with(&metrics_registry);
                        let upsert_metrics =
                            UpsertMetrics::new(&upsert_metrics_defs, source_id, 0, None);
                        let rocksdb_shared_metrics = Arc::clone(&upsert_metrics.rocksdb_shared);
                        let rocksdb_instance_metrics =
                            Arc::clone(&upsert_metrics.rocksdb_instance_metrics);

                        let metrics_registry = MetricsRegistry::new();
                        let storage_metrics = StorageMetrics::register_with(&metrics_registry);

                        let metrics_registry = MetricsRegistry::new();
                        let source_statistics_defs =
                            SourceStatisticsMetricDefs::register_with(&metrics_registry);
                        let envelope = SourceEnvelope::Upsert(UpsertEnvelope {
                            source_arity: 2,
                            style: UpsertStyle::Default(KeyEnvelope::Flattened),
                            key_indices: vec![0],
                        });
                        let source_statistics = SourceStatistics::new(
                            source_id,
                            0,
                            &source_statistics_defs,
                            source_id,
                            &ShardId::new(),
                            envelope,
                            Antichain::from_elem(Timestamp::minimum()),
                        );

                        let source_config = SourceExportCreationConfig {
                            id: GlobalId::User(0),
                            worker_id: 0,
                            metrics: storage_metrics,
                            source_statistics,
                        };

                        // A closure that will initialize and return a configured RocksDB instance
                        let rocksdb_init_fn = move || async move {
                            let merge_operator = Some((
                                "upsert_state_snapshot_merge_v1".to_string(),
                                |a: &[u8],
                                 b: ValueIterator<
                                    BincodeOpts,
                                    StateValue<(MzTimestamp, Subtime), u64>,
                                >| {
                                    consolidating_merge_function::<(MzTimestamp, Subtime), u64>(
                                        a.into(),
                                        b,
                                    )
                                },
                            ));
                            let rocksdb_cleanup_tries = 5;
                            let tuning = RocksDBConfig::new(Default::default(), None);
                            let mut rocksdb_inst = mz_rocksdb::RocksDBInstance::new(
                                rocksdb_dir.path(),
                                mz_rocksdb::InstanceOptions::new(
                                    Env::mem_env().unwrap(),
                                    rocksdb_cleanup_tries,
                                    merge_operator,
                                    // For now, just use the same config as the one used for
                                    // merging snapshots.
                                    upsert_bincode_opts(),
                                ),
                                tuning,
                                rocksdb_shared_metrics,
                                rocksdb_instance_metrics,
                            )
                            .unwrap();

                            // Hand the core loop's join handle back to the test so the thread
                            // can be joined after the assertions.
                            let handle = rocksdb_inst.take_core_loop_handle().expect("join handle");
                            tx.send(handle).expect("sent joinhandle");
                            crate::upsert::rocksdb::RocksDB::new(rocksdb_inst)
                        };

                        let (output, _, _, button) = upsert_inner(
                            &input.as_collection(),
                            vec![0],
                            Antichain::from_elem(Timestamp::minimum()),
                            persist_input.as_collection(),
                            None,
                            upsert_metrics,
                            source_config,
                            rocksdb_init_fn,
                            upsert_config,
                            true,
                            None,
                        );
                        // NOTE(review): presumably `button` is a `PressOnDropButton` whose drop
                        // would shut the operator down; forgetting it keeps the operator alive
                        // for the whole test. Confirm against `upsert_inner`'s return type.
                        std::mem::forget(button);

                        (
                            input_handle,
                            persist_handle,
                            output.inner.probe(),
                            output.inner.capture(),
                        )
                    })
                });

            // We work with a hypothetical schema of (key int, value int).

            // The input will only contain records for key 0.
            let key0 = UpsertKey::from_key(Ok(&Row::pack_slice(&[Datum::Int64(0)])));

            // We will assume that the kafka topic contains the following messages with their
            // associated reclocked timestamp:
            //  1. {offset=1, key=0, value=0}    @ mz_time = 0
            //  2. {offset=2, key=0, value=NULL} @ mz_time = 1
            //  3. {offset=3, key=0, value=0}    @ mz_time = 2
            //  4. {offset=4, key=0, value=NULL} @ mz_time = 2 // <- messages 3 and 4 are *BOTH* reclocked to time 2
            let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
            let msg1 = ((key0, Some(Ok(value1.clone())), 1), mz_ts(0), Diff::ONE);
            let msg2 = ((key0, None, 2), mz_ts(1), Diff::ONE);
            let msg3 = ((key0, Some(Ok(value1.clone())), 3), mz_ts(2), Diff::ONE);
            let msg4 = ((key0, None, 4), mz_ts(2), Diff::ONE);

            // The first message will initialize the upsert state such that key 0 has value 0 and
            // produce an output update to that effect.
            input_handle.send(msg1);
            input_handle.advance_to(mz_ts(1));
            while output_probe.less_than(&mz_ts(1)) {
                worker.step_or_park(None);
            }
            // Feed back the produced output..
            persist_handle.send((Ok(value1.clone()), mz_ts(0), Diff::ONE));
            persist_handle.advance_to(mz_ts(1));
            // ..and send the next upsert command that deletes the key.
            input_handle.send(msg2);
            input_handle.advance_to(mz_ts(2));
            while output_probe.less_than(&mz_ts(2)) {
                worker.step_or_park(None);
            }

            // Feed back the produced output..
            persist_handle.send((Ok(value1), mz_ts(1), Diff::MINUS_ONE));
            persist_handle.advance_to(mz_ts(2));
            // ..and send the next *out of order* upsert command that deletes the key. Here msg4
            // happens at offset 4 and the operator should remember that.
            input_handle.send(msg4);
            input_handle.flush();
            // Run the worker for enough steps to process these events. We can't guide the
            // execution with the probe here since the frontier does not advance, only provisional
            // updates are produced.
            for _ in 0..5 {
                worker.step();
            }

            // Send the missing message. An operator that lost track of having already seen a
            // command for offset 4 on key 0 would be confused here; msg3 (offset 3) must be
            // skipped instead.
            input_handle.send(msg3);
            input_handle.flush();
            input_handle.advance_to(mz_ts(3));

            output_handle
        });

        let mut actual_output = output_handle
            .extract()
            .into_iter()
            .flat_map(|(_cap, container)| container)
            .collect();
        differential_dataflow::consolidation::consolidate_updates(&mut actual_output);

        // The expected consolidated output contains only updates for key 0: the value 0 is
        // inserted at timestamp 0 and retracted at timestamp 1. There are no updates at
        // timestamp 2, because msg3 is skipped and msg4 deletes a key that is already absent.
        let value1 = Row::pack_slice(&[Datum::Int64(0), Datum::Int64(0)]);
        let expected_output: Vec<(Result<Row, DataflowError>, _, _)> = vec![
            (Ok(value1.clone()), mz_ts(0), Diff::ONE),
            (Ok(value1), mz_ts(1), Diff::MINUS_ONE),
        ];
        assert_eq!(actual_output, expected_output);

        // Join the RocksDB core loop threads whose handles were sent by `rocksdb_init_fn`.
        // `recv` returns `Err` once every sender has been dropped, ending the loop.
        while let Ok(handle) = rx.recv() {
            handle.join().expect("threads completed successfully");
        }
    }
}