mz_persist_client/cli/admin.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! CLI introspection tools for persist

use std::any::Any;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::{anyhow, bail};
use differential_dataflow::difference::Monoid;
use differential_dataflow::lattice::Lattice;
use futures_util::{StreamExt, TryStreamExt, stream};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64};
use prometheus::proto::{MetricFamily, MetricType};
use semver::Version;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};

use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
use crate::cli::inspect::FAKE_OPAQUE_CODEC;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::encoding::Schemas;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::machine::Machine;
use crate::internal::trace::FueledMergeRes;
use crate::rpc::{NoopPubSubSender, PubSubSender};
use crate::write::{WriteHandle, WriterId};
use crate::{
    BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
};

/// Commands for read-write administration of persist state
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
    #[clap(subcommand)]
    command: Command,

    /// Whether to commit any modifications (defaults to dry run).
    #[clap(long)]
    pub(crate) commit: bool,
    /// !!DANGER ZONE!! - Has the possibility of breaking production!
    ///
    /// Allows specifying an expected `applier_version` of the shard we're operating on, so we can
    /// modify old/leaked shards.
    #[clap(long)]
    pub(crate) expected_version: Option<String>,
}
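
// Example invocations, assuming the `persistcli` binary exposes these args
// under its `admin` subcommand (the shard id and URIs below are placeholders):
//
//     persistcli admin force-gc --shard-id s00000000-0000-0000-0000-000000000000 \
//         --consensus-uri <consensus_uri> --blob-uri <blob_uri>
//
//     # Without --commit, every subcommand runs as a dry run.
//     persistcli admin --commit finalize --force-downgrade-since \
//         --shard-id s00000000-0000-0000-0000-000000000000 \
//         --consensus-uri <consensus_uri> --blob-uri <blob_uri>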

/// Individual subcommands of admin
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Manually completes all fueled compactions in a shard.
    ForceCompaction(ForceCompactionArgs),
    /// Manually kicks off a GC run for a shard.
    ForceGc(ForceGcArgs),
    /// Manually finalizes an unfinalized shard.
    Finalize(FinalizeArgs),
    /// Attempts to ensure that all the files referenced by consensus are available
    /// in Blob.
    RestoreBlob(RestoreBlobArgs),
}

/// Manually completes all fueled compactions in a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// An upper bound on compaction's memory consumption.
    #[clap(long, default_value_t = 0)]
    compaction_memory_bound_bytes: usize,
}

/// Manually kicks off a GC run for a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
    #[clap(flatten)]
    state: StateArgs,
}

/// Manually finalizes a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// Force downgrade the `since` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_since: bool,

    /// Force downgrade the `upper` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_upper: bool,
}

/// Attempts to restore all the blobs that are referenced by the current state of consensus.
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
    #[clap(flatten)]
    state: StoreArgs,

    /// The number of concurrent restore operations to run at once.
    #[clap(long, default_value_t = 16)]
    concurrency: usize,
}

/// Runs the given read-write admin command.
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::ForceCompaction(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            cfg.set_config(
                &COMPACTION_MEMORY_BOUND_BYTES,
                args.compaction_memory_bound_bytes,
            );

            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                Arc::new(TodoSchema::default()),
                Arc::new(TodoSchema::default()),
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::ForceGc(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // We don't actually care about the return value here, but we do need to prevent
            // the shard metrics from being dropped before they're reported below.
            let _machine = force_gc(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::Finalize(args) => {
            let FinalizeArgs {
                state:
                    StateArgs {
                        shard_id,
                        consensus_uri,
                        blob_uri,
                    },
                force_downgrade_since,
                force_downgrade_upper,
            } = args;
            let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
            let commit = command.commit;
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;

            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;

            // Open a machine so we can read the state of the Opaque, and set
            // our fake codecs.
            let machine = make_machine(
                &cfg,
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                shard_id,
                commit,
                expected_version,
            )
            .await?;

            if force_downgrade_upper {
                let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
                let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
                let shared_states = Arc::new(StateCache::new(
                    &cfg,
                    Arc::clone(&metrics),
                    Arc::clone(&pubsub_sender),
                ));

                let persist_client = PersistClient::new(
                    cfg,
                    blob,
                    consensus,
                    metrics,
                    isolated_runtime,
                    shared_states,
                    pubsub_sender,
                )?;
                let diagnostics = Diagnostics {
                    shard_name: shard_id.to_string(),
                    handle_purpose: "persist-cli finalize shard".to_string(),
                };

                let mut write_handle: WriteHandle<
                    crate::cli::inspect::K,
                    crate::cli::inspect::V,
                    u64,
                    i64,
                > = persist_client
                    .open_writer(
                        shard_id,
                        Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
                        Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
                        diagnostics,
                    )
                    .await?;
                write_handle.advance_upper(&Antichain::new()).await;
            }

            if force_downgrade_since {
                let (state, _maintenance) = machine
                    .register_critical_reader::<crate::cli::inspect::O>(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        "persist-cli finalize with force downgrade",
                    )
                    .await;

                // HACK: Internally we have a check that the Opaque is using
                // the correct codec. For the purposes of this command we want
                // to side step that check so we set our reported codec to
                // whatever the current state of the Shard is.
                let expected_opaque = crate::cli::inspect::O::decode(state.opaque.0);
                FAKE_OPAQUE_CODEC
                    .lock()
                    .expect("lockable")
                    .clone_from(&state.opaque_codec);

                let (result, _maintenance) = machine
                    .compare_and_downgrade_since(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        &expected_opaque,
                        (&expected_opaque, &Antichain::new()),
                    )
                    .await;
                if let Err((actual_opaque, _since)) = result {
                    bail!(
                        "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
                    )
                }
            }

            let maintenance = machine.become_tombstone().await?;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::RestoreBlob(args) => {
            let RestoreBlobArgs {
                state:
                    StoreArgs {
                        consensus_uri,
                        blob_uri,
                    },
                concurrency,
            } = args;
            let commit = command.commit;
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
            let versions = StateVersions::new(
                cfg.clone(),
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
            );

            let not_restored: Vec<_> = consensus
                .list_keys()
                .flat_map_unordered(concurrency, |shard| {
                    stream::once(Box::pin(async {
                        let shard_id = shard?;
                        let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
                        let start = Instant::now();
                        info!("Restoring blob state for shard {shard_id}.");
                        let shard_not_restored = crate::internal::restore::restore_blob(
                            &versions,
                            blob.as_ref(),
                            &cfg.build_version,
                            shard_id,
                            &*metrics,
                        )
                        .await?;
                        info!(
                            "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
                            shard_not_restored.len(),
                            start.elapsed()
                        );
                        Ok::<_, ExternalError>(shard_not_restored)
                    }))
                })
                .try_fold(vec![], |mut a, b| async move {
                    a.extend(b);
                    Ok(a)
                })
                .await?;

            info_log_non_zero_metrics(&metrics_registry.gather());
            if !not_restored.is_empty() {
                bail!("referenced blobs were not restored: {not_restored:#?}")
            }
        }
    }
    Ok(())
}

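/// Logs, at info level, every counter and gauge in `metric_families` whose
/// value is non-zero, one `name{label:value,...} value` line per metric (the
/// label list is omitted when a metric has no labels).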
pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
    for mf in metric_families {
        for m in mf.get_metric() {
            let val = match mf.get_field_type() {
                MetricType::COUNTER => m.get_counter().get_value(),
                MetricType::GAUGE => m.get_gauge().get_value(),
                x => {
                    info!("unhandled {} metric type: {:?}", mf.name(), x);
                    continue;
                }
            };
            if val == 0.0 {
                continue;
            }
            let label_pairs = m.get_label();
            let mut labels = String::new();
            if !label_pairs.is_empty() {
                labels.push_str("{");
                for lb in label_pairs {
                    if labels != "{" {
                        labels.push_str(",");
                    }
                    labels.push_str(lb.name());
                    labels.push_str(":");
                    labels.push_str(lb.value());
                }
                labels.push_str("}");
            }
            info!("{}{} {}", mf.name(), labels, val);
        }
    }
}

/// Manually completes all fueled compactions in a shard.
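///
/// A sketch of a call, mirroring how [`run`] invokes this for the
/// force-compaction subcommand (the type parameters, `cfg`, shard id, and URI
/// bindings here are placeholders):
///
/// ```ignore
/// force_compaction::<K, V, u64, i64>(
///     cfg,
///     &metrics_registry,
///     shard_id,
///     &consensus_uri,
///     &blob_uri,
///     Arc::new(TodoSchema::default()),
///     Arc::new(TodoSchema::default()),
///     false, // dry run: no modifications are committed
///     None,  // no expected_version override
/// )
/// .await?;
/// ```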
pub async fn force_compaction<K, V, T, D>(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    commit: bool,
    expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;

    let machine = make_typed_machine::<K, V, T, D>(
        &cfg,
        consensus,
        Arc::clone(&blob),
        Arc::clone(&metrics),
        shard_id,
        commit,
        expected_version,
    )
    .await?;

    let writer_id = WriterId::new();

    let mut attempt = 0;
    'outer: loop {
        machine.applier.fetch_and_update_state(None).await;
        let reqs = machine.applier.all_fueled_merge_reqs();
        info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
        for (idx, req) in reqs.clone().into_iter().enumerate() {
            let req = CompactReq {
                shard_id,
                desc: req.desc,
                inputs: req.inputs,
            };
            let parts = req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>();
            let bytes = req
                .inputs
                .iter()
                .map(|x| x.batch.encoded_size_bytes())
                .sum::<usize>();
            let start = Instant::now();
            info!(
                "attempt {} req {}: compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
                attempt,
                idx,
                req.inputs.len(),
                parts,
                bytes,
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            if !commit {
                info!("skipping compaction because --commit is not set");
                continue;
            }
            let schemas = Schemas {
                id: None,
                key: Arc::clone(&key_schema),
                val: Arc::clone(&val_schema),
            };

            let res = Compactor::<K, V, T, D>::compact(
                CompactConfig::new(&cfg, shard_id),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&machine.applier.shard_metrics),
                Arc::new(IsolatedRuntime::new(
                    metrics_registry,
                    Some(cfg.isolated_runtime_worker_threads),
                )),
                req,
                schemas,
            )
            .await?;
            metrics.compaction.admin_count.inc();
            info!(
                "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
                attempt,
                idx,
                res.output.part_count(),
                res.output.encoded_size_bytes(),
                start.elapsed(),
            );
            let (apply_res, maintenance) = machine
                .merge_res(&FueledMergeRes {
                    output: res.output,
                    input: res.input,
                    new_active_compaction: None,
                })
                .await;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            if apply_res.applied() {
                info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
            } else {
                info!(
                    "attempt {} req {}: {:?} trying again",
                    attempt, idx, apply_res
                );
                attempt += 1;
                continue 'outer;
            }
        }
        info!("attempt {}: did {} compactions", attempt, reqs.len());
        let _ = machine.expire_writer(&writer_id).await;
        info!("expired writer {}", writer_id);
        return Ok(());
    }
}

async fn make_machine(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
    make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
        cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await
}

async fn make_typed_machine<K, V, T, D>(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    let state_versions = Arc::new(StateVersions::new(
        cfg.clone(),
        consensus,
        blob,
        Arc::clone(&metrics),
    ));

    // Prime the K/V codec magic: fetch the shard's current state and, on a
    // codec mismatch, record the actual codecs in KVTD_CODECS and retry.
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;

    loop {
        let state_res = state_versions
            .fetch_current_state::<u64>(&shard_id, versions.0.clone())
            .await
            .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
        let state = match state_res {
            Ok(state) => state,
            Err(codec) => {
                let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
                *kvtd = codec.actual;
                continue;
            }
        };
        // This isn't the perfect place to put this check; the ideal would be in
        // the apply_unbatched_cmd loop, but I don't want to pollute the prod
        // code with this logic.
        let safe_version_change = match (commit, expected_version) {
            // We never actually write out state changes, so increasing the version is okay.
            (false, _) => cfg.build_version >= state.applier_version,
            // If the versions match that's okay because any commits won't change it.
            (true, None) => cfg.build_version == state.applier_version,
            // !!DANGER ZONE!!
            (true, Some(expected)) => {
                // If we're not _extremely_ careful, the persistcli could make shards unreadable by
                // production. But there are times when we want to operate on a leaked shard with a
                // newer version of the build.
                //
                // We only allow a mismatch in version if we provided the expected version to the
                // command, and the expected version is less than the current build, which
                // indicates this is an old shard.
                state.applier_version == expected && expected <= cfg.build_version
            }
        };
        if !safe_version_change {
            // We could add a flag to override this check, if that comes up.
            return Err(anyhow!(
                "version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
                cfg.build_version,
                state.applier_version
            ));
        }
        break;
    }

    let machine = Machine::<K, V, T, D>::new(
        cfg.clone(),
        shard_id,
        Arc::clone(&metrics),
        state_versions,
        Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
        Arc::new(NoopPubSubSender),
        Arc::new(IsolatedRuntime::new(
            &MetricsRegistry::new(),
            Some(cfg.isolated_runtime_worker_threads),
        )),
        Diagnostics::from_purpose("admin"),
    )
    .await?;

    Ok(machine)
}

async fn force_gc(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Box<dyn Any>> {
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
    let machine = make_machine(
        &cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await?;
    let gc_req = GcReq {
        shard_id,
        new_seqno_since: machine.applier.seqno_since(),
    };
    let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
    if !maintenance.is_empty() {
        info!("ignoring non-empty requested maintenance: {maintenance:?}")
    }

    Ok(Box::new(machine))
}

/// Exposed for `mz-catalog`.
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_catalog_force_compaction_fuel",
    1024,
    "fuel to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_catalog_force_compaction_wait",
    Duration::from_secs(60),
    "wait to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_expression_cache_force_compaction_fuel",
    131_072,
    "fuel to use in expression cache dangerous_force_compaction",
);

/// Exposed for `mz-catalog`.
pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_expression_cache_force_compaction_wait",
    Duration::from_secs(0),
    "wait to use in expression cache dangerous_force_compaction",
);

/// Attempts to compact all batches in a shard into a minimal number.
///
/// This destroys most hope of filter pushdown doing anything useful. If you
/// think you want this, you almost certainly want something else, please come
/// talk to the persist team before using.
///
/// This is accomplished by adding artificial fuel (representing some count of
/// updates) to the shard's internal Spine of batches structure on some schedule
/// and doing any compaction work that results. Both the amount of fuel and
/// cadence are dynamically tunable. However, it is not clear exactly how much
/// fuel is safe to add in each call: 10 definitely seems fine, usize::MAX may
/// not be. This also adds to the danger in "dangerous".
///
/// This is intentionally not even hooked up to `persistcli admin` for the above
/// reasons.
///
/// Exits once the shard is sufficiently compacted, but can be called in a loop
/// to keep it compacted.
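///
/// A minimal sketch of a caller, assuming an already-open `WriteHandle` named
/// `write` and a `ConfigSet` named `configs` from which to read the
/// `CATALOG_FORCE_COMPACTION_*` dyncfgs above:
///
/// ```ignore
/// dangerous_force_compaction_and_break_pushdown(
///     &write,
///     || CATALOG_FORCE_COMPACTION_FUEL.get(&configs),
///     || CATALOG_FORCE_COMPACTION_WAIT.get(&configs),
/// )
/// .await;
/// ```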
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
    write: &WriteHandle<K, V, T, D>,
    fuel: impl Fn() -> usize,
    wait: impl Fn() -> Duration,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    let machine = write.machine.clone();

    let mut last_exert: Instant;

    loop {
        last_exert = Instant::now();
        let fuel = fuel();
        let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
        for req in reqs {
            info!(
                "force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                req.inputs.len(),
                req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
                req.inputs
                    .iter()
                    .map(|x| x.batch.runs().count())
                    .sum::<usize>(),
                req.inputs
                    .iter()
                    .flat_map(|x| &x.batch.parts)
                    .map(|x| x.encoded_size_bytes())
                    .sum::<usize>(),
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            machine.applier.metrics.compaction.requested.inc();
            let start = Instant::now();
            let res = Compactor::<K, V, T, D>::compact_and_apply(
                &machine,
                req,
                write.write_schemas.clone(),
            )
            .await;
            let apply_maintenance = match res {
                Ok(x) => x,
                Err(err) => {
                    warn!(
                        "force_compaction {} {} errored in compaction: {:?}",
                        machine.applier.shard_metrics.name,
                        machine.applier.shard_metrics.shard_id,
                        err
                    );
                    continue;
                }
            };
            machine.applier.metrics.compaction.admin_count.inc();
            info!(
                "force_compaction {} {} compacted in {:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                start.elapsed(),
            );
            maintenance.merge(apply_maintenance);
        }
        maintenance.perform(&machine, &write.gc).await;

        // Now sleep before the next one. Make sure to delay from when we
        // started the last exert in case the compaction takes non-trivial time.
        let next_exert = last_exert + wait();
        tokio::time::sleep_until(next_exert.into()).await;

        // NB: This check is intentionally at the end so that it's safe to call
        // this method in a loop.
        let num_runs: usize = machine
            .applier
            .all_batches()
            .iter()
            .map(|x| x.runs().count())
            .sum();
        if num_runs <= 1 {
            info!(
                "force_compaction {} {} exiting with {} runs",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                num_runs
            );
            return;
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;

    use crate::tests::new_test_client;

    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        for num_batches in 0..=17 {
            let (mut write, _read) = client
                .expect_open::<String, (), u64, i64>(ShardId::new())
                .await;
            let machine = write.machine.clone();

            for idx in 0..num_batches {
                let () = write
                    .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                    .await;
            }

            // Run the tool and verify that we compact down to a single batch.
            super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
                .await;
            let batches_after = machine.applier.all_batches().len();
            assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
        }
    }
}