mz_environmentd/deployment/preflight.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Preflight checks for deployments.
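//!
//! These checks support zero-downtime ("0dt") upgrades: a deployment with a
//! new generation boots in read-only mode alongside the current generation,
//! waits until it has caught up, and then takes over by fencing out the old
//! deployment.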

use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use mz_adapter::ResultExt;
use mz_catalog::durable::{BootstrapArgs, CatalogError, Metrics, OpenableDurableCatalogState};
use mz_controller_types::ReplicaId;
use mz_ore::channel::trigger;
use mz_ore::exit;
use mz_ore::halt;
use mz_ore::str::separated;
use mz_persist_client::PersistClient;
use mz_repr::{CatalogItemId, Timestamp};
use mz_sql::catalog::EnvironmentId;
use tracing::info;

use crate::BUILD_INFO;
use crate::deployment::state::DeploymentState;

/// The necessary input for preflight checks.
pub struct PreflightInput {
    pub boot_ts: Timestamp,
    pub environment_id: EnvironmentId,
    pub persist_client: PersistClient,
    pub deploy_generation: u64,
    pub deployment_state: DeploymentState,
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    pub catalog_metrics: Arc<Metrics>,
    /// The maximum amount of time to wait for the deployment to catch up
    /// before proceeding (or panicking) anyway.
    pub caught_up_max_wait: Duration,
    /// How often to check whether DDL in the active deployment has created
    /// new objects or replicas.
    pub ddl_check_interval: Duration,
    /// Whether to panic, rather than proceed, when `caught_up_max_wait`
    /// elapses before the deployment has caught up.
    pub panic_after_timeout: bool,
    pub bootstrap_args: BootstrapArgs,
}

/// Output of preflight checks.
pub struct PreflightOutput {
    pub openable_adapter_storage: Box<dyn OpenableDurableCatalogState>,
    pub read_only: bool,
    pub caught_up_trigger: Option<trigger::Trigger>,
}

/// Perform a 0dt preflight check.
///
/// Returns the openable adapter storage to use and whether or not to boot in
/// read-only mode.
pub async fn preflight_0dt(
    PreflightInput {
        boot_ts,
        environment_id,
        persist_client,
        deploy_generation,
        deployment_state,
        mut openable_adapter_storage,
        catalog_metrics,
        caught_up_max_wait,
        ddl_check_interval,
        panic_after_timeout,
        bootstrap_args,
    }: PreflightInput,
) -> Result<PreflightOutput, CatalogError> {
    info!(%deploy_generation, ?caught_up_max_wait, "performing 0dt preflight checks");

    if !openable_adapter_storage.is_initialized().await? {
        info!("catalog not initialized; booting with writes allowed");
        return Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        });
    }

    let catalog_generation = openable_adapter_storage.get_deployment_generation().await?;
    info!(%catalog_generation, "catalog initialized");
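    // Compare the generation recorded in the catalog with our own deploy
    // generation: if ours is newer we boot in read-only mode and prepare to
    // take over; if they match we are the current generation and boot
    // normally; if ours is older we have already been fenced out and exit.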
    if catalog_generation < deploy_generation {
        info!("this deployment is a new generation; booting in read only mode");

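        // The trigger half of this channel is handed back to the caller via
        // `PreflightOutput::caught_up_trigger`; the receiver side resolves in
        // the select loop below once the caller signals that the deployment
        // has caught up.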
        let (caught_up_trigger, mut caught_up_receiver) = trigger::channel();

        // Spawn a background task to handle promotion to leader.
        mz_ore::task::spawn(|| "preflight_0dt", async move {
            let (initial_next_user_item_id, initial_next_replica_id) = get_next_ids(
                boot_ts,
                persist_client.clone(),
                environment_id.clone(),
                deploy_generation,
                Arc::clone(&catalog_metrics),
                bootstrap_args.clone(),
            )
            .await;

            info!(
                %initial_next_user_item_id,
                %initial_next_replica_id,
                "waiting for deployment to be caught up");

            let caught_up_max_wait_fut = async {
                tokio::time::sleep(caught_up_max_wait).await;
                ()
            };
            let mut caught_up_max_wait_fut = pin!(caught_up_max_wait_fut);

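            // `set_catching_up` registers this deployment as catching up in
            // the shared `DeploymentState` and returns a future that resolves
            // if an administrator requests skipping the catch-up wait.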
            let mut skip_catchup = deployment_state.set_catching_up();

            let mut check_ddl_changes_interval = tokio::time::interval(ddl_check_interval);
            check_ddl_changes_interval
                .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
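            // `MissedTickBehavior::Skip` ensures that a DDL check that runs
            // longer than the interval does not cause a burst of immediate
            // ticks afterwards; missed ticks are simply skipped.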

            let mut should_skip_catchup = false;
            loop {
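                // The `biased` select polls its arms in the order written, so
                // an administrator's request to skip catch-up wins over the
                // caught-up signal, which wins over the timeout, which wins
                // over the periodic DDL check.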
                tokio::select! {
                    biased;

                    () = &mut skip_catchup => {
                        info!("skipping waiting for deployment to catch up due to administrator request");
                        should_skip_catchup = true;
                        break;
                    }
                    () = &mut caught_up_receiver => {
                        info!("deployment caught up");
                        break;
                    }
                    () = &mut caught_up_max_wait_fut => {
                        if panic_after_timeout {
                            panic!("not caught up within {:?}", caught_up_max_wait);
                        }
                        info!("not caught up within {:?}, proceeding now", caught_up_max_wait);
                        break;
                    }
                    _ = check_ddl_changes_interval.tick() => {
                        check_ddl_changes(
                            boot_ts,
                            persist_client.clone(),
                            environment_id.clone(),
                            deploy_generation,
                            Arc::clone(&catalog_metrics),
                            bootstrap_args.clone(),
                            initial_next_user_item_id,
                            initial_next_replica_id,
                        )
                        .await;
                    }
                }
            }

            // Check for DDL changes one last time before announcing as ready to
            // promote.
            if !should_skip_catchup {
                check_ddl_changes(
                    boot_ts,
                    persist_client.clone(),
                    environment_id.clone(),
                    deploy_generation,
                    Arc::clone(&catalog_metrics),
                    bootstrap_args.clone(),
                    initial_next_user_item_id,
                    initial_next_replica_id,
                )
                .await;
            }

            // Announce that we're ready to promote.
            let promoted = deployment_state.set_ready_to_promote();
            info!("announced as ready to promote; waiting for promotion");
            promoted.await;

            // Take over the catalog.
            info!("promoted; attempting takeover");

            // NOTE: There _is_ a window where DDL can happen in the old
            // environment, between the check above, us announcing as ready to
            // promote, and Cloud giving us the go-ahead signal. Its size
            // depends on how quickly Cloud will trigger promotion once we
            // report as ready.
            //
            // We could add another check here, right before cutting over, but I
            // think this requires changes in Cloud: with this additional check,
            // it can now happen that Cloud gives us the promote signal but we
            // then notice there were changes and restart. Cloud would have to
            // notice this and give us the promote signal again, once we're
            // ready again.

            let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
                persist_client.clone(),
                environment_id.organization_id(),
                BUILD_INFO.semver_version(),
                Some(deploy_generation),
                Arc::clone(&catalog_metrics),
            )
            .await
            .expect("incompatible catalog/persist version");

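            // Opening the catalog read-write with our (newer) deploy
            // generation is what fences out the old deployment: once this
            // open succeeds, the previous generation can no longer write to
            // the catalog.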
            let (_catalog, _audit_logs) = openable_adapter_storage
                .open(boot_ts, &bootstrap_args)
                .await
                .unwrap_or_terminate("unexpected error while fencing out old deployment");

            // Reboot as the leader.
            halt!("fenced out old deployment; rebooting as leader")
        });

        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: true,
            caught_up_trigger: Some(caught_up_trigger),
        })
    } else if catalog_generation == deploy_generation {
        info!("this deployment is the current generation; booting with writes allowed");
        Ok(PreflightOutput {
            openable_adapter_storage,
            read_only: false,
            caught_up_trigger: None,
        })
    } else {
        exit!(0, "this deployment has been fenced out");
    }
}

/// Checks whether any DDL that creates new collections or replicas has been
/// run, and restarts in read-only mode if so, in order to pick up those new
/// items and start hydrating them before cutting over.
///
/// We do this by checking whether IDs allocated after the preflight check
/// began have been committed to the catalog.
async fn check_ddl_changes(
    boot_ts: Timestamp,
    persist_client: PersistClient,
    environment_id: EnvironmentId,
    deploy_generation: u64,
    catalog_metrics: Arc<Metrics>,
    bootstrap_args: BootstrapArgs,
    initial_next_user_item_id: u64,
    initial_next_replica_id: u64,
) {
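    // Open a fresh savepoint-mode handle on the catalog so we can inspect the
    // active generation's contents without fencing it or committing any
    // writes of our own; everything below only reads from the transaction.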
    let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
        persist_client,
        environment_id.organization_id(),
        BUILD_INFO.semver_version(),
        Some(deploy_generation),
        catalog_metrics,
    )
    .await
    .expect("incompatible catalog/persist version");

    let (mut catalog, _audit_logs) = openable_adapter_storage
        .open_savepoint(boot_ts, &bootstrap_args)
        .await
        .unwrap_or_terminate("can open in savepoint mode");

    let tx = catalog
        .transaction()
        .await
        .unwrap_or_terminate("unexpected error while getting transaction");

    // We must explicitly check the catalog for these IDs since IDs can be
    // allocated during sequencing/planning but not yet committed to the catalog.
    // Furthermore, these IDs might never be committed to the catalog because
    // their sequencing has been aborted.
    let new_replicas = tx
        .get_cluster_replicas()
        .filter_map(|replica| match replica.replica_id {
            ReplicaId::User(id) if id >= initial_next_replica_id => Some(replica),
            _ => None,
        })
        .collect::<Vec<_>>();

    let new_objects = tx
        .get_items()
        .filter_map(|item| match item.id {
            CatalogItemId::User(id) if id >= initial_next_user_item_id => Some(item),
            _ => None,
        })
        .collect::<Vec<_>>();

    if new_replicas.is_empty() && new_objects.is_empty() {
        return;
    }

    let mut info_parts = Vec::new();

    if !new_replicas.is_empty() {
        let replicas = new_replicas.iter().map(|r| {
            format!(
                "{{replica_id: {}, replica_name: {}, cluster_id: {}}}",
                r.replica_id, r.name, r.cluster_id
            )
        });
        info_parts.push(format!("New replicas: [{}]", separated(", ", replicas)));
    }

    if !new_objects.is_empty() {
        let objects = new_objects
            .iter()
            .map(|o| format!("{{object_id: {}, object_name: {}}}", o.id, o.name));
        info_parts.push(format!("New objects: [{}]", separated(", ", objects)));
    }

    let extra_info = separated(". ", info_parts);

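    // Halting here restarts the process; preflight then runs again, sees that
    // our generation is still newer than the catalog's, and boots in
    // read-only mode with the new objects and replicas included, so they can
    // start hydrating before we cut over.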
    halt!(
        "there has been DDL that we need to react to; rebooting in read-only mode. {}",
        extra_info
    )
}

/// Gets and returns the next user item ID and user replica ID that would be
/// allocated as of the current catalog state.
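///
/// These IDs serve as the baseline against which `check_ddl_changes` detects
/// objects and replicas created after the preflight check began.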
async fn get_next_ids(
    boot_ts: Timestamp,
    persist_client: PersistClient,
    environment_id: EnvironmentId,
    deploy_generation: u64,
    catalog_metrics: Arc<Metrics>,
    bootstrap_args: BootstrapArgs,
) -> (u64, u64) {
    let openable_adapter_storage = mz_catalog::durable::persist_backed_catalog_state(
        persist_client,
        environment_id.organization_id(),
        BUILD_INFO.semver_version(),
        Some(deploy_generation),
        catalog_metrics,
    )
    .await
    .expect("incompatible catalog/persist version");

    let (mut catalog, _audit_logs) = openable_adapter_storage
        .open_savepoint(boot_ts, &bootstrap_args)
        .await
        .unwrap_or_terminate("can open in savepoint mode");

    let next_user_item_id = catalog
        .get_next_user_item_id()
        .await
        .expect("can access catalog");
    let next_replica_id = catalog
        .get_next_user_replica_id()
        .await
        .expect("can access catalog");

    (next_user_item_id, next_replica_id)
}