1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
// Copyright Materialize, Inc. and contributors. All rights reserved.
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Deployment state handling.
use std::future::Future;
use std::sync::{Arc, Mutex};
use mz_orchestratord::controller::materialize::environmentd::DeploymentStatus;
use mz_ore::channel::trigger::{self, Trigger};
enum DeploymentStateInner {
CatchingUp { _skip_trigger: Option<Trigger> },
ReadyToPromote { _promote_trigger: Trigger },
/// The state of an environment deployment.
/// This object should be held by the `environmentd` server. It provides methods
/// to handle state transitions that should be driven by the server itself.
/// A deployment begins in the `Initializing` state.
/// If, during initialization, the server realizes that it is taking over from a
/// failed `environmentd` process of a generation that is already the leader,
/// the server may proceed directly to the `IsLeader` state, via
/// [`DeploymentState::set_is_leader`].
/// Otherwise, the server should leave the deployment state in `Initializing`
/// while performing initialization activities. Once the server is catching up
/// its workloads, it should proceeded to the `CatchingUp` state. Once the
/// environment is ready to take over from the prior generation, the server
/// should call [`DeploymentState::set_ready_to_promote`]. After this, the
/// server should *not* call [`DeploymentState::set_is_leader`], as an external
/// orchestrator will determine when promotion occurs. The future returned by
/// `set_ready_to_promote` will resolve when promotion has occurred and the
/// deployment should take over from the prior generation and begin serving
/// queries.
pub struct DeploymentState {
inner: Arc<Mutex<DeploymentStateInner>>,
impl DeploymentState {
/// Creates a new `LeaderState` for a deployment.
/// Returns the state and a handle to the state.
pub fn new() -> (DeploymentState, DeploymentStateHandle) {
let inner = Arc::new(Mutex::new(DeploymentStateInner::Initializing));
let state = DeploymentState {
inner: Arc::clone(&inner),
let handle = DeploymentStateHandle { inner };
(state, handle)
/// Marks the deployment as catching up.
/// Returns a future that resolves if the catch up phase should be skipped.
pub fn set_catching_up(&self) -> impl Future<Output = ()> {
let (skip_trigger, skip_rx) = trigger::channel();
let mut inner = self.inner.lock().expect("lock poisoned");
matches!(*inner, DeploymentStateInner::Initializing),
"LeaderState::set_catching_up called on non-initializing state",
*inner = DeploymentStateInner::CatchingUp {
_skip_trigger: Some(skip_trigger),
/// Marks the deployment as ready to be promoted to leader.
/// Returns a future that resolves when the leadership promotion occurs.
/// When the function returns, the state will be `ReadyToPromote`. When the
/// returned future resolves, the state will be `Promoting`.
/// Panics if the leader state is not `Initializing`.
pub fn set_ready_to_promote(&self) -> impl Future<Output = ()> {
let (promote_trigger, promote_trigger_rx) = trigger::channel();
let mut inner = self.inner.lock().expect("lock poisoned");
DeploymentStateInner::Initializing | DeploymentStateInner::CatchingUp { .. }
"LeaderState::set_ready_to_promote called on invalid state",
*inner = DeploymentStateInner::ReadyToPromote {
_promote_trigger: promote_trigger,
/// Marks the deployment as the leader.
/// Panics if the leader state is not `Initializing` or `Promoting`.
pub fn set_is_leader(&self) {
let mut inner = self.inner.lock().expect("lock poisoned");
DeploymentStateInner::Initializing | DeploymentStateInner::Promoting
"LeaderState::set_is_leader called on non-initializing state",
*inner = DeploymentStateInner::IsLeader;
/// A cloneable handle to a [`DeploymentState`].
/// This should be held by modules providing external interfaces to
/// `environmentd` (e.g., the HTTP server). It provides methods to inspect the
/// current leadership state, and to promote the deployment to the leader if it
/// is ready to do so.
pub struct DeploymentStateHandle {
inner: Arc<Mutex<DeploymentStateInner>>,
impl DeploymentStateHandle {
/// Returns the current deployment status.
pub fn status(&self) -> DeploymentStatus {
let inner = self.inner.lock().expect("lock poisoned");
match *inner {
DeploymentStateInner::Initializing => DeploymentStatus::Initializing,
DeploymentStateInner::CatchingUp { .. } => DeploymentStatus::Initializing,
DeploymentStateInner::ReadyToPromote { .. } => DeploymentStatus::ReadyToPromote,
DeploymentStateInner::Promoting => DeploymentStatus::Promoting,
DeploymentStateInner::IsLeader => DeploymentStatus::IsLeader,
/// Attempts to skip the catchup phase for the deployment.
/// Deployments in the `Initializing` phase cannot have their catchup phase
/// skipped. Deployments in the `ReadyToPromote`, `Promoting`, and
/// `IsLeader` states can be promoted (with the latter two cases being
/// no-ops).
/// If skipping the catchup was successful, returns `Ok`. Otherwise, returns
/// `Err`.
pub fn try_skip_catchup(&self) -> Result<(), ()> {
let mut inner = self.inner.lock().expect("lock poisoned");
match &mut *inner {
DeploymentStateInner::Initializing => Err(()),
DeploymentStateInner::CatchingUp { _skip_trigger } => {
*_skip_trigger = None;
DeploymentStateInner::ReadyToPromote { .. } => Ok(()),
DeploymentStateInner::Promoting => Ok(()),
DeploymentStateInner::IsLeader => Ok(()),
/// Attempts to promote this deployment to the leader.
/// Deployments in the `Initializing` or `CatchingUp` state cannot be
/// promoted. Deployments in the `ReadyToPromote`, `Promoting`, and
/// `IsLeader` states can be promoted (with the latter two cases being
/// no-ops).
/// If the leader was successfully promoted, returns `Ok`. Otherwise,
/// returns `Err`.
pub fn try_promote(&self) -> Result<(), ()> {
let mut inner = self.inner.lock().expect("lock poisoned");
match *inner {
DeploymentStateInner::Initializing => Err(()),
DeploymentStateInner::CatchingUp { .. } => Err(()),
DeploymentStateInner::ReadyToPromote { .. } => {
*inner = DeploymentStateInner::Promoting;
DeploymentStateInner::Promoting => Ok(()),
DeploymentStateInner::IsLeader => Ok(()),