mz_persist_client/cli/admin.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! CLI introspection tools for persist

use std::any::Any;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, bail};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures_util::{StreamExt, TryStreamExt, stream};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64};
use prometheus::proto::{MetricFamily, MetricType};
use semver::Version;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
use crate::cli::inspect::FAKE_OPAQUE_CODEC;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::encoding::Schemas;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::machine::Machine;
use crate::internal::trace::FueledMergeRes;
use crate::rpc::{NoopPubSubSender, PubSubSender};
use crate::write::{WriteHandle, WriterId};
use crate::{
    BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
};

/// Commands for read-write administration of persist state
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
    #[clap(subcommand)]
    command: Command,

    /// Whether to commit any modifications (defaults to dry run).
    #[clap(long)]
    pub(crate) commit: bool,

    /// !!DANGER ZONE!! - Has the possibility of breaking production!
    ///
    /// Allows specifying an expected `applier_version` of the shard we're operating on, so we can
    /// modify old/leaked shards.
    #[clap(long)]
    pub(crate) expected_version: Option<String>,
}
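//
// An illustrative invocation (a sketch, not taken from this file: the binary
// name and the flag names supplied by `StateArgs` are assumptions):
//
//     persistcli admin --commit force-compaction \
//         --shard-id s00000000-0000-0000-0000-000000000000 \
//         --consensus-uri postgres://... \
//         --blob-uri s3://...
//
// Without `--commit`, the command is a dry run and no modifications are
// written out.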

/// Individual subcommands of admin
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Manually completes all fueled compactions in a shard.
    ForceCompaction(ForceCompactionArgs),
    /// Manually kick off a GC run for a shard.
    ForceGc(ForceGcArgs),
    /// Manually finalize an unfinalized shard.
    Finalize(FinalizeArgs),
    /// Attempt to ensure that all the files referenced by consensus are available
    /// in Blob.
    RestoreBlob(RestoreBlobArgs),
}

/// Manually completes all fueled compactions in a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// An upper bound on compaction's memory consumption.
    #[clap(long, default_value_t = 0)]
    compaction_memory_bound_bytes: usize,
}

/// Manually kicks off a GC run for a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
    #[clap(flatten)]
    state: StateArgs,
}

/// Manually finalizes a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// Force downgrade the `since` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_since: bool,

    /// Force downgrade the `upper` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_upper: bool,
}

/// Attempt to restore all the blobs that are referenced by the current state of consensus.
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
    #[clap(flatten)]
    state: StoreArgs,

    /// The number of concurrent restore operations to run at once.
    #[clap(long, default_value_t = 16)]
    concurrency: usize,
}

/// Runs the given read-write admin command.
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::ForceCompaction(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            cfg.set_config(
                &COMPACTION_MEMORY_BOUND_BYTES,
                args.compaction_memory_bound_bytes,
            );

            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                Arc::new(TodoSchema::default()),
                Arc::new(TodoSchema::default()),
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::ForceGc(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // We don't actually care about the return value here, but we do need to prevent
            // the shard metrics from being dropped before they're reported below.
            let _machine = force_gc(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::Finalize(args) => {
            let FinalizeArgs {
                state:
                    StateArgs {
                        shard_id,
                        consensus_uri,
                        blob_uri,
                    },
                force_downgrade_since,
                force_downgrade_upper,
            } = args;
            let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
            let commit = command.commit;
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;

            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;

            // Open a machine so we can read the state of the Opaque, and set
            // our fake codecs.
            let machine = make_machine(
                &cfg,
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                shard_id,
                commit,
                expected_version,
            )
            .await?;

            if force_downgrade_upper {
                let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
                let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
                let shared_states = Arc::new(StateCache::new(
                    &cfg,
                    Arc::clone(&metrics),
                    Arc::clone(&pubsub_sender),
                ));

                // We need a PersistClient to open a write handle so we can append an empty batch.
                let persist_client = PersistClient::new(
                    cfg,
                    blob,
                    consensus,
                    metrics,
                    isolated_runtime,
                    shared_states,
                    pubsub_sender,
                )?;
                let diagnostics = Diagnostics {
                    shard_name: shard_id.to_string(),
                    handle_purpose: "persist-cli finalize shard".to_string(),
                };

                let mut write_handle: WriteHandle<
                    crate::cli::inspect::K,
                    crate::cli::inspect::V,
                    u64,
                    i64,
                > = persist_client
                    .open_writer(
                        shard_id,
                        Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
                        Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
                        diagnostics,
                    )
                    .await?;

                if !write_handle.upper().is_empty() {
                    let empty_batch: Vec<(
                        (crate::cli::inspect::K, crate::cli::inspect::V),
                        u64,
                        i64,
                    )> = vec![];
                    let lower = write_handle.upper().clone();
                    let upper = Antichain::new();

                    let result = write_handle.append(empty_batch, lower, upper).await?;
                    if let Err(err) = result {
                        anyhow::bail!("failed to force downgrade upper, {err:?}");
                    }
                }
            }

            if force_downgrade_since {
                let (state, _maintenance) = machine
                    .register_critical_reader::<crate::cli::inspect::O>(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        "persist-cli finalize with force downgrade",
                    )
                    .await;

                // HACK: Internally we have a check that the Opaque is using
                // the correct codec. For the purposes of this command we want
                // to side step that check so we set our reported codec to
                // whatever the current state of the Shard is.
                let expected_opaque = crate::cli::inspect::O::decode(state.opaque.0);
                FAKE_OPAQUE_CODEC
                    .lock()
                    .expect("lockable")
                    .clone_from(&state.opaque_codec);

                let (result, _maintenance) = machine
                    .compare_and_downgrade_since(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        &expected_opaque,
                        (&expected_opaque, &Antichain::new()),
                    )
                    .await;
                if let Err((actual_opaque, _since)) = result {
                    bail!(
                        "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
                    )
                }
            }

            let maintenance = machine.become_tombstone().await?;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::RestoreBlob(args) => {
            let RestoreBlobArgs {
                state:
                    StoreArgs {
                        consensus_uri,
                        blob_uri,
                    },
                concurrency,
            } = args;
            let commit = command.commit;
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
            let versions = StateVersions::new(
                cfg.clone(),
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
            );

            let not_restored: Vec<_> = consensus
                .list_keys()
                .flat_map_unordered(concurrency, |shard| {
                    stream::once(Box::pin(async {
                        let shard_id = shard?;
                        let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
                        let start = Instant::now();
                        info!("Restoring blob state for shard {shard_id}.");
                        let shard_not_restored = crate::internal::restore::restore_blob(
                            &versions,
                            blob.as_ref(),
                            &cfg.build_version,
                            shard_id,
                            &*metrics,
                        )
                        .await?;
                        info!(
                            "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
                            shard_not_restored.len(),
                            start.elapsed()
                        );
                        Ok::<_, ExternalError>(shard_not_restored)
                    }))
                })
                .try_fold(vec![], |mut a, b| async move {
                    a.extend(b);
                    Ok(a)
                })
                .await?;

            info_log_non_zero_metrics(&metrics_registry.gather());
            if !not_restored.is_empty() {
                bail!("referenced blobs were not restored: {not_restored:#?}")
            }
        }
    }
    Ok(())
}

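/// Logs (at INFO) every counter and gauge in `metric_families` that has a
/// non-zero value, e.g. a line roughly like
/// `persist_compaction_admin_count{status:applied} 1` (the metric name and
/// labels here are illustrative, not taken from this crate's registry).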
pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
    for mf in metric_families {
        for m in mf.get_metric() {
            let val = match mf.get_field_type() {
                MetricType::COUNTER => m.get_counter().get_value(),
                MetricType::GAUGE => m.get_gauge().get_value(),
                x => {
                    info!("unhandled {} metric type: {:?}", mf.get_name(), x);
                    continue;
                }
            };
            if val == 0.0 {
                continue;
            }
            let label_pairs = m.get_label();
            let mut labels = String::new();
            if !label_pairs.is_empty() {
                labels.push_str("{");
                for lb in label_pairs {
                    if labels != "{" {
                        labels.push_str(",");
                    }
                    labels.push_str(lb.get_name());
                    labels.push_str(":");
                    labels.push_str(lb.get_value());
                }
                labels.push_str("}");
            }
            info!("{}{} {}", mf.get_name(), labels, val);
        }
    }
}

/// Manually completes all fueled compactions in a shard.
pub async fn force_compaction<K, V, T, D>(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    commit: bool,
    expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;

    let machine = make_typed_machine::<K, V, T, D>(
        &cfg,
        consensus,
        Arc::clone(&blob),
        Arc::clone(&metrics),
        shard_id,
        commit,
        expected_version,
    )
    .await?;

    let writer_id = WriterId::new();

    let mut attempt = 0;
    'outer: loop {
        machine.applier.fetch_and_update_state(None).await;
        let reqs = machine.applier.all_fueled_merge_reqs();
        info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
        for (idx, req) in reqs.clone().into_iter().enumerate() {
            let req = CompactReq {
                shard_id,
                desc: req.desc,
                inputs: req.inputs,
            };
            let parts = req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>();
            let bytes = req
                .inputs
                .iter()
                .map(|x| x.batch.encoded_size_bytes())
                .sum::<usize>();
            let start = Instant::now();
            info!(
                "attempt {} req {}: compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
                attempt,
                idx,
                req.inputs.len(),
                parts,
                bytes,
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            if !commit {
                info!("skipping compaction because --commit is not set");
                continue;
            }
            let schemas = Schemas {
                id: None,
                key: Arc::clone(&key_schema),
                val: Arc::clone(&val_schema),
            };

            let res = Compactor::<K, V, T, D>::compact(
                CompactConfig::new(&cfg, shard_id),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&machine.applier.shard_metrics),
                Arc::new(IsolatedRuntime::new(
                    metrics_registry,
                    Some(cfg.isolated_runtime_worker_threads),
                )),
                req,
                schemas,
            )
            .await?;
            metrics.compaction.admin_count.inc();
            info!(
                "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
                attempt,
                idx,
                res.output.part_count(),
                res.output.encoded_size_bytes(),
                start.elapsed(),
            );
            let (apply_res, maintenance) = machine
                .merge_res(&FueledMergeRes {
                    output: res.output,
                    input: res.input,
                    new_active_compaction: None,
                })
                .await;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            if apply_res.applied() {
                info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
            } else {
                info!(
                    "attempt {} req {}: {:?} trying again",
                    attempt, idx, apply_res
                );
                attempt += 1;
                continue 'outer;
            }
        }
        info!("attempt {}: did {} compactions", attempt, reqs.len());
        let _ = machine.expire_writer(&writer_id).await;
        info!("expired writer {}", writer_id);
        return Ok(());
    }
}

async fn make_machine(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
    make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
        cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await
}

async fn make_typed_machine<K, V, T, D>(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64,
{
    let state_versions = Arc::new(StateVersions::new(
        cfg.clone(),
        consensus,
        blob,
        Arc::clone(&metrics),
    ));

    // Prime the K V codec magic
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;

    loop {
        let state_res = state_versions
            .fetch_current_state::<u64>(&shard_id, versions.0.clone())
            .await
            .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
        let state = match state_res {
            Ok(state) => state,
            Err(codec) => {
                let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
                *kvtd = codec.actual;
                continue;
            }
        };
        // This isn't the perfect place to put this check; the ideal would be in
        // the apply_unbatched_cmd loop, but I don't want to pollute the prod
        // code with this logic.
        let safe_version_change = match (commit, expected_version) {
            // We never actually write out state changes, so increasing the version is okay.
            (false, _) => cfg.build_version >= state.applier_version,
            // If the versions match that's okay because any commits won't change it.
            (true, None) => cfg.build_version == state.applier_version,
            // !!DANGER ZONE!!
            (true, Some(expected)) => {
                // If we're not _extremely_ careful, the persistcli could make shards unreadable by
                // production. But there are times when we want to operate on a leaked shard with a
                // newer version of the build.
                //
                // We only allow a mismatch in version if we provided the expected version to the
                // command, and the expected version is less than the current build, which
                // indicates this is an old shard.
                state.applier_version == expected && expected <= cfg.build_version
            }
        };
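        // Illustrative outcomes of the check above (assuming, hypothetically,
        // that this tool was built as v0.2.0):
        //   - dry run (no --commit) against a v0.1.0 shard: allowed.
        //   - --commit on a v0.1.0 shard without --expected-version: rejected.
        //   - --commit --expected-version 0.1.0 on a v0.1.0 shard: allowed.
        //   - --commit --expected-version 0.3.0 (newer than this build): rejected.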
        if !safe_version_change {
            // We could add a flag to override this check, if that comes up.
            return Err(anyhow!(
                "version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
                cfg.build_version,
                state.applier_version
            ));
        }
        break;
    }

    let machine = Machine::<K, V, T, D>::new(
        cfg.clone(),
        shard_id,
        Arc::clone(&metrics),
        state_versions,
        Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
        Arc::new(NoopPubSubSender),
        Arc::new(IsolatedRuntime::new(
            &MetricsRegistry::new(),
            Some(cfg.isolated_runtime_worker_threads),
        )),
        Diagnostics::from_purpose("admin"),
    )
    .await?;

    Ok(machine)
}

async fn force_gc(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Box<dyn Any>> {
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
    let machine = make_machine(
        &cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await?;
    let gc_req = GcReq {
        shard_id,
        new_seqno_since: machine.applier.seqno_since(),
    };
    let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
    if !maintenance.is_empty() {
        info!("ignoring non-empty requested maintenance: {maintenance:?}")
    }

    Ok(Box::new(machine))
}

/// Exposed for `mz-catalog`.
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_catalog_force_compaction_fuel",
    1024,
    "fuel to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_catalog_force_compaction_wait",
    Duration::from_secs(60),
    "wait to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_expression_cache_force_compaction_fuel",
    131_072,
    "fuel to use in expression cache dangerous_force_compaction",
);

/// Exposed for `mz-catalog`.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_expression_cache_force_compaction_wait",
    Duration::from_secs(0),
    "wait to use in expression cache dangerous_force_compaction",
);

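// A minimal sketch of reading these knobs (an illustration, not code from this
// crate: it assumes mz_dyncfg's `Config::get(&ConfigSet)` accessor and that the
// configs have been registered in the set, e.g. via `all_dyncfgs`):
//
//     let config_set = all_dyncfgs(ConfigSet::default());
//     let fuel: usize = CATALOG_FORCE_COMPACTION_FUEL.get(&config_set);
//     let wait: Duration = CATALOG_FORCE_COMPACTION_WAIT.get(&config_set);
//
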
/// Attempts to compact all batches in a shard into a minimal number.
///
/// This destroys most hope of filter pushdown doing anything useful. If you
/// think you want this, you almost certainly want something else; please come
/// talk to the persist team before using it.
///
/// This is accomplished by adding artificial fuel (representing some count of
/// updates) to the shard's internal Spine of batches structure on some schedule
/// and doing any compaction work that results. Both the amount of fuel and
/// cadence are dynamically tunable. However, it is not clear exactly how much
/// fuel is safe to add in each call: 10 definitely seems fine, usize::MAX may
/// not be. This also adds to the danger in "dangerous".
///
/// This is intentionally not even hooked up to `persistcli admin` for the above
/// reasons.
///
/// Exits once the shard is sufficiently compacted, but can be called in a loop
/// to keep it compacted.
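///
/// A minimal calling sketch (not from the original docs; `write` is assumed to
/// be an already-open `WriteHandle` for the shard, and the fuel/wait values are
/// arbitrary):
///
/// ```ignore
/// dangerous_force_compaction_and_break_pushdown(&write, || 1024, || Duration::from_secs(60))
///     .await;
/// ```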
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
    write: &WriteHandle<K, V, T, D>,
    fuel: impl Fn() -> usize,
    wait: impl Fn() -> Duration,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Ord + Codec64 + Send + Sync,
{
    let machine = write.machine.clone();

    let mut last_exert: Instant;

    loop {
        last_exert = Instant::now();
        let fuel = fuel();
        let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
        for req in reqs {
            info!(
                "force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                req.inputs.len(),
                req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
                req.inputs
                    .iter()
                    .map(|x| x.batch.runs().count())
                    .sum::<usize>(),
                req.inputs
                    .iter()
                    .flat_map(|x| &x.batch.parts)
                    .map(|x| x.encoded_size_bytes())
                    .sum::<usize>(),
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            machine.applier.metrics.compaction.requested.inc();
            let start = Instant::now();
            let res = Compactor::<K, V, T, D>::compact_and_apply(
                &machine,
                req,
                write.write_schemas.clone(),
            )
            .await;
            let apply_maintenance = match res {
                Ok(x) => x,
                Err(err) => {
                    warn!(
                        "force_compaction {} {} errored in compaction: {:?}",
                        machine.applier.shard_metrics.name,
                        machine.applier.shard_metrics.shard_id,
                        err
                    );
                    continue;
                }
            };
            machine.applier.metrics.compaction.admin_count.inc();
            info!(
                "force_compaction {} {} compacted in {:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                start.elapsed(),
            );
            maintenance.merge(apply_maintenance);
        }
        maintenance.perform(&machine, &write.gc).await;

        // Now sleep before the next one. Make sure to delay from when we
        // started the last exert in case the compaction takes non-trivial time.
        let next_exert = last_exert + wait();
        tokio::time::sleep_until(next_exert.into()).await;

        // NB: This check is intentionally at the end so that it's safe to call
        // this method in a loop.
        let num_runs: usize = machine
            .applier
            .all_batches()
            .iter()
            .map(|x| x.runs().count())
            .sum();
        if num_runs <= 1 {
            info!(
                "force_compaction {} {} exiting with {} runs",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                num_runs
            );
            return;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;

    use crate::tests::new_test_client;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        for num_batches in 0..=17 {
            let (mut write, _read) = client
                .expect_open::<String, (), u64, i64>(ShardId::new())
                .await;
            let machine = write.machine.clone();

            for idx in 0..num_batches {
                let () = write
                    .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                    .await;
            }

            // Run the tool and verify that we get down to at most one batch.
            super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
                .await;
            let batches_after = machine.applier.all_batches().len();
            assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
        }
    }
}