use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
    StateCollections, TypedState,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId};

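/// Applies commands to in-memory state and durably commits the results via
/// compare-and-set, keeping this process's cached copy of the shard's state
/// up to date along the way.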
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

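    /// This shard's cached state, shared with every other handle to the shard
    /// in this process via [StateCache].
    ///
    /// Intentionally not pub: all access goes through the read/write lock
    /// helpers below so the lock is held as briefly as possible.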
    state: Arc<LockingTypedState<K, V, T, D>>,
}

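// Impl Clone by hand, since deriving it would (incorrectly) require Clone
// impls for all of K, V, and D as well.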
impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            shared_states: Arc::clone(&self.shared_states),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

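    /// Returns a [StateWatch] for observing changes to this shard's state.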
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

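    /// Fetches the latest state from Consensus and returns the result of
    /// applying `f` to its upper.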
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

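    /// Returns a clone of the cached upper, without fetching.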
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

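    /// Returns a clone of the cached since frontier.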
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

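    /// Returns the [LeasedReaderState] for the given reader, if the lease
    /// exists.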
    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.state.collections.leased_readers.get(&id).cloned()
            })
    }

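    /// Returns the seqno of the cached state.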
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

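    /// Returns the seqno_since of the cached state: the earliest state
    /// version that must still be retained on behalf of live readers.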
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

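    /// Returns whether the shard has been fully finalized: its state is a
    /// tombstone and its trace consists of a single empty batch.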
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

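    /// Returns the decoded key and val schemas registered under `schema_id`,
    /// if any.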
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

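    /// Returns the most recently registered key and val schemas, if any.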
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

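    /// Returns the [SchemaId] under which the given key and val schemas were
    /// registered, if any.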
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
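                // SchemaIds are assigned in increasing order, so search
                // newest-first on the assumption that lookups usually target a
                // recently registered schema.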
                let mut schemas = state.collections.schemas.iter().rev();
                schemas
                    .find(|(_, x)| {
                        K::decode_schema(&x.key) == *key_schema
                            && V::decode_schema(&x.val) == *val_schema
                    })
                    .map(|(id, _)| *id)
            })
    }

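    /// Returns Ok only if both since and upper are empty, i.e. the shard is
    /// eligible for finalization.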
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

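    /// Returns the seqno and key of every rollup at or before the given
    /// seqno.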
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

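    /// Clones the current state and writes it out as a rollup to the blob
    /// store, returning the encoded rollup if one was written.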
    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

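    /// Applies `work_fn` to a clone of the current state and attempts to
    /// commit the result via compare-and-set, looping on expectation
    /// mismatches (after catching up to the latest state) until the command
    /// commits, is skipped, or hits an indeterminate error.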
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

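        // SUBTLE: An indeterminate error here means the compare-and-set may
        // or may not have committed, so it can't simply be retried. Instead
        // it's surfaced to the caller, who may retry only if the command
        // itself is idempotent.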
        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

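        // Once a shard is a tombstone, the only state transitions that may
        // still commit are the idempotent ones: writing a rollup or
        // re-asserting the tombstone. Anything else is a bug in the caller.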
        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

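        // Possibly request a GC run as routine maintenance, depending on how
        // many state versions have accumulated since the last one.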
        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        let diff = StateDiff::from_diff(&state.state, &new_state);
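        // Sanity check in tests and debug builds that applying `diff` to the
        // old state roundtrips to exactly `new_state`.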
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

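    /// Replaces the cached state with `new_state`, unless the cached state is
    /// already at a newer seqno.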
    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

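    /// Fetches the current state of the shard and updates the cached copy,
    /// preferring to apply incremental diffs over fetching a full state.
    ///
    /// If a seqno hint is given and the cached state is already past it, this
    /// returns without making any Consensus or Blob calls.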
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
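                // The cached state was already updated past the hint (e.g. by
                // another handle in this process), so no fetch is needed.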
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

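        // Applying the fetched diffs advanced our state: the fast path
        // succeeded.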
        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

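        // New diffs exist, but applying them didn't advance our state; our
        // cached copy has apparently fallen behind the earliest live diff.
        // Fall back to fetching a complete copy of the current state.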
        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

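/// The outcome of a single attempt to compute and commit a state transition
/// in [Applier::apply_unbatched_cmd].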
enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

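/// A candidate state transition computed by a work fn, along with everything
/// needed to attempt to commit it.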
struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}