use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    EncodedSchemas, ExpiryMetrics, HollowBatch, ROLLUP_THRESHOLD, Since, SnapshotErr,
    StateCollections, TypedState, Upper,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId};

use super::state::{
    ActiveGc, ActiveRollup, GC_FALLBACK_THRESHOLD_MS, GC_USE_ACTIVE_GC,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_USE_ACTIVE_ROLLUP,
};

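/// An applier of persist commands to a shard's state.
///
/// Wraps the locally cached copy of shard state and mediates all reads of and
/// updates to it: reads go through the accessors below, writes go through
/// [Self::apply_unbatched_cmd] and [Self::fetch_and_update_state].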
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    state: Arc<LockingTypedState<K, V, T, D>>,
}

impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            shared_states: Arc::clone(&self.shared_states),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
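    /// Fetches (or initializes) the state for `shard_id` via the shared
    /// [StateCache] and returns an `Applier` wrapping it.
    ///
    /// Returns an error if the shard was previously registered with different
    /// key, value, timestamp, or diff codecs.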
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

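    /// Returns a [StateWatch] for changes to this applier's copy of state.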
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

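    /// Fetches the latest state from durable storage, updates the local cache,
    /// and passes the resulting `upper` to `f`.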
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

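    /// Returns a clone of the locally cached `upper`, without fetching.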
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

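    /// Returns the locally cached `since` frontier (test-only helper).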
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

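    /// Returns the sequence number of the locally cached state.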
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

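    /// Returns the cached `seqno_since`: the earliest state version that may
    /// still be needed by outstanding readers.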
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

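    /// Returns whether the shard is finalized: a tombstone whose trace has
    /// been compacted down to a single empty batch.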
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

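    /// Returns the decoded key and value schemas registered under
    /// `schema_id`, if any.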
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

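    /// Returns the most recently registered schema id and its decoded key and
    /// value schemas, if any schemas are registered.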
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

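    /// Verifies that both `since` and `upper` are the empty antichain, i.e.
    /// that the shard has been fully downgraded and advanced for
    /// finalization, returning an [InvalidUsage] error otherwise.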
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

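    /// Returns the `(seqno, key)` of every rollup in state with a seqno less
    /// than or equal to `seqno`.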
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

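    /// Returns every fueled merge request currently pending in the trace.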
    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

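    /// Returns the batches that make up the shard's contents as of `as_of`,
    /// or an error if `as_of` is not currently readable.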
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<Result<(), Upper<T>>, Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

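    /// Writes a rollup of (a clone of) the locally cached state to blob
    /// storage, returning `None` if no rollup was written.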
    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

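    /// Applies `work_fn` to a clone of the current state and attempts to
    /// compare-and-set the result as the new durable state, retrying on
    /// concurrent modifications until it either commits or `work_fn` breaks.
    ///
    /// Returns the resulting seqno, the output of `work_fn`, and any routine
    /// maintenance work assigned to this process.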
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    // Someone else wrote a newer version of state
                    // concurrently. Catch our cached copy up and retry the
                    // command from the top.
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        // Compute the candidate next state from a clone of the cached state,
        // holding the read lock only for the duration of the computation.
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        // Attempt to durably commit the diff, expecting that nothing has been
        // written since the seqno the candidate state was computed from.
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        // A tombstone shard is permanently finished: the only commands allowed
        // to commit new state versions against it are rollups and (idempotent)
        // tombstone transitions themselves.
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            ROLLUP_THRESHOLD.get(cfg),
            ROLLUP_USE_ACTIVE_ROLLUP.get(cfg),
            u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg)),
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if ROLLUP_USE_ACTIVE_ROLLUP.get(cfg) {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        // Decide whether this command should also trigger a garbage collection
        // pass; if so, the request is handed back as routine maintenance.
        let garbage_collection = new_state.maybe_gc(
            is_write,
            GC_USE_ACTIVE_GC.get(cfg),
            u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            now,
        );

        if let Some(gc) = garbage_collection.as_ref() {
            if GC_USE_ACTIVE_GC.get(cfg) {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        // Compute the diff between the old and new states; this is what gets
        // durably written. In tests and debug builds, additionally verify that
        // applying the diff to the old state round-trips to the new one.
        let diff = StateDiff::from_diff(&state.state, &new_state);
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

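    /// Replaces the locally cached state with `new_state` if `new_state` has a
    /// greater seqno, asserting that the cached seqno never regresses.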
    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

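    /// Fetches all live diffs newer than the locally cached state from durable
    /// storage and applies them to the cached copy.
    ///
    /// If `seqno_hint` is provided and the cached state is already past it,
    /// this is a no-op; a full state fetch is used as a fallback when the
    /// diffs alone cannot advance the cached copy.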
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                // The cached state is already past the hint, so there's
                // nothing to do.
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        // There are no newer diffs in durable state, so the cached copy is
        // already up to date.
        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        // Fast path: the fetched diffs applied on top of the cached state and
        // advanced its seqno.
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        // Slow path: applying the diffs did not advance the cached state, so
        // fetch and install the full current state instead.
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

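/// The outcome of a single attempt by [Applier::apply_unbatched_cmd] to
/// compute and durably commit a new state.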
enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}