1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Preflight checks for deployments.
1112use std::pin::pin;
13use std::sync::Arc;
14use std::time::Duration;
1516use mz_adapter::ResultExt;
17use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
18use mz_ore::channel::trigger;
19use mz_ore::exit;
20use mz_ore::halt;
21use mz_persist_client::PersistClient;
22use mz_repr::Timestamp;
23use mz_sql::catalog::EnvironmentId;
24use tracing::info;
2526use crate::BUILD_INFO;
27use crate::deployment::state::DeploymentState;
2829/// The necessary input for preflight checks.
pub struct PreflightInput {
    /// Timestamp used when opening the catalog (savepoint or full open).
    pub boot_ts: Timestamp,
    /// Identifies the environment; its organization ID keys the persist-backed
    /// catalog state.
    pub environment_id: EnvironmentId,
    /// Client for the persist layer backing the durable catalog.
    pub persist_client: PersistClient,
    /// The deploy generation this process was started with.
    pub deploy_generation: u64,
    /// Shared deployment state used to announce catching-up / ready-to-promote
    /// and to await promotion.
    pub deployment_state: DeploymentState,
    /// Not-yet-opened durable catalog state.
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    /// Metrics registry handed to newly created catalog state handles.
    pub catalog_metrics: Arc<Metrics>,
    /// Maximum time a 0dt deployment waits to be caught up before proceeding
    /// (or panicking, per `panic_after_timeout`). Unused by the legacy check.
    pub caught_up_max_wait: Duration,
    /// Interval at which a 0dt deployment re-checks the old generation for new
    /// DDL while catching up. Unused by the legacy check.
    pub ddl_check_interval: Duration,
    /// If true, panic instead of proceeding when `caught_up_max_wait` elapses.
    /// Unused by the legacy check.
    pub panic_after_timeout: bool,
    /// Arguments required to open the durable catalog.
    pub bootstrap_args: BootstrapArgs,
}
4344/// Output of preflight checks.
pub struct PreflightOutput {
    /// The catalog state the caller should continue booting with.
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    /// Whether the process must boot in read-only mode.
    pub read_only: bool,
    /// Trigger the caller fires once the read-only deployment has caught up;
    /// `None` when booting with writes allowed.
    pub caught_up_trigger: Option<trigger::Trigger>,
}
5051/// Perform a legacy (non-0dt) preflight check.
52pub async fn preflight_legacy(
53 PreflightInput {
54 boot_ts,
55 environment_id,
56 persist_client,
57 deploy_generation,
58 deployment_state,
59mut openable_adapter_storage,
60 catalog_metrics,
61 bootstrap_args,
62 caught_up_max_wait: _,
63 ddl_check_interval: _,
64 panic_after_timeout: _,
65 }: PreflightInput,
66) -> Result<Box<dyn OpenableDurableCatalogState>, CatalogError> {
67tracing::info!("Requested deploy generation {deploy_generation}");
6869if !openable_adapter_storage.is_initialized().await? {
70tracing::info!(
71"Catalog storage doesn't exist so there's no current deploy generation. We won't wait to be leader"
72);
73return Ok(openable_adapter_storage);
74 }
75let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
76tracing::info!("Found catalog generation {catalog_generation:?}");
77if catalog_generation < deploy_generation {
78tracing::info!(
79"Catalog generation {catalog_generation:?} is less than deploy generation {deploy_generation}. Performing pre-flight checks"
80);
81match openable_adapter_storage
82 .open_savepoint(boot_ts.clone(), &bootstrap_args)
83 .await
84{
85Ok((adapter_storage, _)) => Box::new(adapter_storage).expire().await,
86Err(CatalogError::Durable(e)) if e.can_recover_with_write_mode() => {
87// This is theoretically possible if catalog implementation A is
88 // initialized, implementation B is uninitialized, and we are going to
89 // migrate from A to B. The current code avoids this by always
90 // initializing all implementations, regardless of the target
91 // implementation. Still it's easy to protect against this and worth it in
92 // case things change in the future.
93tracing::warn!(
94"Unable to perform upgrade test because the target implementation is uninitialized"
95);
96return Ok(mz_catalog::durable::persist_backed_catalog_state(
97 persist_client,
98 environment_id.organization_id(),
99 BUILD_INFO.semver_version(),
100Some(deploy_generation),
101 Arc::clone(&catalog_metrics),
102 )
103 .await?);
104 }
105Err(e) => {
106tracing::warn!(error = %e, "catalog upgrade would have failed");
107return Err(e);
108 }
109 }
110111let promoted = deployment_state.set_ready_to_promote();
112113tracing::info!("Waiting for user to promote this envd to leader");
114 promoted.await;
115116Ok(mz_catalog::durable::persist_backed_catalog_state(
117 persist_client,
118 environment_id.organization_id(),
119 BUILD_INFO.semver_version(),
120Some(deploy_generation),
121 Arc::clone(&catalog_metrics),
122 )
123 .await?)
124 } else if catalog_generation == deploy_generation {
125tracing::info!(
126"Server requested generation {deploy_generation} which is equal to catalog's generation"
127);
128Ok(openable_adapter_storage)
129 } else {
130mz_ore::halt!(
131"Server started with requested generation {deploy_generation} but catalog was already at {catalog_generation:?}. Deploy generations must increase monotonically"
132);
133 }
134}
/// Perform a 0dt preflight check.
///
/// Returns the openable adapter storage to use and whether or not to boot in
/// read only mode.
///
/// When the requested deploy generation is newer than the catalog's, this
/// boots in read-only mode and spawns a background task that waits for the
/// deployment to catch up (or time out, or be told by an administrator to skip
/// catching up), announces readiness to promote, and — once promoted — fences
/// out the old deployment and halts so the process can reboot as the leader.
pub async fn preflight_0dt(
    PreflightInput {
        boot_ts,
        environment_id,
        persist_client,
        deploy_generation,
        deployment_state,
        mut openable_adapter_storage,
        catalog_metrics,
        caught_up_max_wait,
        ddl_check_interval,
        panic_after_timeout,
        bootstrap_args,
    }: PreflightInput,
) -> Result<PreflightOutput, CatalogError> {
    info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");

    // An uninitialized catalog means there is no running generation to take
    // over from, so boot with writes allowed straight away.
    if !openable_adapter_storage.is_initialized().await? {
        info!("catalog not initialized; booting with writes allowed");
        return Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        });
    }

    let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
    info!(%catalog_generation, "catalog initialized");
    if catalog_generation < deploy_generation {
        info!("this deployment is a new generation; booting in read only mode");

        // The caller fires `caught_up_trigger` once this read-only deployment
        // is hydrated; the spawned task below awaits the receiver half.
        let (caught_up_trigger, mut caught_up_receiver) = trigger::channel();

        // Spawn a background task to handle promotion to leader.
        mz_ore::task::spawn(|| "preflight_0dt", async move {
            // Snapshot the next-ID counters now, so we can later detect DDL
            // performed by the old generation while we were catching up.
            let (initial_next_user_item_id, initial_next_replica_id) = get_next_ids(
                boot_ts,
                persist_client.clone(),
                environment_id.clone(),
                deploy_generation,
                Arc::clone(&catalog_metrics),
                bootstrap_args.clone(),
            )
            .await;

            info!(
                %initial_next_user_item_id,
                %initial_next_replica_id,
                "waiting for deployment to be caught up");

            // Upper bound on how long we wait before proceeding (or
            // panicking, per `panic_after_timeout`).
            let caught_up_max_wait_fut = async {
                tokio::time::sleep(caught_up_max_wait).await;
                ()
            };
            let mut caught_up_max_wait_fut = pin!(caught_up_max_wait_fut);

            // Resolves if an administrator asks us to skip the catch-up wait.
            let mut skip_catchup = deployment_state.set_catching_up();

            // Periodically re-check the old generation for new DDL. Skip
            // (rather than burst) any ticks missed while a check was running.
            let mut check_ddl_changes_interval = tokio::time::interval(ddl_check_interval);
            check_ddl_changes_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            let mut should_skip_catchup = false;
            loop {
                // `biased` makes the administrator's skip request win over a
                // simultaneously-ready caught-up signal or timeout.
                tokio::select! {
                    biased;

                    () = &mut skip_catchup => {
                        info!("skipping waiting for deployment to catch up due to administrator request");
                        should_skip_catchup = true;
                        break;
                    }
                    () = &mut caught_up_receiver => {
                        info!("deployment caught up");
                        break;
                    }
                    () = &mut caught_up_max_wait_fut => {
                        if panic_after_timeout {
                            panic!("not caught up within {:?}", caught_up_max_wait);
                        }
                        info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
                        break;
                    }
                    _ = check_ddl_changes_interval.tick() => {
                        // Halts the process (to restart read-only) if relevant
                        // DDL happened in the old generation.
                        check_ddl_changes(
                            boot_ts,
                            persist_client.clone(),
                            environment_id.clone(),
                            deploy_generation,
                            Arc::clone(&catalog_metrics),
                            bootstrap_args.clone(),
                            initial_next_user_item_id,
                            initial_next_replica_id,
                        )
                        .await;
                    }
                }
            }

            // Check for DDL changes one last time before announcing as ready to
            // promote.
            if !should_skip_catchup {
                check_ddl_changes(
                    boot_ts,
                    persist_client.clone(),
                    environment_id.clone(),
                    deploy_generation,
                    Arc::clone(&catalog_metrics),
                    bootstrap_args.clone(),
                    initial_next_user_item_id,
                    initial_next_replica_id,
                )
                .await;
            }

            // Announce that we're ready to promote.
            let promoted = deployment_state.set_ready_to_promote();
            info!("announced as ready to promote; waiting for promotion");
            promoted.await;

            // Take over the catalog.
            info!("promoted; attempting takeover");

            // NOTE: There _is_ a window where DDL can happen in the old
            // environment, between checking above, us announcing as ready to
            // promote, and cloud giving us the go-ahead signal. Its size
            // depends on how quickly cloud will trigger promotion once we
            // report as ready.
            //
            // We could add another check here, right before cutting over, but I
            // think this requires changes in Cloud: with this additional check,
            // it can now happen that cloud gives us the promote signal but we
            // then notice there were changes and restart. Cloud would have to
            // notice this and give us the promote signal again, once we're
            // ready again.

            let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
                persist_client.clone(),
                environment_id.organization_id(),
                BUILD_INFO.semver_version(),
                Some(deploy_generation),
                Arc::clone(&catalog_metrics),
            )
            .await
            .expect("incompatible catalog/persist version");

            // Fully opening at the new generation fences out the old
            // deployment; the handles themselves are not needed afterwards.
            let (_catalog, _audit_logs) = openable_adapter_storage
                .open(boot_ts, &bootstrap_args)
                .await
                .unwrap_or_terminate("unexpected error while fencing out old deployment");

            // Reboot as the leader.
            halt!("fenced out old deployment; rebooting as leader")
        });

        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: true,
            caught_up_trigger: Some(caught_up_trigger),
        })
    } else if catalog_generation == deploy_generation {
        info!("this deployment is the current generation; booting with writes allowed");
        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        })
    } else {
        // The catalog is already at a newer generation: a later deployment
        // took over, so exit cleanly.
        exit!(0, "this deployment has been fenced out");
    }
}
311312/// Check if there have been any DDL that create new collections or replicas,
313/// restart in read-only mode if so, in order to pick up those new items and
314/// start hydrating them before cutting over.
315async fn check_ddl_changes(
316 boot_ts: Timestamp,
317 persist_client: PersistClient,
318 environment_id: EnvironmentId,
319 deploy_generation: u64,
320 catalog_metrics: Arc<Metrics>,
321 bootstrap_args: BootstrapArgs,
322 initial_next_user_item_id: u64,
323 initial_next_replica_id: u64,
324) {
325let (next_user_item_id, next_replica_id) = get_next_ids(
326 boot_ts,
327 persist_client.clone(),
328 environment_id.clone(),
329 deploy_generation,
330 Arc::clone(&catalog_metrics),
331 bootstrap_args.clone(),
332 )
333 .await;
334335tracing::info!(
336 %initial_next_user_item_id,
337 %initial_next_replica_id,
338 %next_user_item_id,
339 %next_replica_id,
340"checking if there was any relevant DDL");
341342if next_user_item_id > initial_next_user_item_id || next_replica_id > initial_next_replica_id {
343halt!("there have been DDL that we need to react to; rebooting in read-only mode")
344 }
345}
346347/// Gets and returns the next user item ID and user replica ID that would be
348/// allocated as of the current catalog state.
349async fn get_next_ids(
350 boot_ts: Timestamp,
351 persist_client: PersistClient,
352 environment_id: EnvironmentId,
353 deploy_generation: u64,
354 catalog_metrics: Arc<Metrics>,
355 bootstrap_args: BootstrapArgs,
356) -> (u64, u64) {
357let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
358 persist_client,
359 environment_id.organization_id(),
360 BUILD_INFO.semver_version(),
361Some(deploy_generation),
362 catalog_metrics,
363 )
364 .await
365.expect("incompatible catalog/persist version");
366367let (mut catalog, _audit_logs) = openable_adapter_storage
368 .open_savepoint(boot_ts, &bootstrap_args)
369 .await
370.unwrap_or_terminate("can open in savepoint mode");
371372let next_user_item_id = catalog
373 .get_next_user_item_id()
374 .await
375.expect("can access catalog");
376let next_replica_item_id = catalog
377 .get_next_user_replica_id()
378 .await
379.expect("can access catalog");
380381 (next_user_item_id, next_replica_item_id)
382}