1use std::fmt::Debug;
13use std::ops::ControlFlow::{self, Break, Continue};
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16
17use differential_dataflow::difference::Monoid;
18use differential_dataflow::lattice::Lattice;
19use futures::FutureExt;
20use futures::future::{self, BoxFuture};
21use mz_dyncfg::{Config, ConfigSet};
22use mz_ore::cast::CastFrom;
23use mz_ore::error::ErrorExt;
24#[allow(unused_imports)] use mz_ore::fmt::FormatBuffer;
26use mz_ore::{assert_none, soft_assert_no_log};
27use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
28use mz_persist::retry::Retry;
29use mz_persist_types::schema::SchemaId;
30use mz_persist_types::{Codec, Codec64, Opaque};
31use semver::Version;
32use timely::PartialOrder;
33use timely::progress::{Antichain, Timestamp};
34use tracing::{Instrument, debug, info, trace_span};
35
36use crate::async_runtime::IsolatedRuntime;
37use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
38use crate::cache::StateCache;
39use crate::cfg::RetryParameters;
40use crate::critical::CriticalReaderId;
41use crate::error::{CodecMismatch, InvalidUsage};
42use crate::internal::apply::Applier;
43use crate::internal::compact::CompactReq;
44use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
45use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
46use crate::internal::paths::PartialRollupKey;
47use crate::internal::state::{
48 CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
49 IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
50 Upper,
51};
52use crate::internal::state_versions::StateVersions;
53use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
54use crate::internal::watch::StateWatch;
55use crate::read::LeasedReaderId;
56use crate::rpc::PubSubSender;
57use crate::schema::CaESchema;
58use crate::write::WriterId;
59use crate::{Diagnostics, PersistConfig, ShardId};
60
/// A handle for reading and durably mutating the state of a single persist
/// shard.
///
/// All state changes flow through the internal [`Applier`]; `isolated_runtime`
/// is a shared handle to a runtime kept separate from the caller's
/// (semantics live in [`IsolatedRuntime`], not visible here).
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}
66
// Implemented by hand (not derived) so that only `T: Clone` is required;
// a derive would also demand `K: Clone`, `V: Clone`, and `D: Clone`.
impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            applier: self.applier.clone(),
            // Refcount bump only; the runtime itself is shared.
            isolated_runtime: Arc::clone(&self.isolated_runtime),
        }
    }
}
76
/// Dyncfg: when enabled, an append that produced no compaction request of its
/// own may instead claim an existing uncompacted batch (see the usage in
/// `compare_and_append_idempotent`).
pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);
83
/// Dyncfg: probability (in percent; values over 100 guarantee extra claims)
/// of claiming a compaction when [`CLAIM_UNCLAIMED_COMPACTIONS`] is on.
pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
91
/// Dyncfg: minimum version for claimed compactions; the empty default parses
/// as no version at all (`Version::parse(..).ok()` yields `None` at the use
/// site), disabling the check.
pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);
97
98impl<K, V, T, D> Machine<K, V, T, D>
99where
100 K: Debug + Codec,
101 V: Debug + Codec,
102 T: Timestamp + Lattice + Codec64 + Sync,
103 D: Monoid + Codec64,
104{
    /// Creates a `Machine` for `shard_id` by constructing its [`Applier`].
    ///
    /// Returns `Err` when the codecs the shard's durable state was written
    /// with don't match `K`/`V`/`T`/`D` (surfaced by `Applier::new`).
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        isolated_runtime: Arc<IsolatedRuntime>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let applier = Applier::new(
            cfg,
            shard_id,
            metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            diagnostics,
        )
        .await?;
        Ok(Machine {
            applier,
            isolated_runtime,
        })
    }
130
    /// Returns the id of the shard this machine operates on.
    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }
134
    /// Returns the applier's current (cached) state sequence number.
    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }
138
    /// Writes a rollup for the current state and links it into state.
    ///
    /// If linking is not applied (e.g. a rollup for that seqno already
    /// exists), the just-written rollup blob is deleted so it doesn't leak.
    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        // `None` means the applier had nothing to write; no maintenance owed.
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            // The link didn't take; clean up the unused rollup blob.
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }
156
157 pub async fn add_rollup(
158 &self,
159 add_rollup: (SeqNo, &HollowRollup),
160 ) -> (bool, RoutineMaintenance) {
161 let mut applied_ever_true = false;
164 let metrics = Arc::clone(&self.applier.metrics);
165 let (_seqno, _applied, maintenance) = self
166 .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
167 let ret = state.add_rollup(add_rollup);
168 if let Continue(applied) = ret {
169 applied_ever_true = applied_ever_true || applied;
170 }
171 ret
172 })
173 .await;
174 (applied_ever_true, maintenance)
175 }
176
    /// Unlinks the given rollups from state, returning the seqnos that were
    /// actually removed plus any routine maintenance.
    pub async fn remove_rollups(
        &self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> (Vec<SeqNo>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, removed_rollup_seqnos, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
                state.remove_rollups(remove_rollups)
            })
            .await;
        (removed_rollup_seqnos, maintenance)
    }
189
    /// Attempts to set the version recorded in state to this process's
    /// `cfg.build_version`.
    ///
    /// Fails (returning the version found in state) when state was last
    /// written by a *newer* binary, i.e. a downgrade is refused.
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
        // NOTE(review): this command is counted under `cmds.remove_rollups`,
        // which looks like a copy-paste — confirm whether a dedicated
        // upgrade-version metric was intended before changing it.
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                if state.version <= cfg.build_version {
                    // Equal versions also take this branch: the assignment is
                    // then value-neutral but still a `Continue` transition.
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    // State is from a newer binary; refuse and report it.
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }
218
    /// Registers a new leased reader with the shard, returning its initial
    /// state (capabilities, lease) plus routine maintenance.
    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        // Sanity: the seqno capability handed to the reader must not be
        // earlier than the state's seqno-since.
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }
257
    /// Registers a new critical (non-leased) reader, parameterized by its
    /// opaque token type `O`, returning its state plus routine maintenance.
    pub async fn register_critical_reader<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
                state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
            })
            .await;
        (state, maintenance)
    }
271
    /// Registers the given key/val schema pair with the shard, returning its
    /// assigned id (the `None` case's semantics are decided by
    /// `StateCollections::register_schema`) plus routine maintenance.
    pub async fn register_schema(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (Option<SchemaId>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
                state.register_schema::<K, V>(key_schema, val_schema)
            })
            .await;
        (state, maintenance)
    }
285
286 pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
287 if fuel == 0 || self.applier.all_batches().len() < 2 {
289 return (Vec::new(), RoutineMaintenance::default());
290 }
291
292 let metrics = Arc::clone(&self.applier.metrics);
293 let (_seqno, reqs, maintenance) = self
294 .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
295 state.spine_exert(fuel)
296 })
297 .await;
298 let reqs = reqs
299 .into_iter()
300 .map(|req| CompactReq {
301 shard_id: self.shard_id(),
302 desc: req.desc,
303 inputs: req.inputs,
304 })
305 .collect();
306 (reqs, maintenance)
307 }
308
309 pub async fn compare_and_append(
310 &self,
311 batch: &HollowBatch<T>,
312 writer_id: &WriterId,
313 debug_info: &HandleDebugState,
314 heartbeat_timestamp_ms: u64,
315 ) -> CompareAndAppendRes<T> {
316 let idempotency_token = IdempotencyToken::new();
317 loop {
318 let res = self
319 .compare_and_append_idempotent(
320 batch,
321 writer_id,
322 heartbeat_timestamp_ms,
323 &idempotency_token,
324 debug_info,
325 None,
326 )
327 .await;
328 match res {
329 CompareAndAppendRes::Success(seqno, maintenance) => {
330 return CompareAndAppendRes::Success(seqno, maintenance);
331 }
332 CompareAndAppendRes::InvalidUsage(x) => {
333 return CompareAndAppendRes::InvalidUsage(x);
334 }
335 CompareAndAppendRes::InlineBackpressure => {
336 return CompareAndAppendRes::InlineBackpressure;
337 }
338 CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
339 self.applier.fetch_and_update_state(Some(seqno)).await;
346 let (current_seqno, current_upper) =
347 self.applier.upper(|seqno, upper| (seqno, upper.clone()));
348
349 if ¤t_upper != batch.desc.lower() {
352 return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
353 } else {
354 }
357 }
358 }
359 }
360 }
361
    /// One idempotent attempt-loop of an append: applies the
    /// `compare_and_append` state command, retrying forever on indeterminate
    /// errors and using `idempotency_token` to recognize a write that
    /// actually committed behind such an error.
    ///
    /// `indeterminate` carries the first indeterminate error seen (possibly
    /// passed in by the caller); several branches below assert on whether one
    /// could have occurred.
    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        // Claim percent is zero (disabled) unless claiming is
                        // turned on via dyncfg.
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        // An unset/invalid min-version config parses to None.
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    // Indeterminate: the write may or may not have landed.
                    // Remember the first such error and retry; the token
                    // makes the retry safe.
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    // Success: package the spine merge requests as compaction
                    // work owed by this writer.
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    // Account for inline (in-state) parts that just committed.
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // Our token is already in state: an earlier attempt's
                    // write landed despite reporting indeterminate — which is
                    // the only way this branch is reachable, hence the assert.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    // Invalid usage is decided before any write could occur.
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    // A writer can never have appended past the shard upper.
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        // This writer never wrote this batch's upper: a plain
                        // mismatch, safe to report.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        // No indeterminate error occurred, so our write
                        // definitely did not land: also a plain mismatch.
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // The writer's upper covers this batch AND an
                    // indeterminate error occurred, but the token wasn't
                    // found: we can't tell whether the write applied.
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }
617
    /// Downgrades the leased reader's since capability to `new_since`,
    /// recording the reader's outstanding seqno and heartbeat along the way.
    ///
    /// Returns the command's seqno, the resulting since, and maintenance.
    pub async fn downgrade_since(
        &self,
        reader_id: &LeasedReaderId,
        outstanding_seqno: SeqNo,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
            state.downgrade_since(
                reader_id,
                seqno,
                outstanding_seqno,
                new_since,
                heartbeat_timestamp_ms,
            )
        })
        .await
    }
637
638 pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
639 &self,
640 reader_id: &CriticalReaderId,
641 expected_opaque: &O,
642 (new_opaque, new_since): (&O, &Antichain<T>),
643 ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
644 let metrics = Arc::clone(&self.applier.metrics);
645 let (_seqno, res, maintenance) = self
646 .apply_unbatched_idempotent_cmd(
647 &metrics.cmds.compare_and_downgrade_since,
648 |_seqno, _cfg, state| {
649 state.compare_and_downgrade_since::<O>(
650 reader_id,
651 expected_opaque,
652 (new_opaque, new_since),
653 )
654 },
655 )
656 .await;
657
658 match res {
659 Ok(since) => (Ok(since), maintenance),
660 Err((opaque, since)) => (Err((opaque, since)), maintenance),
661 }
662 }
663
    /// Removes the leased reader (and its capabilities) from state,
    /// returning the seqno of the removal plus routine maintenance.
    pub async fn expire_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_leased_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }
676
    /// Removes the critical reader from state, returning the seqno of the
    /// removal plus routine maintenance.
    // NOTE(review): allowed dead_code — presumably no production call path
    // uses this yet; confirm before removing the allowance.
    #[allow(dead_code)]
    pub async fn expire_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_critical_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }
690
    /// Removes the writer from state, returning the seqno of the removal
    /// plus routine maintenance.
    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
                state.expire_writer(writer_id)
            })
            .await;
        // Note: incremented regardless of whether the writer existed
        // (`_existed` is discarded above).
        metrics.state.writer_removed.inc();
        (seqno, maintenance)
    }
701
    /// Returns whether the shard is finalized, per the applier's cached state.
    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }
705
    /// Looks up the key/val schema pair registered under `schema_id`, if any.
    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }
710
    /// Returns the most recently registered schema (id and pair), if any.
    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }
715
    /// Returns the id under which the given schema pair is registered, if any.
    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }
720
    /// Evolves the shard's schema to the given pair, conditioned on
    /// `expected` being the current schema id; the returned [`CaESchema`]
    /// carries the outcome.
    pub async fn compare_and_evolve_schema(
        &self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (CaESchema<K, V>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_evolve_schema,
                |_seqno, _cfg, state| {
                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
                },
            )
            .await;
        (state, maintenance)
    }
741
    /// Runs one `become_tombstone_and_shrink` state command, retrying
    /// forever on indeterminate errors.
    ///
    /// Returns `Ok(true)` if the step made progress, `Ok(false)` if the
    /// command was a no-op transition (nothing left to do).
    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            // Escalate to info-level logging after INFO_MIN_ATTEMPTS failures.
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }
780
781 pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
782 self.applier.check_since_upper_both_empty()?;
783
784 let mut maintenance = RoutineMaintenance::default();
785
786 loop {
787 let (made_progress, more_maintenance) = self.tombstone_step().await?;
788 maintenance.merge(more_maintenance);
789 if !made_progress {
790 break;
791 }
792 }
793
794 Ok(maintenance)
795 }
796
    /// Fetches the batches constituting a snapshot of the shard at `as_of`,
    /// blocking until the shard's state makes `as_of` available.
    ///
    /// Returns `Err(Since)` when historical distinctions at `as_of` have
    /// been lost (compacted past), so the snapshot can never be served.
    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        // Fast path: the snapshot may be servable from cached state already.
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        // Wait for progress via two racing wakeups: a state watch and a
        // backoff-driven sleep that forces a state fetch.
        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

        // Tags which of the two raced futures fired.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        // Escalate to info-level logging once the wait exceeds ~1s, and
        // remember that we did so we can log the eventual resolution too.
        let mut logged_at_info = false;
        loop {
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    // The sleep path gets no pushed state; fetch explicitly.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    // Pair the earlier info! with an info-level resolution.
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            // Re-arm only the future that fired; the other is still pending.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
926
    /// Checks that a listen as of `as_of` is valid against the cached state,
    /// returning the actual since on failure.
    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }
931
    /// Returns the next batch at or beyond `frontier`, blocking until one is
    /// available.
    ///
    /// Uses the same watch-vs-sleep race as [`Machine::snapshot`]; `retry`
    /// overrides the default polling parameters when provided, and
    /// `reader_id` is used only for log attribution.
    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        // Fast path: a batch may already be available in cached state.
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        // Tags which of the two raced futures fired.
        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        // NOTE(review): the span names below say "snapshot::*" though this is
        // the listen path — presumably copied from `snapshot`; confirm intent
        // before renaming, since traces may be consumed by name.
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        loop {
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    // The sleep path gets no pushed state; fetch explicitly.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    // Record which wakeup source resolved the wait.
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            // Re-arm only the future that fired; the other is still pending.
            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }
1032
    /// Applies `work_fn` as a state command, retrying forever on
    /// indeterminate errors.
    ///
    /// Only safe for idempotent `work_fn`s: an indeterminate error may mean
    /// the previous attempt actually committed, so a retry must be harmless.
    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        // A `Break` from work_fn: nothing was written, but
                        // the command still hands back a result value.
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    // Escalate logging after INFO_MIN_ATTEMPTS failures.
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
1083}
1084
// A separate impl block because `merge_res` additionally requires
// `D: PartialEq` (for `apply_merge_res::<D>`).
impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + PartialEq,
{
    /// Applies the result of a fueled merge (compaction) to state.
    ///
    /// Because the command retries idempotently, a later attempt may see the
    /// batch already replaced; the bookkeeping below keeps the most useful
    /// outcome observed across attempts.
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);

        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
                if let Continue(result) = ret {
                    // An applied outcome always wins.
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    // A matched-but-not-applied outcome is only kept if no
                    // attempt so far actually applied.
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }
}
1148
/// A boxed, thread-safe, call-once callback returning a future.
///
/// Named for handle expiration; its call sites are not visible in this file,
/// so the precise contract lives with the callers.
pub(crate) struct ExpireFn(
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);
1156
// The boxed closure has no useful Debug representation, so render just the
// type name non-exhaustively ("ExpireFn { .. }").
impl Debug for ExpireFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}
1162
/// The outcome of a `compare_and_append` call.
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    /// The batch was appended at the given seqno; the writer owes the
    /// returned maintenance work.
    Success(SeqNo, WriterMaintenance<T>),
    /// The request was malformed; not retryable.
    InvalidUsage(InvalidUsage<T>),
    /// The shard's upper (returned) did not match the batch's lower.
    UpperMismatch(SeqNo, Antichain<T>),
    /// Rejected due to inline-write backpressure (maps directly from
    /// `CompareAndAppendBreak::InlineBackpressure`).
    InlineBackpressure,
}
1170
// Test-only conveniences.
#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    /// Panics (at the caller's location) unless the result is `Success`,
    /// returning its seqno and maintenance.
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            x => panic!("{:?}", x),
        }
    }
}
1181
/// Dyncfg: fixed sleep between `next_listen_batch` polls (skipped if zero;
/// the exponential backoff settings below apply otherwise).
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200),
    "\
    The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

/// Dyncfg: initial backoff for `next_listen_batch` polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100),
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

/// Dyncfg: backoff multiplier for `next_listen_batch` polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

/// Dyncfg: maximum (clamped) backoff for `next_listen_batch` polling.
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);
1206
1207fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1208 RetryParameters {
1209 fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1210 initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1211 multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1212 clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1213 }
1214}
1215
/// The retry-attempt count at or above which failures are logged at `info!`
/// instead of `debug!` (used by the retry loops in this file).
pub const INFO_MIN_ATTEMPTS: usize = 3;
1217
1218pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1219where
1220 F: std::future::Future<Output = Result<R, ExternalError>>,
1221 WorkFn: FnMut() -> F,
1222{
1223 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1224 loop {
1225 match work_fn().await {
1226 Ok(x) => {
1227 if retry.attempt() > 0 {
1228 debug!(
1229 "external operation {} succeeded after failing at least once",
1230 metrics.name,
1231 );
1232 }
1233 return x;
1234 }
1235 Err(err) => {
1236 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1237 info!(
1238 "external operation {} failed, retrying in {:?}: {}",
1239 metrics.name,
1240 retry.next_sleep(),
1241 err.display_with_causes()
1242 );
1243 } else {
1244 debug!(
1245 "external operation {} failed, retrying in {:?}: {}",
1246 metrics.name,
1247 retry.next_sleep(),
1248 err.display_with_causes()
1249 );
1250 }
1251 retry = retry.sleep().await;
1252 }
1253 }
1254 }
1255}
1256
/// Runs `work_fn`, retrying determinate errors with backoff forever, but
/// returning the first indeterminate error to the caller (which must decide
/// whether the operation may have applied).
pub async fn retry_determinate<R, F, WorkFn>(
    metrics: &RetryMetrics,
    mut work_fn: WorkFn,
) -> Result<R, Indeterminate>
where
    F: std::future::Future<Output = Result<R, ExternalError>>,
    WorkFn: FnMut() -> F,
{
    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
    loop {
        match work_fn().await {
            Ok(x) => {
                if retry.attempt() > 0 {
                    debug!(
                        "external operation {} succeeded after failing at least once",
                        metrics.name,
                    );
                }
                return Ok(x);
            }
            Err(ExternalError::Determinate(err)) => {
                // Determinate: the operation definitely did not apply, so a
                // retry is always safe.
                debug!(
                    "external operation {} failed, retrying in {:?}: {}",
                    metrics.name,
                    retry.next_sleep(),
                    err.display_with_causes()
                );
                retry = retry.sleep().await;
                continue;
            }
            Err(ExternalError::Indeterminate(x)) => return Err(x),
        }
    }
}
1299
1300#[cfg(test)]
1301pub mod datadriven {
1302 use std::collections::{BTreeMap, BTreeSet};
1303 use std::pin::pin;
1304 use std::sync::{Arc, LazyLock};
1305
1306 use anyhow::anyhow;
1307 use differential_dataflow::consolidation::consolidate_updates;
1308 use differential_dataflow::trace::Description;
1309 use futures::StreamExt;
1310 use mz_dyncfg::{ConfigUpdates, ConfigVal};
1311 use mz_persist::indexed::encoding::BlobTraceBatchPart;
1312 use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1313
1314 use crate::batch::{
1315 BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1316 BatchParts, validate_truncate_batch,
1317 };
1318 use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1319 use crate::fetch::{EncodedPart, FetchConfig};
1320 use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1321 use crate::internal::datadriven::DirectiveArgs;
1322 use crate::internal::encoding::Schemas;
1323 use crate::internal::gc::GcReq;
1324 use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1325 use crate::internal::state::{BatchPart, RunOrder, RunPart};
1326 use crate::internal::state_versions::EncodedRollup;
1327 use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1328 use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1329 use crate::rpc::NoopPubSubSender;
1330 use crate::tests::new_test_client;
1331 use crate::write::COMBINE_INLINE_WRITES;
1332 use crate::{GarbageCollector, PersistClient};
1333
1334 use super::*;
1335
    /// The key/value schemas (String keys, unit values) shared by every
    /// datadriven directive in this module.
    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
        id: Some(SchemaId(0)),
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    });
1341
    /// Shared mutable state threaded through every datadriven directive.
    #[derive(Debug)]
    pub struct MachineState {
        pub client: PersistClient,
        pub shard_id: ShardId,
        pub state_versions: Arc<StateVersions>,
        pub machine: Machine<String, (), u64, i64>,
        pub gc: GarbageCollector<String, (), u64, i64>,
        // Batches produced by directives, keyed by the directive's `output` name.
        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
        // Next SpineId bound to assign to a newly written batch.
        pub next_id: usize,
        // Rollups written by `write_rollup`, keyed by output name.
        pub rollups: BTreeMap<String, EncodedRollup>,
        // Listeners registered by `register_listen`, keyed by output name.
        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
        // Maintenance accumulated by directives; drained by `perform_maintenance`.
        pub routine: Vec<RoutineMaintenance>,
        // Compaction requests recorded by `compact`, keyed by output name.
        pub compactions: BTreeMap<String, CompactReq<u64>>,
    }
1357
    impl MachineState {
        /// Creates a fresh shard and its `Machine` against a new test client,
        /// applying the given dyncfg overrides.
        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
            let shard_id = ShardId::new();
            let client = new_test_client(dyncfgs).await;
            // Pin these configs so directive output is stable.
            // NOTE(review): presumably undoing any override from `dyncfgs` —
            // confirm against the test harness.
            client
                .cfg
                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
            let state_versions = Arc::new(StateVersions::new(
                client.cfg.clone(),
                Arc::clone(&client.consensus),
                Arc::clone(&client.blob),
                Arc::clone(&client.metrics),
            ));
            let machine = Machine::new(
                client.cfg.clone(),
                shard_id,
                Arc::clone(&client.metrics),
                Arc::clone(&state_versions),
                Arc::clone(&client.shared_states),
                Arc::new(NoopPubSubSender),
                Arc::clone(&client.isolated_runtime),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should match");
            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
            MachineState {
                shard_id,
                client,
                state_versions,
                machine,
                gc,
                batches: BTreeMap::default(),
                rollups: BTreeMap::default(),
                listens: BTreeMap::default(),
                routine: Vec::new(),
                compactions: BTreeMap::default(),
                next_id: 0,
            }
        }

        /// Wraps a `HollowBatch` in a `Batch` handle using this state's client
        /// and the shared test `SCHEMAS`.
        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
            Batch::new(
                true,
                Arc::clone(&self.client.metrics),
                Arc::clone(&self.client.blob),
                self.client.metrics.shards.shard(&self.shard_id, "test"),
                self.client.cfg.build_version.clone(),
                (
                    <String>::encode_schema(&*SCHEMAS.key),
                    <()>::encode_schema(&*SCHEMAS.val),
                ),
                hollow,
            )
        }
    }
1420
    /// Handles the `consensus-scan` directive: renders every live state
    /// version at or above `from_seqno`, listing the directive-named batches
    /// and the rollup seqnos each version references.
    pub async fn consensus_scan(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let from = args.expect("from_seqno");

        let mut states = datadriven
            .state_versions
            .fetch_all_live_states::<u64>(datadriven.shard_id)
            .await
            .expect("should only be called on an initialized shard")
            .check_ts_codec()
            .expect("shard codecs should not change");
        let mut s = String::new();
        while let Some(x) = states.next(|_| {}) {
            if x.seqno < from {
                continue;
            }
            let rollups: Vec<_> = x
                .collections
                .rollups
                .keys()
                .map(|seqno| seqno.to_string())
                .collect();
            // Map each non-empty batch in the trace back to the directive name
            // it was written under, matching on the batch's parts.
            let batches: Vec<_> = x
                .collections
                .trace
                .batches()
                .filter(|b| !b.is_empty())
                .filter_map(|b| {
                    datadriven
                        .batches
                        .iter()
                        .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
                        .map(|(batch_name, _)| batch_name.to_owned())
                })
                .collect();
            write!(
                s,
                "seqno={} batches={} rollups={}\n",
                x.seqno,
                batches.join(","),
                rollups.join(","),
            );
        }
        Ok(s)
    }
1470
1471 pub async fn consensus_truncate(
1472 datadriven: &MachineState,
1473 args: DirectiveArgs<'_>,
1474 ) -> Result<String, anyhow::Error> {
1475 let to = args.expect("to_seqno");
1476 let removed = datadriven
1477 .client
1478 .consensus
1479 .truncate(&datadriven.shard_id.to_string(), to)
1480 .await
1481 .expect("valid truncation");
1482 Ok(format!("{:?}\n", removed))
1483 }
1484
    /// Handles the `blob-scan-batches` directive: lists every batch blob key
    /// under this shard's prefix along with its size in bytes.
    pub async fn blob_scan_batches(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();

        let mut s = String::new();
        let () = datadriven
            .state_versions
            .blob
            .list_keys_and_metadata(&key_prefix, &mut |x| {
                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
                // Only batch keys are reported; rollup keys are skipped.
                if let PartialBlobKey::Batch(_, _) = key {
                    write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
                }
            })
            .await?;
        Ok(s)
    }
1504
1505 #[allow(clippy::unused_async)]
1506 pub async fn shard_desc(
1507 datadriven: &MachineState,
1508 _args: DirectiveArgs<'_>,
1509 ) -> Result<String, anyhow::Error> {
1510 Ok(format!(
1511 "since={:?} upper={:?}\n",
1512 datadriven.machine.applier.since().elements(),
1513 datadriven.machine.applier.clone_upper().elements()
1514 ))
1515 }
1516
    /// Handles the `downgrade-since` directive: downgrades the given leased
    /// reader's since to the provided frontier, recording any resulting
    /// maintenance.
    pub async fn downgrade_since(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let since = args.expect_antichain("since");
        // Default to the machine's current seqno when not given explicitly.
        let seqno = args
            .optional("seqno")
            .unwrap_or_else(|| datadriven.machine.seqno());
        let reader_id = args.expect("reader_id");
        let (_, since, routine) = datadriven
            .machine
            .downgrade_since(
                &reader_id,
                seqno,
                &since,
                (datadriven.machine.applier.cfg.now)(),
            )
            .await;
        datadriven.routine.push(routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            since.0.elements()
        ))
    }
1542
    /// Handles the `dyncfg` directive: parses `name val` lines from the
    /// directive input and applies them to the client's config. Only usize
    /// and bool configs are supported.
    #[allow(clippy::unused_async)]
    pub async fn dyncfg(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let mut updates = ConfigUpdates::default();
        for x in args.input.trim().split('\n') {
            match x.split(' ').collect::<Vec<_>>().as_slice() {
                &[name, val] => {
                    // Look up the config to learn its type before parsing val.
                    let config = datadriven
                        .client
                        .cfg
                        .entries()
                        .find(|x| x.name() == name)
                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
                    match config.val() {
                        ConfigVal::Usize(_) => {
                            let val = val.parse().map_err(anyhow::Error::new)?;
                            updates.add_dynamic(name, ConfigVal::Usize(val));
                        }
                        ConfigVal::Bool(_) => {
                            let val = val.parse().map_err(anyhow::Error::new)?;
                            updates.add_dynamic(name, ConfigVal::Bool(val));
                        }
                        x => unimplemented!("dyncfg type: {:?}", x),
                    }
                }
                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
            }
        }
        updates.apply(&datadriven.client.cfg);

        Ok("ok\n".to_string())
    }
1577
    /// Handles the `compare-and-downgrade-since` directive: CaS-downgrades a
    /// critical reader's since, guarded by the expected opaque token.
    pub async fn compare_and_downgrade_since(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let expected_opaque: u64 = args.expect("expect_opaque");
        let new_opaque: u64 = args.expect("opaque");
        let new_since = args.expect_antichain("since");
        let reader_id = args.expect("reader_id");
        let (res, routine) = datadriven
            .machine
            .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
            .await;
        datadriven.routine.push(routine);
        // A mismatch surfaces the actual opaque/since so tests can assert on it.
        let since = res.map_err(|(opaque, since)| {
            anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
        })?;
        Ok(format!(
            "{} {} {:?}\n",
            datadriven.machine.seqno(),
            new_opaque,
            since.0.elements()
        ))
    }
1601
    /// Handles the `write-rollup` directive: writes a rollup for the current
    /// state and stashes it under `output` for a later `add_rollup`.
    pub async fn write_rollup(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");

        let rollup = datadriven
            .machine
            .applier
            .write_rollup_for_state()
            .await
            .expect("rollup");

        datadriven
            .rollups
            .insert(output.to_string(), rollup.clone());

        // Report the rollup's seqno and the range of diffs it covers.
        Ok(format!(
            "state={} diffs=[{}, {})\n",
            rollup.seqno,
            rollup._desc.lower().first().expect("seqno"),
            rollup._desc.upper().first().expect("seqno"),
        ))
    }
1626
    /// Handles the `add-rollup` directive: registers a previously written
    /// rollup (named by `input`) with the machine's state.
    pub async fn add_rollup(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let rollup = datadriven
            .rollups
            .get(input)
            .expect("unknown batch")
            .clone();

        let (applied, maintenance) = datadriven
            .machine
            .add_rollup((rollup.seqno, &rollup.to_hollow()))
            .await;

        // Surface a non-applied rollup as a directive error.
        if !applied {
            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
        }

        datadriven.routine.push(maintenance);
        Ok(format!("{}\n", datadriven.machine.seqno()))
    }
1650
1651 pub async fn write_batch(
1652 datadriven: &mut MachineState,
1653 args: DirectiveArgs<'_>,
1654 ) -> Result<String, anyhow::Error> {
1655 let output = args.expect_str("output");
1656 let lower = args.expect_antichain("lower");
1657 let upper = args.expect_antichain("upper");
1658 assert!(PartialOrder::less_than(&lower, &upper));
1659 let since = args
1660 .optional_antichain("since")
1661 .unwrap_or_else(|| Antichain::from_elem(0));
1662 let target_size = args.optional("target_size");
1663 let parts_size_override = args.optional("parts_size_override");
1664 let consolidate = args.optional("consolidate").unwrap_or(true);
1665 let mut updates: Vec<_> = args
1666 .input
1667 .split('\n')
1668 .flat_map(DirectiveArgs::parse_update)
1669 .collect();
1670
1671 let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
1672 if let Some(target_size) = target_size {
1673 cfg.blob_target_size = target_size;
1674 };
1675 if consolidate {
1676 consolidate_updates(&mut updates);
1677 }
1678 let run_order = if consolidate {
1679 cfg.preferred_order
1680 } else {
1681 RunOrder::Unordered
1682 };
1683 let parts = BatchParts::new_ordered::<i64>(
1684 cfg.clone(),
1685 run_order,
1686 Arc::clone(&datadriven.client.metrics),
1687 Arc::clone(&datadriven.machine.applier.shard_metrics),
1688 datadriven.shard_id,
1689 Arc::clone(&datadriven.client.blob),
1690 Arc::clone(&datadriven.client.isolated_runtime),
1691 &datadriven.client.metrics.user,
1692 );
1693 let builder = BatchBuilderInternal::new(
1694 cfg.clone(),
1695 parts,
1696 Arc::clone(&datadriven.client.metrics),
1697 SCHEMAS.clone(),
1698 Arc::clone(&datadriven.client.blob),
1699 datadriven.shard_id.clone(),
1700 datadriven.client.cfg.build_version.clone(),
1701 );
1702 let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
1703 for ((k, ()), t, d) in updates {
1704 builder.add(&k, &(), &t, &d).await.expect("invalid batch");
1705 }
1706 let mut batch = builder.finish(upper).await?;
1707 if parts_size_override.is_some() {
1710 batch
1711 .flush_to_blob(
1712 &cfg,
1713 &datadriven.client.metrics.user,
1714 &datadriven.client.isolated_runtime,
1715 &SCHEMAS,
1716 )
1717 .await;
1718 }
1719 let batch = batch.into_hollow_batch();
1720 let batch = IdHollowBatch {
1721 batch: Arc::new(batch),
1722 id: SpineId(datadriven.next_id, datadriven.next_id + 1),
1723 };
1724 datadriven.next_id += 1;
1725
1726 if let Some(size) = parts_size_override {
1727 let mut batch = batch.clone();
1728 let mut hollow_batch = (*batch.batch).clone();
1729 for part in hollow_batch.parts.iter_mut() {
1730 match part {
1731 RunPart::Many(run) => run.max_part_bytes = size,
1732 RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
1733 RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
1734 }
1735 }
1736 batch.batch = Arc::new(hollow_batch);
1737 datadriven.batches.insert(output.to_owned(), batch);
1738 } else {
1739 datadriven.batches.insert(output.to_owned(), batch.clone());
1740 }
1741 Ok(format!(
1742 "parts={} len={}\n",
1743 batch.batch.part_count(),
1744 batch.batch.len
1745 ))
1746 }
1747
    /// Handles the `fetch-batch` directive: fetches and decodes every part of
    /// the named batch, rendering its updates (and optionally key-lower stats),
    /// followed by the batch's run structure.
    pub async fn fetch_batch(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let stats = args.optional_str("stats");
        let batch = datadriven.batches.get(input).expect("unknown batch");

        let mut s = String::new();
        let mut stream = pin!(
            batch
                .batch
                .part_stream(
                    datadriven.shard_id,
                    &*datadriven.state_versions.blob,
                    &*datadriven.state_versions.metrics
                )
                .enumerate()
        );
        while let Some((idx, part)) = stream.next().await {
            let part = &*part?;
            write!(s, "<part {idx}>\n");

            // Inline parts must be decoded to get at their key lower bound.
            let lower = match part {
                BatchPart::Inline { updates, .. } => {
                    let updates: BlobTraceBatchPart<u64> =
                        updates.decode(&datadriven.client.metrics.columnar)?;
                    updates.structured_key_lower()
                }
                other => other.structured_key_lower(),
            };

            if let Some(lower) = lower {
                if stats == Some("lower") {
                    writeln!(s, "<key lower={}>", lower.get())
                }
            }

            // A hollow part whose blob is missing is rendered as <empty>;
            // inline parts have no blob to check.
            match part {
                BatchPart::Hollow(part) => {
                    let blob_batch = datadriven
                        .client
                        .blob
                        .get(&part.key.complete(&datadriven.shard_id))
                        .await;
                    match blob_batch {
                        Ok(Some(_)) | Err(_) => {}
                        Ok(None) => {
                            s.push_str("<empty>\n");
                            continue;
                        }
                    };
                }
                BatchPart::Inline { .. } => {}
            };
            let part = EncodedPart::fetch(
                &FetchConfig::from_persist_config(&datadriven.client.cfg),
                &datadriven.shard_id,
                datadriven.client.blob.as_ref(),
                datadriven.client.metrics.as_ref(),
                datadriven.machine.applier.shard_metrics.as_ref(),
                &datadriven.client.metrics.read.batch_fetcher,
                &batch.batch.desc,
                part,
            )
            .await
            .expect("invalid batch part");
            let part = part
                .normalize(&datadriven.client.metrics.columnar)
                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

            for ((k, _v), t, d) in part
                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                .expect("valid schemas")
            {
                writeln!(s, "{k} {t} {d}");
            }
        }
        // Append the run structure, mapping each run's parts back to their
        // positions in the batch's parts list.
        if !s.is_empty() {
            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
                write!(s, "<run {idx}>\n");
                for part in run {
                    let part_idx = batch
                        .batch
                        .parts
                        .iter()
                        .position(|p| p == part)
                        .expect("part should exist");
                    write!(s, "part {part_idx}\n");
                }
            }
        }
        Ok(s)
    }
1844
    /// Handles the `truncate-batch-desc` directive: re-describes the named
    /// batch with a narrower [lower, upper) and stores it under `output`.
    #[allow(clippy::unused_async)]
    pub async fn truncate_batch_desc(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");

        let batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        // The since is carried over unchanged; only the time bounds move.
        let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
        let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
        let mut new_hollow_batch = (*batch.batch).clone();
        new_hollow_batch.desc = truncated_desc;
        let new_batch = IdHollowBatch {
            batch: Arc::new(new_hollow_batch),
            id: batch.id,
        };
        datadriven
            .batches
            .insert(output.to_owned(), new_batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            batch.batch.part_count(),
            batch.batch.len
        ))
    }
1877
1878 #[allow(clippy::unused_async)]
1879 pub async fn set_batch_parts_size(
1880 datadriven: &mut MachineState,
1881 args: DirectiveArgs<'_>,
1882 ) -> Result<String, anyhow::Error> {
1883 let input = args.expect_str("input");
1884 let size = args.expect("size");
1885 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1886 let mut hollow_batch = (*batch.batch).clone();
1887 for part in hollow_batch.parts.iter_mut() {
1888 match part {
1889 RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1890 _ => {
1891 panic!("set_batch_parts_size only supports hollow parts")
1892 }
1893 }
1894 }
1895 batch.batch = Arc::new(hollow_batch);
1896 Ok("ok\n".to_string())
1897 }
1898
    /// Handles the `compact` directive: compacts the named input batches into
    /// a single output batch, recording the request for later `apply_merge_res`.
    pub async fn compact(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        let since = args.expect_antichain("since");
        let target_size = args.optional("target_size");
        let memory_bound = args.optional("memory_bound");

        let mut inputs = Vec::new();
        for input in args.args.get("inputs").expect("missing inputs") {
            inputs.push(
                datadriven
                    .batches
                    .get(input)
                    .expect("unknown batch")
                    .clone(),
            );
        }

        let cfg = datadriven.client.cfg.clone();
        if let Some(target_size) = target_size {
            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
        };
        if let Some(memory_bound) = memory_bound {
            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
        }
        let req = CompactReq {
            shard_id: datadriven.shard_id,
            desc: Description::new(lower, upper, since),
            inputs: inputs.clone(),
        };
        datadriven
            .compactions
            .insert(output.to_owned(), req.clone());
        // The output batch spans the SpineIds of the inputs; with no inputs,
        // a fresh id is allocated from next_id.
        let spine_lower = inputs
            .first()
            .map_or_else(|| datadriven.next_id, |x| x.id.0);
        let spine_upper = inputs.last().map_or_else(
            || {
                datadriven.next_id += 1;
                datadriven.next_id
            },
            |x| x.id.1,
        );
        let new_spine_id = SpineId(spine_lower, spine_upper);
        let res = Compactor::<String, (), u64, i64>::compact(
            CompactConfig::new(&cfg, datadriven.shard_id),
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            Arc::clone(&datadriven.client.isolated_runtime),
            req,
            SCHEMAS.clone(),
        )
        .await?;

        let batch = IdHollowBatch {
            batch: Arc::new(res.output.clone()),
            id: new_spine_id,
        };

        datadriven.batches.insert(output.to_owned(), batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            res.output.part_count(),
            res.output.len
        ))
    }
1970
1971 pub async fn clear_blob(
1972 datadriven: &MachineState,
1973 _args: DirectiveArgs<'_>,
1974 ) -> Result<String, anyhow::Error> {
1975 let mut to_delete = vec![];
1976 datadriven
1977 .client
1978 .blob
1979 .list_keys_and_metadata("", &mut |meta| {
1980 to_delete.push(meta.key.to_owned());
1981 })
1982 .await?;
1983 for blob in &to_delete {
1984 datadriven.client.blob.delete(blob).await?;
1985 }
1986 Ok(format!("deleted={}\n", to_delete.len()))
1987 }
1988
    /// Handles the `restore-blob` directive: re-writes blob contents from
    /// live state, reporting any keys that could not be restored.
    pub async fn restore_blob(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let not_restored = crate::internal::restore::restore_blob(
            &datadriven.state_versions,
            datadriven.client.blob.as_ref(),
            &datadriven.client.cfg.build_version,
            datadriven.shard_id,
            &*datadriven.state_versions.metrics,
        )
        .await?;
        let mut out = String::new();
        for key in not_restored {
            writeln!(&mut out, "{key}");
        }
        Ok(out)
    }
2007
2008 #[allow(clippy::unused_async)]
2009 pub async fn rewrite_ts(
2010 datadriven: &mut MachineState,
2011 args: DirectiveArgs<'_>,
2012 ) -> Result<String, anyhow::Error> {
2013 let input = args.expect_str("input");
2014 let ts_rewrite = args.expect_antichain("frontier");
2015 let upper = args.expect_antichain("upper");
2016
2017 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2018 let mut hollow_batch = (*batch.batch).clone();
2019 let () = hollow_batch
2020 .rewrite_ts(&ts_rewrite, upper)
2021 .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2022 batch.batch = Arc::new(hollow_batch);
2023 Ok("ok\n".into())
2024 }
2025
    /// Handles the `gc` directive: runs gc-and-truncate up to the given
    /// seqno, reporting blob deletions and consensus/state truncations.
    pub async fn gc(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let new_seqno_since = args.expect("to_seqno");

        let req = GcReq {
            shard_id: datadriven.shard_id,
            new_seqno_since,
        };
        let (maintenance, stats) =
            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
        datadriven.routine.push(maintenance);

        Ok(format!(
            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
            datadriven.machine.seqno(),
            stats.batch_parts_deleted_from_blob,
            stats.rollups_deleted_from_blob,
            stats
                .truncated_consensus_to
                .iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>()
                .join(","),
            stats
                .rollups_removed_from_state
                .iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>()
                .join(","),
        ))
    }
2059
    /// Handles the `snapshot` directive: takes a snapshot as of the given
    /// frontier and renders every batch/run/part with its updates advanced to
    /// `as_of` and consolidated.
    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    // Advance every timestamp to the as_of before consolidating
                    // so updates at different original times can combine.
                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }
2133
    /// Handles the `register-listen` directive: opens a leased reader, starts
    /// a listen as of the given frontier, and stores it under `output`.
    pub async fn register_listen(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let as_of = args.expect_antichain("as_of");
        let read = datadriven
            .client
            .open_leased_reader::<String, (), u64, i64>(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("invalid shard types");
        let listen = read
            .listen(as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;
        datadriven.listens.insert(output.to_owned(), listen);
        Ok("ok\n".into())
    }
2158
    /// Handles the `listen-through` directive: drains events from the named
    /// listener, rendering updates until progress reaches the given frontier.
    pub async fn listen_through(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let frontier = args.expect("frontier");
        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
        let mut s = String::new();
        loop {
            for event in listen.fetch_next().await {
                match event {
                    ListenEvent::Updates(x) => {
                        for ((k, _v), t, d) in x.iter() {
                            write!(s, "{} {} {}\n", k, t, d);
                        }
                    }
                    // Stop once the listener's progress is at or past frontier.
                    ListenEvent::Progress(x) => {
                        if !x.less_than(&frontier) {
                            return Ok(s);
                        }
                    }
                }
            }
        }
    }
2186
    /// Handles the `register-critical-reader` directive: registers the given
    /// critical reader id and reports the seqno and its since.
    pub async fn register_critical_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (state, maintenance) = datadriven
            .machine
            .register_critical_reader::<u64>(&reader_id, "tests")
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            state.since.elements(),
        ))
    }
2203
    /// Handles the `register-leased-reader` directive: registers the given
    /// leased reader id and reports the seqno and its since.
    pub async fn register_leased_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (reader_state, maintenance) = datadriven
            .machine
            .register_leased_reader(
                &reader_id,
                "tests",
                READER_LEASE_DURATION.get(&datadriven.client.cfg),
                (datadriven.client.cfg.now)(),
                false,
            )
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            reader_state.since.elements(),
        ))
    }
2226
2227 pub async fn expire_critical_reader(
2228 datadriven: &mut MachineState,
2229 args: DirectiveArgs<'_>,
2230 ) -> Result<String, anyhow::Error> {
2231 let reader_id = args.expect("reader_id");
2232 let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2233 datadriven.routine.push(maintenance);
2234 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2235 }
2236
2237 pub async fn expire_leased_reader(
2238 datadriven: &mut MachineState,
2239 args: DirectiveArgs<'_>,
2240 ) -> Result<String, anyhow::Error> {
2241 let reader_id = args.expect("reader_id");
2242 let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2243 datadriven.routine.push(maintenance);
2244 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2245 }
2246
    /// Handles the `compare-and-append-batches` directive: appends the named
    /// batches through a fresh writer handle, advancing the upper from
    /// `expected_upper` to `new_upper`.
    pub async fn compare_and_append_batches(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let expected_upper = args.expect_antichain("expected_upper");
        let new_upper = args.expect_antichain("new_upper");

        // Rehydrate each named hollow batch into a full Batch handle.
        let mut batches: Vec<Batch<String, (), u64, i64>> = args
            .args
            .get("batches")
            .expect("missing batches")
            .into_iter()
            .map(|batch| {
                let hollow = (*datadriven
                    .batches
                    .get(batch)
                    .expect("unknown batch")
                    .clone()
                    .batch)
                    .clone();
                datadriven.to_batch(hollow)
            })
            .collect();

        let mut writer = datadriven
            .client
            .open_writer(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await?;

        let mut batch_refs: Vec<_> = batches.iter_mut().collect();

        let () = writer
            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
            .await?
            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;

        writer.expire().await;

        Ok("ok\n".into())
    }
2292
2293 pub async fn expire_writer(
2294 datadriven: &mut MachineState,
2295 args: DirectiveArgs<'_>,
2296 ) -> Result<String, anyhow::Error> {
2297 let writer_id = args.expect("writer_id");
2298 let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2299 datadriven.routine.push(maintenance);
2300 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2301 }
2302
2303 pub(crate) async fn finalize(
2304 datadriven: &mut MachineState,
2305 _args: DirectiveArgs<'_>,
2306 ) -> anyhow::Result<String> {
2307 let maintenance = datadriven.machine.become_tombstone().await?;
2308 datadriven.routine.push(maintenance);
2309 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2310 }
2311
2312 pub(crate) fn is_finalized(
2313 datadriven: &MachineState,
2314 _args: DirectiveArgs<'_>,
2315 ) -> anyhow::Result<String> {
2316 let seqno = datadriven.machine.seqno();
2317 let tombstone = datadriven.machine.is_finalized();
2318 Ok(format!("{seqno} {tombstone}\n"))
2319 }
2320
    /// Handles the `compare-and-append` directive: idempotently appends the
    /// named batch via the machine, retrying on inline-write backpressure and
    /// optionally injecting a prior indeterminate error.
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        // The shared test schemas must be registered before appending.
        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                // Inline parts were rejected: flush them out to blob and retry
                // with the resulting hollow batch.
                CompareAndAppendRes::InlineBackpressure => {
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                _ => panic!("{:?}", res),
            };
        };
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }
2387
    /// Handles the `apply-merge-res` directive: applies the named batch as
    /// the result of the compaction request recorded under the same name.
    pub async fn apply_merge_res(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let compact_req = datadriven
            .compactions
            .get(input)
            .expect("unknown compact req")
            .clone();
        // Reconstruct the SpineId range covered by the compaction's inputs.
        let input_batches = compact_req
            .inputs
            .iter()
            .map(|x| x.id)
            .collect::<BTreeSet<_>>();
        let lower_spine_bound = input_batches
            .first()
            .map(|id| id.0)
            .expect("at least one batch must be present");
        let upper_spine_bound = input_batches
            .last()
            .map(|id| id.1)
            .expect("at least one batch must be present");
        let id = SpineId(lower_spine_bound, upper_spine_bound);
        let hollow_batch = (*batch.batch).clone();

        let (merge_res, maintenance) = datadriven
            .machine
            .merge_res(&FueledMergeRes {
                output: hollow_batch,
                input: CompactionInput::IdRange(id),
                new_active_compaction: None,
            })
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {}\n",
            datadriven.machine.seqno(),
            merge_res.applied()
        ))
    }
2434
    /// Handles the `perform-maintenance` directive: performs and drains all
    /// accumulated routine maintenance, refreshing state after each item.
    pub async fn perform_maintenance(
        datadriven: &mut MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let mut s = String::new();
        for maintenance in datadriven.routine.drain(..) {
            let () = maintenance
                .perform(&datadriven.machine, &datadriven.gc)
                .await;
            // Re-fetch state so the seqno printed below reflects the work done.
            let () = datadriven
                .machine
                .applier
                .fetch_and_update_state(None)
                .await;
            write!(s, "{} ok\n", datadriven.machine.seqno());
        }
        Ok(s)
    }
2453}
2454
#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use mz_persist_types::PersistLocation;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::{new_test_client, new_test_client_cache};
    use crate::{Diagnostics, PersistClient, ShardId};

    // Appends many batches and then asserts that the number of live state
    // diffs stays bounded (roughly logarithmic in the number of appends),
    // i.e. that rollups + truncation actually kick in.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        // Lower the rollup threshold so rollups happen frequently enough for
        // this test's 100 appends to exercise truncation.
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, read) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        // Expire the reader so it doesn't hold back seqno_since (and thus GC).
        read.expire().await;

        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            // Move any inline parts out to blob before appending.
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            // Perform the maintenance (rollups/GC) suggested by each append,
            // rather than letting it queue up.
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        assert!(live_diffs.len() > 0);
        // Bound chosen as 2 * log2(NUM_BATCHES rounded up to a power of two);
        // presumably a generous cap on how many diffs can remain live once
        // truncation is working — TODO confirm the exact invariant intended.
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.len(),
            max_live_diffs
        );
    }

    // Regression test: a GC run that panics partway through blob deletion
    // (injected via InterceptBlob's post-delete hook) must not leave the
    // shard in a state where a later GC for the same req fails.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        let intercept = InterceptHandle::default();
        // Wrap the client's blob so we can inject failures around deletes.
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        // Advance since so there's something for GC to collect.
        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();
        assert!(new_seqno_since > SeqNo::minimum());

        // First GC attempt: panic after every blob delete, simulating a crash
        // mid-GC. Run it on a spawned task so the panic doesn't kill the test.
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        // The spawned GC is expected to fail; we only care that it ran.
        let _ = gc.await;

        // Second attempt with the failure injection removed: must complete
        // cleanly despite the earlier partial deletion.
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

    // Regression test: two writers backed by *separate* state caches (so they
    // don't share a cached copy of shard state). After write1 advances the
    // upper, write2's append from a stale cache must still succeed — i.e. an
    // upper mismatch must cause the state to be refetched and retried rather
    // than failing permanently.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        // Give client2 its own state cache so it sees stale state.
        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        write1.expect_compare_and_append(&data[..1], 0, 2).await;

        // write2's cached upper is still the initial one; this append must
        // recover from the mismatch and succeed at the true upper of 2.
        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }

    // Verifies that a shard's recorded applier_version only advances via an
    // explicit upgrade_version call: merely opening/using the shard from a
    // newer build leaves the recorded version untouched.
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn version_upgrade(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        cache.cfg.build_version = Version::new(26, 1, 0);
        let shard_id = ShardId::new();

        // Reads the shard's applier_version out of the JSON rendering of its
        // inspected state; None if the shard can't be inspected.
        async fn fetch_catalog_upgrade_shard_version(
            persist_client: &PersistClient,
            upgrade_shard_id: ShardId,
        ) -> Option<semver::Version> {
            let shard_state = persist_client
                .inspect_shard::<u64>(&upgrade_shard_id)
                .await
                .ok()?;
            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
            let upgrade_version = json_state
                .get("applier_version")
                .cloned()
                .expect("missing applier_version");
            let upgrade_version =
                serde_json::from_value(upgrade_version).expect("version deserialization error");
            Some(upgrade_version)
        }

        // Open at v26.1.0: the shard records that as its applier_version.
        cache.cfg.build_version = Version::new(26, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(1)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // Reopen from a v27.1.0 build and do some work: the recorded version
        // must NOT change implicitly.
        cache.cfg.build_version = Version::new(27, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        // An explicit upgrade_version call is what moves it forward.
        client
            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .unwrap();
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(27, 1, 0)),
        );
    }
}