mz_storage/upsert_continual_feedback.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of feedback UPSERT operator and associated helpers. See
11//! [`upsert_inner`] for a description of how the operator works and why.
12
13use std::cmp::Reverse;
14// We don't care about the order, but we do want drain().
15#[allow(clippy::disallowed_types)]
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::sync::Arc;
19
20use differential_dataflow::hashable::Hashable;
21use differential_dataflow::{AsCollection, Collection};
22use indexmap::map::Entry;
23use itertools::Itertools;
24use mz_ore::vec::VecExt;
25use mz_repr::{Diff, GlobalId, Row};
26use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
27use mz_timely_util::builder_async::{
28 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
29};
30use std::convert::Infallible;
31use timely::container::CapacityContainerBuilder;
32use timely::dataflow::channels::pact::Exchange;
33use timely::dataflow::operators::{Capability, CapabilitySet};
34use timely::dataflow::{Scope, Stream};
35use timely::order::{PartialOrder, TotalOrder};
36use timely::progress::timestamp::Refines;
37use timely::progress::{Antichain, Timestamp};
38
39use crate::healthcheck::HealthStatusUpdate;
40use crate::metrics::upsert::UpsertMetrics;
41use crate::upsert::UpsertConfig;
42use crate::upsert::UpsertErrorEmitter;
43use crate::upsert::UpsertKey;
44use crate::upsert::UpsertValue;
45use crate::upsert::types::UpsertValueAndSize;
46use crate::upsert::types::{self as upsert_types, ValueMetadata};
47use crate::upsert::types::{StateValue, UpsertState, UpsertStateBackend};
48
49/// An operator that transforms an input stream of upserts (updates to key-value
50/// pairs), which represents an imaginary key-value state, into a differential
51/// collection. It keeps an internal map-like state which keeps the latest value
52/// for each key, such that it can emit the retractions and additions implied by
53/// a new update for a given key.
54///
55/// This operator is intended to be used in an ingestion pipeline that reads
56/// from an external source, and the output of this operator is eventually
57/// written to persist.
58///
59/// The operator has two inputs: a) the source input, of upserts, and b) a
60/// persist input that feeds back the upsert state to the operator. Below, there
61/// is a section for each input that describes how and why we process updates
62/// from each input.
63///
64/// An important property of this operator is that it does _not_ update the
65/// map-like state that it keeps for translating the stream of upserts into a
66/// differential collection when it processes source input. It _only_ updates
67/// the map-like state based on updates from the persist (feedback) input. We do
68/// this because the operator is expected to be used in cases where there are
69/// multiple concurrent instances of the same ingestion pipeline, and the
70/// different instances might see different input because of concurrency and
71/// non-determinism. All instances of the upsert operator must produce output
72/// that is consistent with the current state of the output (that all instances
73/// produce "collaboratively"). This global state is what the operator
74/// continually learns about via updates from the persist input.
75///
76/// ## Processing the Source Input
77///
78/// Updates on the source input are stashed/staged until they can be processed.
79/// Whether or not an update can be processed depends both on the upper frontier
80/// of the source input and on the upper frontier of the persist input:
81///
82/// - Input updates are only processed once their timestamp is "done", that is
83/// the input upper is no longer `less_equal` their timestamp.
84///
85/// - Input updates are only processed once they are at the persist upper, that
86/// is we have emitted and written down updates for all previous times and we
87/// have updated our map-like state to the latest global state of the output of
88/// the ingestion pipeline. We know this is the case when the persist upper is
89/// no longer `less_than` their timestamp.
90///
91/// As an optimization, we allow processing input updates when they are right at
92/// the input frontier. This is called _partial emission_ because we are
93/// emitting updates that might be retracted when processing more updates from
94/// the same timestamp. In order to be able to process these updates we keep
95/// _provisional values_ in our upsert state. These will be overwritten when we
96/// get the final upsert values on the persist input.
97///
98/// ## Processing the Persist Input
99///
100/// We continually ingest updates from the persist input into our state using
101/// `UpsertState::consolidate_chunk`. We might be ingesting updates from the
102/// initial snapshot (when starting the operator) that are not consolidated or
103/// we might be ingesting updates from a partial emission (see above). In either
104/// case, our input might not be consolidated and `consolidate_chunk` is able to
105/// handle that.
///
/// ## Arguments
///
/// - `input`: the source input of upserts, as `(key, optional value, order)`
///   triples. It is exchanged between workers by the hash of the key.
/// - `key_indices`: passed to `UpsertKey::from_value` to derive the key of
///   every row/error read from `persist_input`.
/// - `resume_upper`: the frontier at which we resume. Rehydration is
///   considered complete once the persist frontier is no longer before it, and
///   source progress updates before it are ignored.
/// - `persist_input`: the feedback input. Only `Ok` rows and
///   `EnvelopeError::Upsert` errors are kept, all other errors are dropped.
/// - `persist_token`: taken (and thereby dropped) when consolidating the
///   persist input into state fails.
/// - `upsert_metrics` / `source_config` / `upsert_config`: handed to the
///   `UpsertState` and used for logging.
/// - `state_fn`: called once, inside the operator, to create the state backend.
/// - `prevent_snapshot_buffering`: when `true`, data that arrives exactly at
///   the time of the output capability enables a partial drain at that time.
/// - `snapshot_buffering_max`: when set, the maximum number of source input
///   events that are read in one go before we try draining the stash.
///
/// ## Returns
///
/// A tuple of a) the output collection, with `UpsertError`s wrapped back into
/// `DataflowError`s, b) a stream of health status updates, c) a data-less
/// stream that only reports snapshot consolidation progress, and d) a button
/// that shuts down the operator when dropped.
pub fn upsert_inner<G: Scope, FromTime, F, Fut, US>(
    input: &Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff>,
    key_indices: Vec<usize>,
    resume_upper: Antichain<G::Timestamp>,
    persist_input: Collection<G, Result<Row, DataflowError>, Diff>,
    mut persist_token: Option<Vec<PressOnDropButton>>,
    upsert_metrics: UpsertMetrics,
    source_config: crate::source::SourceExportCreationConfig,
    state_fn: F,
    upsert_config: UpsertConfig,
    prevent_snapshot_buffering: bool,
    snapshot_buffering_max: Option<usize>,
) -> (
    Collection<G, Result<Row, DataflowError>, Diff>,
    Stream<G, (Option<GlobalId>, HealthStatusUpdate)>,
    Stream<G, Infallible>,
    PressOnDropButton,
)
where
    G::Timestamp: Refines<mz_repr::Timestamp> + TotalOrder + Sync,
    F: FnOnce() -> Fut + 'static,
    Fut: std::future::Future<Output = US>,
    US: UpsertStateBackend<G::Timestamp, Option<FromTime>>,
    FromTime: Debug + timely::ExchangeData + Ord + Sync,
{
    let mut builder = AsyncOperatorBuilder::new("Upsert".to_string(), input.scope());

    // We only care about UpsertValueError since this is the only error that we can retract
    //
    // Every surviving row/error is paired with the key derived from it, so that
    // the feedback input can be exchanged by key, just like the source input.
    let persist_input = persist_input.flat_map(move |result| {
        let value = match result {
            Ok(ok) => Ok(ok),
            Err(DataflowError::EnvelopeError(err)) => match *err {
                EnvelopeError::Upsert(err) => Err(err),
                _ => return None,
            },
            Err(_) => return None,
        };
        Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
    });
    let (output_handle, output) = builder.new_output();

    // An output that just reports progress of the snapshot consolidation process upstream to the
    // persist source to ensure that backpressure is applied
    let (_snapshot_handle, snapshot_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<Infallible>>>();

    let (mut health_output, health_stream) = builder.new_output();
    // The source input is created "for" the data output, the persist input
    // below is created as a disconnected input. Both are exchanged by key hash.
    let mut input = builder.new_input_for(
        &input.inner,
        Exchange::new(move |((key, _, _), _, _)| UpsertKey::hashed(key)),
        &output_handle,
    );

    let mut persist_input = builder.new_disconnected_input(
        &persist_input.inner,
        Exchange::new(|((key, _), _, _)| UpsertKey::hashed(key)),
    );

    let upsert_shared_metrics = Arc::clone(&upsert_metrics.shared);

    let shutdown_button = builder.build(move |caps| async move {
        // One initial capability per output, in the order the outputs were
        // created above.
        let [mut output_cap, snapshot_cap, health_cap]: [_; 3] = caps.try_into().unwrap();
        let mut snapshot_cap = CapabilitySet::from_elem(snapshot_cap);

        // The order key of the `UpsertState` is `Option<FromTime>`, which implements `Default`
        // (as required for `consolidate_chunk`), with slightly more efficient serialization
        // than a default `Partitioned`.

        let mut state = UpsertState::<_, G::Timestamp, Option<FromTime>>::new(
            state_fn().await,
            upsert_shared_metrics,
            &upsert_metrics,
            source_config.source_statistics.clone(),
            upsert_config.shrink_upsert_unused_buffers_by_ratio,
        );

        // True while we're still reading the initial "snapshot" (a whole bunch
        // of updates, all at the same initial timestamp) from our persist
        // input or while we're reading the initial snapshot from the upstream
        // source.
        let mut hydrating = true;

        // A re-usable buffer of changes, per key. This is an `IndexMap` because it has to be `drain`-able
        // and have a consistent iteration order.
        let mut commands_state: indexmap::IndexMap<_, upsert_types::UpsertValueAndSize<G::Timestamp, Option<FromTime>>> =
            indexmap::IndexMap::new();
        let mut multi_get_scratch = Vec::new();

        // For stashing source input while it's not eligible for processing.
        // We don't care about the order, but we do want drain().
        #[allow(clippy::disallowed_types)]
        let mut stash = HashMap::new();
        let mut input_upper = Antichain::from_elem(Timestamp::minimum());
        // The time at which a partial drain is currently allowed, if any. Set
        // when data arrives at the output capability's time, reset on progress.
        let mut partial_drain_time = None;

        // For our persist/feedback input, both of these.
        let mut persist_stash = vec![];
        let mut persist_upper = Antichain::from_elem(Timestamp::minimum());

        // We keep track of the largest timestamp seen on the persist input so
        // that we can block processing source input while that timestamp is
        // beyond the persist frontier. While ingesting updates of a timestamp,
        // our upsert state is in a consolidating state, and trying to read it
        // at that time would yield a panic.
        //
        // NOTE(aljoscha): You would think that it cannot happen that we even
        // attempt to process source updates while the state is in a
        // consolidating state, because we always wait until the persist
        // frontier "catches up" with the timestamp of the source input. If
        // there is only this here UPSERT operator and no concurrent instances,
        // this is true. But with concurrent instances it can happen that an
        // operator that is faster than us makes it so updates get written to
        // persist. And we would then be ingesting them.
        let mut largest_seen_persist_ts: Option<G::Timestamp> = None;

        // A buffer for our output.
        let mut output_updates = vec![];

        let mut error_emitter = (&mut health_output, &health_cap);


        // Main loop: wait for either input to become ready, read what is
        // available, and then try to drain the stash of source updates.
        loop {
            tokio::select! {
                _ = persist_input.ready() => {
                    // Read away as much input as we can.
                    while let Some(persist_event) = persist_input.next_sync() {
                        match persist_event {
                            AsyncEvent::Data(time, data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?time,
                                    updates=%data.len(),
                                    "received persist data");

                                // Stash the updates and, as a side effect of the
                                // (lazy) map, track the largest timestamp seen.
                                persist_stash.extend(data.into_iter().map(|((key, value), ts, diff)| {
                                    largest_seen_persist_ts = std::cmp::max(largest_seen_persist_ts.clone(), Some(ts.clone()));
                                    (key, value, ts, diff)
                                }));
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received persist progress");
                                persist_upper = upper;
                            }
                        }
                    }

                    // The chunk we are about to ingest is the last one of the
                    // initial snapshot if the persist frontier has reached (or
                    // passed) the `resume_upper` while we are still hydrating.
                    let last_rehydration_chunk =
                        hydrating && PartialOrder::less_equal(&resume_upper, &persist_upper);

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        persist_stash = %persist_stash.len(),
                        %hydrating,
                        %last_rehydration_chunk,
                        ?resume_upper,
                        ?persist_upper,
                        "ingesting persist snapshot chunk");

                    // Timestamps are dropped here, the state only consumes
                    // `(key, value, diff)`.
                    let persist_stash_iter = persist_stash
                        .drain(..)
                        .map(|(key, val, _ts, diff)| (key, val, diff));

                    match state
                        .consolidate_chunk(
                            persist_stash_iter,
                            last_rehydration_chunk,
                        )
                        .await
                    {
                        Ok(_) => {}
                        Err(e) => {
                            // Make sure our persist source can shut down.
                            persist_token.take();
                            snapshot_cap.downgrade(&[]);
                            // NOTE(review): if `emit` returns, execution continues
                            // below with state that failed to consolidate —
                            // presumably `emit` never returns; confirm.
                            UpsertErrorEmitter::<G>::emit(
                                &mut error_emitter,
                                "Failed to rehydrate state".to_string(),
                                e,
                            )
                            .await;
                        }
                    }

                    tracing::debug!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?resume_upper,
                        ?persist_upper,
                        "downgrading snapshot cap",
                    );

                    // Only downgrade this _after_ ingesting the data, because
                    // that can actually take quite some time, and we don't want
                    // to announce that we're done ingesting the initial
                    // snapshot too early.
                    //
                    // When we finish ingesting our initial persist snapshot,
                    // during "re-hydration", we downgrade this to the empty
                    // frontier, so we need to be lenient to this failing from
                    // then on.
                    let _ = snapshot_cap.try_downgrade(persist_upper.iter());



                    if last_rehydration_chunk {
                        hydrating = false;

                        tracing::info!(
                            worker_id = %source_config.worker_id,
                            source_id = %source_config.id,
                            "upsert source finished rehydration",
                        );

                        // Release the snapshot capability for good.
                        snapshot_cap.downgrade(&[]);
                    }

                }
                _ = input.ready() => {
                    // Counts data and progress events alike, see the check
                    // against `snapshot_buffering_max` below.
                    let mut events_processed = 0;
                    while let Some(event) = input.next_sync() {
                        match event {
                            AsyncEvent::Data(cap, mut data) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    time=?cap.time(),
                                    updates=%data.len(),
                                    "received data");

                                let event_time = cap.time().clone();

                                stage_input(
                                    &mut stash,
                                    cap,
                                    &mut data,
                                    &input_upper,
                                    &resume_upper,
                                );

                                // Data that arrives exactly at the time of our
                                // output capability may be drained partially,
                                // if the caller asked for that.
                                if prevent_snapshot_buffering && output_cap.time() == &event_time {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?event_time,
                                        ?resume_upper,
                                        ?output_cap,
                                        "allowing partial drain");
                                    partial_drain_time = Some(event_time.clone());
                                } else {
                                    tracing::debug!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        %prevent_snapshot_buffering,
                                        ?event_time,
                                        ?resume_upper,
                                        ?output_cap,
                                        "not allowing partial drain");
                                }
                            }
                            AsyncEvent::Progress(upper) => {
                                tracing::trace!(
                                    worker_id = %source_config.worker_id,
                                    source_id = %source_config.id,
                                    ?upper,
                                    "received progress");

                                // Ignore progress updates before the `resume_upper`, which is our initial
                                // capability post-snapshotting.
                                //
                                // Note that this `continue` also skips the
                                // `events_processed` accounting below.
                                if PartialOrder::less_than(&upper, &resume_upper) {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?upper,
                                        ?resume_upper,
                                        "ignoring progress updates before resume_upper");
                                    continue;
                                }

                                // Disable partial drain, because this progress
                                // update has moved the frontier. We might allow
                                // it again once we receive data right at the
                                // frontier again.
                                partial_drain_time = None;


                                // An empty frontier has no element to downgrade
                                // to; failures to downgrade are ignored.
                                if let Some(ts) = upper.as_option() {
                                    tracing::trace!(
                                        worker_id = %source_config.worker_id,
                                        source_id = %source_config.id,
                                        ?ts,
                                        "downgrading output capability");
                                    let _ = output_cap.try_downgrade(ts);
                                }
                                input_upper = upper;
                            }
                        }

                        // Stop reading input after `snapshot_buffering_max`
                        // events, so that we get a chance to drain the stash.
                        events_processed += 1;
                        if let Some(max) = snapshot_buffering_max {
                            if events_processed >= max {
                                break;
                            }
                        }
                    }
                }
            };

            // While we have partially ingested updates of a timestamp our state
            // is in an inconsistent/consolidating state and accessing it would
            // panic.
            //
            // The comparison is done on the outer timestamps (via `to_outer`):
            // as long as the persist frontier is not beyond the largest
            // timestamp seen on the persist input, we go back to reading input.
            if let Some(largest_seen_persist_ts) = largest_seen_persist_ts.as_ref() {
                let largest_seen_outer_persist_ts = largest_seen_persist_ts.clone().to_outer();
                let outer_persist_upper = persist_upper.iter().map(|ts| ts.clone().to_outer());
                let outer_persist_upper = Antichain::from_iter(outer_persist_upper);
                if outer_persist_upper.less_equal(&largest_seen_outer_persist_ts) {
                    continue;
                }
            }

            // We try and drain from our stash every time we go through the
            // loop. More of our stash can become eligible for draining both
            // when the source-input frontier advances or when the persist
            // frontier advances.

            // We can't easily iterate through the cap -> updates mappings and
            // downgrade the cap at the same time, so we drain them out and
            // re-insert them into the map at their (possibly downgraded) cap.


            let stashed_work = stash.drain().collect_vec();
            for (mut cap, mut updates) in stashed_work.into_iter() {
                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    ?cap,
                    ?updates,
                    "stashed updates");

                // Drains everything that is complete with respect to both the
                // input frontier and the persist frontier.
                let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                    &mut updates,
                    &mut commands_state,
                    &mut output_updates,
                    &mut multi_get_scratch,
                    DrainStyle::ToUpper{input_upper: &input_upper, persist_upper: &persist_upper},
                    &mut error_emitter,
                    &mut state,
                    &source_config,
                )
                .await;

                tracing::trace!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    output_updates = %output_updates.len(),
                    "output updates for complete timestamp");

                // Emit at the capability the updates were stashed under.
                for (update, ts, diff) in output_updates.drain(..) {
                    output_handle.give(&cap, (update, ts, diff));
                }

                if !updates.is_empty() {
                    // Hold the capability only at the earliest time that is
                    // still stashed.
                    let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
                    cap.downgrade(&min_remaining_time);

                    // Stash them back in, being careful because we might have
                    // to merge them with other updates that we already have for
                    // that timestamp.
                    stash.entry(cap)
                        .and_modify(|existing_updates| existing_updates.append(&mut updates))
                        .or_insert_with(|| updates);

                }
            }


            // Checked only after the drain above, so that whatever became
            // eligible is still emitted before shutting down.
            if input_upper.is_empty() {
                tracing::debug!(
                    worker_id = %source_config.worker_id,
                    source_id = %source_config.id,
                    "input exhausted, shutting down");
                break;
            };

            // If there were staged events that occurred at the capability time, drain
            // them. This is safe because out-of-order updates to the same key that are
            // drained in separate calls to `drain_staged_input` are correctly ordered by
            // their `FromTime` in `drain_staged_input`.
            //
            // Note also that this may result in more updates in the output collection than
            // the minimum. However, because the frontier only advances on `Progress` updates,
            // the collection always accumulates correctly for all keys.
            if let Some(partial_drain_time) = &partial_drain_time {

                // Same drain/emit/re-stash dance as above, but with the
                // `AtTime` drain style.
                let stashed_work = stash.drain().collect_vec();
                for (mut cap, mut updates) in stashed_work.into_iter() {
                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        ?cap,
                        ?updates,
                        "stashed updates");

                    let mut min_remaining_time = drain_staged_input::<_, G, _, _, _>(
                        &mut updates,
                        &mut commands_state,
                        &mut output_updates,
                        &mut multi_get_scratch,
                        DrainStyle::AtTime{
                            time: partial_drain_time.clone(),
                            persist_upper: &persist_upper
                        },
                        &mut error_emitter,
                        &mut state,
                        &source_config,
                    )
                    .await;

                    tracing::trace!(
                        worker_id = %source_config.worker_id,
                        source_id = %source_config.id,
                        output_updates = %output_updates.len(),
                        "output updates for partial timestamp");

                    for (update, ts, diff) in output_updates.drain(..) {
                        output_handle.give(&cap, (update, ts, diff));
                    }

                    if !updates.is_empty() {
                        let min_remaining_time = min_remaining_time.take().expect("we still have updates left");
                        cap.downgrade(&min_remaining_time);

                        // Stash them back in, being careful because we might have
                        // to merge them with other updates that we already have for
                        // that timestamp.
                        stash.entry(cap)
                            .and_modify(|existing_updates| existing_updates.append(&mut updates))
                            .or_insert_with(|| updates);

                    }
                }
            }
        }
    });

    (
        // Wrap the upsert errors back into `DataflowError`s for the output.
        output.as_collection().map(|result| match result {
            Ok(ok) => Ok(ok),
            Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
        }),
        health_stream,
        snapshot_stream,
        shutdown_button.press_on_drop(),
    )
}
566
567/// Helper method for [`upsert_inner`] used to stage `data` updates
568/// from the input/source timely edge.
569#[allow(clippy::disallowed_types)]
570fn stage_input<T, FromTime>(
571 stash: &mut HashMap<Capability<T>, Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>>,
572 cap: Capability<T>,
573 data: &mut Vec<((UpsertKey, Option<UpsertValue>, FromTime), T, Diff)>,
574 input_upper: &Antichain<T>,
575 resume_upper: &Antichain<T>,
576) where
577 T: PartialOrder + timely::progress::Timestamp,
578 FromTime: Ord,
579{
580 if PartialOrder::less_equal(input_upper, resume_upper) {
581 data.retain(|(_, ts, _)| resume_upper.less_equal(ts));
582 }
583
584 let stash_for_timestamp = stash.entry(cap).or_default();
585
586 stash_for_timestamp.extend(data.drain(..).map(|((key, value, order), time, diff)| {
587 assert!(diff.is_positive(), "invalid upsert input");
588 (time, key, Reverse(order), value)
589 }));
590}
591
/// The style of drain we are performing on the stash. `AtTime`-drains cannot
/// assume that all values have been seen, and must leave tombstones behind for deleted values.
///
/// Both styles carry the current persist frontier: regardless of the style, an
/// update is only drained when its timestamp is not strictly beyond
/// `persist_upper`.
#[derive(Debug)]
enum DrainStyle<'a, T> {
    /// Drain all updates whose timestamp is complete, that is, the
    /// `input_upper` is no longer `less_equal` their timestamp. No provisional
    /// values are written to the upsert state for this style.
    ToUpper {
        /// The current frontier of the source input.
        input_upper: &'a Antichain<T>,
        /// The current frontier of the persist (feedback) input.
        persist_upper: &'a Antichain<T>,
    },
    /// For partial draining when taking the source snapshot: drain all updates
    /// whose timestamp is less than or equal to `time`. Provisional values (or
    /// tombstones) are written to the upsert state for this style.
    AtTime {
        /// The time up to (and including) which updates are drained.
        time: T,
        /// The current frontier of the persist (feedback) input.
        persist_upper: &'a Antichain<T>,
    },
}
606
607/// Helper method for [`upsert_inner`] used to stage `data` updates
608/// from the input timely edge.
609///
610/// Returns the minimum observed time across the updates that remain in the
611/// stash or `None` if none are left.
612async fn drain_staged_input<S, G, T, FromTime, E>(
613 stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
614 commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, Option<FromTime>>>,
615 output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
616 multi_get_scratch: &mut Vec<UpsertKey>,
617 drain_style: DrainStyle<'_, T>,
618 error_emitter: &mut E,
619 state: &mut UpsertState<'_, S, T, Option<FromTime>>,
620 source_config: &crate::source::SourceExportCreationConfig,
621) -> Option<T>
622where
623 S: UpsertStateBackend<T, Option<FromTime>>,
624 G: Scope,
625 T: TotalOrder + timely::ExchangeData + Debug + Ord + Sync,
626 FromTime: timely::ExchangeData + Ord + Sync,
627 E: UpsertErrorEmitter<G>,
628{
629 let mut min_remaining_time = Antichain::new();
630
631 let mut eligible_updates = stash
632 .drain_filter_swapping(|(ts, _, _, _)| {
633 let eligible = match &drain_style {
634 DrainStyle::ToUpper {
635 input_upper,
636 persist_upper,
637 } => {
638 // We make sure that a) we only process updates when we know their
639 // timestamp is complete, that is there will be no more updates for
640 // that timestamp, and b) that "previous" times in the persist
641 // input are complete. The latter makes sure that we emit updates
642 // for the next timestamp that are consistent with the global state
643 // in the output persist shard, which also serves as a persistent
644 // copy of our in-memory/on-disk upsert state.
645 !input_upper.less_equal(ts) && !persist_upper.less_than(ts)
646 }
647 DrainStyle::AtTime {
648 time,
649 persist_upper,
650 } => {
651 // Even when emitting partial updates, we still need to wait
652 // until "previous" times in the persist input are complete.
653 *ts <= *time && !persist_upper.less_than(ts)
654 }
655 };
656
657 if !eligible {
658 min_remaining_time.insert(ts.clone());
659 }
660
661 eligible
662 })
663 .filter(|(ts, _, _, _)| {
664 let persist_upper = match &drain_style {
665 DrainStyle::ToUpper {
666 input_upper: _,
667 persist_upper,
668 } => persist_upper,
669 DrainStyle::AtTime {
670 time: _,
671 persist_upper,
672 } => persist_upper,
673 };
674
675 // Any update that is "in the past" of the persist upper is not
676 // relevant anymore. We _can_ emit changes for it, but the
677 // downstream persist_sink would filter these updates out because
678 // the shard upper is already further ahead.
679 //
680 // Plus, our upsert state is up-to-date to the persist_upper, so we
681 // wouldn't be able to emit correct retractions for incoming
682 // commands whose `ts` is in the past of that.
683 let relevant = persist_upper.less_equal(ts);
684 relevant
685 })
686 .collect_vec();
687
688 tracing::debug!(
689 worker_id = %source_config.worker_id,
690 source_id = %source_config.id,
691 ?drain_style,
692 remaining = %stash.len(),
693 eligible = eligible_updates.len(),
694 "draining stash");
695
696 // Sort the eligible updates by (key, time, Reverse(from_time)) so that
697 // deduping by (key, time) gives the latest change for that key.
698 eligible_updates.sort_unstable_by(|a, b| {
699 let (ts1, key1, from_ts1, val1) = a;
700 let (ts2, key2, from_ts2, val2) = b;
701 Ord::cmp(&(ts1, key1, from_ts1, val1), &(ts2, key2, from_ts2, val2))
702 });
703
704 // Read the previous values _per key_ out of `state`, recording it
705 // along with the value with the _latest timestamp for that key_.
706 commands_state.clear();
707 for (_, key, _, _) in eligible_updates.iter() {
708 commands_state.entry(*key).or_default();
709 }
710
711 // These iterators iterate in the same order because `commands_state`
712 // is an `IndexMap`.
713 multi_get_scratch.clear();
714 multi_get_scratch.extend(commands_state.iter().map(|(k, _)| *k));
715 match state
716 .multi_get(multi_get_scratch.drain(..), commands_state.values_mut())
717 .await
718 {
719 Ok(_) => {}
720 Err(e) => {
721 error_emitter
722 .emit("Failed to fetch records from state".to_string(), e)
723 .await;
724 }
725 }
726
727 // From the prefix that can be emitted we can deduplicate based on (ts, key) in
728 // order to only process the command with the maximum order within the (ts,
729 // key) group. This is achieved by wrapping order in `Reverse(FromTime)` above.;
730 let mut commands = eligible_updates.into_iter().dedup_by(|a, b| {
731 let ((a_ts, a_key, _, _), (b_ts, b_key, _, _)) = (a, b);
732 a_ts == b_ts && a_key == b_key
733 });
734
735 let bincode_opts = upsert_types::upsert_bincode_opts();
736 // Upsert the values into `commands_state`, by recording the latest
737 // value (or deletion). These will be synced at the end to the `state`.
738 //
739 // Note that we are effectively doing "mini-upsert" here, using
740 // `command_state`. This "mini-upsert" is seeded with data from `state`, using
741 // a single `multi_get` above, and the final state is written out into
742 // `state` using a single `multi_put`. This simplifies `UpsertStateBackend`
743 // implementations, and reduces the number of reads and write we need to do.
744 //
745 // This "mini-upsert" technique is actually useful in `UpsertState`'s
746 // `consolidate_snapshot_read_write_inner` implementation, minimizing gets and puts on
747 // the `UpsertStateBackend` implementations. In some sense, its "upsert all the way down".
748 while let Some((ts, key, from_time, value)) = commands.next() {
749 let mut command_state = if let Entry::Occupied(command_state) = commands_state.entry(key) {
750 command_state
751 } else {
752 panic!("key missing from commands_state");
753 };
754
755 let existing_state_cell = &mut command_state.get_mut().value;
756
757 if let Some(cs) = existing_state_cell.as_mut() {
758 cs.ensure_decoded(bincode_opts);
759 }
760
761 // Skip this command if its order key is below the one in the upsert state.
762 // Note that the existing order key may be `None` if the existing value
763 // is from snapshotting, which always sorts below new values/deletes.
764 let existing_order = existing_state_cell
765 .as_ref()
766 .and_then(|cs| cs.provisional_order(&ts).map_or(None, Option::as_ref));
767 //.map(|cs| cs.provisional_order(&ts).map_or(None, Option::as_ref));
768 if existing_order >= Some(&from_time.0) {
769 // Skip this update. If no later updates adjust this key, then we just
770 // end up writing the same value back to state. If there
771 // is nothing in the state, `existing_order` is `None`, and this
772 // does not occur.
773 continue;
774 }
775
776 match value {
777 Some(value) => {
778 if let Some(old_value) = existing_state_cell.as_ref() {
779 if let Some(old_value) = old_value.provisional_value_ref(&ts) {
780 output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
781 }
782 }
783
784 match &drain_style {
785 DrainStyle::AtTime { .. } => {
786 let existing_value = existing_state_cell.take();
787
788 let new_value = match existing_value {
789 Some(existing_value) => existing_value.clone().into_provisional_value(
790 value.clone(),
791 ts.clone(),
792 Some(from_time.0.clone()),
793 ),
794 None => StateValue::new_provisional_value(
795 value.clone(),
796 ts.clone(),
797 Some(from_time.0.clone()),
798 ),
799 };
800
801 existing_state_cell.replace(new_value);
802 }
803 DrainStyle::ToUpper { .. } => {
804 // Not writing down provisional values, or anything.
805 }
806 };
807
808 output_updates.push((value, ts, Diff::ONE));
809 }
810 None => {
811 if let Some(old_value) = existing_state_cell.as_ref() {
812 if let Some(old_value) = old_value.provisional_value_ref(&ts) {
813 output_updates.push((old_value.clone(), ts.clone(), Diff::MINUS_ONE));
814 }
815 }
816
817 match &drain_style {
818 DrainStyle::AtTime { .. } => {
819 let existing_value = existing_state_cell.take();
820
821 let new_value = match existing_value {
822 Some(existing_value) => existing_value
823 .into_provisional_tombstone(ts.clone(), Some(from_time.0.clone())),
824 None => StateValue::new_provisional_tombstone(
825 ts.clone(),
826 Some(from_time.0.clone()),
827 ),
828 };
829
830 existing_state_cell.replace(new_value);
831 }
832 DrainStyle::ToUpper { .. } => {
833 // Not writing down provisional values, or anything.
834 }
835 }
836 }
837 }
838 }
839
840 match &drain_style {
841 DrainStyle::AtTime { .. } => {
842 match state
843 .multi_put(
844 // We don't want to update per-record stats, like size of
845 // records indexed or count of records indexed.
846 //
847 // We only add provisional values and these will be
848 // overwritten once we receive updates for state from the
849 // persist input. And the merge functionality cannot know
850 // what was in state before merging, so it cannot correctly
851 // retract/update stats added here.
852 //
853 // Mostly, the merge functionality can't update those stats
854 // because merging happens in a function that we pass to
855 // rocksdb which doesn't have access to any external
856 // context. And in general, with rocksdb we do blind writes
857 // rather than inspect what was there before when
858 // updating/inserting.
859 false,
860 commands_state.drain(..).map(|(k, cv)| {
861 (
862 k,
863 upsert_types::PutValue {
864 value: cv.value.map(|cv| cv.into_decoded()),
865 previous_value_metadata: cv.metadata.map(|v| ValueMetadata {
866 size: v.size.try_into().expect("less than i64 size"),
867 is_tombstone: v.is_tombstone,
868 }),
869 },
870 )
871 }),
872 )
873 .await
874 {
875 Ok(_) => {}
876 Err(e) => {
877 error_emitter
878 .emit("Failed to update records in state".to_string(), e)
879 .await;
880 }
881 }
882 }
883 style => {
884 tracing::trace!(
885 worker_id = %source_config.worker_id,
886 source_id = %source_config.id,
887 "not doing state update for drain style {:?}", style);
888 }
889 }
890
891 min_remaining_time.into_option()
892}