1use std::fmt::Debug;
13use std::ops::ControlFlow::{self, Break, Continue};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::FutureExt;
20use futures::future::{self, BoxFuture};
21use mz_dyncfg::{Config, ConfigSet};
22use mz_ore::cast::CastFrom;
23use mz_ore::error::ErrorExt;
24#[allow(unused_imports)] use mz_ore::fmt::FormatBuffer;
26use mz_ore::{assert_none, soft_assert_no_log};
27use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
28use mz_persist::retry::Retry;
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64};
31use semver::Version;
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tracing::{Instrument, debug, info, trace_span};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
38use crate::cache::StateCache;
39use crate::cfg::RetryParameters;
40use crate::critical::{CriticalReaderId, Opaque};
41use crate::error::{CodecMismatch, InvalidUsage};
42use crate::internal::apply::Applier;
43use crate::internal::compact::CompactReq;
44use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
45use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
46use crate::internal::paths::PartialRollupKey;
47use crate::internal::state::{
48 CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
49 IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
50};
51use crate::internal::state_versions::StateVersions;
52use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
53use crate::internal::watch::StateWatch;
54use crate::read::LeasedReaderId;
55use crate::rpc::PubSubSender;
56use crate::schema::CaESchema;
57use crate::write::WriterId;
58use crate::{Diagnostics, PersistConfig, ShardId};
59
/// The core handle for reading and updating the durable state of a single
/// persist shard.
///
/// All state transitions go through the wrapped [`Applier`]; the
/// `isolated_runtime` is a shared handle to the isolated async runtime
/// (cloned cheaply via `Arc`).
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    // Applies commands against the shard's durable state.
    pub(crate) applier: Applier<K, V, T, D>,
    // Shared handle to the isolated runtime.
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}
65
66impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
68 fn clone(&self) -> Self {
69 Self {
70 applier: self.applier.clone(),
71 isolated_runtime: Arc::clone(&self.isolated_runtime),
72 }
73 }
74}
75
/// Dyncfg: whether an append that yields no compaction request should instead
/// claim some already-uncompacted batch in state.
pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

/// Dyncfg: probability (in percent; values over 100 claim multiple) of
/// claiming a compaction when claiming is enabled.
pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);

/// Dyncfg: minimum version below which batches are eligible for claimed
/// compaction; the empty string (which fails `Version::parse`) disables this.
pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);
96
97impl<K, V, T, D> Machine<K, V, T, D>
98where
99 K: Debug + Codec,
100 V: Debug + Codec,
101 T: Timestamp + Lattice + Codec64 + Sync,
102 D: Monoid + Codec64,
103{
104 pub async fn new(
105 cfg: PersistConfig,
106 shard_id: ShardId,
107 metrics: Arc<Metrics>,
108 state_versions: Arc<StateVersions>,
109 shared_states: Arc<StateCache>,
110 pubsub_sender: Arc<dyn PubSubSender>,
111 isolated_runtime: Arc<IsolatedRuntime>,
112 diagnostics: Diagnostics,
113 ) -> Result<Self, Box<CodecMismatch>> {
114 let applier = Applier::new(
115 cfg,
116 shard_id,
117 metrics,
118 state_versions,
119 shared_states,
120 pubsub_sender,
121 diagnostics,
122 )
123 .await?;
124 Ok(Machine {
125 applier,
126 isolated_runtime,
127 })
128 }
129
    /// Returns the id of the shard this machine operates on.
    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }
133
    /// Returns the seqno of the most recent state known to this machine.
    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }
137
    /// Writes a rollup for the current state and records it in state.
    ///
    /// If no rollup was written, this is a no-op. If the rollup was written
    /// but state did not accept it, the now-unreferenced rollup blob is
    /// deleted so it isn't leaked.
    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            // State never referenced this rollup, so nothing else will clean
            // it up; delete the blob we just wrote.
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }
155
    /// Records the given `(seqno, rollup)` in state, returning whether it was
    /// applied plus any routine maintenance.
    ///
    /// Because the underlying command is retried on indeterminate errors, the
    /// closure may run several times; `applied_ever_true` latches to `true` if
    /// _any_ attempt applied the rollup, so a retry that finds it already
    /// present still reports success.
    pub async fn add_rollup(
        &self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> (bool, RoutineMaintenance) {
        let mut applied_ever_true = false;
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, _applied, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
                let ret = state.add_rollup(add_rollup);
                if let Continue(applied) = ret {
                    applied_ever_true = applied_ever_true || applied;
                }
                ret
            })
            .await;
        (applied_ever_true, maintenance)
    }
175
176 pub async fn remove_rollups(
177 &self,
178 remove_rollups: &[(SeqNo, PartialRollupKey)],
179 ) -> (Vec<SeqNo>, RoutineMaintenance) {
180 let metrics = Arc::clone(&self.applier.metrics);
181 let (_seqno, removed_rollup_seqnos, maintenance) = self
182 .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
183 state.remove_rollups(remove_rollups)
184 })
185 .await;
186 (removed_rollup_seqnos, maintenance)
187 }
188
    /// Advances the durable state's recorded version to this process's build
    /// version, unless state was already written by a newer binary.
    ///
    /// Returns `Err` with state's (newer) version when the upgrade would be a
    /// downgrade; no maintenance is generated in that case.
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
        // NOTE(review): this reuses the `remove_rollups` command metric; there
        // is no visible `upgrade_version` entry in `CmdMetrics` — confirm
        // whether this metric reuse is intentional.
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                // Only ever move the version forward (equal is a no-op write);
                // never downgrade durable state.
                if state.version <= cfg.build_version {
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }
217
    /// Registers a new leased reader on the shard, returning its initial
    /// state plus any routine maintenance.
    ///
    /// The returned reader state holds a seqno at or past the shard's
    /// seqno-since (asserted below), so the reader's lease protects the
    /// state versions it may need.
    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // Invariant: a freshly registered reader must not be behind the
        // seqno-capability horizon.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }
256
    /// Registers a new critical (non-leased) reader on the shard, returning
    /// its state plus any routine maintenance.
    ///
    /// `default_opaque` is the opaque token installed if the reader did not
    /// already exist.
    pub async fn register_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
        default_opaque: Opaque,
        purpose: &str,
    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
                state.register_critical_reader(
                    &cfg.hostname,
                    reader_id,
                    // Cloned because the closure may run more than once under
                    // the idempotent-cmd retry loop.
                    default_opaque.clone(),
                    purpose,
                )
            })
            .await;
        (state, maintenance)
    }
276
    /// Registers the given key/val schemas with the shard, returning the
    /// assigned [SchemaId] (or `None` if state declined to register them)
    /// plus any routine maintenance.
    pub async fn register_schema(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (Option<SchemaId>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
                state.register_schema::<K, V>(key_schema, val_schema)
            })
            .await;
        (state, maintenance)
    }
290
291 pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
292 if fuel == 0 || self.applier.all_batches().len() < 2 {
294 return (Vec::new(), RoutineMaintenance::default());
295 }
296
297 let metrics = Arc::clone(&self.applier.metrics);
298 let (_seqno, reqs, maintenance) = self
299 .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
300 state.spine_exert(fuel)
301 })
302 .await;
303 let reqs = reqs
304 .into_iter()
305 .map(|req| CompactReq {
306 shard_id: self.shard_id(),
307 desc: req.desc,
308 inputs: req.inputs,
309 })
310 .collect();
311 (reqs, maintenance)
312 }
313
314 pub async fn compare_and_append(
315 &self,
316 batch: &HollowBatch<T>,
317 writer_id: &WriterId,
318 debug_info: &HandleDebugState,
319 heartbeat_timestamp_ms: u64,
320 ) -> CompareAndAppendRes<T> {
321 let idempotency_token = IdempotencyToken::new();
322 loop {
323 let res = self
324 .compare_and_append_idempotent(
325 batch,
326 writer_id,
327 heartbeat_timestamp_ms,
328 &idempotency_token,
329 debug_info,
330 None,
331 )
332 .await;
333 match res {
334 CompareAndAppendRes::Success(seqno, maintenance) => {
335 return CompareAndAppendRes::Success(seqno, maintenance);
336 }
337 CompareAndAppendRes::InvalidUsage(x) => {
338 return CompareAndAppendRes::InvalidUsage(x);
339 }
340 CompareAndAppendRes::InlineBackpressure => {
341 return CompareAndAppendRes::InlineBackpressure;
342 }
343 CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
344 self.applier.fetch_and_update_state(Some(seqno)).await;
351 let (current_seqno, current_upper) =
352 self.applier.upper(|seqno, upper| (seqno, upper.clone()));
353
354 if ¤t_upper != batch.desc.lower() {
357 return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
358 } else {
359 }
362 }
363 }
364 }
365 }
366
    /// The retriable inner loop of [Self::compare_and_append].
    ///
    /// Indeterminate (possibly-committed) errors are remembered in
    /// `indeterminate` and the command is retried with the same
    /// `idempotency_token`; if a retry finds the token already committed
    /// (`AlreadyCommitted`), the earlier attempt actually landed and this is
    /// reported as success.
    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        // Tracks whether the writer already existed in state, so we only bump
        // the writer_added metric for genuinely new writers.
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        // Percent chance of claiming an unclaimed compaction,
                        // or 0 if the feature is disabled.
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        // An unparseable (e.g. empty) min-version disables the
                        // version-based claim filter.
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // Indeterminate error: the write may or may not have
                    // landed. Remember the first such error and retry with
                    // the same idempotency token.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // Success: package up merge requests as compactions for
                    // the caller plus any routine maintenance.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    // Account for any inline (in-state) parts that were
                    // committed by this append.
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // Our idempotency token is already in state: an earlier
                    // (indeterminate-erroring) attempt actually committed.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // Invalid usage is deterministic; it can't follow an
                    // indeterminate error for the same write.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // Caller must flush inline writes to blob and try again.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // Invariant: this writer can never have advanced the
                    // shard's upper past the shard's actual upper.
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // This writer definitely never wrote this batch:
                        // a plain upper mismatch.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No indeterminate error occurred, so this can't be
                        // our own earlier write: still a plain mismatch.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // The writer's upper is past this batch AND we saw an
                    // indeterminate error: we cannot tell whether the write
                    // landed. No safe way to proceed.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }
622
    /// Downgrades the given leased reader's since capability to `new_since`,
    /// also recording `outstanding_seqno` and refreshing the reader's lease
    /// heartbeat.
    ///
    /// Returns the seqno of the applied command, the reader's resulting
    /// since, and any routine maintenance.
    pub async fn downgrade_since(
        &self,
        reader_id: &LeasedReaderId,
        outstanding_seqno: SeqNo,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
            state.downgrade_since(
                reader_id,
                seqno,
                outstanding_seqno,
                new_since,
                heartbeat_timestamp_ms,
            )
        })
        .await
    }
642
643 pub async fn compare_and_downgrade_since(
644 &self,
645 reader_id: &CriticalReaderId,
646 expected_opaque: &Opaque,
647 (new_opaque, new_since): (&Opaque, &Antichain<T>),
648 ) -> (Result<Since<T>, (Opaque, Since<T>)>, RoutineMaintenance) {
649 let metrics = Arc::clone(&self.applier.metrics);
650 let (_seqno, res, maintenance) = self
651 .apply_unbatched_idempotent_cmd(
652 &metrics.cmds.compare_and_downgrade_since,
653 |_seqno, _cfg, state| {
654 state.compare_and_downgrade_since(
655 reader_id,
656 expected_opaque,
657 (new_opaque, new_since),
658 )
659 },
660 )
661 .await;
662
663 match res {
664 Ok(since) => (Ok(since), maintenance),
665 Err((opaque, since)) => (Err((opaque, since)), maintenance),
666 }
667 }
668
    /// Removes the given leased reader from state, releasing its capabilities.
    ///
    /// Idempotent: expiring a reader that is already gone is a no-op.
    pub async fn expire_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_leased_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }
681
    /// Removes the given critical reader from state, releasing its
    /// capabilities.
    ///
    /// Idempotent: expiring a reader that is already gone is a no-op.
    #[allow(dead_code)]
    pub async fn expire_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_critical_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }
695
    /// Removes the given writer from state.
    ///
    /// Idempotent: expiring a writer that is already gone is a no-op (but the
    /// writer_removed metric is bumped unconditionally).
    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
                state.expire_writer(writer_id)
            })
            .await;
        metrics.state.writer_removed.inc();
        (seqno, maintenance)
    }
706
    /// Returns whether the shard's state has been finalized (tombstoned).
    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }
710
    /// Returns the key/val schemas registered under `schema_id`, if any.
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }
715
    /// Returns the most recently registered schema id and schemas, if any.
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }
720
    /// Returns the id under which the given key/val schemas are registered,
    /// if they are.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }
725
    /// Evolves the shard's schema to the given key/val schemas, provided the
    /// current latest schema id is `expected`.
    ///
    /// The outcome (applied, incompatible, expectation mismatch, ...) is
    /// conveyed by the returned [CaESchema].
    pub async fn compare_and_evolve_schema(
        &self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (CaESchema<K, V>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_evolve_schema,
                |_seqno, _cfg, state| {
                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
                },
            )
            .await;
        (state, maintenance)
    }
746
    /// Applies one step of tombstoning/shrinking to state, retrying
    /// indeterminate errors forever.
    ///
    /// Returns `Ok(true)` if the step made progress, `Ok(false)` if state is
    /// already fully tombstoned (a no-op transition).
    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            // Escalate logging from debug to info once we've retried enough
            // times that something is probably wrong.
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }
785
786 pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
787 self.applier.check_since_upper_both_empty()?;
788
789 let mut maintenance = RoutineMaintenance::default();
790
791 loop {
792 let (made_progress, more_maintenance) = self.tombstone_step().await?;
793 maintenance.merge(more_maintenance);
794 if !made_progress {
795 break;
796 }
797 }
798
799 Ok(maintenance)
800 }
801
    /// Returns the batches making up a snapshot of the shard at `as_of`,
    /// without taking a lease.
    ///
    /// If `as_of` is not yet available (the shard's upper hasn't passed it),
    /// waits until it is. Returns `Err` with the since if `as_of` is no
    /// longer readable.
    pub async fn unleased_snapshot(
        &self,
        as_of: &Antichain<T>,
    ) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        // Fast path: the snapshot may already be available.
        if let Ok(data) = self.applier.snapshot(as_of) {
            return Ok(data);
        }
        let mut watch = self.applier.watch();
        self.wait_for_upper_past(
            as_of,
            &mut watch,
            None,
            &self.applier.metrics.retries.snapshot,
            RetryParameters::persist_defaults(),
        )
        .await;
        match self.applier.snapshot(as_of) {
            Ok(data) => Ok(data),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(since)) => Err(since),
            // wait_for_upper_past only returns once the upper is past as_of,
            // so not-yet-available here is a broken invariant.
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, upper)) => {
                panic!(
                    "waited for upper past {as_of:?}, but at latest seqno {seqno:?} the frontier was only {upper:?}",
                    as_of = as_of.elements(),
                    upper = upper.0.elements(),
                )
            }
        }
    }
833
    /// Verifies that a listen starting at `as_of` is valid, returning `Err`
    /// with the since if `as_of` is no longer readable.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }
838
    /// Blocks until the shard's upper is strictly past `frontier`.
    ///
    /// Races two wakeup sources: a [StateWatch] notification for the next
    /// seqno and a metered retry sleep (which additionally forces a state
    /// refetch, as a fallback in case watch notifications are missed).
    /// `reader_id` is only used for log context.
    pub async fn wait_for_upper_past(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        metrics: &RetryMetrics,
        retry: RetryParameters,
    ) {
        let start = Instant::now();
        // Returns None once the upper is past `frontier`, otherwise the
        // current (seqno, upper) to keep waiting from.
        let wait_for_seqno_past = self.applier.upper(|seqno, upper| {
            if PartialOrder::less_than(frontier, upper) {
                None
            } else {
                Some((seqno, upper.clone()))
            }
        });
        let Some((mut seqno, mut upper)) = wait_for_seqno_past else {
            // Already past the requested frontier; nothing to wait for.
            return;
        };

        let sleeps = metrics.stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        // Which of the two racing futures woke us, carrying its state so the
        // winner can be re-armed below.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        // Log at info (once) only for long snapshot waits; everything else
        // stays at debug.
        let mut logged_at_info = false;
        loop {
            if !logged_at_info
                && start.elapsed() >= Duration::from_millis(1024)
                && metrics.name.as_str() == "snapshot"
            {
                logged_at_info = true;
                info!(
                    shard_id =? self.shard_id(),
                    shard_name =? self.applier.shard_metrics.name,
                    reader_id =? reader_id,
                    wait_frontier =? frontier.elements(),
                    current_upper =? upper.elements(),
                    current_seqno =? seqno,
                    wait_for = &metrics.name,
                    "desired upper not yet available",
                );
            } else {
                debug!(
                    shard_id =? self.shard_id(),
                    shard_name =? self.applier.shard_metrics.name,
                    reader_id =? reader_id,
                    wait_frontier =? frontier.elements(),
                    current_upper =? upper.elements(),
                    current_seqno =? seqno,
                    wait_for = &metrics.name,
                    "desired upper not yet available",
                );
            }

            // Race the watch against the sleep; whichever finishes first wins.
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.wait_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.wait_woken_via_sleep.inc();
                    // Sleep wakeups force a refetch in case the watch missed
                    // a notification.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            // Re-check the upper against the (possibly refreshed) state.
            let wait_for_seqno_past = self.applier.upper(|seqno, upper| {
                if PartialOrder::less_than(frontier, upper) {
                    None
                } else {
                    Some((seqno, upper.clone()))
                }
            });
            match wait_for_seqno_past {
                None => {
                    match &wake {
                        Wake::Watch(_) => self.applier.metrics.watch.wait_resolved_via_watch.inc(),
                        Wake::Sleep(_) => self.applier.metrics.watch.wait_resolved_via_sleep.inc(),
                    }
                    return;
                }
                Some((s, u)) => {
                    seqno = s;
                    upper = u;
                }
            };

            // Re-arm only the future that fired; the other keeps waiting.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        shard_id =? self.shard_id(),
                        shard_name =? self.applier.shard_metrics.name,
                        reader_id =? reader_id,
                        wait_frontier =? frontier.elements(),
                        current_upper =? upper.elements(),
                        current_seqno =? seqno,
                        wait_for = &metrics.name,
                        "didn't find new data, retrying in {:?}",
                        sleeps.next_sleep(),
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
986
    /// Applies `work_fn` as a state command, retrying indeterminate errors
    /// forever with backoff.
    ///
    /// `work_fn` must be idempotent: it may run multiple times across
    /// retries. Both `Continue` and `Break(NoOpStateTransition)` outcomes are
    /// returned to the caller along with the seqno and maintenance.
    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    // Escalate to info-level logging once we've retried
                    // enough times that something is probably wrong.
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
1037}
1038
impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + PartialEq,
{
    /// Applies the result of a background (fueled) merge to state.
    ///
    /// Because the command may run multiple times under the idempotent retry
    /// loop, the closure latches the "best" result seen across attempts:
    /// an applied result always wins; a matched-but-not-applied result is
    /// kept only if nothing was ever applied.
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);

        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
                if let Continue(result) = ret {
                    // Applied on this attempt: record it unconditionally.
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    // Matched but not applied: only record if no earlier
                    // attempt actually applied the merge.
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }
}
1102
/// A boxed one-shot closure producing the future to run when expiring a
/// handle.
// NOTE(review): usage (which handle this expires) is not visible in this
// file — confirm against callers.
pub(crate) struct ExpireFn(
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
1110
/// The boxed closure has no useful `Debug`, so print an opaque
/// `ExpireFn { .. }`.
impl Debug for ExpireFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}
1116
/// The outcome of a [Machine::compare_and_append] call.
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    /// The batch was appended (or had already been appended by an earlier
    /// attempt of the same write); carries the seqno and writer maintenance.
    Success(SeqNo, WriterMaintenance<T>),
    /// The request was statically invalid.
    InvalidUsage(InvalidUsage<T>),
    /// The shard's upper didn't match the batch's lower; carries the seqno
    /// and the current upper.
    UpperMismatch(SeqNo, Antichain<T>),
    /// Inline writes exceeded limits; the caller must flush them to blob and
    /// retry.
    InlineBackpressure,
}
1124
#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    /// Test helper: returns the success payload, panicking on any other
    /// variant.
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            x => panic!("{:?}", x),
        }
    }
}
1135
/// Dyncfg: fixed polling sleep for Listen/Subscribe; zero disables it.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200),
    "\
    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

/// Dyncfg: initial backoff for Listen/Subscribe polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100),
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

/// Dyncfg: backoff multiplier for Listen/Subscribe polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

/// Dyncfg: maximum backoff for Listen/Subscribe polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);
1160
1161pub(crate) fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1162 RetryParameters {
1163 fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1164 initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1165 multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1166 clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1167 }
1168}
1169
/// Number of retry attempts at or beyond which retry failures are logged at
/// `info!` instead of `debug!`.
pub const INFO_MIN_ATTEMPTS: usize = 3;
1171
/// Runs `work_fn` until it succeeds, retrying every [ExternalError] with
/// metered backoff.
///
/// Never returns an error; callers must only use this for operations that
/// are safe to retry indefinitely.
pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
where
    F: std::future::Future<Output = Result<R, ExternalError>>,
    WorkFn: FnMut() -> F,
{
    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
    loop {
        match work_fn().await {
            Ok(x) => {
                if retry.attempt() > 0 {
                    debug!(
                        "external operation {} succeeded after failing at least once",
                        metrics.name,
                    );
                }
                return x;
            }
            Err(err) => {
                // Escalate to info once we've retried INFO_MIN_ATTEMPTS times.
                if retry.attempt() >= INFO_MIN_ATTEMPTS {
                    info!(
                        "external operation {} failed, retrying in {:?}: {}",
                        metrics.name,
                        retry.next_sleep(),
                        err.display_with_causes()
                    );
                } else {
                    debug!(
                        "external operation {} failed, retrying in {:?}: {}",
                        metrics.name,
                        retry.next_sleep(),
                        err.display_with_causes()
                    );
                }
                retry = retry.sleep().await;
            }
        }
    }
}
1210
1211pub async fn retry_determinate<R, F, WorkFn>(
1212 metrics: &RetryMetrics,
1213 mut work_fn: WorkFn,
1214) -> Result<R, Indeterminate>
1215where
1216 F: std::future::Future<Output = Result<R, ExternalError>>,
1217 WorkFn: FnMut() -> F,
1218{
1219 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1220 loop {
1221 match work_fn().await {
1222 Ok(x) => {
1223 if retry.attempt() > 0 {
1224 debug!(
1225 "external operation {} succeeded after failing at least once",
1226 metrics.name,
1227 );
1228 }
1229 return Ok(x);
1230 }
1231 Err(ExternalError::Determinate(err)) => {
1232 debug!(
1241 "external operation {} failed, retrying in {:?}: {}",
1242 metrics.name,
1243 retry.next_sleep(),
1244 err.display_with_causes()
1245 );
1246 retry = retry.sleep().await;
1247 continue;
1248 }
1249 Err(ExternalError::Indeterminate(x)) => return Err(x),
1250 }
1251 }
1252}
1253
1254#[cfg(test)]
1255pub mod datadriven {
1256 use std::collections::{BTreeMap, BTreeSet};
1257 use std::pin::pin;
1258 use std::sync::{Arc, LazyLock};
1259
1260 use anyhow::anyhow;
1261 use differential_dataflow::consolidation::consolidate_updates;
1262 use differential_dataflow::trace::Description;
1263 use futures::StreamExt;
1264 use mz_dyncfg::{ConfigUpdates, ConfigVal};
1265 use mz_persist::indexed::encoding::BlobTraceBatchPart;
1266 use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1267
1268 use crate::batch::{
1269 BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1270 BatchParts, validate_truncate_batch,
1271 };
1272 use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1273 use crate::fetch::{EncodedPart, FetchConfig};
1274 use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1275 use crate::internal::datadriven::DirectiveArgs;
1276 use crate::internal::encoding::Schemas;
1277 use crate::internal::gc::GcReq;
1278 use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1279 use crate::internal::state::{BatchPart, RunOrder, RunPart, Upper};
1280 use crate::internal::state_versions::EncodedRollup;
1281 use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1282 use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1283 use crate::rpc::NoopPubSubSender;
1284 use crate::tests::new_test_client;
1285 use crate::write::COMBINE_INLINE_WRITES;
1286 use crate::{GarbageCollector, PersistClient};
1287
1288 use super::*;
1289
    /// Shared `String`/`()` schemas used by the datadriven test shard.
    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
        id: Some(SchemaId(0)),
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    });
1295
    /// The mutable context threaded through the datadriven machine tests: a
    /// test client plus the shard's machine, GC, and named test artifacts
    /// (batches, rollups, listens, compaction requests) accumulated by
    /// directives.
    #[derive(Debug)]
    pub struct MachineState {
        pub client: PersistClient,
        pub shard_id: ShardId,
        pub state_versions: Arc<StateVersions>,
        pub machine: Machine<String, (), u64, i64>,
        pub gc: GarbageCollector<String, (), u64, i64>,
        // Batches by the name a directive assigned them.
        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
        // Monotonic counter for generating test-local ids.
        pub next_id: usize,
        pub rollups: BTreeMap<String, EncodedRollup>,
        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
        pub routine: Vec<RoutineMaintenance>,
        pub compactions: BTreeMap<String, CompactReq<u64>>,
    }
1311
impl MachineState {
    /// Builds a fresh `MachineState` against a new test client and a newly
    /// minted shard id, with the given dyncfg overrides applied.
    pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
        let shard_id = ShardId::new();
        let client = new_test_client(dyncfgs).await;
        // Pin blob_target_size to its default and disable combining of inline
        // writes so batch/part boundaries in datadriven tests are predictable.
        client
            .cfg
            .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
        client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
        let state_versions = Arc::new(StateVersions::new(
            client.cfg.clone(),
            Arc::clone(&client.consensus),
            Arc::clone(&client.blob),
            Arc::clone(&client.metrics),
        ));
        let machine = Machine::new(
            client.cfg.clone(),
            shard_id,
            Arc::clone(&client.metrics),
            Arc::clone(&state_versions),
            Arc::clone(&client.shared_states),
            // PubSub is stubbed out; these tests drive state changes directly.
            Arc::new(NoopPubSubSender),
            Arc::clone(&client.isolated_runtime),
            Diagnostics::for_tests(),
        )
        .await
        .expect("codecs should match");
        let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
        MachineState {
            shard_id,
            client,
            state_versions,
            machine,
            gc,
            batches: BTreeMap::default(),
            rollups: BTreeMap::default(),
            listens: BTreeMap::default(),
            routine: Vec::new(),
            compactions: BTreeMap::default(),
            next_id: 0,
        }
    }

    /// Wraps a `HollowBatch` in a user-facing `Batch` handle using the shared
    /// test schemas, so it can be handed to writer APIs.
    // NOTE(review): the meaning of the leading `true` flag isn't visible from
    // here — confirm against `Batch::new`'s signature.
    fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
        Batch::new(
            true,
            Arc::clone(&self.client.metrics),
            Arc::clone(&self.client.blob),
            self.client.metrics.shards.shard(&self.shard_id, "test"),
            self.client.cfg.build_version.clone(),
            (
                <String>::encode_schema(&*SCHEMAS.key),
                <()>::encode_schema(&*SCHEMAS.val),
            ),
            hollow,
        )
    }
}
1374
1375 pub async fn consensus_scan(
1378 datadriven: &MachineState,
1379 args: DirectiveArgs<'_>,
1380 ) -> Result<String, anyhow::Error> {
1381 let from = args.expect("from_seqno");
1382
1383 let mut states = datadriven
1384 .state_versions
1385 .fetch_all_live_states::<u64>(datadriven.shard_id)
1386 .await
1387 .expect("should only be called on an initialized shard")
1388 .check_ts_codec()
1389 .expect("shard codecs should not change");
1390 let mut s = String::new();
1391 while let Some(x) = states.next(|_| {}) {
1392 if x.seqno < from {
1393 continue;
1394 }
1395 let rollups: Vec<_> = x
1396 .collections
1397 .rollups
1398 .keys()
1399 .map(|seqno| seqno.to_string())
1400 .collect();
1401 let batches: Vec<_> = x
1402 .collections
1403 .trace
1404 .batches()
1405 .filter(|b| !b.is_empty())
1406 .filter_map(|b| {
1407 datadriven
1408 .batches
1409 .iter()
1410 .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
1411 .map(|(batch_name, _)| batch_name.to_owned())
1412 })
1413 .collect();
1414 write!(
1415 s,
1416 "seqno={} batches={} rollups={}\n",
1417 x.seqno,
1418 batches.join(","),
1419 rollups.join(","),
1420 );
1421 }
1422 Ok(s)
1423 }
1424
1425 pub async fn consensus_truncate(
1426 datadriven: &MachineState,
1427 args: DirectiveArgs<'_>,
1428 ) -> Result<String, anyhow::Error> {
1429 let to = args.expect("to_seqno");
1430 let removed = datadriven
1431 .client
1432 .consensus
1433 .truncate(&datadriven.shard_id.to_string(), to)
1434 .await
1435 .expect("valid truncation");
1436 Ok(format!("{:?}\n", removed))
1437 }
1438
1439 pub async fn blob_scan_batches(
1440 datadriven: &MachineState,
1441 _args: DirectiveArgs<'_>,
1442 ) -> Result<String, anyhow::Error> {
1443 let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1444
1445 let mut s = String::new();
1446 let () = datadriven
1447 .state_versions
1448 .blob
1449 .list_keys_and_metadata(&key_prefix, &mut |x| {
1450 let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1451 if let PartialBlobKey::Batch(_, _) = key {
1452 write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1453 }
1454 })
1455 .await?;
1456 Ok(s)
1457 }
1458
1459 #[allow(clippy::unused_async)]
1460 pub async fn shard_desc(
1461 datadriven: &MachineState,
1462 _args: DirectiveArgs<'_>,
1463 ) -> Result<String, anyhow::Error> {
1464 Ok(format!(
1465 "since={:?} upper={:?}\n",
1466 datadriven.machine.applier.since().elements(),
1467 datadriven.machine.applier.clone_upper().elements()
1468 ))
1469 }
1470
1471 pub async fn downgrade_since(
1472 datadriven: &mut MachineState,
1473 args: DirectiveArgs<'_>,
1474 ) -> Result<String, anyhow::Error> {
1475 let since = args.expect_antichain("since");
1476 let seqno = args
1477 .optional("seqno")
1478 .unwrap_or_else(|| datadriven.machine.seqno());
1479 let reader_id = args.expect("reader_id");
1480 let (_, since, routine) = datadriven
1481 .machine
1482 .downgrade_since(
1483 &reader_id,
1484 seqno,
1485 &since,
1486 (datadriven.machine.applier.cfg.now)(),
1487 )
1488 .await;
1489 datadriven.routine.push(routine);
1490 Ok(format!(
1491 "{} {:?}\n",
1492 datadriven.machine.seqno(),
1493 since.0.elements()
1494 ))
1495 }
1496
1497 #[allow(clippy::unused_async)]
1498 pub async fn dyncfg(
1499 datadriven: &MachineState,
1500 args: DirectiveArgs<'_>,
1501 ) -> Result<String, anyhow::Error> {
1502 let mut updates = ConfigUpdates::default();
1503 for x in args.input.trim().split('\n') {
1504 match x.split(' ').collect::<Vec<_>>().as_slice() {
1505 &[name, val] => {
1506 let config = datadriven
1507 .client
1508 .cfg
1509 .entries()
1510 .find(|x| x.name() == name)
1511 .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1512 match config.val() {
1513 ConfigVal::Usize(_) => {
1514 let val = val.parse().map_err(anyhow::Error::new)?;
1515 updates.add_dynamic(name, ConfigVal::Usize(val));
1516 }
1517 ConfigVal::Bool(_) => {
1518 let val = val.parse().map_err(anyhow::Error::new)?;
1519 updates.add_dynamic(name, ConfigVal::Bool(val));
1520 }
1521 x => unimplemented!("dyncfg type: {:?}", x),
1522 }
1523 }
1524 x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1525 }
1526 }
1527 updates.apply(&datadriven.client.cfg);
1528
1529 Ok("ok\n".to_string())
1530 }
1531
1532 pub async fn compare_and_downgrade_since(
1533 datadriven: &mut MachineState,
1534 args: DirectiveArgs<'_>,
1535 ) -> Result<String, anyhow::Error> {
1536 let expected_opaque: u64 = args.expect("expect_opaque");
1537 let new_opaque: u64 = args.expect("opaque");
1538 let new_since = args.expect_antichain("since");
1539 let reader_id = args.expect("reader_id");
1540 let (res, routine) = datadriven
1541 .machine
1542 .compare_and_downgrade_since(
1543 &reader_id,
1544 &Opaque::encode(&expected_opaque),
1545 (&Opaque::encode(&new_opaque), &new_since),
1546 )
1547 .await;
1548 datadriven.routine.push(routine);
1549 let since = res.map_err(|(opaque, since)| {
1550 anyhow!(
1551 "mismatch: opaque={} since={:?}",
1552 opaque.decode::<u64>(),
1553 since.0.elements()
1554 )
1555 })?;
1556 Ok(format!(
1557 "{} {} {:?}\n",
1558 datadriven.machine.seqno(),
1559 new_opaque,
1560 since.0.elements()
1561 ))
1562 }
1563
1564 pub async fn write_rollup(
1565 datadriven: &mut MachineState,
1566 args: DirectiveArgs<'_>,
1567 ) -> Result<String, anyhow::Error> {
1568 let output = args.expect_str("output");
1569
1570 let rollup = datadriven
1571 .machine
1572 .applier
1573 .write_rollup_for_state()
1574 .await
1575 .expect("rollup");
1576
1577 datadriven
1578 .rollups
1579 .insert(output.to_string(), rollup.clone());
1580
1581 Ok(format!(
1582 "state={} diffs=[{}, {})\n",
1583 rollup.seqno,
1584 rollup._desc.lower().first().expect("seqno"),
1585 rollup._desc.upper().first().expect("seqno"),
1586 ))
1587 }
1588
1589 pub async fn add_rollup(
1590 datadriven: &mut MachineState,
1591 args: DirectiveArgs<'_>,
1592 ) -> Result<String, anyhow::Error> {
1593 let input = args.expect_str("input");
1594 let rollup = datadriven
1595 .rollups
1596 .get(input)
1597 .expect("unknown batch")
1598 .clone();
1599
1600 let (applied, maintenance) = datadriven
1601 .machine
1602 .add_rollup((rollup.seqno, &rollup.to_hollow()))
1603 .await;
1604
1605 if !applied {
1606 return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1607 }
1608
1609 datadriven.routine.push(maintenance);
1610 Ok(format!("{}\n", datadriven.machine.seqno()))
1611 }
1612
/// Builds a batch from the directive's update lines and registers it under
/// the `output` name, reporting its part count and length.
///
/// Optional args: `since` (default 0), `target_size` (overrides the builder's
/// blob_target_size), `consolidate` (default true), and
/// `parts_size_override` (flushes inline parts to blob, then rewrites every
/// part's reported encoded size).
pub async fn write_batch(
    datadriven: &mut MachineState,
    args: DirectiveArgs<'_>,
) -> Result<String, anyhow::Error> {
    let output = args.expect_str("output");
    let lower = args.expect_antichain("lower");
    let upper = args.expect_antichain("upper");
    assert!(PartialOrder::less_than(&lower, &upper));
    let since = args
        .optional_antichain("since")
        .unwrap_or_else(|| Antichain::from_elem(0));
    let target_size = args.optional("target_size");
    let parts_size_override = args.optional("parts_size_override");
    let consolidate = args.optional("consolidate").unwrap_or(true);
    let mut updates: Vec<_> = args
        .input
        .split('\n')
        .flat_map(DirectiveArgs::parse_update)
        .collect();

    let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
    if let Some(target_size) = target_size {
        cfg.blob_target_size = target_size;
    };
    if consolidate {
        consolidate_updates(&mut updates);
    }
    // Unconsolidated input is written as-is, so it can't claim any ordering.
    let run_order = if consolidate {
        cfg.preferred_order
    } else {
        RunOrder::Unordered
    };
    let parts = BatchParts::new_ordered::<i64>(
        cfg.clone(),
        run_order,
        Arc::clone(&datadriven.client.metrics),
        Arc::clone(&datadriven.machine.applier.shard_metrics),
        datadriven.shard_id,
        Arc::clone(&datadriven.client.blob),
        Arc::clone(&datadriven.client.isolated_runtime),
        &datadriven.client.metrics.user,
    );
    let builder = BatchBuilderInternal::new(
        cfg.clone(),
        parts,
        Arc::clone(&datadriven.client.metrics),
        SCHEMAS.clone(),
        Arc::clone(&datadriven.client.blob),
        datadriven.shard_id.clone(),
        datadriven.client.cfg.build_version.clone(),
    );
    let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
    for ((k, ()), t, d) in updates {
        builder.add(&k, &(), &t, &d).await.expect("invalid batch");
    }
    let mut batch = builder.finish(upper).await?;
    // When overriding part sizes, first write any inline parts out to blob so
    // every part has an encoded size to override.
    if parts_size_override.is_some() {
        batch
            .flush_to_blob(
                &cfg,
                &datadriven.client.metrics.user,
                &datadriven.client.isolated_runtime,
                &SCHEMAS,
            )
            .await;
    }
    let batch = batch.into_hollow_batch();
    // Assign the next contiguous single-width spine id to the new batch.
    let batch = IdHollowBatch {
        batch: Arc::new(batch),
        id: SpineId(datadriven.next_id, datadriven.next_id + 1),
    };
    datadriven.next_id += 1;

    if let Some(size) = parts_size_override {
        // Register a copy whose parts all report the overridden size; the
        // reported parts/len below come from the unmodified batch (identical
        // counts, only sizes differ).
        let mut batch = batch.clone();
        let mut hollow_batch = (*batch.batch).clone();
        for part in hollow_batch.parts.iter_mut() {
            match part {
                RunPart::Many(run) => run.max_part_bytes = size,
                RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
                RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
            }
        }
        batch.batch = Arc::new(hollow_batch);
        datadriven.batches.insert(output.to_owned(), batch);
    } else {
        datadriven.batches.insert(output.to_owned(), batch.clone());
    }
    Ok(format!(
        "parts={} len={}\n",
        batch.batch.part_count(),
        batch.batch.len
    ))
}
1709
/// Prints the decoded contents of a named batch, part by part, then the
/// batch's run structure. With `stats=lower`, also prints each part's
/// structured key lower bound. A hollow part whose blob key is missing is
/// rendered as `<empty>` instead of being fetched.
pub async fn fetch_batch(
    datadriven: &MachineState,
    args: DirectiveArgs<'_>,
) -> Result<String, anyhow::Error> {
    let input = args.expect_str("input");
    let stats = args.optional_str("stats");
    let batch = datadriven.batches.get(input).expect("unknown batch");

    let mut s = String::new();
    let mut stream = pin!(
        batch
            .batch
            .part_stream(
                datadriven.shard_id,
                &*datadriven.state_versions.blob,
                &*datadriven.state_versions.metrics
            )
            .enumerate()
    );
    while let Some((idx, part)) = stream.next().await {
        let part = &*part?;
        write!(s, "<part {idx}>\n");

        // Inline parts carry their data in state; decode them to get at the
        // structured key lower. Hollow parts expose it directly.
        let lower = match part {
            BatchPart::Inline { updates, .. } => {
                let updates: BlobTraceBatchPart<u64> =
                    updates.decode(&datadriven.client.metrics.columnar)?;
                updates.structured_key_lower()
            }
            other @ BatchPart::Hollow(_) => other.structured_key_lower(),
        };

        if let Some(lower) = lower {
            if stats == Some("lower") {
                writeln!(s, "<key lower={}>", lower.get())
            }
        }

        match part {
            BatchPart::Hollow(part) => {
                // Probe blob first: a cleanly-missing key prints <empty> and
                // skips the fetch below (which would otherwise panic).
                let blob_batch = datadriven
                    .client
                    .blob
                    .get(&part.key.complete(&datadriven.shard_id))
                    .await;
                match blob_batch {
                    Ok(Some(_)) | Err(_) => {}
                    Ok(None) => {
                        s.push_str("<empty>\n");
                        continue;
                    }
                };
            }
            BatchPart::Inline { .. } => {}
        };
        let part = EncodedPart::fetch(
            &FetchConfig::from_persist_config(&datadriven.client.cfg),
            &datadriven.shard_id,
            datadriven.client.blob.as_ref(),
            datadriven.client.metrics.as_ref(),
            datadriven.machine.applier.shard_metrics.as_ref(),
            &datadriven.client.metrics.read.batch_fetcher,
            &batch.batch.desc,
            part,
        )
        .await
        .expect("invalid batch part");
        let part = part
            .normalize(&datadriven.client.metrics.columnar)
            .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

        for ((k, _v), t, d) in part
            .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
            .expect("valid schemas")
        {
            writeln!(s, "{k} {t} {d}");
        }
    }
    // If anything was printed, also show which part indexes belong to which run.
    if !s.is_empty() {
        for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
            write!(s, "<run {idx}>\n");
            for part in run {
                let part_idx = batch
                    .batch
                    .parts
                    .iter()
                    .position(|p| p == part)
                    .expect("part should exist");
                write!(s, "part {part_idx}\n");
            }
        }
    }
    Ok(s)
}
1806
1807 #[allow(clippy::unused_async)]
1808 pub async fn truncate_batch_desc(
1809 datadriven: &mut MachineState,
1810 args: DirectiveArgs<'_>,
1811 ) -> Result<String, anyhow::Error> {
1812 let input = args.expect_str("input");
1813 let output = args.expect_str("output");
1814 let lower = args.expect_antichain("lower");
1815 let upper = args.expect_antichain("upper");
1816
1817 let batch = datadriven
1818 .batches
1819 .get(input)
1820 .expect("unknown batch")
1821 .clone();
1822 let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1823 let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1824 let mut new_hollow_batch = (*batch.batch).clone();
1825 new_hollow_batch.desc = truncated_desc;
1826 let new_batch = IdHollowBatch {
1827 batch: Arc::new(new_hollow_batch),
1828 id: batch.id,
1829 };
1830 datadriven
1831 .batches
1832 .insert(output.to_owned(), new_batch.clone());
1833 Ok(format!(
1834 "parts={} len={}\n",
1835 batch.batch.part_count(),
1836 batch.batch.len
1837 ))
1838 }
1839
1840 #[allow(clippy::unused_async)]
1841 pub async fn set_batch_parts_size(
1842 datadriven: &mut MachineState,
1843 args: DirectiveArgs<'_>,
1844 ) -> Result<String, anyhow::Error> {
1845 let input = args.expect_str("input");
1846 let size = args.expect("size");
1847 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1848 let mut hollow_batch = (*batch.batch).clone();
1849 for part in hollow_batch.parts.iter_mut() {
1850 match part {
1851 RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1852 _ => {
1853 panic!("set_batch_parts_size only supports hollow parts")
1854 }
1855 }
1856 }
1857 batch.batch = Arc::new(hollow_batch);
1858 Ok("ok\n".to_string())
1859 }
1860
/// Compacts the named input batches into one output batch registered under
/// `output`. The compaction request itself is also recorded under `output`
/// so a later `apply_merge_res` can refer to it.
pub async fn compact(
    datadriven: &mut MachineState,
    args: DirectiveArgs<'_>,
) -> Result<String, anyhow::Error> {
    let output = args.expect_str("output");
    let lower = args.expect_antichain("lower");
    let upper = args.expect_antichain("upper");
    let since = args.expect_antichain("since");
    let target_size = args.optional("target_size");
    let memory_bound = args.optional("memory_bound");

    let mut inputs = Vec::new();
    for input in args.args.get("inputs").expect("missing inputs") {
        inputs.push(
            datadriven
                .batches
                .get(input)
                .expect("unknown batch")
                .clone(),
        );
    }

    // NOTE(review): `cfg` is a clone of the client config; whether set_config
    // on the clone is isolated from the client depends on PersistConfig's
    // clone semantics — confirm if isolation matters here.
    let cfg = datadriven.client.cfg.clone();
    if let Some(target_size) = target_size {
        cfg.set_config(&BLOB_TARGET_SIZE, target_size);
    };
    if let Some(memory_bound) = memory_bound {
        cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
    }
    let req = CompactReq {
        shard_id: datadriven.shard_id,
        desc: Description::new(lower, upper, since),
        inputs: inputs.clone(),
    };
    datadriven
        .compactions
        .insert(output.to_owned(), req.clone());
    // The output spans the spine ids of the inputs; with no inputs, mint a
    // fresh single-width id from next_id (the upper closure bumps next_id).
    let spine_lower = inputs
        .first()
        .map_or_else(|| datadriven.next_id, |x| x.id.0);
    let spine_upper = inputs.last().map_or_else(
        || {
            datadriven.next_id += 1;
            datadriven.next_id
        },
        |x| x.id.1,
    );
    let new_spine_id = SpineId(spine_lower, spine_upper);
    let res = Compactor::<String, (), u64, i64>::compact(
        CompactConfig::new(&cfg, datadriven.shard_id),
        Arc::clone(&datadriven.client.blob),
        Arc::clone(&datadriven.client.metrics),
        Arc::clone(&datadriven.machine.applier.shard_metrics),
        Arc::clone(&datadriven.client.isolated_runtime),
        req,
        SCHEMAS.clone(),
    )
    .await?;

    let batch = IdHollowBatch {
        batch: Arc::new(res.output.clone()),
        id: new_spine_id,
    };

    datadriven.batches.insert(output.to_owned(), batch.clone());
    Ok(format!(
        "parts={} len={}\n",
        res.output.part_count(),
        res.output.len
    ))
}
1932
1933 pub async fn clear_blob(
1934 datadriven: &MachineState,
1935 _args: DirectiveArgs<'_>,
1936 ) -> Result<String, anyhow::Error> {
1937 let mut to_delete = vec![];
1938 datadriven
1939 .client
1940 .blob
1941 .list_keys_and_metadata("", &mut |meta| {
1942 to_delete.push(meta.key.to_owned());
1943 })
1944 .await?;
1945 for blob in &to_delete {
1946 datadriven.client.blob.delete(blob).await?;
1947 }
1948 Ok(format!("deleted={}\n", to_delete.len()))
1949 }
1950
1951 pub async fn restore_blob(
1952 datadriven: &MachineState,
1953 _args: DirectiveArgs<'_>,
1954 ) -> Result<String, anyhow::Error> {
1955 let not_restored = crate::internal::restore::restore_blob(
1956 &datadriven.state_versions,
1957 datadriven.client.blob.as_ref(),
1958 &datadriven.client.cfg.build_version,
1959 datadriven.shard_id,
1960 &*datadriven.state_versions.metrics,
1961 )
1962 .await?;
1963 let mut out = String::new();
1964 for key in not_restored {
1965 writeln!(&mut out, "{key}");
1966 }
1967 Ok(out)
1968 }
1969
1970 #[allow(clippy::unused_async)]
1971 pub async fn rewrite_ts(
1972 datadriven: &mut MachineState,
1973 args: DirectiveArgs<'_>,
1974 ) -> Result<String, anyhow::Error> {
1975 let input = args.expect_str("input");
1976 let ts_rewrite = args.expect_antichain("frontier");
1977 let upper = args.expect_antichain("upper");
1978
1979 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1980 let mut hollow_batch = (*batch.batch).clone();
1981 let () = hollow_batch
1982 .rewrite_ts(&ts_rewrite, upper)
1983 .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
1984 batch.batch = Arc::new(hollow_batch);
1985 Ok("ok\n".into())
1986 }
1987
1988 pub async fn gc(
1989 datadriven: &mut MachineState,
1990 args: DirectiveArgs<'_>,
1991 ) -> Result<String, anyhow::Error> {
1992 let new_seqno_since = args.expect("to_seqno");
1993
1994 let req = GcReq {
1995 shard_id: datadriven.shard_id,
1996 new_seqno_since,
1997 };
1998 let (maintenance, stats) =
1999 GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2000 datadriven.routine.push(maintenance);
2001
2002 Ok(format!(
2003 "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2004 datadriven.machine.seqno(),
2005 stats.batch_parts_deleted_from_blob,
2006 stats.rollups_deleted_from_blob,
2007 stats
2008 .truncated_consensus_to
2009 .iter()
2010 .map(|x| x.to_string())
2011 .collect::<Vec<_>>()
2012 .join(","),
2013 stats
2014 .rollups_removed_from_state
2015 .iter()
2016 .map(|x| x.to_string())
2017 .collect::<Vec<_>>()
2018 .join(","),
2019 ))
2020 }
2021
/// Renders a full snapshot of the shard as of `as_of`: every batch, run, and
/// part, with update timestamps advanced to `as_of` and then consolidated.
pub async fn snapshot(
    datadriven: &MachineState,
    args: DirectiveArgs<'_>,
) -> Result<String, anyhow::Error> {
    let as_of = args.expect_antichain("as_of");
    let snapshot = datadriven
        .machine
        .unleased_snapshot(&as_of)
        .await
        .map_err(|err| anyhow!("{:?}", err))?;

    let mut result = String::new();

    for batch in snapshot {
        writeln!(
            result,
            "<batch {:?}-{:?}>",
            batch.desc.lower().elements(),
            batch.desc.upper().elements()
        );
        for (run, (_meta, parts)) in batch.runs().enumerate() {
            writeln!(result, "<run {run}>");
            // Flatten each run's parts into a single stream of fetchable parts.
            let mut stream = pin!(
                futures::stream::iter(parts)
                    .flat_map(|part| part.part_stream(
                        datadriven.shard_id,
                        &*datadriven.state_versions.blob,
                        &*datadriven.state_versions.metrics
                    ))
                    .enumerate()
            );

            while let Some((idx, part)) = stream.next().await {
                let part = &*part?;
                writeln!(result, "<part {idx}>");

                let part = EncodedPart::fetch(
                    &FetchConfig::from_persist_config(&datadriven.client.cfg),
                    &datadriven.shard_id,
                    datadriven.client.blob.as_ref(),
                    datadriven.client.metrics.as_ref(),
                    datadriven.machine.applier.shard_metrics.as_ref(),
                    &datadriven.client.metrics.read.batch_fetcher,
                    &batch.desc,
                    part,
                )
                .await
                .expect("invalid batch part");
                let part = part
                    .normalize(&datadriven.client.metrics.columnar)
                    .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                let mut updates = Vec::new();

                for ((k, _v), mut t, d) in part
                    .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                    .expect("valid schemas")
                {
                    // Advance each timestamp to the snapshot time before
                    // consolidating, so logically-equal updates collapse.
                    t.advance_by(as_of.borrow());
                    updates.push((k, t, d));
                }

                consolidate_updates(&mut updates);

                for (k, t, d) in updates {
                    writeln!(result, "{k} {t} {d}");
                }
            }
        }
    }

    Ok(result)
}
2095
2096 pub async fn register_listen(
2097 datadriven: &mut MachineState,
2098 args: DirectiveArgs<'_>,
2099 ) -> Result<String, anyhow::Error> {
2100 let output = args.expect_str("output");
2101 let as_of = args.expect_antichain("as_of");
2102 let read = datadriven
2103 .client
2104 .open_leased_reader::<String, (), u64, i64>(
2105 datadriven.shard_id,
2106 Arc::new(StringSchema),
2107 Arc::new(UnitSchema),
2108 Diagnostics::for_tests(),
2109 true,
2110 )
2111 .await
2112 .expect("invalid shard types");
2113 let listen = read
2114 .listen(as_of)
2115 .await
2116 .map_err(|err| anyhow!("{:?}", err))?;
2117 datadriven.listens.insert(output.to_owned(), listen);
2118 Ok("ok\n".into())
2119 }
2120
2121 pub async fn listen_through(
2122 datadriven: &mut MachineState,
2123 args: DirectiveArgs<'_>,
2124 ) -> Result<String, anyhow::Error> {
2125 let input = args.expect_str("input");
2126 let frontier = args.expect("frontier");
2129 let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2130 let mut s = String::new();
2131 loop {
2132 for event in listen.fetch_next().await {
2133 match event {
2134 ListenEvent::Updates(x) => {
2135 for ((k, _v), t, d) in x.iter() {
2136 write!(s, "{} {} {}\n", k, t, d);
2137 }
2138 }
2139 ListenEvent::Progress(x) => {
2140 if !x.less_than(&frontier) {
2141 return Ok(s);
2142 }
2143 }
2144 }
2145 }
2146 }
2147 }
2148
2149 pub async fn register_critical_reader(
2150 datadriven: &mut MachineState,
2151 args: DirectiveArgs<'_>,
2152 ) -> Result<String, anyhow::Error> {
2153 let reader_id = args.expect("reader_id");
2154 let (state, maintenance) = datadriven
2155 .machine
2156 .register_critical_reader(&reader_id, Opaque::encode(&0u64), "tests")
2157 .await;
2158 datadriven.routine.push(maintenance);
2159 Ok(format!(
2160 "{} {:?}\n",
2161 datadriven.machine.seqno(),
2162 state.since.elements(),
2163 ))
2164 }
2165
2166 pub async fn register_leased_reader(
2167 datadriven: &mut MachineState,
2168 args: DirectiveArgs<'_>,
2169 ) -> Result<String, anyhow::Error> {
2170 let reader_id = args.expect("reader_id");
2171 let (reader_state, maintenance) = datadriven
2172 .machine
2173 .register_leased_reader(
2174 &reader_id,
2175 "tests",
2176 READER_LEASE_DURATION.get(&datadriven.client.cfg),
2177 (datadriven.client.cfg.now)(),
2178 false,
2179 )
2180 .await;
2181 datadriven.routine.push(maintenance);
2182 Ok(format!(
2183 "{} {:?}\n",
2184 datadriven.machine.seqno(),
2185 reader_state.since.elements(),
2186 ))
2187 }
2188
2189 pub async fn expire_critical_reader(
2190 datadriven: &mut MachineState,
2191 args: DirectiveArgs<'_>,
2192 ) -> Result<String, anyhow::Error> {
2193 let reader_id = args.expect("reader_id");
2194 let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2195 datadriven.routine.push(maintenance);
2196 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2197 }
2198
2199 pub async fn expire_leased_reader(
2200 datadriven: &mut MachineState,
2201 args: DirectiveArgs<'_>,
2202 ) -> Result<String, anyhow::Error> {
2203 let reader_id = args.expect("reader_id");
2204 let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2205 datadriven.routine.push(maintenance);
2206 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2207 }
2208
2209 pub async fn compare_and_append_batches(
2210 datadriven: &MachineState,
2211 args: DirectiveArgs<'_>,
2212 ) -> Result<String, anyhow::Error> {
2213 let expected_upper = args.expect_antichain("expected_upper");
2214 let new_upper = args.expect_antichain("new_upper");
2215
2216 let mut batches: Vec<Batch<String, (), u64, i64>> = args
2217 .args
2218 .get("batches")
2219 .expect("missing batches")
2220 .into_iter()
2221 .map(|batch| {
2222 let hollow = (*datadriven
2223 .batches
2224 .get(batch)
2225 .expect("unknown batch")
2226 .clone()
2227 .batch)
2228 .clone();
2229 datadriven.to_batch(hollow)
2230 })
2231 .collect();
2232
2233 let mut writer = datadriven
2234 .client
2235 .open_writer(
2236 datadriven.shard_id,
2237 Arc::new(StringSchema),
2238 Arc::new(UnitSchema),
2239 Diagnostics::for_tests(),
2240 )
2241 .await?;
2242
2243 let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2244
2245 let () = writer
2246 .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
2247 .await?
2248 .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2249
2250 writer.expire().await;
2251
2252 Ok("ok\n".into())
2253 }
2254
2255 pub async fn expire_writer(
2256 datadriven: &mut MachineState,
2257 args: DirectiveArgs<'_>,
2258 ) -> Result<String, anyhow::Error> {
2259 let writer_id = args.expect("writer_id");
2260 let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2261 datadriven.routine.push(maintenance);
2262 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2263 }
2264
2265 pub(crate) async fn finalize(
2266 datadriven: &mut MachineState,
2267 _args: DirectiveArgs<'_>,
2268 ) -> anyhow::Result<String> {
2269 let maintenance = datadriven.machine.become_tombstone().await?;
2270 datadriven.routine.push(maintenance);
2271 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2272 }
2273
2274 pub(crate) fn is_finalized(
2275 datadriven: &MachineState,
2276 _args: DirectiveArgs<'_>,
2277 ) -> anyhow::Result<String> {
2278 let seqno = datadriven.machine.seqno();
2279 let tombstone = datadriven.machine.is_finalized();
2280 Ok(format!("{seqno} {tombstone}\n"))
2281 }
2282
/// Compare-and-appends a named batch via the low-level idempotent machine
/// API, retrying with the batch's parts flushed to blob if the machine
/// pushes back on inline writes.
pub async fn compare_and_append(
    datadriven: &mut MachineState,
    args: DirectiveArgs<'_>,
) -> Result<String, anyhow::Error> {
    let input = args.expect_str("input");
    let writer_id = args.expect("writer_id");
    let mut batch = datadriven
        .batches
        .get(input)
        .expect("unknown batch")
        .clone();
    // Supplying the same token across directives exercises idempotent
    // re-application; otherwise each append gets a fresh one.
    let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
    let now = (datadriven.client.cfg.now)();

    let (id, maintenance) = datadriven
        .machine
        .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
        .await;
    assert_eq!(id, SCHEMAS.id);
    datadriven.routine.push(maintenance);
    let maintenance = loop {
        // Optionally simulate a prior attempt that ended with an
        // indeterminate (maybe-committed) error.
        let indeterminate = args
            .optional::<String>("prev_indeterminate")
            .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
        let res = datadriven
            .machine
            .compare_and_append_idempotent(
                &batch.batch,
                &writer_id,
                now,
                &token,
                &HandleDebugState::default(),
                indeterminate,
            )
            .await;
        match res {
            CompareAndAppendRes::Success(_, x) => break x,
            CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                return Err(anyhow!("{:?}", Upper(upper)));
            }
            CompareAndAppendRes::InlineBackpressure => {
                // Too much inline data for state: write the parts out to blob
                // and retry with the resulting (hollow) batch.
                let hollow_batch = (*batch.batch).clone();
                let mut b = datadriven.to_batch(hollow_batch);
                let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                b.flush_to_blob(
                    &cfg,
                    &datadriven.client.metrics.user,
                    &datadriven.client.isolated_runtime,
                    &*SCHEMAS,
                )
                .await;
                batch.batch = Arc::new(b.into_hollow_batch());
                continue;
            }
            CompareAndAppendRes::InvalidUsage(_) => panic!("{:?}", res),
        };
    };
    datadriven.routine.push(maintenance.routine);
    Ok(format!(
        "{} {:?}\n",
        datadriven.machine.seqno(),
        datadriven.machine.applier.clone_upper().elements(),
    ))
}
2349
2350 pub async fn apply_merge_res(
2351 datadriven: &mut MachineState,
2352 args: DirectiveArgs<'_>,
2353 ) -> Result<String, anyhow::Error> {
2354 let input = args.expect_str("input");
2355 let batch = datadriven
2356 .batches
2357 .get(input)
2358 .expect("unknown batch")
2359 .clone();
2360 let compact_req = datadriven
2361 .compactions
2362 .get(input)
2363 .expect("unknown compact req")
2364 .clone();
2365 let input_batches = compact_req
2366 .inputs
2367 .iter()
2368 .map(|x| x.id)
2369 .collect::<BTreeSet<_>>();
2370 let lower_spine_bound = input_batches
2371 .first()
2372 .map(|id| id.0)
2373 .expect("at least one batch must be present");
2374 let upper_spine_bound = input_batches
2375 .last()
2376 .map(|id| id.1)
2377 .expect("at least one batch must be present");
2378 let id = SpineId(lower_spine_bound, upper_spine_bound);
2379 let hollow_batch = (*batch.batch).clone();
2380
2381 let (merge_res, maintenance) = datadriven
2382 .machine
2383 .merge_res(&FueledMergeRes {
2384 output: hollow_batch,
2385 input: CompactionInput::IdRange(id),
2386 new_active_compaction: None,
2387 })
2388 .await;
2389 datadriven.routine.push(maintenance);
2390 Ok(format!(
2391 "{} {}\n",
2392 datadriven.machine.seqno(),
2393 merge_res.applied()
2394 ))
2395 }
2396
2397 pub async fn perform_maintenance(
2398 datadriven: &mut MachineState,
2399 _args: DirectiveArgs<'_>,
2400 ) -> Result<String, anyhow::Error> {
2401 let mut s = String::new();
2402 for maintenance in datadriven.routine.drain(..) {
2403 let () = maintenance
2404 .perform(&datadriven.machine, &datadriven.gc)
2405 .await;
2406 let () = datadriven
2407 .machine
2408 .applier
2409 .fetch_and_update_state(None)
2410 .await;
2411 write!(s, "{} ok\n", datadriven.machine.seqno());
2412 }
2413 Ok(s)
2414 }
2415}
2416
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use mz_persist_types::PersistLocation;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::{new_test_client, new_test_client_cache};
    use crate::{Diagnostics, PersistClient, ShardId};

    // Verifies that the number of live state diffs stays bounded as batches
    // are appended and maintenance (rollups + truncation) is performed.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        // A low rollup threshold makes rollups (and thus truncation of old
        // live diffs) happen frequently enough to exercise in this test.
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        // Expire the reader so its lease doesn't hold back truncation.
        read.expire().await;

        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            // Write the batch data out to blob so it isn't inlined in state
            // (which would bloat the diffs being counted below).
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            // Actually perform the requested maintenance (rollups, GC, ...)
            // so truncation has a chance to run.
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        assert!(live_diffs.len() > 0);
        // The bound is logarithmic-ish in the number of batches, with slack;
        // NOTE(review): the exact constant appears to be a heuristic — the
        // assertion only checks we don't retain unboundedly many diffs.
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.len(),
            max_live_diffs
        );
    }

    // Regression test: a GC run that is interrupted partway through its blob
    // deletes (simulated via an injected panic) must not prevent a later GC
    // for the same seqno_since from completing.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        let intercept = InterceptHandle::default();
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Advance since so there is something for GC to truncate.
        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();
        assert!(new_seqno_since > SeqNo::minimum());

        // Arrange for the first GC to panic immediately after its first blob
        // delete, simulating a crash mid-GC.
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        // Run the doomed GC in a spawned task so the panic is contained
        // there instead of failing this test directly.
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        let _ = gc.await;

        // Remove the fault and retry the same GC request; the regression was
        // that this second attempt could misbehave after the partial run.
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

    // Regression test: a writer whose cached state is stale (because it uses
    // a separate state cache) must refresh state after an upper mismatch and
    // succeed on a subsequent compare_and_append.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        // Give the second client its own state cache so the two write
        // handles have independent (and potentially stale) views of state.
        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        // write1 advances the shard's upper; write2's cached state is now
        // stale. write2's append must internally refresh and still succeed.
        write1.expect_compare_and_append(&data[..1], 0, 2).await;

        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }

    // Verifies that a shard's applier_version only advances via an explicit
    // `upgrade_version` call, not merely by opening the shard with a newer
    // build version.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn version_upgrade(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        let shard_id = ShardId::new();

        // Reads the shard's recorded applier_version out of the serialized
        // state inspection output; None if the shard can't be inspected.
        async fn fetch_catalog_upgrade_shard_version(
            persist_client: &PersistClient,
            upgrade_shard_id: ShardId,
        ) -> Option<semver::Version> {
            let shard_state = persist_client
                .inspect_shard::<u64>(&upgrade_shard_id)
                .await
                .ok()?;
            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
            let upgrade_version = json_state
                .get("applier_version")
                .cloned()
                .expect("missing applier_version");
            let upgrade_version =
                serde_json::from_value(upgrade_version).expect("version deserialization error");
            Some(upgrade_version)
        }

        // Open (and thus initialize) the shard at v26.1.0.
        cache.cfg.build_version = Version::new(26, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(1)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Re-open at v27.1.0: the applier_version must NOT advance just from
        // opening and using the shard.
        cache.cfg.build_version = Version::new(27, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Only an explicit upgrade_version call moves the recorded version.
        client
            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .unwrap();
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(27, 1, 0)),
        );
    }
}