1use std::any::Any;
13use std::fmt::Debug;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::{anyhow, bail};
19use differential_dataflow::difference::Monoid;
20use differential_dataflow::lattice::Lattice;
21use futures_util::{StreamExt, TryStreamExt, stream};
22use mz_dyncfg::{Config, ConfigSet};
23use mz_ore::metrics::MetricsRegistry;
24use mz_ore::now::SYSTEM_TIME;
25use mz_ore::url::SensitiveUrl;
26use mz_persist::location::{Blob, Consensus, ExternalError};
27use mz_persist_types::codec_impls::TodoSchema;
28use mz_persist_types::{Codec, Codec64};
29use prometheus::proto::{MetricFamily, MetricType};
30use semver::Version;
31use timely::progress::{Antichain, Timestamp};
32use tracing::{info, warn};
33
34use crate::async_runtime::IsolatedRuntime;
35use crate::cache::StateCache;
36use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
37use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
38use crate::critical::Opaque;
39use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
40use crate::internal::encoding::Schemas;
41use crate::internal::gc::{GarbageCollector, GcReq};
42use crate::internal::machine::Machine;
43use crate::internal::trace::FueledMergeRes;
44use crate::rpc::{NoopPubSubSender, PubSubSender};
45use crate::write::{WriteHandle, WriterId};
46use crate::{
47 BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
48};
49
/// Manually run a persist maintenance task against a shard.
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
    #[clap(subcommand)]
    command: Command,

    /// Actually commit any mutations. Without this flag, commands run in
    /// dry-run mode (read-only stores, no writes).
    #[clap(long)]
    pub(crate) commit: bool,

    /// Expected version of the shard's state. If set, the command bails
    /// rather than mutating state written by a different version.
    #[clap(long)]
    pub(crate) expected_version: Option<String>,
}
67
/// The individual admin subcommands dispatched by [`run`].
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Manually completes all fueled compactions in a shard.
    ForceCompaction(ForceCompactionArgs),
    /// Manually kicks off a GC run for a shard.
    ForceGc(ForceGcArgs),
    /// Finalizes a shard, making it a tombstone.
    Finalize(FinalizeArgs),
    /// Attempts to restore all referenced blobs for every shard in an environment.
    RestoreBlob(RestoreBlobArgs),
}
81
/// Arguments for the `force-compaction` subcommand.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// An upper bound on compaction's memory consumption. 0 means use the
    /// default from `COMPACTION_MEMORY_BOUND_BYTES`.
    #[clap(long, default_value_t = 0)]
    compaction_memory_bound_bytes: usize,
}
92
/// Arguments for the `force-gc` subcommand.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
    #[clap(flatten)]
    state: StateArgs,
}
99
/// Arguments for the `finalize` subcommand.
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// Force downgrade the critical since of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_since: bool,

    /// Force advance the upper of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_upper: bool,
}
114
/// Arguments for the `restore-blob` subcommand.
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
    #[clap(flatten)]
    state: StoreArgs,

    /// The number of concurrent restore operations to run at once.
    #[clap(long, default_value_t = 16)]
    concurrency: usize,
}
125
/// Runs the given admin `command` against live persist state.
///
/// All commands are dry-run unless `--commit` was passed; the `commit` flag is
/// threaded through to the store constructors and helpers below, which gate
/// every mutation on it.
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::ForceCompaction(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            // 0 means "use the default", which `set_config` interprets here
            // via the dyncfg's own default handling.
            cfg.set_config(
                &COMPACTION_MEMORY_BOUND_BYTES,
                args.compaction_memory_bound_bytes,
            );

            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // K/V/T/D are the cli inspect placeholders; TodoSchema suffices
            // because compaction doesn't need real schemas here.
            let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                Arc::new(TodoSchema::default()),
                Arc::new(TodoSchema::default()),
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::ForceGc(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // We don't actually care about the machine, but we do need to
            // prevent it from being dropped before metrics are gathered.
            let _machine = force_gc(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::Finalize(args) => {
            let FinalizeArgs {
                state:
                    StateArgs {
                        shard_id,
                        consensus_uri,
                        blob_uri,
                    },
                force_downgrade_since,
                force_downgrade_upper,
            } = args;
            let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
            let commit = command.commit;
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;

            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;

            // Open a machine up front to validate the version of state
            // before doing anything destructive below.
            let machine = make_machine(
                &cfg,
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                shard_id,
                commit,
                expected_version,
            )
            .await?;

            if force_downgrade_upper {
                let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
                let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
                let shared_states = Arc::new(StateCache::new(
                    &cfg,
                    Arc::clone(&metrics),
                    Arc::clone(&pubsub_sender),
                ));

                let persist_client = PersistClient::new(
                    cfg,
                    blob,
                    consensus,
                    metrics,
                    isolated_runtime,
                    shared_states,
                    pubsub_sender,
                )?;
                let diagnostics = Diagnostics {
                    shard_name: shard_id.to_string(),
                    handle_purpose: "persist-cli finalize shard".to_string(),
                };

                let mut write_handle: WriteHandle<
                    crate::cli::inspect::K,
                    crate::cli::inspect::V,
                    u64,
                    i64,
                > = persist_client
                    .open_writer(
                        shard_id,
                        Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
                        Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
                        diagnostics,
                    )
                    .await?;
                // Advancing the upper to the empty antichain closes the shard
                // to further writes.
                write_handle.advance_upper(&Antichain::new()).await;
            }

            if force_downgrade_since {
                // Register (or look up) the controller's critical reader so we
                // can learn the current opaque token.
                let (state, _maintenance) = machine
                    .register_critical_reader(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        Opaque::encode(&crate::cli::inspect::O::default()),
                        "persist-cli finalize with force downgrade",
                    )
                    .await;

                let expected_opaque = state.opaque;

                // Downgrade the critical since to the empty antichain, which
                // permits finalization below.
                let (result, _maintenance) = machine
                    .compare_and_downgrade_since(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        &expected_opaque,
                        (&expected_opaque, &Antichain::new()),
                    )
                    .await;
                if let Err((actual_opaque, _since)) = result {
                    bail!(
                        "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
                    )
                }
            }

            let maintenance = machine.become_tombstone().await?;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::RestoreBlob(args) => {
            let RestoreBlobArgs {
                state:
                    StoreArgs {
                        consensus_uri,
                        blob_uri,
                    },
                concurrency,
            } = args;
            let commit = command.commit;
            let configs = all_dyncfgs(ConfigSet::default());
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
            let versions = StateVersions::new(
                cfg.clone(),
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
            );

            // Restore every shard listed in consensus, up to `concurrency`
            // shards at a time, collecting the blobs that could not be
            // restored.
            let not_restored: Vec<_> = consensus
                .list_keys()
                .flat_map_unordered(concurrency, |shard| {
                    stream::once(Box::pin(async {
                        let shard_id = shard?;
                        let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
                        let start = Instant::now();
                        info!("Restoring blob state for shard {shard_id}.",);
                        let shard_not_restored = crate::internal::restore::restore_blob(
                            &versions,
                            blob.as_ref(),
                            &cfg.build_version,
                            shard_id,
                            &*metrics,
                        )
                        .await?;
                        info!(
                            "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
                            shard_not_restored.len(),
                            start.elapsed()
                        );
                        Ok::<_, ExternalError>(shard_not_restored)
                    }))
                })
                .try_fold(vec![], |mut a, b| async move {
                    a.extend(b);
                    Ok(a)
                })
                .await?;

            info_log_non_zero_metrics(&metrics_registry.gather());
            if !not_restored.is_empty() {
                bail!("referenced blobs were not restored: {not_restored:#?}")
            }
        }
    }
    Ok(())
}
358
359pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
360 for mf in metric_families {
361 for m in mf.get_metric() {
362 let val = match mf.get_field_type() {
363 MetricType::COUNTER => m.get_counter().get_value(),
364 MetricType::GAUGE => m.get_gauge().get_value(),
365 x => {
366 info!("unhandled {} metric type: {:?}", mf.name(), x);
367 continue;
368 }
369 };
370 if val == 0.0 {
371 continue;
372 }
373 let label_pairs = m.get_label();
374 let mut labels = String::new();
375 if !label_pairs.is_empty() {
376 labels.push_str("{");
377 for lb in label_pairs {
378 if labels != "{" {
379 labels.push_str(",");
380 }
381 labels.push_str(lb.name());
382 labels.push_str(":");
383 labels.push_str(lb.name());
384 }
385 labels.push_str("}");
386 }
387 info!("{}{} {}", mf.name(), labels, val);
388 }
389 }
390}
391
/// Manually completes all fueled merge requests (compactions) in a shard.
///
/// Loops until a pass applies every outstanding compaction request without any
/// being rejected; a rejected application bumps `attempt` and restarts the
/// whole pass from freshly-fetched state. With `commit == false`, requests are
/// logged but no compaction runs.
pub async fn force_compaction<K, V, T, D>(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    commit: bool,
    expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;

    let machine = make_typed_machine::<K, V, T, D>(
        &cfg,
        consensus,
        Arc::clone(&blob),
        Arc::clone(&metrics),
        shard_id,
        commit,
        expected_version,
    )
    .await?;

    let writer_id = WriterId::new();

    let mut attempt = 0;
    'outer: loop {
        // Refresh state so the set of merge reqs reflects any previous pass.
        machine.applier.fetch_and_update_state(None).await;
        let reqs = machine.applier.all_fueled_merge_reqs();
        info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
        for (idx, req) in reqs.clone().into_iter().enumerate() {
            let req = CompactReq {
                shard_id,
                desc: req.desc,
                inputs: req.inputs,
            };
            // Totals are only for the log line below.
            let parts = req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>();
            let bytes = req
                .inputs
                .iter()
                .map(|x| x.batch.encoded_size_bytes())
                .sum::<usize>();
            let start = Instant::now();
            info!(
                "attempt {} req {}: compacting {} batches {} in parts {} totaling bytes: lower={:?} upper={:?} since={:?}",
                attempt,
                idx,
                req.inputs.len(),
                parts,
                bytes,
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            if !commit {
                info!("skipping compaction because --commit is not set");
                continue;
            }
            let schemas = Schemas {
                id: None,
                key: Arc::clone(&key_schema),
                val: Arc::clone(&val_schema),
            };

            let res = Compactor::<K, V, T, D>::compact(
                CompactConfig::new(&cfg, shard_id),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&machine.applier.shard_metrics),
                Arc::new(IsolatedRuntime::new(
                    metrics_registry,
                    Some(cfg.isolated_runtime_worker_threads),
                )),
                req,
                schemas,
            )
            .await?;
            metrics.compaction.admin_count.inc();
            info!(
                "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
                attempt,
                idx,
                res.output.part_count(),
                res.output.encoded_size_bytes(),
                start.elapsed(),
            );
            // Attempt to swap the compacted output into state.
            let (apply_res, maintenance) = machine
                .merge_res(&FueledMergeRes {
                    output: res.output,
                    input: res.input,
                    new_active_compaction: None,
                })
                .await;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            if apply_res.applied() {
                info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
            } else {
                // State changed underneath us; restart the pass with fresh
                // merge reqs.
                info!(
                    "attempt {} req {}: {:?} trying again",
                    attempt, idx, apply_res
                );
                attempt += 1;
                continue 'outer;
            }
        }
        info!("attempt {}: did {} compactions", attempt, reqs.len());
        let _ = machine.expire_writer(&writer_id).await;
        info!("expired writer {}", writer_id);
        return Ok(());
    }
}
519
520async fn make_machine(
521 cfg: &PersistConfig,
522 consensus: Arc<dyn Consensus>,
523 blob: Arc<dyn Blob>,
524 metrics: Arc<Metrics>,
525 shard_id: ShardId,
526 commit: bool,
527 expected_version: Option<Version>,
528) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
529 make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
530 cfg,
531 consensus,
532 blob,
533 metrics,
534 shard_id,
535 commit,
536 expected_version,
537 )
538 .await
539}
540
/// Opens a typed [`Machine`] for `shard_id` after validating that this tool's
/// version may safely operate on the shard's state.
///
/// The loop exists because fetching state with the default codecs may fail;
/// on a codec mismatch we record the shard's actual codecs in the inspect
/// tooling's global and retry. Once state fetches cleanly, we verify the
/// version compatibility rules before constructing the machine.
async fn make_typed_machine<K, V, T, D>(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    let state_versions = Arc::new(StateVersions::new(
        cfg.clone(),
        consensus,
        blob,
        Arc::clone(&metrics),
    ));

    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;

    loop {
        let state_res = state_versions
            .fetch_current_state::<u64>(&shard_id, versions.0.clone())
            .await
            .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
        let state = match state_res {
            Ok(state) => state,
            Err(codec) => {
                // Remember the shard's actual codecs and retry the fetch.
                let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
                *kvtd = codec.actual;
                continue;
            }
        };
        // Read-only access is safe for any state as new or older than this
        // tool; mutations require an exact version match (or an explicitly
        // provided `expected_version` no newer than this tool).
        let safe_version_change = match (commit, expected_version) {
            (false, _) => cfg.build_version >= state.collections.version,
            (true, None) => cfg.build_version == state.collections.version,
            (true, Some(expected)) => {
                state.collections.version == expected && expected <= cfg.build_version
            }
        };
        if !safe_version_change {
            return Err(anyhow!(
                "version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
                cfg.build_version,
                state.collections.version
            ));
        }
        break;
    }

    let machine = Machine::<K, V, T, D>::new(
        cfg.clone(),
        shard_id,
        Arc::clone(&metrics),
        state_versions,
        Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
        Arc::new(NoopPubSubSender),
        Arc::new(IsolatedRuntime::new(
            &MetricsRegistry::new(),
            Some(cfg.isolated_runtime_worker_threads),
        )),
        Diagnostics::from_purpose("admin"),
    )
    .await?;

    Ok(machine)
}
629
630async fn force_gc(
631 cfg: PersistConfig,
632 metrics_registry: &MetricsRegistry,
633 shard_id: ShardId,
634 consensus_uri: &SensitiveUrl,
635 blob_uri: &SensitiveUrl,
636 commit: bool,
637 expected_version: Option<Version>,
638) -> anyhow::Result<Box<dyn Any>> {
639 let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
640 let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
641 let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
642 let machine = make_machine(
643 &cfg,
644 consensus,
645 blob,
646 metrics,
647 shard_id,
648 commit,
649 expected_version,
650 )
651 .await?;
652 let gc_req = GcReq {
653 shard_id,
654 new_seqno_since: machine.applier.seqno_since(),
655 };
656 let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
657 if !maintenance.is_empty() {
658 info!("ignoring non-empty requested maintenance: {maintenance:?}")
659 }
660
661 Ok(Box::new(machine))
662}
663
/// The amount of fuel to use in the catalog's dangerous_force_compaction task.
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_catalog_force_compaction_fuel",
    1024,
    "fuel to use in catalog dangerous_force_compaction task",
);

/// How long to wait between steps of the catalog's dangerous_force_compaction
/// task.
pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_catalog_force_compaction_wait",
    Duration::from_secs(60),
    "wait to use in catalog dangerous_force_compaction task",
);

/// The amount of fuel to use in the expression cache's
/// dangerous_force_compaction.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_expression_cache_force_compaction_fuel",
    131_072,
    "fuel to use in expression cache dangerous_force_compaction",
);

/// How long to wait between steps of the expression cache's
/// dangerous_force_compaction.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_expression_cache_force_compaction_wait",
    Duration::from_secs(0),
    "wait to use in expression cache dangerous_force_compaction",
);
691
/// Repeatedly exerts compaction fuel on the shard behind `write` until its
/// batches are compacted down to at most a single run, then returns.
///
/// "Dangerous" because aggressively merging batches can defeat filter
/// pushdown. Each iteration spends `fuel()` units of compaction fuel, applies
/// the resulting compactions, performs requested maintenance, then sleeps so
/// that iterations are at least `wait()` apart. Compaction errors are logged
/// and skipped, not propagated.
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
    write: &WriteHandle<K, V, T, D>,
    fuel: impl Fn() -> usize,
    wait: impl Fn() -> Duration,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    let machine = write.machine.clone();

    let mut last_exert: Instant;

    loop {
        last_exert = Instant::now();
        let fuel = fuel();
        // Exerting fuel produces zero or more compaction reqs plus any
        // maintenance the spine wants done.
        let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
        for req in reqs {
            info!(
                "force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                req.inputs.len(),
                req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
                req.inputs
                    .iter()
                    .map(|x| x.batch.runs().count())
                    .sum::<usize>(),
                req.inputs
                    .iter()
                    .flat_map(|x| &x.batch.parts)
                    .map(|x| x.encoded_size_bytes())
                    .sum::<usize>(),
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            machine.applier.metrics.compaction.requested.inc();
            let start = Instant::now();
            let res = Compactor::<K, V, T, D>::compact_and_apply(&machine, req).await;
            let apply_maintenance = match res {
                Ok(x) => x,
                Err(err) => {
                    // Best-effort: log and move on to the next req.
                    warn!(
                        "force_compaction {} {} errored in compaction: {:?}",
                        machine.applier.shard_metrics.name,
                        machine.applier.shard_metrics.shard_id,
                        err
                    );
                    continue;
                }
            };
            machine.applier.metrics.compaction.admin_count.inc();
            info!(
                "force_compaction {} {} compacted in {:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                start.elapsed(),
            );
            maintenance.merge(apply_maintenance);
        }
        maintenance.perform(&machine, &write.gc).await;

        // Rate-limit iterations: sleep until at least `wait()` has elapsed
        // since the start of this iteration.
        let next_exert = last_exert + wait();
        tokio::time::sleep_until(next_exert.into()).await;

        // Done once the shard is compacted down to at most one run.
        let num_runs: usize = machine
            .applier
            .all_batches()
            .iter()
            .map(|x| x.runs().count())
            .sum();
        if num_runs <= 1 {
            info!(
                "force_compaction {} {} exiting with {} runs",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                num_runs
            );
            return;
        }
    }
}
798
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;

    use crate::tests::new_test_client;

    /// Exercises `dangerous_force_compaction_and_break_pushdown` across a
    /// range of batch counts, verifying it always compacts the shard down to
    /// fewer than 2 batches.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        for num_batches in 0..=17 {
            let (mut write, _read) = client
                .expect_open::<String, (), u64, i64>(ShardId::new())
                .await;
            let machine = write.machine.clone();

            // Write one tiny batch per timestep.
            for idx in 0..num_batches {
                let () = write
                    .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                    .await;
            }

            // Minimum fuel and zero wait: maximally stress the compaction loop.
            super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
                .await;
            let batches_after = machine.applier.all_batches().len();
            assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
        }
    }
}