use std::fmt::Debug;
use std::ops::ControlFlow::{self, Continue};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::future::{self, BoxFuture};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)]
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
use crate::cache::StateCache;
use crate::cfg::RetryParameters;
use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CompareAndAppendBreak, CriticalReaderState, ENABLE_INCREMENTAL_COMPACTION, HandleDebugState,
    HollowBatch, HollowRollup, IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since,
    SnapshotErr, StateCollections, Upper,
};
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{Diagnostics, PersistConfig, ShardId};
62
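/// The state machine for a single persist shard. Every durable state
/// transition (reader/writer registration, compare_and_append, since
/// downgrades, rollups, tombstoning, ...) is expressed as a command applied
/// through the wrapped `Applier`.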
63#[derive(Debug)]
64pub struct Machine<K, V, T, D> {
65 pub(crate) applier: Applier<K, V, T, D>,
66 pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
67}
68
69impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
71 fn clone(&self) -> Self {
72 Self {
73 applier: self.applier.clone(),
74 isolated_runtime: Arc::clone(&self.isolated_runtime),
75 }
76 }
77}
78
79pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
80 "persist_claim_unclaimed_compactions",
81 false,
82 "If an append doesn't result in a compaction request, but there is some uncompacted batch \
83 in state, compact that instead.",
84);
85
86pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
87 "persist_claim_compaction_percent",
88 100,
89 "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
90 (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
91 three and have a 65% chance of claiming a fourth.)",
92);
93
94pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
95 "persist_claim_compaction_min_version",
96 String::new(),
97 "If set to a valid version string, compact away any earlier versions if possible.",
98);
99
100impl<K, V, T, D> Machine<K, V, T, D>
101where
102 K: Debug + Codec,
103 V: Debug + Codec,
104 T: Timestamp + Lattice + Codec64 + Sync,
105 D: Semigroup + Codec64,
106{
107 pub async fn new(
108 cfg: PersistConfig,
109 shard_id: ShardId,
110 metrics: Arc<Metrics>,
111 state_versions: Arc<StateVersions>,
112 shared_states: Arc<StateCache>,
113 pubsub_sender: Arc<dyn PubSubSender>,
114 isolated_runtime: Arc<IsolatedRuntime>,
115 diagnostics: Diagnostics,
116 ) -> Result<Self, Box<CodecMismatch>> {
117 let applier = Applier::new(
118 cfg,
119 shard_id,
120 metrics,
121 state_versions,
122 shared_states,
123 pubsub_sender,
124 diagnostics,
125 )
126 .await?;
127 Ok(Machine {
128 applier,
129 isolated_runtime,
130 })
131 }
132
133 pub fn shard_id(&self) -> ShardId {
134 self.applier.shard_id
135 }
136
137 pub fn seqno(&self) -> SeqNo {
138 self.applier.seqno()
139 }
140
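    /// Writes a rollup of the current state to blob storage and records it in
    /// state. If recording it loses a race (e.g. a rollup already covers that
    /// seqno), the now-unreferenced rollup blob is deleted again.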
141 pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
142 let rollup = self.applier.write_rollup_for_state().await;
143 let Some(rollup) = rollup else {
144 return RoutineMaintenance::default();
145 };
146
147 let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
148 if !applied {
149 self.applier
152 .state_versions
153 .delete_rollup(&rollup.shard_id, &rollup.key)
154 .await;
155 }
156 maintenance
157 }
158
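    /// Records the given rollup in state, returning whether it was applied.
    /// The command closure may be evaluated more than once while the command
    /// is retried, so we remember whether *any* evaluation applied it.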
159 pub async fn add_rollup(
160 &self,
161 add_rollup: (SeqNo, &HollowRollup),
162 ) -> (bool, RoutineMaintenance) {
163 let mut applied_ever_true = false;
166 let metrics = Arc::clone(&self.applier.metrics);
167 let (_seqno, _applied, maintenance) = self
168 .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
169 let ret = state.add_rollup(add_rollup);
170 if let Continue(applied) = ret {
171 applied_ever_true = applied_ever_true || applied;
172 }
173 ret
174 })
175 .await;
176 (applied_ever_true, maintenance)
177 }
178
179 pub async fn remove_rollups(
180 &self,
181 remove_rollups: &[(SeqNo, PartialRollupKey)],
182 ) -> (Vec<SeqNo>, RoutineMaintenance) {
183 let metrics = Arc::clone(&self.applier.metrics);
184 let (_seqno, removed_rollup_seqnos, maintenance) = self
185 .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
186 state.remove_rollups(remove_rollups)
187 })
188 .await;
189 (removed_rollup_seqnos, maintenance)
190 }
191
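    /// Registers `reader_id` as a leased reader, returning the registered
    /// reader state (notably the since capability and lease it holds) along
    /// with any routine maintenance to perform.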
192 pub async fn register_leased_reader(
193 &self,
194 reader_id: &LeasedReaderId,
195 purpose: &str,
196 lease_duration: Duration,
197 heartbeat_timestamp_ms: u64,
198 use_critical_since: bool,
199 ) -> (LeasedReaderState<T>, RoutineMaintenance) {
200 let metrics = Arc::clone(&self.applier.metrics);
201 let (_seqno, (reader_state, seqno_since), maintenance) = self
202 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
203 state.register_leased_reader(
204 &cfg.hostname,
205 reader_id,
206 purpose,
207 seqno,
208 lease_duration,
209 heartbeat_timestamp_ms,
210 use_critical_since,
211 )
212 })
213 .await;
214 debug_assert!(
223 reader_state.seqno >= seqno_since,
224 "{} vs {}",
225 reader_state.seqno,
226 seqno_since,
227 );
228 (reader_state, maintenance)
229 }
230
231 pub async fn register_critical_reader<O: Opaque + Codec64>(
232 &self,
233 reader_id: &CriticalReaderId,
234 purpose: &str,
235 ) -> (CriticalReaderState<T>, RoutineMaintenance) {
236 let metrics = Arc::clone(&self.applier.metrics);
237 let (_seqno, state, maintenance) = self
238 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
239 state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
240 })
241 .await;
242 (state, maintenance)
243 }
244
245 pub async fn register_schema(
246 &self,
247 key_schema: &K::Schema,
248 val_schema: &V::Schema,
249 ) -> (Option<SchemaId>, RoutineMaintenance) {
250 let metrics = Arc::clone(&self.applier.metrics);
251 let (_seqno, state, maintenance) = self
252 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
253 state.register_schema::<K, V>(key_schema, val_schema)
254 })
255 .await;
256 (state, maintenance)
257 }
258
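    /// Spends `fuel` worth of merge effort on the shard's Spine, returning any
    /// compaction requests that effort produced. A no-op when `fuel` is zero
    /// or there are fewer than two batches.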
259 pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
260 if fuel == 0 || self.applier.all_batches().len() < 2 {
262 return (Vec::new(), RoutineMaintenance::default());
263 }
264
265 let metrics = Arc::clone(&self.applier.metrics);
266 let (_seqno, reqs, maintenance) = self
267 .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
268 state.spine_exert(fuel)
269 })
270 .await;
271 let reqs = reqs
272 .into_iter()
273 .map(|req| CompactReq {
274 shard_id: self.shard_id(),
275 desc: req.desc,
276 inputs: req.inputs,
277 })
278 .collect();
279 (reqs, maintenance)
280 }
281
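    /// Appends `batch` to the shard, expecting the shard's upper to match the
    /// batch's lower. Mismatches caused only by a stale cached upper are
    /// retried internally; a genuine mismatch is returned as `UpperMismatch`.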
282 pub async fn compare_and_append(
283 &self,
284 batch: &HollowBatch<T>,
285 writer_id: &WriterId,
286 debug_info: &HandleDebugState,
287 heartbeat_timestamp_ms: u64,
288 ) -> CompareAndAppendRes<T> {
289 let idempotency_token = IdempotencyToken::new();
290 loop {
291 let res = self
292 .compare_and_append_idempotent(
293 batch,
294 writer_id,
295 heartbeat_timestamp_ms,
296 &idempotency_token,
297 debug_info,
298 None,
299 )
300 .await;
301 match res {
302 CompareAndAppendRes::Success(seqno, maintenance) => {
303 return CompareAndAppendRes::Success(seqno, maintenance);
304 }
305 CompareAndAppendRes::InvalidUsage(x) => {
306 return CompareAndAppendRes::InvalidUsage(x);
307 }
308 CompareAndAppendRes::InlineBackpressure => {
309 return CompareAndAppendRes::InlineBackpressure;
310 }
311 CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
312 self.applier.fetch_and_update_state(Some(seqno)).await;
319 let (current_seqno, current_upper) =
320 self.applier.upper(|seqno, upper| (seqno, upper.clone()));
321
                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The cached upper in state was stale; it has now been
                        // refreshed, so loop around and retry the append.
                    }
330 }
331 }
332 }
333 }
334
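    /// The retry loop behind `compare_and_append`. The idempotency token and
    /// the `indeterminate` bookkeeping make it safe to retry a write whose
    /// outcome we couldn't observe: a later attempt that finds the batch
    /// already committed is reported as success, and a case we truly cannot
    /// distinguish (the upper already advanced up to or past the batch after
    /// an indeterminate error) panics rather than risk a double append.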
335 async fn compare_and_append_idempotent(
336 &self,
337 batch: &HollowBatch<T>,
338 writer_id: &WriterId,
339 heartbeat_timestamp_ms: u64,
340 idempotency_token: &IdempotencyToken,
341 debug_info: &HandleDebugState,
342 mut indeterminate: Option<Indeterminate>,
346 ) -> CompareAndAppendRes<T> {
347 let metrics = Arc::clone(&self.applier.metrics);
348 let lease_duration_ms = self
349 .applier
350 .cfg
351 .writer_lease_duration
352 .as_millis()
353 .try_into()
354 .expect("reasonable duration");
355 let mut retry = self
438 .applier
439 .metrics
440 .retries
441 .compare_and_append_idempotent
442 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
443 let mut writer_was_present = false;
444 loop {
445 let cmd_res = self
446 .applier
447 .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
448 writer_was_present = state.writers.contains_key(writer_id);
449 state.compare_and_append(
450 batch,
451 writer_id,
452 heartbeat_timestamp_ms,
453 lease_duration_ms,
454 idempotency_token,
455 debug_info,
456 INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
457 if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
458 CLAIM_COMPACTION_PERCENT.get(cfg)
459 } else {
460 0
461 },
462 Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
463 .ok()
464 .as_ref(),
465 )
466 })
467 .await;
468 let (seqno, res, routine) = match cmd_res {
469 Ok(x) => x,
470 Err(err) => {
471 info!(
474 "compare_and_append received an indeterminate error, retrying in {:?}: {}",
475 retry.next_sleep(),
476 err
477 );
478 if indeterminate.is_none() {
479 indeterminate = Some(err);
480 }
481 retry = retry.sleep().await;
482 continue;
483 }
484 };
485 match res {
486 Ok(merge_reqs) => {
487 let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
490 for req in merge_reqs {
491 let req = CompactReq {
492 shard_id: self.shard_id(),
493 desc: req.desc,
494 inputs: req.inputs,
495 };
496 compact_reqs.push(req);
497 }
498 let writer_maintenance = WriterMaintenance {
499 routine,
500 compaction: compact_reqs,
501 };
502
503 if !writer_was_present {
504 metrics.state.writer_added.inc();
505 }
506 for part in &batch.parts {
507 if part.is_inline() {
508 let bytes = u64::cast_from(part.inline_bytes());
509 metrics.inline.part_commit_bytes.inc_by(bytes);
510 metrics.inline.part_commit_count.inc();
511 }
512 }
513 return CompareAndAppendRes::Success(seqno, writer_maintenance);
514 }
515 Err(CompareAndAppendBreak::AlreadyCommitted) => {
516 assert!(indeterminate.is_some());
520 self.applier.metrics.cmds.compare_and_append_noop.inc();
521 if !writer_was_present {
522 metrics.state.writer_added.inc();
523 }
524 return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
525 }
526 Err(CompareAndAppendBreak::InvalidUsage(err)) => {
527 assert_none!(indeterminate);
532 return CompareAndAppendRes::InvalidUsage(err);
533 }
534 Err(CompareAndAppendBreak::InlineBackpressure) => {
535 return CompareAndAppendRes::InlineBackpressure;
538 }
539 Err(CompareAndAppendBreak::Upper {
540 shard_upper,
541 writer_upper,
542 }) => {
543 assert!(
548 PartialOrder::less_equal(&writer_upper, &shard_upper),
549 "{:?} vs {:?}",
550 &writer_upper,
551 &shard_upper
552 );
553 if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
554 return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
558 }
559 if indeterminate.is_none() {
560 return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
565 }
566 panic!(
576 concat!(
577 "cannot distinguish compare_and_append success or failure ",
578 "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
579 ),
580 batch.desc.lower().elements(),
581 batch.desc.upper().elements(),
582 writer_upper.elements(),
583 shard_upper.elements(),
584 indeterminate,
585 );
586 }
587 };
588 }
589 }
590
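    /// Forwards the given leased reader's since capability to `new_since`
    /// (also refreshing its lease heartbeat), returning the since actually
    /// recorded for the reader in state.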
591 pub async fn downgrade_since(
592 &self,
593 reader_id: &LeasedReaderId,
594 outstanding_seqno: Option<SeqNo>,
595 new_since: &Antichain<T>,
596 heartbeat_timestamp_ms: u64,
597 ) -> (SeqNo, Since<T>, RoutineMaintenance) {
598 let metrics = Arc::clone(&self.applier.metrics);
599 self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
600 state.downgrade_since(
601 reader_id,
602 seqno,
603 outstanding_seqno,
604 new_since,
605 heartbeat_timestamp_ms,
606 )
607 })
608 .await
609 }
610
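    /// Downgrades the since of the given critical reader, but only if
    /// `expected_opaque` matches the opaque currently stored for it; on a
    /// mismatch the stored opaque and since are returned as the error.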
611 pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
612 &self,
613 reader_id: &CriticalReaderId,
614 expected_opaque: &O,
615 (new_opaque, new_since): (&O, &Antichain<T>),
616 ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
617 let metrics = Arc::clone(&self.applier.metrics);
618 let (_seqno, res, maintenance) = self
619 .apply_unbatched_idempotent_cmd(
620 &metrics.cmds.compare_and_downgrade_since,
621 |_seqno, _cfg, state| {
622 state.compare_and_downgrade_since::<O>(
623 reader_id,
624 expected_opaque,
625 (new_opaque, new_since),
626 )
627 },
628 )
629 .await;
630
631 match res {
632 Ok(since) => (Ok(since), maintenance),
633 Err((opaque, since)) => (Err((opaque, since)), maintenance),
634 }
635 }
636
637 pub async fn heartbeat_leased_reader(
638 &self,
639 reader_id: &LeasedReaderId,
640 heartbeat_timestamp_ms: u64,
641 ) -> (SeqNo, bool, RoutineMaintenance) {
642 let metrics = Arc::clone(&self.applier.metrics);
643 let (seqno, existed, maintenance) = self
644 .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
645 state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
646 })
647 .await;
648 (seqno, existed, maintenance)
649 }
650
651 pub async fn expire_leased_reader(
652 &self,
653 reader_id: &LeasedReaderId,
654 ) -> (SeqNo, RoutineMaintenance) {
655 let metrics = Arc::clone(&self.applier.metrics);
656 let (seqno, _existed, maintenance) = self
657 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
658 state.expire_leased_reader(reader_id)
659 })
660 .await;
661 (seqno, maintenance)
662 }
663
    #[allow(dead_code)]
    pub async fn expire_critical_reader(
666 &self,
667 reader_id: &CriticalReaderId,
668 ) -> (SeqNo, RoutineMaintenance) {
669 let metrics = Arc::clone(&self.applier.metrics);
670 let (seqno, _existed, maintenance) = self
671 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
672 state.expire_critical_reader(reader_id)
673 })
674 .await;
675 (seqno, maintenance)
676 }
677
678 pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
679 let metrics = Arc::clone(&self.applier.metrics);
680 let (seqno, _existed, maintenance) = self
681 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
682 state.expire_writer(writer_id)
683 })
684 .await;
685 metrics.state.writer_removed.inc();
686 (seqno, maintenance)
687 }
688
689 pub fn is_finalized(&self) -> bool {
690 self.applier.is_finalized()
691 }
692
693 pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
695 self.applier.get_schema(schema_id)
696 }
697
698 pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
700 self.applier.latest_schema()
701 }
702
703 pub async fn compare_and_evolve_schema(
707 &self,
708 expected: SchemaId,
709 key_schema: &K::Schema,
710 val_schema: &V::Schema,
711 ) -> (CaESchema<K, V>, RoutineMaintenance) {
712 let metrics = Arc::clone(&self.applier.metrics);
713 let (_seqno, state, maintenance) = self
714 .apply_unbatched_idempotent_cmd(
715 &metrics.cmds.compare_and_evolve_schema,
716 |_seqno, _cfg, state| {
717 state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
718 },
719 )
720 .await;
721 (state, maintenance)
722 }
723
724 async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
725 let metrics = Arc::clone(&self.applier.metrics);
726 let mut retry = self
727 .applier
728 .metrics
729 .retries
730 .idempotent_cmd
731 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
732 loop {
733 let res = self
734 .applier
735 .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
736 state.become_tombstone_and_shrink()
737 })
738 .await;
739 let err = match res {
740 Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
741 Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
742 return Ok((false, maintenance));
743 }
744 Err(err) => err,
745 };
746 if retry.attempt() >= INFO_MIN_ATTEMPTS {
747 info!(
748 "become_tombstone received an indeterminate error, retrying in {:?}: {}",
749 retry.next_sleep(),
750 err
751 );
752 } else {
753 debug!(
754 "become_tombstone received an indeterminate error, retrying in {:?}: {}",
755 retry.next_sleep(),
756 err
757 );
758 }
759 retry = retry.sleep().await;
760 }
761 }
762
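    /// Advances the shard to a tombstone. Requires that the since and upper
    /// are both already empty, then repeatedly applies `tombstone_step` until
    /// it stops making progress, merging any maintenance produced.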
763 pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
764 self.applier.check_since_upper_both_empty()?;
765
766 let mut maintenance = RoutineMaintenance::default();
767
768 loop {
769 let (made_progress, more_maintenance) = self.tombstone_step().await?;
770 maintenance.merge(more_maintenance);
771 if !made_progress {
772 break;
773 }
774 }
775
776 Ok(maintenance)
777 }
778
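    /// Returns the hollow batches making up a snapshot of the shard at
    /// `as_of`. If `as_of` is not yet available, waits for it, waking on
    /// either a state watch notification or a fallback retry sleep; after
    /// roughly a second of waiting, progress logging is upgraded from debug
    /// to info.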
779 pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
780 let start = Instant::now();
781 let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
782 Ok(x) => return Ok(x),
783 Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
784 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
785 return Err(Since(since));
786 }
787 };
788
789 let mut watch = self.applier.watch();
792 let watch = &mut watch;
793 let sleeps = self
794 .applier
795 .metrics
796 .retries
797 .snapshot
798 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
799
800 enum Wake<'a, K, V, T, D> {
801 Watch(&'a mut StateWatch<K, V, T, D>),
802 Sleep(MetricsRetryStream),
803 }
804 let mut watch_fut = std::pin::pin!(
805 watch
806 .wait_for_seqno_ge(seqno.next())
807 .map(Wake::Watch)
808 .instrument(trace_span!("snapshot::watch")),
809 );
810 let mut sleep_fut = std::pin::pin!(
811 sleeps
812 .sleep()
813 .map(Wake::Sleep)
814 .instrument(trace_span!("snapshot::sleep")),
815 );
816
817 let mut logged_at_info = false;
821 loop {
822 if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
826 logged_at_info = true;
827 info!(
828 "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
829 self.applier.shard_metrics.name,
830 self.shard_id(),
831 as_of.elements(),
832 seqno,
833 upper.elements(),
834 );
835 } else {
836 debug!(
837 "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
838 self.applier.shard_metrics.name,
839 self.shard_id(),
840 as_of.elements(),
841 seqno,
842 upper.elements(),
843 );
844 }
845
846 let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
847 future::Either::Left((wake, _)) => wake,
848 future::Either::Right((wake, _)) => wake,
849 };
850 match &wake {
854 Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
855 Wake::Sleep(_) => {
856 self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
857 self.applier.fetch_and_update_state(Some(seqno)).await;
858 }
859 }
860
861 (seqno, upper) = match self.applier.snapshot(as_of) {
862 Ok(x) => {
863 if logged_at_info {
864 info!(
865 "snapshot {} {} as of {:?} now available",
866 self.applier.shard_metrics.name,
867 self.shard_id(),
868 as_of.elements(),
869 );
870 }
871 return Ok(x);
872 }
873 Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
874 (seqno, upper)
876 }
877 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
878 return Err(Since(since));
879 }
880 };
881
882 match wake {
883 Wake::Watch(watch) => {
884 watch_fut.set(
885 watch
886 .wait_for_seqno_ge(seqno.next())
887 .map(Wake::Watch)
888 .instrument(trace_span!("snapshot::watch")),
889 );
890 }
891 Wake::Sleep(sleeps) => {
892 debug!(
893 "snapshot {} {} sleeping for {:?}",
894 self.applier.shard_metrics.name,
895 self.shard_id(),
896 sleeps.next_sleep()
897 );
898 sleep_fut.set(
899 sleeps
900 .sleep()
901 .map(Wake::Sleep)
902 .instrument(trace_span!("snapshot::sleep")),
903 );
904 }
905 }
906 }
907 }
908
909 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
911 self.applier.verify_listen(as_of)
912 }
913
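    /// Returns the next batch for a listener whose progress is `frontier`,
    /// blocking until one is available. Like `snapshot`, this waits on a state
    /// watch with a retry-parameterized sleep as a fallback.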
914 pub async fn next_listen_batch(
915 &self,
916 frontier: &Antichain<T>,
917 watch: &mut StateWatch<K, V, T, D>,
918 reader_id: Option<&LeasedReaderId>,
919 retry: Option<RetryParameters>,
921 ) -> HollowBatch<T> {
922 let mut seqno = match self.applier.next_listen_batch(frontier) {
923 Ok(b) => return b,
924 Err(seqno) => seqno,
925 };
926
927 let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
930 let sleeps = self
931 .applier
932 .metrics
933 .retries
934 .next_listen_batch
935 .stream(retry.into_retry(SystemTime::now()).into_retry_stream());
936
937 enum Wake<'a, K, V, T, D> {
938 Watch(&'a mut StateWatch<K, V, T, D>),
939 Sleep(MetricsRetryStream),
940 }
941 let mut watch_fut = std::pin::pin!(
942 watch
943 .wait_for_seqno_ge(seqno.next())
944 .map(Wake::Watch)
945 .instrument(trace_span!("snapshot::watch"))
946 );
947 let mut sleep_fut = std::pin::pin!(
948 sleeps
949 .sleep()
950 .map(Wake::Sleep)
951 .instrument(trace_span!("snapshot::sleep"))
952 );
953
954 loop {
955 let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
956 future::Either::Left((wake, _)) => wake,
957 future::Either::Right((wake, _)) => wake,
958 };
959 match &wake {
963 Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
964 Wake::Sleep(_) => {
965 self.applier.metrics.watch.listen_woken_via_sleep.inc();
966 self.applier.fetch_and_update_state(Some(seqno)).await;
967 }
968 }
969
970 seqno = match self.applier.next_listen_batch(frontier) {
971 Ok(b) => {
972 match &wake {
973 Wake::Watch(_) => {
974 self.applier.metrics.watch.listen_resolved_via_watch.inc()
975 }
976 Wake::Sleep(_) => {
977 self.applier.metrics.watch.listen_resolved_via_sleep.inc()
978 }
979 }
980 return b;
981 }
982 Err(seqno) => seqno,
983 };
984
985 match wake {
988 Wake::Watch(watch) => {
989 watch_fut.set(
990 watch
991 .wait_for_seqno_ge(seqno.next())
992 .map(Wake::Watch)
993 .instrument(trace_span!("snapshot::watch")),
994 );
995 }
996 Wake::Sleep(sleeps) => {
997 debug!(
998 "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
999 reader_id,
1000 self.applier.shard_metrics.name,
1001 self.shard_id(),
1002 sleeps.next_sleep()
1003 );
1004 sleep_fut.set(
1005 sleeps
1006 .sleep()
1007 .map(Wake::Sleep)
1008 .instrument(trace_span!("snapshot::sleep")),
1009 );
1010 }
1011 }
1012 }
1013 }
1014
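    /// Applies `work_fn` to the state via `Applier::apply_unbatched_cmd`,
    /// retrying forever on indeterminate errors. Because an attempt that
    /// appeared to fail may actually have committed, `work_fn` must be
    /// idempotent: a retry should recognize its own prior effect and return a
    /// `NoOpStateTransition`.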
1015 async fn apply_unbatched_idempotent_cmd<
1016 R,
1017 WorkFn: FnMut(
1018 SeqNo,
1019 &PersistConfig,
1020 &mut StateCollections<T>,
1021 ) -> ControlFlow<NoOpStateTransition<R>, R>,
1022 >(
1023 &self,
1024 cmd: &CmdMetrics,
1025 mut work_fn: WorkFn,
1026 ) -> (SeqNo, R, RoutineMaintenance) {
1027 let mut retry = self
1028 .applier
1029 .metrics
1030 .retries
1031 .idempotent_cmd
1032 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1033 loop {
1034 match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
1035 Ok((seqno, x, maintenance)) => match x {
1036 Ok(x) => {
1037 return (seqno, x, maintenance);
1038 }
1039 Err(NoOpStateTransition(x)) => {
1040 return (seqno, x, maintenance);
1041 }
1042 },
1043 Err(err) => {
1044 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1045 info!(
1046 "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1047 cmd.name,
1048 retry.next_sleep(),
1049 err
1050 );
1051 } else {
1052 debug!(
1053 "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1054 cmd.name,
1055 retry.next_sleep(),
1056 err
1057 );
1058 }
1059 retry = retry.sleep().await;
1060 continue;
1061 }
1062 }
1063 }
1064 }
1065}
1066
1067impl<K, V, T, D> Machine<K, V, T, D>
1068where
1069 K: Debug + Codec,
1070 V: Debug + Codec,
1071 T: Timestamp + Lattice + Codec64 + Sync,
1072 D: Semigroup + Codec64 + PartialEq,
1073{
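    /// Applies the result of a compaction (fueled merge) to the shard's state,
    /// returning how it lined up against the current Spine. The command may be
    /// retried internally, so the strongest result observed across attempts is
    /// reported (an applied result beats a matched-but-not-applied one).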
1074 pub async fn merge_res(
1075 &self,
1076 res: &FueledMergeRes<T>,
1077 ) -> (ApplyMergeResult, RoutineMaintenance) {
1078 let metrics = Arc::clone(&self.applier.metrics);
1079 let use_incremental_compaction = ENABLE_INCREMENTAL_COMPACTION.get(&self.applier.cfg);
1080
1081 let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
1110 let (_seqno, _apply_merge_result, maintenance) = self
1111 .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
1112 let ret = if use_incremental_compaction {
1113 state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar)
1114 } else {
1115 state.apply_merge_res_classic::<D>(res, &Arc::clone(&metrics).columnar)
1116 };
1117 if let Continue(result) = ret {
1118 if result.applied() {
1120 merge_result_ever_applied = result;
1121 }
1122 if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
1125 {
1126 merge_result_ever_applied = result;
1127 }
1128 }
1129 ret
1130 })
1131 .await;
1132 (merge_result_ever_applied, maintenance)
1133 }
1134}
1135
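/// A callback that expires some handle's state (e.g. a leased reader or a
/// writer) when invoked, returning a future that completes once the expire
/// command has been applied.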
1136pub(crate) struct ExpireFn(
1137 pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
1142);
1143
1144impl Debug for ExpireFn {
1145 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1146 f.debug_struct("ExpireFn").finish_non_exhaustive()
1147 }
1148}
1149
1150#[derive(Debug)]
1151pub(crate) enum CompareAndAppendRes<T> {
1152 Success(SeqNo, WriterMaintenance<T>),
1153 InvalidUsage(InvalidUsage<T>),
1154 UpperMismatch(SeqNo, Antichain<T>),
1155 InlineBackpressure,
1156}
1157
1158#[cfg(test)]
1159impl<T: Debug> CompareAndAppendRes<T> {
1160 #[track_caller]
1161 fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
1162 match self {
1163 CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
1164 x => panic!("{:?}", x),
1165 }
1166 }
1167}
1168
1169impl<K, V, T, D> Machine<K, V, T, D>
1170where
1171 K: Debug + Codec,
1172 V: Debug + Codec,
1173 T: Timestamp + Lattice + Codec64 + Sync,
1174 D: Semigroup + Codec64 + Send + Sync,
1175{
1176 #[allow(clippy::unused_async)]
1177 pub async fn start_reader_heartbeat_tasks(
1178 self,
1179 reader_id: LeasedReaderId,
1180 gc: GarbageCollector<K, V, T, D>,
1181 ) -> Vec<JoinHandle<()>> {
1182 let mut ret = Vec::new();
1183 let metrics = Arc::clone(&self.applier.metrics);
1184
1185 let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
1197 ret.push(mz_ore::task::spawn(|| name, {
1198 let machine = self.clone();
1199 let reader_id = reader_id.clone();
1200 let gc = gc.clone();
1201 metrics
1202 .tasks
1203 .heartbeat_read
1204 .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
1205 }));
1206
1207 let isolated_runtime = Arc::clone(&self.isolated_runtime);
1208 let name = format!(
1209 "persist::heartbeat_read_isolated({},{})",
1210 self.shard_id(),
1211 reader_id
1212 );
1213 ret.push(
1214 isolated_runtime.spawn_named(
1215 || name,
1216 metrics
1217 .tasks
1218 .heartbeat_read
1219 .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
1220 ),
1221 );
1222
1223 ret
1224 }
1225
1226 async fn reader_heartbeat_task(
1227 machine: Self,
1228 reader_id: LeasedReaderId,
1229 gc: GarbageCollector<K, V, T, D>,
1230 ) {
1231 let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
1232 loop {
1233 let before_sleep = Instant::now();
1234 tokio::time::sleep(sleep_duration).await;
1235
1236 let elapsed_since_before_sleeping = before_sleep.elapsed();
1237 if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
1238 warn!(
1239 "reader ({}) of shard ({}) went {}s between heartbeats",
1240 reader_id,
1241 machine.shard_id(),
1242 elapsed_since_before_sleeping.as_secs_f64()
1243 );
1244 }
1245
1246 let before_heartbeat = Instant::now();
1247 let (_seqno, existed, maintenance) = machine
1248 .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
1249 .await;
1250 maintenance.start_performing(&machine, &gc);
1251
1252 let elapsed_since_heartbeat = before_heartbeat.elapsed();
1253 if elapsed_since_heartbeat > Duration::from_secs(60) {
1254 warn!(
1255 "reader ({}) of shard ({}) heartbeat call took {}s",
1256 reader_id,
1257 machine.shard_id(),
1258 elapsed_since_heartbeat.as_secs_f64(),
1259 );
1260 }
1261
1262 if !existed {
1263 warn!(
1271 "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
1272 while read handle is live",
1273 reader_id,
1274 machine.shard_id(),
1275 );
1276 return;
1277 }
1278 }
1279 }
1280}
1281
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200),
    "The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);
1288
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100),
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);
1294
1295pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
1296 "persist_next_listen_batch_retryer_multiplier",
1297 2,
1298 "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
1299);
1300
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);
1306
1307fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
1308 RetryParameters {
1309 fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
1310 initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
1311 multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
1312 clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
1313 }
1314}
1315
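/// The number of attempts after which the retry loops in this module switch
/// from `debug!` to `info!` logging.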
1316pub const INFO_MIN_ATTEMPTS: usize = 3;
1317
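/// Retries the given external (blob/consensus) operation until it succeeds,
/// swallowing both determinate and indeterminate errors, so it is only
/// appropriate for operations that are safe to blindly re-issue.
///
/// A minimal usage sketch; the metrics path and the `blob`/`key` bindings here
/// are illustrative, not a guarantee of the surrounding APIs:
///
/// ```ignore
/// let value = retry_external(&metrics.retries.external.blob_get, || blob.get(&key)).await;
/// ```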
1318pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1319where
1320 F: std::future::Future<Output = Result<R, ExternalError>>,
1321 WorkFn: FnMut() -> F,
1322{
1323 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1324 loop {
1325 match work_fn().await {
1326 Ok(x) => {
1327 if retry.attempt() > 0 {
1328 debug!(
1329 "external operation {} succeeded after failing at least once",
1330 metrics.name,
1331 );
1332 }
1333 return x;
1334 }
1335 Err(err) => {
1336 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1337 info!(
1338 "external operation {} failed, retrying in {:?}: {}",
1339 metrics.name,
1340 retry.next_sleep(),
1341 err.display_with_causes()
1342 );
1343 } else {
1344 debug!(
1345 "external operation {} failed, retrying in {:?}: {}",
1346 metrics.name,
1347 retry.next_sleep(),
1348 err.display_with_causes()
1349 );
1350 }
1351 retry = retry.sleep().await;
1352 }
1353 }
1354 }
1355}
1356
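/// Like `retry_external`, but only determinate errors are retried; an
/// `Indeterminate` error is returned to the caller, which must decide whether
/// the operation might nevertheless have succeeded (as
/// `compare_and_append_idempotent` does).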
1357pub async fn retry_determinate<R, F, WorkFn>(
1358 metrics: &RetryMetrics,
1359 mut work_fn: WorkFn,
1360) -> Result<R, Indeterminate>
1361where
1362 F: std::future::Future<Output = Result<R, ExternalError>>,
1363 WorkFn: FnMut() -> F,
1364{
1365 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1366 loop {
1367 match work_fn().await {
1368 Ok(x) => {
1369 if retry.attempt() > 0 {
1370 debug!(
1371 "external operation {} succeeded after failing at least once",
1372 metrics.name,
1373 );
1374 }
1375 return Ok(x);
1376 }
1377 Err(ExternalError::Determinate(err)) => {
1378 debug!(
1387 "external operation {} failed, retrying in {:?}: {}",
1388 metrics.name,
1389 retry.next_sleep(),
1390 err.display_with_causes()
1391 );
1392 retry = retry.sleep().await;
1393 continue;
1394 }
1395 Err(ExternalError::Indeterminate(x)) => return Err(x),
1396 }
1397 }
1398}
1399
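/// Support for the datadriven tests that drive a `Machine` directly: each
/// `pub async fn` below implements one test directive.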
1400#[cfg(test)]
1401pub mod datadriven {
1402 use std::collections::{BTreeMap, BTreeSet};
1403 use std::pin::pin;
1404 use std::sync::{Arc, LazyLock};
1405
1406 use anyhow::anyhow;
1407 use differential_dataflow::consolidation::consolidate_updates;
1408 use differential_dataflow::trace::Description;
1409 use futures::StreamExt;
1410 use mz_dyncfg::{ConfigUpdates, ConfigVal};
1411 use mz_persist::indexed::encoding::BlobTraceBatchPart;
1412 use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1413
1414 use crate::batch::{
1415 BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1416 BatchParts, validate_truncate_batch,
1417 };
1418 use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1419 use crate::fetch::{EncodedPart, FetchConfig};
1420 use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1421 use crate::internal::datadriven::DirectiveArgs;
1422 use crate::internal::encoding::Schemas;
1423 use crate::internal::gc::GcReq;
1424 use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1425 use crate::internal::state::{BatchPart, RunOrder, RunPart};
1426 use crate::internal::state_versions::EncodedRollup;
1427 use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1428 use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1429 use crate::rpc::NoopPubSubSender;
1430 use crate::tests::new_test_client;
1431 use crate::write::COMBINE_INLINE_WRITES;
1432 use crate::{GarbageCollector, PersistClient};
1433
1434 use super::*;
1435
1436 static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
1437 id: Some(SchemaId(0)),
1438 key: Arc::new(StringSchema),
1439 val: Arc::new(UnitSchema),
1440 });
1441
1442 #[derive(Debug)]
1444 pub struct MachineState {
1445 pub client: PersistClient,
1446 pub shard_id: ShardId,
1447 pub state_versions: Arc<StateVersions>,
1448 pub machine: Machine<String, (), u64, i64>,
1449 pub gc: GarbageCollector<String, (), u64, i64>,
1450 pub batches: BTreeMap<String, IdHollowBatch<u64>>,
1451 pub next_id: usize,
1452 pub rollups: BTreeMap<String, EncodedRollup>,
1453 pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
1454 pub routine: Vec<RoutineMaintenance>,
1455 pub compactions: BTreeMap<String, CompactReq<u64>>,
1456 }
1457
1458 impl MachineState {
1459 pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
1460 let shard_id = ShardId::new();
1461 let client = new_test_client(dyncfgs).await;
1462 client
1465 .cfg
1466 .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
1467 client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
1471 let state_versions = Arc::new(StateVersions::new(
1472 client.cfg.clone(),
1473 Arc::clone(&client.consensus),
1474 Arc::clone(&client.blob),
1475 Arc::clone(&client.metrics),
1476 ));
1477 let machine = Machine::new(
1478 client.cfg.clone(),
1479 shard_id,
1480 Arc::clone(&client.metrics),
1481 Arc::clone(&state_versions),
1482 Arc::clone(&client.shared_states),
1483 Arc::new(NoopPubSubSender),
1484 Arc::clone(&client.isolated_runtime),
1485 Diagnostics::for_tests(),
1486 )
1487 .await
1488 .expect("codecs should match");
1489 let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
1490 MachineState {
1491 shard_id,
1492 client,
1493 state_versions,
1494 machine,
1495 gc,
1496 batches: BTreeMap::default(),
1497 rollups: BTreeMap::default(),
1498 listens: BTreeMap::default(),
1499 routine: Vec::new(),
1500 compactions: BTreeMap::default(),
1501 next_id: 0,
1502 }
1503 }
1504
1505 fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
1506 Batch::new(
1507 true,
1508 Arc::clone(&self.client.metrics),
1509 Arc::clone(&self.client.blob),
1510 self.client.metrics.shards.shard(&self.shard_id, "test"),
1511 self.client.cfg.build_version.clone(),
1512 hollow,
1513 )
1514 }
1515 }
1516
1517 pub async fn consensus_scan(
1520 datadriven: &MachineState,
1521 args: DirectiveArgs<'_>,
1522 ) -> Result<String, anyhow::Error> {
1523 let from = args.expect("from_seqno");
1524
1525 let mut states = datadriven
1526 .state_versions
1527 .fetch_all_live_states::<u64>(datadriven.shard_id)
1528 .await
1529 .expect("should only be called on an initialized shard")
1530 .check_ts_codec()
1531 .expect("shard codecs should not change");
1532 let mut s = String::new();
1533 while let Some(x) = states.next(|_| {}) {
1534 if x.seqno < from {
1535 continue;
1536 }
1537 let rollups: Vec<_> = x
1538 .collections
1539 .rollups
1540 .keys()
1541 .map(|seqno| seqno.to_string())
1542 .collect();
1543 let batches: Vec<_> = x
1544 .collections
1545 .trace
1546 .batches()
1547 .filter(|b| !b.is_empty())
1548 .filter_map(|b| {
1549 datadriven
1550 .batches
1551 .iter()
1552 .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
1553 .map(|(batch_name, _)| batch_name.to_owned())
1554 })
1555 .collect();
1556 write!(
1557 s,
1558 "seqno={} batches={} rollups={}\n",
1559 x.seqno,
1560 batches.join(","),
1561 rollups.join(","),
1562 );
1563 }
1564 Ok(s)
1565 }
1566
1567 pub async fn consensus_truncate(
1568 datadriven: &MachineState,
1569 args: DirectiveArgs<'_>,
1570 ) -> Result<String, anyhow::Error> {
1571 let to = args.expect("to_seqno");
1572 let removed = datadriven
1573 .client
1574 .consensus
1575 .truncate(&datadriven.shard_id.to_string(), to)
1576 .await
1577 .expect("valid truncation");
1578 Ok(format!("{}\n", removed))
1579 }
1580
1581 pub async fn blob_scan_batches(
1582 datadriven: &MachineState,
1583 _args: DirectiveArgs<'_>,
1584 ) -> Result<String, anyhow::Error> {
1585 let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1586
1587 let mut s = String::new();
1588 let () = datadriven
1589 .state_versions
1590 .blob
1591 .list_keys_and_metadata(&key_prefix, &mut |x| {
1592 let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1593 if let PartialBlobKey::Batch(_, _) = key {
1594 write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1595 }
1596 })
1597 .await?;
1598 Ok(s)
1599 }
1600
1601 #[allow(clippy::unused_async)]
1602 pub async fn shard_desc(
1603 datadriven: &MachineState,
1604 _args: DirectiveArgs<'_>,
1605 ) -> Result<String, anyhow::Error> {
1606 Ok(format!(
1607 "since={:?} upper={:?}\n",
1608 datadriven.machine.applier.since().elements(),
1609 datadriven.machine.applier.clone_upper().elements()
1610 ))
1611 }
1612
1613 pub async fn downgrade_since(
1614 datadriven: &mut MachineState,
1615 args: DirectiveArgs<'_>,
1616 ) -> Result<String, anyhow::Error> {
1617 let since = args.expect_antichain("since");
1618 let seqno = args.optional("seqno");
1619 let reader_id = args.expect("reader_id");
1620 let (_, since, routine) = datadriven
1621 .machine
1622 .downgrade_since(
1623 &reader_id,
1624 seqno,
1625 &since,
1626 (datadriven.machine.applier.cfg.now)(),
1627 )
1628 .await;
1629 datadriven.routine.push(routine);
1630 Ok(format!(
1631 "{} {:?}\n",
1632 datadriven.machine.seqno(),
1633 since.0.elements()
1634 ))
1635 }
1636
1637 #[allow(clippy::unused_async)]
1638 pub async fn dyncfg(
1639 datadriven: &MachineState,
1640 args: DirectiveArgs<'_>,
1641 ) -> Result<String, anyhow::Error> {
1642 let mut updates = ConfigUpdates::default();
1643 for x in args.input.trim().split('\n') {
1644 match x.split(' ').collect::<Vec<_>>().as_slice() {
1645 &[name, val] => {
1646 let config = datadriven
1647 .client
1648 .cfg
1649 .entries()
1650 .find(|x| x.name() == name)
1651 .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1652 match config.val() {
1653 ConfigVal::Usize(_) => {
1654 let val = val.parse().map_err(anyhow::Error::new)?;
1655 updates.add_dynamic(name, ConfigVal::Usize(val));
1656 }
1657 ConfigVal::Bool(_) => {
1658 let val = val.parse().map_err(anyhow::Error::new)?;
1659 updates.add_dynamic(name, ConfigVal::Bool(val));
1660 }
1661 x => unimplemented!("dyncfg type: {:?}", x),
1662 }
1663 }
1664 x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1665 }
1666 }
1667 updates.apply(&datadriven.client.cfg);
1668
1669 Ok("ok\n".to_string())
1670 }
1671
1672 pub async fn compare_and_downgrade_since(
1673 datadriven: &mut MachineState,
1674 args: DirectiveArgs<'_>,
1675 ) -> Result<String, anyhow::Error> {
1676 let expected_opaque: u64 = args.expect("expect_opaque");
1677 let new_opaque: u64 = args.expect("opaque");
1678 let new_since = args.expect_antichain("since");
1679 let reader_id = args.expect("reader_id");
1680 let (res, routine) = datadriven
1681 .machine
1682 .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
1683 .await;
1684 datadriven.routine.push(routine);
1685 let since = res.map_err(|(opaque, since)| {
1686 anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
1687 })?;
1688 Ok(format!(
1689 "{} {} {:?}\n",
1690 datadriven.machine.seqno(),
1691 new_opaque,
1692 since.0.elements()
1693 ))
1694 }
1695
1696 pub async fn write_rollup(
1697 datadriven: &mut MachineState,
1698 args: DirectiveArgs<'_>,
1699 ) -> Result<String, anyhow::Error> {
1700 let output = args.expect_str("output");
1701
1702 let rollup = datadriven
1703 .machine
1704 .applier
1705 .write_rollup_for_state()
1706 .await
1707 .expect("rollup");
1708
1709 datadriven
1710 .rollups
1711 .insert(output.to_string(), rollup.clone());
1712
1713 Ok(format!(
1714 "state={} diffs=[{}, {})\n",
1715 rollup.seqno,
1716 rollup._desc.lower().first().expect("seqno"),
1717 rollup._desc.upper().first().expect("seqno"),
1718 ))
1719 }
1720
1721 pub async fn add_rollup(
1722 datadriven: &mut MachineState,
1723 args: DirectiveArgs<'_>,
1724 ) -> Result<String, anyhow::Error> {
1725 let input = args.expect_str("input");
1726 let rollup = datadriven
1727 .rollups
1728 .get(input)
1729 .expect("unknown batch")
1730 .clone();
1731
1732 let (applied, maintenance) = datadriven
1733 .machine
1734 .add_rollup((rollup.seqno, &rollup.to_hollow()))
1735 .await;
1736
1737 if !applied {
1738 return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1739 }
1740
1741 datadriven.routine.push(maintenance);
1742 Ok(format!("{}\n", datadriven.machine.seqno()))
1743 }
1744
1745 pub async fn write_batch(
1746 datadriven: &mut MachineState,
1747 args: DirectiveArgs<'_>,
1748 ) -> Result<String, anyhow::Error> {
1749 let output = args.expect_str("output");
1750 let lower = args.expect_antichain("lower");
1751 let upper = args.expect_antichain("upper");
1752 assert!(PartialOrder::less_than(&lower, &upper));
1753 let since = args
1754 .optional_antichain("since")
1755 .unwrap_or_else(|| Antichain::from_elem(0));
1756 let target_size = args.optional("target_size");
1757 let parts_size_override = args.optional("parts_size_override");
1758 let consolidate = args.optional("consolidate").unwrap_or(true);
1759 let mut updates: Vec<_> = args
1760 .input
1761 .split('\n')
1762 .flat_map(DirectiveArgs::parse_update)
1763 .collect();
1764
1765 let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
1766 if let Some(target_size) = target_size {
1767 cfg.blob_target_size = target_size;
1768 };
1769 if consolidate {
1770 consolidate_updates(&mut updates);
1771 }
1772 let run_order = if consolidate {
1773 cfg.preferred_order
1774 } else {
1775 RunOrder::Unordered
1776 };
1777 let parts = BatchParts::new_ordered::<i64>(
1778 cfg.clone(),
1779 run_order,
1780 Arc::clone(&datadriven.client.metrics),
1781 Arc::clone(&datadriven.machine.applier.shard_metrics),
1782 datadriven.shard_id,
1783 Arc::clone(&datadriven.client.blob),
1784 Arc::clone(&datadriven.client.isolated_runtime),
1785 &datadriven.client.metrics.user,
1786 );
1787 let builder = BatchBuilderInternal::new(
1788 cfg.clone(),
1789 parts,
1790 Arc::clone(&datadriven.client.metrics),
1791 SCHEMAS.clone(),
1792 Arc::clone(&datadriven.client.blob),
1793 datadriven.shard_id.clone(),
1794 datadriven.client.cfg.build_version.clone(),
1795 );
1796 let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
1797 for ((k, ()), t, d) in updates {
1798 builder.add(&k, &(), &t, &d).await.expect("invalid batch");
1799 }
1800 let mut batch = builder.finish(upper).await?;
1801 if parts_size_override.is_some() {
1804 batch
1805 .flush_to_blob(
1806 &cfg,
1807 &datadriven.client.metrics.user,
1808 &datadriven.client.isolated_runtime,
1809 &SCHEMAS,
1810 )
1811 .await;
1812 }
1813 let batch = batch.into_hollow_batch();
1814 let batch = IdHollowBatch {
1815 batch: Arc::new(batch),
1816 id: SpineId(datadriven.next_id, datadriven.next_id + 1),
1817 };
1818 datadriven.next_id += 1;
1819
1820 if let Some(size) = parts_size_override {
1821 let mut batch = batch.clone();
1822 let mut hollow_batch = (*batch.batch).clone();
1823 for part in hollow_batch.parts.iter_mut() {
1824 match part {
1825 RunPart::Many(run) => run.max_part_bytes = size,
1826 RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
1827 RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
1828 }
1829 }
1830 batch.batch = Arc::new(hollow_batch);
1831 datadriven.batches.insert(output.to_owned(), batch);
1832 } else {
1833 datadriven.batches.insert(output.to_owned(), batch.clone());
1834 }
1835 Ok(format!(
1836 "parts={} len={}\n",
1837 batch.batch.part_count(),
1838 batch.batch.len
1839 ))
1840 }
1841
1842 pub async fn fetch_batch(
1843 datadriven: &MachineState,
1844 args: DirectiveArgs<'_>,
1845 ) -> Result<String, anyhow::Error> {
1846 let input = args.expect_str("input");
1847 let stats = args.optional_str("stats");
1848 let batch = datadriven.batches.get(input).expect("unknown batch");
1849
1850 let mut s = String::new();
1851 let mut stream = pin!(
1852 batch
1853 .batch
1854 .part_stream(
1855 datadriven.shard_id,
1856 &*datadriven.state_versions.blob,
1857 &*datadriven.state_versions.metrics
1858 )
1859 .enumerate()
1860 );
1861 while let Some((idx, part)) = stream.next().await {
1862 let part = &*part?;
1863 write!(s, "<part {idx}>\n");
1864
1865 let lower = match part {
1866 BatchPart::Inline { updates, .. } => {
1867 let updates: BlobTraceBatchPart<u64> =
1868 updates.decode(&datadriven.client.metrics.columnar)?;
1869 updates.structured_key_lower()
1870 }
1871 other => other.structured_key_lower(),
1872 };
1873
1874 if let Some(lower) = lower {
1875 if stats == Some("lower") {
1876 writeln!(s, "<key lower={}>", lower.get())
1877 }
1878 }
1879
1880 match part {
1881 BatchPart::Hollow(part) => {
1882 let blob_batch = datadriven
1883 .client
1884 .blob
1885 .get(&part.key.complete(&datadriven.shard_id))
1886 .await;
1887 match blob_batch {
1888 Ok(Some(_)) | Err(_) => {}
1889 Ok(None) => {
1892 s.push_str("<empty>\n");
1893 continue;
1894 }
1895 };
1896 }
1897 BatchPart::Inline { .. } => {}
1898 };
1899 let part = EncodedPart::fetch(
1900 &FetchConfig::from_persist_config(&datadriven.client.cfg),
1901 &datadriven.shard_id,
1902 datadriven.client.blob.as_ref(),
1903 datadriven.client.metrics.as_ref(),
1904 datadriven.machine.applier.shard_metrics.as_ref(),
1905 &datadriven.client.metrics.read.batch_fetcher,
1906 &batch.batch.desc,
1907 part,
1908 )
1909 .await
1910 .expect("invalid batch part");
1911 let part = part
1912 .normalize(&datadriven.client.metrics.columnar)
1913 .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
1914
1915 for ((k, _v), t, d) in part
1916 .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
1917 .expect("valid schemas")
1918 {
1919 writeln!(s, "{k} {t} {d}");
1920 }
1921 }
1922 if !s.is_empty() {
1923 for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
1924 write!(s, "<run {idx}>\n");
1925 for part in run {
1926 let part_idx = batch
1927 .batch
1928 .parts
1929 .iter()
1930 .position(|p| p == part)
1931 .expect("part should exist");
1932 write!(s, "part {part_idx}\n");
1933 }
1934 }
1935 }
1936 Ok(s)
1937 }
1938
1939 #[allow(clippy::unused_async)]
1940 pub async fn truncate_batch_desc(
1941 datadriven: &mut MachineState,
1942 args: DirectiveArgs<'_>,
1943 ) -> Result<String, anyhow::Error> {
1944 let input = args.expect_str("input");
1945 let output = args.expect_str("output");
1946 let lower = args.expect_antichain("lower");
1947 let upper = args.expect_antichain("upper");
1948
1949 let batch = datadriven
1950 .batches
1951 .get(input)
1952 .expect("unknown batch")
1953 .clone();
1954 let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1955 let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1956 let mut new_hollow_batch = (*batch.batch).clone();
1957 new_hollow_batch.desc = truncated_desc;
1958 let new_batch = IdHollowBatch {
1959 batch: Arc::new(new_hollow_batch),
1960 id: batch.id,
1961 };
1962 datadriven
1963 .batches
1964 .insert(output.to_owned(), new_batch.clone());
1965 Ok(format!(
1966 "parts={} len={}\n",
1967 batch.batch.part_count(),
1968 batch.batch.len
1969 ))
1970 }
1971
1972 #[allow(clippy::unused_async)]
1973 pub async fn set_batch_parts_size(
1974 datadriven: &mut MachineState,
1975 args: DirectiveArgs<'_>,
1976 ) -> Result<String, anyhow::Error> {
1977 let input = args.expect_str("input");
1978 let size = args.expect("size");
1979 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1980 let mut hollow_batch = (*batch.batch).clone();
1981 for part in hollow_batch.parts.iter_mut() {
1982 match part {
1983 RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1984 _ => {
1985 panic!("set_batch_parts_size only supports hollow parts")
1986 }
1987 }
1988 }
1989 batch.batch = Arc::new(hollow_batch);
1990 Ok("ok\n".to_string())
1991 }
1992
1993 pub async fn compact(
1994 datadriven: &mut MachineState,
1995 args: DirectiveArgs<'_>,
1996 ) -> Result<String, anyhow::Error> {
1997 let output = args.expect_str("output");
1998 let lower = args.expect_antichain("lower");
1999 let upper = args.expect_antichain("upper");
2000 let since = args.expect_antichain("since");
2001 let target_size = args.optional("target_size");
2002 let memory_bound = args.optional("memory_bound");
2003
2004 let mut inputs = Vec::new();
2005 for input in args.args.get("inputs").expect("missing inputs") {
2006 inputs.push(
2007 datadriven
2008 .batches
2009 .get(input)
2010 .expect("unknown batch")
2011 .clone(),
2012 );
2013 }
2014
2015 let cfg = datadriven.client.cfg.clone();
2016 if let Some(target_size) = target_size {
2017 cfg.set_config(&BLOB_TARGET_SIZE, target_size);
2018 };
2019 if let Some(memory_bound) = memory_bound {
2020 cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
2021 }
2022 let req = CompactReq {
2023 shard_id: datadriven.shard_id,
2024 desc: Description::new(lower, upper, since),
2025 inputs: inputs.clone(),
2026 };
2027 datadriven
2028 .compactions
2029 .insert(output.to_owned(), req.clone());
2030 let spine_lower = inputs
2031 .first()
2032 .map_or_else(|| datadriven.next_id, |x| x.id.0);
2033 let spine_upper = inputs.last().map_or_else(
2034 || {
2035 datadriven.next_id += 1;
2036 datadriven.next_id
2037 },
2038 |x| x.id.1,
2039 );
2040 let new_spine_id = SpineId(spine_lower, spine_upper);
2041 let res = Compactor::<String, (), u64, i64>::compact(
2042 CompactConfig::new(&cfg, datadriven.shard_id),
2043 Arc::clone(&datadriven.client.blob),
2044 Arc::clone(&datadriven.client.metrics),
2045 Arc::clone(&datadriven.machine.applier.shard_metrics),
2046 Arc::clone(&datadriven.client.isolated_runtime),
2047 req,
2048 SCHEMAS.clone(),
2049 )
2050 .await?;
2051
2052 let batch = IdHollowBatch {
2053 batch: Arc::new(res.output.clone()),
2054 id: new_spine_id,
2055 };
2056
2057 datadriven.batches.insert(output.to_owned(), batch.clone());
2058 Ok(format!(
2059 "parts={} len={}\n",
2060 res.output.part_count(),
2061 res.output.len
2062 ))
2063 }
2064
2065 pub async fn clear_blob(
2066 datadriven: &MachineState,
2067 _args: DirectiveArgs<'_>,
2068 ) -> Result<String, anyhow::Error> {
2069 let mut to_delete = vec![];
2070 datadriven
2071 .client
2072 .blob
2073 .list_keys_and_metadata("", &mut |meta| {
2074 to_delete.push(meta.key.to_owned());
2075 })
2076 .await?;
2077 for blob in &to_delete {
2078 datadriven.client.blob.delete(blob).await?;
2079 }
2080 Ok(format!("deleted={}\n", to_delete.len()))
2081 }
2082
2083 pub async fn restore_blob(
2084 datadriven: &MachineState,
2085 _args: DirectiveArgs<'_>,
2086 ) -> Result<String, anyhow::Error> {
2087 let not_restored = crate::internal::restore::restore_blob(
2088 &datadriven.state_versions,
2089 datadriven.client.blob.as_ref(),
2090 &datadriven.client.cfg.build_version,
2091 datadriven.shard_id,
2092 &*datadriven.state_versions.metrics,
2093 )
2094 .await?;
2095 let mut out = String::new();
2096 for key in not_restored {
2097 writeln!(&mut out, "{key}");
2098 }
2099 Ok(out)
2100 }
2101
2102 #[allow(clippy::unused_async)]
2103 pub async fn rewrite_ts(
2104 datadriven: &mut MachineState,
2105 args: DirectiveArgs<'_>,
2106 ) -> Result<String, anyhow::Error> {
2107 let input = args.expect_str("input");
2108 let ts_rewrite = args.expect_antichain("frontier");
2109 let upper = args.expect_antichain("upper");
2110
2111 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2112 let mut hollow_batch = (*batch.batch).clone();
2113 let () = hollow_batch
2114 .rewrite_ts(&ts_rewrite, upper)
2115 .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2116 batch.batch = Arc::new(hollow_batch);
2117 Ok("ok\n".into())
2118 }
2119
2120 pub async fn gc(
2121 datadriven: &mut MachineState,
2122 args: DirectiveArgs<'_>,
2123 ) -> Result<String, anyhow::Error> {
2124 let new_seqno_since = args.expect("to_seqno");
2125
2126 let req = GcReq {
2127 shard_id: datadriven.shard_id,
2128 new_seqno_since,
2129 };
2130 let (maintenance, stats) =
2131 GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2132 datadriven.routine.push(maintenance);
2133
2134 Ok(format!(
2135 "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2136 datadriven.machine.seqno(),
2137 stats.batch_parts_deleted_from_blob,
2138 stats.rollups_deleted_from_blob,
2139 stats
2140 .truncated_consensus_to
2141 .iter()
2142 .map(|x| x.to_string())
2143 .collect::<Vec<_>>()
2144 .join(","),
2145 stats
2146 .rollups_removed_from_state
2147 .iter()
2148 .map(|x| x.to_string())
2149 .collect::<Vec<_>>()
2150 .join(","),
2151 ))
2152 }
2153
    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }

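    /// Opens a leased reader and registers a listener on it as of the given
    /// frontier, storing the listener under the given output name.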
    pub async fn register_listen(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let as_of = args.expect_antichain("as_of");
        let read = datadriven
            .client
            .open_leased_reader::<String, (), u64, i64>(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("invalid shard types");
        let listen = read
            .listen(as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;
        datadriven.listens.insert(output.to_owned(), listen);
        Ok("ok\n".into())
    }

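    /// Drains events from a named listener, printing updates until its progress
    /// is no longer less than the given frontier.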
    pub async fn listen_through(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let frontier = args.expect("frontier");

        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
        let mut s = String::new();
        loop {
            for event in listen.fetch_next().await {
                match event {
                    ListenEvent::Updates(x) => {
                        for ((k, _v), t, d) in x.iter() {
                            write!(s, "{} {} {}\n", k.as_ref().unwrap(), t, d);
                        }
                    }
                    ListenEvent::Progress(x) => {
                        if !x.less_than(&frontier) {
                            return Ok(s);
                        }
                    }
                }
            }
        }
    }

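    /// Registers a critical reader with the given id and prints its since.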
    pub async fn register_critical_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (state, maintenance) = datadriven
            .machine
            .register_critical_reader::<u64>(&reader_id, "tests")
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            state.since.elements(),
        ))
    }

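    /// Registers a leased reader with the given id and prints its since.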
    pub async fn register_leased_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (reader_state, maintenance) = datadriven
            .machine
            .register_leased_reader(
                &reader_id,
                "tests",
                READER_LEASE_DURATION.get(&datadriven.client.cfg),
                (datadriven.client.cfg.now)(),
                false,
            )
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            reader_state.since.elements(),
        ))
    }

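    /// Heartbeats the leased reader with the given id at the current time.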
    pub async fn heartbeat_leased_reader(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let _ = datadriven
            .machine
            .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
            .await;
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

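    /// Expires the critical reader with the given id.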
    pub async fn expire_critical_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

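    /// Expires the leased reader with the given id.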
    pub async fn expire_leased_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

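    /// Opens a writer and atomically appends the named batches from the expected
    /// upper to the new upper via `compare_and_append_batch`.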
    pub async fn compare_and_append_batches(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let expected_upper = args.expect_antichain("expected_upper");
        let new_upper = args.expect_antichain("new_upper");

        let mut batches: Vec<Batch<String, (), u64, i64>> = args
            .args
            .get("batches")
            .expect("missing batches")
            .into_iter()
            .map(|batch| {
                let hollow = (*datadriven
                    .batches
                    .get(batch)
                    .expect("unknown batch")
                    .clone()
                    .batch)
                    .clone();
                datadriven.to_batch(hollow)
            })
            .collect();

        let mut writer = datadriven
            .client
            .open_writer(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await?;

        let mut batch_refs: Vec<_> = batches.iter_mut().collect();

        let () = writer
            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
            .await?
            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;

        writer.expire().await;

        Ok("ok\n".into())
    }

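    /// Expires the writer with the given id.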
    pub async fn expire_writer(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let writer_id = args.expect("writer_id");
        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

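    /// Finalizes the shard, transitioning its state to a tombstone.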
    pub(crate) async fn finalize(
        datadriven: &mut MachineState,
        _args: DirectiveArgs<'_>,
    ) -> anyhow::Result<String> {
        let maintenance = datadriven.machine.become_tombstone().await?;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

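    /// Prints the current seqno and whether the shard state has been finalized.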
    pub(crate) fn is_finalized(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> anyhow::Result<String> {
        let seqno = datadriven.machine.seqno();
        let tombstone = datadriven.machine.is_finalized();
        Ok(format!("{seqno} {tombstone}\n"))
    }

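    /// Appends a previously created batch via the machine's idempotent
    /// compare-and-append, flushing inline writes out to blob storage if the
    /// append hits inline backpressure, and prints the resulting upper.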
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                _ => panic!("{:?}", res),
            };
        };
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }

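    /// Applies the compaction output for the named input to the machine's state
    /// and prints whether the merge result was applied.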
    pub async fn apply_merge_res(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let compact_req = datadriven
            .compactions
            .get(input)
            .expect("unknown compact req")
            .clone();
        let input_batches = compact_req
            .inputs
            .iter()
            .map(|x| x.id)
            .collect::<BTreeSet<_>>();
        let lower_spine_bound = input_batches
            .first()
            .map(|id| id.0)
            .expect("at least one batch must be present");
        let upper_spine_bound = input_batches
            .last()
            .map(|id| id.1)
            .expect("at least one batch must be present");
        let id = SpineId(lower_spine_bound, upper_spine_bound);
        let hollow_batch = (*batch.batch).clone();

        let (merge_res, maintenance) = datadriven
            .machine
            .merge_res(&FueledMergeRes {
                output: hollow_batch,
                input: CompactionInput::IdRange(id),
                new_active_compaction: None,
            })
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {}\n",
            datadriven.machine.seqno(),
            merge_res.applied()
        ))
    }

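    /// Performs all queued routine maintenance, refetching state after each item
    /// and printing the seqno observed at each step.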
    pub async fn perform_maintenance(
        datadriven: &mut MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let mut s = String::new();
        for maintenance in datadriven.routine.drain(..) {
            let () = maintenance
                .perform(&datadriven.machine, &datadriven.gc)
                .await;
            let () = datadriven
                .machine
                .applier
                .fetch_and_update_state(None)
                .await;
            write!(s, "{} ok\n", datadriven.machine.seqno());
        }
        Ok(s)
    }
}

#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use timely::progress::Antichain;

    use crate::ShardId;
    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::new_test_client;

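    /// Writes one batch per timestamp with a low rollup threshold and asserts
    /// that the number of live diffs in consensus stays bounded (roughly
    /// logarithmic in the number of batches).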
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, _) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        assert!(live_diffs.0.len() > 0);
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.0.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.0.len(),
            max_live_diffs
        );
    }

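    /// Regression test: a GC run that panics partway through blob deletions must
    /// not prevent a later GC at the same seqno from completing.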
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        let intercept = InterceptHandle::default();
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();

        assert!(new_seqno_since > SeqNo::minimum());

        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        let _ = gc.await;

        intercept.set_post_delete(None);

        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

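    /// Regression test: two writers with separate state caches write to the same
    /// shard; after the first writer advances the upper, the second writer's
    /// append against the new upper must still succeed, which requires it to
    /// update its cached state after observing an upper mismatch.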
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        write1.expect_compare_and_append(&data[..1], 0, 2).await;

        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }
}