use std::collections::BTreeMap;
use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
use crate::internal::paths::{PartialRollupKey, RollupId};
use crate::internal::state::{
    ActiveGc, ActiveRollup, EncodedSchemas, ExpiryMetrics, GC_FALLBACK_THRESHOLD_MS,
    GC_MAX_VERSIONS, GC_MIN_VERSIONS, GC_USE_ACTIVE_GC, GcConfig, HollowBatch, LeasedReaderState,
    ROLLUP_FALLBACK_THRESHOLD_MS, ROLLUP_THRESHOLD, ROLLUP_USE_ACTIVE_ROLLUP, Since, SnapshotErr,
    StateCollections, TypedState,
};
use crate::internal::state_diff::StateDiff;
use crate::internal::state_versions::{EncodedRollup, StateVersions};
use crate::internal::trace::FueledMergeReq;
use crate::internal::watch::StateWatch;
use crate::read::LeasedReaderId;
use crate::rpc::{PUBSUB_PUSH_DIFF_ENABLED, PubSubSender};
use crate::schema::SchemaCache;
use crate::{Diagnostics, PersistConfig, ShardId, cfg};
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_ore::soft_assert_or_log;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

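/// Applies commands to a shard's durable state and maintains a locally cached
/// copy of that state.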
#[derive(Debug)]
pub struct Applier<K, V, T, D> {
    pub(crate) cfg: PersistConfig,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) state_versions: Arc<StateVersions>,
    pubsub_sender: Arc<dyn PubSubSender>,
    pub(crate) shard_id: ShardId,

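    // Shared, locked handle to this shard's state, obtained from the
    // `StateCache` in `Applier::new`.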
    state: Arc<LockingTypedState<K, V, T, D>>,
}

impl<K, V, T: Clone, D> Clone for Applier<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            cfg: self.cfg.clone(),
            metrics: Arc::clone(&self.metrics),
            shard_metrics: Arc::clone(&self.shard_metrics),
            state_versions: Arc::clone(&self.state_versions),
            pubsub_sender: Arc::clone(&self.pubsub_sender),
            shard_id: self.shard_id,
            state: Arc::clone(&self.state),
        }
    }
}

impl<K, V, T, D> Applier<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let shard_metrics = metrics.shards.shard(&shard_id, &diagnostics.shard_name);
        let state = shared_states
            .get::<K, V, T, D, _, _>(
                shard_id,
                || {
                    metrics.cmds.init_state.run_cmd(&shard_metrics, || {
                        state_versions.maybe_init_shard(&shard_metrics)
                    })
                },
                &diagnostics,
            )
            .await?;
        let ret = Applier {
            cfg,
            metrics,
            shard_metrics,
            state_versions,
            pubsub_sender,
            shard_id,
            state,
        };
        Ok(ret)
    }

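    /// Returns a [`StateWatch`] for observing changes to this shard's cached state.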
    pub fn watch(&self) -> StateWatch<K, V, T, D> {
        StateWatch::new(Arc::clone(&self.state), Arc::clone(&self.metrics))
    }

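    /// Fetches and applies the latest state from Consensus, then invokes `f`
    /// with the current upper.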
    pub async fn fetch_upper<R, F: FnMut(&Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.metrics.cmds.fetch_upper_count.inc();
        self.fetch_and_update_state(None).await;
        self.upper(|_seqno, upper| f(upper))
    }

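    /// Returns a clone of the locally cached upper, without fetching the
    /// latest state.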
    pub fn clone_upper(&self) -> Antichain<T> {
        self.upper(|_seqno, upper| upper.clone())
    }

    pub(crate) fn upper<R, F: FnMut(SeqNo, &Antichain<T>) -> R>(&self, mut f: F) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, state.upper())
            })
    }

    pub(crate) fn schemas<R>(
        &self,
        mut f: impl FnMut(SeqNo, &BTreeMap<SchemaId, EncodedSchemas>) -> R,
    ) -> R {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, move |state| {
                f(state.seqno, &state.collections.schemas)
            })
    }

    pub(crate) fn schema_cache(&self) -> SchemaCache<K, V, T, D> {
        SchemaCache::new(self.state.schema_cache(), self.clone())
    }

    #[cfg(test)]
    pub fn since(&self) -> Antichain<T> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.since().clone()
            })
    }

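    /// Returns the [`LeasedReaderState`] registered under `id`, if any.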
    pub fn reader_lease(&self, id: LeasedReaderId) -> Option<LeasedReaderState<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.state.collections.leased_readers.get(&id).cloned()
            })
    }

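    /// Returns the [`SeqNo`] of the locally cached state.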
    pub fn seqno(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno
            })
    }

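    /// Returns the `seqno_since` of the locally cached state: the earliest
    /// [`SeqNo`] that registered readers may still need.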
    pub fn seqno_since(&self) -> SeqNo {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.seqno_since()
            })
    }

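    /// Returns whether this shard has been finalized: its state is a tombstone
    /// and its trace has been compacted down to a single empty batch.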
    pub fn is_finalized(&self) -> bool {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                state.collections.is_tombstone() && state.collections.is_single_empty_batch()
            })
    }

    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let x = state.collections.schemas.get(&schema_id)?;
                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let (id, x) = state.collections.schemas.last_key_value()?;
                Some((*id, K::decode_schema(&x.key), V::decode_schema(&x.val)))
            })
    }

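    /// Returns the id of a registered schema pair matching `key_schema` and
    /// `val_schema`, preferring the most recently registered match.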
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                let mut schemas = state.collections.schemas.iter().rev();
                schemas
                    .find(|(_, x)| {
                        K::decode_schema(&x.key) == *key_schema
                            && V::decode_schema(&x.val) == *val_schema
                    })
                    .map(|(id, _)| *id)
            })
    }

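    /// Returns `Ok` if both the since and the upper of the cached state are
    /// empty, otherwise an [`InvalidUsage::FinalizationError`].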
    pub fn check_since_upper_both_empty(&self) -> Result<(), InvalidUsage<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                if state.since().is_empty() && state.upper().is_empty() {
                    Ok(())
                } else {
                    Err(InvalidUsage::FinalizationError {
                        since: state.since().clone(),
                        upper: state.upper().clone(),
                    })
                }
            })
    }

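    /// Returns the [`SeqNo`] and key of every rollup at or before `seqno`.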
    pub fn rollups_lte_seqno(&self, seqno: SeqNo) -> Vec<(SeqNo, PartialRollupKey)> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .rollups
                    .range(..=seqno)
                    .map(|(seqno, rollup)| (*seqno, rollup.key.clone()))
                    .collect::<Vec<(SeqNo, PartialRollupKey)>>()
            })
    }

    pub fn all_fueled_merge_reqs(&self) -> Vec<FueledMergeReq<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state
                    .collections
                    .trace
                    .fueled_merge_reqs_before_ms(u64::MAX, None)
                    .collect()
            })
    }

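    /// Returns the batches making up a snapshot of the shard as of `as_of`,
    /// if the cached state can serve it.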
    pub fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, SnapshotErr<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.snapshot(as_of)
            })
    }

    pub fn all_batches(&self) -> Vec<HollowBatch<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.state.collections.trace.batches().cloned().collect()
            })
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.verify_listen(as_of)
            })
    }

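    /// Returns the next batch for a listener at `frontier`, or the cached
    /// state's [`SeqNo`] if no such batch is available yet.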
    pub fn next_listen_batch(&self, frontier: &Antichain<T>) -> Result<HollowBatch<T>, SeqNo> {
        self.state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.next_listen_batch(frontier)
            })
    }

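    /// Writes a rollup of (a clone of) the current state to blob storage,
    /// returning the encoded rollup if one was written.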
    pub async fn write_rollup_for_state(&self) -> Option<EncodedRollup> {
        let state = self
            .state
            .read_lock(&self.metrics.locks.applier_read_noncacheable, |state| {
                state.clone_for_rollup()
            });

        self.state_versions
            .write_rollup_for_state(self.shard_metrics.as_ref(), state, &RollupId::new())
            .await
    }

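    /// Applies `work_fn` to the cached state and attempts to commit the result
    /// to Consensus via compare-and-set, retrying (after refreshing the cache)
    /// on expectation mismatches. Returns the committed [`SeqNo`], the result
    /// of `work_fn`, and any routine maintenance the command incurred.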
    pub async fn apply_unbatched_cmd<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> Result<(SeqNo, Result<R, E>, RoutineMaintenance), Indeterminate> {
        loop {
            cmd.started.inc();
            let now = Instant::now();
            let ret = Self::apply_unbatched_cmd_locked(
                &self.state,
                cmd,
                &mut work_fn,
                &self.cfg,
                &self.metrics,
                &self.shard_metrics,
                &self.state_versions,
            )
            .await;
            cmd.seconds.inc_by(now.elapsed().as_secs_f64());

            match ret {
                ApplyCmdResult::Committed((diff, new_state, res, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    self.update_state(new_state);
                    if PUBSUB_PUSH_DIFF_ENABLED.get(&self.cfg) {
                        self.pubsub_sender.push_diff(&self.shard_id, &diff);
                    }
                    return Ok((diff.seqno, Ok(res), maintenance));
                }
                ApplyCmdResult::SkippedStateTransition((seqno, err, maintenance)) => {
                    cmd.succeeded.inc();
                    self.shard_metrics.cmd_succeeded.inc();
                    return Ok((seqno, Err(err), maintenance));
                }
                ApplyCmdResult::Indeterminate(err) => {
                    cmd.failed.inc();
                    return Err(err);
                }
                ApplyCmdResult::ExpectationMismatch(seqno) => {
                    cmd.cas_mismatch.inc();
                    self.fetch_and_update_state(Some(seqno)).await;
                }
            }
        }
    }

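    /// Computes the next state for `cmd` and attempts to durably commit it
    /// with a single compare-and-set against Consensus.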
    #[allow(clippy::needless_pass_by_ref_mut)]
    async fn apply_unbatched_cmd_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &LockingTypedState<K, V, T, D>,
        cmd: &CmdMetrics,
        work_fn: &mut WorkFn,
        cfg: &PersistConfig,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        state_versions: &StateVersions,
    ) -> ApplyCmdResult<K, V, T, D, R, E> {
        let _permit_opt = state.lease_for_update().await;

        let computed_next_state = state
            .read_lock(&metrics.locks.applier_read_noncacheable, |state| {
                Self::compute_next_state_locked(state, work_fn, metrics, cmd, cfg)
            });

        let next_state = match computed_next_state {
            Ok(x) => x,
            Err((seqno, err)) => {
                return ApplyCmdResult::SkippedStateTransition((
                    seqno,
                    err,
                    RoutineMaintenance::default(),
                ));
            }
        };

        let NextState {
            expected,
            diff,
            state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        } = next_state;

        {
            let build_version = &cfg.build_version;
            let state_version = &state.state.collections.version;
            soft_assert_or_log!(
                cfg::code_can_write_data(build_version, state_version),
                "current version {build_version} does not support state format {state_version}"
            );
        }

        let cas_res = state_versions
            .try_compare_and_set_current(&cmd.name, shard_metrics, Some(expected), &state, &diff)
            .await;

        match cas_res {
            Ok((CaSResult::Committed, diff)) => {
                assert!(
                    expected <= state.seqno,
                    "state seqno regressed: {} vs {}",
                    expected,
                    state.seqno
                );

                metrics
                    .lease
                    .timeout_read
                    .inc_by(u64::cast_from(expiry_metrics.readers_expired));

                metrics
                    .state
                    .writer_removed
                    .inc_by(u64::cast_from(expiry_metrics.writers_expired));

                if let Some(gc) = garbage_collection.as_ref() {
                    debug!("Assigned gc request: {:?}", gc);
                }

                let maintenance = RoutineMaintenance {
                    garbage_collection,
                    write_rollup,
                };

                ApplyCmdResult::Committed((diff, state, work_ret, maintenance))
            }
            Ok((CaSResult::ExpectationMismatch, _diff)) => {
                ApplyCmdResult::ExpectationMismatch(expected)
            }
            Err(err) => ApplyCmdResult::Indeterminate(err),
        }
    }

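    /// Runs `work_fn` against a clone of `state` and packages the resulting
    /// diff, rollup/GC decisions, and expiry metrics into a [`NextState`],
    /// without committing anything.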
    fn compute_next_state_locked<
        R,
        E,
        WorkFn: FnMut(SeqNo, &PersistConfig, &mut StateCollections<T>) -> ControlFlow<E, R>,
    >(
        state: &TypedState<K, V, T, D>,
        work_fn: &mut WorkFn,
        metrics: &Metrics,
        cmd: &CmdMetrics,
        cfg: &PersistConfig,
    ) -> Result<NextState<K, V, T, D, R>, (SeqNo, E)> {
        let is_write = cmd.name == metrics.cmds.compare_and_append.name;
        let is_rollup = cmd.name == metrics.cmds.add_rollup.name;
        let is_become_tombstone = cmd.name == metrics.cmds.become_tombstone.name;

        let gc_config = GcConfig {
            use_active_gc: GC_USE_ACTIVE_GC.get(cfg),
            fallback_threshold_ms: u64::cast_from(GC_FALLBACK_THRESHOLD_MS.get(cfg)),
            min_versions: GC_MIN_VERSIONS.get(cfg),
            max_versions: GC_MAX_VERSIONS.get(cfg),
        };

        let use_active_rollup = ROLLUP_USE_ACTIVE_ROLLUP.get(cfg);
        let rollup_threshold = ROLLUP_THRESHOLD.get(cfg);
        let rollup_fallback_threshold_ms = u64::cast_from(ROLLUP_FALLBACK_THRESHOLD_MS.get(cfg));

        let expected = state.seqno;
        let was_tombstone_before = state.collections.is_tombstone();

        let (work_ret, mut new_state) = match state.clone_apply(cfg, work_fn) {
            Continue(x) => x,
            Break(err) => {
                return Err((expected, err));
            }
        };
        let expiry_metrics = new_state.expire_at((cfg.now)());
        new_state.state.collections.trace.roundtrip_structure = true;

        if was_tombstone_before && !(is_rollup || is_become_tombstone) {
            panic!(
                "cmd {} unexpectedly tried to commit a new state on a tombstone: {:?}",
                cmd.name, state
            );
        }

        let now = (cfg.now)();
        let write_rollup = new_state.need_rollup(
            rollup_threshold,
            use_active_rollup,
            rollup_fallback_threshold_ms,
            now,
        );

        if let Some(write_rollup_seqno) = write_rollup {
            if use_active_rollup {
                new_state.collections.active_rollup = Some(ActiveRollup {
                    seqno: write_rollup_seqno,
                    start_ms: now,
                });
            }
        }

        let garbage_collection = new_state.maybe_gc(is_write, now, gc_config);

        if let Some(gc) = garbage_collection.as_ref() {
            if gc_config.use_active_gc {
                new_state.collections.active_gc = Some(ActiveGc {
                    seqno: gc.new_seqno_since,
                    start_ms: now,
                });
            }
        }

        let diff = StateDiff::from_diff(&state.state, &new_state);
        #[cfg(any(test, debug_assertions))]
        {
            if let Err(err) = StateDiff::validate_roundtrip(metrics, state, &diff, &new_state) {
                panic!("validate_roundtrips failed: {}", err);
            }
        }

        Ok(NextState {
            expected,
            diff,
            state: new_state,
            expiry_metrics,
            garbage_collection,
            write_rollup,
            work_ret,
        })
    }

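    /// Replaces the cached state with `new_state` if it is more recent (has a
    /// higher [`SeqNo`]); the cached state never moves backwards.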
    pub fn update_state(&self, new_state: TypedState<K, V, T, D>) {
        let (seqno_before, seqno_after) =
            self.state
                .write_lock(&self.metrics.locks.applier_write, |state| {
                    let seqno_before = state.seqno;
                    if seqno_before < new_state.seqno {
                        *state = new_state;
                    }
                    let seqno_after = state.seqno;
                    (seqno_before, seqno_after)
                });

        assert!(
            seqno_before <= seqno_after,
            "state seqno regressed: {} vs {}",
            seqno_before,
            seqno_after
        );
    }

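    /// Fetches the latest state from Consensus and updates the local cache.
    ///
    /// If `seqno_hint` is provided and the cache is already past it, this is a
    /// no-op. Otherwise it first applies any live diffs newer than the cached
    /// [`SeqNo`] (the fast path) and, if that does not advance the cache,
    /// falls back to fetching the full current state (the slow path).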
    pub async fn fetch_and_update_state(&self, seqno_hint: Option<SeqNo>) {
        let current_seqno = self.seqno();
        let seqno_before = match seqno_hint {
            None => current_seqno,
            Some(hint) => {
                if hint < current_seqno {
                    self.metrics.state.update_state_noop_path.inc();
                    return;
                }
                current_seqno
            }
        };

        let diffs_to_current = self
            .state_versions
            .fetch_all_live_diffs_gt_seqno::<K, V, T, D>(&self.shard_id, seqno_before)
            .await;

        if diffs_to_current.is_empty() {
            self.metrics.state.update_state_empty_path.inc();
            return;
        }

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                state.apply_encoded_diffs(&self.cfg, &self.metrics, &diffs_to_current);
                state.seqno
            });

        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );

        if seqno_before < new_seqno {
            self.metrics.state.update_state_fast_path.inc();
            return;
        }

        let new_state = self
            .state_versions
            .fetch_current_state(&self.shard_id, diffs_to_current)
            .await
            .check_codecs::<K, V, D>(&self.shard_id)
            .expect("shard codecs should not change");

        let new_seqno = self
            .state
            .write_lock(&self.metrics.locks.applier_write, |state| {
                if state.seqno < new_state.seqno {
                    *state = new_state;
                }
                state.seqno
            });

        self.metrics.state.update_state_slow_path.inc();
        assert!(
            seqno_before <= new_seqno,
            "state seqno regressed: {} vs {}",
            seqno_before,
            new_seqno
        );
    }
}

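/// The outcome of a single attempt to apply and durably commit a command.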
enum ApplyCmdResult<K, V, T, D, R, E> {
    Committed((VersionedData, TypedState<K, V, T, D>, R, RoutineMaintenance)),
    SkippedStateTransition((SeqNo, E, RoutineMaintenance)),
    Indeterminate(Indeterminate),
    ExpectationMismatch(SeqNo),
}

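/// A computed-but-not-yet-committed successor to a shard's state, along with
/// everything needed to attempt the commit and schedule follow-up maintenance.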
struct NextState<K, V, T, D, R> {
    expected: SeqNo,
    diff: StateDiff<T>,
    state: TypedState<K, V, T, D>,
    expiry_metrics: ExpiryMetrics,
    write_rollup: Option<SeqNo>,
    garbage_collection: Option<GcReq>,
    work_ret: R,
}