use std::fmt::Debug;
use std::ops::ControlFlow::{self, Continue};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures::FutureExt;
use futures::future::{self, BoxFuture};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
#[allow(unused_imports)]
use mz_ore::fmt::FormatBuffer;
use mz_ore::task::JoinHandle;
use mz_persist::location::{ExternalError, Indeterminate, SeqNo};
use mz_persist::retry::Retry;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64, Opaque};
use semver::Version;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, info, trace_span, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::batch::INLINE_WRITES_TOTAL_MAX_BYTES;
use crate::cache::StateCache;
use crate::cfg::RetryParameters;
use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
use crate::internal::paths::PartialRollupKey;
use crate::internal::state::{
    CompareAndAppendBreak, CriticalReaderState, HandleDebugState, HollowBatch, HollowRollup,
    IdempotencyToken, LeasedReaderState, NoOpStateTransition, Since, SnapshotErr, StateCollections,
    Upper,
};
use crate::internal::state_versions::StateVersions;
use crate::internal::trace::{ApplyMergeResult, FueledMergeRes};
use crate::internal::watch::StateWatch;
use crate::read::{LeasedReaderId, READER_LEASE_DURATION};
use crate::rpc::PubSubSender;
use crate::schema::CaESchema;
use crate::write::WriterId;
use crate::{Diagnostics, PersistConfig, ShardId};

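/// A handle to the state machine for a single persist shard.
///
/// All reads of and updates to durable shard state (registering readers and
/// writers, compare-and-append, since downgrades, rollups, GC, etc.) go
/// through the methods on this type, which apply them in compare-and-set
/// loops against the consensus-backed state.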
#[derive(Debug)]
pub struct Machine<K, V, T, D> {
    pub(crate) applier: Applier<K, V, T, D>,
    pub(crate) isolated_runtime: Arc<IsolatedRuntime>,
}

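// Hand-written impl: a derived `Clone` would also require `K: Clone`,
// `V: Clone`, and `D: Clone`, but cloning a `Machine` only needs `T: Clone`.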
impl<K, V, T: Clone, D> Clone for Machine<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            applier: self.applier.clone(),
            isolated_runtime: Arc::clone(&self.isolated_runtime),
        }
    }
}

pub(crate) const CLAIM_UNCLAIMED_COMPACTIONS: Config<bool> = Config::new(
    "persist_claim_unclaimed_compactions",
    false,
    "If an append doesn't result in a compaction request, but there is some uncompacted batch \
    in state, compact that instead.",
);

pub(crate) const CLAIM_COMPACTION_PERCENT: Config<usize> = Config::new(
    "persist_claim_compaction_percent",
    100,
    "Claim a compaction with the given percent chance, if claiming compactions is enabled. \
    (If over 100, we'll always claim at least one; for example, if set to 365, we'll claim at least \
    three and have a 65% chance of claiming a fourth.)",
);
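// For example, with `persist_claim_compaction_percent` set to 250, an append
// would claim two uncompacted batches and a third with 50% probability
// (roughly `percent / 100` guaranteed claims plus a `percent % 100` chance of
// one more), per the config description above.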

pub(crate) const CLAIM_COMPACTION_MIN_VERSION: Config<String> = Config::new(
    "persist_claim_compaction_min_version",
    String::new(),
    "If set to a valid version string, compact away any earlier versions if possible.",
);

impl<K, V, T, D> Machine<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    pub async fn new(
        cfg: PersistConfig,
        shard_id: ShardId,
        metrics: Arc<Metrics>,
        state_versions: Arc<StateVersions>,
        shared_states: Arc<StateCache>,
        pubsub_sender: Arc<dyn PubSubSender>,
        isolated_runtime: Arc<IsolatedRuntime>,
        diagnostics: Diagnostics,
    ) -> Result<Self, Box<CodecMismatch>> {
        let applier = Applier::new(
            cfg,
            shard_id,
            metrics,
            state_versions,
            shared_states,
            pubsub_sender,
            diagnostics,
        )
        .await?;
        Ok(Machine {
            applier,
            isolated_runtime,
        })
    }

    pub fn shard_id(&self) -> ShardId {
        self.applier.shard_id
    }

    pub fn seqno(&self) -> SeqNo {
        self.applier.seqno()
    }

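    /// Writes a rollup of the shard's current state to blob storage and links
    /// it into state, deleting the blob again if the link step loses the race.
    ///
    /// Returns any routine maintenance work generated along the way.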
    pub async fn add_rollup_for_current_seqno(&self) -> RoutineMaintenance {
        let rollup = self.applier.write_rollup_for_state().await;
        let Some(rollup) = rollup else {
            return RoutineMaintenance::default();
        };

        let (applied, maintenance) = self.add_rollup((rollup.seqno, &rollup.to_hollow())).await;
        if !applied {
            // Someone else already added a rollup for this state, so ours was
            // never linked in; delete the blob we just wrote so it doesn't leak.
            self.applier
                .state_versions
                .delete_rollup(&rollup.shard_id, &rollup.key)
                .await;
        }
        maintenance
    }

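    /// Links an already-written rollup blob into state at the given seqno.
    ///
    /// Returns whether the rollup was added (false if a rollup for that state
    /// already existed), plus any routine maintenance work.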
    pub async fn add_rollup(
        &self,
        add_rollup: (SeqNo, &HollowRollup),
    ) -> (bool, RoutineMaintenance) {
        // The command loop below may retry: if any attempt applied the rollup,
        // report it as applied even if a later (idempotent) retry is a no-op.
        let mut applied_ever_true = false;
        let metrics = Arc::clone(&self.applier.metrics);
        let (_seqno, _applied, maintenance) = self
            .apply_unbatched_idempotent_cmd(&metrics.cmds.add_rollup, |_, _, state| {
                let ret = state.add_rollup(add_rollup);
                if let Continue(applied) = ret {
                    applied_ever_true = applied_ever_true || applied;
                }
                ret
            })
            .await;
        (applied_ever_true, maintenance)
    }

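    /// Unlinks the given rollups from state, returning the seqnos of the
    /// rollups that were actually removed.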
179 pub async fn remove_rollups(
180 &self,
181 remove_rollups: &[(SeqNo, PartialRollupKey)],
182 ) -> (Vec<SeqNo>, RoutineMaintenance) {
183 let metrics = Arc::clone(&self.applier.metrics);
184 let (_seqno, removed_rollup_seqnos, maintenance) = self
185 .apply_unbatched_idempotent_cmd(&metrics.cmds.remove_rollups, |_, _, state| {
186 state.remove_rollups(remove_rollups)
187 })
188 .await;
189 (removed_rollup_seqnos, maintenance)
190 }
191
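    /// Registers a new leased reader with the shard, returning its initial
    /// reader state (the since capability it holds and its lease metadata).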
192 pub async fn register_leased_reader(
193 &self,
194 reader_id: &LeasedReaderId,
195 purpose: &str,
196 lease_duration: Duration,
197 heartbeat_timestamp_ms: u64,
198 use_critical_since: bool,
199 ) -> (LeasedReaderState<T>, RoutineMaintenance) {
200 let metrics = Arc::clone(&self.applier.metrics);
201 let (_seqno, (reader_state, seqno_since), maintenance) = self
202 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |seqno, cfg, state| {
203 state.register_leased_reader(
204 &cfg.hostname,
205 reader_id,
206 purpose,
207 seqno,
208 lease_duration,
209 heartbeat_timestamp_ms,
210 use_critical_since,
211 )
212 })
213 .await;
214 debug_assert!(
223 reader_state.seqno >= seqno_since,
224 "{} vs {}",
225 reader_state.seqno,
226 seqno_since,
227 );
228 (reader_state, maintenance)
229 }
230
231 pub async fn register_critical_reader<O: Opaque + Codec64>(
232 &self,
233 reader_id: &CriticalReaderId,
234 purpose: &str,
235 ) -> (CriticalReaderState<T>, RoutineMaintenance) {
236 let metrics = Arc::clone(&self.applier.metrics);
237 let (_seqno, state, maintenance) = self
238 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, cfg, state| {
239 state.register_critical_reader::<O>(&cfg.hostname, reader_id, purpose)
240 })
241 .await;
242 (state, maintenance)
243 }
244
245 pub async fn register_schema(
246 &self,
247 key_schema: &K::Schema,
248 val_schema: &V::Schema,
249 ) -> (Option<SchemaId>, RoutineMaintenance) {
250 let metrics = Arc::clone(&self.applier.metrics);
251 let (_seqno, state, maintenance) = self
252 .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
253 state.register_schema::<K, V>(key_schema, val_schema)
254 })
255 .await;
256 (state, maintenance)
257 }
258
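    /// Spends `fuel` on the spine's pending merge work, returning any
    /// compaction requests that become ready as a result.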
259 pub async fn spine_exert(&self, fuel: usize) -> (Vec<CompactReq<T>>, RoutineMaintenance) {
260 if fuel == 0 || self.applier.all_batches().len() < 2 {
262 return (Vec::new(), RoutineMaintenance::default());
263 }
264
265 let metrics = Arc::clone(&self.applier.metrics);
266 let (_seqno, reqs, maintenance) = self
267 .apply_unbatched_idempotent_cmd(&metrics.cmds.spine_exert, |_seqno, _cfg, state| {
268 state.spine_exert(fuel)
269 })
270 .await;
271 let reqs = reqs
272 .into_iter()
273 .map(|req| CompactReq {
274 shard_id: self.shard_id(),
275 desc: req.desc,
276 inputs: req
277 .inputs
278 .into_iter()
279 .map(|b| Arc::unwrap_or_clone(b.batch))
280 .collect(),
281 })
282 .collect();
283 (reqs, maintenance)
284 }
285
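    /// Appends `batch` to the shard, expecting the shard's current upper to
    /// equal `batch.desc.lower()`.
    ///
    /// Indeterminate (e.g. storage) errors are retried internally using an
    /// idempotency token, so a batch is applied at most once. A rough usage
    /// sketch (the surrounding handles and callbacks are illustrative):
    ///
    /// ```ignore
    /// match machine.compare_and_append(&batch, &writer_id, &debug, now_ms).await {
    ///     CompareAndAppendRes::Success(_seqno, maintenance) => { /* hand off maintenance */ }
    ///     CompareAndAppendRes::UpperMismatch(_seqno, current_upper) => { /* caller rebases */ }
    ///     CompareAndAppendRes::InlineBackpressure => { /* flush inline parts to blob, retry */ }
    ///     CompareAndAppendRes::InvalidUsage(err) => { /* programming error */ }
    /// }
    /// ```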
286 pub async fn compare_and_append(
287 &self,
288 batch: &HollowBatch<T>,
289 writer_id: &WriterId,
290 debug_info: &HandleDebugState,
291 heartbeat_timestamp_ms: u64,
292 ) -> CompareAndAppendRes<T> {
293 let idempotency_token = IdempotencyToken::new();
294 loop {
295 let res = self
296 .compare_and_append_idempotent(
297 batch,
298 writer_id,
299 heartbeat_timestamp_ms,
300 &idempotency_token,
301 debug_info,
302 None,
303 )
304 .await;
305 match res {
306 CompareAndAppendRes::Success(seqno, maintenance) => {
307 return CompareAndAppendRes::Success(seqno, maintenance);
308 }
309 CompareAndAppendRes::InvalidUsage(x) => {
310 return CompareAndAppendRes::InvalidUsage(x);
311 }
312 CompareAndAppendRes::InlineBackpressure => {
313 return CompareAndAppendRes::InlineBackpressure;
314 }
                CompareAndAppendRes::UpperMismatch(seqno, _current_upper) => {
                    // The state we ran the command against may have been
                    // stale: fetch the latest state and re-check the upper.
                    self.applier.fetch_and_update_state(Some(seqno)).await;
                    let (current_seqno, current_upper) =
                        self.applier.upper(|seqno, upper| (seqno, upper.clone()));

                    if &current_upper != batch.desc.lower() {
                        return CompareAndAppendRes::UpperMismatch(current_seqno, current_upper);
                    } else {
                        // The refreshed upper lines up with the batch's lower
                        // after all; loop and retry the append.
                    }
                }
335 }
336 }
337 }
338
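    /// The retrying core of `compare_and_append`: applies the append against
    /// state, treating an indeterminate error followed by an
    /// `AlreadyCommitted` break as success, and panics if success or failure
    /// genuinely cannot be distinguished.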
339 async fn compare_and_append_idempotent(
340 &self,
341 batch: &HollowBatch<T>,
342 writer_id: &WriterId,
343 heartbeat_timestamp_ms: u64,
344 idempotency_token: &IdempotencyToken,
345 debug_info: &HandleDebugState,
346 mut indeterminate: Option<Indeterminate>,
350 ) -> CompareAndAppendRes<T> {
351 let metrics = Arc::clone(&self.applier.metrics);
352 let lease_duration_ms = self
353 .applier
354 .cfg
355 .writer_lease_duration
356 .as_millis()
357 .try_into()
358 .expect("reasonable duration");
359 let mut retry = self
442 .applier
443 .metrics
444 .retries
445 .compare_and_append_idempotent
446 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
447 let mut writer_was_present = false;
448 loop {
449 let cmd_res = self
450 .applier
451 .apply_unbatched_cmd(&metrics.cmds.compare_and_append, |_, cfg, state| {
452 writer_was_present = state.writers.contains_key(writer_id);
453 state.compare_and_append(
454 batch,
455 writer_id,
456 heartbeat_timestamp_ms,
457 lease_duration_ms,
458 idempotency_token,
459 debug_info,
460 INLINE_WRITES_TOTAL_MAX_BYTES.get(cfg),
461 if CLAIM_UNCLAIMED_COMPACTIONS.get(cfg) {
462 CLAIM_COMPACTION_PERCENT.get(cfg)
463 } else {
464 0
465 },
466 Version::parse(&CLAIM_COMPACTION_MIN_VERSION.get(cfg))
467 .ok()
468 .as_ref(),
469 )
470 })
471 .await;
472 let (seqno, res, routine) = match cmd_res {
473 Ok(x) => x,
474 Err(err) => {
475 info!(
478 "compare_and_append received an indeterminate error, retrying in {:?}: {}",
479 retry.next_sleep(),
480 err
481 );
482 if indeterminate.is_none() {
483 indeterminate = Some(err);
484 }
485 retry = retry.sleep().await;
486 continue;
487 }
488 };
489 match res {
490 Ok(merge_reqs) => {
491 let mut compact_reqs = Vec::with_capacity(merge_reqs.len());
494 for req in merge_reqs {
495 let req = CompactReq {
496 shard_id: self.shard_id(),
497 desc: req.desc,
498 inputs: req
499 .inputs
500 .into_iter()
501 .map(|b| Arc::unwrap_or_clone(b.batch))
502 .collect(),
503 };
504 compact_reqs.push(req);
505 }
506 let writer_maintenance = WriterMaintenance {
507 routine,
508 compaction: compact_reqs,
509 };
510
511 if !writer_was_present {
512 metrics.state.writer_added.inc();
513 }
514 for part in &batch.parts {
515 if part.is_inline() {
516 let bytes = u64::cast_from(part.inline_bytes());
517 metrics.inline.part_commit_bytes.inc_by(bytes);
518 metrics.inline.part_commit_count.inc();
519 }
520 }
521 return CompareAndAppendRes::Success(seqno, writer_maintenance);
522 }
                Err(CompareAndAppendBreak::AlreadyCommitted) => {
                    // This batch was already committed by an earlier attempt of
                    // this same command, which is only possible if that attempt
                    // got an indeterminate error back despite succeeding.
                    assert!(indeterminate.is_some());
                    self.applier.metrics.cmds.compare_and_append_noop.inc();
529 if !writer_was_present {
530 metrics.state.writer_added.inc();
531 }
532 return CompareAndAppendRes::Success(seqno, WriterMaintenance::default());
533 }
534 Err(CompareAndAppendBreak::InvalidUsage(err)) => {
535 assert_none!(indeterminate);
540 return CompareAndAppendRes::InvalidUsage(err);
541 }
542 Err(CompareAndAppendBreak::InlineBackpressure) => {
543 return CompareAndAppendRes::InlineBackpressure;
546 }
547 Err(CompareAndAppendBreak::Upper {
548 shard_upper,
549 writer_upper,
550 }) => {
                    // A writer's upper is only advanced by its own successful
                    // appends, each of which also advances the shard's upper,
                    // so it can never get ahead of the shard's upper.
                    assert!(
                        PartialOrder::less_equal(&writer_upper, &shard_upper),
                        "{:?} vs {:?}",
                        &writer_upper,
                        &shard_upper
                    );
                    // If the writer's upper hasn't reached this batch's upper,
                    // then no earlier attempt of this command can have applied
                    // (that would have advanced the writer's upper), so this is
                    // a genuine mismatch.
                    if PartialOrder::less_than(&writer_upper, batch.desc.upper()) {
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // If we never saw an indeterminate error, none of our
                    // attempts can have applied without us knowing: also a
                    // genuine mismatch.
                    if indeterminate.is_none() {
                        return CompareAndAppendRes::UpperMismatch(seqno, shard_upper);
                    }
                    // We got an indeterminate error on some earlier attempt and
                    // the state doesn't tell us whether that attempt landed. We
                    // can't report success or failure without risking
                    // correctness, so halt and let a restart sort it out.
                    panic!(
584 concat!(
585 "cannot distinguish compare_and_append success or failure ",
586 "caa_lower={:?} caa_upper={:?} writer_upper={:?} shard_upper={:?} err={:?}"
587 ),
588 batch.desc.lower().elements(),
589 batch.desc.upper().elements(),
590 writer_upper.elements(),
591 shard_upper.elements(),
592 indeterminate,
593 );
594 }
595 };
596 }
597 }
598
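    /// Applies the result of a fueled (background) merge to the shard's trace,
    /// returning how (or whether) it matched the batches currently in state.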
599 pub async fn merge_res(
600 &self,
601 res: &FueledMergeRes<T>,
602 ) -> (ApplyMergeResult, RoutineMaintenance) {
603 let metrics = Arc::clone(&self.applier.metrics);
604
605 let mut merge_result_ever_applied = ApplyMergeResult::NotAppliedNoMatch;
634 let (_seqno, _apply_merge_result, maintenance) = self
635 .apply_unbatched_idempotent_cmd(&metrics.cmds.merge_res, |_, _, state| {
636 let ret = state.apply_merge_res(res);
637 if let Continue(result) = ret {
638 if result.applied() {
640 merge_result_ever_applied = result;
641 }
642 if result.matched() && !result.applied() && !merge_result_ever_applied.applied()
645 {
646 merge_result_ever_applied = result;
647 }
648 }
649 ret
650 })
651 .await;
652 (merge_result_ever_applied, maintenance)
653 }
654
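    /// Downgrades the given leased reader's since capability to `new_since`,
    /// also acting as a heartbeat for the reader's lease.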
655 pub async fn downgrade_since(
656 &self,
657 reader_id: &LeasedReaderId,
658 outstanding_seqno: Option<SeqNo>,
659 new_since: &Antichain<T>,
660 heartbeat_timestamp_ms: u64,
661 ) -> (SeqNo, Since<T>, RoutineMaintenance) {
662 let metrics = Arc::clone(&self.applier.metrics);
663 self.apply_unbatched_idempotent_cmd(&metrics.cmds.downgrade_since, |seqno, _cfg, state| {
664 state.downgrade_since(
665 reader_id,
666 seqno,
667 outstanding_seqno,
668 new_since,
669 heartbeat_timestamp_ms,
670 )
671 })
672 .await
673 }
674
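    /// Downgrades a critical reader's since capability, but only if the
    /// provided opaque matches the one currently stored in state; on mismatch,
    /// the current opaque and since are returned instead.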
675 pub async fn compare_and_downgrade_since<O: Opaque + Codec64>(
676 &self,
677 reader_id: &CriticalReaderId,
678 expected_opaque: &O,
679 (new_opaque, new_since): (&O, &Antichain<T>),
680 ) -> (Result<Since<T>, (O, Since<T>)>, RoutineMaintenance) {
681 let metrics = Arc::clone(&self.applier.metrics);
682 let (_seqno, res, maintenance) = self
683 .apply_unbatched_idempotent_cmd(
684 &metrics.cmds.compare_and_downgrade_since,
685 |_seqno, _cfg, state| {
686 state.compare_and_downgrade_since::<O>(
687 reader_id,
688 expected_opaque,
689 (new_opaque, new_since),
690 )
691 },
692 )
693 .await;
694
695 match res {
696 Ok(since) => (Ok(since), maintenance),
697 Err((opaque, since)) => (Err((opaque, since)), maintenance),
698 }
699 }
700
701 pub async fn heartbeat_leased_reader(
702 &self,
703 reader_id: &LeasedReaderId,
704 heartbeat_timestamp_ms: u64,
705 ) -> (SeqNo, bool, RoutineMaintenance) {
706 let metrics = Arc::clone(&self.applier.metrics);
707 let (seqno, existed, maintenance) = self
708 .apply_unbatched_idempotent_cmd(&metrics.cmds.heartbeat_reader, |_, _, state| {
709 state.heartbeat_leased_reader(reader_id, heartbeat_timestamp_ms)
710 })
711 .await;
712 (seqno, existed, maintenance)
713 }
714
715 pub async fn expire_leased_reader(
716 &self,
717 reader_id: &LeasedReaderId,
718 ) -> (SeqNo, RoutineMaintenance) {
719 let metrics = Arc::clone(&self.applier.metrics);
720 let (seqno, _existed, maintenance) = self
721 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
722 state.expire_leased_reader(reader_id)
723 })
724 .await;
725 (seqno, maintenance)
726 }
727
    #[allow(dead_code)]
    pub async fn expire_critical_reader(
730 &self,
731 reader_id: &CriticalReaderId,
732 ) -> (SeqNo, RoutineMaintenance) {
733 let metrics = Arc::clone(&self.applier.metrics);
734 let (seqno, _existed, maintenance) = self
735 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_reader, |_, _, state| {
736 state.expire_critical_reader(reader_id)
737 })
738 .await;
739 (seqno, maintenance)
740 }
741
742 pub async fn expire_writer(&self, writer_id: &WriterId) -> (SeqNo, RoutineMaintenance) {
743 let metrics = Arc::clone(&self.applier.metrics);
744 let (seqno, _existed, maintenance) = self
745 .apply_unbatched_idempotent_cmd(&metrics.cmds.expire_writer, |_, _, state| {
746 state.expire_writer(writer_id)
747 })
748 .await;
749 metrics.state.writer_removed.inc();
750 (seqno, maintenance)
751 }
752
753 pub fn is_finalized(&self) -> bool {
754 self.applier.is_finalized()
755 }
756
757 pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
759 self.applier.get_schema(schema_id)
760 }
761
762 pub fn latest_schema(&self) -> Option<(SchemaId, K::Schema, V::Schema)> {
764 self.applier.latest_schema()
765 }
766
767 pub async fn compare_and_evolve_schema(
771 &self,
772 expected: SchemaId,
773 key_schema: &K::Schema,
774 val_schema: &V::Schema,
775 ) -> (CaESchema<K, V>, RoutineMaintenance) {
776 let metrics = Arc::clone(&self.applier.metrics);
777 let (_seqno, state, maintenance) = self
778 .apply_unbatched_idempotent_cmd(
779 &metrics.cmds.compare_and_evolve_schema,
780 |_seqno, _cfg, state| {
781 state.compare_and_evolve_schema::<K, V>(expected, key_schema, val_schema)
782 },
783 )
784 .await;
785 (state, maintenance)
786 }
787
788 async fn tombstone_step(&self) -> Result<(bool, RoutineMaintenance), InvalidUsage<T>> {
789 let metrics = Arc::clone(&self.applier.metrics);
790 let mut retry = self
791 .applier
792 .metrics
793 .retries
794 .idempotent_cmd
795 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
796 loop {
797 let res = self
798 .applier
799 .apply_unbatched_cmd(&metrics.cmds.become_tombstone, |_, _, state| {
800 state.become_tombstone_and_shrink()
801 })
802 .await;
803 let err = match res {
804 Ok((_seqno, Ok(()), maintenance)) => return Ok((true, maintenance)),
805 Ok((_seqno, Err(NoOpStateTransition(())), maintenance)) => {
806 return Ok((false, maintenance));
807 }
808 Err(err) => err,
809 };
810 if retry.attempt() >= INFO_MIN_ATTEMPTS {
811 info!(
812 "become_tombstone received an indeterminate error, retrying in {:?}: {}",
813 retry.next_sleep(),
814 err
815 );
816 } else {
817 debug!(
818 "become_tombstone received an indeterminate error, retrying in {:?}: {}",
819 retry.next_sleep(),
820 err
821 );
822 }
823 retry = retry.sleep().await;
824 }
825 }
826
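    /// Turns a shard whose since and upper have both been advanced to the
    /// empty antichain into a permanent tombstone, shrinking its state in
    /// steps until no more progress can be made.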
827 pub async fn become_tombstone(&self) -> Result<RoutineMaintenance, InvalidUsage<T>> {
828 self.applier.check_since_upper_both_empty()?;
829
830 let mut maintenance = RoutineMaintenance::default();
831
832 loop {
833 let (made_progress, more_maintenance) = self.tombstone_step().await?;
834 maintenance.merge(more_maintenance);
835 if !made_progress {
836 break;
837 }
838 }
839
840 Ok(maintenance)
841 }
842
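    /// Returns the batches that constitute the shard's contents at `as_of`.
    ///
    /// If the as_of is not yet available, waits for it via a state watch plus
    /// a fallback polling loop; errors if historical distinctions at the as_of
    /// have already been compacted away.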
843 pub async fn snapshot(&self, as_of: &Antichain<T>) -> Result<Vec<HollowBatch<T>>, Since<T>> {
844 let start = Instant::now();
845 let (mut seqno, mut upper) = match self.applier.snapshot(as_of) {
846 Ok(x) => return Ok(x),
847 Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => (seqno, upper),
848 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
849 return Err(Since(since));
850 }
851 };
852
853 let mut watch = self.applier.watch();
856 let watch = &mut watch;
857 let sleeps = self
858 .applier
859 .metrics
860 .retries
861 .snapshot
862 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
863
864 enum Wake<'a, K, V, T, D> {
865 Watch(&'a mut StateWatch<K, V, T, D>),
866 Sleep(MetricsRetryStream),
867 }
868 let mut watch_fut = std::pin::pin!(
869 watch
870 .wait_for_seqno_ge(seqno.next())
871 .map(Wake::Watch)
872 .instrument(trace_span!("snapshot::watch")),
873 );
874 let mut sleep_fut = std::pin::pin!(
875 sleeps
876 .sleep()
877 .map(Wake::Sleep)
878 .instrument(trace_span!("snapshot::sleep")),
879 );
880
881 let mut logged_at_info = false;
885 loop {
886 if !logged_at_info && start.elapsed() >= Duration::from_millis(1024) {
890 logged_at_info = true;
891 info!(
892 "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
893 self.applier.shard_metrics.name,
894 self.shard_id(),
895 as_of.elements(),
896 seqno,
897 upper.elements(),
898 );
899 } else {
900 debug!(
901 "snapshot {} {} as of {:?} not yet available for {} upper {:?}",
902 self.applier.shard_metrics.name,
903 self.shard_id(),
904 as_of.elements(),
905 seqno,
906 upper.elements(),
907 );
908 }
909
910 let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
911 future::Either::Left((wake, _)) => wake,
912 future::Either::Right((wake, _)) => wake,
913 };
914 match &wake {
918 Wake::Watch(_) => self.applier.metrics.watch.snapshot_woken_via_watch.inc(),
919 Wake::Sleep(_) => {
920 self.applier.metrics.watch.snapshot_woken_via_sleep.inc();
921 self.applier.fetch_and_update_state(Some(seqno)).await;
922 }
923 }
924
925 (seqno, upper) = match self.applier.snapshot(as_of) {
926 Ok(x) => {
927 if logged_at_info {
928 info!(
929 "snapshot {} {} as of {:?} now available",
930 self.applier.shard_metrics.name,
931 self.shard_id(),
932 as_of.elements(),
933 );
934 }
935 return Ok(x);
936 }
937 Err(SnapshotErr::AsOfNotYetAvailable(seqno, Upper(upper))) => {
938 (seqno, upper)
940 }
941 Err(SnapshotErr::AsOfHistoricalDistinctionsLost(Since(since))) => {
942 return Err(Since(since));
943 }
944 };
945
946 match wake {
947 Wake::Watch(watch) => {
948 watch_fut.set(
949 watch
950 .wait_for_seqno_ge(seqno.next())
951 .map(Wake::Watch)
952 .instrument(trace_span!("snapshot::watch")),
953 );
954 }
955 Wake::Sleep(sleeps) => {
956 debug!(
957 "snapshot {} {} sleeping for {:?}",
958 self.applier.shard_metrics.name,
959 self.shard_id(),
960 sleeps.next_sleep()
961 );
962 sleep_fut.set(
963 sleeps
964 .sleep()
965 .map(Wake::Sleep)
966 .instrument(trace_span!("snapshot::sleep")),
967 );
968 }
969 }
970 }
971 }
972
973 pub fn verify_listen(&self, as_of: &Antichain<T>) -> Result<(), Since<T>> {
975 match self.applier.verify_listen(as_of) {
976 Ok(Ok(())) => Ok(()),
977 Ok(Err(Upper(_))) => {
978 Ok(())
984 }
985 Err(Since(since)) => Err(Since(since)),
986 }
987 }
988
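    /// Returns the next batch whose lower matches `frontier`, waiting via the
    /// given state watch (plus a fallback polling loop) until one appears.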
989 pub async fn next_listen_batch(
990 &self,
991 frontier: &Antichain<T>,
992 watch: &mut StateWatch<K, V, T, D>,
993 reader_id: Option<&LeasedReaderId>,
994 retry: Option<RetryParameters>,
996 ) -> HollowBatch<T> {
997 let mut seqno = match self.applier.next_listen_batch(frontier) {
998 Ok(b) => return b,
999 Err(seqno) => seqno,
1000 };
1001
1002 let retry = retry.unwrap_or_else(|| next_listen_batch_retry_params(&self.applier.cfg));
1005 let sleeps = self
1006 .applier
1007 .metrics
1008 .retries
1009 .next_listen_batch
1010 .stream(retry.into_retry(SystemTime::now()).into_retry_stream());
1011
1012 enum Wake<'a, K, V, T, D> {
1013 Watch(&'a mut StateWatch<K, V, T, D>),
1014 Sleep(MetricsRetryStream),
1015 }
1016 let mut watch_fut = std::pin::pin!(
1017 watch
1018 .wait_for_seqno_ge(seqno.next())
1019 .map(Wake::Watch)
1020 .instrument(trace_span!("snapshot::watch"))
1021 );
1022 let mut sleep_fut = std::pin::pin!(
1023 sleeps
1024 .sleep()
1025 .map(Wake::Sleep)
1026 .instrument(trace_span!("snapshot::sleep"))
1027 );
1028
1029 loop {
1030 let wake = match future::select(watch_fut.as_mut(), sleep_fut.as_mut()).await {
1031 future::Either::Left((wake, _)) => wake,
1032 future::Either::Right((wake, _)) => wake,
1033 };
1034 match &wake {
1038 Wake::Watch(_) => self.applier.metrics.watch.listen_woken_via_watch.inc(),
1039 Wake::Sleep(_) => {
1040 self.applier.metrics.watch.listen_woken_via_sleep.inc();
1041 self.applier.fetch_and_update_state(Some(seqno)).await;
1042 }
1043 }
1044
1045 seqno = match self.applier.next_listen_batch(frontier) {
1046 Ok(b) => {
1047 match &wake {
1048 Wake::Watch(_) => {
1049 self.applier.metrics.watch.listen_resolved_via_watch.inc()
1050 }
1051 Wake::Sleep(_) => {
1052 self.applier.metrics.watch.listen_resolved_via_sleep.inc()
1053 }
1054 }
1055 return b;
1056 }
1057 Err(seqno) => seqno,
1058 };
1059
1060 match wake {
1063 Wake::Watch(watch) => {
1064 watch_fut.set(
1065 watch
1066 .wait_for_seqno_ge(seqno.next())
1067 .map(Wake::Watch)
1068 .instrument(trace_span!("snapshot::watch")),
1069 );
1070 }
1071 Wake::Sleep(sleeps) => {
1072 debug!(
1073 "{:?}: {} {} next_listen_batch didn't find new data, retrying in {:?}",
1074 reader_id,
1075 self.applier.shard_metrics.name,
1076 self.shard_id(),
1077 sleeps.next_sleep()
1078 );
1079 sleep_fut.set(
1080 sleeps
1081 .sleep()
1082 .map(Wake::Sleep)
1083 .instrument(trace_span!("snapshot::sleep")),
1084 );
1085 }
1086 }
1087 }
1088 }
1089
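    /// Applies a command to state in a compare-and-set loop, retrying
    /// indefinitely on indeterminate errors. Only safe for work functions
    /// whose effect is idempotent.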
1090 async fn apply_unbatched_idempotent_cmd<
1091 R,
1092 WorkFn: FnMut(
1093 SeqNo,
1094 &PersistConfig,
1095 &mut StateCollections<T>,
1096 ) -> ControlFlow<NoOpStateTransition<R>, R>,
1097 >(
1098 &self,
1099 cmd: &CmdMetrics,
1100 mut work_fn: WorkFn,
1101 ) -> (SeqNo, R, RoutineMaintenance) {
1102 let mut retry = self
1103 .applier
1104 .metrics
1105 .retries
1106 .idempotent_cmd
1107 .stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1108 loop {
1109 match self.applier.apply_unbatched_cmd(cmd, &mut work_fn).await {
1110 Ok((seqno, x, maintenance)) => match x {
1111 Ok(x) => {
1112 return (seqno, x, maintenance);
1113 }
1114 Err(NoOpStateTransition(x)) => {
1115 return (seqno, x, maintenance);
1116 }
1117 },
1118 Err(err) => {
1119 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1120 info!(
1121 "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1122 cmd.name,
1123 retry.next_sleep(),
1124 err
1125 );
1126 } else {
1127 debug!(
1128 "apply_unbatched_idempotent_cmd {} received an indeterminate error, retrying in {:?}: {}",
1129 cmd.name,
1130 retry.next_sleep(),
1131 err
1132 );
1133 }
1134 retry = retry.sleep().await;
1135 continue;
1136 }
1137 }
1138 }
1139 }
1140}
1141
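/// A boxed callback that expires some handle (e.g. a reader or writer lease)
/// when invoked.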
pub(crate) struct ExpireFn(
    pub(crate) Box<dyn FnOnce() -> BoxFuture<'static, ()> + Send + Sync + 'static>,
);

impl Debug for ExpireFn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ExpireFn").finish_non_exhaustive()
    }
}
1155
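/// The outcome of a `Machine::compare_and_append` call.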
#[derive(Debug)]
pub(crate) enum CompareAndAppendRes<T> {
    Success(SeqNo, WriterMaintenance<T>),
    InvalidUsage(InvalidUsage<T>),
    UpperMismatch(SeqNo, Antichain<T>),
    InlineBackpressure,
}
1163
1164#[cfg(test)]
1165impl<T: Debug> CompareAndAppendRes<T> {
1166 #[track_caller]
1167 fn unwrap(self) -> (SeqNo, WriterMaintenance<T>) {
1168 match self {
1169 CompareAndAppendRes::Success(seqno, maintenance) => (seqno, maintenance),
1170 x => panic!("{:?}", x),
1171 }
1172 }
1173}
1174
1175impl<K, V, T, D> Machine<K, V, T, D>
1176where
1177 K: Debug + Codec,
1178 V: Debug + Codec,
1179 T: Timestamp + Lattice + Codec64 + Sync,
1180 D: Semigroup + Codec64 + Send + Sync,
1181{
1182 #[allow(clippy::unused_async)]
1183 pub async fn start_reader_heartbeat_tasks(
1184 self,
1185 reader_id: LeasedReaderId,
1186 gc: GarbageCollector<K, V, T, D>,
1187 ) -> Vec<JoinHandle<()>> {
1188 let mut ret = Vec::new();
1189 let metrics = Arc::clone(&self.applier.metrics);
1190
1191 let name = format!("persist::heartbeat_read({},{})", self.shard_id(), reader_id);
1203 ret.push(mz_ore::task::spawn(|| name, {
1204 let machine = self.clone();
1205 let reader_id = reader_id.clone();
1206 let gc = gc.clone();
1207 metrics
1208 .tasks
1209 .heartbeat_read
1210 .instrument_task(Self::reader_heartbeat_task(machine, reader_id, gc))
1211 }));
1212
1213 let isolated_runtime = Arc::clone(&self.isolated_runtime);
1214 let name = format!(
1215 "persist::heartbeat_read_isolated({},{})",
1216 self.shard_id(),
1217 reader_id
1218 );
1219 ret.push(
1220 isolated_runtime.spawn_named(
1221 || name,
1222 metrics
1223 .tasks
1224 .heartbeat_read
1225 .instrument_task(Self::reader_heartbeat_task(self, reader_id, gc)),
1226 ),
1227 );
1228
1229 ret
1230 }
1231
1232 async fn reader_heartbeat_task(
1233 machine: Self,
1234 reader_id: LeasedReaderId,
1235 gc: GarbageCollector<K, V, T, D>,
1236 ) {
1237 let sleep_duration = READER_LEASE_DURATION.get(&machine.applier.cfg) / 2;
1238 loop {
1239 let before_sleep = Instant::now();
1240 tokio::time::sleep(sleep_duration).await;
1241
1242 let elapsed_since_before_sleeping = before_sleep.elapsed();
1243 if elapsed_since_before_sleeping > sleep_duration + Duration::from_secs(60) {
1244 warn!(
1245 "reader ({}) of shard ({}) went {}s between heartbeats",
1246 reader_id,
1247 machine.shard_id(),
1248 elapsed_since_before_sleeping.as_secs_f64()
1249 );
1250 }
1251
1252 let before_heartbeat = Instant::now();
1253 let (_seqno, existed, maintenance) = machine
1254 .heartbeat_leased_reader(&reader_id, (machine.applier.cfg.now)())
1255 .await;
1256 maintenance.start_performing(&machine, &gc);
1257
1258 let elapsed_since_heartbeat = before_heartbeat.elapsed();
1259 if elapsed_since_heartbeat > Duration::from_secs(60) {
1260 warn!(
1261 "reader ({}) of shard ({}) heartbeat call took {}s",
1262 reader_id,
1263 machine.shard_id(),
1264 elapsed_since_heartbeat.as_secs_f64(),
1265 );
1266 }
1267
1268 if !existed {
1269 warn!(
1277 "heartbeat task for reader ({}) of shard ({}) exiting due to expired lease \
1278 while read handle is live",
1279 reader_id,
1280 machine.shard_id(),
1281 );
1282 return;
1283 }
1284 }
1285 }
1286}
1287
pub(crate) const NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_fixed_sleep",
    Duration::from_millis(1200),
    "The fixed sleep when polling for new batches from a Listen or Subscribe. Skipped if zero.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_initial_backoff",
    Duration::from_millis(100),
    "The initial backoff when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER: Config<u32> = Config::new(
    "persist_next_listen_batch_retryer_multiplier",
    2,
    "The backoff multiplier when polling for new batches from a Listen or Subscribe.",
);

pub(crate) const NEXT_LISTEN_BATCH_RETRYER_CLAMP: Config<Duration> = Config::new(
    "persist_next_listen_batch_retryer_clamp",
    Duration::from_secs(16),
    "The backoff clamp duration when polling for new batches from a Listen or Subscribe.",
);

fn next_listen_batch_retry_params(cfg: &ConfigSet) -> RetryParameters {
    RetryParameters {
        fixed_sleep: NEXT_LISTEN_BATCH_RETRYER_FIXED_SLEEP.get(cfg),
        initial_backoff: NEXT_LISTEN_BATCH_RETRYER_INITIAL_BACKOFF.get(cfg),
        multiplier: NEXT_LISTEN_BATCH_RETRYER_MULTIPLIER.get(cfg),
        clamp: NEXT_LISTEN_BATCH_RETRYER_CLAMP.get(cfg),
    }
}

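/// The number of retry attempts after which retry logging is escalated from
/// `debug!` to `info!`.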
pub const INFO_MIN_ATTEMPTS: usize = 3;

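/// Retries the given work indefinitely until it succeeds, treating all
/// external errors (determinate and indeterminate alike) as retryable.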
1324pub async fn retry_external<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
1325where
1326 F: std::future::Future<Output = Result<R, ExternalError>>,
1327 WorkFn: FnMut() -> F,
1328{
1329 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1330 loop {
1331 match work_fn().await {
1332 Ok(x) => {
1333 if retry.attempt() > 0 {
1334 debug!(
1335 "external operation {} succeeded after failing at least once",
1336 metrics.name,
1337 );
1338 }
1339 return x;
1340 }
1341 Err(err) => {
1342 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1343 info!(
1344 "external operation {} failed, retrying in {:?}: {}",
1345 metrics.name,
1346 retry.next_sleep(),
1347 err.display_with_causes()
1348 );
1349 } else {
1350 debug!(
1351 "external operation {} failed, retrying in {:?}: {}",
1352 metrics.name,
1353 retry.next_sleep(),
1354 err.display_with_causes()
1355 );
1356 }
1357 retry = retry.sleep().await;
1358 }
1359 }
1360 }
1361}
1362
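/// Retries the given work on determinate external errors only; indeterminate
/// errors are returned to the caller to deal with.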
1363pub async fn retry_determinate<R, F, WorkFn>(
1364 metrics: &RetryMetrics,
1365 mut work_fn: WorkFn,
1366) -> Result<R, Indeterminate>
1367where
1368 F: std::future::Future<Output = Result<R, ExternalError>>,
1369 WorkFn: FnMut() -> F,
1370{
1371 let mut retry = metrics.stream(Retry::persist_defaults(SystemTime::now()).into_retry_stream());
1372 loop {
1373 match work_fn().await {
1374 Ok(x) => {
1375 if retry.attempt() > 0 {
1376 debug!(
1377 "external operation {} succeeded after failing at least once",
1378 metrics.name,
1379 );
1380 }
1381 return Ok(x);
1382 }
1383 Err(ExternalError::Determinate(err)) => {
1384 debug!(
1393 "external operation {} failed, retrying in {:?}: {}",
1394 metrics.name,
1395 retry.next_sleep(),
1396 err.display_with_causes()
1397 );
1398 retry = retry.sleep().await;
1399 continue;
1400 }
1401 Err(ExternalError::Indeterminate(x)) => return Err(x),
1402 }
1403 }
1404}
1405
1406#[cfg(test)]
1407pub mod datadriven {
1408 use std::collections::BTreeMap;
1409 use std::pin::pin;
1410 use std::sync::{Arc, LazyLock};
1411
1412 use anyhow::anyhow;
1413 use differential_dataflow::consolidation::consolidate_updates;
1414 use differential_dataflow::trace::Description;
1415 use futures::StreamExt;
1416 use mz_dyncfg::{ConfigUpdates, ConfigVal};
1417 use mz_persist::indexed::encoding::BlobTraceBatchPart;
1418 use mz_persist_types::codec_impls::{StringSchema, UnitSchema};
1419
1420 use crate::batch::{
1421 BLOB_TARGET_SIZE, Batch, BatchBuilder, BatchBuilderConfig, BatchBuilderInternal,
1422 BatchParts, validate_truncate_batch,
1423 };
1424 use crate::cfg::COMPACTION_MEMORY_BOUND_BYTES;
1425 use crate::fetch::EncodedPart;
1426 use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
1427 use crate::internal::datadriven::DirectiveArgs;
1428 use crate::internal::encoding::Schemas;
1429 use crate::internal::gc::GcReq;
1430 use crate::internal::paths::{BlobKey, BlobKeyPrefix, PartialBlobKey};
1431 use crate::internal::state::{BatchPart, RunOrder, RunPart};
1432 use crate::internal::state_versions::EncodedRollup;
1433 use crate::read::{Listen, ListenEvent, READER_LEASE_DURATION};
1434 use crate::rpc::NoopPubSubSender;
1435 use crate::tests::new_test_client;
1436 use crate::write::COMBINE_INLINE_WRITES;
1437 use crate::{GarbageCollector, PersistClient};
1438
1439 use super::*;
1440
1441 static SCHEMAS: LazyLock<Schemas<String, ()>> = LazyLock::new(|| Schemas {
1442 id: Some(SchemaId(0)),
1443 key: Arc::new(StringSchema),
1444 val: Arc::new(UnitSchema),
1445 });
1446
1447 #[derive(Debug)]
1449 pub struct MachineState {
1450 pub client: PersistClient,
1451 pub shard_id: ShardId,
1452 pub state_versions: Arc<StateVersions>,
1453 pub machine: Machine<String, (), u64, i64>,
1454 pub gc: GarbageCollector<String, (), u64, i64>,
1455 pub batches: BTreeMap<String, HollowBatch<u64>>,
1456 pub rollups: BTreeMap<String, EncodedRollup>,
1457 pub listens: BTreeMap<String, Listen<String, (), u64, i64>>,
1458 pub routine: Vec<RoutineMaintenance>,
1459 }
1460
1461 impl MachineState {
1462 pub async fn new(dyncfgs: &ConfigUpdates) -> Self {
1463 let shard_id = ShardId::new();
1464 let client = new_test_client(dyncfgs).await;
1465 client
1468 .cfg
1469 .set_config(&BLOB_TARGET_SIZE, *BLOB_TARGET_SIZE.default());
1470 client.cfg.set_config(&COMBINE_INLINE_WRITES, false);
1474 let state_versions = Arc::new(StateVersions::new(
1475 client.cfg.clone(),
1476 Arc::clone(&client.consensus),
1477 Arc::clone(&client.blob),
1478 Arc::clone(&client.metrics),
1479 ));
1480 let machine = Machine::new(
1481 client.cfg.clone(),
1482 shard_id,
1483 Arc::clone(&client.metrics),
1484 Arc::clone(&state_versions),
1485 Arc::clone(&client.shared_states),
1486 Arc::new(NoopPubSubSender),
1487 Arc::clone(&client.isolated_runtime),
1488 Diagnostics::for_tests(),
1489 )
1490 .await
1491 .expect("codecs should match");
1492 let gc = GarbageCollector::new(machine.clone(), Arc::clone(&client.isolated_runtime));
1493 MachineState {
1494 shard_id,
1495 client,
1496 state_versions,
1497 machine,
1498 gc,
1499 batches: BTreeMap::default(),
1500 rollups: BTreeMap::default(),
1501 listens: BTreeMap::default(),
1502 routine: Vec::new(),
1503 }
1504 }
1505
1506 fn to_batch(&self, hollow: HollowBatch<u64>) -> Batch<String, (), u64, i64> {
1507 Batch::new(
1508 true,
1509 Arc::clone(&self.client.metrics),
1510 Arc::clone(&self.client.blob),
1511 self.client.metrics.shards.shard(&self.shard_id, "test"),
1512 self.client.cfg.build_version.clone(),
1513 hollow,
1514 )
1515 }
1516 }
1517
1518 pub async fn consensus_scan(
1521 datadriven: &MachineState,
1522 args: DirectiveArgs<'_>,
1523 ) -> Result<String, anyhow::Error> {
1524 let from = args.expect("from_seqno");
1525
1526 let mut states = datadriven
1527 .state_versions
1528 .fetch_all_live_states::<u64>(datadriven.shard_id)
1529 .await
1530 .expect("should only be called on an initialized shard")
1531 .check_ts_codec()
1532 .expect("shard codecs should not change");
1533 let mut s = String::new();
1534 while let Some(x) = states.next(|_| {}) {
1535 if x.seqno < from {
1536 continue;
1537 }
1538 let rollups: Vec<_> = x
1539 .collections
1540 .rollups
1541 .keys()
1542 .map(|seqno| seqno.to_string())
1543 .collect();
1544 let batches: Vec<_> = x
1545 .collections
1546 .trace
1547 .batches()
1548 .filter(|b| !b.is_empty())
1549 .filter_map(|b| {
1550 datadriven
1551 .batches
1552 .iter()
1553 .find(|(_, original_batch)| original_batch.parts == b.parts)
1554 .map(|(batch_name, _)| batch_name.to_owned())
1555 })
1556 .collect();
1557 write!(
1558 s,
1559 "seqno={} batches={} rollups={}\n",
1560 x.seqno,
1561 batches.join(","),
1562 rollups.join(","),
1563 );
1564 }
1565 Ok(s)
1566 }
1567
1568 pub async fn consensus_truncate(
1569 datadriven: &MachineState,
1570 args: DirectiveArgs<'_>,
1571 ) -> Result<String, anyhow::Error> {
1572 let to = args.expect("to_seqno");
1573 let removed = datadriven
1574 .client
1575 .consensus
1576 .truncate(&datadriven.shard_id.to_string(), to)
1577 .await
1578 .expect("valid truncation");
1579 Ok(format!("{}\n", removed))
1580 }
1581
1582 pub async fn blob_scan_batches(
1583 datadriven: &MachineState,
1584 _args: DirectiveArgs<'_>,
1585 ) -> Result<String, anyhow::Error> {
1586 let key_prefix = BlobKeyPrefix::Shard(&datadriven.shard_id).to_string();
1587
1588 let mut s = String::new();
1589 let () = datadriven
1590 .state_versions
1591 .blob
1592 .list_keys_and_metadata(&key_prefix, &mut |x| {
1593 let (_, key) = BlobKey::parse_ids(x.key).expect("key should be valid");
1594 if let PartialBlobKey::Batch(_, _) = key {
1595 write!(s, "{}: {}b\n", x.key, x.size_in_bytes);
1596 }
1597 })
1598 .await?;
1599 Ok(s)
1600 }
1601
1602 #[allow(clippy::unused_async)]
1603 pub async fn shard_desc(
1604 datadriven: &MachineState,
1605 _args: DirectiveArgs<'_>,
1606 ) -> Result<String, anyhow::Error> {
1607 Ok(format!(
1608 "since={:?} upper={:?}\n",
1609 datadriven.machine.applier.since().elements(),
1610 datadriven.machine.applier.clone_upper().elements()
1611 ))
1612 }
1613
1614 pub async fn downgrade_since(
1615 datadriven: &mut MachineState,
1616 args: DirectiveArgs<'_>,
1617 ) -> Result<String, anyhow::Error> {
1618 let since = args.expect_antichain("since");
1619 let seqno = args.optional("seqno");
1620 let reader_id = args.expect("reader_id");
1621 let (_, since, routine) = datadriven
1622 .machine
1623 .downgrade_since(
1624 &reader_id,
1625 seqno,
1626 &since,
1627 (datadriven.machine.applier.cfg.now)(),
1628 )
1629 .await;
1630 datadriven.routine.push(routine);
1631 Ok(format!(
1632 "{} {:?}\n",
1633 datadriven.machine.seqno(),
1634 since.0.elements()
1635 ))
1636 }
1637
1638 #[allow(clippy::unused_async)]
1639 pub async fn dyncfg(
1640 datadriven: &MachineState,
1641 args: DirectiveArgs<'_>,
1642 ) -> Result<String, anyhow::Error> {
1643 let mut updates = ConfigUpdates::default();
1644 for x in args.input.trim().split('\n') {
1645 match x.split(' ').collect::<Vec<_>>().as_slice() {
1646 &[name, val] => {
1647 let config = datadriven
1648 .client
1649 .cfg
1650 .entries()
1651 .find(|x| x.name() == name)
1652 .ok_or_else(|| anyhow!("unknown dyncfg: {}", name))?;
1653 match config.val() {
1654 ConfigVal::Usize(_) => {
1655 let val = val.parse().map_err(anyhow::Error::new)?;
1656 updates.add_dynamic(name, ConfigVal::Usize(val));
1657 }
1658 ConfigVal::Bool(_) => {
1659 let val = val.parse().map_err(anyhow::Error::new)?;
1660 updates.add_dynamic(name, ConfigVal::Bool(val));
1661 }
1662 x => unimplemented!("dyncfg type: {:?}", x),
1663 }
1664 }
1665 x => return Err(anyhow!("expected `name val` got: {:?}", x)),
1666 }
1667 }
1668 updates.apply(&datadriven.client.cfg);
1669
1670 Ok("ok\n".to_string())
1671 }
1672
1673 pub async fn compare_and_downgrade_since(
1674 datadriven: &mut MachineState,
1675 args: DirectiveArgs<'_>,
1676 ) -> Result<String, anyhow::Error> {
1677 let expected_opaque: u64 = args.expect("expect_opaque");
1678 let new_opaque: u64 = args.expect("opaque");
1679 let new_since = args.expect_antichain("since");
1680 let reader_id = args.expect("reader_id");
1681 let (res, routine) = datadriven
1682 .machine
1683 .compare_and_downgrade_since(&reader_id, &expected_opaque, (&new_opaque, &new_since))
1684 .await;
1685 datadriven.routine.push(routine);
1686 let since = res.map_err(|(opaque, since)| {
1687 anyhow!("mismatch: opaque={} since={:?}", opaque, since.0.elements())
1688 })?;
1689 Ok(format!(
1690 "{} {} {:?}\n",
1691 datadriven.machine.seqno(),
1692 new_opaque,
1693 since.0.elements()
1694 ))
1695 }
1696
1697 pub async fn write_rollup(
1698 datadriven: &mut MachineState,
1699 args: DirectiveArgs<'_>,
1700 ) -> Result<String, anyhow::Error> {
1701 let output = args.expect_str("output");
1702
1703 let rollup = datadriven
1704 .machine
1705 .applier
1706 .write_rollup_for_state()
1707 .await
1708 .expect("rollup");
1709
1710 datadriven
1711 .rollups
1712 .insert(output.to_string(), rollup.clone());
1713
1714 Ok(format!(
1715 "state={} diffs=[{}, {})\n",
1716 rollup.seqno,
1717 rollup._desc.lower().first().expect("seqno"),
1718 rollup._desc.upper().first().expect("seqno"),
1719 ))
1720 }
1721
1722 pub async fn add_rollup(
1723 datadriven: &mut MachineState,
1724 args: DirectiveArgs<'_>,
1725 ) -> Result<String, anyhow::Error> {
1726 let input = args.expect_str("input");
1727 let rollup = datadriven
1728 .rollups
1729 .get(input)
1730 .expect("unknown batch")
1731 .clone();
1732
1733 let (applied, maintenance) = datadriven
1734 .machine
1735 .add_rollup((rollup.seqno, &rollup.to_hollow()))
1736 .await;
1737
1738 if !applied {
1739 return Err(anyhow!("failed to apply rollup for: {}", rollup.seqno));
1740 }
1741
1742 datadriven.routine.push(maintenance);
1743 Ok(format!("{}\n", datadriven.machine.seqno()))
1744 }
1745
1746 pub async fn write_batch(
1747 datadriven: &mut MachineState,
1748 args: DirectiveArgs<'_>,
1749 ) -> Result<String, anyhow::Error> {
1750 let output = args.expect_str("output");
1751 let lower = args.expect_antichain("lower");
1752 let upper = args.expect_antichain("upper");
1753 assert!(PartialOrder::less_than(&lower, &upper));
1754 let since = args
1755 .optional_antichain("since")
1756 .unwrap_or_else(|| Antichain::from_elem(0));
1757 let target_size = args.optional("target_size");
1758 let parts_size_override = args.optional("parts_size_override");
1759 let consolidate = args.optional("consolidate").unwrap_or(true);
1760 let mut updates: Vec<_> = args
1761 .input
1762 .split('\n')
1763 .flat_map(DirectiveArgs::parse_update)
1764 .collect();
1765
1766 let mut cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
1767 if let Some(target_size) = target_size {
1768 cfg.blob_target_size = target_size;
1769 };
1770 if consolidate {
1771 consolidate_updates(&mut updates);
1772 }
1773 let run_order = if consolidate {
1774 cfg.preferred_order
1775 } else {
1776 RunOrder::Unordered
1777 };
1778 let parts = BatchParts::new_ordered(
1779 cfg.clone(),
1780 run_order,
1781 Arc::clone(&datadriven.client.metrics),
1782 Arc::clone(&datadriven.machine.applier.shard_metrics),
1783 datadriven.shard_id,
1784 Arc::clone(&datadriven.client.blob),
1785 Arc::clone(&datadriven.client.isolated_runtime),
1786 &datadriven.client.metrics.user,
1787 );
1788 let builder = BatchBuilderInternal::new(
1789 cfg.clone(),
1790 parts,
1791 Arc::clone(&datadriven.client.metrics),
1792 SCHEMAS.clone(),
1793 Arc::clone(&datadriven.client.blob),
1794 datadriven.shard_id.clone(),
1795 datadriven.client.cfg.build_version.clone(),
1796 );
1797 let mut builder = BatchBuilder::new(builder, Description::new(lower, upper.clone(), since));
1798 for ((k, ()), t, d) in updates {
1799 builder.add(&k, &(), &t, &d).await.expect("invalid batch");
1800 }
1801 let mut batch = builder.finish(upper).await?;
1802 if parts_size_override.is_some() {
1805 batch
1806 .flush_to_blob(
1807 &cfg,
1808 &datadriven.client.metrics.user,
1809 &datadriven.client.isolated_runtime,
1810 &SCHEMAS,
1811 )
1812 .await;
1813 }
1814 let batch = batch.into_hollow_batch();
1815
1816 if let Some(size) = parts_size_override {
1817 let mut batch = batch.clone();
1818 for part in batch.parts.iter_mut() {
1819 match part {
1820 RunPart::Many(run) => run.max_part_bytes = size,
1821 RunPart::Single(BatchPart::Hollow(part)) => part.encoded_size_bytes = size,
1822 RunPart::Single(BatchPart::Inline { .. }) => unreachable!("flushed out above"),
1823 }
1824 }
1825 datadriven.batches.insert(output.to_owned(), batch);
1826 } else {
1827 datadriven.batches.insert(output.to_owned(), batch.clone());
1828 }
1829 Ok(format!("parts={} len={}\n", batch.part_count(), batch.len))
1830 }
1831
1832 pub async fn fetch_batch(
1833 datadriven: &MachineState,
1834 args: DirectiveArgs<'_>,
1835 ) -> Result<String, anyhow::Error> {
1836 let input = args.expect_str("input");
1837 let stats = args.optional_str("stats");
1838 let batch = datadriven.batches.get(input).expect("unknown batch");
1839
1840 let mut s = String::new();
1841 let mut stream = pin!(
1842 batch
1843 .part_stream(
1844 datadriven.shard_id,
1845 &*datadriven.state_versions.blob,
1846 &*datadriven.state_versions.metrics
1847 )
1848 .enumerate()
1849 );
1850 while let Some((idx, part)) = stream.next().await {
1851 let part = &*part?;
1852 write!(s, "<part {idx}>\n");
1853
1854 let lower = match part {
1855 BatchPart::Inline { updates, .. } => {
1856 let updates: BlobTraceBatchPart<u64> =
1857 updates.decode(&datadriven.client.metrics.columnar)?;
1858 updates.structured_key_lower()
1859 }
1860 other => other.structured_key_lower(),
1861 };
1862
1863 if let Some(lower) = lower {
1864 if stats == Some("lower") {
1865 writeln!(s, "<key lower={}>", lower.get())
1866 }
1867 }
1868
1869 match part {
1870 BatchPart::Hollow(part) => {
1871 let blob_batch = datadriven
1872 .client
1873 .blob
1874 .get(&part.key.complete(&datadriven.shard_id))
1875 .await;
1876 match blob_batch {
1877 Ok(Some(_)) | Err(_) => {}
1878 Ok(None) => {
1881 s.push_str("<empty>\n");
1882 continue;
1883 }
1884 };
1885 }
1886 BatchPart::Inline { .. } => {}
1887 };
1888 let part = EncodedPart::fetch(
1889 &datadriven.shard_id,
1890 datadriven.client.blob.as_ref(),
1891 datadriven.client.metrics.as_ref(),
1892 datadriven.machine.applier.shard_metrics.as_ref(),
1893 &datadriven.client.metrics.read.batch_fetcher,
1894 &batch.desc,
1895 part,
1896 )
1897 .await
1898 .expect("invalid batch part");
1899 let part = part
1900 .normalize(&datadriven.client.metrics.columnar)
1901 .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
1902
1903 for ((k, _v), t, d) in part
1904 .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
1905 .expect("valid schemas")
1906 {
1907 writeln!(s, "{k} {t} {d}");
1908 }
1909 }
1910 if !s.is_empty() {
1911 for (idx, (_meta, run)) in batch.runs().enumerate() {
1912 write!(s, "<run {idx}>\n");
1913 for part in run {
1914 let part_idx = batch
1915 .parts
1916 .iter()
1917 .position(|p| p == part)
1918 .expect("part should exist");
1919 write!(s, "part {part_idx}\n");
1920 }
1921 }
1922 }
1923 Ok(s)
1924 }
1925
1926 #[allow(clippy::unused_async)]
1927 pub async fn truncate_batch_desc(
1928 datadriven: &mut MachineState,
1929 args: DirectiveArgs<'_>,
1930 ) -> Result<String, anyhow::Error> {
1931 let input = args.expect_str("input");
1932 let output = args.expect_str("output");
1933 let lower = args.expect_antichain("lower");
1934 let upper = args.expect_antichain("upper");
1935
1936 let mut batch = datadriven
1937 .batches
1938 .get(input)
1939 .expect("unknown batch")
1940 .clone();
1941 let truncated_desc = Description::new(lower, upper, batch.desc.since().clone());
1942 let () = validate_truncate_batch(&batch, &truncated_desc, false)?;
1943 batch.desc = truncated_desc;
1944 datadriven.batches.insert(output.to_owned(), batch.clone());
1945 Ok(format!("parts={} len={}\n", batch.part_count(), batch.len))
1946 }
1947
1948 #[allow(clippy::unused_async)]
1949 pub async fn set_batch_parts_size(
1950 datadriven: &mut MachineState,
1951 args: DirectiveArgs<'_>,
1952 ) -> Result<String, anyhow::Error> {
1953 let input = args.expect_str("input");
1954 let size = args.expect("size");
1955 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
1956 for part in batch.parts.iter_mut() {
1957 match part {
1958 RunPart::Single(BatchPart::Hollow(x)) => x.encoded_size_bytes = size,
1959 _ => {
1960 panic!("set_batch_parts_size only supports hollow parts")
1961 }
1962 }
1963 }
1964 Ok("ok\n".to_string())
1965 }
1966
1967 pub async fn compact(
1968 datadriven: &mut MachineState,
1969 args: DirectiveArgs<'_>,
1970 ) -> Result<String, anyhow::Error> {
1971 let output = args.expect_str("output");
1972 let lower = args.expect_antichain("lower");
1973 let upper = args.expect_antichain("upper");
1974 let since = args.expect_antichain("since");
1975 let target_size = args.optional("target_size");
1976 let memory_bound = args.optional("memory_bound");
1977
1978 let mut inputs = Vec::new();
1979 for input in args.args.get("inputs").expect("missing inputs") {
1980 inputs.push(
1981 datadriven
1982 .batches
1983 .get(input)
1984 .expect("unknown batch")
1985 .clone(),
1986 );
1987 }
1988
1989 let cfg = datadriven.client.cfg.clone();
1990 if let Some(target_size) = target_size {
1991 cfg.set_config(&BLOB_TARGET_SIZE, target_size);
1992 };
1993 if let Some(memory_bound) = memory_bound {
1994 cfg.set_config(&COMPACTION_MEMORY_BOUND_BYTES, memory_bound);
1995 }
1996 let req = CompactReq {
1997 shard_id: datadriven.shard_id,
1998 desc: Description::new(lower, upper, since),
1999 inputs,
2000 };
2001
2002 let req_clone = req.clone();
2003 let stream = Compactor::<String, (), u64, i64>::compact_stream(
2004 CompactConfig::new(&cfg, datadriven.shard_id),
2005 Arc::clone(&datadriven.client.blob),
2006 Arc::clone(&datadriven.client.metrics),
2007 Arc::clone(&datadriven.machine.applier.shard_metrics),
2008 Arc::clone(&datadriven.client.isolated_runtime),
2009 req_clone,
2010 SCHEMAS.clone(),
2011 );
2012
2013 let res = Compactor::<String, (), u64, i64>::compact_all(stream, req.clone()).await?;
2014
2015 datadriven
2016 .batches
2017 .insert(output.to_owned(), res.output.clone());
2018 Ok(format!(
2019 "parts={} len={}\n",
2020 res.output.part_count(),
2021 res.output.len
2022 ))
2023 }
2024
2025 pub async fn clear_blob(
2026 datadriven: &MachineState,
2027 _args: DirectiveArgs<'_>,
2028 ) -> Result<String, anyhow::Error> {
2029 let mut to_delete = vec![];
2030 datadriven
2031 .client
2032 .blob
2033 .list_keys_and_metadata("", &mut |meta| {
2034 to_delete.push(meta.key.to_owned());
2035 })
2036 .await?;
2037 for blob in &to_delete {
2038 datadriven.client.blob.delete(blob).await?;
2039 }
2040 Ok(format!("deleted={}\n", to_delete.len()))
2041 }
2042
2043 pub async fn restore_blob(
2044 datadriven: &MachineState,
2045 _args: DirectiveArgs<'_>,
2046 ) -> Result<String, anyhow::Error> {
2047 let not_restored = crate::internal::restore::restore_blob(
2048 &datadriven.state_versions,
2049 datadriven.client.blob.as_ref(),
2050 &datadriven.client.cfg.build_version,
2051 datadriven.shard_id,
2052 &*datadriven.state_versions.metrics,
2053 )
2054 .await?;
2055 let mut out = String::new();
2056 for key in not_restored {
2057 writeln!(&mut out, "{key}");
2058 }
2059 Ok(out)
2060 }
2061
2062 #[allow(clippy::unused_async)]
2063 pub async fn rewrite_ts(
2064 datadriven: &mut MachineState,
2065 args: DirectiveArgs<'_>,
2066 ) -> Result<String, anyhow::Error> {
2067 let input = args.expect_str("input");
2068 let ts_rewrite = args.expect_antichain("frontier");
2069 let upper = args.expect_antichain("upper");
2070
2071 let batch = datadriven.batches.get_mut(input).expect("unknown batch");
2072 let () = batch
2073 .rewrite_ts(&ts_rewrite, upper)
2074 .map_err(|err| anyhow!("invalid rewrite: {}", err))?;
2075 Ok("ok\n".into())
2076 }
2077
2078 pub async fn gc(
2079 datadriven: &mut MachineState,
2080 args: DirectiveArgs<'_>,
2081 ) -> Result<String, anyhow::Error> {
2082 let new_seqno_since = args.expect("to_seqno");
2083
2084 let req = GcReq {
2085 shard_id: datadriven.shard_id,
2086 new_seqno_since,
2087 };
2088 let (maintenance, stats) =
2089 GarbageCollector::gc_and_truncate(&datadriven.machine, req).await;
2090 datadriven.routine.push(maintenance);
2091
2092 Ok(format!(
2093 "{} batch_parts={} rollups={} truncated={} state_rollups={}\n",
2094 datadriven.machine.seqno(),
2095 stats.batch_parts_deleted_from_blob,
2096 stats.rollups_deleted_from_blob,
2097 stats
2098 .truncated_consensus_to
2099 .iter()
2100 .map(|x| x.to_string())
2101 .collect::<Vec<_>>()
2102 .join(","),
2103 stats
2104 .rollups_removed_from_state
2105 .iter()
2106 .map(|x| x.to_string())
2107 .collect::<Vec<_>>()
2108 .join(","),
2109 ))
2110 }
2111
2112 pub async fn snapshot(
2113 datadriven: &MachineState,
2114 args: DirectiveArgs<'_>,
2115 ) -> Result<String, anyhow::Error> {
2116 let as_of = args.expect_antichain("as_of");
2117 let snapshot = datadriven
2118 .machine
2119 .snapshot(&as_of)
2120 .await
2121 .map_err(|err| anyhow!("{:?}", err))?;
2122
2123 let mut result = String::new();
2124
2125 for batch in snapshot {
2126 writeln!(
2127 result,
2128 "<batch {:?}-{:?}>",
2129 batch.desc.lower().elements(),
2130 batch.desc.upper().elements()
2131 );
2132 for (run, (_meta, parts)) in batch.runs().enumerate() {
2133 writeln!(result, "<run {run}>");
2134 let mut stream = pin!(
2135 futures::stream::iter(parts)
2136 .flat_map(|part| part.part_stream(
2137 datadriven.shard_id,
2138 &*datadriven.state_versions.blob,
2139 &*datadriven.state_versions.metrics
2140 ))
2141 .enumerate()
2142 );
2143
2144 while let Some((idx, part)) = stream.next().await {
2145 let part = &*part?;
2146 writeln!(result, "<part {idx}>");
2147
2148 let part = EncodedPart::fetch(
2149 &datadriven.shard_id,
2150 datadriven.client.blob.as_ref(),
2151 datadriven.client.metrics.as_ref(),
2152 datadriven.machine.applier.shard_metrics.as_ref(),
2153 &datadriven.client.metrics.read.batch_fetcher,
2154 &batch.desc,
2155 part,
2156 )
2157 .await
2158 .expect("invalid batch part");
2159 let part = part
2160 .normalize(&datadriven.client.metrics.columnar)
2161 .into_part::<String, ()>(&*SCHEMAS.key, &*SCHEMAS.val);
2162
2163 let mut updates = Vec::new();
2164
2165 for ((k, _v), mut t, d) in part
2166 .decode_iter::<_, _, u64, i64>(&*SCHEMAS.key, &*SCHEMAS.val)
2167 .expect("valid schemas")
2168 {
2169 t.advance_by(as_of.borrow());
2170 updates.push((k, t, d));
2171 }
2172
2173 consolidate_updates(&mut updates);
2174
2175 for (k, t, d) in updates {
2176 writeln!(result, "{k} {t} {d}");
2177 }
2178 }
2179 }
2180 }
2181
2182 Ok(result)
2183 }
2184
2185 pub async fn register_listen(
2186 datadriven: &mut MachineState,
2187 args: DirectiveArgs<'_>,
2188 ) -> Result<String, anyhow::Error> {
2189 let output = args.expect_str("output");
2190 let as_of = args.expect_antichain("as_of");
2191 let read = datadriven
2192 .client
2193 .open_leased_reader::<String, (), u64, i64>(
2194 datadriven.shard_id,
2195 Arc::new(StringSchema),
2196 Arc::new(UnitSchema),
2197 Diagnostics::for_tests(),
2198 true,
2199 )
2200 .await
2201 .expect("invalid shard types");
2202 let listen = read
2203 .listen(as_of)
2204 .await
2205 .map_err(|err| anyhow!("{:?}", err))?;
2206 datadriven.listens.insert(output.to_owned(), listen);
2207 Ok("ok\n".into())
2208 }
2209
2210 pub async fn listen_through(
2211 datadriven: &mut MachineState,
2212 args: DirectiveArgs<'_>,
2213 ) -> Result<String, anyhow::Error> {
2214 let input = args.expect_str("input");
2215 let frontier = args.expect("frontier");
2218 let listen = datadriven.listens.get_mut(input).expect("unknown listener");
2219 let mut s = String::new();
2220 loop {
2221 for event in listen.fetch_next().await {
2222 match event {
2223 ListenEvent::Updates(x) => {
2224 for ((k, _v), t, d) in x.iter() {
2225                             writeln!(s, "{} {} {}", k.as_ref().unwrap(), t, d);
2226 }
2227 }
2228 ListenEvent::Progress(x) => {
2229 if !x.less_than(&frontier) {
2230 return Ok(s);
2231 }
2232 }
2233 }
2234 }
2235 }
2236 }
2237
2238 pub async fn register_critical_reader(
2239 datadriven: &mut MachineState,
2240 args: DirectiveArgs<'_>,
2241 ) -> Result<String, anyhow::Error> {
2242 let reader_id = args.expect("reader_id");
2243 let (state, maintenance) = datadriven
2244 .machine
2245 .register_critical_reader::<u64>(&reader_id, "tests")
2246 .await;
2247 datadriven.routine.push(maintenance);
2248 Ok(format!(
2249 "{} {:?}\n",
2250 datadriven.machine.seqno(),
2251 state.since.elements(),
2252 ))
2253 }
2254
2255 pub async fn register_leased_reader(
2256 datadriven: &mut MachineState,
2257 args: DirectiveArgs<'_>,
2258 ) -> Result<String, anyhow::Error> {
2259 let reader_id = args.expect("reader_id");
2260 let (reader_state, maintenance) = datadriven
2261 .machine
2262 .register_leased_reader(
2263 &reader_id,
2264 "tests",
2265 READER_LEASE_DURATION.get(&datadriven.client.cfg),
2266 (datadriven.client.cfg.now)(),
2267 false,
2268 )
2269 .await;
2270 datadriven.routine.push(maintenance);
2271 Ok(format!(
2272 "{} {:?}\n",
2273 datadriven.machine.seqno(),
2274 reader_state.since.elements(),
2275 ))
2276 }
2277
2278 pub async fn heartbeat_leased_reader(
2279 datadriven: &MachineState,
2280 args: DirectiveArgs<'_>,
2281 ) -> Result<String, anyhow::Error> {
2282 let reader_id = args.expect("reader_id");
2283 let _ = datadriven
2284 .machine
2285 .heartbeat_leased_reader(&reader_id, (datadriven.client.cfg.now)())
2286 .await;
2287 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2288 }
2289
2290 pub async fn expire_critical_reader(
2291 datadriven: &mut MachineState,
2292 args: DirectiveArgs<'_>,
2293 ) -> Result<String, anyhow::Error> {
2294 let reader_id = args.expect("reader_id");
2295 let (_, maintenance) = datadriven.machine.expire_critical_reader(&reader_id).await;
2296 datadriven.routine.push(maintenance);
2297 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2298 }
2299
2300 pub async fn expire_leased_reader(
2301 datadriven: &mut MachineState,
2302 args: DirectiveArgs<'_>,
2303 ) -> Result<String, anyhow::Error> {
2304 let reader_id = args.expect("reader_id");
2305 let (_, maintenance) = datadriven.machine.expire_leased_reader(&reader_id).await;
2306 datadriven.routine.push(maintenance);
2307 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2308 }
2309
2310 pub async fn compare_and_append_batches(
2311 datadriven: &MachineState,
2312 args: DirectiveArgs<'_>,
2313 ) -> Result<String, anyhow::Error> {
2314 let expected_upper = args.expect_antichain("expected_upper");
2315 let new_upper = args.expect_antichain("new_upper");
2316
2317 let mut batches: Vec<Batch<String, (), u64, i64>> = args
2318 .args
2319 .get("batches")
2320 .expect("missing batches")
2321 .into_iter()
2322 .map(|batch| {
2323 let hollow = datadriven
2324 .batches
2325 .get(batch)
2326 .expect("unknown batch")
2327 .clone();
2328 datadriven.to_batch(hollow)
2329 })
2330 .collect();
2331
2332 let mut writer = datadriven
2333 .client
2334 .open_writer(
2335 datadriven.shard_id,
2336 Arc::new(StringSchema),
2337 Arc::new(UnitSchema),
2338 Diagnostics::for_tests(),
2339 )
2340 .await?;
2341
2342 let mut batch_refs: Vec<_> = batches.iter_mut().collect();
2343
2344 let () = writer
2345 .compare_and_append_batch(batch_refs.as_mut_slice(), expected_upper, new_upper)
2346 .await?
2347 .map_err(|err| anyhow!("upper mismatch: {:?}", err))?;
2348
2349 writer.expire().await;
2350
2351 Ok("ok\n".into())
2352 }
2353
2354 pub async fn expire_writer(
2355 datadriven: &mut MachineState,
2356 args: DirectiveArgs<'_>,
2357 ) -> Result<String, anyhow::Error> {
2358 let writer_id = args.expect("writer_id");
2359 let (_, maintenance) = datadriven.machine.expire_writer(&writer_id).await;
2360 datadriven.routine.push(maintenance);
2361 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2362 }
2363
2364 pub(crate) async fn finalize(
2365 datadriven: &mut MachineState,
2366 _args: DirectiveArgs<'_>,
2367 ) -> anyhow::Result<String> {
2368 let maintenance = datadriven.machine.become_tombstone().await?;
2369 datadriven.routine.push(maintenance);
2370 Ok(format!("{} ok\n", datadriven.machine.seqno()))
2371 }
2372
2373 pub(crate) fn is_finalized(
2374 datadriven: &MachineState,
2375 _args: DirectiveArgs<'_>,
2376 ) -> anyhow::Result<String> {
2377 let seqno = datadriven.machine.seqno();
2378 let tombstone = datadriven.machine.is_finalized();
2379 Ok(format!("{seqno} {tombstone}\n"))
2380 }
2381
2382 pub async fn compare_and_append(
2383 datadriven: &mut MachineState,
2384 args: DirectiveArgs<'_>,
2385 ) -> Result<String, anyhow::Error> {
2386 let input = args.expect_str("input");
2387 let writer_id = args.expect("writer_id");
2388 let mut batch = datadriven
2389 .batches
2390 .get(input)
2391 .expect("unknown batch")
2392 .clone();
2393 let token = args.optional("token").unwrap_or_else(IdempotencyToken::new);
2394 let now = (datadriven.client.cfg.now)();
2395
2396 let (id, maintenance) = datadriven
2397 .machine
2398 .register_schema(&*SCHEMAS.key, &*SCHEMAS.val)
2399 .await;
2400 assert_eq!(id, SCHEMAS.id);
2401 datadriven.routine.push(maintenance);
2402 let maintenance = loop {
2403 let indeterminate = args
2404 .optional::<String>("prev_indeterminate")
2405 .map(|x| Indeterminate::new(anyhow::Error::msg(x)));
2406 let res = datadriven
2407 .machine
2408 .compare_and_append_idempotent(
2409 &batch,
2410 &writer_id,
2411 now,
2412 &token,
2413 &HandleDebugState::default(),
2414 indeterminate,
2415 )
2416 .await;
2417 match res {
2418 CompareAndAppendRes::Success(_, x) => break x,
2419 CompareAndAppendRes::UpperMismatch(_seqno, upper) => {
2420 return Err(anyhow!("{:?}", Upper(upper)));
2421 }
2422 CompareAndAppendRes::InlineBackpressure => {
2423 let mut b = datadriven.to_batch(batch.clone());
2424 let cfg = BatchBuilderConfig::new(&datadriven.client.cfg, datadriven.shard_id);
2425 b.flush_to_blob(
2426 &cfg,
2427 &datadriven.client.metrics.user,
2428 &datadriven.client.isolated_runtime,
2429 &*SCHEMAS,
2430 )
2431 .await;
2432 batch = b.into_hollow_batch();
2433 continue;
2434 }
2435 _ => panic!("{:?}", res),
2436 };
2437 };
2438 datadriven.routine.push(maintenance.routine);
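2439         // Only routine maintenance is queued here; any writer maintenance from
2440         // the append (e.g. compaction requests) is dropped by this directive.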
2441 Ok(format!(
2442 "{} {:?}\n",
2443 datadriven.machine.seqno(),
2444 datadriven.machine.applier.clone_upper().elements(),
2445 ))
2446 }
2447
2448 pub async fn apply_merge_res(
2449 datadriven: &mut MachineState,
2450 args: DirectiveArgs<'_>,
2451 ) -> Result<String, anyhow::Error> {
2452 let input = args.expect_str("input");
2453 let batch = datadriven
2454 .batches
2455 .get(input)
2456 .expect("unknown batch")
2457 .clone();
2458 let (merge_res, maintenance) = datadriven
2459 .machine
2460 .merge_res(&FueledMergeRes {
2461 output: batch,
2462 new_active_compaction: None,
2463 })
2464 .await;
2465 datadriven.routine.push(maintenance);
2466 Ok(format!(
2467 "{} {}\n",
2468 datadriven.machine.seqno(),
2469 merge_res.applied()
2470 ))
2471 }
2472
2473 pub async fn perform_maintenance(
2474 datadriven: &mut MachineState,
2475 _args: DirectiveArgs<'_>,
2476 ) -> Result<String, anyhow::Error> {
2477 let mut s = String::new();
2478 for maintenance in datadriven.routine.drain(..) {
2479 let () = maintenance
2480 .perform(&datadriven.machine, &datadriven.gc)
2481 .await;
2482 let () = datadriven
2483 .machine
2484 .applier
2485 .fetch_and_update_state(None)
2486 .await;
2487             writeln!(s, "{} ok", datadriven.machine.seqno());
2488 }
2489 Ok(s)
2490 }
2491}
2492
2493#[cfg(test)]
2494pub mod tests {
2495 use std::sync::Arc;
2496
2497 use mz_dyncfg::ConfigUpdates;
2498 use mz_ore::cast::CastFrom;
2499 use mz_ore::task::spawn;
2500 use mz_persist::intercept::{InterceptBlob, InterceptHandle};
2501 use mz_persist::location::SeqNo;
2502 use timely::progress::Antichain;
2503
2504 use crate::ShardId;
2505 use crate::batch::BatchBuilderConfig;
2506 use crate::cache::StateCache;
2507 use crate::internal::gc::{GarbageCollector, GcReq};
2508 use crate::internal::state::{HandleDebugState, ROLLUP_THRESHOLD};
2509 use crate::tests::new_test_client;
2510
2511 #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
2512     #[cfg_attr(miri, ignore)]
2513     async fn apply_unbatched_cmd_truncate(dyncfgs: ConfigUpdates) {
2514 mz_ore::test::init_logging();
2515
2516 let client = new_test_client(&dyncfgs).await;
2517 client.cfg.set_config(&ROLLUP_THRESHOLD, 5);
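2518         // (A small rollup threshold makes rollups, and thus GC work, frequent.)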
2519 let (mut write, _) = client
2520 .expect_open::<String, (), u64, i64>(ShardId::new())
2521 .await;
2522
2523 const NUM_BATCHES: u64 = 100;
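2524         // Append one batch per timestamp so that state accumulates enough
2525         // diffs (and rollups) for GC to have something to truncate.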
2526 for idx in 0..NUM_BATCHES {
2527 let mut batch = write
2528 .expect_batch(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
2529 .await;
2530 let cfg = BatchBuilderConfig::new(&client.cfg, write.shard_id());
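2531             // Write out any inline parts so the appended batch references blob
2532             // data instead of carrying the updates in state.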
2533 batch
2534 .flush_to_blob(
2535 &cfg,
2536 &client.metrics.user,
2537 &client.isolated_runtime,
2538 &write.write_schemas,
2539 )
2540 .await;
2541 let (_, writer_maintenance) = write
2542 .machine
2543 .compare_and_append(
2544 &batch.into_hollow_batch(),
2545 &write.writer_id,
2546 &HandleDebugState::default(),
2547 (write.cfg.now)(),
2548 )
2549 .await
2550 .unwrap();
2551 writer_maintenance
2552 .perform(&write.machine, &write.gc, write.compact.as_ref())
2553 .await;
2554 }
2555 let live_diffs = write
2556 .machine
2557 .applier
2558 .state_versions
2559 .fetch_all_live_diffs(&write.machine.shard_id())
2560 .await;
2561 assert!(live_diffs.0.len() > 0);
2563 let max_live_diffs = 2 * usize::cast_from(NUM_BATCHES.next_power_of_two().trailing_zeros());
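2564         // If GC is truncating consensus as expected, the number of live diffs
2565         // stays well below the number of appends; the bound above is roughly
2566         // 2 * log2(NUM_BATCHES).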
2567 assert!(
2568 live_diffs.0.len() <= max_live_diffs,
2569 "{} vs {}",
2570 live_diffs.0.len(),
2571 max_live_diffs
2572 );
2573 }
2574
2575 #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
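2576     // Regression test: a GC run that is interrupted partway through (simulated
2577     // below by panicking after a blob delete) must not leave state in a shape
2578     // that prevents a later GC of the same seqno_since from completing.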
2579     #[cfg_attr(miri, ignore)]
2580     async fn regression_gc_skipped_req_and_interrupted(dyncfgs: ConfigUpdates) {
2581 let mut client = new_test_client(&dyncfgs).await;
2582 let intercept = InterceptHandle::default();
2583 client.blob = Arc::new(InterceptBlob::new(
2584 Arc::clone(&client.blob),
2585 intercept.clone(),
2586 ));
2587 let (_, mut read) = client
2588 .expect_open::<String, String, u64, i64>(ShardId::new())
2589 .await;
2590
2591 read.downgrade_since(&Antichain::from_elem(1)).await;
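2592         // After the downgrade, the shard's seqno_since should be past the minimum.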
2593 let new_seqno_since = read.machine.applier.seqno_since();
2594
2595 assert!(new_seqno_since > SeqNo::minimum());
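2596         // Simulate an interrupted GC: the injected hook panics after a blob
2597         // delete, so the spawned gc_and_truncate below dies partway through.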
2600 intercept.set_post_delete(Some(Arc::new(|_, _| panic!("boom"))));
2601 let machine = read.machine.clone();
2602 let gc = spawn(|| "", async move {
2604 let req = GcReq {
2605 shard_id: machine.shard_id(),
2606 new_seqno_since,
2607 };
2608 GarbageCollector::gc_and_truncate(&machine, req).await
2609 });
2610 let _ = gc.await;
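2611         // The spawned GC task panicked as intended; ignore the join error.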
2613
2614 intercept.set_post_delete(None);
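2615         // With the intercept removed, retrying GC with the same request should
2616         // now run to completion.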
2617 let req = GcReq {
2618 shard_id: read.machine.shard_id(),
2619 new_seqno_since,
2620 };
2621 let _ = GarbageCollector::gc_and_truncate(&read.machine, req.clone()).await;
2622 }
2623
2624 #[mz_persist_proc::test(tokio::test(flavor = "multi_thread"))]
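2625     // Regression test: a writer whose cached state has a stale upper must
2626     // refresh that state after an upper mismatch rather than erroring, exercised
2627     // here via two clients that deliberately do not share a state cache.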
2629     #[cfg_attr(miri, ignore)]
2630     async fn regression_update_state_after_upper_mismatch(dyncfgs: ConfigUpdates) {
2631 let client = new_test_client(&dyncfgs).await;
2632 let mut client2 = client.clone();
2633
2634 let new_state_cache = Arc::new(StateCache::new_no_metrics());
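2635         // Swapping in a fresh cache gives client2 an independent (and therefore
2636         // potentially stale) view of shard state.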
2637 client2.shared_states = new_state_cache;
2638
2639 let shard_id = ShardId::new();
2640 let (mut write1, _) = client.expect_open::<String, (), u64, i64>(shard_id).await;
2641 let (mut write2, _) = client2.expect_open::<String, (), u64, i64>(shard_id).await;
2642
2643 let data = [
2644 (("1".to_owned(), ()), 1, 1),
2645 (("2".to_owned(), ()), 2, 1),
2646 (("3".to_owned(), ()), 3, 1),
2647 (("4".to_owned(), ()), 4, 1),
2648 (("5".to_owned(), ()), 5, 1),
2649 ];
2650
2651 write1.expect_compare_and_append(&data[..1], 0, 2).await;
2652
2653 write2.expect_compare_and_append(&data[1..2], 2, 3).await;
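2654         // write1 already advanced the upper to 2, so write2's cached upper was
2655         // stale; the append above only succeeds if that cached state is refreshed.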
2656 }
2657}