use std::sync::Arc;
use std::time::Duration;
use mz_adapter::ResultExt;
use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
use mz_ore::channel::trigger;
use mz_ore::exit;
use mz_ore::halt;
use mz_persist_client::PersistClient;
use mz_repr::Timestamp;
use mz_sql::catalog::EnvironmentId;
use tracing::info;
use crate::deployment::state::DeploymentState;
use crate::BUILD_INFO;
pub struct PreflightInput {
pub boot_ts: Timestamp,
pub environment_id: EnvironmentId,
pub persist_client: PersistClient,
pub deploy_generation: u64,
pub deployment_state: DeploymentState,
pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
pub catalog_metrics: Arc<Metrics>,
pub caught_up_max_wait: Duration,
pub panic_after_timeout: bool,
pub bootstrap_args: BootstrapArgs,
}
pub struct PreflightOutput {
pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
pub read_only: bool,
pub caught_up_trigger: Option<trigger::Trigger>,
}
pub async fn preflight_legacy(
PreflightInput {
boot_ts,
environment_id,
persist_client,
deploy_generation,
deployment_state,
mut openable_adapter_storage,
catalog_metrics,
bootstrap_args,
caught_up_max_wait: _,
panic_after_timeout: _,
}: PreflightInput,
) -> Result<Box<dyn OpenableDurableCatalogState>, CatalogError> {
tracing::info!("Requested deploy generation {deploy_generation}");
if !openable_adapter_storage.is_initialized().await? {
tracing::info!("Catalog storage doesn't exist so there's no current deploy generation. We won't wait to be leader");
return Ok(openable_adapter_storage);
}
let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
tracing::info!("Found catalog generation {catalog_generation:?}");
if catalog_generation < deploy_generation {
tracing::info!("Catalog generation {catalog_generation:?} is less than deploy generation {deploy_generation}. Performing pre-flight checks");
match openable_adapter_storage
.open_savepoint(boot_ts.clone(), &bootstrap_args)
.await
{
Ok((adapter_storage, _)) => Box::new(adapter_storage).expire().await,
Err(CatalogError::Durable(e)) if e.can_recover_with_write_mode() => {
tracing::warn!("Unable to perform upgrade test because the target implementation is uninitialized");
return Ok(mz_catalog::durable::persist_backed_catalog_state(
persist_client,
environment_id.organization_id(),
BUILD_INFO.semver_version(),
Some(deploy_generation),
Arc::clone(&catalog_metrics),
)
.await?);
}
Err(e) => {
tracing::warn!(error = %e, "catalog upgrade would have failed");
return Err(e);
}
}
let promoted = deployment_state.set_ready_to_promote();
tracing::info!("Waiting for user to promote this envd to leader");
promoted.await;
Ok(mz_catalog::durable::persist_backed_catalog_state(
persist_client,
environment_id.organization_id(),
BUILD_INFO.semver_version(),
Some(deploy_generation),
Arc::clone(&catalog_metrics),
)
.await?)
} else if catalog_generation == deploy_generation {
tracing::info!("Server requested generation {deploy_generation} which is equal to catalog's generation");
Ok(openable_adapter_storage)
} else {
mz_ore::halt!("Server started with requested generation {deploy_generation} but catalog was already at {catalog_generation:?}. Deploy generations must increase monotonically");
}
}
pub async fn preflight_0dt(
PreflightInput {
boot_ts,
environment_id,
persist_client,
deploy_generation,
deployment_state,
mut openable_adapter_storage,
catalog_metrics,
caught_up_max_wait,
panic_after_timeout,
bootstrap_args,
}: PreflightInput,
) -> Result<PreflightOutput, CatalogError> {
info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");
if !openable_adapter_storage.is_initialized().await? {
info!("catalog not initialized; booting with writes allowed");
return Ok(PreflightOutput {
openable_adapter_storage,
read_only: false,
caught_up_trigger: None,
});
}
let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
info!(%catalog_generation, "catalog initialized");
if catalog_generation < deploy_generation {
info!("this deployment is a new generation; booting in read only mode");
let (caught_up_trigger, caught_up_receiver) = trigger::channel();
mz_ore::task::spawn(|| "preflight_0dt", async move {
info!("waiting for deployment to be caught up");
let caught_up_max_wait_fut = async {
tokio::time::sleep(caught_up_max_wait).await;
()
};
let skip_catchup = deployment_state.set_catching_up();
tokio::select! {
() = caught_up_receiver => {
info!("deployment caught up");
}
() = skip_catchup => {
info!("skipping waiting for deployment to catch up due to administrator request");
}
() = caught_up_max_wait_fut => {
if panic_after_timeout {
panic!("not caught up within {:?}", caught_up_max_wait);
}
info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
}
}
let promoted = deployment_state.set_ready_to_promote();
info!("announced as ready to promote; waiting for promotion");
promoted.await;
info!("promoted; attempting takeover");
let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
persist_client.clone(),
environment_id.organization_id(),
BUILD_INFO.semver_version(),
Some(deploy_generation),
Arc::clone(&catalog_metrics),
)
.await
.expect("incompatible catalog/persist version");
let (_catalog, _audit_logs) = openable_adapter_storage
.open(boot_ts, &bootstrap_args)
.await
.unwrap_or_terminate("unexpected error while fencing out old deployment");
halt!("fenced out old deployment; rebooting as leader")
});
Ok(PreflightOutput {
openable_adapter_storage,
read_only: true,
caught_up_trigger: Some(caught_up_trigger),
})
} else if catalog_generation == deploy_generation {
info!("this deployment is the current generation; booting with writes allowed");
Ok(PreflightOutput {
openable_adapter_storage,
read_only: false,
caught_up_trigger: None,
})
} else {
exit!(0, "this deployment has been fenced out");
}
}