//! CLI tools for read-write administration of persist shard state.

use std::any::Any;
use std::fmt::Debug;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use anyhow::{anyhow, bail};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use futures_util::{stream, StreamExt, TryStreamExt};
use mz_dyncfg::{Config, ConfigSet};
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::location::{Blob, Consensus, ExternalError};
use mz_persist_types::codec_impls::TodoSchema;
use mz_persist_types::{Codec, Codec64};
use prometheus::proto::{MetricFamily, MetricType};
use semver::Version;
use timely::progress::{Antichain, Timestamp};
use tracing::{info, warn};
use crate::async_runtime::IsolatedRuntime;
use crate::cache::StateCache;
use crate::cfg::{all_dyncfgs, COMPACTION_MEMORY_BOUND_BYTES};
use crate::cli::args::{make_blob, make_consensus, StateArgs, StoreArgs};
use crate::cli::inspect::FAKE_OPAQUE_CODEC;
use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
use crate::internal::encoding::Schemas;
use crate::internal::gc::{GarbageCollector, GcReq};
use crate::internal::machine::Machine;
use crate::internal::trace::FueledMergeRes;
use crate::rpc::{NoopPubSubSender, PubSubSender};
use crate::write::{WriteHandle, WriterId};
use crate::{
Diagnostics, Metrics, PersistClient, PersistConfig, ShardId, StateVersions, BUILD_INFO,
};
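
/// Arguments for read-write administration of persist state.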
#[derive(Debug, clap::Args)]
pub struct AdminArgs {
#[clap(subcommand)]
command: Command,
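/// Whether to commit any changes; otherwise the command runs read-only where possible.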
#[clap(long)]
pub(crate) commit: bool,
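/// The expected applier version of the shard's state; when committing, the command bails if the state's version does not match.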
#[clap(long)]
pub(crate) expected_version: Option<String>,
}
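
/// Individual admin subcommands.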
#[derive(Debug, clap::Subcommand)]
pub(crate) enum Command {
/// Manually completes all fueled compactions in a shard.
ForceCompaction(ForceCompactionArgs),
/// Manually kicks off a garbage collection run for a shard.
ForceGc(ForceGcArgs),
/// Manually finalizes a shard, leaving it as a tombstone.
Finalize(FinalizeArgs),
/// Attempts to ensure that all blobs referenced by consensus are present in the blob store.
RestoreBlob(RestoreBlobArgs),
}
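
/// Arguments for forcibly compacting a shard.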
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceCompactionArgs {
#[clap(flatten)]
state: StateArgs,
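/// An upper bound on compaction's memory usage, in bytes; the default of 0 leaves the configured bound unchanged.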
#[clap(long, default_value_t = 0)]
compaction_memory_bound_bytes: usize,
}
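
/// Arguments for forcing a garbage collection run on a shard.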
#[derive(Debug, clap::Parser)]
pub(crate) struct ForceGcArgs {
#[clap(flatten)]
state: StateArgs,
}
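
/// Arguments for finalizing (tombstoning) a shard.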
#[derive(Debug, clap::Parser)]
pub(crate) struct FinalizeArgs {
#[clap(flatten)]
state: StateArgs,
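/// Downgrade the shard's critical since to the empty antichain before finalizing.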
#[clap(long, default_value_t = false)]
force_downgrade_since: bool,
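/// Advance the shard's upper to the empty antichain (by appending an empty batch) before finalizing.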
#[clap(long, default_value_t = false)]
force_downgrade_upper: bool,
}
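
/// Arguments for restoring the blobs referenced by consensus, across all shards.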
#[derive(Debug, clap::Parser)]
pub(crate) struct RestoreBlobArgs {
#[clap(flatten)]
state: StoreArgs,
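/// The number of shards to restore concurrently.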
#[clap(long, default_value_t = 16)]
concurrency: usize,
}
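
/// Runs the given read-write admin command.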
pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
match command.command {
Command::ForceCompaction(args) => {
let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
let configs = all_dyncfgs(ConfigSet::default());
let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
if args.compaction_memory_bound_bytes > 0 {
cfg.set_config(
&COMPACTION_MEMORY_BOUND_BYTES,
args.compaction_memory_bound_bytes,
);
}
let metrics_registry = MetricsRegistry::new();
let expected_version = command
.expected_version
.as_ref()
.map(|v| Version::parse(v))
.transpose()?;
let () = force_compaction::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
cfg,
&metrics_registry,
shard_id,
&args.state.consensus_uri,
&args.state.blob_uri,
Arc::new(TodoSchema::default()),
Arc::new(TodoSchema::default()),
command.commit,
expected_version,
)
.await?;
info_log_non_zero_metrics(&metrics_registry.gather());
}
Command::ForceGc(args) => {
let shard_id = ShardId::from_str(&args.state.shard_id).expect("invalid shard id");
let configs = all_dyncfgs(ConfigSet::default());
let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
let metrics_registry = MetricsRegistry::new();
let expected_version = command
.expected_version
.as_ref()
.map(|v| Version::parse(v))
.transpose()?;
let _machine = force_gc(
cfg,
&metrics_registry,
shard_id,
&args.state.consensus_uri,
&args.state.blob_uri,
command.commit,
expected_version,
)
.await?;
info_log_non_zero_metrics(&metrics_registry.gather());
}
Command::Finalize(args) => {
let FinalizeArgs {
state:
StateArgs {
shard_id,
consensus_uri,
blob_uri,
},
force_downgrade_since,
force_downgrade_upper,
} = args;
let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
let commit = command.commit;
let expected_version = command
.expected_version
.as_ref()
.map(|v| Version::parse(v))
.transpose()?;
let configs = all_dyncfgs(ConfigSet::default());
let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
let metrics_registry = MetricsRegistry::new();
let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
let consensus =
make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
let machine = make_machine(
&cfg,
Arc::clone(&consensus),
Arc::clone(&blob),
Arc::clone(&metrics),
shard_id,
commit,
expected_version,
)
.await?;
if force_downgrade_upper {
let isolated_runtime = Arc::new(IsolatedRuntime::default());
let pubsub_sender: Arc<dyn PubSubSender> = Arc::new(NoopPubSubSender);
let shared_states = Arc::new(StateCache::new(
&cfg,
Arc::clone(&metrics),
Arc::clone(&pubsub_sender),
));
let persist_client = PersistClient::new(
cfg,
blob,
consensus,
metrics,
isolated_runtime,
shared_states,
pubsub_sender,
)?;
let diagnostics = Diagnostics {
shard_name: shard_id.to_string(),
handle_purpose: "persist-cli finalize shard".to_string(),
};
let mut write_handle: WriteHandle<
crate::cli::inspect::K,
crate::cli::inspect::V,
u64,
i64,
> = persist_client
.open_writer(
shard_id,
Arc::new(TodoSchema::<crate::cli::inspect::K>::default()),
Arc::new(TodoSchema::<crate::cli::inspect::V>::default()),
diagnostics,
)
.await?;
if !write_handle.upper().is_empty() {
let empty_batch: Vec<(
(crate::cli::inspect::K, crate::cli::inspect::V),
u64,
i64,
)> = vec![];
let lower = write_handle.upper().clone();
let upper = Antichain::new();
let result = write_handle.append(empty_batch, lower, upper).await?;
if let Err(err) = result {
bail!("failed to force downgrade upper: {err:?}");
}
}
}
if force_downgrade_since {
let (state, _maintenance) = machine
.register_critical_reader::<crate::cli::inspect::O>(
&crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
"persist-cli finalize with force downgrade",
)
.await;
let expected_opaque = crate::cli::inspect::O::decode(state.opaque.0);
FAKE_OPAQUE_CODEC
.lock()
.expect("lockable")
.clone_from(&state.opaque_codec);
let (result, _maintenance) = machine
.compare_and_downgrade_since(
&crate::PersistClient::CONTROLLER_CRITICAL_SINCE,
&expected_opaque,
(&expected_opaque, &Antichain::new()),
)
.await;
if let Err((actual_opaque, _since)) = result {
bail!(
"opaque changed, expected: {expected_opaque:?}, actual: {actual_opaque:?}"
)
}
}
let maintenance = machine.become_tombstone().await?;
if !maintenance.is_empty() {
info!("ignoring non-empty requested maintenance: {maintenance:?}")
}
info_log_non_zero_metrics(&metrics_registry.gather());
}
Command::RestoreBlob(args) => {
let RestoreBlobArgs {
state:
StoreArgs {
consensus_uri,
blob_uri,
},
concurrency,
} = args;
let commit = command.commit;
let configs = all_dyncfgs(ConfigSet::default());
let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
let metrics_registry = MetricsRegistry::new();
let metrics = Arc::new(Metrics::new(&cfg, &metrics_registry));
let consensus =
make_consensus(&cfg, &consensus_uri, commit, Arc::clone(&metrics)).await?;
let blob = make_blob(&cfg, &blob_uri, commit, Arc::clone(&metrics)).await?;
let versions = StateVersions::new(
cfg.clone(),
Arc::clone(&consensus),
Arc::clone(&blob),
Arc::clone(&metrics),
);
let not_restored: Vec<_> = consensus
.list_keys()
.flat_map_unordered(concurrency, |shard| {
stream::once(Box::pin(async {
let shard_id = shard?;
let shard_id = ShardId::from_str(&shard_id).expect("invalid shard id");
let start = Instant::now();
info!("Restoring blob state for shard {shard_id}.",);
let shard_not_restored = crate::internal::restore::restore_blob(
&versions,
blob.as_ref(),
&cfg.build_version,
shard_id,
&*metrics,
)
.await?;
info!(
"Restored blob state for shard {shard_id}; {} errors, {:?} elapsed.",
shard_not_restored.len(),
start.elapsed()
);
Ok::<_, ExternalError>(shard_not_restored)
}))
})
.try_fold(vec![], |mut a, b| async move {
a.extend(b);
Ok(a)
})
.await?;
info_log_non_zero_metrics(&metrics_registry.gather());
if !not_restored.is_empty() {
bail!("referenced blobs were not restored: {not_restored:#?}")
}
}
}
Ok(())
}
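
/// Logs, at info level, every non-zero counter and gauge in the given metric families.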
pub(crate) fn info_log_non_zero_metrics(metric_families: &[MetricFamily]) {
for mf in metric_families {
for m in mf.get_metric() {
let val = match mf.get_field_type() {
MetricType::COUNTER => m.get_counter().get_value(),
MetricType::GAUGE => m.get_gauge().get_value(),
x => {
info!("unhandled {} metric type: {:?}", mf.get_name(), x);
continue;
}
};
if val == 0.0 {
continue;
}
let label_pairs = m.get_label();
let mut labels = String::new();
if !label_pairs.is_empty() {
labels.push_str("{");
for lb in label_pairs {
if labels != "{" {
labels.push_str(",");
}
labels.push_str(lb.get_name());
labels.push_str(":");
labels.push_str(lb.get_value());
}
labels.push_str("}");
}
info!("{}{} {}", mf.get_name(), labels, val);
}
}
}
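
/// Forcibly runs all currently fueled compactions for the given shard,
/// retrying until every merge result applies cleanly. Compaction is only
/// performed (and its results written out) when `commit` is set.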
pub async fn force_compaction<K, V, T, D>(
cfg: PersistConfig,
metrics_registry: &MetricsRegistry,
shard_id: ShardId,
consensus_uri: &SensitiveUrl,
blob_uri: &SensitiveUrl,
key_schema: Arc<K::Schema>,
val_schema: Arc<V::Schema>,
commit: bool,
expected_version: Option<Version>,
) -> Result<(), anyhow::Error>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Ord + Codec64 + Send + Sync,
{
let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
let machine = make_typed_machine::<K, V, T, D>(
&cfg,
consensus,
Arc::clone(&blob),
Arc::clone(&metrics),
shard_id,
commit,
expected_version,
)
.await?;
let writer_id = WriterId::new();
let mut attempt = 0;
'outer: loop {
machine.applier.fetch_and_update_state(None).await;
let reqs = machine.applier.all_fueled_merge_reqs();
info!("attempt {}: got {} compaction reqs", attempt, reqs.len());
for (idx, req) in reqs.clone().into_iter().enumerate() {
let req = CompactReq {
shard_id,
desc: req.desc,
inputs: req
.inputs
.into_iter()
.map(|b| Arc::unwrap_or_clone(b.batch))
.collect(),
};
let parts = req.inputs.iter().map(|x| x.part_count()).sum::<usize>();
let bytes = req
.inputs
.iter()
.map(|x| x.encoded_size_bytes())
.sum::<usize>();
let start = Instant::now();
info!(
"attempt {} req {}: compacting {} batches {} in parts {} totaling bytes: lower={:?} upper={:?} since={:?}",
attempt,
idx,
req.inputs.len(),
parts,
bytes,
req.desc.lower().elements(),
req.desc.upper().elements(),
req.desc.since().elements(),
);
if !commit {
info!("skipping compaction because --commit is not set");
continue;
}
let schemas = Schemas {
id: None,
key: Arc::clone(&key_schema),
val: Arc::clone(&val_schema),
};
let res = Compactor::<K, V, T, D>::compact(
CompactConfig::new(&cfg, shard_id),
Arc::clone(&blob),
Arc::clone(&metrics),
Arc::clone(&machine.applier.shard_metrics),
Arc::new(IsolatedRuntime::default()),
req,
schemas,
)
.await?;
metrics.compaction.admin_count.inc();
info!(
"attempt {} req {}: compacted into {} parts {} bytes in {:?}",
attempt,
idx,
res.output.part_count(),
res.output.encoded_size_bytes(),
start.elapsed(),
);
let (apply_res, maintenance) = machine
.merge_res(&FueledMergeRes { output: res.output })
.await;
if !maintenance.is_empty() {
info!("ignoring non-empty requested maintenance: {maintenance:?}")
}
if apply_res.applied() {
info!("attempt {} req {}: {:?}", attempt, idx, apply_res);
} else {
info!(
"attempt {} req {}: {:?} trying again",
attempt, idx, apply_res
);
attempt += 1;
continue 'outer;
}
}
info!("attempt {}: did {} compactions", attempt, reqs.len());
let _ = machine.expire_writer(&writer_id).await;
info!("expired writer {}", writer_id);
return Ok(());
}
}
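
/// Constructs a `Machine` for the shard using the CLI's dynamic key/value codecs.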
async fn make_machine(
cfg: &PersistConfig,
consensus: Arc<dyn Consensus>,
blob: Arc<dyn Blob>,
metrics: Arc<Metrics>,
shard_id: ShardId,
commit: bool,
expected_version: Option<Version>,
) -> anyhow::Result<Machine<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>> {
make_typed_machine::<crate::cli::inspect::K, crate::cli::inspect::V, u64, i64>(
cfg,
consensus,
blob,
metrics,
shard_id,
commit,
expected_version,
)
.await
}
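
/// Constructs a `Machine` for the shard, first verifying that this tool's build
/// version is compatible with the state's applier version (and with
/// `expected_version`, if provided).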
async fn make_typed_machine<K, V, T, D>(
cfg: &PersistConfig,
consensus: Arc<dyn Consensus>,
blob: Arc<dyn Blob>,
metrics: Arc<Metrics>,
shard_id: ShardId,
commit: bool,
expected_version: Option<Version>,
) -> anyhow::Result<Machine<K, V, T, D>>
where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Codec64,
{
let state_versions = Arc::new(StateVersions::new(
cfg.clone(),
consensus,
blob,
Arc::clone(&metrics),
));
let versions = state_versions
.fetch_recent_live_diffs::<u64>(&shard_id)
.await;
loop {
let state_res = state_versions
.fetch_current_state::<u64>(&shard_id, versions.0.clone())
.await
.check_codecs::<crate::cli::inspect::K, crate::cli::inspect::V, i64>(&shard_id);
let state = match state_res {
Ok(state) => state,
Err(codec) => {
let mut kvtd = crate::cli::inspect::KVTD_CODECS.lock().expect("lockable");
*kvtd = codec.actual;
continue;
}
};
let safe_version_change = match (commit, expected_version) {
(false, _) => cfg.build_version >= state.applier_version,
(true, None) => cfg.build_version == state.applier_version,
(true, Some(expected)) => {
state.applier_version == expected && expected <= cfg.build_version
}
};
if !safe_version_change {
return Err(anyhow!(
"version of this tool {} does not match version of state {} when --commit is {commit}. bailing so we don't corrupt anything",
cfg.build_version,
state.applier_version
));
}
break;
}
let machine = Machine::<K, V, T, D>::new(
cfg.clone(),
shard_id,
Arc::clone(&metrics),
state_versions,
Arc::new(StateCache::new(cfg, metrics, Arc::new(NoopPubSubSender))),
Arc::new(NoopPubSubSender),
Arc::new(IsolatedRuntime::default()),
Diagnostics::from_purpose("admin"),
)
.await?;
Ok(machine)
}
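
/// Forces a garbage collection run for the given shard, using its current
/// seqno_since, and returns the machine used to do it.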
async fn force_gc(
cfg: PersistConfig,
metrics_registry: &MetricsRegistry,
shard_id: ShardId,
consensus_uri: &SensitiveUrl,
blob_uri: &SensitiveUrl,
commit: bool,
expected_version: Option<Version>,
) -> anyhow::Result<Box<dyn Any>> {
let metrics = Arc::new(Metrics::new(&cfg, metrics_registry));
let consensus = make_consensus(&cfg, consensus_uri, commit, Arc::clone(&metrics)).await?;
let blob = make_blob(&cfg, blob_uri, commit, Arc::clone(&metrics)).await?;
let machine = make_machine(
&cfg,
consensus,
blob,
metrics,
shard_id,
commit,
expected_version,
)
.await?;
let gc_req = GcReq {
shard_id,
new_seqno_since: machine.applier.seqno_since(),
};
let (maintenance, _stats) = GarbageCollector::gc_and_truncate(&machine, gc_req).await;
if !maintenance.is_empty() {
info!("ignoring non-empty requested maintenance: {maintenance:?}")
}
Ok(Box::new(machine))
}
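
// Dyncfgs tuning the `dangerous_force_compaction_and_break_pushdown` tasks run
// against the catalog and expression cache shards.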
pub const CATALOG_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
"persist_catalog_force_compaction_fuel",
1024,
"fuel to use in catalog dangerous_force_compaction task",
);
pub const CATALOG_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
"persist_catalog_force_compaction_wait",
Duration::from_secs(60),
"wait to use in catalog dangerous_force_compaction task",
);
pub const EXPRESSION_CACHE_FORCE_COMPACTION_FUEL: Config<usize> = Config::new(
"persist_expression_cache_force_compaction_fuel",
131_072,
"fuel to use in expression cache dangerous_force_compaction",
);
pub const EXPRESSION_CACHE_FORCE_COMPACTION_WAIT: Config<Duration> = Config::new(
"persist_expression_cache_force_compaction_wait",
Duration::from_secs(0),
"wait to use in expression cache dangerous_force_compaction",
);
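
/// Repeatedly exerts compaction fuel on the given shard until it is left with
/// fewer than two batches, sleeping `wait()` between rounds.
///
/// As the name warns, collapsing a shard down to a single batch defeats filter
/// pushdown; only use this on shards (e.g. the catalog) where that is
/// acceptable.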
pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
write: &WriteHandle<K, V, T, D>,
fuel: impl Fn() -> usize,
wait: impl Fn() -> Duration,
) where
K: Debug + Codec,
V: Debug + Codec,
T: Timestamp + Lattice + Codec64 + Sync,
D: Semigroup + Ord + Codec64 + Send + Sync,
{
let machine = write.machine.clone();
let mut last_exert: Instant;
loop {
last_exert = Instant::now();
let fuel = fuel();
let (reqs, mut maintenance) = machine.spine_exert(fuel).await;
for req in reqs {
info!(
"dangerous_force_compaction_and_break_pushdown {} {} compacting {} batches in {} parts totaling {} bytes: lower={:?} upper={:?} since={:?}",
machine.applier.shard_metrics.name,
machine.applier.shard_metrics.shard_id,
req.inputs.len(),
req.inputs.iter().flat_map(|x| &x.parts).count(),
req.inputs.iter().flat_map(|x| &x.parts).map(|x| x.encoded_size_bytes()).sum::<usize>(),
req.desc.lower().elements(),
req.desc.upper().elements(),
req.desc.since().elements(),
);
machine.applier.metrics.compaction.requested.inc();
let start = Instant::now();
let res = Compactor::<K, V, T, D>::compact_and_apply(
&machine,
req,
write.write_schemas.clone(),
)
.await;
let (res, apply_maintenance) = match res {
Ok(x) => x,
Err(err) => {
warn!(
"dangerous_force_compaction_and_break_pushdown {} {} errored in compaction: {:?}",
machine.applier.shard_metrics.name,
machine.applier.shard_metrics.shard_id,
err
);
continue;
}
};
machine.applier.metrics.compaction.admin_count.inc();
info!(
"dangerous_force_compaction_and_break_pushdown {} {} compacted in {:?}: {:?}",
machine.applier.shard_metrics.name,
machine.applier.shard_metrics.shard_id,
start.elapsed(),
res
);
maintenance.merge(apply_maintenance);
}
maintenance.perform(&machine, &write.gc).await;
let next_exert = last_exert + wait();
tokio::time::sleep_until(next_exert.into()).await;
let num_batches = machine.applier.all_batches().len();
if num_batches < 2 {
info!(
"dangerous_force_compaction_and_break_pushdown {} {} exiting with {} batches",
machine.applier.shard_metrics.name,
machine.applier.shard_metrics.shard_id,
num_batches
);
return;
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use mz_dyncfg::ConfigUpdates;
use mz_persist_types::ShardId;
use crate::tests::new_test_client;
#[mz_persist_proc::test(tokio::test)]
#[cfg_attr(miri, ignore)]
async fn dangerous_force_compaction_and_break_pushdown(dyncfgs: ConfigUpdates) {
let client = new_test_client(&dyncfgs).await;
for num_batches in 0..=17 {
let (mut write, _read) = client
.expect_open::<String, (), u64, i64>(ShardId::new())
.await;
let machine = write.machine.clone();
for idx in 0..num_batches {
let () = write
.expect_compare_and_append(&[((idx.to_string(), ()), idx, 1)], idx, idx + 1)
.await;
}
super::dangerous_force_compaction_and_break_pushdown(&write, || 1, || Duration::ZERO)
.await;
let batches_after = machine.applier.all_batches().len();
assert!(batches_after < 2, "{} vs {}", num_batches, batches_after);
}
}
}