mz_testdrive/action/
persist.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::str::FromStr;
11use std::sync::Arc;
12
13use mz_ore::metrics::MetricsRegistry;
14use mz_ore::now::SYSTEM_TIME;
15use mz_persist_client::ShardId;
16use mz_persist_client::cfg::PersistConfig;
17use mz_persist_types::codec_impls::UnitSchema;
18use mz_repr::{RelationDesc, ScalarType, Timestamp};
19use mz_storage_types::StorageDiff;
20use mz_storage_types::sources::SourceData;
21
22use crate::action::{ControlFlow, State};
23use crate::parser::BuiltinCommand;
24
25pub async fn run_force_compaction(
26    mut cmd: BuiltinCommand,
27    state: &State,
28) -> Result<ControlFlow, anyhow::Error> {
29    let shard_id = cmd.args.string("shard-id")?;
30    cmd.args.done()?;
31
32    let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
33    let cfg = PersistConfig::new_default_configs(state.build_info, SYSTEM_TIME.clone());
34
35    let metrics_registry = MetricsRegistry::new();
36
37    let Some(consensus_url) = state.persist_consensus_url.as_ref() else {
38        anyhow::bail!("Missing persist consensus URL");
39    };
40    let Some(blob_url) = state.persist_blob_url.as_ref() else {
41        anyhow::bail!("Missing persist blob URL");
42    };
43
44    let relation_desc = RelationDesc::builder()
45        .with_column("key", ScalarType::String.nullable(true))
46        .with_column("f1", ScalarType::String.nullable(true))
47        .with_column("f2", ScalarType::Int64.nullable(true))
48        .finish();
49
50    mz_persist_client::cli::admin::force_compaction::<SourceData, (), Timestamp, StorageDiff>(
51        cfg,
52        &metrics_registry,
53        shard_id,
54        consensus_url,
55        blob_url,
56        Arc::new(relation_desc),
57        Arc::new(UnitSchema),
58        true,
59        None,
60    )
61    .await?;
62
63    Ok(ControlFlow::Continue)
64}