use std::fmt::Debug;
use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::future::{self, BoxFuture};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)]
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_ore::{assert_none, soft_assert_no_log};
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
use crate::cache::StateCache;
use crate::cfg::RetryParameters;
use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
    Upper,
};
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{Diagnostics, PersistConfig, ShardId};

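/// A handle for applying commands to a shard's durable, consensus-backed
/// state. All reads and writes of state funnel through the wrapped [Applier];
/// the isolated runtime is used to offload CPU-heavy work from the main
/// tokio runtime.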
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}

impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            applier: self.applier.clone(),
            isolated_runtime: Arc::clone(&self.isolated_runtime),
        }
    }
}

pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);

pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        isolated_runtime: Arc<IsolatedRuntime>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let applier = Applier::new(
            cfg,
            shard_id,
            metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            diagnostics,
        )
        .await?;
        Ok(Machine {
            applier,
            isolated_runtime,
        })
    }

    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }

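    /// Writes a rollup of the current state and records it in state. If
    /// recording it turns out to be a no-op (e.g. a rollup already exists at
    /// that seqno), the freshly written blob is deleted so it doesn't leak.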
    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }

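    /// Records the given rollup in state, retrying through indeterminate
    /// errors. Returns whether any attempt actually applied it; if not, the
    /// caller is responsible for deleting the now-unreferenced rollup blob.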
    pub async fn add_rollup(
        &self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> (bool, RoutineMaintenance) {
        let mut applied_ever_true = false;
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, _applied, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
                let ret = state.add_rollup(add_rollup);
                if let Continue(applied) = ret {
                    applied_ever_true = applied_ever_true || applied;
                }
                ret
            })
            .await;
        (applied_ever_true, maintenance)
    }

    pub async fn remove_rollups(
        &self,
        remove_rollups: &[(SeqNo, PartialRollupKey)],
    ) -> (Vec<SeqNo>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, removed_rollup_seqnos, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
                state.remove_rollups(remove_rollups)
            })
            .await;
        (removed_rollup_seqnos, maintenance)
    }

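    /// Bumps the version recorded in state to this process's build version.
    /// Fails with the state's version if state was written by a newer build,
    /// in which case this older build declines to touch it.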
    pub async fn upgrade_version(&self) -> Result<RoutineMaintenance, Version> {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, upgrade_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, cfg, state| {
                if state.version <= cfg.build_version {
                    state.version = cfg.build_version.clone();
                    Continue(Ok(()))
                } else {
                    Break(NoOpStateTransition(Err(state.version.clone())))
                }
            })
            .await;

        match upgrade_result {
            Ok(()) => Ok(maintenance),
            Err(version) => {
                soft_assert_no_log!(
                    maintenance.is_empty(),
                    "should not generate maintenance on failed upgrade"
                );
                Err(version)
            }
        }
    }

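    /// Registers a new leased reader with the given lease duration, returning
    /// its initial state. The reader's seqno capability is always at least
    /// the shard's seqno_since, as the debug_assert below checks.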
    pub async fn register_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        purpose: &str,
        lease_duration: Duration,
        heartbeat_timestamp_ms: u64,
        use_critical_since: bool,
    ) -> (LeasedReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, (reader_state, seqno_since), maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
                state.register_leased_reader(
                    &cfg.hostname,
                    reader_id,
                    purpose,
                    seqno,
                    lease_duration,
                    heartbeat_timestamp_ms,
                    use_critical_since,
                )
            })
            .await;
        debug_assert!(
            reader_state.seqno >= seqno_since,
            "{} vs {}",
            reader_state.seqno,
            seqno_since,
        );
        (reader_state, maintenance)
    }

    pub async fn register_critical_reader<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        purpose: &str,
    ) -> (CriticalReaderState<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
                state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
            })
            .await;
        (state, maintenance)
    }

    pub async fn register_schema(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (Option<SchemaId>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
                state.register_schema::<K, V>(key_schema, val_schema)
            })
            .await;
        (state, maintenance)
    }

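    /// Applies `fuel` of merge effort to the spine, returning any compaction
    /// requests that result. Skips the state update entirely when there is
    /// nothing that could merge.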
    pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
        if fuel == 0 || self.applier.all_batches().len() < 2 {
            return (Vec::new(), RoutineMaintenance::default());
        }

        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, reqs, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
                state.spine_exert(fuel)
            })
            .await;
        let reqs = reqs
            .into_iter()
            .map(|req| CompactReq {
                shard_id: self.shard_id(),
                desc: req.desc,
                inputs: req.inputs,
            })
            .collect();
        (reqs, maintenance)
    }

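    /// Appends `batch` to the shard, expecting the shard's upper to match the
    /// batch's lower. On an upper mismatch, the cached state is refreshed and
    /// the append is retried if the shard has since caught up to the batch's
    /// lower; otherwise the mismatch is returned to the caller.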
    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The cached upper was stale; now that it matches the
                        // batch's lower, loop around and try the append again.
                    }
                }
            }
        }
    }

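    /// The retrying core of `compare_and_append`. If a command attempt
    /// returns an indeterminate error, it may or may not have been durably
    /// applied; the `idempotency_token` lets a retry recognize its own
    /// earlier success as `AlreadyCommitted` instead of an upper mismatch.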
    async fn compare_and_append_idempotent(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        heartbeat_timestamp_ms: u64,
        idempotency_token: &IdempotencyToken,
        debug_info: &HandleDebugState,
        mut indeterminate: Option<Indeterminate>,
    ) -> CompareAndAppendRes<T> {
        let metrics = Arc::clone(&self.applier.metrics);
        let lease_duration_ms = self
            .applier
            .cfg
            .writer_lease_duration
            .as_millis()
            .try_into()
            .expect("reasonable duration");
        let mut retry = self
            .applier
            .metrics
            .retries
            .compare_and_append_idempotent
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        let mut writer_was_present = false;
        loop {
            let cmd_res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
                    writer_was_present = state.writers.contains_key(writer_id);
                    state.compare_and_append(
                        batch,
                        writer_id,
                        heartbeat_timestamp_ms,
                        lease_duration_ms,
                        idempotency_token,
                        debug_info,
                        INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
                        if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
                            CLAIM_COMPACTION_PERCENT.get(cfg)
                        } else {
                            0
                        },
                        Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
                            .ok()
                            .as_ref(),
                    )
                })
                .await;
            let (seqno, res, routine) = match cmd_res {
                Ok(x) => x,
                Err(err) => {
                    info!(
                        "compare_and_append received an indeterminate error, retrying in {:?}: {}",
                        retry.next_sleep(),
                        err
                    );
                    if indeterminate.is_none() {
                        indeterminate = Some(err);
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            };
            match res {
                Ok(merge_reqs) => {
                    let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
                    for req in merge_reqs {
                        let req = CompactReq {
                            shard_id: self.shard_id(),
                            desc: req.desc,
                            inputs: req.inputs,
                        };
                        compact_reqs.push(req);
                    }
                    let writer_maintenance = WriterMaintenance {
                        routine,
                        compaction: compact_reqs,
                    };

                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    for part in &batch.parts {
                        if part.is_inline() {
                            let bytes = u64::cast_from(part.inline_bytes());
                            metrics.inline.part_commit_bytes.inc_by(bytes);
                            metrics.inline.part_commit_count.inc();
                        }
                    }
                    return CompareAndAppendRes::Success(seqno, writer_maintenance);
                }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
                    if !writer_was_present {
                        metrics.state.writer_added.inc();
                    }
                    return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
                }
                Err(CompareAndAppendBreak::InvalidUsage(err)) => {
                    assert_none!(indeterminate);
                    return CompareAndAppendRes::InvalidUsage(err);
                }
                Err(CompareAndAppendBreak::InlineBackpressure) => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                Err(CompareAndAppendBreak::Upper {
                    shard_upper,
                    writer_upper,
                }) => {
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    if indeterminate.is_none() {
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    panic!(
                        concat!(
                            "cannot distinguish compare_and_append success or failure ",
                            "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
                        ),
                        batch.desc.lower().elements(),
                        batch.desc.upper().elements(),
                        writer_upper.elements(),
                        shard_upper.elements(),
                        indeterminate,
                    );
                }
            };
        }
    }

    pub async fn downgrade_since(
        &self,
        reader_id: &LeasedReaderId,
        outstanding_seqno: Option<SeqNo>,
        new_since: &Antichain<T>,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, Since<T>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
            state.downgrade_since(
                reader_id,
                seqno,
                outstanding_seqno,
                new_since,
                heartbeat_timestamp_ms,
            )
        })
        .await
    }

    pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
        &self,
        reader_id: &CriticalReaderId,
        expected_opaque: &O,
        (new_opaque, new_since): (&O, &Antichain<T>),
    ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, res, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_downgrade_since,
                |_seqno, _cfg, state| {
                    state.compare_and_downgrade_since::<O>(
                        reader_id,
                        expected_opaque,
                        (new_opaque, new_since),
                    )
                },
            )
            .await;

        match res {
            Ok(since) => (Ok(since), maintenance),
            Err((opaque, since)) => (Err((opaque, since)), maintenance),
        }
    }

    pub async fn heartbeat_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
        heartbeat_timestamp_ms: u64,
    ) -> (SeqNo, bool, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
                state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
            })
            .await;
        (seqno, existed, maintenance)
    }

    pub async fn expire_leased_reader(
        &self,
        reader_id: &LeasedReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_leased_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    #[allow(dead_code)]
    pub async fn expire_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
                state.expire_critical_reader(reader_id)
            })
            .await;
        (seqno, maintenance)
    }

    pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (seqno, _existed, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
                state.expire_writer(writer_id)
            })
            .await;
        metrics.state.writer_removed.inc();
        (seqno, maintenance)
    }

    pub fn is_finalized(&self) -> bool {
        self.applier.is_finalized()
    }

    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
        self.applier.get_schema(schema_id)
    }

    pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
        self.applier.latest_schema()
    }

    pub fn find_schema(&self, key_schema: &K::Schema, val_schema: &V::Schema) -> Option<SchemaId> {
        self.applier.find_schema(key_schema, val_schema)
    }

    pub async fn compare_and_evolve_schema(
        &self,
        expected: SchemaId,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> (CaESchema<K, V>, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, state, maintenance) = self
            .apply_unbatched_idempotent_cmd(
                &metrics.cmds.compare_and_evolve_schema,
                |_seqno, _cfg, state| {
                    state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
                },
            )
            .await;
        (state, maintenance)
    }

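    /// Runs one step of the shard's conversion to a tombstone, retrying
    /// through indeterminate errors. Returns whether the step made progress,
    /// so `become_tombstone` can loop until a no-op transition is reached.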
    async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
        let metrics = Arc::clone(&self.applier.metrics);
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            let res = self
                .applier
                .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
                    state.become_tombstone_and_shrink()
                })
                .await;
            let err = match res {
                Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
                Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
                    return Ok((false, maintenance));
                }
                Err(err) => err,
            };
            if retry.attempt() >= INFO_MIN_ATTEMPTS {
                info!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            } else {
                debug!(
                    "become_tombstone received an indeterminate error, retrying in {:?}: {}",
                    retry.next_sleep(),
                    err
                );
            }
            retry = retry.sleep().await;
        }
    }

    pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
        self.applier.check_since_upper_both_empty()?;

        let mut maintenance = RoutineMaintenance::default();

        loop {
            let (made_progress, more_maintenance) = self.tombstone_step().await?;
            maintenance.merge(more_maintenance);
            if !made_progress {
                break;
            }
        }

        Ok(maintenance)
    }

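    /// Returns the set of batches making up a snapshot at `as_of`, blocking
    /// until the shard's upper has advanced past it. Progress is driven by
    /// racing a seqno watch against a fallback retry sleep, so a missed watch
    /// notification only delays the snapshot rather than hanging it.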
    pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
        let start = Instant::now();
        let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
            Ok(x) => return Ok(x),
            Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
            Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                return Err(Since(since));
            }
        };

        let mut watch = self.applier.watch();
        let watch = &mut watch;
        let sleeps = self
            .applier
            .metrics
            .retries
            .snapshot
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());

        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch")),
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep")),
        );

        let mut logged_at_info = false;
        loop {
            if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
                logged_at_info = true;
                info!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            } else {
                debug!(
                    "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
                    self.applier.shard_metrics.name,
                    self.shard_id(),
                    as_of.elements(),
                    seqno,
                    upper.elements(),
                );
            }

            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            (seqno, upper) = match self.applier.snapshot(as_of) {
                Ok(x) => {
                    if logged_at_info {
                        info!(
                            "snapshot {} {} as of {:?} now available",
                            self.applier.shard_metrics.name,
                            self.shard_id(),
                            as_of.elements(),
                        );
                    }
                    return Ok(x);
                }
                Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
                    (seqno, upper)
                }
                Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
                    return Err(Since(since));
                }
            };

            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "snapshot {} {} sleeping for {:?}",
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }

    pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
        self.applier.verify_listen(as_of)
    }

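    /// Returns the next batch whose lower matches `frontier`, blocking until
    /// one exists. Uses the same watch-or-sleep race as `snapshot` to notice
    /// new state.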
    pub async fn next_listen_batch(
        &self,
        frontier: &Antichain<T>,
        watch: &mut StateWatch<K, V, T, D>,
        reader_id: Option<&LeasedReaderId>,
        retry: Option<RetryParameters>,
    ) -> HollowBatch<T> {
        let mut seqno = match self.applier.next_listen_batch(frontier) {
            Ok(b) => return b,
            Err(seqno) => seqno,
        };

        let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
        let sleeps = self
            .applier
            .metrics
            .retries
            .next_listen_batch
            .stream(retry.into_retry(SystemTime::now()).into_retry_stream());

        enum Wake<'a, K, V, T, D> {
            Watch(&'a mut StateWatch<K, V, T, D>),
            Sleep(MetricsRetryStream),
        }
        let mut watch_fut = std::pin::pin!(
            watch
                .wait_for_seqno_ge(seqno.next())
                .map(Wake::Watch)
                .instrument(trace_span!("snapshot::watch"))
        );
        let mut sleep_fut = std::pin::pin!(
            sleeps
                .sleep()
                .map(Wake::Sleep)
                .instrument(trace_span!("snapshot::sleep"))
        );

        loop {
            let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
                future::Either::Left((wake, _)) => wake,
                future::Either::Right((wake, _)) => wake,
            };
            match &wake {
                Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
                Wake::Sleep(_) => {
                    self.applier.metrics.watch.listen_woken_via_sleep.inc();
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                }
            }

            seqno = match self.applier.next_listen_batch(frontier) {
                Ok(b) => {
                    match &wake {
                        Wake::Watch(_) => {
                            self.applier.metrics.watch.listen_resolved_via_watch.inc()
                        }
                        Wake::Sleep(_) => {
                            self.applier.metrics.watch.listen_resolved_via_sleep.inc()
                        }
                    }
                    return b;
                }
                Err(seqno) => seqno,
            };

            match wake {
                Wake::Watch(watch) => {
                    watch_fut.set(
                        watch
                            .wait_for_seqno_ge(seqno.next())
                            .map(Wake::Watch)
                            .instrument(trace_span!("snapshot::watch")),
                    );
                }
                Wake::Sleep(sleeps) => {
                    debug!(
                        "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
                        reader_id,
                        self.applier.shard_metrics.name,
                        self.shard_id(),
                        sleeps.next_sleep()
                    );
                    sleep_fut.set(
                        sleeps
                            .sleep()
                            .map(Wake::Sleep)
                            .instrument(trace_span!("snapshot::sleep")),
                    );
                }
            }
        }
    }

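    /// Applies `work_fn` to the current state, retrying through indeterminate
    /// errors until it commits. This is only safe because `work_fn` must be
    /// idempotent: an earlier attempt may have durably succeeded before the
    /// error was returned.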
    async fn apply_unbatched_idempotent_cmd<
        R,
        WorkFn: FnMut(
            SeqNo,
            &PersistConfig,
            &mut StateCollections<T>,
        ) -> ControlFlow<NoOpStateTransition<R>, R>,
    >(
        &self,
        cmd: &CmdMetrics,
        mut work_fn: WorkFn,
    ) -> (SeqNo, R, RoutineMaintenance) {
        let mut retry = self
            .applier
            .metrics
            .retries
            .idempotent_cmd
            .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
        loop {
            match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
                Ok((seqno, x, maintenance)) => match x {
                    Ok(x) => {
                        return (seqno, x, maintenance);
                    }
                    Err(NoOpStateTransition(x)) => {
                        return (seqno, x, maintenance);
                    }
                },
                Err(err) => {
                    if retry.attempt() >= INFO_MIN_ATTEMPTS {
                        info!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    } else {
                        debug!(
                            "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
                            cmd.name,
                            retry.next_sleep(),
                            err
                        );
                    }
                    retry = retry.sleep().await;
                    continue;
                }
            }
        }
    }
}

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + PartialEq,
{
    pub async fn merge_res(
        &self,
        res: &FueledMergeRes<T>,
    ) -> (ApplyMergeResult, RoutineMaintenance) {
        let metrics = Arc::clone(&self.applier.metrics);

        let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
        let (_seqno, _apply_merge_result, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
                let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
                if let Continue(result) = ret {
                    if result.applied() {
                        merge_result_ever_applied = result;
                    }
                    if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
                    {
                        merge_result_ever_applied = result;
                    }
                }
                ret
            })
            .await;
        (merge_result_ever_applied, maintenance)
    }
}

pub(crate) struct ExpireFn(
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);

impl Debug for ExpireFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}

#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    Success(SeqNo, WriterMaintenance<T>),
    InvalidUsage(InvalidUsage<T>),
    UpperMismatch(SeqNo, Antichain<T>),
    InlineBackpressure,
}

#[cfg(test)]
impl<T: Debug> CompareAndAppendRes<T> {
    #[track_caller]
    fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
        match self {
            CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
            x => panic!("{:?}", x),
        }
    }
}

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    #[allow(clippy::unused_async)]
    pub async fn start_reader_heartbeat_tasks(
        self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) -> Vec<JoinHandle<()>> {
        let mut ret = Vec::new();
        let metrics = Arc::clone(&self.applier.metrics);

        let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
        ret.push(mz_ore::task::spawn(|| name, {
            let machine = self.clone();
            let reader_id = reader_id.clone();
            let gc = gc.clone();
            metrics
                .tasks
                .heartbeat_read
                .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
        }));

        let isolated_runtime = Arc::clone(&self.isolated_runtime);
        let name = format!(
            "persist::heartbeat_read_isolated({},{})",
            self.shard_id(),
            reader_id
        );
        ret.push(
            isolated_runtime.spawn_named(
                || name,
                metrics
                    .tasks
                    .heartbeat_read
                    .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
            ),
        );

        ret
    }

    async fn reader_heartbeat_task(
        machine: Self,
        reader_id: LeasedReaderId,
        gc: GarbageCollector<K, V, T, D>,
    ) {
        let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
        loop {
            let before_sleep = Instant::now();
            tokio::time::sleep(sleep_duration).await;

            let elapsed_since_before_sleeping = before_sleep.elapsed();
            if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) went {}s between heartbeats",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_before_sleeping.as_secs_f64()
                );
            }

            let before_heartbeat = Instant::now();
            let (_seqno, existed, maintenance) = machine
                .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
                .await;
            maintenance.start_performing(&machine, &gc);

            let elapsed_since_heartbeat = before_heartbeat.elapsed();
            if elapsed_since_heartbeat > Duration::from_secs(60) {
                warn!(
                    "reader ({}) of shard ({}) heartbeat call took {}s",
                    reader_id,
                    machine.shard_id(),
                    elapsed_since_heartbeat.as_secs_f64(),
                );
            }

            if !existed {
                warn!(
                    "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
                     while read handle is live",
                    reader_id,
                    machine.shard_id(),
                );
                return;
            }
        }
    }
}

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200),
    "The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100),
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);

fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
    RetryParameters {
        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
    }
}

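/// The number of attempts after which retry logging upgrades from `debug!`
/// to `info!`.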
pub const INFO_MIN_ATTEMPTS: usize = 3;

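/// Retries the given `work_fn` forever through any [ExternalError], on the
/// assumption that the operation is safe to blindly retry.
///
/// A minimal sketch of the call shape (the `blob`, `key`, and metric names
/// here are hypothetical, not part of this module):
///
/// ```ignore
/// let maybe_value = retry_external(&metrics.retries.external.blob_get, || async {
///     blob.get(&key).await
/// })
/// .await;
/// ```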
pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
where
    F: std::future::Future<Output = Result<R, ExternalError>>,
    WorkFn: FnMut() -> F,
{
    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
    loop {
        match work_fn().await {
            Ok(x) => {
                if retry.attempt() > 0 {
                    debug!(
                        "external operation {} succeeded after failing at least once",
                        metrics.name,
                    );
                }
                return x;
            }
            Err(err) => {
                if retry.attempt() >= INFO_MIN_ATTEMPTS {
                    info!(
                        "external operation {} failed, retrying in {:?}: {}",
                        metrics.name,
                        retry.next_sleep(),
                        err.display_with_causes()
                    );
                } else {
                    debug!(
                        "external operation {} failed, retrying in {:?}: {}",
                        metrics.name,
                        retry.next_sleep(),
                        err.display_with_causes()
                    );
                }
                retry = retry.sleep().await;
            }
        }
    }
}

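/// Like [retry_external], but only determinate errors are retried; an
/// [Indeterminate] error is returned to the caller, who must decide whether
/// the operation may nonetheless have succeeded.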
pub async fn retry_determinate<R, F, WorkFn>(
    metrics: &RetryMetrics,
    mut work_fn: WorkFn,
) -> Result<R, Indeterminate>
where
    F: std::future::Future<Output = Result<R, ExternalError>>,
    WorkFn: FnMut() -> F,
{
    let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
    loop {
        match work_fn().await {
            Ok(x) => {
                if retry.attempt() > 0 {
                    debug!(
                        "external operation {} succeeded after failing at least once",
                        metrics.name,
                    );
                }
                return Ok(x);
            }
            Err(ExternalError::Determinate(err)) => {
                debug!(
                    "external operation {} failed, retrying in {:?}: {}",
                    metrics.name,
                    retry.next_sleep(),
                    err.display_with_causes()
                );
                retry = retry.sleep().await;
                continue;
            }
            Err(ExternalError::Indeterminate(x)) => return Err(x),
        }
    }
}

#[cfg(test)]
pub mod datadriven {
    use std::collections::{BTreeMap, BTreeSet};
    use std::pin::pin;
    use std::sync::{Arc, LazyLock};

    use anyhow::anyhow;
    use differential_dataflow::consolidation::consolidate_updates;
    use differential_dataflow::trace::Description;
    use futures::StreamExt;
    use mz_dyncfg::{ConfigUpdates, ConfigVal};
    use mz_persist::indexed::encoding::BlobTraceBatchPart;
    use mz_persist_types::codec_impls::{StringSchema, UnitSchema};

    use crate::batch::{
        BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
        BatchParts, validate_truncate_batch,
    };
    use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
    use crate::fetch::{EncodedPart, FetchConfig};
    use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
    use crate::internal::datadriven::DirectiveArgs;
    use crate::internal::encoding::Schemas;
    use crate::internal::gc::GcReq;
    use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
    use crate::internal::state::{BatchPart, RunOrder, RunPart};
    use crate::internal::state_versions::EncodedRollup;
    use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
    use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
    use crate::rpc::NoopPubSubSender;
    use crate::tests::new_test_client;
    use crate::write::COMBINE_INLINE_WRITES;
    use crate::{GarbageCollector, PersistClient};

    use super::*;

    static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
        id: Some(SchemaId(0)),
        key: Arc::new(StringSchema),
        val: Arc::new(UnitSchema),
    });

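    /// A grab bag of the handles and named intermediate results (batches,
    /// rollups, listens, compactions) threaded through the datadriven tests
    /// in this module.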
    #[derive(Debug)]
    pub struct MachineState {
        pub client: PersistClient,
        pub shard_id: ShardId,
        pub state_versions: Arc<StateVersions>,
        pub machine: Machine<String, (), u64, i64>,
        pub gc: GarbageCollector<String, (), u64, i64>,
        pub batches: BTreeMap<String, IdHollowBatch<u64>>,
        pub next_id: usize,
        pub rollups: BTreeMap<String, EncodedRollup>,
        pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
        pub routine: Vec<RoutineMaintenance>,
        pub compactions: BTreeMap<String, CompactReq<u64>>,
    }

    impl MachineState {
        pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
            let shard_id = ShardId::new();
            let client = new_test_client(dyncfgs).await;
            client
                .cfg
                .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
            client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
            let state_versions = Arc::new(StateVersions::new(
                client.cfg.clone(),
                Arc::clone(&client.consensus),
                Arc::clone(&client.blob),
                Arc::clone(&client.metrics),
            ));
            let machine = Machine::new(
                client.cfg.clone(),
                shard_id,
                Arc::clone(&client.metrics),
                Arc::clone(&state_versions),
                Arc::clone(&client.shared_states),
                Arc::new(NoopPubSubSender),
                Arc::clone(&client.isolated_runtime),
                Diagnostics::for_tests(),
            )
            .await
            .expect("codecs should match");
            let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
            MachineState {
                shard_id,
                client,
                state_versions,
                machine,
                gc,
                batches: BTreeMap::default(),
                rollups: BTreeMap::default(),
                listens: BTreeMap::default(),
                routine: Vec::new(),
                compactions: BTreeMap::default(),
                next_id: 0,
            }
        }

        fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
            Batch::new(
                true,
                Arc::clone(&self.client.metrics),
                Arc::clone(&self.client.blob),
                self.client.metrics.shards.shard(&self.shard_id, "test"),
                self.client.cfg.build_version.clone(),
                (
                    <String>::encode_schema(&*SCHEMAS.key),
                    <()>::encode_schema(&*SCHEMAS.val),
                ),
                hollow,
            )
        }
    }

    pub async fn consensus_scan(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let from = args.expect("from_seqno");

        let mut states = datadriven
            .state_versions
            .fetch_all_live_states::<u64>(datadriven.shard_id)
            .await
            .expect("should only be called on an initialized shard")
            .check_ts_codec()
            .expect("shard codecs should not change");
        let mut s = String::new();
        while let Some(x) = states.next(|_| {}) {
            if x.seqno < from {
                continue;
            }
            let rollups: Vec<_> = x
                .collections
                .rollups
                .keys()
                .map(|seqno| seqno.to_string())
                .collect();
            let batches: Vec<_> = x
                .collections
                .trace
                .batches()
                .filter(|b| !b.is_empty())
                .filter_map(|b| {
                    datadriven
                        .batches
                        .iter()
                        .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
                        .map(|(batch_name, _)| batch_name.to_owned())
                })
                .collect();
            write!(
                s,
                "seqno={} batches={} rollups={}\n",
                x.seqno,
                batches.join(","),
                rollups.join(","),
            );
        }
        Ok(s)
    }

    pub async fn consensus_truncate(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let to = args.expect("to_seqno");
        let removed = datadriven
            .client
            .consensus
            .truncate(&datadriven.shard_id.to_string(), to)
            .await
            .expect("valid truncation");
        Ok(format!("{}\n", removed))
    }

    pub async fn blob_scan_batches(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();

        let mut s = String::new();
        let () = datadriven
            .state_versions
            .blob
            .list_keys_and_metadata(&key_prefix, &mut |x| {
                let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
                if let PartialBlobKey::Batch(_, _) = key {
                    write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
                }
            })
            .await?;
        Ok(s)
    }

    #[allow(clippy::unused_async)]
    pub async fn shard_desc(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        Ok(format!(
            "since={:?} upper={:?}\n",
            datadriven.machine.applier.since().elements(),
            datadriven.machine.applier.clone_upper().elements()
        ))
    }

    pub async fn downgrade_since(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let since = args.expect_antichain("since");
        let seqno = args.optional("seqno");
        let reader_id = args.expect("reader_id");
        let (_, since, routine) = datadriven
            .machine
            .downgrade_since(
                &reader_id,
                seqno,
                &since,
                (datadriven.machine.applier.cfg.now)(),
            )
            .await;
        datadriven.routine.push(routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            since.0.elements()
        ))
    }

    #[allow(clippy::unused_async)]
    pub async fn dyncfg(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let mut updates = ConfigUpdates::default();
        for x in args.input.trim().split('\n') {
            match x.split(' ').collect::<Vec<_>>().as_slice() {
                &[name, val] => {
                    let config = datadriven
                        .client
                        .cfg
                        .entries()
                        .find(|x| x.name() == name)
                        .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
                    match config.val() {
                        ConfigVal::Usize(_) => {
                            let val = val.parse().map_err(anyhow::Error::new)?;
                            updates.add_dynamic(name, ConfigVal::Usize(val));
                        }
                        ConfigVal::Bool(_) => {
                            let val = val.parse().map_err(anyhow::Error::new)?;
                            updates.add_dynamic(name, ConfigVal::Bool(val));
                        }
                        x => unimplemented!("dyncfg type: {:?}", x),
                    }
                }
                x => return Err(anyhow!("expected `name val` got: {:?}", x)),
            }
        }
        updates.apply(&datadriven.client.cfg);

        Ok("ok\n".to_string())
    }

    pub async fn compare_and_downgrade_since(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let expected_opaque: u64 = args.expect("expect_opaque");
        let new_opaque: u64 = args.expect("opaque");
        let new_since = args.expect_antichain("since");
        let reader_id = args.expect("reader_id");
        let (res, routine) = datadriven
            .machine
            .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
            .await;
        datadriven.routine.push(routine);
        let since = res.map_err(|(opaque, since)| {
            anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
        })?;
        Ok(format!(
            "{} {} {:?}\n",
            datadriven.machine.seqno(),
            new_opaque,
            since.0.elements()
        ))
    }

    pub async fn write_rollup(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");

        let rollup = datadriven
            .machine
            .applier
            .write_rollup_for_state()
            .await
            .expect("rollup");

        datadriven
            .rollups
            .insert(output.to_string(), rollup.clone());

        Ok(format!(
            "state={} diffs=[{}, {})\n",
            rollup.seqno,
            rollup._desc.lower().first().expect("seqno"),
            rollup._desc.upper().first().expect("seqno"),
        ))
    }

    pub async fn add_rollup(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let rollup = datadriven
            .rollups
            .get(input)
            .expect("unknown rollup")
            .clone();

        let (applied, maintenance) = datadriven
            .machine
            .add_rollup((rollup.seqno, &rollup.to_hollow()))
            .await;

        if !applied {
            return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
        }

        datadriven.routine.push(maintenance);
        Ok(format!("{}\n", datadriven.machine.seqno()))
    }

    pub async fn write_batch(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        assert!(PartialOrder::less_than(&lower, &upper));
        let since = args
            .optional_antichain("since")
            .unwrap_or_else(|| Antichain::from_elem(0));
        let target_size = args.optional("target_size");
        let parts_size_override = args.optional("parts_size_override");
        let consolidate = args.optional("consolidate").unwrap_or(true);
        let mut updates: Vec<_> = args
            .input
            .split('\n')
            .flat_map(DirectiveArgs::parse_update)
            .collect();

        let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
        if let Some(target_size) = target_size {
            cfg.blob_target_size = target_size;
        };
        if consolidate {
            consolidate_updates(&mut updates);
        }
        let run_order = if consolidate {
            cfg.preferred_order
        } else {
            RunOrder::Unordered
        };
        let parts = BatchParts::new_ordered::<i64>(
            cfg.clone(),
            run_order,
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            datadriven.shard_id,
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.isolated_runtime),
            &datadriven.client.metrics.user,
        );
        let builder = BatchBuilderInternal::new(
            cfg.clone(),
            parts,
            Arc::clone(&datadriven.client.metrics),
            SCHEMAS.clone(),
            Arc::clone(&datadriven.client.blob),
            datadriven.shard_id.clone(),
            datadriven.client.cfg.build_version.clone(),
        );
        let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
        for ((k, ()), t, d) in updates {
            builder.add(&k, &(), &t, &d).await.expect("invalid batch");
        }
        let mut batch = builder.finish(upper).await?;
        if parts_size_override.is_some() {
            batch
                .flush_to_blob(
                    &cfg,
                    &datadriven.client.metrics.user,
                    &datadriven.client.isolated_runtime,
                    &SCHEMAS,
                )
                .await;
        }
        let batch = batch.into_hollow_batch();
        let batch = IdHollowBatch {
            batch: Arc::new(batch),
            id: SpineId(datadriven.next_id, datadriven.next_id + 1),
        };
        datadriven.next_id += 1;

        if let Some(size) = parts_size_override {
            let mut batch = batch.clone();
            let mut hollow_batch = (*batch.batch).clone();
            for part in hollow_batch.parts.iter_mut() {
                match part {
                    RunPart::Many(run) => run.max_part_bytes = size,
                    RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
                    RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
                }
            }
            batch.batch = Arc::new(hollow_batch);
            datadriven.batches.insert(output.to_owned(), batch);
        } else {
            datadriven.batches.insert(output.to_owned(), batch.clone());
        }
        Ok(format!(
            "parts={} len={}\n",
            batch.batch.part_count(),
            batch.batch.len
        ))
    }

    pub async fn fetch_batch(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let stats = args.optional_str("stats");
        let batch = datadriven.batches.get(input).expect("unknown batch");

        let mut s = String::new();
        let mut stream = pin!(
            batch
                .batch
                .part_stream(
                    datadriven.shard_id,
                    &*datadriven.state_versions.blob,
                    &*datadriven.state_versions.metrics
                )
                .enumerate()
        );
        while let Some((idx, part)) = stream.next().await {
            let part = &*part?;
            write!(s, "<part {idx}>\n");

            let lower = match part {
                BatchPart::Inline { updates, .. } => {
                    let updates: BlobTraceBatchPart<u64> =
                        updates.decode(&datadriven.client.metrics.columnar)?;
                    updates.structured_key_lower()
                }
                other => other.structured_key_lower(),
            };

            if let Some(lower) = lower {
                if stats == Some("lower") {
                    writeln!(s, "<key lower={}>", lower.get())
                }
            }

            match part {
                BatchPart::Hollow(part) => {
                    let blob_batch = datadriven
                        .client
                        .blob
                        .get(&part.key.complete(&datadriven.shard_id))
                        .await;
                    match blob_batch {
                        Ok(Some(_)) | Err(_) => {}
                        Ok(None) => {
                            s.push_str("<empty>\n");
                            continue;
                        }
                    };
                }
                BatchPart::Inline { .. } => {}
            };
            let part = EncodedPart::fetch(
                &FetchConfig::from_persist_config(&datadriven.client.cfg),
                &datadriven.shard_id,
                datadriven.client.blob.as_ref(),
                datadriven.client.metrics.as_ref(),
                datadriven.machine.applier.shard_metrics.as_ref(),
                &datadriven.client.metrics.read.batch_fetcher,
                &batch.batch.desc,
                part,
            )
            .await
            .expect("invalid batch part");
            let part = part
                .normalize(&datadriven.client.metrics.columnar)
                .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

            for ((k, _v), t, d) in part
                .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                .expect("valid schemas")
            {
                writeln!(s, "{k} {t} {d}");
            }
        }
        if !s.is_empty() {
            for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
                write!(s, "<run {idx}>\n");
                for part in run {
                    let part_idx = batch
                        .batch
                        .parts
                        .iter()
                        .position(|p| p == part)
                        .expect("part should exist");
                    write!(s, "part {part_idx}\n");
                }
            }
        }
        Ok(s)
    }

    #[allow(clippy::unused_async)]
    pub async fn truncate_batch_desc(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");

        let batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
        let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
        let mut new_hollow_batch = (*batch.batch).clone();
        new_hollow_batch.desc = truncated_desc;
        let new_batch = IdHollowBatch {
            batch: Arc::new(new_hollow_batch),
            id: batch.id,
        };
        datadriven
            .batches
            .insert(output.to_owned(), new_batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            batch.batch.part_count(),
            batch.batch.len
        ))
    }

    #[allow(clippy::unused_async)]
    pub async fn set_batch_parts_size(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let size = args.expect("size");
        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
        let mut hollow_batch = (*batch.batch).clone();
        for part in hollow_batch.parts.iter_mut() {
            match part {
                RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
                _ => {
                    panic!("set_batch_parts_size only supports hollow parts")
                }
            }
        }
        batch.batch = Arc::new(hollow_batch);
        Ok("ok\n".to_string())
    }

    pub async fn compact(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let lower = args.expect_antichain("lower");
        let upper = args.expect_antichain("upper");
        let since = args.expect_antichain("since");
        let target_size = args.optional("target_size");
        let memory_bound = args.optional("memory_bound");

        let mut inputs = Vec::new();
        for input in args.args.get("inputs").expect("missing inputs") {
            inputs.push(
                datadriven
                    .batches
                    .get(input)
                    .expect("unknown batch")
                    .clone(),
            );
        }

        let cfg = datadriven.client.cfg.clone();
        if let Some(target_size) = target_size {
            cfg.set_config(&BLOB_TARGET_SIZE, target_size);
        };
        if let Some(memory_bound) = memory_bound {
            cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
        }
        let req = CompactReq {
            shard_id: datadriven.shard_id,
            desc: Description::new(lower, upper, since),
            inputs: inputs.clone(),
        };
        datadriven
            .compactions
            .insert(output.to_owned(), req.clone());
        let spine_lower = inputs
            .first()
            .map_or_else(|| datadriven.next_id, |x| x.id.0);
        let spine_upper = inputs.last().map_or_else(
            || {
                datadriven.next_id += 1;
                datadriven.next_id
            },
            |x| x.id.1,
        );
        let new_spine_id = SpineId(spine_lower, spine_upper);
        let res = Compactor::<String, (), u64, i64>::compact(
            CompactConfig::new(&cfg, datadriven.shard_id),
            Arc::clone(&datadriven.client.blob),
            Arc::clone(&datadriven.client.metrics),
            Arc::clone(&datadriven.machine.applier.shard_metrics),
            Arc::clone(&datadriven.client.isolated_runtime),
            req,
            SCHEMAS.clone(),
        )
        .await?;

        let batch = IdHollowBatch {
            batch: Arc::new(res.output.clone()),
            id: new_spine_id,
        };

        datadriven.batches.insert(output.to_owned(), batch.clone());
        Ok(format!(
            "parts={} len={}\n",
            res.output.part_count(),
            res.output.len
        ))
    }

    pub async fn clear_blob(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let mut to_delete = vec![];
        datadriven
            .client
            .blob
            .list_keys_and_metadata("", &mut |meta| {
                to_delete.push(meta.key.to_owned());
            })
            .await?;
        for blob in &to_delete {
            datadriven.client.blob.delete(blob).await?;
        }
        Ok(format!("deleted={}\n", to_delete.len()))
    }

    pub async fn restore_blob(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let not_restored = crate::internal::restore::restore_blob(
            &datadriven.state_versions,
            datadriven.client.blob.as_ref(),
            &datadriven.client.cfg.build_version,
            datadriven.shard_id,
            &*datadriven.state_versions.metrics,
        )
        .await?;
        let mut out = String::new();
        for key in not_restored {
            writeln!(&mut out, "{key}");
        }
        Ok(out)
    }

    #[allow(clippy::unused_async)]
    pub async fn rewrite_ts(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let ts_rewrite = args.expect_antichain("frontier");
        let upper = args.expect_antichain("upper");

        let batch = datadriven.batches.get_mut(input).expect("unknown batch");
        let mut hollow_batch = (*batch.batch).clone();
        let () = hollow_batch
            .rewrite_ts(&ts_rewrite, upper)
            .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
        batch.batch = Arc::new(hollow_batch);
        Ok("ok\n".into())
    }

    pub async fn gc(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let new_seqno_since = args.expect("to_seqno");

        let req = GcReq {
            shard_id: datadriven.shard_id,
            new_seqno_since,
        };
        let (maintenance, stats) =
            GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
        datadriven.routine.push(maintenance);

        Ok(format!(
            "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
            datadriven.machine.seqno(),
            stats.batch_parts_deleted_from_blob,
            stats.rollups_deleted_from_blob,
            stats
                .truncated_consensus_to
                .iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>()
                .join(","),
            stats
                .rollups_removed_from_state
                .iter()
                .map(|x| x.to_string())
                .collect::<Vec<_>>()
                .join(","),
        ))
    }

    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }

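    /// Opens a leased reader and registers a named listen as of the given
    /// frontier.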
    pub async fn register_listen(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let as_of = args.expect_antichain("as_of");
        let read = datadriven
            .client
            .open_leased_reader::<String, (), u64, i64>(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("invalid shard types");
        let listen = read
            .listen(as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;
        datadriven.listens.insert(output.to_owned(), listen);
        Ok("ok\n".into())
    }

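    /// Drains a named listen, printing updates until its progress frontier is
    /// no longer strictly less than the given frontier.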
    pub async fn listen_through(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let frontier = args.expect("frontier");
        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
        let mut s = String::new();
        loop {
            for event in listen.fetch_next().await {
                match event {
                    ListenEvent::Updates(x) => {
                        for ((k, _v), t, d) in x.iter() {
                            writeln!(s, "{} {} {}", k.as_ref().unwrap(), t, d);
                        }
                    }
                    ListenEvent::Progress(x) => {
                        if !x.less_than(&frontier) {
                            return Ok(s);
                        }
                    }
                }
            }
        }
    }

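    /// Registers a critical reader with the given id, reporting the since it
    /// was registered at.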
    pub async fn register_critical_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (state, maintenance) = datadriven
            .machine
            .register_critical_reader::<u64>(&reader_id, "tests")
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            state.since.elements(),
        ))
    }

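    /// Registers a leased reader with the given id, reporting the since it
    /// was registered at.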
    pub async fn register_leased_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (reader_state, maintenance) = datadriven
            .machine
            .register_leased_reader(
                &reader_id,
                "tests",
                READER_LEASE_DURATION.get(&datadriven.client.cfg),
                (datadriven.client.cfg.now)(),
                false,
            )
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            reader_state.since.elements(),
        ))
    }

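    /// Heartbeats the given leased reader at the current time.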
    pub async fn heartbeat_leased_reader(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let _ = datadriven
            .machine
            .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
            .await;
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

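    /// Expires the given critical reader.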
    pub async fn expire_critical_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

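    /// Expires the given leased reader.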
    pub async fn expire_leased_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

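    /// Appends one or more cached batches through a write handle's
    /// `compare_and_append_batch`, checking the expected upper.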
    pub async fn compare_and_append_batches(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let expected_upper = args.expect_antichain("expected_upper");
        let new_upper = args.expect_antichain("new_upper");

        let mut batches: Vec<Batch<String, (), u64, i64>> = args
            .args
            .get("batches")
            .expect("missing batches")
            .into_iter()
            .map(|batch| {
                let hollow = (*datadriven
                    .batches
                    .get(batch)
                    .expect("unknown batch")
                    .clone()
                    .batch)
                    .clone();
                datadriven.to_batch(hollow)
            })
            .collect();

        let mut writer = datadriven
            .client
            .open_writer(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await?;

        let mut batch_refs: Vec<_> = batches.iter_mut().collect();

        let () = writer
            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
            .await?
            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;

        writer.expire().await;

        Ok("ok\n".into())
    }

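    /// Expires the given writer.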
    pub async fn expire_writer(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let writer_id = args.expect("writer_id");
        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

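    /// Finalizes the shard, turning its state into a tombstone.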
    pub(crate) async fn finalize(
        datadriven: &mut MachineState,
        _args: DirectiveArgs<'_>,
    ) -> anyhow::Result<String> {
        let maintenance = datadriven.machine.become_tombstone().await?;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

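    /// Reports whether the shard's state has been finalized.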
    pub(crate) fn is_finalized(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> anyhow::Result<String> {
        let seqno = datadriven.machine.seqno();
        let tombstone = datadriven.machine.is_finalized();
        Ok(format!("{seqno} {tombstone}\n"))
    }

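    /// Appends a cached batch via `compare_and_append_idempotent`. If the
    /// append hits inline backpressure, flushes the batch's inline writes to
    /// blob and retries.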
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                _ => panic!("{:?}", res),
            };
        };
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }

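    /// Applies a compaction result for a named compaction request, reporting
    /// whether the merge result was applied to the shard's spine.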
    pub async fn apply_merge_res(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let compact_req = datadriven
            .compactions
            .get(input)
            .expect("unknown compact req")
            .clone();
        let input_batches = compact_req
            .inputs
            .iter()
            .map(|x| x.id)
            .collect::<BTreeSet<_>>();
        let lower_spine_bound = input_batches
            .first()
            .map(|id| id.0)
            .expect("at least one batch must be present");
        let upper_spine_bound = input_batches
            .last()
            .map(|id| id.1)
            .expect("at least one batch must be present");
        let id = SpineId(lower_spine_bound, upper_spine_bound);
        let hollow_batch = (*batch.batch).clone();

        let (merge_res, maintenance) = datadriven
            .machine
            .merge_res(&FueledMergeRes {
                output: hollow_batch,
                input: CompactionInput::IdRange(id),
                new_active_compaction: None,
            })
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {}\n",
            datadriven.machine.seqno(),
            merge_res.applied()
        ))
    }

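    /// Performs all queued routine maintenance, refreshing the cached state
    /// after each step.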
    pub async fn perform_maintenance(
        datadriven: &mut MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let mut s = String::new();
        for maintenance in datadriven.routine.drain(..) {
            let () = maintenance
                .perform(&datadriven.machine, &datadriven.gc)
                .await;
            let () = datadriven
                .machine
                .applier
                .fetch_and_update_state(None)
                .await;
            writeln!(s, "{} ok", datadriven.machine.seqno());
        }
        Ok(s)
    }
}

#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use mz_persist_types::PersistLocation;
    use semver::Version;
    use timely::progress::Antichain;

    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::{new_test_client, new_test_client_cache};
    use crate::{Diagnostics, PersistClient, ShardId};

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);

        let (mut write, _) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

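        // Write a sequence of batches, each flushed to blob and appended in
        // its own compare_and_append, so that state accumulates many versions
        // for routine maintenance to roll up and truncate.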
        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        assert!(!live_diffs.0.is_empty());
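
        // Maintenance should keep the count of live diffs roughly logarithmic
        // in the number of appends; allow a 2x fudge factor over
        // log2(NUM_BATCHES).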
        let max_live_diffs =
            2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.0.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.0.len(),
            max_live_diffs
        );
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        let intercept = InterceptHandle::default();
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        read.downgrade_since(&Antichain::from_elem(1)).await;

        let new_seqno_since = read.machine.applier.seqno_since();
        assert!(new_seqno_since > SeqNo::minimum());
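
        // Arrange for every blob delete to panic, simulating a GC task that
        // gets interrupted partway through its deletes.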
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        let _ = gc.await;

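        // Clear the failure injection and re-run GC with the same req; it
        // must tolerate the partially-applied work of the interrupted run.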
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        write1.expect_compare_and_append(&data[..1], 0, 2).await;

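        // write2's cached upper is stale (it predates write1's append), so
        // this append must refetch state after the apparent upper mismatch in
        // order to succeed.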
        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }

    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn version_upgrade(dyncfgs: ConfigUpdates) {
        let mut cache = new_test_client_cache(&dyncfgs);
        cache.cfg.build_version = Version::new(26, 1, 0);
        let shard_id = ShardId::new();

        async fn fetch_catalog_upgrade_shard_version(
            persist_client: &PersistClient,
            upgrade_shard_id: ShardId,
        ) -> Option<semver::Version> {
            let shard_state = persist_client
                .inspect_shard::<u64>(&upgrade_shard_id)
                .await
                .ok()?;
            let json_state = serde_json::to_value(shard_state).expect("state serialization error");
            let upgrade_version = json_state
                .get("applier_version")
                .cloned()
                .expect("missing applier_version");
            let upgrade_version =
                serde_json::from_value(upgrade_version).expect("version deserialization error");
            Some(upgrade_version)
        }

        cache.cfg.build_version = Version::new(26, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(1)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

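        // Reopening at a newer build version and writing doesn't bump the
        // shard's applier_version; that takes the explicit upgrade_version
        // call below.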
        cache.cfg.build_version = Version::new(27, 1, 0);
        let client = cache.open(PersistLocation::new_in_mem()).await.unwrap();
        let (_, mut reader) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        reader.downgrade_since(&Antichain::from_elem(2)).await;
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(26, 1, 0)),
        );

        client
            .upgrade_version::<String, (), u64, i64>(shard_id, Diagnostics::for_tests())
            .await
            .unwrap();
        assert_eq!(
            fetch_catalog_upgrade_shard_version(&client, shard_id).await,
            Some(Version::new(27, 1, 0)),
        );
    }
}