mz_testdrive/action/
persist.rs
1use std::str::FromStr;
11use std::sync::Arc;
12
13use mz_ore::metrics::MetricsRegistry;
14use mz_ore::now::SYSTEM_TIME;
15use mz_persist_client::ShardId;
16use mz_persist_client::cfg::PersistConfig;
17use mz_persist_types::codec_impls::UnitSchema;
18use mz_repr::{RelationDesc, ScalarType, Timestamp};
19use mz_storage_types::StorageDiff;
20use mz_storage_types::sources::SourceData;
21
22use crate::action::{ControlFlow, State};
23use crate::parser::BuiltinCommand;
24
25pub async fn run_force_compaction(
26 mut cmd: BuiltinCommand,
27 state: &State,
28) -> Result<ControlFlow, anyhow::Error> {
29 let shard_id = cmd.args.string("shard-id")?;
30 cmd.args.done()?;
31
32 let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
33 let cfg = PersistConfig::new_default_configs(state.build_info, SYSTEM_TIME.clone());
34
35 let metrics_registry = MetricsRegistry::new();
36
37 let Some(consensus_url) = state.persist_consensus_url.as_ref() else {
38 anyhow::bail!("Missing persist consensus URL");
39 };
40 let Some(blob_url) = state.persist_blob_url.as_ref() else {
41 anyhow::bail!("Missing persist blob URL");
42 };
43
44 let relation_desc = RelationDesc::builder()
45 .with_column("key", ScalarType::String.nullable(true))
46 .with_column("f1", ScalarType::String.nullable(true))
47 .with_column("f2", ScalarType::Int64.nullable(true))
48 .finish();
49
50 mz_persist_client::cli::admin::force_compaction::<SourceData, (), Timestamp, StorageDiff>(
51 cfg,
52 &metrics_registry,
53 shard_id,
54 consensus_url,
55 blob_url,
56 Arc::new(relation_desc),
57 Arc::new(UnitSchema),
58 true,
59 None,
60 )
61 .await?;
62
63 Ok(ControlFlow::Continue)
64}