Skip to main content

mz_persist_client/cli/
admin.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! CLI introspection tools for persist
11
12use std::any::Any;
13use std::fmt::Debug;
14use std::str::FromStr;
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::{anyhow, bail};
19use differential_dataflow::difference::Monoid;
20use differential_dataflow::lattice::Lattice;
21use futures_util::{StreamExt, TryStreamExt, stream};
22use mz_dyncfg::{Config, ConfigSet};
23use mz_ore::metrics::MetricsRegistry;
24use mz_ore::now::SYSTEM_TIME;
25use mz_ore::url::SensitiveUrl;
26use mz_persist::location::{Blob, Consensus, ExternalError};
27use mz_persist_types::codec_impls::TodoSchema;
28use mz_persist_types::{Codec, Codec64};
29use prometheus::proto::{MetricFamily, MetricType};
30use semver::Version;
31use timely::progress::{Antichain, Timestamp};
32use tracing::{info, warn};
33
34use crate::async_runtime::IsolatedRuntime;
35use crate::cache::StateCache;
36use crate::cfg::{COMPACTION_MEMORY_BOUND_BYTES, all_dyncfgs};
37use crate::cli::args::{StateArgs, StoreArgs, make_blob, make_consensus};
38use crate::critical::Opaque;
39use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
40use crate::internal::encoding::Schemas;
41use crate::internal::gc::{GarbageCollector, GcReq};
42use crate::internal::machine::Machine;
43use crate::internal::trace::FueledMergeRes;
44use crate::rpc::{NoopPubSubSender, PubSubSender};
45use crate::write::{WriteHandle, WriterId};
46use crate::{
47    BUILD_INFO, Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions,
48};
49
/// Commands for read-write administration of persist state
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
    #[clap(subcommand)]
    command: Command,

    /// Whether to commit any modifications (defaults to dry run).
    #[clap(long)]
    pub(crate) commit: bool,

    /// !!DANGER ZONE!! - Has the possibility of breaking production!
    ///
    /// Allows specifying an expected `applier_version` of the shard we're operating on, so we can
    /// modify old/leaked shards.
    #[clap(long)]
    pub(crate) expected_version: Option<String>,
}
67
/// Individual subcommands of admin.
///
/// Each variant carries its own clap argument struct; see [`run`] for how each
/// subcommand is executed.
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
    /// Manually completes all fueled compactions in a shard.
    ForceCompaction(ForceCompactionArgs),
    /// Manually kick off a GC run for a shard.
    ForceGc(ForceGcArgs),
    /// Manually finalize an unfinalized shard.
    Finalize(FinalizeArgs),
    /// Attempt to ensure that all the files referenced by consensus are available
    /// in Blob.
    RestoreBlob(RestoreBlobArgs),
}
81
/// Manually completes all fueled compactions in a shard.
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// An upper bound on compaction's memory consumption.
    // NOTE(review): `run` applies this value to the dyncfg unconditionally, so
    // the 0 default presumably means "unbounded" — confirm against the
    // consumers of COMPACTION_MEMORY_BOUND_BYTES.
    #[clap(long, default_value_t = 0)]
    compaction_memory_bound_bytes: usize,
}
92
/// Manually kicks off a GC run for a shard.
// (Previous doc comment was a copy-paste of the force-compaction one.)
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
    #[clap(flatten)]
    state: StateArgs,
}
99
/// Manually finalizes a shard.
///
/// Finalization turns the shard's state into a tombstone (see the `Finalize`
/// arm of [`run`]); the flags below optionally force the shard's frontiers to
/// the empty antichain first.
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
    #[clap(flatten)]
    state: StateArgs,

    /// Force downgrade the `since` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_since: bool,

    /// Force downgrade the `upper` of the shard to the empty antichain.
    #[clap(long, default_value_t = false)]
    force_downgrade_upper: bool,
}
114
/// Attempt to restore all the blobs that are referenced by the current state of consensus.
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
    #[clap(flatten)]
    state: StoreArgs,

    /// The number of concurrent restore operations to run at once.
    #[clap(long, default_value_t = 16)]
    concurrency: usize,
}
125
/// Runs the given read-write admin command.
///
/// All subcommands default to a dry run; destructive changes only happen when
/// `--commit` is passed. `--expected-version` is an escape hatch for operating
/// on old/leaked shards (see [`AdminArgs`]).
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
    match command.command {
        Command::ForceCompaction(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            cfg.set_config(
                &COMPACTION_MEMORY_BOUND_BYTES,
                args.compaction_memory_bound_bytes,
            );

            let metrics_registry = MetricsRegistry::new();
            // Fail fast if `--expected-version` isn't valid semver.
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // The CLI doesn't know the shard's real K/V types, so use the fake
            // codecs from `crate::cli::inspect` with TODO schemas.
            let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                Arc::new(TodoSchema::default()),
                Arc::new(TodoSchema::default()),
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::ForceGc(args) => {
            let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            // Fail fast if `--expected-version` isn't valid semver.
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;
            // We don't actually care about the return value here, but we do need to prevent
            // the shard metrics from being dropped before they're reported below.
            let _machine = force_gc(
                cfg,
                &metrics_registry,
                shard_id,
                &args.state.consensus_uri,
                &args.state.blob_uri,
                command.commit,
                expected_version,
            )
            .await?;
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::Finalize(args) => {
            let FinalizeArgs {
                state:
                    StateArgs {
                        shard_id,
                        consensus_uri,
                        blob_uri,
                    },
                force_downgrade_since,
                force_downgrade_upper,
            } = args;
            let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
            let commit = command.commit;
            // Fail fast if `--expected-version` isn't valid semver.
            let expected_version = command
                .expected_version
                .as_ref()
                .map(|v| Version::parse(v))
                .transpose()?;

            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;

            // Open a machine so we can read the state of the Opaque, and set
            // our fake codecs.
            let machine = make_machine(
                &cfg,
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                shard_id,
                commit,
                expected_version,
            )
            .await?;

            if force_downgrade_upper {
                // Writing requires a full PersistClient, so assemble one from
                // the pieces we've already constructed.
                let isolated_runtime = Arc::new(IsolatedRuntime::new(&metrics_registry, None));
                let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
                let shared_states = Arc::new(StateCache::new(
                    &cfg,
                    Arc::clone(&metrics),
                    Arc::clone(&pubsub_sender),
                ));

                let persist_client = PersistClient::new(
                    cfg,
                    blob,
                    consensus,
                    metrics,
                    isolated_runtime,
                    shared_states,
                    pubsub_sender,
                )?;
                let diagnostics = Diagnostics {
                    shard_name: shard_id.to_string(),
                    handle_purpose: "persist-cli finalize shard".to_string(),
                };

                let mut write_handle: WriteHandle<
                    crate::cli::inspect::K,
                    crate::cli::inspect::V,
                    u64,
                    i64,
                > = persist_client
                    .open_writer(
                        shard_id,
                        Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
                        Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
                        diagnostics,
                    )
                    .await?;
                // Advance the upper to the empty antichain, i.e. no more writes.
                write_handle.advance_upper(&Antichain::new()).await;
            }

            if force_downgrade_since {
                // Take over the controller's critical since capability, then
                // compare-and-downgrade its since to the empty antichain.
                let (state, _maintenance) = machine
                    .register_critical_reader(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        Opaque::encode(&crate::cli::inspect::O::default()),
                        "persist-cli finalize with force downgrade",
                    )
                    .await;

                let expected_opaque = state.opaque;

                let (result, _maintenance) = machine
                    .compare_and_downgrade_since(
                        &crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
                        &expected_opaque,
                        (&expected_opaque, &Antichain::new()),
                    )
                    .await;
                // A mismatched opaque means someone raced us; bail rather than
                // clobber their capability.
                if let Err((actual_opaque, _since)) = result {
                    bail!(
                        "opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
                    )
                }
            }

            let maintenance = machine.become_tombstone().await?;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            info_log_non_zero_metrics(&metrics_registry.gather());
        }
        Command::RestoreBlob(args) => {
            let RestoreBlobArgs {
                state:
                    StoreArgs {
                        consensus_uri,
                        blob_uri,
                    },
                concurrency,
            } = args;
            let commit = command.commit;
            let configs = all_dyncfgs(ConfigSet::default());
            // TODO: Fetch the latest values of these configs from Launch Darkly.
            let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
            let metrics_registry = MetricsRegistry::new();
            let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
            let consensus =
                make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
            let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
            let versions = StateVersions::new(
                cfg.clone(),
                Arc::clone(&consensus),
                Arc::clone(&blob),
                Arc::clone(&metrics),
            );

            // Restore every shard listed in consensus, up to `concurrency`
            // shards at a time, accumulating any blobs that were not restored.
            let not_restored: Vec<_> = consensus
                .list_keys()
                .flat_map_unordered(concurrency, |shard| {
                    stream::once(Box::pin(async {
                        let shard_id = shard?;
                        let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
                        let start = Instant::now();
                        info!("Restoring blob state for shard {shard_id}.",);
                        let shard_not_restored = crate::internal::restore::restore_blob(
                            &versions,
                            blob.as_ref(),
                            &cfg.build_version,
                            shard_id,
                            &*metrics,
                        )
                        .await?;
                        info!(
                            "Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
                            shard_not_restored.len(),
                            start.elapsed()
                        );
                        Ok::<_, ExternalError>(shard_not_restored)
                    }))
                })
                .try_fold(vec![], |mut a, b| async move {
                    a.extend(b);
                    Ok(a)
                })
                .await?;

            info_log_non_zero_metrics(&metrics_registry.gather());
            if !not_restored.is_empty() {
                bail!("referenced blobs were not restored: {not_restored:#?}")
            }
        }
    }
    Ok(())
}
358
359pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
360    for mf in metric_families {
361        for m in mf.get_metric() {
362            let val = match mf.get_field_type() {
363                MetricType::COUNTER => m.get_counter().get_value(),
364                MetricType::GAUGE => m.get_gauge().get_value(),
365                x => {
366                    info!("unhandled {} metric type: {:?}", mf.name(), x);
367                    continue;
368                }
369            };
370            if val == 0.0 {
371                continue;
372            }
373            let label_pairs = m.get_label();
374            let mut labels = String::new();
375            if !label_pairs.is_empty() {
376                labels.push_str("{");
377                for lb in label_pairs {
378                    if labels != "{" {
379                        labels.push_str(",");
380                    }
381                    labels.push_str(lb.name());
382                    labels.push_str(":");
383                    labels.push_str(lb.name());
384                }
385                labels.push_str("}");
386            }
387            info!("{}{} {}", mf.name(), labels, val);
388        }
389    }
390}
391
/// Manually completes all fueled compactions in a shard.
///
/// Repeatedly fetches the shard's state, gathers every currently-fueled merge
/// request, and compacts each one. If applying a compaction result fails
/// (state moved concurrently), the whole pass is retried from scratch. Without
/// `commit`, the requests are only logged, not executed.
pub async fn force_compaction<K, V, T, D>(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    key_schema: Arc<K::Schema>,
    val_schema: Arc<V::Schema>,
    commit: bool,
    expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;

    let machine = make_typed_machine::<K, V, T, D>(
        &cfg,
        consensus,
        Arc::clone(&blob),
        Arc::clone(&metrics),
        shard_id,
        commit,
        expected_version,
    )
    .await?;

    let writer_id = WriterId::new();

    let mut attempt = 0;
    'outer: loop {
        // Refresh state and snapshot the current set of fueled merge requests.
        machine.applier.fetch_and_update_state(None).await;
        let reqs = machine.applier.all_fueled_merge_reqs();
        info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
        for (idx, req) in reqs.clone().into_iter().enumerate() {
            let req = CompactReq {
                shard_id,
                desc: req.desc,
                inputs: req.inputs,
            };
            // Summarize the work (part count / encoded bytes) before doing it.
            let parts = req
                .inputs
                .iter()
                .map(|x| x.batch.part_count())
                .sum::<usize>();
            let bytes = req
                .inputs
                .iter()
                .map(|x| x.batch.encoded_size_bytes())
                .sum::<usize>();
            let start = Instant::now();
            info!(
                "attempt {} req {}: compacting {} batches {} in parts {} totaling bytes: lower={:?} upper={:?} since={:?}",
                attempt,
                idx,
                req.inputs.len(),
                parts,
                bytes,
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            // Dry run: log the request but don't execute it.
            if !commit {
                info!("skipping compaction because --commit is not set");
                continue;
            }
            let schemas = Schemas {
                id: None,
                key: Arc::clone(&key_schema),
                val: Arc::clone(&val_schema),
            };

            let res = Compactor::<K, V, T, D>::compact(
                CompactConfig::new(&cfg, shard_id),
                Arc::clone(&blob),
                Arc::clone(&metrics),
                Arc::clone(&machine.applier.shard_metrics),
                Arc::new(IsolatedRuntime::new(
                    metrics_registry,
                    Some(cfg.isolated_runtime_worker_threads),
                )),
                req,
                schemas,
            )
            .await?;
            metrics.compaction.admin_count.inc();
            info!(
                "attempt {} req {}: compacted into {} parts {} bytes in {:?}",
                attempt,
                idx,
                res.output.part_count(),
                res.output.encoded_size_bytes(),
                start.elapsed(),
            );
            let (apply_res, maintenance) = machine
                .merge_res(&FueledMergeRes {
                    output: res.output,
                    input: res.input,
                    new_active_compaction: None,
                })
                .await;
            if !maintenance.is_empty() {
                info!("ignoring non-empty requested maintenance: {maintenance:?}")
            }
            if apply_res.applied() {
                info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
            } else {
                // The merge result didn't apply (state changed under us);
                // restart the whole pass with freshly fetched state.
                info!(
                    "attempt {} req {}: {:?} trying again",
                    attempt, idx, apply_res
                );
                attempt += 1;
                continue 'outer;
            }
        }
        info!("attempt {}: did {} compactions", attempt, reqs.len());
        let _ = machine.expire_writer(&writer_id).await;
        info!("expired writer {}", writer_id);
        return Ok(());
    }
}
519
/// Opens a [`Machine`] over the CLI's fake key/value codecs (see
/// `crate::cli::inspect`), for admin commands that don't know the shard's real
/// types. Delegates to [`make_typed_machine`], which also performs the codec
/// priming and version safety checks.
async fn make_machine(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
    make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
        cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await
}
540
/// Opens a [`Machine`] for `shard_id`, first "priming" the CLI's fake codecs
/// to match whatever codecs the shard's state was actually written with, and
/// refusing tool/state version combinations that could corrupt state.
async fn make_typed_machine<K, V, T, D>(
    cfg: &PersistConfig,
    consensus: Arc<dyn Consensus>,
    blob: Arc<dyn Blob>,
    metrics: Arc<Metrics>,
    shard_id: ShardId,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64,
{
    let state_versions = Arc::new(StateVersions::new(
        cfg.clone(),
        consensus,
        blob,
        Arc::clone(&metrics),
    ));

    // Prime the K V codec magic
    let versions = state_versions
        .fetch_recent_live_diffs::<u64>(&shard_id)
        .await;

    loop {
        let state_res = state_versions
            .fetch_current_state::<u64>(&shard_id, versions.0.clone())
            .await
            .check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
        let state = match state_res {
            Ok(state) => state,
            Err(codec) => {
                // Codec mismatch: record the shard's actual codecs in the
                // process-global override and retry the fetch.
                let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
                *kvtd = codec.actual;
                continue;
            }
        };
        // This isn't the perfect place to put this check, the ideal would be in
        // the apply_unbatched_cmd loop, but I don't want to pollute the prod
        // code with this logic.
        let safe_version_change = match (commit, expected_version) {
            // We never actually write out state changes, so increasing the version is okay.
            (false, _) => cfg.build_version >= state.collections.version,
            // If the versions match that's okay because any commits won't change it.
            (true, None) => cfg.build_version == state.collections.version,
            // !!DANGER ZONE!!
            (true, Some(expected)) => {
                // If we're not _extremely_ careful, the persistcli could make shards unreadable by
                // production. But there are times when we want to operate on a leaked shard with a
                // newer version of the build.
                //
                // We only allow a mismatch in version if we provided the expected version to the
                // command, and the expected version is less than the current build, which
                // indicates this is an old shard.
                state.collections.version == expected && expected <= cfg.build_version
            }
        };
        if !safe_version_change {
            // We could add a flag to override this check, if that comes up.
            return Err(anyhow!(
                "version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
                cfg.build_version,
                state.collections.version
            ));
        }
        break;
    }

    let machine = Machine::<K, V, T, D>::new(
        cfg.clone(),
        shard_id,
        Arc::clone(&metrics),
        state_versions,
        Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
        Arc::new(NoopPubSubSender),
        Arc::new(IsolatedRuntime::new(
            &MetricsRegistry::new(),
            Some(cfg.isolated_runtime_worker_threads),
        )),
        Diagnostics::from_purpose("admin"),
    )
    .await?;

    Ok(machine)
}
629
/// Manually kicks off a GC run for a shard, truncating state up to the
/// current `seqno_since`.
///
/// Returns the opened machine as an opaque handle so that its shard metrics
/// stay alive long enough for the caller to report them.
async fn force_gc(
    cfg: PersistConfig,
    metrics_registry: &MetricsRegistry,
    shard_id: ShardId,
    consensus_uri: &SensitiveUrl,
    blob_uri: &SensitiveUrl,
    commit: bool,
    expected_version: Option<Version>,
) -> anyhow::Result<Box<dyn Any>> {
    let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
    let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
    let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
    let machine = make_machine(
        &cfg,
        consensus,
        blob,
        metrics,
        shard_id,
        commit,
        expected_version,
    )
    .await?;
    // GC everything up to the machine's current seqno_since.
    let gc_req = GcReq {
        shard_id,
        new_seqno_since: machine.applier.seqno_since(),
    };
    let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
    if !maintenance.is_empty() {
        info!("ignoring non-empty requested maintenance: {maintenance:?}")
    }

    Ok(Box::new(machine))
}
663
/// Exposed for `mz-catalog`.
///
/// Fuel per call for the catalog's use of
/// [`dangerous_force_compaction_and_break_pushdown`].
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_catalog_force_compaction_fuel",
    1024,
    "fuel to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
///
/// Wait between exerts for the catalog's use of
/// [`dangerous_force_compaction_and_break_pushdown`].
pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_catalog_force_compaction_wait",
    Duration::from_secs(60),
    "wait to use in catalog dangerous_force_compaction task",
);

/// Exposed for `mz-catalog`.
///
/// Fuel per call for the expression cache's use of
/// [`dangerous_force_compaction_and_break_pushdown`].
pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
    "persist_expression_cache_force_compaction_fuel",
    131_072,
    "fuel to use in expression cache dangerous_force_compaction",
);

/// Exposed for `mz-catalog`.
///
/// Wait between exerts for the expression cache's use of
/// [`dangerous_force_compaction_and_break_pushdown`].
pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
    "persist_expression_cache_force_compaction_wait",
    Duration::from_secs(0),
    "wait to use in expression cache dangerous_force_compaction",
);
691
/// Attempts to compact all batches in a shard into a minimal number.
///
/// This destroys most hope of filter pushdown doing anything useful. If you
/// think you want this, you almost certainly want something else, please come
/// talk to the persist team before using.
///
/// This is accomplished by adding artificial fuel (representing some count of
/// updates) to the shard's internal Spine of batches structure on some schedule
/// and doing any compaction work that results. Both the amount of fuel and
/// cadence are dynamically tunable. However, it is not clear exactly how much
/// fuel is safe to add in each call: 10 definitely seems fine, usize::MAX may
/// not be. This also adds to the danger in "dangerous".
///
/// This is intentionally not even hooked up to `persistcli admin` for the above
/// reasons.
///
/// Exits once the shard is sufficiently compacted, but can be called in a loop
/// to keep it compacted.
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
    write: &WriteHandle<K, V, T, D>,
    fuel: impl Fn() -> usize,
    wait: impl Fn() -> Duration,
) where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Ord + Codec64 + Send + Sync,
{
    let machine = write.machine.clone();

    let mut last_exert: Instant;

    loop {
        // Exert artificial fuel on the spine, collecting any compaction
        // requests that fall out.
        last_exert = Instant::now();
        let fuel = fuel();
        let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
        for req in reqs {
            info!(
                "force_compaction {} {} compacting {} batches in {} parts with {} runs totaling {} bytes: lower={:?} upper={:?} since={:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                req.inputs.len(),
                req.inputs.iter().flat_map(|x| &x.batch.parts).count(),
                req.inputs
                    .iter()
                    .map(|x| x.batch.runs().count())
                    .sum::<usize>(),
                req.inputs
                    .iter()
                    .flat_map(|x| &x.batch.parts)
                    .map(|x| x.encoded_size_bytes())
                    .sum::<usize>(),
                req.desc.lower().elements(),
                req.desc.upper().elements(),
                req.desc.since().elements(),
            );
            machine.applier.metrics.compaction.requested.inc();
            let start = Instant::now();
            let res = Compactor::<K, V, T, D>::compact_and_apply(&machine, req).await;
            let apply_maintenance = match res {
                Ok(x) => x,
                Err(err) => {
                    // Best-effort: a failed compaction is logged and skipped,
                    // not propagated.
                    warn!(
                        "force_compaction {} {} errored in compaction: {:?}",
                        machine.applier.shard_metrics.name,
                        machine.applier.shard_metrics.shard_id,
                        err
                    );
                    continue;
                }
            };
            machine.applier.metrics.compaction.admin_count.inc();
            info!(
                "force_compaction {} {} compacted in {:?}",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                start.elapsed(),
            );
            maintenance.merge(apply_maintenance);
        }
        maintenance.perform(&machine, &write.gc).await;

        // Now sleep before the next one. Make sure to delay from when we
        // started the last exert in case the compaction takes non-trivial time.
        let next_exert = last_exert + wait();
        tokio::time::sleep_until(next_exert.into()).await;

        // NB: This check is intentionally at the end so that it's safe to call
        // this method in a loop.
        let num_runs: usize = machine
            .applier
            .all_batches()
            .iter()
            .map(|x| x.runs().count())
            .sum();
        if num_runs <= 1 {
            info!(
                "force_compaction {} {} exiting with {} runs",
                machine.applier.shard_metrics.name,
                machine.applier.shard_metrics.shard_id,
                num_runs
            );
            return;
        }
    }
}
798
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use mz_dyncfg::ConfigUpdates;
    use mz_persist_types::ShardId;

    use crate::tests::new_test_client;

    // Smoke test: for a range of batch counts, the dangerous force-compaction
    // loop terminates and leaves the shard well compacted.
    #[mz_persist_proc::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
        let client = new_test_client(&dyncfgs).await;
        for num_batches in 0..=17 {
            let (mut write, _read) = client
                .expect_open::<String, (), u64, i64>(ShardId::new())
                .await;
            let machine = write.machine.clone();

            // Write one single-update batch per index at distinct timestamps.
            for idx in 0..num_batches {
                let () = write
                    .expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
                    .await;
            }

            // Run the tool and verify that we compact down to fewer than two
            // batches. NOTE(review): the assert enforces `< 2` (at most one
            // batch) though the original comment said "at most two" — confirm
            // which bound is intended.
            super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
                .await;
            let batches_after = machine.applier.all_batches().len();
            assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
        }
    }
}