mz_environmentd/deployment/preflight.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Preflight checks for deployments.

use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use mz_adapter::ResultExt;
use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
use mz_ore::channel::trigger;
use mz_ore::exit;
use mz_ore::halt;
use mz_persist_client::PersistClient;
use mz_repr::Timestamp;
use mz_sql::catalog::EnvironmentId;
use tracing::info;

use crate::BUILD_INFO;
use crate::deployment::state::DeploymentState;

/// The necessary input for preflight checks.
pub struct PreflightInput {
    pub boot_ts: Timestamp,
    pub environment_id: EnvironmentId,
    pub persist_client: PersistClient,
    pub deploy_generation: u64,
    pub deployment_state: DeploymentState,
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    pub catalog_metrics: Arc<Metrics>,
    pub caught_up_max_wait: Duration,
    pub ddl_check_interval: Duration,
    pub panic_after_timeout: bool,
    pub bootstrap_args: BootstrapArgs,
}

/// Output of preflight checks.
pub struct PreflightOutput {
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    pub read_only: bool,
    pub caught_up_trigger: Option<trigger::Trigger>,
}

/// Performs a 0dt (zero-downtime) preflight check.
///
/// Returns the openable adapter storage to use and whether to boot in
/// read-only mode.
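///
/// A rough sketch of how a caller might drive this during startup
/// (illustrative only; the caller must supply real clients and configuration):
///
/// ```ignore
/// let output = preflight_0dt(PreflightInput {
///     boot_ts,
///     environment_id,
///     persist_client,
///     deploy_generation,
///     deployment_state,
///     openable_adapter_storage,
///     catalog_metrics,
///     caught_up_max_wait,
///     ddl_check_interval,
///     panic_after_timeout,
///     bootstrap_args,
/// })
/// .await?;
/// // Boot with `output.openable_adapter_storage`, honoring `output.read_only`,
/// // and use `output.caught_up_trigger` (if any) to signal once the
/// // deployment has caught up.
/// ```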
pub async fn preflight_0dt(
    PreflightInput {
        boot_ts,
        environment_id,
        persist_client,
        deploy_generation,
        deployment_state,
        mut openable_adapter_storage,
        catalog_metrics,
        caught_up_max_wait,
        ddl_check_interval,
        panic_after_timeout,
        bootstrap_args,
    }: PreflightInput,
) -> Result<PreflightOutput, CatalogError> {
    info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");

    if !openable_adapter_storage.is_initialized().await? {
        info!("catalog not initialized; booting with writes allowed");
        return Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        });
    }

    let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
    info!(%catalog_generation, "catalog initialized");
    if catalog_generation < deploy_generation {
        info!("this deployment is a new generation; booting in read only mode");

        let (caught_up_trigger, mut caught_up_receiver) = trigger::channel();

        // Spawn a background task to handle promotion to leader.
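        // The task waits until the new generation has caught up (or until
        // catch-up is skipped or times out), announces itself as ready to
        // promote, waits for the promotion signal, and then fences out the old
        // deployment before halting to reboot as the leader.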
        mz_ore::task::spawn(|| "preflight_0dt", async move {
            let (initial_next_user_item_id, initial_next_replica_id) = get_next_ids(
                boot_ts,
                persist_client.clone(),
                environment_id.clone(),
                deploy_generation,
                Arc::clone(&catalog_metrics),
                bootstrap_args.clone(),
            )
            .await;

            info!(
                %initial_next_user_item_id,
                %initial_next_replica_id,
                "waiting for deployment to be caught up");

            let caught_up_max_wait_fut = async {
                tokio::time::sleep(caught_up_max_wait).await;
                ()
            };
            let mut caught_up_max_wait_fut = pin!(caught_up_max_wait_fut);

            let mut skip_catchup = deployment_state.set_catching_up();

            let mut check_ddl_changes_interval = tokio::time::interval(ddl_check_interval);
            check_ddl_changes_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

            let mut should_skip_catchup = false;
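            // Wait until one of the following happens: an administrator
            // requests that catch-up be skipped, the deployment reports itself
            // as caught up, or the maximum wait elapses. While waiting,
            // periodically check whether the old generation has run any
            // relevant DDL.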
            loop {
                tokio::select! {
                    biased;

                    () = &mut skip_catchup => {
                        info!("skipping waiting for deployment to catch up due to administrator request");
                        should_skip_catchup = true;
                        break;
                    }
                    () = &mut caught_up_receiver => {
                        info!("deployment caught up");
                        break;
                    }
                    () = &mut caught_up_max_wait_fut => {
                        if panic_after_timeout {
                            panic!("not caught up within {:?}", caught_up_max_wait);
                        }
                        info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
                        break;
                    }
                    _ = check_ddl_changes_interval.tick() => {
                        check_ddl_changes(
                            boot_ts,
                            persist_client.clone(),
                            environment_id.clone(),
                            deploy_generation,
                            Arc::clone(&catalog_metrics),
                            bootstrap_args.clone(),
                            initial_next_user_item_id,
                            initial_next_replica_id,
                        )
                        .await;
                    }
                }
            }

            // Check for DDL changes one last time before announcing as ready to
            // promote.
            if !should_skip_catchup {
                check_ddl_changes(
                    boot_ts,
                    persist_client.clone(),
                    environment_id.clone(),
                    deploy_generation,
                    Arc::clone(&catalog_metrics),
                    bootstrap_args.clone(),
                    initial_next_user_item_id,
                    initial_next_replica_id,
                )
                .await;
            }

            // Announce that we're ready to promote.
            let promoted = deployment_state.set_ready_to_promote();
            info!("announced as ready to promote; waiting for promotion");
            promoted.await;

            // Take over the catalog.
            info!("promoted; attempting takeover");

            // NOTE: There _is_ a window where DDL can happen in the old
            // environment, between the check above, us announcing as ready to
            // promote, and cloud giving us the go-ahead signal. Its size
            // depends on how quickly cloud will trigger promotion once we
            // report as ready.
            //
            // We could add another check here, right before cutting over, but I
            // think this requires changes in Cloud: with this additional check,
            // it could happen that cloud gives us the promote signal but we
            // then notice there were changes and restart. Cloud would have to
            // notice this and give us the promote signal again, once we're
            // ready again.

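            // Opening the catalog at our (newer) deploy generation fences out
            // the old deployment; the `open` call below performs the durable
            // takeover.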
            let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
                persist_client.clone(),
                environment_id.organization_id(),
                BUILD_INFO.semver_version(),
                Some(deploy_generation),
                Arc::clone(&catalog_metrics),
            )
            .await
            .expect("incompatible catalog/persist version");

            let (_catalog, _audit_logs) = openable_adapter_storage
                .open(boot_ts, &bootstrap_args)
                .await
                .unwrap_or_terminate("unexpected error while fencing out old deployment");

            // Reboot as the leader.
            halt!("fenced out old deployment; rebooting as leader")
        });

        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: true,
            caught_up_trigger: Some(caught_up_trigger),
        })
    } else if catalog_generation == deploy_generation {
        info!("this deployment is the current generation; booting with writes allowed");
        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        })
    } else {
        exit!(0, "this deployment has been fenced out");
    }
}

/// Checks whether any DDL has created new collections or replicas and, if so,
/// halts so that we restart in read-only mode, pick up those new items, and
/// start hydrating them before cutting over.
async fn check_ddl_changes(
    boot_ts: Timestamp,
    persist_client: PersistClient,
    environment_id: EnvironmentId,
    deploy_generation: u64,
    catalog_metrics: Arc<Metrics>,
    bootstrap_args: BootstrapArgs,
    initial_next_user_item_id: u64,
    initial_next_replica_id: u64,
) {
    let (next_user_item_id, next_replica_id) = get_next_ids(
        boot_ts,
        persist_client.clone(),
        environment_id.clone(),
        deploy_generation,
        Arc::clone(&catalog_metrics),
        bootstrap_args.clone(),
    )
    .await;

    tracing::info!(
        %initial_next_user_item_id,
        %initial_next_replica_id,
        %next_user_item_id,
        %next_replica_id,
        "checking if there was any relevant DDL");

    if next_user_item_id > initial_next_user_item_id || next_replica_id > initial_next_replica_id {
        halt!("there have been DDL that we need to react to; rebooting in read-only mode")
    }
}

/// Returns the next user item ID and user replica ID that would be allocated
/// as of the current catalog state.
async fn get_next_ids(
    boot_ts: Timestamp,
    persist_client: PersistClient,
    environment_id: EnvironmentId,
    deploy_generation: u64,
    catalog_metrics: Arc<Metrics>,
    bootstrap_args: BootstrapArgs,
) -> (u64, u64) {
    let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
        persist_client,
        environment_id.organization_id(),
        BUILD_INFO.semver_version(),
        Some(deploy_generation),
        catalog_metrics,
    )
    .await
    .expect("incompatible catalog/persist version");

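    // Open in savepoint mode: this yields a view of the current catalog
    // contents without durably committing anything, so the active deployment
    // is left undisturbed.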
    let (mut catalog, _audit_logs) = openable_adapter_storage
        .open_savepoint(boot_ts, &bootstrap_args)
        .await
        .unwrap_or_terminate("can open in savepoint mode");

    let next_user_item_id = catalog
        .get_next_user_item_id()
        .await
        .expect("can access catalog");
    let next_replica_id = catalog
        .get_next_user_replica_id()
        .await
        .expect("can access catalog");

    (next_user_item_id, next_replica_id)
297}