//! Read-write administration commands for persist shards: force-compaction,
//! force-gc, shard finalization, and blob restoration.

use std::any::Any;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, bail};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures_util::{StreamExt, TryStreamExt, stream};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64};
use prometheus::proto::{MetricFamily, MetricType};
use semver::Version;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
use crate::cli::inspect::FAKE_OPAQUE_CODEC;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::encoding::Schemas;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::machine::Machine;
use crate::internal::trace::FueledMergeRes;
use crate::rpc::{NoopPubSubSender, PubSubSender};
use crate::write::{WriteHandle, WriterId};
use crate::{
    BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
};
/// Commands for read-write administration of persist state.
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
    #[clap(subcommand)]
    command: Command,

    /// Whether to commit any modifications, or only simulate them (dry run).
    #[clap(long)]
    pub(crate) commit: bool,

    /// The expected applier version of the shard's state. When `--commit` is
    /// set, the command bails unless the state's version matches.
    #[clap(long)]
    pub(crate) expected_version: Option<String>,
}

/// Individual subcommands and their arguments.
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Runs all current fueled compaction requests for a shard to completion.
    ForceCompaction(ForceCompactionArgs),
    /// Manually kicks off a garbage collection run for a shard.
    ForceGc(ForceGcArgs),
    /// Finalizes a shard, turning it into an unwritable tombstone.
    Finalize(FinalizeArgs),
    /// Attempts to restore all blobs referenced by the current state of
    /// consensus.
    RestoreBlob(RestoreBlobArgs),
}

/// Arguments for the force-compaction command.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// An upper bound on compaction's memory usage, in bytes.
    #[clap(long, default_value_t = 0)]
    compaction_memory_bound_bytes: usize,
}

/// Arguments for the force-gc command.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
    #[clap(flatten)]
    state: StateArgs,
}

/// Arguments for the finalize command.
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// Downgrade the since of the shard's critical reader to the empty
    /// antichain before finalizing.
    #[clap(long, default_value_t = false)]
    force_downgrade_since: bool,

    /// Advance the upper of the shard to the empty antichain (by appending an
    /// empty batch) before finalizing.
    #[clap(long, default_value_t = false)]
    force_downgrade_upper: bool,
}

/// Arguments for the restore-blob command.
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
    #[clap(flatten)]
    state: StoreArgs,

    /// The number of shards to restore concurrently.
    #[clap(long, default_value_t = 16)]
    concurrency: usize,
}

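/// Runs the given read-write admin command.
///
/// Unless `--commit` is passed, the consensus and blob handles are opened via
/// [make_consensus]/[make_blob] with `commit=false` and destructive steps are
/// skipped, so commands only log what they would have done.
///
/// A sketch of an invocation; the binary name and URI values here are
/// assumptions for illustration, not something this module defines:
///
/// ```text
/// persist-cli admin --commit force-compaction \
///     --shard-id s00000000-0000-0000-0000-000000000000 \
///     --consensus-uri postgres://... \
///     --blob-uri file:///path/to/blobs
/// ```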
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::ForceCompaction(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            cfg.set_config(
                &COMPACTION_MEMORY_BOUND_BYTES,
                args.compaction_memory_bound_bytes,
            );

            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                Arc::new(TodoSchema::default()),
                Arc::new(TodoSchema::default()),
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::ForceGc(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // Hold the returned machine alive until after the metrics below
            // are gathered.
            let _machine = force_gc(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::Finalize(args) => {
            let FinalizeArgs {
                state:
                    StateArgs {
                        shard_id,
                        consensus_uri,
                        blob_uri,
                    },
                force_downgrade_since,
                force_downgrade_upper,
            } = args;
            let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
            let commit = command.commit;
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;

            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;

            // Open the machine up front so we fail fast on a version mismatch
            // before doing any writes.
            let machine = make_machine(
                &cfg,
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                shard_id,
                commit,
                expected_version,
            )
            .await?;

            if force_downgrade_upper {
                let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
                let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
                let shared_states = Arc::new(StateCache::new(
                    &cfg,
                    Arc::clone(&metrics),
                    Arc::clone(&pubsub_sender),
                ));

                let persist_client = PersistClient::new(
                    cfg,
                    blob,
                    consensus,
                    metrics,
                    isolated_runtime,
                    shared_states,
                    pubsub_sender,
                )?;
                let diagnostics = Diagnostics {
                    shard_name: shard_id.to_string(),
                    handle_purpose: "persist-cli finalize shard".to_string(),
                };

                let mut write_handle: WriteHandle<
                    crate::cli::inspect::K,
                    crate::cli::inspect::V,
                    u64,
                    i64,
                > = persist_client
                    .open_writer(
                        shard_id,
                        Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
                        Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
                        diagnostics,
                    )
                    .await?;

                if !write_handle.upper().is_empty() {
                    let empty_batch: Vec<(
                        (crate::cli::inspect::K, crate::cli::inspect::V),
                        u64,
                        i64,
                    )> = vec![];
                    let lower = write_handle.upper().clone();
                    let upper = Antichain::new();

                    let result = write_handle.append(empty_batch, lower, upper).await?;
                    if let Err(err) = result {
                        anyhow::bail!("failed to force downgrade upper: {err:?}");
                    }
                }
            }

            if force_downgrade_since {
                let (state, _maintenance) = machine
                    .register_critical_reader::<crate::cli::inspect::O>(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        "persist-cli finalize with force downgrade",
                    )
                    .await;

                let expected_opaque = crate::cli::inspect::O::decode(state.opaque.0);
                // Stash the state's opaque codec so the CLI's fake opaque
                // codec can round-trip the value we just read back.
                FAKE_OPAQUE_CODEC
                    .lock()
                    .expect("lockable")
                    .clone_from(&state.opaque_codec);

                let (result, _maintenance) = machine
                    .compare_and_downgrade_since(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        &expected_opaque,
                        (&expected_opaque, &Antichain::new()),
                    )
                    .await;
                if let Err((actual_opaque, _since)) = result {
                    bail!(
                        "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
                    )
                }
            }

            let maintenance = machine.become_tombstone().await?;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::RestoreBlob(args) => {
            let RestoreBlobArgs {
                state:
                    StoreArgs {
                        consensus_uri,
                        blob_uri,
                    },
                concurrency,
            } = args;
            let commit = command.commit;
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
            let versions = StateVersions::new(
                cfg.clone(),
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
            );

            let not_restored: Vec<_> = consensus
                .list_keys()
                .flat_map_unordered(concurrency, |shard| {
                    stream::once(Box::pin(async {
                        let shard_id = shard?;
                        let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
                        let start = Instant::now();
                        info!("Restoring blob state for shard {shard_id}.");
                        let shard_not_restored = crate::internal::restore::restore_blob(
                            &versions,
                            blob.as_ref(),
                            &cfg.build_version,
                            shard_id,
                            &*metrics,
                        )
                        .await?;
                        info!(
                            "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
                            shard_not_restored.len(),
                            start.elapsed()
                        );
                        Ok::<_, ExternalError>(shard_not_restored)
                    }))
                })
                .try_fold(vec![], |mut a, b| async move {
                    a.extend(b);
                    Ok(a)
                })
                .await?;

            info_log_non_zero_metrics(&metrics_registry.gather());
            if !not_restored.is_empty() {
                bail!("referenced blobs were not restored: {not_restored:#?}")
            }
        }
    }
    Ok(())
}

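/// Logs, at INFO, every counter and gauge in the given metric families with a
/// non-zero value, in a `name{label:value,...} value` format.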
pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
    for mf in metric_families {
        for m in mf.get_metric() {
            let val = match mf.get_field_type() {
                MetricType::COUNTER => m.get_counter().get_value(),
                MetricType::GAUGE => m.get_gauge().get_value(),
                x => {
                    info!("unhandled {} metric type: {:?}", mf.get_name(), x);
                    continue;
                }
            };
            if val == 0.0 {
                continue;
            }
            let label_pairs = m.get_label();
            let mut labels = String::new();
            if !label_pairs.is_empty() {
                labels.push('{');
                for lb in label_pairs {
                    if labels != "{" {
                        labels.push(',');
                    }
                    labels.push_str(lb.get_name());
                    labels.push(':');
                    labels.push_str(lb.get_value());
                }
                labels.push('}');
            }
            info!("{}{} {}", mf.get_name(), labels, val);
        }
    }
}

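/// Forces compaction of the given shard: fetches the current set of fueled
/// merge requests and runs each to completion, restarting from a fresh state
/// fetch whenever a result fails to apply. Without `commit`, the requests are
/// only logged, never run.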
pub async fn force_compaction<K, V, T, D>(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    commit: bool,
    expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;

    let machine = make_typed_machine::<K, V, T, D>(
        &cfg,
        consensus,
        Arc::clone(&blob),
        Arc::clone(&metrics),
        shard_id,
        commit,
        expected_version,
    )
    .await?;

    let writer_id = WriterId::new();

    let mut attempt = 0;
    'outer: loop {
        machine.applier.fetch_and_update_state(None).await;
        let reqs = machine.applier.all_fueled_merge_reqs();
        info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
        for (idx, req) in reqs.clone().into_iter().enumerate() {
            let req = CompactReq {
                shard_id,
                desc: req.desc,
                inputs: req
                    .inputs
                    .into_iter()
                    .map(|b| Arc::unwrap_or_clone(b.batch))
                    .collect(),
            };
            let parts = req.inputs.iter().map(|x| x.part_count()).sum::<usize>();
            let bytes = req
                .inputs
                .iter()
                .map(|x| x.encoded_size_bytes())
                .sum::<usize>();
            let start = Instant::now();
            info!(
                "attempt {} req {}: compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
                attempt,
                idx,
                req.inputs.len(),
                parts,
                bytes,
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            if !commit {
                info!("skipping compaction because --commit is not set");
                continue;
            }
            let schemas = Schemas {
                id: None,
                key: Arc::clone(&key_schema),
                val: Arc::clone(&val_schema),
            };

            let res = Compactor::<K, V, T, D>::compact(
                CompactConfig::new(&cfg, shard_id),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&machine.applier.shard_metrics),
                Arc::new(IsolatedRuntime::default()),
                req,
                schemas,
            )
            .await?;
            metrics.compaction.admin_count.inc();
            info!(
                "attempt {} req {}: compacted into {} parts totaling {} bytes in {:?}",
                attempt,
                idx,
                res.output.part_count(),
                res.output.encoded_size_bytes(),
                start.elapsed(),
            );
            let (apply_res, maintenance) = machine
                .merge_res(&FueledMergeRes { output: res.output })
                .await;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            if apply_res.applied() {
                info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
            } else {
                // The state changed out from under us; start over with a
                // fresh set of merge requests.
                info!(
                    "attempt {} req {}: {:?}, trying again",
                    attempt, idx, apply_res
                );
                attempt += 1;
                continue 'outer;
            }
        }
        info!("attempt {}: did {} compactions", attempt, reqs.len());
        let _ = machine.expire_writer(&writer_id).await;
        info!("expired writer {}", writer_id);
        return Ok(());
    }
}

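/// Opens a [Machine] for the shard using the CLI's stand-in `K`/`V` codecs
/// (see `crate::cli::inspect`) and `u64`/`i64` as timestamp and diff.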
async fn make_machine(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
    make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
        cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await
}

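/// Opens a [Machine] for the shard, first verifying that this tool's build
/// version may safely operate on the state (see the version check below).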
async fn make_typed_machine<K, V, T, D>(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    let state_versions = Arc::new(StateVersions::new(
        cfg.clone(),
        consensus,
        blob,
        Arc::clone(&metrics),
    ));

    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;

    loop {
        let state_res = state_versions
            .fetch_current_state::<u64>(&shard_id, versions.0.clone())
            .await
            .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
        let state = match state_res {
            Ok(state) => state,
            Err(codec) => {
                // The CLI's stand-in codecs didn't match the shard; adopt the
                // shard's actual codecs and retry.
                let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
                *kvtd = codec.actual;
                continue;
            }
        };
        let safe_version_change = match (commit, expected_version) {
            // We never write out state changes in read-only mode, so a newer
            // tool version is okay.
            (false, _) => cfg.build_version >= state.applier_version,
            // When committing without an explicit expectation, insist on an
            // exact version match.
            (true, None) => cfg.build_version == state.applier_version,
            // When committing with an explicit expectation, the state must
            // match it and must not be newer than this tool.
            (true, Some(expected)) => {
                state.applier_version == expected && expected <= cfg.build_version
            }
        };
        if !safe_version_change {
            return Err(anyhow!(
                "version of this tool {} does not match version of state {} when --commit is {commit}; bailing so we don't corrupt anything",
                cfg.build_version,
                state.applier_version
            ));
        }
        break;
    }

    let machine = Machine::<K, V, T, D>::new(
        cfg.clone(),
        shard_id,
        Arc::clone(&metrics),
        state_versions,
        Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
        Arc::new(NoopPubSubSender),
        Arc::new(IsolatedRuntime::default()),
        Diagnostics::from_purpose("admin"),
    )
    .await?;

    Ok(machine)
}

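/// Forces a garbage collection run for the shard, truncating state up to its
/// current seqno-since. Returns the opened machine so the caller can keep it
/// (and its metrics) alive while reporting.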
async fn force_gc(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Box<dyn Any>> {
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
    let machine = make_machine(
        &cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await?;
    let gc_req = GcReq {
        shard_id,
        new_seqno_since: machine.applier.seqno_since(),
    };
    let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
    if !maintenance.is_empty() {
        info!("ignoring non-empty requested maintenance: {maintenance:?}")
    }

    Ok(Box::new(machine))
}

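// Dyncfgs tuning the dangerous_force_compaction tasks run against the catalog
// and expression cache shards.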
/// Fuel to use in the catalog dangerous_force_compaction task.
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_catalog_force_compaction_fuel",
    1024,
    "fuel to use in catalog dangerous_force_compaction task",
);

/// Wait to use in the catalog dangerous_force_compaction task.
pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_catalog_force_compaction_wait",
    Duration::from_secs(60),
    "wait to use in catalog dangerous_force_compaction task",
);

/// Fuel to use in the expression cache dangerous_force_compaction task.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_expression_cache_force_compaction_fuel",
    131_072,
    "fuel to use in expression cache dangerous_force_compaction",
);

/// Wait to use in the expression cache dangerous_force_compaction task.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_expression_cache_force_compaction_wait",
    Duration::from_secs(0),
    "wait to use in expression cache dangerous_force_compaction",
);

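/// Compacts the given shard down to as few batches as possible: repeatedly
/// exerts the spine with `fuel`, applies every compaction request it
/// produces, sleeps `wait` between rounds, and returns once the shard has
/// fewer than two batches.
///
/// Collapsing a shard into one large batch defeats most of the benefit of
/// filter pushdown (hence the name); don't call this unless you're sure.
///
/// A minimal sketch of a call site. It assumes an already-open
/// `write: WriteHandle<K, V, u64, i64>`; the fuel and wait values are
/// illustrative, matching the catalog defaults above:
///
/// ```ignore
/// dangerous_force_compaction_and_break_pushdown(
///     &write,
///     || 1024,                    // fuel per spine exertion
///     || Duration::from_secs(60), // pause between rounds
/// )
/// .await;
/// ```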
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
    write: &WriteHandle<K, V, T, D>,
    fuel: impl Fn() -> usize,
    wait: impl Fn() -> Duration,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let machine = write.machine.clone();

    let mut last_exert: Instant;

    loop {
        last_exert = Instant::now();
        let fuel = fuel();
        let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
        for req in reqs {
            info!(
                "force_compaction {} {} compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                req.inputs.len(),
                req.inputs.iter().flat_map(|x| &x.parts).count(),
                req.inputs
                    .iter()
                    .flat_map(|x| &x.parts)
                    .map(|x| x.encoded_size_bytes())
                    .sum::<usize>(),
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            machine.applier.metrics.compaction.requested.inc();
            let start = Instant::now();
            let res = Compactor::<K, V, T, D>::compact_and_apply(
                &machine,
                req,
                write.write_schemas.clone(),
            )
            .await;
            let (res, apply_maintenance) = match res {
                Ok(x) => x,
                Err(err) => {
                    warn!(
                        "force_compaction {} {} errored in compaction: {:?}",
                        machine.applier.shard_metrics.name,
                        machine.applier.shard_metrics.shard_id,
                        err
                    );
                    continue;
                }
            };
            machine.applier.metrics.compaction.admin_count.inc();
            info!(
                "force_compaction {} {} compacted in {:?}: {:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                start.elapsed(),
                res
            );
            maintenance.merge(apply_maintenance);
        }
        maintenance.perform(&machine, &write.gc).await;

        // Sleep out the remainder of the wait interval, measured from the
        // start of this round's exertion.
        let next_exert = last_exert + wait();
        tokio::time::sleep_until(next_exert.into()).await;

        // We're done once the shard is down to a single batch (or none).
        let num_batches = machine.applier.all_batches().len();
        if num_batches < 2 {
            info!(
                "force_compaction {} {} exiting with {} batches",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                num_batches
            );
            return;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;

    use crate::tests::new_test_client;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        for num_batches in 0..=17 {
            let (mut write, _read) = client
                .expect_open::<String, (), u64, i64>(ShardId::new())
                .await;
            let machine = write.machine.clone();

            for idx in 0..num_batches {
                let () = write
                    .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                    .await;
            }

            super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
                .await;
            let batches_after = machine.applier.all_batches().len();
            assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
        }
    }
}