1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::str::FromStr;
use std::sync::Arc;

use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_persist_client::cfg::PersistConfig;
use mz_persist_client::ShardId;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, RelationDesc, ScalarType, Timestamp};
use mz_storage_types::sources::SourceData;

use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;

/// Runs the testdrive builtin that forces compaction of a single persist shard.
///
/// Reads the required `shard-id` argument from `cmd`, parses it into a
/// [`ShardId`], and invokes `mz_persist_client::cli::admin::force_compaction`
/// against the consensus and blob locations configured in `state`.
///
/// # Errors
///
/// Returns an error if:
/// - the `shard-id` argument is missing, or unexpected extra arguments remain,
/// - the supplied shard id cannot be parsed,
/// - `state` has no persist consensus URL or no persist blob URL,
/// - the underlying compaction call fails.
pub async fn run_force_compaction(
    mut cmd: BuiltinCommand,
    state: &State,
) -> Result<ControlFlow, anyhow::Error> {
    let shard_id = cmd.args.string("shard-id")?;
    cmd.args.done()?;

    // The shard id comes straight from the testdrive script, so a malformed
    // value is a user error that should be reported, not a reason to panic.
    let shard_id = ShardId::from_str(&shard_id)
        .map_err(|e| anyhow::anyhow!("invalid shard id {shard_id:?}: {e}"))?;
    let cfg = PersistConfig::new_default_configs(state.build_info, SYSTEM_TIME.clone());

    // A fresh registry that is local to this invocation.
    let metrics_registry = MetricsRegistry::new();

    let Some(consensus_url) = state.persist_consensus_url.as_ref() else {
        anyhow::bail!("Missing persist consensus URL");
    };
    let Some(blob_url) = state.persist_blob_url.as_ref() else {
        anyhow::bail!("Missing persist blob URL");
    };

    // NOTE(review): the key schema is hard-coded to three nullable columns
    // (`key`, `f1`, `f2`); presumably this must match the shape of the shard
    // written by the calling test -- confirm before reusing with other shards.
    let relation_desc = RelationDesc::builder()
        .with_column("key", ScalarType::String.nullable(true))
        .with_column("f1", ScalarType::String.nullable(true))
        .with_column("f2", ScalarType::Int64.nullable(true))
        .finish();

    // NOTE(review): the trailing positional `true` and `None` are passed
    // through unchanged; their meaning is defined by `force_compaction`'s
    // signature, which is not visible here -- confirm against the callee.
    mz_persist_client::cli::admin::force_compaction::<SourceData, (), Timestamp, Diff>(
        cfg,
        &metrics_registry,
        shard_id,
        consensus_url,
        blob_url,
        Arc::new(relation_desc),
        Arc::new(UnitSchema),
        true,
        None,
    )
    .await?;

    Ok(ControlFlow::Continue)
}