mz/command/
region.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Implementation of the `mz region` command.
17//!
18//! Consult the user-facing documentation for details.
19//!
20use std::time::Duration;
21
22use crate::{context::RegionContext, error::Error};
23
24use mz_cloud_api::client::{cloud_provider::CloudProvider, region::RegionState};
25use mz_ore::retry::Retry;
26use serde::{Deserialize, Serialize};
27use tabled::Tabled;
28
29/// Enable a region in the profile organization.
30///
31/// In cases where the organization has already enabled the region
32/// the command will try to run a version update. Resulting
33/// in a downtime for a short period.
34pub async fn enable(
35    cx: RegionContext,
36    version: Option<String>,
37    environmentd_extra_arg: Option<Vec<String>>,
38) -> Result<(), Error> {
39    let loading_spinner = cx
40        .output_formatter()
41        .loading_spinner("Retrieving information...");
42    let cloud_provider = cx.get_cloud_provider().await?;
43
44    loading_spinner.set_message("Enabling the region...");
45
46    let environmentd_extra_arg: Vec<String> = environmentd_extra_arg.unwrap_or_else(Vec::new);
47
48    // Loop region creation.
49    // After 6 minutes it will timeout.
50    let _ = Retry::default()
51        .max_duration(Duration::from_secs(720))
52        .clamp_backoff(Duration::from_secs(1))
53        .retry_async(|_| async {
54            let _ = cx
55                .cloud_client()
56                .create_region(
57                    version.clone(),
58                    environmentd_extra_arg.clone(),
59                    cloud_provider.clone(),
60                )
61                .await?;
62            Ok(())
63        })
64        .await
65        .map_err(|e| Error::TimeoutError(Box::new(e)))?;
66
67    loading_spinner.set_message("Waiting for the region to be online...");
68
69    // Loop retrieving the region and checking the SQL connection for 6 minutes.
70    // After 6 minutes it will timeout.
71    let _ = Retry::default()
72        .max_duration(Duration::from_secs(720))
73        .clamp_backoff(Duration::from_secs(1))
74        .retry_async(|_| async {
75            let region = cx.get_region().await?;
76
77            match region.region_state {
78                RegionState::EnablementPending => {
79                    loading_spinner.set_message("Waiting for the region to be ready...");
80                    Err(Error::NotReadyRegion)
81                }
82                RegionState::DeletionPending => Err(Error::CommandExecutionError(
83                    "This region is pending deletion!".to_string(),
84                )),
85                RegionState::SoftDeleted => Err(Error::CommandExecutionError(
86                    "This region has been marked soft-deleted!".to_string(),
87                )),
88                RegionState::Enabled => match region.region_info {
89                    Some(region_info) => {
90                        loading_spinner.set_message("Waiting for the region to be resolvable...");
91                        if region_info.resolvable {
92                            let claims = cx.admin_client().claims().await?;
93                            let user = claims.user()?;
94                            if cx.sql_client().is_ready(&region_info, user)? {
95                                return Ok(());
96                            }
97                            Err(Error::NotPgReadyError)
98                        } else {
99                            Err(Error::NotResolvableRegion)
100                        }
101                    }
102                    None => Err(Error::NotReadyRegion),
103                },
104            }
105        })
106        .await
107        .map_err(|e| Error::TimeoutError(Box::new(e)))?;
108
109    loading_spinner.finish_with_message(format!("Region in {} is now online", cloud_provider.id));
110
111    Ok(())
112}
113
114/// Disable a region in the profile organization.
115///
116/// This command can take several minutes to complete.
117pub async fn disable(cx: RegionContext, hard: bool) -> Result<(), Error> {
118    let loading_spinner = cx
119        .output_formatter()
120        .loading_spinner("Retrieving information...");
121
122    let cloud_provider = cx.get_cloud_provider().await?;
123
124    // The `delete_region` method retries disabling a region,
125    // has an inner timeout, and manages a `504` response.
126    // For any other type of error response, we handle it here
127    // with a retry loop.
128    Retry::default()
129        .max_duration(Duration::from_secs(720))
130        .clamp_backoff(Duration::from_secs(1))
131        .retry_async(|_| async {
132            loading_spinner.set_message("Disabling region...");
133            cx.cloud_client()
134                .delete_region(cloud_provider.clone(), hard)
135                .await?;
136
137            loading_spinner.finish_with_message("Region disabled.");
138            Ok(())
139        })
140        .await
141}
142
143/// Lists all the available regions and their status.
144pub async fn list(cx: RegionContext) -> Result<(), Error> {
145    let output_formatter = cx.output_formatter();
146    let loading_spinner = output_formatter.loading_spinner("Retrieving regions...");
147
148    #[derive(Deserialize, Serialize, Tabled)]
149    pub struct Region<'a> {
150        #[tabled(rename = "Region")]
151        region: String,
152        #[tabled(rename = "Status")]
153        status: &'a str,
154    }
155
156    let cloud_providers: Vec<CloudProvider> = cx.cloud_client().list_cloud_regions().await?;
157    let mut regions: Vec<Region> = vec![];
158
159    for cloud_provider in cloud_providers {
160        match cx.cloud_client().get_region(cloud_provider.clone()).await {
161            Ok(_) => regions.push(Region {
162                region: cloud_provider.id,
163                status: "enabled",
164            }),
165            Err(mz_cloud_api::error::Error::EmptyRegion) => regions.push(Region {
166                region: cloud_provider.id,
167                status: "disabled",
168            }),
169            Err(err) => {
170                println!("Error: {:?}", err)
171            }
172        }
173    }
174
175    loading_spinner.finish_and_clear();
176    output_formatter.output_table(regions)?;
177    Ok(())
178}
179
180/// Shows the health of the profile region followed by the HTTP and SQL endpoints.
181pub async fn show(cx: RegionContext) -> Result<(), Error> {
182    // Sharing the reference of the context in multiple places makes
183    // it necesarry to wrap in an `alloc::rc`.
184
185    let output_formatter = cx.output_formatter();
186    let loading_spinner = output_formatter.loading_spinner("Retrieving region...");
187
188    let region_info = cx.get_region_info().await?;
189
190    loading_spinner.set_message("Checking environment health...");
191    let claims = cx.admin_client().claims().await?;
192    let sql_client = cx.sql_client();
193    let environment_health = match sql_client.is_ready(&region_info, claims.user()?) {
194        Ok(healthy) => match healthy {
195            true => "yes",
196            _ => "no",
197        },
198        Err(_) => "no",
199    };
200
201    loading_spinner.finish_and_clear();
202    output_formatter.output_scalar(Some(&format!("Healthy: \t{}", environment_health)))?;
203    output_formatter.output_scalar(Some(&format!("SQL address: \t{}", region_info.sql_address)))?;
204    output_formatter.output_scalar(Some(&format!("HTTP URL: \t{}", region_info.http_address)))?;
205
206    Ok(())
207}