1use std::fmt::Debug;
13use std::ops::ControlFlow::{self, Break, Continue};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::FutureExt;
20use futures::future::{self, BoxFuture};
21use mz_dyncfg::{Config, ConfigSet};
22use mz_ore::cast::CastFrom;
23use mz_ore::error::ErrorExt;
24#[allow(unused_imports)] use mz_ore::fmt::FormatBuffer;
26use mz_ore::{assert_none, soft_assert_no_log};
27use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
28use mz_persist::retry::Retry;
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64};
31use semver::Version;
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tracing::{Instrument, debug, info, trace_span};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
38use crate::cache::StateCache;
39use crate::cfg::RetryParameters;
40use crate::critical::{CriticalReaderId, Opaque};
41use crate::error::{CodecMismatch, InvalidUsage};
42use crate::internal::apply::Applier;
43use crate::internal::compact::CompactReq;
44use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
45use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
46use crate::internal::paths::PartialRollupKey;
47use crate::internal::state::{
48 CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
49 IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
50 Upper,
51};
52use crate::internal::state_versions::StateVersions;
53use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
54use crate::internal::watch::StateWatch;
55use crate::read::LeasedReaderId;
56use crate::rpc::PubSubSender;
57use crate::schema::CaESchema;
58use crate::write::WriterId;
59use crate::{Diagnostics, PersistConfig, ShardId};
60
/// A handle to the persist state machine for a single shard.
///
/// Wraps an [Applier] (which owns the cached state and applies commands to
/// it) plus an isolated runtime handle for offloaded work.
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}
66
67impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
69 fn clone(&self) -> Self {
70 Self {
71 applier: self.applier.clone(),
72 isolated_runtime: Arc::clone(&self.isolated_runtime),
73 }
74 }
75}
76
/// Dyncfg: when enabled, an append that produced no compaction request of its
/// own may instead claim some other uncompacted batch already in state.
pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);
83
/// Dyncfg: probability (in percent; values over 100 guarantee extra claims,
/// per the description below) of claiming a compaction when claiming is on.
pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
91
/// Dyncfg: minimum version cutoff for claimed compactions; an empty or
/// unparseable string disables the version check (see its use site).
pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);
97
98impl<K, V, T, D> Machine<K, V, T, D>
99where
100 K: Debug + Codec,
101 V: Debug + Codec,
102 T: Timestamp + Lattice + Codec64 + Sync,
103 D: Monoid + Codec64,
104{
105 pub async fn new(
106 cfg: PersistConfig,
107 shard_id: ShardId,
108 metrics: Arc<Metrics>,
109 state_versions: Arc<StateVersions>,
110 shared_states: Arc<StateCache>,
111 pubsub_sender: Arc<dyn PubSubSender>,
112 isolated_runtime: Arc<IsolatedRuntime>,
113 diagnostics: Diagnostics,
114 ) -> Result<Self, Box<CodecMismatch>> {
115 let applier = Applier::new(
116 cfg,
117 shard_id,
118 metrics,
119 state_versions,
120 shared_states,
121 pubsub_sender,
122 diagnostics,
123 )
124 .await?;
125 Ok(Machine {
126 applier,
127 isolated_runtime,
128 })
129 }
130
    /// Returns the id of the shard this machine operates on.
    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }
134
    /// Returns the seqno of the locally cached state (per the applier).
    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }
138
139 pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
140 let rollup = self.applier.write_rollup_for_state().await;
141 let Some(rollup) = rollup else {
142 return RoutineMaintenance::default();
143 };
144
145 let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
146 if !applied {
147 self.applier
150 .state_versions
151 .delete_rollup(&rollup.shard_id, &rollup.key)
152 .await;
153 }
154 maintenance
155 }
156
157 pub async fn add_rollup(
158 &self,
159 add_rollup: (SeqNo, &HollowRollup),
160 ) -> (bool, RoutineMaintenance) {
161 let mut applied_ever_true = false;
164 let metrics = Arc::clone(&self.applier.metrics);
165 let (_seqno, _applied, maintenance) = self
166 .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
167 let ret = state.add_rollup(add_rollup);
168 if let Continue(applied) = ret {
169 applied_ever_true = applied_ever_true || applied;
170 }
171 ret
172 })
173 .await;
174 (applied_ever_true, maintenance)
175 }
176
177 pub async fn remove_rollups(
178 &self,
179 remove_rollups: &[(SeqNo, PartialRollupKey)],
180 ) -> (Vec<SeqNo>, RoutineMaintenance) {
181 let metrics = Arc::clone(&self.applier.metrics);
182 let (_seqno, removed_rollup_seqnos, maintenance) = self
183 .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
184 state.remove_rollups(remove_rollups)
185 })
186 .await;
187 (removed_rollup_seqnos, maintenance)
188 }
189
    /// Bumps the state's recorded version up to this process's build version.
    ///
    /// Errors with the state's (newer) version if the state was written by a
    /// newer binary; in that case no maintenance should have been generated.
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
        // NOTE(review): this reuses the `remove_rollups` command metric rather
        // than a dedicated `upgrade_version` one — confirm that's intentional.
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                if state.version <= cfg.build_version {
                    // Equal or older: (re-)write our build version.
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    // State is from a newer binary; refuse to downgrade it.
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }
218
    /// Registers `reader_id` as a leased reader of this shard.
    ///
    /// The lease starts at `heartbeat_timestamp_ms` and runs for
    /// `lease_duration`; the reader must heartbeat to stay registered.
    /// Returns the new reader state plus any generated maintenance.
    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // The reader's registered seqno must not lag the seqno-since: its
        // capability has to hold back state garbage collection.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }
257
258 pub async fn register_critical_reader(
259 &self,
260 reader_id: &CriticalReaderId,
261 default_opaque: Opaque,
262 purpose: &str,
263 ) -> (CriticalReaderState<T>, RoutineMaintenance) {
264 let metrics = Arc::clone(&self.applier.metrics);
265 let (_seqno, state, maintenance) = self
266 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
267 state.register_critical_reader(
268 &cfg.hostname,
269 reader_id,
270 default_opaque.clone(),
271 purpose,
272 )
273 })
274 .await;
275 (state, maintenance)
276 }
277
278 pub async fn register_schema(
279 &self,
280 key_schema: &K::Schema,
281 val_schema: &V::Schema,
282 ) -> (Option<SchemaId>, RoutineMaintenance) {
283 let metrics = Arc::clone(&self.applier.metrics);
284 let (_seqno, state, maintenance) = self
285 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
286 state.register_schema::<K, V>(key_schema, val_schema)
287 })
288 .await;
289 (state, maintenance)
290 }
291
292 pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
293 if fuel == 0 || self.applier.all_batches().len() < 2 {
295 return (Vec::new(), RoutineMaintenance::default());
296 }
297
298 let metrics = Arc::clone(&self.applier.metrics);
299 let (_seqno, reqs, maintenance) = self
300 .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
301 state.spine_exert(fuel)
302 })
303 .await;
304 let reqs = reqs
305 .into_iter()
306 .map(|req| CompactReq {
307 shard_id: self.shard_id(),
308 desc: req.desc,
309 inputs: req.inputs,
310 })
311 .collect();
312 (reqs, maintenance)
313 }
314
315 pub async fn compare_and_append(
316 &self,
317 batch: &HollowBatch<T>,
318 writer_id: &WriterId,
319 debug_info: &HandleDebugState,
320 heartbeat_timestamp_ms: u64,
321 ) -> CompareAndAppendRes<T> {
322 let idempotency_token = IdempotencyToken::new();
323 loop {
324 let res = self
325 .compare_and_append_idempotent(
326 batch,
327 writer_id,
328 heartbeat_timestamp_ms,
329 &idempotency_token,
330 debug_info,
331 None,
332 )
333 .await;
334 match res {
335 CompareAndAppendRes::Success(seqno, maintenance) => {
336 return CompareAndAppendRes::Success(seqno, maintenance);
337 }
338 CompareAndAppendRes::InvalidUsage(x) => {
339 return CompareAndAppendRes::InvalidUsage(x);
340 }
341 CompareAndAppendRes::InlineBackpressure => {
342 return CompareAndAppendRes::InlineBackpressure;
343 }
344 CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
345 self.applier.fetch_and_update_state(Some(seqno)).await;
352 let (current_seqno, current_upper) =
353 self.applier.upper(|seqno, upper| (seqno, upper.clone()));
354
355 if ¤t_upper != batch.desc.lower() {
358 return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
359 } else {
360 }
363 }
364 }
365 }
366 }
367
    /// Attempt loop backing [Self::compare_and_append].
    ///
    /// Applies the compare_and_append command, retrying with the same
    /// `idempotency_token` on indeterminate external errors. The first such
    /// error is remembered in `indeterminate` so later outcomes can reason
    /// about whether an earlier attempt might already have committed.
    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        // Claim percent of 0 disables claiming entirely.
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        // An empty/unparseable min version yields `None`.
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // Indeterminate: the write may or may not have made it
                    // durable. Remember the first such error and retry with
                    // the same idempotency token.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // Applied. Surface any merge work as compaction requests
                    // for this shard.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    // Account for inline parts that were committed to state.
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // Our idempotency token matched: a prior attempt (which
                    // must have hit an indeterminate error) actually
                    // committed, so report success.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // A usage error is deterministic, so it can't coexist
                    // with a possibly-committed earlier attempt.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    // Caller is expected to write inline parts out to blob
                    // and retry.
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // A writer's recorded upper can't be ahead of the
                    // shard's.
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // This writer's upper never reached the batch's
                        // upper, so this can't be our own earlier (lost-ack)
                        // write: genuine mismatch.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No indeterminate error occurred, so nothing could
                        // have committed behind our back: genuine mismatch.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // An indeterminate error happened AND the upper has moved
                    // past our batch without our token matching: we cannot
                    // tell whether our write committed or not. Giving either
                    // answer could be wrong, so halt loudly.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }
623
624 pub async fn downgrade_since(
625 &self,
626 reader_id: &LeasedReaderId,
627 outstanding_seqno: SeqNo,
628 new_since: &Antichain<T>,
629 heartbeat_timestamp_ms: u64,
630 ) -> (SeqNo, Since<T>, RoutineMaintenance) {
631 let metrics = Arc::clone(&self.applier.metrics);
632 self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
633 state.downgrade_since(
634 reader_id,
635 seqno,
636 outstanding_seqno,
637 new_since,
638 heartbeat_timestamp_ms,
639 )
640 })
641 .await
642 }
643
644 pub async fn compare_and_downgrade_since(
645 &self,
646 reader_id: &CriticalReaderId,
647 expected_opaque: &Opaque,
648 (new_opaque, new_since): (&Opaque, &Antichain<T>),
649 ) -> (Result<Since<T>, (Opaque, Since<T>)>, RoutineMaintenance) {
650 let metrics = Arc::clone(&self.applier.metrics);
651 let (_seqno, res, maintenance) = self
652 .apply_unbatched_idempotent_cmd(
653 &metrics.cmds.compare_and_downgrade_since,
654 |_seqno, _cfg, state| {
655 state.compare_and_downgrade_since(
656 reader_id,
657 expected_opaque,
658 (new_opaque, new_since),
659 )
660 },
661 )
662 .await;
663
664 match res {
665 Ok(since) => (Ok(since), maintenance),
666 Err((opaque, since)) => (Err((opaque, since)), maintenance),
667 }
668 }
669
670 pub async fn expire_leased_reader(
671 &self,
672 reader_id: &LeasedReaderId,
673 ) -> (SeqNo, RoutineMaintenance) {
674 let metrics = Arc::clone(&self.applier.metrics);
675 let (seqno, _existed, maintenance) = self
676 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
677 state.expire_leased_reader(reader_id)
678 })
679 .await;
680 (seqno, maintenance)
681 }
682
683 #[allow(dead_code)] pub async fn expire_critical_reader(
685 &self,
686 reader_id: &CriticalReaderId,
687 ) -> (SeqNo, RoutineMaintenance) {
688 let metrics = Arc::clone(&self.applier.metrics);
689 let (seqno, _existed, maintenance) = self
690 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
691 state.expire_critical_reader(reader_id)
692 })
693 .await;
694 (seqno, maintenance)
695 }
696
697 pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
698 let metrics = Arc::clone(&self.applier.metrics);
699 let (seqno, _existed, maintenance) = self
700 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
701 state.expire_writer(writer_id)
702 })
703 .await;
704 metrics.state.writer_removed.inc();
705 (seqno, maintenance)
706 }
707
    /// Returns whether the shard's cached state is finalized (see
    /// [Self::become_tombstone]).
    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }
711
    /// Returns the key/val schemas registered under `schema_id`, if any.
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }
716
    /// Returns the latest registered schema id and schemas, if any exist.
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }
721
    /// Returns the id under which the given schemas are registered, if any.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }
726
727 pub async fn compare_and_evolve_schema(
731 &self,
732 expected: SchemaId,
733 key_schema: &K::Schema,
734 val_schema: &V::Schema,
735 ) -> (CaESchema<K, V>, RoutineMaintenance) {
736 let metrics = Arc::clone(&self.applier.metrics);
737 let (_seqno, state, maintenance) = self
738 .apply_unbatched_idempotent_cmd(
739 &metrics.cmds.compare_and_evolve_schema,
740 |_seqno, _cfg, state| {
741 state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
742 },
743 )
744 .await;
745 (state, maintenance)
746 }
747
    /// Attempts one step of turning the shard into a tombstone.
    ///
    /// Returns `Ok(true)` if the command applied (progress was made) and
    /// `Ok(false)` if it was a no-op transition (no further progress
    /// possible). External command errors are retried forever.
    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            // Escalate from debug to info logging after a few attempts.
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }
786
787 pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
788 self.applier.check_since_upper_both_empty()?;
789
790 let mut maintenance = RoutineMaintenance::default();
791
792 loop {
793 let (made_progress, more_maintenance) = self.tombstone_step().await?;
794 maintenance.merge(more_maintenance);
795 if !made_progress {
796 break;
797 }
798 }
799
800 Ok(maintenance)
801 }
802
    /// Returns the batches making up the shard's contents at `as_of`,
    /// waiting (via a push-based state watch plus a polling sleep as a
    /// backstop) if `as_of` is not yet available.
    ///
    /// Errors with the current since if `as_of`'s historical distinctions
    /// have been compacted away.
    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        // Fast path: try against the currently cached state first.
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

        // Which of the two wakeup paths (state watch vs. timed poll) fired.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        // After roughly a second of waiting, escalate logging from debug to
        // info (and log the eventual resolution at the same level).
        let mut logged_at_info = false;
        loop {
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    // The sleep path doesn't observe new state by itself;
                    // explicitly fetch anything newer than `seqno`.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            // Re-arm whichever future completed; the other is still pending.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
932
    /// Validates that a listen as of `as_of` can be served, erroring with the
    /// current since otherwise; delegates to the applier.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }
937
    /// Waits until `applier.next_listen_batch(frontier)` yields a batch and
    /// returns it.
    ///
    /// Wakes up either via a push-based state watch or via a configurable
    /// polling sleep; the sleep path also force-fetches state, as a backstop
    /// for lost watch notifications.
    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        // Fast path: try against the currently cached state first.
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        // Which of the two wakeup paths (state watch vs. timed poll) fired.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        // NOTE(review): span names say "snapshot::*" though this is
        // next_listen_batch — possibly copied from `snapshot`; confirm.
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        loop {
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    // Polling backstop: explicitly fetch newer state.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    // Record which wakeup path actually resolved the listen.
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            // Re-arm whichever future completed; the other is still pending.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
1038
    /// Applies an idempotent command to state, retrying forever on external
    /// command errors (safe because `work_fn` is idempotent).
    ///
    /// Returns the seqno the command was applied at, `work_fn`'s result
    /// (whether it applied or was a no-op transition), and any generated
    /// maintenance.
    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    // Escalate from debug to info logging after a few
                    // attempts.
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
1089}
1090
// Separate impl: `merge_res` additionally needs `D: PartialEq` to apply merge
// results against state.
impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + PartialEq,
{
    /// Applies the result of a fueled merge (compaction) to state.
    ///
    /// The command is retried internally; this tracks the "best" outcome seen
    /// across attempts so the caller learns whether its result was ever
    /// actually applied.
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);

        // Best outcome across (possibly retried) attempts; a later no-op
        // retry must not clobber an earlier applied result.
        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
                if let Continue(result) = ret {
                    // An applied result always wins.
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    // A matched-but-not-applied result is only recorded when
                    // no attempt has applied so far.
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }
}
1154
/// A boxed one-shot callback whose invocation returns a future that performs
/// an expiration (used so handles can defer their cleanup work).
pub(crate) struct ExpireFn(
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
1162
impl Debug for ExpireFn {
    // The boxed closure has no meaningful Debug representation; print just
    // the type name with a non-exhaustive marker.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}
1168
/// Outcome of a compare_and_append command.
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    /// The batch was appended; carries the seqno it applied at and any
    /// maintenance the writer should perform.
    Success(SeqNo, WriterMaintenance<T>),
    /// The request was invalid; deterministic, so safe to surface directly.
    InvalidUsage(InvalidUsage<T>),
    /// The shard's upper (included) didn't match the batch's lower.
    UpperMismatch(SeqNo, Antichain<T>),
    /// The inline-write budget was exceeded; the caller is expected to write
    /// parts out to blob and retry (see `compare_and_append_idempotent`).
    InlineBackpressure,
}
1176
#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    /// Test helper: unwraps the `Success` payload, panicking (at the caller's
    /// location) on any other variant.
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            x => panic!("{:?}", x),
        }
    }
}
1187
/// Dyncfg: fixed polling sleep for `next_listen_batch`; skipped if zero.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200), "\
    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);
1194
/// Dyncfg: initial backoff for `next_listen_batch` polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100), "The initial backoff when polling for new batches from a Listen or Subscribe.",
);
1200
/// Dyncfg: backoff multiplier for `next_listen_batch` polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);
1206
/// Dyncfg: maximum (clamped) backoff for `next_listen_batch` polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16), "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);
1212
/// Assembles the `next_listen_batch` retry knobs from the dyncfg values
/// declared above.
fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
    RetryParameters {
        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
    }
}
1221
/// Retry attempt count at which failures start being logged at `info!`
/// instead of `debug!`.
pub const INFO_MIN_ATTEMPTS: usize = 3;
1223
1224pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1225where
1226 F: std::future::Future<Output = Result<R, ExternalError>>,
1227 WorkFn: FnMut() -> F,
1228{
1229 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1230 loop {
1231 match work_fn().await {
1232 Ok(x) => {
1233 if retry.attempt() > 0 {
1234 debug!(
1235 "external operation {} succeeded after failing at least once",
1236 metrics.name,
1237 );
1238 }
1239 return x;
1240 }
1241 Err(err) => {
1242 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1243 info!(
1244 "external operation {} failed, retrying in {:?}: {}",
1245 metrics.name,
1246 retry.next_sleep(),
1247 err.display_with_causes()
1248 );
1249 } else {
1250 debug!(
1251 "external operation {} failed, retrying in {:?}: {}",
1252 metrics.name,
1253 retry.next_sleep(),
1254 err.display_with_causes()
1255 );
1256 }
1257 retry = retry.sleep().await;
1258 }
1259 }
1260 }
1261}
1262
1263pub async fn retry_determinate<R, F, WorkFn>(
1264 metrics: &RetryMetrics,
1265 mut work_fn: WorkFn,
1266) -> Result<R, Indeterminate>
1267where
1268 F: std::future::Future<Output = Result<R, ExternalError>>,
1269 WorkFn: FnMut() -> F,
1270{
1271 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1272 loop {
1273 match work_fn().await {
1274 Ok(x) => {
1275 if retry.attempt() > 0 {
1276 debug!(
1277 "external operation {} succeeded after failing at least once",
1278 metrics.name,
1279 );
1280 }
1281 return Ok(x);
1282 }
1283 Err(ExternalError::Determinate(err)) => {
1284 debug!(
1293 "external operation {} failed, retrying in {:?}: {}",
1294 metrics.name,
1295 retry.next_sleep(),
1296 err.display_with_causes()
1297 );
1298 retry = retry.sleep().await;
1299 continue;
1300 }
1301 Err(ExternalError::Indeterminate(x)) => return Err(x),
1302 }
1303 }
1304}
1305
1306#[cfg(test)]
1307pub mod datadriven {
1308 use std::collections::{BTreeMap, BTreeSet};
1309 use std::pin::pin;
1310 use std::sync::{Arc, LazyLock};
1311
1312 use anyhow::anyhow;
1313 use differential_dataflow::consolidation::consolidate_updates;
1314 use differential_dataflow::trace::Description;
1315 use futures::StreamExt;
1316 use mz_dyncfg::{ConfigUpdates, ConfigVal};
1317 use mz_persist::indexed::encoding::BlobTraceBatchPart;
1318 use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1319
1320 use crate::batch::{
1321 BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1322 BatchParts, validate_truncate_batch,
1323 };
1324 use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1325 use crate::fetch::{EncodedPart, FetchConfig};
1326 use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1327 use crate::internal::datadriven::DirectiveArgs;
1328 use crate::internal::encoding::Schemas;
1329 use crate::internal::gc::GcReq;
1330 use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1331 use crate::internal::state::{BatchPart, RunOrder, RunPart};
1332 use crate::internal::state_versions::EncodedRollup;
1333 use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1334 use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1335 use crate::rpc::NoopPubSubSender;
1336 use crate::tests::new_test_client;
1337 use crate::write::COMBINE_INLINE_WRITES;
1338 use crate::{GarbageCollector, PersistClient};
1339
1340 use super::*;
1341
    /// Schemas shared by every datadriven machine test: `String` keys, `()`
    /// vals, registered under schema id 0.
    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
        id: Some(SchemaId(0)),
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    });
1347
    /// Mutable context threaded through every datadriven directive: the test
    /// client, the machine under test, and name-keyed registries of artifacts
    /// (batches, rollups, listens, compaction reqs) produced by directives.
    #[derive(Debug)]
    pub struct MachineState {
        pub client: PersistClient,
        pub shard_id: ShardId,
        pub state_versions: Arc<StateVersions>,
        pub machine: Machine<String, (), u64, i64>,
        pub gc: GarbageCollector<String, (), u64, i64>,
        // Batches written by directives, keyed by the name given in `output=`.
        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
        // Next spine id bound to hand out when a new batch is written.
        pub next_id: usize,
        // Rollups written by `write_rollup`, keyed by name.
        pub rollups: BTreeMap<String, EncodedRollup>,
        // Active listens registered by `register_listen`, keyed by name.
        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
        // Routine maintenance accumulated by directives; drained by
        // `perform_maintenance`.
        pub routine: Vec<RoutineMaintenance>,
        // Compaction requests recorded by `compact`, keyed by output name.
        pub compactions: BTreeMap<String, CompactReq<u64>>,
    }
1363
    impl MachineState {
        /// Creates a fresh test client, machine, and garbage collector for a
        /// brand new shard, with dyncfg overrides applied.
        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
            let shard_id = ShardId::new();
            let client = new_test_client(dyncfgs).await;
            // Pin blob target size to its default and disable inline-write
            // combining so tests see a predictable batch part layout.
            client
                .cfg
                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
            let state_versions = Arc::new(StateVersions::new(
                client.cfg.clone(),
                Arc::clone(&client.consensus),
                Arc::clone(&client.blob),
                Arc::clone(&client.metrics),
            ));
            let machine = Machine::new(
                client.cfg.clone(),
                shard_id,
                Arc::clone(&client.metrics),
                Arc::clone(&state_versions),
                Arc::clone(&client.shared_states),
                Arc::new(NoopPubSubSender),
                Arc::clone(&client.isolated_runtime),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should match");
            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
            MachineState {
                shard_id,
                client,
                state_versions,
                machine,
                gc,
                batches: BTreeMap::default(),
                rollups: BTreeMap::default(),
                listens: BTreeMap::default(),
                routine: Vec::new(),
                compactions: BTreeMap::default(),
                next_id: 0,
            }
        }

        /// Wraps a hollow batch in a real `Batch` handle bound to this test's
        /// client, shard, and schemas.
        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
            Batch::new(
                true,
                Arc::clone(&self.client.metrics),
                Arc::clone(&self.client.blob),
                self.client.metrics.shards.shard(&self.shard_id, "test"),
                self.client.cfg.build_version.clone(),
                (
                    <String>::encode_schema(&*SCHEMAS.key),
                    <()>::encode_schema(&*SCHEMAS.val),
                ),
                hollow,
            )
        }
    }
1426
1427 pub async fn consensus_scan(
1430 datadriven: &MachineState,
1431 args: DirectiveArgs<'_>,
1432 ) -> Result<String, anyhow::Error> {
1433 let from = args.expect("from_seqno");
1434
1435 let mut states = datadriven
1436 .state_versions
1437 .fetch_all_live_states::<u64>(datadriven.shard_id)
1438 .await
1439 .expect("should only be called on an initialized shard")
1440 .check_ts_codec()
1441 .expect("shard codecs should not change");
1442 let mut s = String::new();
1443 while let Some(x) = states.next(|_| {}) {
1444 if x.seqno < from {
1445 continue;
1446 }
1447 let rollups: Vec<_> = x
1448 .collections
1449 .rollups
1450 .keys()
1451 .map(|seqno| seqno.to_string())
1452 .collect();
1453 let batches: Vec<_> = x
1454 .collections
1455 .trace
1456 .batches()
1457 .filter(|b| !b.is_empty())
1458 .filter_map(|b| {
1459 datadriven
1460 .batches
1461 .iter()
1462 .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
1463 .map(|(batch_name, _)| batch_name.to_owned())
1464 })
1465 .collect();
1466 write!(
1467 s,
1468 "seqno={} batches={} rollups={}\n",
1469 x.seqno,
1470 batches.join(","),
1471 rollups.join(","),
1472 );
1473 }
1474 Ok(s)
1475 }
1476
1477 pub async fn consensus_truncate(
1478 datadriven: &MachineState,
1479 args: DirectiveArgs<'_>,
1480 ) -> Result<String, anyhow::Error> {
1481 let to = args.expect("to_seqno");
1482 let removed = datadriven
1483 .client
1484 .consensus
1485 .truncate(&datadriven.shard_id.to_string(), to)
1486 .await
1487 .expect("valid truncation");
1488 Ok(format!("{:?}\n", removed))
1489 }
1490
1491 pub async fn blob_scan_batches(
1492 datadriven: &MachineState,
1493 _args: DirectiveArgs<'_>,
1494 ) -> Result<String, anyhow::Error> {
1495 let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1496
1497 let mut s = String::new();
1498 let () = datadriven
1499 .state_versions
1500 .blob
1501 .list_keys_and_metadata(&key_prefix, &mut |x| {
1502 let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1503 if let PartialBlobKey::Batch(_, _) = key {
1504 write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1505 }
1506 })
1507 .await?;
1508 Ok(s)
1509 }
1510
1511 #[allow(clippy::unused_async)]
1512 pub async fn shard_desc(
1513 datadriven: &MachineState,
1514 _args: DirectiveArgs<'_>,
1515 ) -> Result<String, anyhow::Error> {
1516 Ok(format!(
1517 "since={:?} upper={:?}\n",
1518 datadriven.machine.applier.since().elements(),
1519 datadriven.machine.applier.clone_upper().elements()
1520 ))
1521 }
1522
1523 pub async fn downgrade_since(
1524 datadriven: &mut MachineState,
1525 args: DirectiveArgs<'_>,
1526 ) -> Result<String, anyhow::Error> {
1527 let since = args.expect_antichain("since");
1528 let seqno = args
1529 .optional("seqno")
1530 .unwrap_or_else(|| datadriven.machine.seqno());
1531 let reader_id = args.expect("reader_id");
1532 let (_, since, routine) = datadriven
1533 .machine
1534 .downgrade_since(
1535 &reader_id,
1536 seqno,
1537 &since,
1538 (datadriven.machine.applier.cfg.now)(),
1539 )
1540 .await;
1541 datadriven.routine.push(routine);
1542 Ok(format!(
1543 "{} {:?}\n",
1544 datadriven.machine.seqno(),
1545 since.0.elements()
1546 ))
1547 }
1548
1549 #[allow(clippy::unused_async)]
1550 pub async fn dyncfg(
1551 datadriven: &MachineState,
1552 args: DirectiveArgs<'_>,
1553 ) -> Result<String, anyhow::Error> {
1554 let mut updates = ConfigUpdates::default();
1555 for x in args.input.trim().split('\n') {
1556 match x.split(' ').collect::<Vec<_>>().as_slice() {
1557 &[name, val] => {
1558 let config = datadriven
1559 .client
1560 .cfg
1561 .entries()
1562 .find(|x| x.name() == name)
1563 .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1564 match config.val() {
1565 ConfigVal::Usize(_) => {
1566 let val = val.parse().map_err(anyhow::Error::new)?;
1567 updates.add_dynamic(name, ConfigVal::Usize(val));
1568 }
1569 ConfigVal::Bool(_) => {
1570 let val = val.parse().map_err(anyhow::Error::new)?;
1571 updates.add_dynamic(name, ConfigVal::Bool(val));
1572 }
1573 x => unimplemented!("dyncfg type: {:?}", x),
1574 }
1575 }
1576 x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1577 }
1578 }
1579 updates.apply(&datadriven.client.cfg);
1580
1581 Ok("ok\n".to_string())
1582 }
1583
1584 pub async fn compare_and_downgrade_since(
1585 datadriven: &mut MachineState,
1586 args: DirectiveArgs<'_>,
1587 ) -> Result<String, anyhow::Error> {
1588 let expected_opaque: u64 = args.expect("expect_opaque");
1589 let new_opaque: u64 = args.expect("opaque");
1590 let new_since = args.expect_antichain("since");
1591 let reader_id = args.expect("reader_id");
1592 let (res, routine) = datadriven
1593 .machine
1594 .compare_and_downgrade_since(
1595 &reader_id,
1596 &Opaque::encode(&expected_opaque),
1597 (&Opaque::encode(&new_opaque), &new_since),
1598 )
1599 .await;
1600 datadriven.routine.push(routine);
1601 let since = res.map_err(|(opaque, since)| {
1602 anyhow!(
1603 "mismatch: opaque={} since={:?}",
1604 opaque.decode::<u64>(),
1605 since.0.elements()
1606 )
1607 })?;
1608 Ok(format!(
1609 "{} {} {:?}\n",
1610 datadriven.machine.seqno(),
1611 new_opaque,
1612 since.0.elements()
1613 ))
1614 }
1615
1616 pub async fn write_rollup(
1617 datadriven: &mut MachineState,
1618 args: DirectiveArgs<'_>,
1619 ) -> Result<String, anyhow::Error> {
1620 let output = args.expect_str("output");
1621
1622 let rollup = datadriven
1623 .machine
1624 .applier
1625 .write_rollup_for_state()
1626 .await
1627 .expect("rollup");
1628
1629 datadriven
1630 .rollups
1631 .insert(output.to_string(), rollup.clone());
1632
1633 Ok(format!(
1634 "state={} diffs=[{}, {})\n",
1635 rollup.seqno,
1636 rollup._desc.lower().first().expect("seqno"),
1637 rollup._desc.upper().first().expect("seqno"),
1638 ))
1639 }
1640
1641 pub async fn add_rollup(
1642 datadriven: &mut MachineState,
1643 args: DirectiveArgs<'_>,
1644 ) -> Result<String, anyhow::Error> {
1645 let input = args.expect_str("input");
1646 let rollup = datadriven
1647 .rollups
1648 .get(input)
1649 .expect("unknown batch")
1650 .clone();
1651
1652 let (applied, maintenance) = datadriven
1653 .machine
1654 .add_rollup((rollup.seqno, &rollup.to_hollow()))
1655 .await;
1656
1657 if !applied {
1658 return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1659 }
1660
1661 datadriven.routine.push(maintenance);
1662 Ok(format!("{}\n", datadriven.machine.seqno()))
1663 }
1664
    /// Builds a batch from the directive's updates and registers it under
    /// `output`, optionally overriding target size, since, consolidation, and
    /// the recorded size of every part.
    pub async fn write_batch(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        assert!(PartialOrder::less_than(&lower, &upper));
        let since = args
            .optional_antichain("since")
            .unwrap_or_else(|| Antichain::from_elem(0));
        let target_size = args.optional("target_size");
        let parts_size_override = args.optional("parts_size_override");
        let consolidate = args.optional("consolidate").unwrap_or(true);
        let mut updates: Vec<_> = args
            .input
            .split('\n')
            .flat_map(DirectiveArgs::parse_update)
            .collect();

        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
        if let Some(target_size) = target_size {
            cfg.blob_target_size = target_size;
        };
        if consolidate {
            consolidate_updates(&mut updates);
        }
        // Unconsolidated input must be written as an unordered run.
        let run_order = if consolidate {
            cfg.preferred_order
        } else {
            RunOrder::Unordered
        };
        let parts = BatchParts::new_ordered::<i64>(
            cfg.clone(),
            run_order,
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            datadriven.shard_id,
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.isolated_runtime),
            &datadriven.client.metrics.user,
        );
        let builder = BatchBuilderInternal::new(
            cfg.clone(),
            parts,
            Arc::clone(&datadriven.client.metrics),
            SCHEMAS.clone(),
            Arc::clone(&datadriven.client.blob),
            datadriven.shard_id.clone(),
            datadriven.client.cfg.build_version.clone(),
        );
        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
        for ((k, ()), t, d) in updates {
            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
        }
        let mut batch = builder.finish(upper).await?;
        // Size overrides only make sense for blob-backed (non-inline) parts,
        // so force any inline writes out to blob first.
        if parts_size_override.is_some() {
            batch
                .flush_to_blob(
                    &cfg,
                    &datadriven.client.metrics.user,
                    &datadriven.client.isolated_runtime,
                    &SCHEMAS,
                )
                .await;
        }
        let batch = batch.into_hollow_batch();
        let batch = IdHollowBatch {
            batch: Arc::new(batch),
            id: SpineId(datadriven.next_id, datadriven.next_id + 1),
        };
        datadriven.next_id += 1;

        if let Some(size) = parts_size_override {
            // Stamp the requested size onto every part of a cloned batch and
            // register that; inline parts are impossible here (flushed above).
            let mut batch = batch.clone();
            let mut hollow_batch = (*batch.batch).clone();
            for part in hollow_batch.parts.iter_mut() {
                match part {
                    RunPart::Many(run) => run.max_part_bytes = size,
                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
                }
            }
            batch.batch = Arc::new(hollow_batch);
            datadriven.batches.insert(output.to_owned(), batch);
        } else {
            datadriven.batches.insert(output.to_owned(), batch.clone());
        }
        Ok(format!(
            "parts={} len={}\n",
            batch.batch.part_count(),
            batch.batch.len
        ))
    }
1761
    /// Fetches and renders every part of a named batch: the decoded updates
    /// per part, optionally per-part key lower bounds (`stats=lower`), and a
    /// trailing run/part index summary.
    pub async fn fetch_batch(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let stats = args.optional_str("stats");
        let batch = datadriven.batches.get(input).expect("unknown batch");

        let mut s = String::new();
        let mut stream = pin!(
            batch
                .batch
                .part_stream(
                    datadriven.shard_id,
                    &*datadriven.state_versions.blob,
                    &*datadriven.state_versions.metrics
                )
                .enumerate()
        );
        while let Some((idx, part)) = stream.next().await {
            let part = &*part?;
            write!(s, "<part {idx}>\n");

            // Inline parts must be decoded to get at their key lower; hollow
            // parts carry it directly.
            let lower = match part {
                BatchPart::Inline { updates, .. } => {
                    let updates: BlobTraceBatchPart<u64> =
                        updates.decode(&datadriven.client.metrics.columnar)?;
                    updates.structured_key_lower()
                }
                other => other.structured_key_lower(),
            };

            if let Some(lower) = lower {
                if stats == Some("lower") {
                    writeln!(s, "<key lower={}>", lower.get())
                }
            }

            match part {
                BatchPart::Hollow(part) => {
                    let blob_batch = datadriven
                        .client
                        .blob
                        .get(&part.key.complete(&datadriven.shard_id))
                        .await;
                    match blob_batch {
                        Ok(Some(_)) | Err(_) => {}
                        // A missing blob is rendered as `<empty>` rather than
                        // failing the directive.
                        Ok(None) => {
                            s.push_str("<empty>\n");
                            continue;
                        }
                    };
                }
                BatchPart::Inline { .. } => {}
            };
            let part = EncodedPart::fetch(
                &FetchConfig::from_persist_config(&datadriven.client.cfg),
                &datadriven.shard_id,
                datadriven.client.blob.as_ref(),
                datadriven.client.metrics.as_ref(),
                datadriven.machine.applier.shard_metrics.as_ref(),
                &datadriven.client.metrics.read.batch_fetcher,
                &batch.batch.desc,
                part,
            )
            .await
            .expect("invalid batch part");
            let part = part
                .normalize(&datadriven.client.metrics.columnar)
                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

            for ((k, _v), t, d) in part
                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                .expect("valid schemas")
            {
                writeln!(s, "{k} {t} {d}");
            }
        }
        // Only emit the run/part summary when at least one part was rendered.
        if !s.is_empty() {
            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
                write!(s, "<run {idx}>\n");
                for part in run {
                    let part_idx = batch
                        .batch
                        .parts
                        .iter()
                        .position(|p| p == part)
                        .expect("part should exist");
                    write!(s, "part {part_idx}\n");
                }
            }
        }
        Ok(s)
    }
1858
1859 #[allow(clippy::unused_async)]
1860 pub async fn truncate_batch_desc(
1861 datadriven: &mut MachineState,
1862 args: DirectiveArgs<'_>,
1863 ) -> Result<String, anyhow::Error> {
1864 let input = args.expect_str("input");
1865 let output = args.expect_str("output");
1866 let lower = args.expect_antichain("lower");
1867 let upper = args.expect_antichain("upper");
1868
1869 let batch = datadriven
1870 .batches
1871 .get(input)
1872 .expect("unknown batch")
1873 .clone();
1874 let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1875 let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1876 let mut new_hollow_batch = (*batch.batch).clone();
1877 new_hollow_batch.desc = truncated_desc;
1878 let new_batch = IdHollowBatch {
1879 batch: Arc::new(new_hollow_batch),
1880 id: batch.id,
1881 };
1882 datadriven
1883 .batches
1884 .insert(output.to_owned(), new_batch.clone());
1885 Ok(format!(
1886 "parts={} len={}\n",
1887 batch.batch.part_count(),
1888 batch.batch.len
1889 ))
1890 }
1891
1892 #[allow(clippy::unused_async)]
1893 pub async fn set_batch_parts_size(
1894 datadriven: &mut MachineState,
1895 args: DirectiveArgs<'_>,
1896 ) -> Result<String, anyhow::Error> {
1897 let input = args.expect_str("input");
1898 let size = args.expect("size");
1899 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1900 let mut hollow_batch = (*batch.batch).clone();
1901 for part in hollow_batch.parts.iter_mut() {
1902 match part {
1903 RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1904 _ => {
1905 panic!("set_batch_parts_size only supports hollow parts")
1906 }
1907 }
1908 }
1909 batch.batch = Arc::new(hollow_batch);
1910 Ok("ok\n".to_string())
1911 }
1912
    /// Compacts the named input batches into a single output batch, records
    /// the compaction request under `output`, and registers the result.
    pub async fn compact(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        let since = args.expect_antichain("since");
        let target_size = args.optional("target_size");
        let memory_bound = args.optional("memory_bound");

        let mut inputs = Vec::new();
        for input in args.args.get("inputs").expect("missing inputs") {
            inputs.push(
                datadriven
                    .batches
                    .get(input)
                    .expect("unknown batch")
                    .clone(),
            );
        }

        let cfg = datadriven.client.cfg.clone();
        if let Some(target_size) = target_size {
            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
        };
        if let Some(memory_bound) = memory_bound {
            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
        }
        let req = CompactReq {
            shard_id: datadriven.shard_id,
            desc: Description::new(lower, upper, since),
            inputs: inputs.clone(),
        };
        datadriven
            .compactions
            .insert(output.to_owned(), req.clone());
        // The output spine id spans the first input's lower bound to the last
        // input's upper bound; with no inputs a fresh id is allocated.
        let spine_lower = inputs
            .first()
            .map_or_else(|| datadriven.next_id, |x| x.id.0);
        let spine_upper = inputs.last().map_or_else(
            || {
                datadriven.next_id += 1;
                datadriven.next_id
            },
            |x| x.id.1,
        );
        let new_spine_id = SpineId(spine_lower, spine_upper);
        let res = Compactor::<String, (), u64, i64>::compact(
            CompactConfig::new(&cfg, datadriven.shard_id),
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            Arc::clone(&datadriven.client.isolated_runtime),
            req,
            SCHEMAS.clone(),
        )
        .await?;

        let batch = IdHollowBatch {
            batch: Arc::new(res.output.clone()),
            id: new_spine_id,
        };

        datadriven.batches.insert(output.to_owned(), batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            res.output.part_count(),
            res.output.len
        ))
    }
1984
1985 pub async fn clear_blob(
1986 datadriven: &MachineState,
1987 _args: DirectiveArgs<'_>,
1988 ) -> Result<String, anyhow::Error> {
1989 let mut to_delete = vec![];
1990 datadriven
1991 .client
1992 .blob
1993 .list_keys_and_metadata("", &mut |meta| {
1994 to_delete.push(meta.key.to_owned());
1995 })
1996 .await?;
1997 for blob in &to_delete {
1998 datadriven.client.blob.delete(blob).await?;
1999 }
2000 Ok(format!("deleted={}\n", to_delete.len()))
2001 }
2002
2003 pub async fn restore_blob(
2004 datadriven: &MachineState,
2005 _args: DirectiveArgs<'_>,
2006 ) -> Result<String, anyhow::Error> {
2007 let not_restored = crate::internal::restore::restore_blob(
2008 &datadriven.state_versions,
2009 datadriven.client.blob.as_ref(),
2010 &datadriven.client.cfg.build_version,
2011 datadriven.shard_id,
2012 &*datadriven.state_versions.metrics,
2013 )
2014 .await?;
2015 let mut out = String::new();
2016 for key in not_restored {
2017 writeln!(&mut out, "{key}");
2018 }
2019 Ok(out)
2020 }
2021
2022 #[allow(clippy::unused_async)]
2023 pub async fn rewrite_ts(
2024 datadriven: &mut MachineState,
2025 args: DirectiveArgs<'_>,
2026 ) -> Result<String, anyhow::Error> {
2027 let input = args.expect_str("input");
2028 let ts_rewrite = args.expect_antichain("frontier");
2029 let upper = args.expect_antichain("upper");
2030
2031 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2032 let mut hollow_batch = (*batch.batch).clone();
2033 let () = hollow_batch
2034 .rewrite_ts(&ts_rewrite, upper)
2035 .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2036 batch.batch = Arc::new(hollow_batch);
2037 Ok("ok\n".into())
2038 }
2039
2040 pub async fn gc(
2041 datadriven: &mut MachineState,
2042 args: DirectiveArgs<'_>,
2043 ) -> Result<String, anyhow::Error> {
2044 let new_seqno_since = args.expect("to_seqno");
2045
2046 let req = GcReq {
2047 shard_id: datadriven.shard_id,
2048 new_seqno_since,
2049 };
2050 let (maintenance, stats) =
2051 GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2052 datadriven.routine.push(maintenance);
2053
2054 Ok(format!(
2055 "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2056 datadriven.machine.seqno(),
2057 stats.batch_parts_deleted_from_blob,
2058 stats.rollups_deleted_from_blob,
2059 stats
2060 .truncated_consensus_to
2061 .iter()
2062 .map(|x| x.to_string())
2063 .collect::<Vec<_>>()
2064 .join(","),
2065 stats
2066 .rollups_removed_from_state
2067 .iter()
2068 .map(|x| x.to_string())
2069 .collect::<Vec<_>>()
2070 .join(","),
2071 ))
2072 }
2073
    /// Takes a snapshot at `as_of` and renders every batch, run, and part,
    /// with update timestamps advanced to the as_of and then consolidated.
    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                // Flatten each run's parts into a single stream of fetched
                // leaf parts.
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        // Advance times to the as_of so consolidation merges
                        // updates that are indistinguishable at that frontier.
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }
2147
2148 pub async fn register_listen(
2149 datadriven: &mut MachineState,
2150 args: DirectiveArgs<'_>,
2151 ) -> Result<String, anyhow::Error> {
2152 let output = args.expect_str("output");
2153 let as_of = args.expect_antichain("as_of");
2154 let read = datadriven
2155 .client
2156 .open_leased_reader::<String, (), u64, i64>(
2157 datadriven.shard_id,
2158 Arc::new(StringSchema),
2159 Arc::new(UnitSchema),
2160 Diagnostics::for_tests(),
2161 true,
2162 )
2163 .await
2164 .expect("invalid shard types");
2165 let listen = read
2166 .listen(as_of)
2167 .await
2168 .map_err(|err| anyhow!("{:?}", err))?;
2169 datadriven.listens.insert(output.to_owned(), listen);
2170 Ok("ok\n".into())
2171 }
2172
2173 pub async fn listen_through(
2174 datadriven: &mut MachineState,
2175 args: DirectiveArgs<'_>,
2176 ) -> Result<String, anyhow::Error> {
2177 let input = args.expect_str("input");
2178 let frontier = args.expect("frontier");
2181 let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2182 let mut s = String::new();
2183 loop {
2184 for event in listen.fetch_next().await {
2185 match event {
2186 ListenEvent::Updates(x) => {
2187 for ((k, _v), t, d) in x.iter() {
2188 write!(s, "{} {} {}\n", k, t, d);
2189 }
2190 }
2191 ListenEvent::Progress(x) => {
2192 if !x.less_than(&frontier) {
2193 return Ok(s);
2194 }
2195 }
2196 }
2197 }
2198 }
2199 }
2200
2201 pub async fn register_critical_reader(
2202 datadriven: &mut MachineState,
2203 args: DirectiveArgs<'_>,
2204 ) -> Result<String, anyhow::Error> {
2205 let reader_id = args.expect("reader_id");
2206 let (state, maintenance) = datadriven
2207 .machine
2208 .register_critical_reader(&reader_id, Opaque::encode(&0u64), "tests")
2209 .await;
2210 datadriven.routine.push(maintenance);
2211 Ok(format!(
2212 "{} {:?}\n",
2213 datadriven.machine.seqno(),
2214 state.since.elements(),
2215 ))
2216 }
2217
2218 pub async fn register_leased_reader(
2219 datadriven: &mut MachineState,
2220 args: DirectiveArgs<'_>,
2221 ) -> Result<String, anyhow::Error> {
2222 let reader_id = args.expect("reader_id");
2223 let (reader_state, maintenance) = datadriven
2224 .machine
2225 .register_leased_reader(
2226 &reader_id,
2227 "tests",
2228 READER_LEASE_DURATION.get(&datadriven.client.cfg),
2229 (datadriven.client.cfg.now)(),
2230 false,
2231 )
2232 .await;
2233 datadriven.routine.push(maintenance);
2234 Ok(format!(
2235 "{} {:?}\n",
2236 datadriven.machine.seqno(),
2237 reader_state.since.elements(),
2238 ))
2239 }
2240
2241 pub async fn expire_critical_reader(
2242 datadriven: &mut MachineState,
2243 args: DirectiveArgs<'_>,
2244 ) -> Result<String, anyhow::Error> {
2245 let reader_id = args.expect("reader_id");
2246 let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2247 datadriven.routine.push(maintenance);
2248 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2249 }
2250
2251 pub async fn expire_leased_reader(
2252 datadriven: &mut MachineState,
2253 args: DirectiveArgs<'_>,
2254 ) -> Result<String, anyhow::Error> {
2255 let reader_id = args.expect("reader_id");
2256 let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2257 datadriven.routine.push(maintenance);
2258 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2259 }
2260
2261 pub async fn compare_and_append_batches(
2262 datadriven: &MachineState,
2263 args: DirectiveArgs<'_>,
2264 ) -> Result<String, anyhow::Error> {
2265 let expected_upper = args.expect_antichain("expected_upper");
2266 let new_upper = args.expect_antichain("new_upper");
2267
2268 let mut batches: Vec<Batch<String, (), u64, i64>> = args
2269 .args
2270 .get("batches")
2271 .expect("missing batches")
2272 .into_iter()
2273 .map(|batch| {
2274 let hollow = (*datadriven
2275 .batches
2276 .get(batch)
2277 .expect("unknown batch")
2278 .clone()
2279 .batch)
2280 .clone();
2281 datadriven.to_batch(hollow)
2282 })
2283 .collect();
2284
2285 let mut writer = datadriven
2286 .client
2287 .open_writer(
2288 datadriven.shard_id,
2289 Arc::new(StringSchema),
2290 Arc::new(UnitSchema),
2291 Diagnostics::for_tests(),
2292 )
2293 .await?;
2294
2295 let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2296
2297 let () = writer
2298 .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
2299 .await?
2300 .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2301
2302 writer.expire().await;
2303
2304 Ok("ok\n".into())
2305 }
2306
2307 pub async fn expire_writer(
2308 datadriven: &mut MachineState,
2309 args: DirectiveArgs<'_>,
2310 ) -> Result<String, anyhow::Error> {
2311 let writer_id = args.expect("writer_id");
2312 let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2313 datadriven.routine.push(maintenance);
2314 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2315 }
2316
2317 pub(crate) async fn finalize(
2318 datadriven: &mut MachineState,
2319 _args: DirectiveArgs<'_>,
2320 ) -> anyhow::Result<String> {
2321 let maintenance = datadriven.machine.become_tombstone().await?;
2322 datadriven.routine.push(maintenance);
2323 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2324 }
2325
2326 pub(crate) fn is_finalized(
2327 datadriven: &MachineState,
2328 _args: DirectiveArgs<'_>,
2329 ) -> anyhow::Result<String> {
2330 let seqno = datadriven.machine.seqno();
2331 let tombstone = datadriven.machine.is_finalized();
2332 Ok(format!("{seqno} {tombstone}\n"))
2333 }
2334
    /// Appends a named batch via the machine's idempotent compare-and-append,
    /// retrying with parts flushed to blob if inline backpressure is hit.
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        // The shared test schemas must be registered before appending.
        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            // Optionally simulate a previous indeterminate error to exercise
            // the idempotency path.
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                // Inline parts were rejected: flush them to blob and retry
                // the append with the rewritten batch.
                CompareAndAppendRes::InlineBackpressure => {
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                _ => panic!("{:?}", res),
            };
        };
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }
2401
    /// Applies a previously-recorded compaction's output batch to the trace
    /// as a fueled merge result, reporting whether it was applied.
    pub async fn apply_merge_res(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let compact_req = datadriven
            .compactions
            .get(input)
            .expect("unknown compact req")
            .clone();
        // Derive the merged spine id range from the request's input batches:
        // lower bound of the first id through upper bound of the last.
        let input_batches = compact_req
            .inputs
            .iter()
            .map(|x| x.id)
            .collect::<BTreeSet<_>>();
        let lower_spine_bound = input_batches
            .first()
            .map(|id| id.0)
            .expect("at least one batch must be present");
        let upper_spine_bound = input_batches
            .last()
            .map(|id| id.1)
            .expect("at least one batch must be present");
        let id = SpineId(lower_spine_bound, upper_spine_bound);
        let hollow_batch = (*batch.batch).clone();

        let (merge_res, maintenance) = datadriven
            .machine
            .merge_res(&FueledMergeRes {
                output: hollow_batch,
                input: CompactionInput::IdRange(id),
                new_active_compaction: None,
            })
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {}\n",
            datadriven.machine.seqno(),
            merge_res.applied()
        ))
    }
2448
2449 pub async fn perform_maintenance(
2450 datadriven: &mut MachineState,
2451 _args: DirectiveArgs<'_>,
2452 ) -> Result<String, anyhow::Error> {
2453 let mut s = String::new();
2454 for maintenance in datadriven.routine.drain(..) {
2455 let () = maintenance
2456 .perform(&datadriven.machine, &datadriven.gc)
2457 .await;
2458 let () = datadriven
2459 .machine
2460 .applier
2461 .fetch_and_update_state(None)
2462 .await;
2463 write!(s, "{} ok\n", datadriven.machine.seqno());
2464 }
2465 Ok(s)
2466 }
2467}
2468
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use mz_persist_types::PersistLocation;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::{new_test_client, new_test_client_cache};
    use crate::{Diagnostics, PersistClient, ShardId};

    // Checks that the live state-diff history is truncated as the shard is
    // used: after NUM_BATCHES appends (each followed by the maintenance it
    // requested), the number of live diffs must stay within a bound that is
    // logarithmic in the batch count rather than growing linearly.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        // Low rollup threshold so rollups (and the truncation they enable)
        // happen frequently within the 100 appends below.
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        // Expire the reader so it doesn't hold state back.
        read.expire().await;

        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            // Force the (tiny) batch out to blob storage instead of being
            // kept inline in state.
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            // Actually run the maintenance each append requested; this is
            // what performs the rollups/GC that truncate the diff history.
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        assert!(live_diffs.len() > 0);
        // Bound is ~2*log2(NUM_BATCHES); without truncation this would be
        // O(NUM_BATCHES) instead.
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.len(),
            max_live_diffs
        );
    }

    // Regression test: a GC run that dies partway through deleting blobs
    // (simulated by a panic injected after each blob delete) must not wedge
    // the shard — a later GC for the same seqno_since has to succeed once the
    // failure injection is removed.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        let intercept = InterceptHandle::default();
        // Wrap the blob so we can inject failures into its delete path.
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Advance the since so there is something for GC to collect.
        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();

        assert!(new_seqno_since > SeqNo::minimum());
        // First attempt: panic immediately after any blob delete, so the GC
        // run is interrupted mid-truncate.
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        // Run in a separate task so the injected panic doesn't abort the
        // test itself; we deliberately ignore the task's (failed) result.
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        let _ = gc.await;

        // Second attempt with the injection removed: must run to completion
        // despite the earlier interrupted run.
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

    // Regression test: two writers on the same shard where the second client
    // has its own (independent) state cache. After write1 advances the upper,
    // write2's cached view is stale; its compare_and_append at the new upper
    // must still succeed — presumably by refetching state on upper mismatch
    // (the machinery under test lives outside this module).
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        // Give client2 a fresh StateCache so it does not share client's
        // cached state and can fall behind.
        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        // write1 advances the upper to [2]; write2's cache still thinks it's [0].
        write1.expect_compare_and_append(&data[..1], 0, 2).await;

        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }

    // Checks that a shard's recorded applier_version only advances via an
    // explicit upgrade_version call: merely reopening the shard with a newer
    // build_version leaves the recorded version unchanged.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn version_upgrade(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        cache.cfg.build_version = Version::new(26, 1, 0);
        let shard_id = ShardId::new();

        // Reads the applier_version field out of the shard state's JSON
        // serialization, as exposed by the inspect API.
        async fn fetch_catalog_upgrade_shard_version(
            persist_client: &PersistClient,
            upgrade_shard_id: ShardId,
        ) -> Option<semver::Version> {
            let shard_state = persist_client
                .inspect_shard::<u64>(&upgrade_shard_id)
                .await
                .ok()?;
            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
            let upgrade_version = json_state
                .get("applier_version")
                .cloned()
                .expect("missing applier_version");
            let upgrade_version =
                serde_json::from_value(upgrade_version).expect("version deserialization error");
            Some(upgrade_version)
        }

        // Open (and touch) the shard at v26.1.0: state records that version.
        cache.cfg.build_version = Version::new(26, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(1)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Reopen with a newer build: the recorded version must NOT change.
        cache.cfg.build_version = Version::new(27, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Only an explicit upgrade_version bumps the recorded version.
        client
            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .unwrap();
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(27, 1, 0)),
        );
    }
}