use std::fmt::Debug;
use std::ops::ControlFlow::{self, Continue};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::future::{self, BoxFuture};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)]
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
use crate::cache::StateCache;
use crate::cfg::RetryParameters;
use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
    Upper,
};
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{Diagnostics, PersistConfig, ShardId};
62
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}

impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            applier: self.applier.clone(),
            isolated_runtime: Arc::clone(&self.isolated_runtime),
        }
    }
}
78
pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
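// Worked example for the description above: with a value of 365, each qualifying append claims
// 365 / 100 = 3 compactions outright and has a 365 % 100 = 65% chance of claiming a fourth.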

pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
107 pub async fn new(
108 cfg: PersistConfig,
109 shard_id: ShardId,
110 metrics: Arc<Metrics>,
111 state_versions: Arc<StateVersions>,
112 shared_states: Arc<StateCache>,
113 pubsub_sender: Arc<dyn PubSubSender>,
114 isolated_runtime: Arc<IsolatedRuntime>,
115 diagnostics: Diagnostics,
116 ) -> Result<Self, Box<CodecMismatch>> {
117 let applier = Applier::new(
118 cfg,
119 shard_id,
120 metrics,
121 state_versions,
122 shared_states,
123 pubsub_sender,
124 diagnostics,
125 )
126 .await?;
127 Ok(Machine {
128 applier,
129 isolated_runtime,
130 })
131 }
132
133 pub fn shard_id(&self) -> ShardId {
134 self.applier.shard_id
135 }
136
137 pub fn seqno(&self) -> SeqNo {
138 self.applier.seqno()
139 }
140
141 pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
142 let rollup = self.applier.write_rollup_for_state().await;
143 let Some(rollup) = rollup else {
144 return RoutineMaintenance::default();
145 };
146
147 let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
148 if !applied {
149 self.applier
152 .state_versions
153 .delete_rollup(&rollup.shard_id, &rollup.key)
154 .await;
155 }
156 maintenance
157 }
158
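    /// Adds the given rollup to state, returning whether state now references it. The
    /// caller remains responsible for the rollup blob itself (e.g. deleting it if the
    /// add was not applied, as `add_rollup_for_current_seqno` does above).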
159 pub async fn add_rollup(
160 &self,
161 add_rollup: (SeqNo, &HollowRollup),
162 ) -> (bool, RoutineMaintenance) {
163 let mut applied_ever_true = false;
166 let metrics = Arc::clone(&self.applier.metrics);
167 let (_seqno, _applied, maintenance) = self
168 .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
169 let ret = state.add_rollup(add_rollup);
170 if let Continue(applied) = ret {
171 applied_ever_true = applied_ever_true || applied;
172 }
173 ret
174 })
175 .await;
176 (applied_ever_true, maintenance)
177 }
178
179 pub async fn remove_rollups(
180 &self,
181 remove_rollups: &[(SeqNo, PartialRollupKey)],
182 ) -> (Vec<SeqNo>, RoutineMaintenance) {
183 let metrics = Arc::clone(&self.applier.metrics);
184 let (_seqno, removed_rollup_seqnos, maintenance) = self
185 .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
186 state.remove_rollups(remove_rollups)
187 })
188 .await;
189 (removed_rollup_seqnos, maintenance)
190 }
191
192 pub async fn register_leased_reader(
193 &self,
194 reader_id: &LeasedReaderId,
195 purpose: &str,
196 lease_duration: Duration,
197 heartbeat_timestamp_ms: u64,
198 use_critical_since: bool,
199 ) -> (LeasedReaderState<T>, RoutineMaintenance) {
200 let metrics = Arc::clone(&self.applier.metrics);
201 let (_seqno, (reader_state, seqno_since), maintenance) = self
202 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
203 state.register_leased_reader(
204 &cfg.hostname,
205 reader_id,
206 purpose,
207 seqno,
208 lease_duration,
209 heartbeat_timestamp_ms,
210 use_critical_since,
211 )
212 })
213 .await;
214 debug_assert!(
223 reader_state.seqno >= seqno_since,
224 "{} vs {}",
225 reader_state.seqno,
226 seqno_since,
227 );
228 (reader_state, maintenance)
229 }
230
231 pub async fn register_critical_reader<O: Opaque + Codec64>(
232 &self,
233 reader_id: &CriticalReaderId,
234 purpose: &str,
235 ) -> (CriticalReaderState<T>, RoutineMaintenance) {
236 let metrics = Arc::clone(&self.applier.metrics);
237 let (_seqno, state, maintenance) = self
238 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
239 state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
240 })
241 .await;
242 (state, maintenance)
243 }
244
245 pub async fn register_schema(
246 &self,
247 key_schema: &K::Schema,
248 val_schema: &V::Schema,
249 ) -> (Option<SchemaId>, RoutineMaintenance) {
250 let metrics = Arc::clone(&self.applier.metrics);
251 let (_seqno, state, maintenance) = self
252 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
253 state.register_schema::<K, V>(key_schema, val_schema)
254 })
255 .await;
256 (state, maintenance)
257 }
258
259 pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
260 if fuel == 0 || self.applier.all_batches().len() < 2 {
262 return (Vec::new(), RoutineMaintenance::default());
263 }
264
265 let metrics = Arc::clone(&self.applier.metrics);
266 let (_seqno, reqs, maintenance) = self
267 .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
268 state.spine_exert(fuel)
269 })
270 .await;
271 let reqs = reqs
272 .into_iter()
273 .map(|req| CompactReq {
274 shard_id: self.shard_id(),
275 desc: req.desc,
276 inputs: req.inputs,
277 })
278 .collect();
279 (reqs, maintenance)
280 }
281
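    /// Appends `batch` to the shard, expecting the shard's upper to match the batch's
    /// lower. On an upper mismatch this refreshes the locally cached state and retries
    /// while the refreshed upper still matches the batch's lower (the mismatch was then
    /// only cache staleness); a true mismatch is returned to the caller.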
    pub async fn compare_and_append(
        &self,
        batch: &HollowBatch<T>,
        writer_id: &WriterId,
        debug_info: &HandleDebugState,
        heartbeat_timestamp_ms: u64,
    ) -> CompareAndAppendRes<T> {
        let idempotency_token = IdempotencyToken::new();
        loop {
            let res = self
                .compare_and_append_idempotent(
                    batch,
                    writer_id,
                    heartbeat_timestamp_ms,
                    &idempotency_token,
                    debug_info,
                    None,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(seqno, maintenance) => {
                    return CompareAndAppendRes::Success(seqno, maintenance);
                }
                CompareAndAppendRes::InvalidUsage(x) => {
                    return CompareAndAppendRes::InvalidUsage(x);
                }
                CompareAndAppendRes::InlineBackpressure => {
                    return CompareAndAppendRes::InlineBackpressure;
                }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // The upper in our cached state may simply be stale. Fetch the
                    // latest state at (or after) the seqno we were told about and
                    // decide based on the refreshed upper.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The cached upper was stale and now matches the batch's
                        // lower, so retry the append.
                    }
                }
            }
        }
    }
334
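    /// The retrying core of [Self::compare_and_append]. The `IdempotencyToken` makes the
    /// command safe to re-apply after an `Indeterminate` error from consensus: if a
    /// retry finds the token already present in state, the earlier attempt must have
    /// committed and the call is reported as a success.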
335 async fn compare_and_append_idempotent(
336 &self,
337 batch: &HollowBatch<T>,
338 writer_id: &WriterId,
339 heartbeat_timestamp_ms: u64,
340 idempotency_token: &IdempotencyToken,
341 debug_info: &HandleDebugState,
342 mut indeterminate: Option<Indeterminate>,
346 ) -> CompareAndAppendRes<T> {
347 let metrics = Arc::clone(&self.applier.metrics);
348 let lease_duration_ms = self
349 .applier
350 .cfg
351 .writer_lease_duration
352 .as_millis()
353 .try_into()
354 .expect("reasonable duration");
355 let mut retry = self
438 .applier
439 .metrics
440 .retries
441 .compare_and_append_idempotent
442 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
443 let mut writer_was_present = false;
444 loop {
445 let cmd_res = self
446 .applier
447 .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
448 writer_was_present = state.writers.contains_key(writer_id);
449 state.compare_and_append(
450 batch,
451 writer_id,
452 heartbeat_timestamp_ms,
453 lease_duration_ms,
454 idempotency_token,
455 debug_info,
456 INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
457 if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
458 CLAIM_COMPACTION_PERCENT.get(cfg)
459 } else {
460 0
461 },
462 Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
463 .ok()
464 .as_ref(),
465 )
466 })
467 .await;
468 let (seqno, res, routine) = match cmd_res {
469 Ok(x) => x,
470 Err(err) => {
471 info!(
474 "compare_and_append received an indeterminate error, retrying in {:?}: {}",
475 retry.next_sleep(),
476 err
477 );
478 if indeterminate.is_none() {
479 indeterminate = Some(err);
480 }
481 retry = retry.sleep().await;
482 continue;
483 }
484 };
485 match res {
486 Ok(merge_reqs) => {
487 let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
490 for req in merge_reqs {
491 let req = CompactReq {
492 shard_id: self.shard_id(),
493 desc: req.desc,
494 inputs: req.inputs,
495 };
496 compact_reqs.push(req);
497 }
498 let writer_maintenance = WriterMaintenance {
499 routine,
500 compaction: compact_reqs,
501 };
502
503 if !writer_was_present {
504 metrics.state.writer_added.inc();
505 }
506 for part in &batch.parts {
507 if part.is_inline() {
508 let bytes = u64::cast_from(part.inline_bytes());
509 metrics.inline.part_commit_bytes.inc_by(bytes);
510 metrics.inline.part_commit_count.inc();
511 }
512 }
513 return CompareAndAppendRes::Success(seqno, writer_maintenance);
514 }
515 Err(CompareAndAppendBreak::AlreadyCommitted) => {
516 assert!(indeterminate.is_some());
520 self.applier.metrics.cmds.compare_and_append_noop.inc();
521 if !writer_was_present {
522 metrics.state.writer_added.inc();
523 }
524 return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
525 }
526 Err(CompareAndAppendBreak::InvalidUsage(err)) => {
527 assert_none!(indeterminate);
532 return CompareAndAppendRes::InvalidUsage(err);
533 }
534 Err(CompareAndAppendBreak::InlineBackpressure) => {
535 return CompareAndAppendRes::InlineBackpressure;
538 }
539 Err(CompareAndAppendBreak::Upper {
540 shard_upper,
541 writer_upper,
542 }) => {
543 assert!(
548 PartialOrder::less_equal(&writer_upper, &shard_upper),
549 "{:?} vs {:?}",
550 &writer_upper,
551 &shard_upper
552 );
553 if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
554 return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
558 }
559 if indeterminate.is_none() {
560 return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
565 }
566 panic!(
576 concat!(
577 "cannot distinguish compare_and_append success or failure ",
578 "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
579 ),
580 batch.desc.lower().elements(),
581 batch.desc.upper().elements(),
582 writer_upper.elements(),
583 shard_upper.elements(),
584 indeterminate,
585 );
586 }
587 };
588 }
589 }
590
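    /// Downgrades the since capability of the given leased reader to `new_since`, also
    /// recording the reader's heartbeat and any outstanding seqno hold.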
591 pub async fn downgrade_since(
592 &self,
593 reader_id: &LeasedReaderId,
594 outstanding_seqno: Option<SeqNo>,
595 new_since: &Antichain<T>,
596 heartbeat_timestamp_ms: u64,
597 ) -> (SeqNo, Since<T>, RoutineMaintenance) {
598 let metrics = Arc::clone(&self.applier.metrics);
599 self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
600 state.downgrade_since(
601 reader_id,
602 seqno,
603 outstanding_seqno,
604 new_since,
605 heartbeat_timestamp_ms,
606 )
607 })
608 .await
609 }
610
611 pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
612 &self,
613 reader_id: &CriticalReaderId,
614 expected_opaque: &O,
615 (new_opaque, new_since): (&O, &Antichain<T>),
616 ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
617 let metrics = Arc::clone(&self.applier.metrics);
618 let (_seqno, res, maintenance) = self
619 .apply_unbatched_idempotent_cmd(
620 &metrics.cmds.compare_and_downgrade_since,
621 |_seqno, _cfg, state| {
622 state.compare_and_downgrade_since::<O>(
623 reader_id,
624 expected_opaque,
625 (new_opaque, new_since),
626 )
627 },
628 )
629 .await;
630
631 match res {
632 Ok(since) => (Ok(since), maintenance),
633 Err((opaque, since)) => (Err((opaque, since)), maintenance),
634 }
635 }
636
637 pub async fn heartbeat_leased_reader(
638 &self,
639 reader_id: &LeasedReaderId,
640 heartbeat_timestamp_ms: u64,
641 ) -> (SeqNo, bool, RoutineMaintenance) {
642 let metrics = Arc::clone(&self.applier.metrics);
643 let (seqno, existed, maintenance) = self
644 .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
645 state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
646 })
647 .await;
648 (seqno, existed, maintenance)
649 }
650
651 pub async fn expire_leased_reader(
652 &self,
653 reader_id: &LeasedReaderId,
654 ) -> (SeqNo, RoutineMaintenance) {
655 let metrics = Arc::clone(&self.applier.metrics);
656 let (seqno, _existed, maintenance) = self
657 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
658 state.expire_leased_reader(reader_id)
659 })
660 .await;
661 (seqno, maintenance)
662 }

    #[allow(dead_code)]
    pub async fn expire_critical_reader(
        &self,
        reader_id: &CriticalReaderId,
    ) -> (SeqNo, RoutineMaintenance) {
669 let metrics = Arc::clone(&self.applier.metrics);
670 let (seqno, _existed, maintenance) = self
671 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
672 state.expire_critical_reader(reader_id)
673 })
674 .await;
675 (seqno, maintenance)
676 }
677
678 pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
679 let metrics = Arc::clone(&self.applier.metrics);
680 let (seqno, _existed, maintenance) = self
681 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
682 state.expire_writer(writer_id)
683 })
684 .await;
685 metrics.state.writer_removed.inc();
686 (seqno, maintenance)
687 }
688
689 pub fn is_finalized(&self) -> bool {
690 self.applier.is_finalized()
691 }
692
693 pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
695 self.applier.get_schema(schema_id)
696 }
697
698 pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
700 self.applier.latest_schema()
701 }
702
703 pub async fn compare_and_evolve_schema(
707 &self,
708 expected: SchemaId,
709 key_schema: &K::Schema,
710 val_schema: &V::Schema,
711 ) -> (CaESchema<K, V>, RoutineMaintenance) {
712 let metrics = Arc::clone(&self.applier.metrics);
713 let (_seqno, state, maintenance) = self
714 .apply_unbatched_idempotent_cmd(
715 &metrics.cmds.compare_and_evolve_schema,
716 |_seqno, _cfg, state| {
717 state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
718 },
719 )
720 .await;
721 (state, maintenance)
722 }
723
724 async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
725 let metrics = Arc::clone(&self.applier.metrics);
726 let mut retry = self
727 .applier
728 .metrics
729 .retries
730 .idempotent_cmd
731 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
732 loop {
733 let res = self
734 .applier
735 .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
736 state.become_tombstone_and_shrink()
737 })
738 .await;
739 let err = match res {
740 Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
741 Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
742 return Ok((false, maintenance));
743 }
744 Err(err) => err,
745 };
746 if retry.attempt() >= INFO_MIN_ATTEMPTS {
747 info!(
748 "become_tombstone received an indeterminate error, retrying in {:?}: {}",
749 retry.next_sleep(),
750 err
751 );
752 } else {
753 debug!(
754 "become_tombstone received an indeterminate error, retrying in {:?}: {}",
755 retry.next_sleep(),
756 err
757 );
758 }
759 retry = retry.sleep().await;
760 }
761 }
762
763 pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
764 self.applier.check_since_upper_both_empty()?;
765
766 let mut maintenance = RoutineMaintenance::default();
767
768 loop {
769 let (made_progress, more_maintenance) = self.tombstone_step().await?;
770 maintenance.merge(more_maintenance);
771 if !made_progress {
772 break;
773 }
774 }
775
776 Ok(maintenance)
777 }
778
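    /// Returns the batches making up a snapshot of the shard as of `as_of`.
    ///
    /// If `as_of` is not yet available, this waits (via a state watch, with a polling
    /// sleep as fallback) until the shard's upper advances past it, logging at info if
    /// the wait runs long.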
779 pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
780 let start = Instant::now();
781 let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
782 Ok(x) => return Ok(x),
783 Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
784 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
785 return Err(Since(since));
786 }
787 };
788
789 let mut watch = self.applier.watch();
792 let watch = &mut watch;
793 let sleeps = self
794 .applier
795 .metrics
796 .retries
797 .snapshot
798 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
799
800 enum Wake<'a, K, V, T, D> {
801 Watch(&'a mut StateWatch<K, V, T, D>),
802 Sleep(MetricsRetryStream),
803 }
804 let mut watch_fut = std::pin::pin!(
805 watch
806 .wait_for_seqno_ge(seqno.next())
807 .map(Wake::Watch)
808 .instrument(trace_span!("snapshot::watch")),
809 );
810 let mut sleep_fut = std::pin::pin!(
811 sleeps
812 .sleep()
813 .map(Wake::Sleep)
814 .instrument(trace_span!("snapshot::sleep")),
815 );
816
817 let mut logged_at_info = false;
821 loop {
822 if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
826 logged_at_info = true;
827 info!(
828 "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
829 self.applier.shard_metrics.name,
830 self.shard_id(),
831 as_of.elements(),
832 seqno,
833 upper.elements(),
834 );
835 } else {
836 debug!(
837 "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
838 self.applier.shard_metrics.name,
839 self.shard_id(),
840 as_of.elements(),
841 seqno,
842 upper.elements(),
843 );
844 }
845
846 let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
847 future::Either::Left((wake, _)) => wake,
848 future::Either::Right((wake, _)) => wake,
849 };
850 match &wake {
854 Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
855 Wake::Sleep(_) => {
856 self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
857 self.applier.fetch_and_update_state(Some(seqno)).await;
858 }
859 }
860
861 (seqno, upper) = match self.applier.snapshot(as_of) {
862 Ok(x) => {
863 if logged_at_info {
864 info!(
865 "snapshot {} {} as of {:?} now available",
866 self.applier.shard_metrics.name,
867 self.shard_id(),
868 as_of.elements(),
869 );
870 }
871 return Ok(x);
872 }
873 Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
874 (seqno, upper)
876 }
877 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
878 return Err(Since(since));
879 }
880 };
881
882 match wake {
883 Wake::Watch(watch) => {
884 watch_fut.set(
885 watch
886 .wait_for_seqno_ge(seqno.next())
887 .map(Wake::Watch)
888 .instrument(trace_span!("snapshot::watch")),
889 );
890 }
891 Wake::Sleep(sleeps) => {
892 debug!(
893 "snapshot {} {} sleeping for {:?}",
894 self.applier.shard_metrics.name,
895 self.shard_id(),
896 sleeps.next_sleep()
897 );
898 sleep_fut.set(
899 sleeps
900 .sleep()
901 .map(Wake::Sleep)
902 .instrument(trace_span!("snapshot::sleep")),
903 );
904 }
905 }
906 }
907 }
908
909 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
911 self.applier.verify_listen(as_of)
912 }
913
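    /// Returns the next batch whose lower matches `frontier`, waiting on the state
    /// watch (with the configured retry sleep as fallback) until one is appended.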
914 pub async fn next_listen_batch(
915 &self,
916 frontier: &Antichain<T>,
917 watch: &mut StateWatch<K, V, T, D>,
918 reader_id: Option<&LeasedReaderId>,
919 retry: Option<RetryParameters>,
921 ) -> HollowBatch<T> {
922 let mut seqno = match self.applier.next_listen_batch(frontier) {
923 Ok(b) => return b,
924 Err(seqno) => seqno,
925 };
926
927 let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
930 let sleeps = self
931 .applier
932 .metrics
933 .retries
934 .next_listen_batch
935 .stream(retry.into_retry(SystemTime::now()).into_retry_stream());
936
937 enum Wake<'a, K, V, T, D> {
938 Watch(&'a mut StateWatch<K, V, T, D>),
939 Sleep(MetricsRetryStream),
940 }
941 let mut watch_fut = std::pin::pin!(
942 watch
943 .wait_for_seqno_ge(seqno.next())
944 .map(Wake::Watch)
945 .instrument(trace_span!("snapshot::watch"))
946 );
947 let mut sleep_fut = std::pin::pin!(
948 sleeps
949 .sleep()
950 .map(Wake::Sleep)
951 .instrument(trace_span!("snapshot::sleep"))
952 );
953
954 loop {
955 let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
956 future::Either::Left((wake, _)) => wake,
957 future::Either::Right((wake, _)) => wake,
958 };
959 match &wake {
963 Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
964 Wake::Sleep(_) => {
965 self.applier.metrics.watch.listen_woken_via_sleep.inc();
966 self.applier.fetch_and_update_state(Some(seqno)).await;
967 }
968 }
969
970 seqno = match self.applier.next_listen_batch(frontier) {
971 Ok(b) => {
972 match &wake {
973 Wake::Watch(_) => {
974 self.applier.metrics.watch.listen_resolved_via_watch.inc()
975 }
976 Wake::Sleep(_) => {
977 self.applier.metrics.watch.listen_resolved_via_sleep.inc()
978 }
979 }
980 return b;
981 }
982 Err(seqno) => seqno,
983 };
984
985 match wake {
988 Wake::Watch(watch) => {
989 watch_fut.set(
990 watch
991 .wait_for_seqno_ge(seqno.next())
992 .map(Wake::Watch)
993 .instrument(trace_span!("snapshot::watch")),
994 );
995 }
996 Wake::Sleep(sleeps) => {
997 debug!(
998 "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
999 reader_id,
1000 self.applier.shard_metrics.name,
1001 self.shard_id(),
1002 sleeps.next_sleep()
1003 );
1004 sleep_fut.set(
1005 sleeps
1006 .sleep()
1007 .map(Wake::Sleep)
1008 .instrument(trace_span!("snapshot::sleep")),
1009 );
1010 }
1011 }
1012 }
1013 }
1014
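    /// Applies `work_fn` to state via compare-and-set, retrying indefinitely on
    /// `Indeterminate` errors. This is only correct for idempotent commands, since an
    /// indeterminate error may mean the previous attempt actually committed.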
1015 async fn apply_unbatched_idempotent_cmd<
1016 R,
1017 WorkFn: FnMut(
1018 SeqNo,
1019 &PersistConfig,
1020 &mut StateCollections<T>,
1021 ) -> ControlFlow<NoOpStateTransition<R>, R>,
1022 >(
1023 &self,
1024 cmd: &CmdMetrics,
1025 mut work_fn: WorkFn,
1026 ) -> (SeqNo, R, RoutineMaintenance) {
1027 let mut retry = self
1028 .applier
1029 .metrics
1030 .retries
1031 .idempotent_cmd
1032 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1033 loop {
1034 match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
1035 Ok((seqno, x, maintenance)) => match x {
1036 Ok(x) => {
1037 return (seqno, x, maintenance);
1038 }
1039 Err(NoOpStateTransition(x)) => {
1040 return (seqno, x, maintenance);
1041 }
1042 },
1043 Err(err) => {
1044 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1045 info!(
1046 "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1047 cmd.name,
1048 retry.next_sleep(),
1049 err
1050 );
1051 } else {
1052 debug!(
1053 "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1054 cmd.name,
1055 retry.next_sleep(),
1056 err
1057 );
1058 }
1059 retry = retry.sleep().await;
1060 continue;
1061 }
1062 }
1063 }
1064 }
1065}
1066
1067impl<K, V, T, D> Machine<K, V, T, D>
1068where
1069 K: Debug + Codec,
1070 V: Debug + Codec,
1071 T: Timestamp + Lattice + Codec64 + Sync,
1072 D: Semigroup + Codec64 + PartialEq,
1073{
1074 pub async fn merge_res(
1075 &self,
1076 res: &FueledMergeRes<T>,
1077 ) -> (ApplyMergeResult, RoutineMaintenance) {
1078 let metrics = Arc::clone(&self.applier.metrics);
1079
1080 let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
1109 let (_seqno, _apply_merge_result, maintenance) = self
1110 .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
1111 let ret = state.apply_merge_res::<D>(res, &Arc::clone(&metrics).columnar);
1112 if let Continue(result) = ret {
1113 if result.applied() {
1115 merge_result_ever_applied = result;
1116 }
1117 if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
1120 {
1121 merge_result_ever_applied = result;
1122 }
1123 }
1124 ret
1125 })
1126 .await;
1127 (merge_result_ever_applied, maintenance)
1128 }
1129}
1130
1131pub(crate) struct ExpireFn(
1132 pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
1137);
1138
1139impl Debug for ExpireFn {
1140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1141 f.debug_struct("ExpireFn").finish_non_exhaustive()
1142 }
1143}
1144
1145#[derive(Debug)]
1146pub(crate) enum CompareAndAppendRes<T> {
1147 Success(SeqNo, WriterMaintenance<T>),
1148 InvalidUsage(InvalidUsage<T>),
1149 UpperMismatch(SeqNo, Antichain<T>),
1150 InlineBackpressure,
1151}
1152
1153#[cfg(test)]
1154impl<T: Debug> CompareAndAppendRes<T> {
1155 #[track_caller]
1156 fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
1157 match self {
1158 CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
1159 x => panic!("{:?}", x),
1160 }
1161 }
1162}
1163
1164impl<K, V, T, D> Machine<K, V, T, D>
1165where
1166 K: Debug + Codec,
1167 V: Debug + Codec,
1168 T: Timestamp + Lattice + Codec64 + Sync,
1169 D: Semigroup + Codec64 + Send + Sync,
1170{
1171 #[allow(clippy::unused_async)]
1172 pub async fn start_reader_heartbeat_tasks(
1173 self,
1174 reader_id: LeasedReaderId,
1175 gc: GarbageCollector<K, V, T, D>,
1176 ) -> Vec<JoinHandle<()>> {
1177 let mut ret = Vec::new();
1178 let metrics = Arc::clone(&self.applier.metrics);
1179
1180 let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
1192 ret.push(mz_ore::task::spawn(|| name, {
1193 let machine = self.clone();
1194 let reader_id = reader_id.clone();
1195 let gc = gc.clone();
1196 metrics
1197 .tasks
1198 .heartbeat_read
1199 .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
1200 }));
1201
1202 let isolated_runtime = Arc::clone(&self.isolated_runtime);
1203 let name = format!(
1204 "persist::heartbeat_read_isolated({},{})",
1205 self.shard_id(),
1206 reader_id
1207 );
1208 ret.push(
1209 isolated_runtime.spawn_named(
1210 || name,
1211 metrics
1212 .tasks
1213 .heartbeat_read
1214 .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
1215 ),
1216 );
1217
1218 ret
1219 }
1220
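    /// Background loop that heartbeats the given leased reader every half lease
    /// duration, warning when a sleep or a heartbeat call takes surprisingly long and
    /// exiting once the lease is found to have expired.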
1221 async fn reader_heartbeat_task(
1222 machine: Self,
1223 reader_id: LeasedReaderId,
1224 gc: GarbageCollector<K, V, T, D>,
1225 ) {
1226 let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
1227 loop {
1228 let before_sleep = Instant::now();
1229 tokio::time::sleep(sleep_duration).await;
1230
1231 let elapsed_since_before_sleeping = before_sleep.elapsed();
1232 if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
1233 warn!(
1234 "reader ({}) of shard ({}) went {}s between heartbeats",
1235 reader_id,
1236 machine.shard_id(),
1237 elapsed_since_before_sleeping.as_secs_f64()
1238 );
1239 }
1240
1241 let before_heartbeat = Instant::now();
1242 let (_seqno, existed, maintenance) = machine
1243 .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
1244 .await;
1245 maintenance.start_performing(&machine, &gc);
1246
1247 let elapsed_since_heartbeat = before_heartbeat.elapsed();
1248 if elapsed_since_heartbeat > Duration::from_secs(60) {
1249 warn!(
1250 "reader ({}) of shard ({}) heartbeat call took {}s",
1251 reader_id,
1252 machine.shard_id(),
1253 elapsed_since_heartbeat.as_secs_f64(),
1254 );
1255 }
1256
1257 if !existed {
1258 warn!(
1266 "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
1267 while read handle is live",
1268 reader_id,
1269 machine.shard_id(),
1270 );
1271 return;
1272 }
1273 }
1274 }
1275}
1276
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200),
    "The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100),
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);

fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
    RetryParameters {
        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
    }
}

pub const INFO_MIN_ATTEMPTS: usize = 3;
1312
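/// Runs `work_fn` until it returns `Ok`, retrying every `ExternalError` with the
/// persist default backoff and logging at info once [INFO_MIN_ATTEMPTS] is reached.
///
/// A minimal usage sketch (the `retry_metrics`, `blob`, and `key` bindings here are
/// hypothetical, not part of this module):
///
/// ```ignore
/// // Any `RetryMetrics` and any closure returning a future of Result<_, ExternalError> works.
/// let value = retry_external(&retry_metrics, || blob.get(&key)).await;
/// ```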
1313pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1314where
1315 F: std::future::Future<Output = Result<R, ExternalError>>,
1316 WorkFn: FnMut() -> F,
1317{
1318 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1319 loop {
1320 match work_fn().await {
1321 Ok(x) => {
1322 if retry.attempt() > 0 {
1323 debug!(
1324 "external operation {} succeeded after failing at least once",
1325 metrics.name,
1326 );
1327 }
1328 return x;
1329 }
1330 Err(err) => {
1331 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1332 info!(
1333 "external operation {} failed, retrying in {:?}: {}",
1334 metrics.name,
1335 retry.next_sleep(),
1336 err.display_with_causes()
1337 );
1338 } else {
1339 debug!(
1340 "external operation {} failed, retrying in {:?}: {}",
1341 metrics.name,
1342 retry.next_sleep(),
1343 err.display_with_causes()
1344 );
1345 }
1346 retry = retry.sleep().await;
1347 }
1348 }
1349 }
1350}
1351
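/// Like [retry_external], but only retries `ExternalError::Determinate` errors,
/// returning `Indeterminate` errors to the caller to handle (e.g. by treating the
/// operation as possibly committed).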
1352pub async fn retry_determinate<R, F, WorkFn>(
1353 metrics: &RetryMetrics,
1354 mut work_fn: WorkFn,
1355) -> Result<R, Indeterminate>
1356where
1357 F: std::future::Future<Output = Result<R, ExternalError>>,
1358 WorkFn: FnMut() -> F,
1359{
1360 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1361 loop {
1362 match work_fn().await {
1363 Ok(x) => {
1364 if retry.attempt() > 0 {
1365 debug!(
1366 "external operation {} succeeded after failing at least once",
1367 metrics.name,
1368 );
1369 }
1370 return Ok(x);
1371 }
1372 Err(ExternalError::Determinate(err)) => {
1373 debug!(
1382 "external operation {} failed, retrying in {:?}: {}",
1383 metrics.name,
1384 retry.next_sleep(),
1385 err.display_with_causes()
1386 );
1387 retry = retry.sleep().await;
1388 continue;
1389 }
1390 Err(ExternalError::Indeterminate(x)) => return Err(x),
1391 }
1392 }
1393}
1394
1395#[cfg(test)]
1396pub mod datadriven {
1397 use std::collections::{BTreeMap, BTreeSet};
1398 use std::pin::pin;
1399 use std::sync::{Arc, LazyLock};
1400
1401 use anyhow::anyhow;
1402 use differential_dataflow::consolidation::consolidate_updates;
1403 use differential_dataflow::trace::Description;
1404 use futures::StreamExt;
1405 use mz_dyncfg::{ConfigUpdates, ConfigVal};
1406 use mz_persist::indexed::encoding::BlobTraceBatchPart;
1407 use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1408
1409 use crate::batch::{
1410 BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1411 BatchParts, validate_truncate_batch,
1412 };
1413 use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1414 use crate::fetch::{EncodedPart, FetchConfig};
1415 use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1416 use crate::internal::datadriven::DirectiveArgs;
1417 use crate::internal::encoding::Schemas;
1418 use crate::internal::gc::GcReq;
1419 use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1420 use crate::internal::state::{BatchPart, RunOrder, RunPart};
1421 use crate::internal::state_versions::EncodedRollup;
1422 use crate::internal::trace::{CompactionInput, IdHollowBatch, SpineId};
1423 use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1424 use crate::rpc::NoopPubSubSender;
1425 use crate::tests::new_test_client;
1426 use crate::write::COMBINE_INLINE_WRITES;
1427 use crate::{GarbageCollector, PersistClient};
1428
1429 use super::*;
1430
1431 static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
1432 id: Some(SchemaId(0)),
1433 key: Arc::new(StringSchema),
1434 val: Arc::new(UnitSchema),
1435 });
1436
1437 #[derive(Debug)]
1439 pub struct MachineState {
1440 pub client: PersistClient,
1441 pub shard_id: ShardId,
1442 pub state_versions: Arc<StateVersions>,
1443 pub machine: Machine<String, (), u64, i64>,
1444 pub gc: GarbageCollector<String, (), u64, i64>,
1445 pub batches: BTreeMap<String, IdHollowBatch<u64>>,
1446 pub next_id: usize,
1447 pub rollups: BTreeMap<String, EncodedRollup>,
1448 pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
1449 pub routine: Vec<RoutineMaintenance>,
1450 pub compactions: BTreeMap<String, CompactReq<u64>>,
1451 }
1452
1453 impl MachineState {
1454 pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
1455 let shard_id = ShardId::new();
1456 let client = new_test_client(dyncfgs).await;
1457 client
1460 .cfg
1461 .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
1462 client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
1466 let state_versions = Arc::new(StateVersions::new(
1467 client.cfg.clone(),
1468 Arc::clone(&client.consensus),
1469 Arc::clone(&client.blob),
1470 Arc::clone(&client.metrics),
1471 ));
1472 let machine = Machine::new(
1473 client.cfg.clone(),
1474 shard_id,
1475 Arc::clone(&client.metrics),
1476 Arc::clone(&state_versions),
1477 Arc::clone(&client.shared_states),
1478 Arc::new(NoopPubSubSender),
1479 Arc::clone(&client.isolated_runtime),
1480 Diagnostics::for_tests(),
1481 )
1482 .await
1483 .expect("codecs should match");
1484 let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
1485 MachineState {
1486 shard_id,
1487 client,
1488 state_versions,
1489 machine,
1490 gc,
1491 batches: BTreeMap::default(),
1492 rollups: BTreeMap::default(),
1493 listens: BTreeMap::default(),
1494 routine: Vec::new(),
1495 compactions: BTreeMap::default(),
1496 next_id: 0,
1497 }
1498 }
1499
1500 fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
1501 Batch::new(
1502 true,
1503 Arc::clone(&self.client.metrics),
1504 Arc::clone(&self.client.blob),
1505 self.client.metrics.shards.shard(&self.shard_id, "test"),
1506 self.client.cfg.build_version.clone(),
1507 hollow,
1508 )
1509 }
1510 }
1511
1512 pub async fn consensus_scan(
1515 datadriven: &MachineState,
1516 args: DirectiveArgs<'_>,
1517 ) -> Result<String, anyhow::Error> {
1518 let from = args.expect("from_seqno");
1519
1520 let mut states = datadriven
1521 .state_versions
1522 .fetch_all_live_states::<u64>(datadriven.shard_id)
1523 .await
1524 .expect("should only be called on an initialized shard")
1525 .check_ts_codec()
1526 .expect("shard codecs should not change");
1527 let mut s = String::new();
1528 while let Some(x) = states.next(|_| {}) {
1529 if x.seqno < from {
1530 continue;
1531 }
1532 let rollups: Vec<_> = x
1533 .collections
1534 .rollups
1535 .keys()
1536 .map(|seqno| seqno.to_string())
1537 .collect();
1538 let batches: Vec<_> = x
1539 .collections
1540 .trace
1541 .batches()
1542 .filter(|b| !b.is_empty())
1543 .filter_map(|b| {
1544 datadriven
1545 .batches
1546 .iter()
1547 .find(|(_, original_batch)| original_batch.batch.parts == b.parts)
1548 .map(|(batch_name, _)| batch_name.to_owned())
1549 })
1550 .collect();
1551 write!(
1552 s,
1553 "seqno={} batches={} rollups={}\n",
1554 x.seqno,
1555 batches.join(","),
1556 rollups.join(","),
1557 );
1558 }
1559 Ok(s)
1560 }
1561
1562 pub async fn consensus_truncate(
1563 datadriven: &MachineState,
1564 args: DirectiveArgs<'_>,
1565 ) -> Result<String, anyhow::Error> {
1566 let to = args.expect("to_seqno");
1567 let removed = datadriven
1568 .client
1569 .consensus
1570 .truncate(&datadriven.shard_id.to_string(), to)
1571 .await
1572 .expect("valid truncation");
1573 Ok(format!("{}\n", removed))
1574 }
1575
1576 pub async fn blob_scan_batches(
1577 datadriven: &MachineState,
1578 _args: DirectiveArgs<'_>,
1579 ) -> Result<String, anyhow::Error> {
1580 let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1581
1582 let mut s = String::new();
1583 let () = datadriven
1584 .state_versions
1585 .blob
1586 .list_keys_and_metadata(&key_prefix, &mut |x| {
1587 let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1588 if let PartialBlobKey::Batch(_, _) = key {
1589 write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1590 }
1591 })
1592 .await?;
1593 Ok(s)
1594 }
1595
1596 #[allow(clippy::unused_async)]
1597 pub async fn shard_desc(
1598 datadriven: &MachineState,
1599 _args: DirectiveArgs<'_>,
1600 ) -> Result<String, anyhow::Error> {
1601 Ok(format!(
1602 "since={:?} upper={:?}\n",
1603 datadriven.machine.applier.since().elements(),
1604 datadriven.machine.applier.clone_upper().elements()
1605 ))
1606 }
1607
1608 pub async fn downgrade_since(
1609 datadriven: &mut MachineState,
1610 args: DirectiveArgs<'_>,
1611 ) -> Result<String, anyhow::Error> {
1612 let since = args.expect_antichain("since");
1613 let seqno = args.optional("seqno");
1614 let reader_id = args.expect("reader_id");
1615 let (_, since, routine) = datadriven
1616 .machine
1617 .downgrade_since(
1618 &reader_id,
1619 seqno,
1620 &since,
1621 (datadriven.machine.applier.cfg.now)(),
1622 )
1623 .await;
1624 datadriven.routine.push(routine);
1625 Ok(format!(
1626 "{} {:?}\n",
1627 datadriven.machine.seqno(),
1628 since.0.elements()
1629 ))
1630 }
1631
1632 #[allow(clippy::unused_async)]
1633 pub async fn dyncfg(
1634 datadriven: &MachineState,
1635 args: DirectiveArgs<'_>,
1636 ) -> Result<String, anyhow::Error> {
1637 let mut updates = ConfigUpdates::default();
1638 for x in args.input.trim().split('\n') {
1639 match x.split(' ').collect::<Vec<_>>().as_slice() {
1640 &[name, val] => {
1641 let config = datadriven
1642 .client
1643 .cfg
1644 .entries()
1645 .find(|x| x.name() == name)
1646 .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1647 match config.val() {
1648 ConfigVal::Usize(_) => {
1649 let val = val.parse().map_err(anyhow::Error::new)?;
1650 updates.add_dynamic(name, ConfigVal::Usize(val));
1651 }
1652 ConfigVal::Bool(_) => {
1653 let val = val.parse().map_err(anyhow::Error::new)?;
1654 updates.add_dynamic(name, ConfigVal::Bool(val));
1655 }
1656 x => unimplemented!("dyncfg type: {:?}", x),
1657 }
1658 }
1659 x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1660 }
1661 }
1662 updates.apply(&datadriven.client.cfg);
1663
1664 Ok("ok\n".to_string())
1665 }
1666
1667 pub async fn compare_and_downgrade_since(
1668 datadriven: &mut MachineState,
1669 args: DirectiveArgs<'_>,
1670 ) -> Result<String, anyhow::Error> {
1671 let expected_opaque: u64 = args.expect("expect_opaque");
1672 let new_opaque: u64 = args.expect("opaque");
1673 let new_since = args.expect_antichain("since");
1674 let reader_id = args.expect("reader_id");
1675 let (res, routine) = datadriven
1676 .machine
1677 .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
1678 .await;
1679 datadriven.routine.push(routine);
1680 let since = res.map_err(|(opaque, since)| {
1681 anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
1682 })?;
1683 Ok(format!(
1684 "{} {} {:?}\n",
1685 datadriven.machine.seqno(),
1686 new_opaque,
1687 since.0.elements()
1688 ))
1689 }
1690
1691 pub async fn write_rollup(
1692 datadriven: &mut MachineState,
1693 args: DirectiveArgs<'_>,
1694 ) -> Result<String, anyhow::Error> {
1695 let output = args.expect_str("output");
1696
1697 let rollup = datadriven
1698 .machine
1699 .applier
1700 .write_rollup_for_state()
1701 .await
1702 .expect("rollup");
1703
1704 datadriven
1705 .rollups
1706 .insert(output.to_string(), rollup.clone());
1707
1708 Ok(format!(
1709 "state={} diffs=[{}, {})\n",
1710 rollup.seqno,
1711 rollup._desc.lower().first().expect("seqno"),
1712 rollup._desc.upper().first().expect("seqno"),
1713 ))
1714 }
1715
1716 pub async fn add_rollup(
1717 datadriven: &mut MachineState,
1718 args: DirectiveArgs<'_>,
1719 ) -> Result<String, anyhow::Error> {
1720 let input = args.expect_str("input");
1721 let rollup = datadriven
1722 .rollups
1723 .get(input)
1724 .expect("unknown batch")
1725 .clone();
1726
1727 let (applied, maintenance) = datadriven
1728 .machine
1729 .add_rollup((rollup.seqno, &rollup.to_hollow()))
1730 .await;
1731
1732 if !applied {
1733 return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1734 }
1735
1736 datadriven.routine.push(maintenance);
1737 Ok(format!("{}\n", datadriven.machine.seqno()))
1738 }
1739
1740 pub async fn write_batch(
1741 datadriven: &mut MachineState,
1742 args: DirectiveArgs<'_>,
1743 ) -> Result<String, anyhow::Error> {
1744 let output = args.expect_str("output");
1745 let lower = args.expect_antichain("lower");
1746 let upper = args.expect_antichain("upper");
1747 assert!(PartialOrder::less_than(&lower, &upper));
1748 let since = args
1749 .optional_antichain("since")
1750 .unwrap_or_else(|| Antichain::from_elem(0));
1751 let target_size = args.optional("target_size");
1752 let parts_size_override = args.optional("parts_size_override");
1753 let consolidate = args.optional("consolidate").unwrap_or(true);
1754 let mut updates: Vec<_> = args
1755 .input
1756 .split('\n')
1757 .flat_map(DirectiveArgs::parse_update)
1758 .collect();
1759
1760 let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
1761 if let Some(target_size) = target_size {
1762 cfg.blob_target_size = target_size;
1763 };
1764 if consolidate {
1765 consolidate_updates(&mut updates);
1766 }
1767 let run_order = if consolidate {
1768 cfg.preferred_order
1769 } else {
1770 RunOrder::Unordered
1771 };
1772 let parts = BatchParts::new_ordered::<i64>(
1773 cfg.clone(),
1774 run_order,
1775 Arc::clone(&datadriven.client.metrics),
1776 Arc::clone(&datadriven.machine.applier.shard_metrics),
1777 datadriven.shard_id,
1778 Arc::clone(&datadriven.client.blob),
1779 Arc::clone(&datadriven.client.isolated_runtime),
1780 &datadriven.client.metrics.user,
1781 );
1782 let builder = BatchBuilderInternal::new(
1783 cfg.clone(),
1784 parts,
1785 Arc::clone(&datadriven.client.metrics),
1786 SCHEMAS.clone(),
1787 Arc::clone(&datadriven.client.blob),
1788 datadriven.shard_id.clone(),
1789 datadriven.client.cfg.build_version.clone(),
1790 );
1791 let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
1792 for ((k, ()), t, d) in updates {
1793 builder.add(&k, &(), &t, &d).await.expect("invalid batch");
1794 }
1795 let mut batch = builder.finish(upper).await?;
1796 if parts_size_override.is_some() {
1799 batch
1800 .flush_to_blob(
1801 &cfg,
1802 &datadriven.client.metrics.user,
1803 &datadriven.client.isolated_runtime,
1804 &SCHEMAS,
1805 )
1806 .await;
1807 }
1808 let batch = batch.into_hollow_batch();
1809 let batch = IdHollowBatch {
1810 batch: Arc::new(batch),
1811 id: SpineId(datadriven.next_id, datadriven.next_id + 1),
1812 };
1813 datadriven.next_id += 1;
1814
1815 if let Some(size) = parts_size_override {
1816 let mut batch = batch.clone();
1817 let mut hollow_batch = (*batch.batch).clone();
1818 for part in hollow_batch.parts.iter_mut() {
1819 match part {
1820 RunPart::Many(run) => run.max_part_bytes = size,
1821 RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
1822 RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
1823 }
1824 }
1825 batch.batch = Arc::new(hollow_batch);
1826 datadriven.batches.insert(output.to_owned(), batch);
1827 } else {
1828 datadriven.batches.insert(output.to_owned(), batch.clone());
1829 }
1830 Ok(format!(
1831 "parts={} len={}\n",
1832 batch.batch.part_count(),
1833 batch.batch.len
1834 ))
1835 }
1836
1837 pub async fn fetch_batch(
1838 datadriven: &MachineState,
1839 args: DirectiveArgs<'_>,
1840 ) -> Result<String, anyhow::Error> {
1841 let input = args.expect_str("input");
1842 let stats = args.optional_str("stats");
1843 let batch = datadriven.batches.get(input).expect("unknown batch");
1844
1845 let mut s = String::new();
1846 let mut stream = pin!(
1847 batch
1848 .batch
1849 .part_stream(
1850 datadriven.shard_id,
1851 &*datadriven.state_versions.blob,
1852 &*datadriven.state_versions.metrics
1853 )
1854 .enumerate()
1855 );
1856 while let Some((idx, part)) = stream.next().await {
1857 let part = &*part?;
1858 write!(s, "<part {idx}>\n");
1859
1860 let lower = match part {
1861 BatchPart::Inline { updates, .. } => {
1862 let updates: BlobTraceBatchPart<u64> =
1863 updates.decode(&datadriven.client.metrics.columnar)?;
1864 updates.structured_key_lower()
1865 }
1866 other => other.structured_key_lower(),
1867 };
1868
1869 if let Some(lower) = lower {
1870 if stats == Some("lower") {
1871 writeln!(s, "<key lower={}>", lower.get())
1872 }
1873 }
1874
1875 match part {
1876 BatchPart::Hollow(part) => {
1877 let blob_batch = datadriven
1878 .client
1879 .blob
1880 .get(&part.key.complete(&datadriven.shard_id))
1881 .await;
1882 match blob_batch {
1883 Ok(Some(_)) | Err(_) => {}
1884 Ok(None) => {
1887 s.push_str("<empty>\n");
1888 continue;
1889 }
1890 };
1891 }
1892 BatchPart::Inline { .. } => {}
1893 };
1894 let part = EncodedPart::fetch(
1895 &FetchConfig::from_persist_config(&datadriven.client.cfg),
1896 &datadriven.shard_id,
1897 datadriven.client.blob.as_ref(),
1898 datadriven.client.metrics.as_ref(),
1899 datadriven.machine.applier.shard_metrics.as_ref(),
1900 &datadriven.client.metrics.read.batch_fetcher,
1901 &batch.batch.desc,
1902 part,
1903 )
1904 .await
1905 .expect("invalid batch part");
1906 let part = part
1907 .normalize(&datadriven.client.metrics.columnar)
1908 .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
1909
1910 for ((k, _v), t, d) in part
1911 .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
1912 .expect("valid schemas")
1913 {
1914 writeln!(s, "{k} {t} {d}");
1915 }
1916 }
1917 if !s.is_empty() {
1918 for (idx, (_meta, run)) in batch.batch.runs().enumerate() {
1919 write!(s, "<run {idx}>\n");
1920 for part in run {
1921 let part_idx = batch
1922 .batch
1923 .parts
1924 .iter()
1925 .position(|p| p == part)
1926 .expect("part should exist");
1927 write!(s, "part {part_idx}\n");
1928 }
1929 }
1930 }
1931 Ok(s)
1932 }
1933
1934 #[allow(clippy::unused_async)]
1935 pub async fn truncate_batch_desc(
1936 datadriven: &mut MachineState,
1937 args: DirectiveArgs<'_>,
1938 ) -> Result<String, anyhow::Error> {
1939 let input = args.expect_str("input");
1940 let output = args.expect_str("output");
1941 let lower = args.expect_antichain("lower");
1942 let upper = args.expect_antichain("upper");
1943
1944 let batch = datadriven
1945 .batches
1946 .get(input)
1947 .expect("unknown batch")
1948 .clone();
1949 let truncated_desc = Description::new(lower, upper, batch.batch.desc.since().clone());
1950 let () = validate_truncate_batch(&batch.batch, &truncated_desc, false, true)?;
1951 let mut new_hollow_batch = (*batch.batch).clone();
1952 new_hollow_batch.desc = truncated_desc;
1953 let new_batch = IdHollowBatch {
1954 batch: Arc::new(new_hollow_batch),
1955 id: batch.id,
1956 };
1957 datadriven
1958 .batches
1959 .insert(output.to_owned(), new_batch.clone());
1960 Ok(format!(
1961 "parts={} len={}\n",
1962 batch.batch.part_count(),
1963 batch.batch.len
1964 ))
1965 }
1966
1967 #[allow(clippy::unused_async)]
1968 pub async fn set_batch_parts_size(
1969 datadriven: &mut MachineState,
1970 args: DirectiveArgs<'_>,
1971 ) -> Result<String, anyhow::Error> {
1972 let input = args.expect_str("input");
1973 let size = args.expect("size");
1974 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1975 let mut hollow_batch = (*batch.batch).clone();
1976 for part in hollow_batch.parts.iter_mut() {
1977 match part {
1978 RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1979 _ => {
1980 panic!("set_batch_parts_size only supports hollow parts")
1981 }
1982 }
1983 }
1984 batch.batch = Arc::new(hollow_batch);
1985 Ok("ok\n".to_string())
1986 }
1987
1988 pub async fn compact(
1989 datadriven: &mut MachineState,
1990 args: DirectiveArgs<'_>,
1991 ) -> Result<String, anyhow::Error> {
1992 let output = args.expect_str("output");
1993 let lower = args.expect_antichain("lower");
1994 let upper = args.expect_antichain("upper");
1995 let since = args.expect_antichain("since");
1996 let target_size = args.optional("target_size");
1997 let memory_bound = args.optional("memory_bound");
1998
1999 let mut inputs = Vec::new();
2000 for input in args.args.get("inputs").expect("missing inputs") {
2001 inputs.push(
2002 datadriven
2003 .batches
2004 .get(input)
2005 .expect("unknown batch")
2006 .clone(),
2007 );
2008 }
2009
2010 let cfg = datadriven.client.cfg.clone();
2011 if let Some(target_size) = target_size {
2012 cfg.set_config(&BLOB_TARGET_SIZE, target_size);
2013 };
2014 if let Some(memory_bound) = memory_bound {
2015 cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
2016 }
2017 let req = CompactReq {
2018 shard_id: datadriven.shard_id,
2019 desc: Description::new(lower, upper, since),
2020 inputs: inputs.clone(),
2021 };
2022 datadriven
2023 .compactions
2024 .insert(output.to_owned(), req.clone());
2025 let spine_lower = inputs
2026 .first()
2027 .map_or_else(|| datadriven.next_id, |x| x.id.0);
2028 let spine_upper = inputs.last().map_or_else(
2029 || {
2030 datadriven.next_id += 1;
2031 datadriven.next_id
2032 },
2033 |x| x.id.1,
2034 );
2035 let new_spine_id = SpineId(spine_lower, spine_upper);
2036 let res = Compactor::<String, (), u64, i64>::compact(
2037 CompactConfig::new(&cfg, datadriven.shard_id),
2038 Arc::clone(&datadriven.client.blob),
2039 Arc::clone(&datadriven.client.metrics),
2040 Arc::clone(&datadriven.machine.applier.shard_metrics),
2041 Arc::clone(&datadriven.client.isolated_runtime),
2042 req,
2043 SCHEMAS.clone(),
2044 )
2045 .await?;
2046
2047 let batch = IdHollowBatch {
2048 batch: Arc::new(res.output.clone()),
2049 id: new_spine_id,
2050 };
2051
2052 datadriven.batches.insert(output.to_owned(), batch.clone());
2053 Ok(format!(
2054 "parts={} len={}\n",
2055 res.output.part_count(),
2056 res.output.len
2057 ))
2058 }
2059
2060 pub async fn clear_blob(
2061 datadriven: &MachineState,
2062 _args: DirectiveArgs<'_>,
2063 ) -> Result<String, anyhow::Error> {
2064 let mut to_delete = vec![];
2065 datadriven
2066 .client
2067 .blob
2068 .list_keys_and_metadata("", &mut |meta| {
2069 to_delete.push(meta.key.to_owned());
2070 })
2071 .await?;
2072 for blob in &to_delete {
2073 datadriven.client.blob.delete(blob).await?;
2074 }
2075 Ok(format!("deleted={}\n", to_delete.len()))
2076 }
2077
2078 pub async fn restore_blob(
2079 datadriven: &MachineState,
2080 _args: DirectiveArgs<'_>,
2081 ) -> Result<String, anyhow::Error> {
2082 let not_restored = crate::internal::restore::restore_blob(
2083 &datadriven.state_versions,
2084 datadriven.client.blob.as_ref(),
2085 &datadriven.client.cfg.build_version,
2086 datadriven.shard_id,
2087 &*datadriven.state_versions.metrics,
2088 )
2089 .await?;
2090 let mut out = String::new();
2091 for key in not_restored {
2092 writeln!(&mut out, "{key}");
2093 }
2094 Ok(out)
2095 }
2096
2097 #[allow(clippy::unused_async)]
2098 pub async fn rewrite_ts(
2099 datadriven: &mut MachineState,
2100 args: DirectiveArgs<'_>,
2101 ) -> Result<String, anyhow::Error> {
2102 let input = args.expect_str("input");
2103 let ts_rewrite = args.expect_antichain("frontier");
2104 let upper = args.expect_antichain("upper");
2105
2106 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2107 let mut hollow_batch = (*batch.batch).clone();
2108 let () = hollow_batch
2109 .rewrite_ts(&ts_rewrite, upper)
2110 .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2111 batch.batch = Arc::new(hollow_batch);
2112 Ok("ok\n".into())
2113 }
2114
2115 pub async fn gc(
2116 datadriven: &mut MachineState,
2117 args: DirectiveArgs<'_>,
2118 ) -> Result<String, anyhow::Error> {
2119 let new_seqno_since = args.expect("to_seqno");
2120
2121 let req = GcReq {
2122 shard_id: datadriven.shard_id,
2123 new_seqno_since,
2124 };
2125 let (maintenance, stats) =
2126 GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2127 datadriven.routine.push(maintenance);
2128
2129 Ok(format!(
2130 "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2131 datadriven.machine.seqno(),
2132 stats.batch_parts_deleted_from_blob,
2133 stats.rollups_deleted_from_blob,
2134 stats
2135 .truncated_consensus_to
2136 .iter()
2137 .map(|x| x.to_string())
2138 .collect::<Vec<_>>()
2139 .join(","),
2140 stats
2141 .rollups_removed_from_state
2142 .iter()
2143 .map(|x| x.to_string())
2144 .collect::<Vec<_>>()
2145 .join(","),
2146 ))
2147 }
2148
    pub async fn snapshot(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let as_of = args.expect_antichain("as_of");
        let snapshot = datadriven
            .machine
            .snapshot(&as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;

        let mut result = String::new();

        for batch in snapshot {
            writeln!(
                result,
                "<batch {:?}-{:?}>",
                batch.desc.lower().elements(),
                batch.desc.upper().elements()
            );
            for (run, (_meta, parts)) in batch.runs().enumerate() {
                writeln!(result, "<run {run}>");
                let mut stream = pin!(
                    futures::stream::iter(parts)
                        .flat_map(|part| part.part_stream(
                            datadriven.shard_id,
                            &*datadriven.state_versions.blob,
                            &*datadriven.state_versions.metrics
                        ))
                        .enumerate()
                );

                while let Some((idx, part)) = stream.next().await {
                    let part = &*part?;
                    writeln!(result, "<part {idx}>");

                    let part = EncodedPart::fetch(
                        &FetchConfig::from_persist_config(&datadriven.client.cfg),
                        &datadriven.shard_id,
                        datadriven.client.blob.as_ref(),
                        datadriven.client.metrics.as_ref(),
                        datadriven.machine.applier.shard_metrics.as_ref(),
                        &datadriven.client.metrics.read.batch_fetcher,
                        &batch.desc,
                        part,
                    )
                    .await
                    .expect("invalid batch part");
                    let part = part
                        .normalize(&datadriven.client.metrics.columnar)
                        .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);

                    let mut updates = Vec::new();

                    for ((k, _v), mut t, d) in part
                        .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
                        .expect("valid schemas")
                    {
                        // Advance each update to the as_of so that updates at
                        // different original times consolidate together.
                        t.advance_by(as_of.borrow());
                        updates.push((k, t, d));
                    }

                    consolidate_updates(&mut updates);

                    for (k, t, d) in updates {
                        writeln!(result, "{k} {t} {d}");
                    }
                }
            }
        }

        Ok(result)
    }

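    /// Datadriven handler: opens a leased reader and starts a listen at the
    /// given as_of, storing it under the given output name for later use.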
    pub async fn register_listen(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let output = args.expect_str("output");
        let as_of = args.expect_antichain("as_of");
        let read = datadriven
            .client
            .open_leased_reader::<String, (), u64, i64>(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
                true,
            )
            .await
            .expect("invalid shard types");
        let listen = read
            .listen(as_of)
            .await
            .map_err(|err| anyhow!("{:?}", err))?;
        datadriven.listens.insert(output.to_owned(), listen);
        Ok("ok\n".into())
    }

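    /// Datadriven handler: drains events from a previously registered listen,
    /// printing updates until the listen's progress is no longer strictly less
    /// than the given frontier.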
    pub async fn listen_through(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let frontier = args.expect("frontier");
        let listen = datadriven.listens.get_mut(input).expect("unknown listener");
        let mut s = String::new();
        loop {
            for event in listen.fetch_next().await {
                match event {
                    ListenEvent::Updates(x) => {
                        for ((k, _v), t, d) in x.iter() {
                            writeln!(s, "{} {} {}", k.as_ref().unwrap(), t, d);
                        }
                    }
                    ListenEvent::Progress(x) => {
                        if !x.less_than(&frontier) {
                            return Ok(s);
                        }
                    }
                }
            }
        }
    }

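    /// Datadriven handler: registers a critical reader with the machine and
    /// prints the resulting seqno and the reader's since.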
    pub async fn register_critical_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (state, maintenance) = datadriven
            .machine
            .register_critical_reader::<u64>(&reader_id, "tests")
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            state.since.elements(),
        ))
    }

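    /// Datadriven handler: registers a leased reader using the configured
    /// lease duration and prints the resulting seqno and the reader's since.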
    pub async fn register_leased_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (reader_state, maintenance) = datadriven
            .machine
            .register_leased_reader(
                &reader_id,
                "tests",
                READER_LEASE_DURATION.get(&datadriven.client.cfg),
                (datadriven.client.cfg.now)(),
                false,
            )
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            reader_state.since.elements(),
        ))
    }

    pub async fn heartbeat_leased_reader(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let _ = datadriven
            .machine
            .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
            .await;
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

    pub async fn expire_critical_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

    pub async fn expire_leased_reader(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let reader_id = args.expect("reader_id");
        let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

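    /// Datadriven handler: opens a writer, appends the named batches with
    /// `compare_and_append_batch`, and then expires the writer.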
    pub async fn compare_and_append_batches(
        datadriven: &MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let expected_upper = args.expect_antichain("expected_upper");
        let new_upper = args.expect_antichain("new_upper");

        let mut batches: Vec<Batch<String, (), u64, i64>> = args
            .args
            .get("batches")
            .expect("missing batches")
            .into_iter()
            .map(|batch| {
                let hollow = (*datadriven
                    .batches
                    .get(batch)
                    .expect("unknown batch")
                    .clone()
                    .batch)
                    .clone();
                datadriven.to_batch(hollow)
            })
            .collect();

        let mut writer = datadriven
            .client
            .open_writer(
                datadriven.shard_id,
                Arc::new(StringSchema),
                Arc::new(UnitSchema),
                Diagnostics::for_tests(),
            )
            .await?;

        let mut batch_refs: Vec<_> = batches.iter_mut().collect();

        let () = writer
            .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper, true)
            .await?
            .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;

        writer.expire().await;

        Ok("ok\n".into())
    }

    pub async fn expire_writer(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let writer_id = args.expect("writer_id");
        let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

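    /// Datadriven handler: finalizes the shard by advancing it to its
    /// tombstone state and prints the resulting seqno.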
    pub(crate) async fn finalize(
        datadriven: &mut MachineState,
        _args: DirectiveArgs<'_>,
    ) -> anyhow::Result<String> {
        let maintenance = datadriven.machine.become_tombstone().await?;
        datadriven.routine.push(maintenance);
        Ok(format!("{} ok\n", datadriven.machine.seqno()))
    }

    pub(crate) fn is_finalized(
        datadriven: &MachineState,
        _args: DirectiveArgs<'_>,
    ) -> anyhow::Result<String> {
        let seqno = datadriven.machine.seqno();
        let tombstone = datadriven.machine.is_finalized();
        Ok(format!("{seqno} {tombstone}\n"))
    }

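    /// Datadriven handler: registers the test schemas and drives
    /// `compare_and_append_idempotent` to completion, flushing the batch out
    /// to blob and retrying if the machine pushes back on inline writes, then
    /// prints the resulting seqno and upper.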
    pub async fn compare_and_append(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let writer_id = args.expect("writer_id");
        let mut batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
        let now = (datadriven.client.cfg.now)();

        let (id, maintenance) = datadriven
            .machine
            .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
            .await;
        assert_eq!(id, SCHEMAS.id);
        datadriven.routine.push(maintenance);
        let maintenance = loop {
            let indeterminate = args
                .optional::<String>("prev_indeterminate")
                .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
            let res = datadriven
                .machine
                .compare_and_append_idempotent(
                    &batch.batch,
                    &writer_id,
                    now,
                    &token,
                    &HandleDebugState::default(),
                    indeterminate,
                )
                .await;
            match res {
                CompareAndAppendRes::Success(_, x) => break x,
                CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
                    return Err(anyhow!("{:?}", Upper(upper)));
                }
                CompareAndAppendRes::InlineBackpressure => {
                    // The machine refused to keep this write inline in state;
                    // flush the batch's parts out to blob and retry.
                    let hollow_batch = (*batch.batch).clone();
                    let mut b = datadriven.to_batch(hollow_batch);
                    let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
                    b.flush_to_blob(
                        &cfg,
                        &datadriven.client.metrics.user,
                        &datadriven.client.isolated_runtime,
                        &*SCHEMAS,
                    )
                    .await;
                    batch.batch = Arc::new(b.into_hollow_batch());
                    continue;
                }
                _ => panic!("{:?}", res),
            };
        };
        datadriven.routine.push(maintenance.routine);
        Ok(format!(
            "{} {:?}\n",
            datadriven.machine.seqno(),
            datadriven.machine.applier.clone_upper().elements(),
        ))
    }

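    /// Datadriven handler: applies the named compaction output to the shard as
    /// a `FueledMergeRes`, with the spine id range derived from the compaction
    /// request's inputs, and prints whether it was applied.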
    pub async fn apply_merge_res(
        datadriven: &mut MachineState,
        args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let input = args.expect_str("input");
        let batch = datadriven
            .batches
            .get(input)
            .expect("unknown batch")
            .clone();
        let compact_req = datadriven
            .compactions
            .get(input)
            .expect("unknown compact req")
            .clone();
        let input_batches = compact_req
            .inputs
            .iter()
            .map(|x| x.id)
            .collect::<BTreeSet<_>>();
        let lower_spine_bound = input_batches
            .first()
            .map(|id| id.0)
            .expect("at least one batch must be present");
        let upper_spine_bound = input_batches
            .last()
            .map(|id| id.1)
            .expect("at least one batch must be present");
        let id = SpineId(lower_spine_bound, upper_spine_bound);
        let hollow_batch = (*batch.batch).clone();

        let (merge_res, maintenance) = datadriven
            .machine
            .merge_res(&FueledMergeRes {
                output: hollow_batch,
                input: CompactionInput::IdRange(id),
                new_active_compaction: None,
            })
            .await;
        datadriven.routine.push(maintenance);
        Ok(format!(
            "{} {}\n",
            datadriven.machine.seqno(),
            merge_res.applied()
        ))
    }

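    /// Datadriven handler: performs all queued routine maintenance, refetching
    /// state after each item and printing the observed seqno.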
    pub async fn perform_maintenance(
        datadriven: &mut MachineState,
        _args: DirectiveArgs<'_>,
    ) -> Result<String, anyhow::Error> {
        let mut s = String::new();
        for maintenance in datadriven.routine.drain(..) {
            let () = maintenance
                .perform(&datadriven.machine, &datadriven.gc)
                .await;
            let () = datadriven
                .machine
                .applier
                .fetch_and_update_state(None)
                .await;
            writeln!(s, "{} ok", datadriven.machine.seqno());
        }
        Ok(s)
    }
}

#[cfg(test)]
pub mod tests {
    use std::sync::Arc;

    use mz_dyncfg::ConfigUpdates;
    use mz_ore::cast::CastFrom;
    use mz_ore::task::spawn;
    use mz_persist::intercept::{InterceptBlob, InterceptHandle};
    use mz_persist::location::SeqNo;
    use timely::progress::Antichain;

    use crate::ShardId;
    use crate::batch::BatchBuilderConfig;
    use crate::cache::StateCache;
    use crate::internal::gc::{GarbageCollector, GcReq};
    use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
    use crate::tests::new_test_client;

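    // Writes many batches with a small rollup threshold and checks that the
    // number of live diffs kept in consensus stays logarithmic in the number
    // of writes, i.e. that truncation keeps up with the appends.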
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
        mz_ore::test::init_logging();

        let client = new_test_client(&dyncfgs).await;
        // Use a small rollup threshold so that rollups, and thus consensus
        // truncation, happen frequently.
        client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
        let (mut write, _) = client
            .expect_open::<String, (), u64, i64>(ShardId::new())
            .await;

        const NUM_BATCHES: u64 = 100;
        for idx in 0..NUM_BATCHES {
            let mut batch = write
                .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                .await;
            let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
            batch
                .flush_to_blob(
                    &cfg,
                    &client.metrics.user,
                    &client.isolated_runtime,
                    &write.write_schemas,
                )
                .await;
            let (_, writer_maintenance) = write
                .machine
                .compare_and_append(
                    &batch.into_hollow_batch(),
                    &write.writer_id,
                    &HandleDebugState::default(),
                    (write.cfg.now)(),
                )
                .await
                .unwrap();
            writer_maintenance
                .perform(&write.machine, &write.gc, write.compact.as_ref())
                .await;
        }
        let live_diffs = write
            .machine
            .applier
            .state_versions
            .fetch_all_live_diffs(&write.machine.shard_id())
            .await;
        assert!(!live_diffs.0.is_empty());
        // Truncation should keep the number of live diffs roughly logarithmic
        // in the number of appended batches, not linear.
        let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
        assert!(
            live_diffs.0.len() <= max_live_diffs,
            "{} vs {}",
            live_diffs.0.len(),
            max_live_diffs
        );
    }

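    // Regression test: a GC that is interrupted partway through deleting blobs
    // (simulated here with a panicking InterceptBlob hook) must not prevent a
    // later GC for the same seqno_since from completing.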
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
        let mut client = new_test_client(&dyncfgs).await;
        let intercept = InterceptHandle::default();
        client.blob = Arc::new(InterceptBlob::new(
            Arc::clone(&client.blob),
            intercept.clone(),
        ));
        let (_, mut read) = client
            .expect_open::<String, String, u64, i64>(ShardId::new())
            .await;

        read.downgrade_since(&Antichain::from_elem(1)).await;
        let new_seqno_since = read.machine.applier.seqno_since();

        assert!(new_seqno_since > SeqNo::minimum());

        // Run a GC that panics after deleting blobs, simulating a process that
        // crashes partway through garbage collection.
        intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
        let machine = read.machine.clone();
        let gc = spawn(|| "", async move {
            let req = GcReq {
                shard_id: machine.shard_id(),
                new_seqno_since,
            };
            GarbageCollector::gc_and_truncate(&machine, req).await
        });
        let _ = gc.await;

        // A later GC for the same seqno_since should still complete.
        intercept.set_post_delete(None);
        let req = GcReq {
            shard_id: read.machine.shard_id(),
            new_seqno_since,
        };
        let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
    }

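    // Regression test: two writers with separate state caches append to the
    // same shard, so a writer whose cached upper has gone stale must refresh
    // its state after an upper mismatch rather than failing the append.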
    #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        let mut client2 = client.clone();

        // Give client2 its own state cache so that write2's cached state is
        // independent of write1's.
        let new_state_cache = Arc::new(StateCache::new_no_metrics());
        client2.shared_states = new_state_cache;

        let shard_id = ShardId::new();
        let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
        let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;

        let data = [
            (("1".to_owned(), ()), 1, 1),
            (("2".to_owned(), ()), 2, 1),
            (("3".to_owned(), ()), 3, 1),
            (("4".to_owned(), ()), 4, 1),
            (("5".to_owned(), ()), 5, 1),
        ];

        write1.expect_compare_and_append(&data[..1], 0, 2).await;

        write2.expect_compare_and_append(&data[1..2], 2, 3).await;
    }
}