1use std::any::Any;
13use std::fmt::Debug;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::{anyhow, bail};
19use differential_dataflow::difference::Monoid;
20use differential_dataflow::lattice::Lattice;
21use futures_util::{StreamExt, TryStreamExt, stream};
22use mz_dyncfg::{Config, ConfigSet};
23use mz_ore::metrics::MetricsRegistry;
24use mz_ore::now::SYSTEM_TIME;
25use mz_ore::url::SensitiveUrl;
26use mz_persist::location::{Blob, Consensus, ExternalError};
27use mz_persist_types::codec_impls::TodoSchema;
28use mz_persist_types::{Codec, Codec64};
29use prometheus::proto::{MetricFamily, MetricType};
30use semver::Version;
31use timely::progress::{Antichain, Timestamp};
32use tracing::{info, warn};
33
34use crate::async_runtime::IsolatedRuntime;
35use crate::cache::StateCache;
36use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
37use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
38use crate::cli::inspect::FAKE_OPAQUE_CODEC;
39use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
40use crate::internal::encoding::Schemas;
41use crate::internal::gc::{GarbageCollector, GcReq};
42use crate::internal::machine::Machine;
43use crate::internal::trace::FueledMergeRes;
44use crate::rpc::{NoopPubSubSender, PubSubSender};
45use crate::write::{WriteHandle, WriterId};
46use crate::{
47 BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
48};
49
50#[derive(Debug, clap::Args)]
52pub struct AdminArgs {
53 #[clap(subcommand)]
54 command: Command,
55
56 #[clap(long)]
58 pub(crate) commit: bool,
59
60 #[clap(long)]
65 pub(crate) expected_version: Option<String>,
66}
67
68#[derive(Debug, clap::Subcommand)]
70pub(crate) enum Command {
71 ForceCompaction(ForceCompactionArgs),
73 ForceGc(ForceGcArgs),
75 Finalize(FinalizeArgs),
77 RestoreBlob(RestoreBlobArgs),
80}
81
82#[derive(Debug, clap::Parser)]
84pub(crate) struct ForceCompactionArgs {
85 #[clap(flatten)]
86 state: StateArgs,
87
88 #[clap(long, default_value_t = 0)]
90 compaction_memory_bound_bytes: usize,
91}
92
93#[derive(Debug, clap::Parser)]
95pub(crate) struct ForceGcArgs {
96 #[clap(flatten)]
97 state: StateArgs,
98}
99
100#[derive(Debug, clap::Parser)]
102pub(crate) struct FinalizeArgs {
103 #[clap(flatten)]
104 state: StateArgs,
105
106 #[clap(long, default_value_t = false)]
108 force_downgrade_since: bool,
109
110 #[clap(long, default_value_t = false)]
112 force_downgrade_upper: bool,
113}
114
115#[derive(Debug, clap::Parser)]
117pub(crate) struct RestoreBlobArgs {
118 #[clap(flatten)]
119 state: StoreArgs,
120
121 #[clap(long, default_value_t = 16)]
123 concurrency: usize,
124}
125
126pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
128 match command.command {
129 Command::ForceCompaction(args) => {
130 let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
131 let configs = all_dyncfgs(ConfigSet::default());
132 let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
134 cfg.set_config(
135 &COMPACTION_MEMORY_BOUND_BYTES,
136 args.compaction_memory_bound_bytes,
137 );
138
139 let metrics_registry = MetricsRegistry::new();
140 let expected_version = command
141 .expected_version
142 .as_ref()
143 .map(|v| Version::parse(v))
144 .transpose()?;
145 let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
146 cfg,
147 &metrics_registry,
148 shard_id,
149 &args.state.consensus_uri,
150 &args.state.blob_uri,
151 Arc::new(TodoSchema::default()),
152 Arc::new(TodoSchema::default()),
153 command.commit,
154 expected_version,
155 )
156 .await?;
157 info_log_non_zero_metrics(&metrics_registry.gather());
158 }
159 Command::ForceGc(args) => {
160 let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
161 let configs = all_dyncfgs(ConfigSet::default());
162 let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
164 let metrics_registry = MetricsRegistry::new();
165 let expected_version = command
166 .expected_version
167 .as_ref()
168 .map(|v| Version::parse(v))
169 .transpose()?;
170 let _machine = force_gc(
173 cfg,
174 &metrics_registry,
175 shard_id,
176 &args.state.consensus_uri,
177 &args.state.blob_uri,
178 command.commit,
179 expected_version,
180 )
181 .await?;
182 info_log_non_zero_metrics(&metrics_registry.gather());
183 }
184 Command::Finalize(args) => {
185 let FinalizeArgs {
186 state:
187 StateArgs {
188 shard_id,
189 consensus_uri,
190 blob_uri,
191 },
192 force_downgrade_since,
193 force_downgrade_upper,
194 } = args;
195 let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
196 let commit = command.commit;
197 let expected_version = command
198 .expected_version
199 .as_ref()
200 .map(|v| Version::parse(v))
201 .transpose()?;
202
203 let configs = all_dyncfgs(ConfigSet::default());
204 let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
206 let metrics_registry = MetricsRegistry::new();
207 let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
208 let consensus =
209 make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
210 let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
211
212 let machine = make_machine(
215 &cfg,
216 Arc::clone(&consensus),
217 Arc::clone(&blob),
218 Arc::clone(&metrics),
219 shard_id,
220 commit,
221 expected_version,
222 )
223 .await?;
224
225 if force_downgrade_upper {
226 let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
227 let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
228 let shared_states = Arc::new(StateCache::new(
229 &cfg,
230 Arc::clone(&metrics),
231 Arc::clone(&pubsub_sender),
232 ));
233
234 let persist_client = PersistClient::new(
235 cfg,
236 blob,
237 consensus,
238 metrics,
239 isolated_runtime,
240 shared_states,
241 pubsub_sender,
242 )?;
243 let diagnostics = Diagnostics {
244 shard_name: shard_id.to_string(),
245 handle_purpose: "persist-cli finalize shard".to_string(),
246 };
247
248 let mut write_handle: WriteHandle<
249 crate::cli::inspect::K,
250 crate::cli::inspect::V,
251 u64,
252 i64,
253 > = persist_client
254 .open_writer(
255 shard_id,
256 Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
257 Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
258 diagnostics,
259 )
260 .await?;
261 write_handle.advance_upper(&Antichain::new()).await;
262 }
263
264 if force_downgrade_since {
265 let (state, _maintenance) = machine
266 .register_critical_reader::<crate::cli::inspect::O>(
267 &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
268 "persist-cli finalize with force downgrade",
269 )
270 .await;
271
272 let expected_opaque = crate::cli::inspect::O::decode(state.opaque.0);
277 FAKE_OPAQUE_CODEC
278 .lock()
279 .expect("lockable")
280 .clone_from(&state.opaque_codec);
281
282 let (result, _maintenance) = machine
283 .compare_and_downgrade_since(
284 &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
285 &expected_opaque,
286 (&expected_opaque, &Antichain::new()),
287 )
288 .await;
289 if let Err((actual_opaque, _since)) = result {
290 bail!(
291 "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
292 )
293 }
294 }
295
296 let maintenance = machine.become_tombstone().await?;
297 if !maintenance.is_empty() {
298 info!("ignoring non-empty requested maintenance: {maintenance:?}")
299 }
300 info_log_non_zero_metrics(&metrics_registry.gather());
301 }
302 Command::RestoreBlob(args) => {
303 let RestoreBlobArgs {
304 state:
305 StoreArgs {
306 consensus_uri,
307 blob_uri,
308 },
309 concurrency,
310 } = args;
311 let commit = command.commit;
312 let configs = all_dyncfgs(ConfigSet::default());
313 let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
315 let metrics_registry = MetricsRegistry::new();
316 let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
317 let consensus =
318 make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
319 let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
320 let versions = StateVersions::new(
321 cfg.clone(),
322 Arc::clone(&consensus),
323 Arc::clone(&blob),
324 Arc::clone(&metrics),
325 );
326
327 let not_restored: Vec<_> = consensus
328 .list_keys()
329 .flat_map_unordered(concurrency, |shard| {
330 stream::once(Box::pin(async {
331 let shard_id = shard?;
332 let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
333 let start = Instant::now();
334 info!("Restoring blob state for shard {shard_id}.",);
335 let shard_not_restored = crate::internal::restore::restore_blob(
336 &versions,
337 blob.as_ref(),
338 &cfg.build_version,
339 shard_id,
340 &*metrics,
341 )
342 .await?;
343 info!(
344 "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
345 shard_not_restored.len(),
346 start.elapsed()
347 );
348 Ok::<_, ExternalError>(shard_not_restored)
349 }))
350 })
351 .try_fold(vec![], |mut a, b| async move {
352 a.extend(b);
353 Ok(a)
354 })
355 .await?;
356
357 info_log_non_zero_metrics(&metrics_registry.gather());
358 if !not_restored.is_empty() {
359 bail!("referenced blobs were not restored: {not_restored:#?}")
360 }
361 }
362 }
363 Ok(())
364}
365
366pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
367 for mf in metric_families {
368 for m in mf.get_metric() {
369 let val = match mf.get_field_type() {
370 MetricType::COUNTER => m.get_counter().get_value(),
371 MetricType::GAUGE => m.get_gauge().get_value(),
372 x => {
373 info!("unhandled {} metric type: {:?}", mf.name(), x);
374 continue;
375 }
376 };
377 if val == 0.0 {
378 continue;
379 }
380 let label_pairs = m.get_label();
381 let mut labels = String::new();
382 if !label_pairs.is_empty() {
383 labels.push_str("{");
384 for lb in label_pairs {
385 if labels != "{" {
386 labels.push_str(",");
387 }
388 labels.push_str(lb.name());
389 labels.push_str(":");
390 labels.push_str(lb.name());
391 }
392 labels.push_str("}");
393 }
394 info!("{}{} {}", mf.name(), labels, val);
395 }
396 }
397}
398
399pub async fn force_compaction<K, V, T, D>(
401 cfg: PersistConfig,
402 metrics_registry: &MetricsRegistry,
403 shard_id: ShardId,
404 consensus_uri: &SensitiveUrl,
405 blob_uri: &SensitiveUrl,
406 key_schema: Arc<K::Schema>,
407 val_schema: Arc<V::Schema>,
408 commit: bool,
409 expected_version: Option<Version>,
410) -> Result<(), anyhow::Error>
411where
412 K: Debug + Codec,
413 V: Debug + Codec,
414 T: Timestamp + Lattice + Codec64 + Sync,
415 D: Monoid + Ord + Codec64 + Send + Sync,
416{
417 let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
418 let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
419 let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
420
421 let machine = make_typed_machine::<K, V, T, D>(
422 &cfg,
423 consensus,
424 Arc::clone(&blob),
425 Arc::clone(&metrics),
426 shard_id,
427 commit,
428 expected_version,
429 )
430 .await?;
431
432 let writer_id = WriterId::new();
433
434 let mut attempt = 0;
435 'outer: loop {
436 machine.applier.fetch_and_update_state(None).await;
437 let reqs = machine.applier.all_fueled_merge_reqs();
438 info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
439 for (idx, req) in reqs.clone().into_iter().enumerate() {
440 let req = CompactReq {
441 shard_id,
442 desc: req.desc,
443 inputs: req.inputs,
444 };
445 let parts = req
446 .inputs
447 .iter()
448 .map(|x| x.batch.part_count())
449 .sum::<usize>();
450 let bytes = req
451 .inputs
452 .iter()
453 .map(|x| x.batch.encoded_size_bytes())
454 .sum::<usize>();
455 let start = Instant::now();
456 info!(
457 "attempt {} req {}: compacting {} batches {} in parts {} totaling bytes: lower={:?} upper={:?} since={:?}",
458 attempt,
459 idx,
460 req.inputs.len(),
461 parts,
462 bytes,
463 req.desc.lower().elements(),
464 req.desc.upper().elements(),
465 req.desc.since().elements(),
466 );
467 if !commit {
468 info!("skipping compaction because --commit is not set");
469 continue;
470 }
471 let schemas = Schemas {
472 id: None,
473 key: Arc::clone(&key_schema),
474 val: Arc::clone(&val_schema),
475 };
476
477 let res = Compactor::<K, V, T, D>::compact(
478 CompactConfig::new(&cfg, shard_id),
479 Arc::clone(&blob),
480 Arc::clone(&metrics),
481 Arc::clone(&machine.applier.shard_metrics),
482 Arc::new(IsolatedRuntime::new(
483 metrics_registry,
484 Some(cfg.isolated_runtime_worker_threads),
485 )),
486 req,
487 schemas,
488 )
489 .await?;
490 metrics.compaction.admin_count.inc();
491 info!(
492 "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
493 attempt,
494 idx,
495 res.output.part_count(),
496 res.output.encoded_size_bytes(),
497 start.elapsed(),
498 );
499 let (apply_res, maintenance) = machine
500 .merge_res(&FueledMergeRes {
501 output: res.output,
502 input: res.input,
503 new_active_compaction: None,
504 })
505 .await;
506 if !maintenance.is_empty() {
507 info!("ignoring non-empty requested maintenance: {maintenance:?}")
508 }
509 if apply_res.applied() {
510 info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
511 } else {
512 info!(
513 "attempt {} req {}: {:?} trying again",
514 attempt, idx, apply_res
515 );
516 attempt += 1;
517 continue 'outer;
518 }
519 }
520 info!("attempt {}: did {} compactions", attempt, reqs.len());
521 let _ = machine.expire_writer(&writer_id).await;
522 info!("expired writer {}", writer_id);
523 return Ok(());
524 }
525}
526
527async fn make_machine(
528 cfg: &PersistConfig,
529 consensus: Arc<dyn Consensus>,
530 blob: Arc<dyn Blob>,
531 metrics: Arc<Metrics>,
532 shard_id: ShardId,
533 commit: bool,
534 expected_version: Option<Version>,
535) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
536 make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
537 cfg,
538 consensus,
539 blob,
540 metrics,
541 shard_id,
542 commit,
543 expected_version,
544 )
545 .await
546}
547
548async fn make_typed_machine<K, V, T, D>(
549 cfg: &PersistConfig,
550 consensus: Arc<dyn Consensus>,
551 blob: Arc<dyn Blob>,
552 metrics: Arc<Metrics>,
553 shard_id: ShardId,
554 commit: bool,
555 expected_version: Option<Version>,
556) -> anyhow::Result<Machine<K, V, T, D>>
557where
558 K: Debug + Codec,
559 V: Debug + Codec,
560 T: Timestamp + Lattice + Codec64 + Sync,
561 D: Monoid + Codec64,
562{
563 let state_versions = Arc::new(StateVersions::new(
564 cfg.clone(),
565 consensus,
566 blob,
567 Arc::clone(&metrics),
568 ));
569
570 let versions = state_versions
572 .fetch_recent_live_diffs::<u64>(&shard_id)
573 .await;
574
575 loop {
576 let state_res = state_versions
577 .fetch_current_state::<u64>(&shard_id, versions.0.clone())
578 .await
579 .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
580 let state = match state_res {
581 Ok(state) => state,
582 Err(codec) => {
583 let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
584 *kvtd = codec.actual;
585 continue;
586 }
587 };
588 let safe_version_change = match (commit, expected_version) {
592 (false, _) => cfg.build_version >= state.applier_version,
594 (true, None) => cfg.build_version == state.applier_version,
596 (true, Some(expected)) => {
598 state.applier_version == expected && expected <= cfg.build_version
606 }
607 };
608 if !safe_version_change {
609 return Err(anyhow!(
611 "version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
612 cfg.build_version,
613 state.applier_version
614 ));
615 }
616 break;
617 }
618
619 let machine = Machine::<K, V, T, D>::new(
620 cfg.clone(),
621 shard_id,
622 Arc::clone(&metrics),
623 state_versions,
624 Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
625 Arc::new(NoopPubSubSender),
626 Arc::new(IsolatedRuntime::new(
627 &MetricsRegistry::new(),
628 Some(cfg.isolated_runtime_worker_threads),
629 )),
630 Diagnostics::from_purpose("admin"),
631 )
632 .await?;
633
634 Ok(machine)
635}
636
637async fn force_gc(
638 cfg: PersistConfig,
639 metrics_registry: &MetricsRegistry,
640 shard_id: ShardId,
641 consensus_uri: &SensitiveUrl,
642 blob_uri: &SensitiveUrl,
643 commit: bool,
644 expected_version: Option<Version>,
645) -> anyhow::Result<Box<dyn Any>> {
646 let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
647 let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
648 let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
649 let machine = make_machine(
650 &cfg,
651 consensus,
652 blob,
653 metrics,
654 shard_id,
655 commit,
656 expected_version,
657 )
658 .await?;
659 let gc_req = GcReq {
660 shard_id,
661 new_seqno_since: machine.applier.seqno_since(),
662 };
663 let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
664 if !maintenance.is_empty() {
665 info!("ignoring non-empty requested maintenance: {maintenance:?}")
666 }
667
668 Ok(Box::new(machine))
669}
670
671pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
673 "persist_catalog_force_compaction_fuel",
674 1024,
675 "fuel to use in catalog dangerous_force_compaction task",
676);
677
678pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
680 "persist_catalog_force_compaction_wait",
681 Duration::from_secs(60),
682 "wait to use in catalog dangerous_force_compaction task",
683);
684
685pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
687 "persist_expression_cache_force_compaction_fuel",
688 131_072,
689 "fuel to use in expression cache dangerous_force_compaction",
690);
691
692pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
694 "persist_expression_cache_force_compaction_wait",
695 Duration::from_secs(0),
696 "wait to use in expression cache dangerous_force_compaction",
697);
698
699pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
718 write: &WriteHandle<K, V, T, D>,
719 fuel: impl Fn() -> usize,
720 wait: impl Fn() -> Duration,
721) where
722 K: Debug + Codec,
723 V: Debug + Codec,
724 T: Timestamp + Lattice + Codec64 + Sync,
725 D: Monoid + Ord + Codec64 + Send + Sync,
726{
727 let machine = write.machine.clone();
728
729 let mut last_exert: Instant;
730
731 loop {
732 last_exert = Instant::now();
733 let fuel = fuel();
734 let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
735 for req in reqs {
736 info!(
737 "force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
738 machine.applier.shard_metrics.name,
739 machine.applier.shard_metrics.shard_id,
740 req.inputs.len(),
741 req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
742 req.inputs
743 .iter()
744 .map(|x| x.batch.runs().count())
745 .sum::<usize>(),
746 req.inputs
747 .iter()
748 .flat_map(|x| &x.batch.parts)
749 .map(|x| x.encoded_size_bytes())
750 .sum::<usize>(),
751 req.desc.lower().elements(),
752 req.desc.upper().elements(),
753 req.desc.since().elements(),
754 );
755 machine.applier.metrics.compaction.requested.inc();
756 let start = Instant::now();
757 let res = Compactor::<K, V, T, D>::compact_and_apply(
758 &machine,
759 req,
760 write.write_schemas.clone(),
761 )
762 .await;
763 let apply_maintenance = match res {
764 Ok(x) => x,
765 Err(err) => {
766 warn!(
767 "force_compaction {} {} errored in compaction: {:?}",
768 machine.applier.shard_metrics.name,
769 machine.applier.shard_metrics.shard_id,
770 err
771 );
772 continue;
773 }
774 };
775 machine.applier.metrics.compaction.admin_count.inc();
776 info!(
777 "force_compaction {} {} compacted in {:?}",
778 machine.applier.shard_metrics.name,
779 machine.applier.shard_metrics.shard_id,
780 start.elapsed(),
781 );
782 maintenance.merge(apply_maintenance);
783 }
784 maintenance.perform(&machine, &write.gc).await;
785
786 let next_exert = last_exert + wait();
789 tokio::time::sleep_until(next_exert.into()).await;
790
791 let num_runs: usize = machine
794 .applier
795 .all_batches()
796 .iter()
797 .map(|x| x.runs().count())
798 .sum();
799 if num_runs <= 1 {
800 info!(
801 "force_compaction {} {} exiting with {} runs",
802 machine.applier.shard_metrics.name,
803 machine.applier.shard_metrics.shard_id,
804 num_runs
805 );
806 return;
807 }
808 }
809}
810
811#[cfg(test)]
812mod tests {
813 use std::time::Duration;
814
815 use mz_dyncfg::ConfigUpdates;
816 use mz_persist_types::ShardId;
817
818 use crate::tests::new_test_client;
819
820 #[mz_persist_proc::test(tokio::test)]
821 #[cfg_attr(miri, ignore)]
822 async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
823 let client = new_test_client(&dyncfgs).await;
824 for num_batches in 0..=17 {
825 let (mut write, _read) = client
826 .expect_open::<String, (), u64, i64>(ShardId::new())
827 .await;
828 let machine = write.machine.clone();
829
830 for idx in 0..num_batches {
831 let () = write
832 .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
833 .await;
834 }
835
836 super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
838 .await;
839 let batches_after = machine.applier.all_batches().len();
840 assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
841 }
842 }
843}