1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
1516//! Implementation of the `mz region` command.
17//!
18//! Consult the user-facing documentation for details.
19//!
20use std::time::Duration;
2122use crate::{context::RegionContext, error::Error};
2324use mz_cloud_api::client::{cloud_provider::CloudProvider, region::RegionState};
25use mz_ore::retry::Retry;
26use serde::{Deserialize, Serialize};
27use tabled::Tabled;
2829/// Enable a region in the profile organization.
30///
31/// In cases where the organization has already enabled the region
32/// the command will try to run a version update. Resulting
33/// in a downtime for a short period.
34pub async fn enable(
35 cx: RegionContext,
36 version: Option<String>,
37 environmentd_extra_arg: Option<Vec<String>>,
38) -> Result<(), Error> {
39let loading_spinner = cx
40 .output_formatter()
41 .loading_spinner("Retrieving information...");
42let cloud_provider = cx.get_cloud_provider().await?;
4344 loading_spinner.set_message("Enabling the region...");
4546let environmentd_extra_arg: Vec<String> = environmentd_extra_arg.unwrap_or_else(Vec::new);
4748// Loop region creation.
49 // After 6 minutes it will timeout.
50let _ = Retry::default()
51 .max_duration(Duration::from_secs(720))
52 .clamp_backoff(Duration::from_secs(1))
53 .retry_async(|_| async {
54let _ = cx
55 .cloud_client()
56 .create_region(
57 version.clone(),
58 environmentd_extra_arg.clone(),
59 cloud_provider.clone(),
60 )
61 .await?;
62Ok(())
63 })
64 .await
65.map_err(|e| Error::TimeoutError(Box::new(e)))?;
6667 loading_spinner.set_message("Waiting for the region to be online...");
6869// Loop retrieving the region and checking the SQL connection for 6 minutes.
70 // After 6 minutes it will timeout.
71let _ = Retry::default()
72 .max_duration(Duration::from_secs(720))
73 .clamp_backoff(Duration::from_secs(1))
74 .retry_async(|_| async {
75let region = cx.get_region().await?;
7677match region.region_state {
78 RegionState::EnablementPending => {
79 loading_spinner.set_message("Waiting for the region to be ready...");
80Err(Error::NotReadyRegion)
81 }
82 RegionState::DeletionPending => Err(Error::CommandExecutionError(
83"This region is pending deletion!".to_string(),
84 )),
85 RegionState::SoftDeleted => Err(Error::CommandExecutionError(
86"This region has been marked soft-deleted!".to_string(),
87 )),
88 RegionState::Enabled => match region.region_info {
89Some(region_info) => {
90 loading_spinner.set_message("Waiting for the region to be resolvable...");
91if region_info.resolvable {
92let claims = cx.admin_client().claims().await?;
93let user = claims.user()?;
94if cx.sql_client().is_ready(®ion_info, user)? {
95return Ok(());
96 }
97Err(Error::NotPgReadyError)
98 } else {
99Err(Error::NotResolvableRegion)
100 }
101 }
102None => Err(Error::NotReadyRegion),
103 },
104 }
105 })
106 .await
107.map_err(|e| Error::TimeoutError(Box::new(e)))?;
108109 loading_spinner.finish_with_message(format!("Region in {} is now online", cloud_provider.id));
110111Ok(())
112}
113114/// Disable a region in the profile organization.
115///
116/// This command can take several minutes to complete.
117pub async fn disable(cx: RegionContext, hard: bool) -> Result<(), Error> {
118let loading_spinner = cx
119 .output_formatter()
120 .loading_spinner("Retrieving information...");
121122let cloud_provider = cx.get_cloud_provider().await?;
123124// The `delete_region` method retries disabling a region,
125 // has an inner timeout, and manages a `504` response.
126 // For any other type of error response, we handle it here
127 // with a retry loop.
128Retry::default()
129 .max_duration(Duration::from_secs(720))
130 .clamp_backoff(Duration::from_secs(1))
131 .retry_async(|_| async {
132 loading_spinner.set_message("Disabling region...");
133 cx.cloud_client()
134 .delete_region(cloud_provider.clone(), hard)
135 .await?;
136137 loading_spinner.finish_with_message("Region disabled.");
138Ok(())
139 })
140 .await
141}
142143/// Lists all the available regions and their status.
144pub async fn list(cx: RegionContext) -> Result<(), Error> {
145let output_formatter = cx.output_formatter();
146let loading_spinner = output_formatter.loading_spinner("Retrieving regions...");
147148#[derive(Deserialize, Serialize, Tabled)]
149pub struct Region<'a> {
150#[tabled(rename = "Region")]
151region: String,
152#[tabled(rename = "Status")]
153status: &'a str,
154 }
155156let cloud_providers: Vec<CloudProvider> = cx.cloud_client().list_cloud_regions().await?;
157let mut regions: Vec<Region> = vec![];
158159for cloud_provider in cloud_providers {
160match cx.cloud_client().get_region(cloud_provider.clone()).await {
161Ok(_) => regions.push(Region {
162 region: cloud_provider.id,
163 status: "enabled",
164 }),
165Err(mz_cloud_api::error::Error::EmptyRegion) => regions.push(Region {
166 region: cloud_provider.id,
167 status: "disabled",
168 }),
169Err(err) => {
170println!("Error: {:?}", err)
171 }
172 }
173 }
174175 loading_spinner.finish_and_clear();
176 output_formatter.output_table(regions)?;
177Ok(())
178}
179180/// Shows the health of the profile region followed by the HTTP and SQL endpoints.
181pub async fn show(cx: RegionContext) -> Result<(), Error> {
182// Sharing the reference of the context in multiple places makes
183 // it necesarry to wrap in an `alloc::rc`.
184185let output_formatter = cx.output_formatter();
186let loading_spinner = output_formatter.loading_spinner("Retrieving region...");
187188let region_info = cx.get_region_info().await?;
189190 loading_spinner.set_message("Checking environment health...");
191let claims = cx.admin_client().claims().await?;
192let sql_client = cx.sql_client();
193let environment_health = match sql_client.is_ready(®ion_info, claims.user()?) {
194Ok(healthy) => match healthy {
195true => "yes",
196_ => "no",
197 },
198Err(_) => "no",
199 };
200201 loading_spinner.finish_and_clear();
202 output_formatter.output_scalar(Some(&format!("Healthy: \t{}", environment_health)))?;
203 output_formatter.output_scalar(Some(&format!("SQL address: \t{}", region_info.sql_address)))?;
204 output_formatter.output_scalar(Some(&format!("HTTP URL: \t{}", region_info.http_address)))?;
205206Ok(())
207}