mz_persist_client/cli/admin.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! CLI introspection tools for persist

use std::any::Any;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, bail};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures_util::{StreamExt, TryStreamExt, stream};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64};
use prometheus::proto::{MetricFamily, MetricType};
use semver::Version;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
use crate::cli::inspect::FAKE_OPAQUE_CODEC;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::encoding::Schemas;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::machine::Machine;
use crate::internal::trace::FueledMergeRes;
use crate::rpc::{NoopPubSubSender, PubSubSender};
use crate::write::{WriteHandle, WriterId};
use crate::{
    BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
};

/// Commands for read-write administration of persist state
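///
/// A hypothetical invocation, assuming clap's default kebab-case subcommand
/// naming and `--shard-id`/`--consensus-uri`/`--blob-uri` flags on
/// `StateArgs`:
///
/// ```text
/// persistcli admin --commit force-gc --shard-id <SHARD_ID> \
///     --consensus-uri <CONSENSUS_URI> --blob-uri <BLOB_URI>
/// ```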
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
    #[clap(subcommand)]
    command: Command,

    /// Whether to commit any modifications (defaults to dry run).
    #[clap(long)]
    pub(crate) commit: bool,

    /// !!DANGER ZONE!! - Has the possibility of breaking production!
    ///
    /// Allows specifying an expected `applier_version` of the shard we're operating on, so we can
    /// modify old/leaked shards.
    #[clap(long)]
    pub(crate) expected_version: Option<String>,
}

/// Individual subcommands of admin
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Manually completes all fueled compactions in a shard.
    ForceCompaction(ForceCompactionArgs),
    /// Manually kicks off a GC run for a shard.
    ForceGc(ForceGcArgs),
    /// Manually finalizes an unfinalized shard.
    Finalize(FinalizeArgs),
    /// Attempts to ensure that all the files referenced by consensus are
    /// available in Blob.
    RestoreBlob(RestoreBlobArgs),
}

/// Manually completes all fueled compactions in a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// An upper bound on compaction's memory consumption.
    #[clap(long, default_value_t = 0)]
    compaction_memory_bound_bytes: usize,
}

/// Manually kicks off a GC run for a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
    #[clap(flatten)]
    state: StateArgs,
}

/// Manually finalizes a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// Force downgrade the `since` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_since: bool,

    /// Force downgrade the `upper` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_upper: bool,
}

/// Attempts to restore all the blobs that are referenced by the current state of consensus.
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
    #[clap(flatten)]
    state: StoreArgs,

    /// The number of concurrent restore operations to run at once.
    #[clap(long, default_value_t = 16)]
    concurrency: usize,
}

/// Runs the given read-write admin command.
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::ForceCompaction(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            cfg.set_config(
                &COMPACTION_MEMORY_BOUND_BYTES,
                args.compaction_memory_bound_bytes,
            );

            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                Arc::new(TodoSchema::default()),
                Arc::new(TodoSchema::default()),
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::ForceGc(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // We don't actually care about the return value here, but we do need to prevent
            // the shard metrics from being dropped before they're reported below.
            let _machine = force_gc(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::Finalize(args) => {
            let FinalizeArgs {
                state:
                    StateArgs {
                        shard_id,
                        consensus_uri,
                        blob_uri,
                    },
                force_downgrade_since,
                force_downgrade_upper,
            } = args;
            let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
            let commit = command.commit;
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;

            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;

            // Open a machine so we can read the state of the Opaque, and set
            // our fake codecs.
            let machine = make_machine(
                &cfg,
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                shard_id,
                commit,
                expected_version,
            )
            .await?;

            if force_downgrade_upper {
                let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
                let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
                let shared_states = Arc::new(StateCache::new(
                    &cfg,
                    Arc::clone(&metrics),
                    Arc::clone(&pubsub_sender),
                ));

                // We need a PersistClient to open a write handle so we can append an empty batch.
                let persist_client = PersistClient::new(
                    cfg,
                    blob,
                    consensus,
                    metrics,
                    isolated_runtime,
                    shared_states,
                    pubsub_sender,
                )?;
                let diagnostics = Diagnostics {
                    shard_name: shard_id.to_string(),
                    handle_purpose: "persist-cli finalize shard".to_string(),
                };

                let mut write_handle: WriteHandle<
                    crate::cli::inspect::K,
                    crate::cli::inspect::V,
                    u64,
                    i64,
                > = persist_client
                    .open_writer(
                        shard_id,
                        Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
                        Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
                        diagnostics,
                    )
                    .await?;

                if !write_handle.upper().is_empty() {
                    let empty_batch: Vec<(
                        (crate::cli::inspect::K, crate::cli::inspect::V),
                        u64,
                        i64,
                    )> = vec![];
                    let lower = write_handle.upper().clone();
                    let upper = Antichain::new();

                    let result = write_handle.append(empty_batch, lower, upper).await?;
                    if let Err(err) = result {
                        anyhow::bail!("failed to force downgrade upper, {err:?}");
                    }
                }
            }

            if force_downgrade_since {
                let (state, _maintenance) = machine
                    .register_critical_reader::<crate::cli::inspect::O>(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        "persist-cli finalize with force downgrade",
                    )
                    .await;

                // HACK: Internally we have a check that the Opaque is using
                // the correct codec. For the purposes of this command we want
                // to sidestep that check, so we set our reported codec to
                // whatever the current state of the shard is.
                let expected_opaque = crate::cli::inspect::O::decode(state.opaque.0);
                FAKE_OPAQUE_CODEC
                    .lock()
                    .expect("lockable")
                    .clone_from(&state.opaque_codec);

                let (result, _maintenance) = machine
                    .compare_and_downgrade_since(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        &expected_opaque,
                        (&expected_opaque, &Antichain::new()),
                    )
                    .await;
                if let Err((actual_opaque, _since)) = result {
                    bail!(
                        "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
                    )
                }
            }

            let maintenance = machine.become_tombstone().await?;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::RestoreBlob(args) => {
            let RestoreBlobArgs {
                state:
                    StoreArgs {
                        consensus_uri,
                        blob_uri,
                    },
                concurrency,
            } = args;
            let commit = command.commit;
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
            let versions = StateVersions::new(
                cfg.clone(),
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
            );

            let not_restored: Vec<_> = consensus
                .list_keys()
                .flat_map_unordered(concurrency, |shard| {
                    stream::once(Box::pin(async {
                        let shard_id = shard?;
                        let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
                        let start = Instant::now();
                        info!("Restoring blob state for shard {shard_id}.");
                        let shard_not_restored = crate::internal::restore::restore_blob(
                            &versions,
                            blob.as_ref(),
                            &cfg.build_version,
                            shard_id,
                            &*metrics,
                        )
                        .await?;
                        info!(
                            "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
                            shard_not_restored.len(),
                            start.elapsed()
                        );
                        Ok::<_, ExternalError>(shard_not_restored)
                    }))
                })
                .try_fold(vec![], |mut a, b| async move {
                    a.extend(b);
                    Ok(a)
                })
                .await?;

            info_log_non_zero_metrics(&metrics_registry.gather());
            if !not_restored.is_empty() {
                bail!("referenced blobs were not restored: {not_restored:#?}")
            }
        }
    }
    Ok(())
}

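/// Logs, at INFO level, every counter and gauge in `metric_families` with a
/// non-zero value, formatted as `name{label:value,...} value`. Other metric
/// types are noted as unhandled and skipped.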
pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
    for mf in metric_families {
        for m in mf.get_metric() {
            let val = match mf.get_field_type() {
                MetricType::COUNTER => m.get_counter().get_value(),
                MetricType::GAUGE => m.get_gauge().get_value(),
                x => {
                    info!("unhandled {} metric type: {:?}", mf.get_name(), x);
                    continue;
                }
            };
            if val == 0.0 {
                continue;
            }
            let label_pairs = m.get_label();
            let mut labels = String::new();
            if !label_pairs.is_empty() {
                labels.push('{');
                for lb in label_pairs {
                    if labels != "{" {
                        labels.push(',');
                    }
                    labels.push_str(lb.get_name());
                    labels.push(':');
                    labels.push_str(lb.get_value());
                }
                labels.push('}');
            }
            info!("{}{} {}", mf.get_name(), labels, val);
        }
    }
}

/// Manually completes all fueled compactions in a shard.
pub async fn force_compaction<K, V, T, D>(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    commit: bool,
    expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;

    let machine = make_typed_machine::<K, V, T, D>(
        &cfg,
        consensus,
        Arc::clone(&blob),
        Arc::clone(&metrics),
        shard_id,
        commit,
        expected_version,
    )
    .await?;

    let writer_id = WriterId::new();

    let mut attempt = 0;
    'outer: loop {
        machine.applier.fetch_and_update_state(None).await;
        let reqs = machine.applier.all_fueled_merge_reqs();
        info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
        for (idx, req) in reqs.clone().into_iter().enumerate() {
            let req = CompactReq {
                shard_id,
                desc: req.desc,
                inputs: req
                    .inputs
                    .into_iter()
                    .map(|b| Arc::unwrap_or_clone(b.batch))
                    .collect(),
            };
            let parts = req.inputs.iter().map(|x| x.part_count()).sum::<usize>();
            let bytes = req
                .inputs
                .iter()
                .map(|x| x.encoded_size_bytes())
                .sum::<usize>();
            let start = Instant::now();
            info!(
                "attempt {} req {}: compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
                attempt,
                idx,
                req.inputs.len(),
                parts,
                bytes,
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            if !commit {
                info!("skipping compaction because --commit is not set");
                continue;
            }
            let schemas = Schemas {
                id: None,
                key: Arc::clone(&key_schema),
                val: Arc::clone(&val_schema),
            };

            let res = Compactor::<K, V, T, D>::compact(
                CompactConfig::new(&cfg, shard_id),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&machine.applier.shard_metrics),
                Arc::new(IsolatedRuntime::default()),
                req,
                schemas,
            )
            .await?;
            metrics.compaction.admin_count.inc();
            info!(
                "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
                attempt,
                idx,
                res.output.part_count(),
                res.output.encoded_size_bytes(),
                start.elapsed(),
            );
            let (apply_res, maintenance) = machine
                .merge_res(&FueledMergeRes { output: res.output })
                .await;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            if apply_res.applied() {
                info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
            } else {
                info!(
                    "attempt {} req {}: {:?}, trying again",
                    attempt, idx, apply_res
                );
                attempt += 1;
                continue 'outer;
            }
        }
        info!("attempt {}: did {} compactions", attempt, reqs.len());
        let _ = machine.expire_writer(&writer_id).await;
        info!("expired writer {}", writer_id);
        return Ok(());
    }
}

async fn make_machine(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
    make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
        cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await
}

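/// Opens a [Machine] for the shard, first priming the CLI's fake codecs from
/// the shard's current state and bailing if committing with this tool's build
/// version could unsafely diverge from the state's `applier_version` (see the
/// `safe_version_change` match below).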
async fn make_typed_machine<K, V, T, D>(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    let state_versions = Arc::new(StateVersions::new(
        cfg.clone(),
        consensus,
        blob,
        Arc::clone(&metrics),
    ));

    // Prime the K/V codec magic.
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;

    loop {
        let state_res = state_versions
            .fetch_current_state::<u64>(&shard_id, versions.0.clone())
            .await
            .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
        let state = match state_res {
            Ok(state) => state,
            Err(codec) => {
                let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
                *kvtd = codec.actual;
                continue;
            }
        };
        // This isn't the perfect place to put this check; the ideal would be
        // in the apply_unbatched_cmd loop, but we don't want to pollute the
        // prod code with this logic.
        let safe_version_change = match (commit, expected_version) {
            // We never actually write out state changes, so increasing the version is okay.
            (false, _) => cfg.build_version >= state.applier_version,
            // If the versions match, that's okay because any commits won't change it.
            (true, None) => cfg.build_version == state.applier_version,
            // !!DANGER ZONE!!
            (true, Some(expected)) => {
                // If we're not _extremely_ careful, the persistcli could make shards unreadable by
                // production. But there are times when we want to operate on a leaked shard with a
                // newer version of the build.
                //
                // We only allow a mismatch in version if we provided the expected version to the
                // command, and the expected version is no newer than the current build, which
                // indicates this is an old shard.
                state.applier_version == expected && expected <= cfg.build_version
            }
        };
        if !safe_version_change {
            // We could add a flag to override this check, if that comes up.
            return Err(anyhow!(
                "version of this tool {} does not match version of state {} when --commit is {commit}; bailing so we don't corrupt anything",
                cfg.build_version,
                state.applier_version
            ));
        }
        break;
    }

    let machine = Machine::<K, V, T, D>::new(
        cfg.clone(),
        shard_id,
        Arc::clone(&metrics),
        state_versions,
        Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
        Arc::new(NoopPubSubSender),
        Arc::new(IsolatedRuntime::default()),
        Diagnostics::from_purpose("admin"),
    )
    .await?;

    Ok(machine)
}

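/// Runs a GC and truncate pass against the shard, returning the opened
/// machine so its shard metrics stay alive long enough to be reported by the
/// caller.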
async fn force_gc(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Box<dyn Any>> {
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
    let machine = make_machine(
        &cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await?;
    let gc_req = GcReq {
        shard_id,
        new_seqno_since: machine.applier.seqno_since(),
    };
    let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
    if !maintenance.is_empty() {
        info!("ignoring non-empty requested maintenance: {maintenance:?}")
    }

    Ok(Box::new(machine))
}

/// Exposed for `mz-catalog`.
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_catalog_force_compaction_fuel",
    1024,
    "fuel to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_catalog_force_compaction_wait",
    Duration::from_secs(60),
    "wait to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_expression_cache_force_compaction_fuel",
    131_072,
    "fuel to use in expression cache dangerous_force_compaction",
);

/// Exposed for `mz-catalog`.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_expression_cache_force_compaction_wait",
    Duration::from_secs(0),
    "wait to use in expression cache dangerous_force_compaction",
);

/// Attempts to compact all batches in a shard into a minimal number.
///
/// This destroys most hope of filter pushdown doing anything useful. If you
/// think you want this, you almost certainly want something else; please come
/// talk to the persist team before using it.
///
/// This is accomplished by adding artificial fuel (representing some count of
/// updates) to the shard's internal Spine of batches structure on some
/// schedule and doing any compaction work that results. Both the amount of
/// fuel and the cadence are dynamically tunable. However, it is not clear
/// exactly how much fuel is safe to add in each call: 10 definitely seems
/// fine, usize::MAX may not be. This also adds to the danger in "dangerous".
///
/// This is intentionally not even hooked up to `persistcli admin` for the
/// above reasons.
///
/// Exits once the shard is sufficiently compacted, but can be called in a loop
/// to keep it compacted.
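///
/// A minimal usage sketch (illustrative only, not a doctest; assumes an
/// already-open `write: WriteHandle<K, V, u64, i64>`):
///
/// ```ignore
/// // Hypothetical call site: 1024 updates of fuel per exert, 60s between
/// // exerts. Both closures are re-evaluated each iteration, so they could
/// // instead read dynamically-tunable configs like the ones above.
/// dangerous_force_compaction_and_break_pushdown(&write, || 1024, || Duration::from_secs(60))
///     .await;
/// ```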
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
    write: &WriteHandle<K, V, T, D>,
    fuel: impl Fn() -> usize,
    wait: impl Fn() -> Duration,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let machine = write.machine.clone();

    let mut last_exert: Instant;

    loop {
        last_exert = Instant::now();
        let fuel = fuel();
        let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
        for req in reqs {
            info!(
                "force_compaction {} {} compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                req.inputs.len(),
                req.inputs.iter().flat_map(|x| &x.parts).count(),
                req.inputs
                    .iter()
                    .flat_map(|x| &x.parts)
                    .map(|x| x.encoded_size_bytes())
                    .sum::<usize>(),
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            machine.applier.metrics.compaction.requested.inc();
            let start = Instant::now();
            let res = Compactor::<K, V, T, D>::compact_and_apply(
                &machine,
                req,
                write.write_schemas.clone(),
            )
            .await;
            let (res, apply_maintenance) = match res {
                Ok(x) => x,
                Err(err) => {
                    warn!(
                        "force_compaction {} {} errored in compaction: {:?}",
                        machine.applier.shard_metrics.name,
                        machine.applier.shard_metrics.shard_id,
                        err
                    );
                    continue;
                }
            };
            machine.applier.metrics.compaction.admin_count.inc();
            info!(
                "force_compaction {} {} compacted in {:?}: {:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                start.elapsed(),
                res
            );
            maintenance.merge(apply_maintenance);
        }
        maintenance.perform(&machine, &write.gc).await;

        // Now sleep before the next one. Make sure to delay from when we
        // started the last exert in case the compaction takes non-trivial time.
        let next_exert = last_exert + wait();
        tokio::time::sleep_until(next_exert.into()).await;

        // NB: This check is intentionally at the end so that it's safe to call
        // this method in a loop.
        let num_batches = machine.applier.all_batches().len();
        if num_batches < 2 {
            info!(
                "force_compaction {} {} exiting with {} batches",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                num_batches
            );
            return;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;

    use crate::tests::new_test_client;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        for num_batches in 0..=17 {
            let (mut write, _read) = client
                .expect_open::<String, (), u64, i64>(ShardId::new())
                .await;
            let machine = write.machine.clone();

            for idx in 0..num_batches {
                let () = write
                    .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                    .await;
            }

            // Run the tool and verify that we get down to fewer than two
            // batches (matching the tool's exit condition).
            super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
                .await;
            let batches_after = machine.applier.all_batches().len();
            assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
        }
    }
}