use std::any::Any;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, bail};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures_util::{StreamExt, TryStreamExt, stream};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64};
use prometheus::proto::{MetricFamily, MetricType};
use semver::Version;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
use crate::cli::inspect::FAKE_OPAQUE_CODEC;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::encoding::Schemas;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::machine::Machine;
use crate::internal::trace::FueledMergeRes;
use crate::rpc::{NoopPubSubSender, PubSubSender};
use crate::write::{WriteHandle, WriterId};
use crate::{
    BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
};

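/// Arguments for commands that operate directly on persist state: forcing
/// compaction or GC of a shard, finalizing a shard, and restoring blob state.
///
/// `commit` gates whether a command may actually write anything, and
/// `expected_version`, when set, is parsed as semver and checked against the
/// state's applier version before any destructive work.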
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
    #[clap(subcommand)]
    command: Command,

    #[clap(long)]
    pub(crate) commit: bool,

    #[clap(long)]
    pub(crate) expected_version: Option<String>,
}

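/// The available admin subcommands.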
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    ForceCompaction(ForceCompactionArgs),
    ForceGc(ForceGcArgs),
    Finalize(FinalizeArgs),
    RestoreBlob(RestoreBlobArgs),
}

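/// Arguments for forcibly compacting a single shard.
///
/// `compaction_memory_bound_bytes` overrides the `COMPACTION_MEMORY_BOUND_BYTES`
/// dyncfg for the duration of the command.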
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
    #[clap(flatten)]
    state: StateArgs,

    #[clap(long, default_value_t = 0)]
    compaction_memory_bound_bytes: usize,
}

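/// Arguments for forcing garbage collection of a single shard.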
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
    #[clap(flatten)]
    state: StateArgs,
}

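/// Arguments for finalizing (tombstoning) a single shard.
///
/// `force_downgrade_since` downgrades the controller's critical since to the
/// empty antichain, and `force_downgrade_upper` appends an empty batch to
/// advance the shard's upper to the empty antichain, both in preparation for
/// turning the shard into a tombstone.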
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
    #[clap(flatten)]
    state: StateArgs,

    #[clap(long, default_value_t = false)]
    force_downgrade_since: bool,

    #[clap(long, default_value_t = false)]
    force_downgrade_upper: bool,
}

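/// Arguments for restoring the blobs referenced by consensus state.
///
/// `concurrency` bounds how many shards are restored at once.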
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
    #[clap(flatten)]
    state: StoreArgs,

    #[clap(long, default_value_t = 16)]
    concurrency: usize,
}

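/// Runs the given admin command, parsing the shard id and `--expected-version`
/// up front and dispatching to the matching subcommand implementation.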
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::ForceCompaction(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            cfg.set_config(
                &COMPACTION_MEMORY_BOUND_BYTES,
                args.compaction_memory_bound_bytes,
            );

            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                Arc::new(TodoSchema::default()),
                Arc::new(TodoSchema::default()),
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::ForceGc(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            let _machine = force_gc(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::Finalize(args) => {
            let FinalizeArgs {
                state:
                    StateArgs {
                        shard_id,
                        consensus_uri,
                        blob_uri,
                    },
                force_downgrade_since,
                force_downgrade_upper,
            } = args;
            let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
            let commit = command.commit;
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;

            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;

            let machine = make_machine(
                &cfg,
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                shard_id,
                commit,
                expected_version,
            )
            .await?;

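            // If requested, advance the shard's upper to the empty antichain by
            // appending an empty batch, closing the shard to future writes.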
            if force_downgrade_upper {
                let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
                let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
                let shared_states = Arc::new(StateCache::new(
                    &cfg,
                    Arc::clone(&metrics),
                    Arc::clone(&pubsub_sender),
                ));

                let persist_client = PersistClient::new(
                    cfg,
                    blob,
                    consensus,
                    metrics,
                    isolated_runtime,
                    shared_states,
                    pubsub_sender,
                )?;
                let diagnostics = Diagnostics {
                    shard_name: shard_id.to_string(),
                    handle_purpose: "persist-cli finalize shard".to_string(),
                };

                let mut write_handle: WriteHandle<
                    crate::cli::inspect::K,
                    crate::cli::inspect::V,
                    u64,
                    i64,
                > = persist_client
                    .open_writer(
                        shard_id,
                        Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
                        Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
                        diagnostics,
                    )
                    .await?;

                if !write_handle.upper().is_empty() {
                    let empty_batch: Vec<(
                        (crate::cli::inspect::K, crate::cli::inspect::V),
                        u64,
                        i64,
                    )> = vec![];
                    let lower = write_handle.upper().clone();
                    let upper = Antichain::new();

                    let result = write_handle.append(empty_batch, lower, upper).await?;
                    if let Err(err) = result {
                        anyhow::bail!("failed to force downgrade upper, {err:?}");
                    }
                }
            }

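            // If requested, downgrade the controller's critical since to the
            // empty antichain so that the shard can be tombstoned below.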
279 if force_downgrade_since {
280 let (state, _maintenance) = machine
281 .register_critical_reader::<crate::cli::inspect::O>(
282 &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
283 "persist-cli finalize with force downgrade",
284 )
285 .await;
286
287 let expected_opaque = crate::cli::inspect::O::decode(state.opaque.0);
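                // Keep the CLI's fake opaque codec in sync with the codec
                // recorded in state before round-tripping the opaque below.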
                FAKE_OPAQUE_CODEC
                    .lock()
                    .expect("lockable")
                    .clone_from(&state.opaque_codec);

                let (result, _maintenance) = machine
                    .compare_and_downgrade_since(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        &expected_opaque,
                        (&expected_opaque, &Antichain::new()),
                    )
                    .await;
                if let Err((actual_opaque, _since)) = result {
                    bail!(
                        "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
                    )
                }
            }

            let maintenance = machine.become_tombstone().await?;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::RestoreBlob(args) => {
            let RestoreBlobArgs {
                state:
                    StoreArgs {
                        consensus_uri,
                        blob_uri,
                    },
                concurrency,
            } = args;
            let commit = command.commit;
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
            let versions = StateVersions::new(
                cfg.clone(),
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
            );

            let not_restored: Vec<_> = consensus
                .list_keys()
                .flat_map_unordered(concurrency, |shard| {
                    stream::once(Box::pin(async {
                        let shard_id = shard?;
                        let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
                        let start = Instant::now();
                        info!("Restoring blob state for shard {shard_id}.");
                        let shard_not_restored = crate::internal::restore::restore_blob(
                            &versions,
                            blob.as_ref(),
                            &cfg.build_version,
                            shard_id,
                            &*metrics,
                        )
                        .await?;
                        info!(
                            "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
                            shard_not_restored.len(),
                            start.elapsed()
                        );
                        Ok::<_, ExternalError>(shard_not_restored)
                    }))
                })
                .try_fold(vec![], |mut a, b| async move {
                    a.extend(b);
                    Ok(a)
                })
                .await?;

            info_log_non_zero_metrics(&metrics_registry.gather());
            if !not_restored.is_empty() {
                bail!("referenced blobs were not restored: {not_restored:#?}")
            }
        }
    }
    Ok(())
}

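/// Logs, at `info`, every gathered counter or gauge metric whose value is
/// non-zero, along with its labels.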
pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
    for mf in metric_families {
        for m in mf.get_metric() {
            let val = match mf.get_field_type() {
                MetricType::COUNTER => m.get_counter().get_value(),
                MetricType::GAUGE => m.get_gauge().get_value(),
                x => {
                    info!("unhandled {} metric type: {:?}", mf.get_name(), x);
                    continue;
                }
            };
            if val == 0.0 {
                continue;
            }
            let label_pairs = m.get_label();
            let mut labels = String::new();
            if !label_pairs.is_empty() {
                labels.push_str("{");
                for lb in label_pairs {
                    if labels != "{" {
                        labels.push_str(",");
                    }
                    labels.push_str(lb.get_name());
                    labels.push_str(":");
                    labels.push_str(lb.get_value());
                }
                labels.push_str("}");
            }
            info!("{}{} {}", mf.get_name(), labels, val);
        }
    }
}

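/// Forces as much compaction as possible onto the given shard.
///
/// Fetches the shard's current fueled merge requests and runs compaction for
/// each of them, retrying with a refreshed set of requests until every result
/// applies cleanly. Without `commit`, the requests are only logged.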
pub async fn force_compaction<K, V, T, D>(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    commit: bool,
    expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;

    let machine = make_typed_machine::<K, V, T, D>(
        &cfg,
        consensus,
        Arc::clone(&blob),
        Arc::clone(&metrics),
        shard_id,
        commit,
        expected_version,
    )
    .await?;

    let writer_id = WriterId::new();

    let mut attempt = 0;
    'outer: loop {
        machine.applier.fetch_and_update_state(None).await;
        let reqs = machine.applier.all_fueled_merge_reqs();
        info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
        for (idx, req) in reqs.clone().into_iter().enumerate() {
            let req = CompactReq {
                shard_id,
                desc: req.desc,
                inputs: req.inputs,
            };
            let parts = req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>();
            let bytes = req
                .inputs
                .iter()
                .map(|x| x.batch.encoded_size_bytes())
                .sum::<usize>();
            let start = Instant::now();
            info!(
                "attempt {} req {}: compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
                attempt,
                idx,
                req.inputs.len(),
                parts,
                bytes,
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            if !commit {
                info!("skipping compaction because --commit is not set");
                continue;
            }
            let schemas = Schemas {
                id: None,
                key: Arc::clone(&key_schema),
                val: Arc::clone(&val_schema),
            };

            let res = Compactor::<K, V, T, D>::compact(
                CompactConfig::new(&cfg, shard_id),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&machine.applier.shard_metrics),
                Arc::new(IsolatedRuntime::new(
                    metrics_registry,
                    Some(cfg.isolated_runtime_worker_threads),
                )),
                req,
                schemas,
            )
            .await?;
            metrics.compaction.admin_count.inc();
            info!(
                "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
                attempt,
                idx,
                res.output.part_count(),
                res.output.encoded_size_bytes(),
                start.elapsed(),
            );
            let (apply_res, maintenance) = machine
                .merge_res(&FueledMergeRes {
                    output: res.output,
                    input: res.input,
                    new_active_compaction: None,
                })
                .await;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            if apply_res.applied() {
                info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
            } else {
                info!(
                    "attempt {} req {}: {:?} trying again",
                    attempt, idx, apply_res
                );
                attempt += 1;
                continue 'outer;
            }
        }
        info!("attempt {}: did {} compactions", attempt, reqs.len());
        let _ = machine.expire_writer(&writer_id).await;
        info!("expired writer {}", writer_id);
        return Ok(());
    }
}

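/// Like [`make_typed_machine`], but using the CLI's stand-in key/value codecs
/// and `u64`/`i64` timestamps and diffs.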
async fn make_machine(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
    make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
        cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await
}

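/// Opens a typed `Machine` for the given shard, refusing to proceed if the
/// tool's build version and the state's applier version are not a safe
/// combination for the requested mode (read-only vs `commit`).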
async fn make_typed_machine<K, V, T, D>(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    let state_versions = Arc::new(StateVersions::new(
        cfg.clone(),
        consensus,
        blob,
        Arc::clone(&metrics),
    ));

    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;

    loop {
        let state_res = state_versions
            .fetch_current_state::<u64>(&shard_id, versions.0.clone())
            .await
            .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
        let state = match state_res {
            Ok(state) => state,
            Err(codec) => {
                let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
                *kvtd = codec.actual;
                continue;
            }
        };
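        // Decide whether this tool's build version may safely operate on the
        // state: read-only use only requires the tool to be at least as new as
        // the state's applier version, while committing requires an exact
        // match, unless --expected-version names the applier version and the
        // tool is at least that new.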
        let safe_version_change = match (commit, expected_version) {
            (false, _) => cfg.build_version >= state.applier_version,
            (true, None) => cfg.build_version == state.applier_version,
            (true, Some(expected)) => {
                state.applier_version == expected && expected <= cfg.build_version
            }
        };
        if !safe_version_change {
            return Err(anyhow!(
                "version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
                cfg.build_version,
                state.applier_version
            ));
        }
        break;
    }

    let machine = Machine::<K, V, T, D>::new(
        cfg.clone(),
        shard_id,
        Arc::clone(&metrics),
        state_versions,
        Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
        Arc::new(NoopPubSubSender),
        Arc::new(IsolatedRuntime::new(
            &MetricsRegistry::new(),
            Some(cfg.isolated_runtime_worker_threads),
        )),
        Diagnostics::from_purpose("admin"),
    )
    .await?;

    Ok(machine)
}

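/// Forces a garbage collection run on the given shard, truncating state up to
/// the current seqno-since. Returns the opened machine, type-erased, so the
/// caller may keep it alive.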
async fn force_gc(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Box<dyn Any>> {
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
    let machine = make_machine(
        &cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await?;
    let gc_req = GcReq {
        shard_id,
        new_seqno_since: machine.applier.seqno_since(),
    };
    let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
    if !maintenance.is_empty() {
        info!("ignoring non-empty requested maintenance: {maintenance:?}")
    }

    Ok(Box::new(machine))
}

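// Dyncfgs for the dangerous_force_compaction_and_break_pushdown tasks run
// against the catalog and expression cache shards.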
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_catalog_force_compaction_fuel",
    1024,
    "fuel to use in catalog dangerous_force_compaction task",
);

pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_catalog_force_compaction_wait",
    Duration::from_secs(60),
    "wait to use in catalog dangerous_force_compaction task",
);

pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_expression_cache_force_compaction_fuel",
    131_072,
    "fuel to use in expression cache dangerous_force_compaction",
);

pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_expression_cache_force_compaction_wait",
    Duration::from_secs(0),
    "wait to use in expression cache dangerous_force_compaction",
);

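/// Repeatedly exerts compaction fuel against the given shard until its state
/// is down to at most a single run, sleeping `wait()` between rounds of
/// `fuel()` exertion.
///
/// As the name warns, this can break filter pushdown for readers and goes well
/// beyond normal maintenance; use with care.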
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
    write: &WriteHandle<K, V, T, D>,
    fuel: impl Fn() -> usize,
    wait: impl Fn() -> Duration,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let machine = write.machine.clone();

    let mut last_exert: Instant;

    loop {
        last_exert = Instant::now();
        let fuel = fuel();
        let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
        for req in reqs {
            info!(
                "force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                req.inputs.len(),
                req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
                req.inputs
                    .iter()
                    .map(|x| x.batch.runs().count())
                    .sum::<usize>(),
                req.inputs
                    .iter()
                    .flat_map(|x| &x.batch.parts)
                    .map(|x| x.encoded_size_bytes())
                    .sum::<usize>(),
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            machine.applier.metrics.compaction.requested.inc();
            let start = Instant::now();
            let res = Compactor::<K, V, T, D>::compact_and_apply(
                &machine,
                req,
                write.write_schemas.clone(),
            )
            .await;
            let apply_maintenance = match res {
                Ok(x) => x,
                Err(err) => {
                    warn!(
                        "force_compaction {} {} errored in compaction: {:?}",
                        machine.applier.shard_metrics.name,
                        machine.applier.shard_metrics.shard_id,
                        err
                    );
                    continue;
                }
            };
            machine.applier.metrics.compaction.admin_count.inc();
            info!(
                "force_compaction {} {} compacted in {:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                start.elapsed(),
            );
            maintenance.merge(apply_maintenance);
        }
        maintenance.perform(&machine, &write.gc).await;

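        // Sleep out the remainder of the requested wait, measured from when
        // this iteration started, before exerting again.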
        let next_exert = last_exert + wait();
        tokio::time::sleep_until(next_exert.into()).await;

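        // Exit once the shard is down to at most one run. This check is at the
        // end of the loop so that each iteration exerts at least once before
        // checking.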
        let num_runs: usize = machine
            .applier
            .all_batches()
            .iter()
            .map(|x| x.runs().count())
            .sum();
        if num_runs <= 1 {
            info!(
                "force_compaction {} {} exiting with {} runs",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                num_runs
            );
            return;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;

    use crate::tests::new_test_client;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        for num_batches in 0..=17 {
            let (mut write, _read) = client
                .expect_open::<String, (), u64, i64>(ShardId::new())
                .await;
            let machine = write.machine.clone();

            for idx in 0..num_batches {
                let () = write
                    .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                    .await;
            }

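            // Run the task; it should leave the shard with at most one batch,
            // regardless of how many batches we started with.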
            super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
                .await;
            let batches_after = machine.applier.all_batches().len();
            assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
        }
    }
}