use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
    StateCollections, TypedState,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId};

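/// Applies commands to the persist state of a single shard.
///
/// Reads are served from a process-wide cached copy of state
/// ([`LockingTypedState`]); writes are durably committed via compare-and-set
/// through [`StateVersions`] and then reflected back into the cache.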
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    shared_states: Arc<StateCache>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

    state: Arc<LockingTypedState<K, V, T, D>>,
}

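// All fields are either `Copy` or behind an `Arc`, so `Applier` can be cloned
// cheaply without requiring `Clone` bounds on `K`, `V`, or `D`.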
impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            shared_states: Arc::clone(&self.shared_states),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
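    /// Returns an `Applier` for the given shard, initializing the shard's
    /// durable state if it doesn't yet exist and sharing the cached copy of
    /// that state with other handles in this process.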
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

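    /// Returns a [`StateWatch`] for observing changes to this shard's cached
    /// state.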
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

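    /// Fetches the latest state from Consensus, updates the local cache, and
    /// applies `f` to the resulting write frontier (upper).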
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

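    /// Returns a clone of the upper of the locally cached state, without
    /// fetching anything from Consensus.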
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

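    /// Returns a clone of the since of the locally cached state (test-only).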
    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

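    /// Returns the lease state for the given reader, if it is registered in
    /// the locally cached state.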
    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.state.collections.leased_readers.get(&id).cloned()
            })
    }

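    /// Returns the seqno of the locally cached state.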
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

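    /// Returns the seqno_since of the locally cached state (the seqno horizon
    /// below which old state versions are eligible for garbage collection).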
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

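    /// Returns whether the shard is finalized: its state is a tombstone and
    /// its trace has been compacted down to a single empty batch.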
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

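    /// Returns the decoded key and value schemas registered under `schema_id`,
    /// if any.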
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

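    /// Returns the most recently registered schema, if any.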
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

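    /// Returns `Ok` only if both the since and the upper of the cached state
    /// are empty, i.e. the shard is eligible to be finalized.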
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

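    /// Returns the seqno and key of every rollup with a seqno at most the
    /// given one.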
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

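    /// Clones the locally cached state and writes a rollup (a point-in-time
    /// serialization of the full state) for it via [`StateVersions`].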
    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

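    /// Applies `work_fn` to a clone of the current state and attempts to
    /// compare-and-set the result as the new durable state, retrying after a
    /// cache refresh on expectation mismatches. On success, returns the new
    /// seqno, the closure's result, and any routine maintenance the command
    /// incurred.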
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

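    /// Computes a candidate next state under the state read lock and then,
    /// with the lock released, attempts to durably commit it via
    /// compare-and-set against the expected seqno.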
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

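    /// Runs `work_fn` against a clone of the given state, applies lease
    /// expirations, and decides whether the resulting transition should
    /// trigger a rollup or garbage collection. Called while holding the state
    /// read lock.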
    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        let diff = StateDiff::from_diff(&state.state, &new_state);
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

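    /// Replaces the locally cached state with `new_state` if it is more recent
    /// (i.e. has a greater seqno); otherwise leaves the cache unchanged.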
    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

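    /// Fetches the latest durable state and updates the local cache with it,
    /// using `seqno_hint` (e.g. from a compare-and-set mismatch) to no-op when
    /// the cached state has already advanced past the hint.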
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

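/// The outcome of a single attempt to durably apply a command.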
enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

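/// A candidate state transition computed by `compute_next_state_locked`,
/// ready to be compare-and-set into durable state.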
struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}