Skip to main content

mz_compute_client/controller/
error.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! Errors returned by the compute controller.
//!
//! The guiding principle for error handling in the compute controller is that each public method
//! should return an error type that defines exactly the error variants the method can return, and
//! no additional ones. This precludes the usage of a single `ComputeControllerError` enum that
//! simply includes the union of all error variants the compute controller can ever return (and
//! possibly some internal ones that are never returned to external callers). Instead, compute
//! controller methods return bespoke error types that serve as documentation for the failure modes
//! of each method and make it easy for callers to ensure that all possible errors are handled.
19
20use mz_repr::GlobalId;
21use thiserror::Error;
22
23pub use mz_storage_types::errors::CollectionMissing;
24
25use crate::controller::{ComputeInstanceId, ReplicaId};
26
/// Error message reported by replica-targeted peeks and subscribes when their target replica
/// disconnects.
pub const ERROR_TARGET_REPLICA_FAILED: &str = "target replica failed or was dropped";
30
31/// Error returned in response to a reference to an unknown compute instance.
32#[derive(Error, Debug)]
33#[error("instance does not exist: {0}")]
34pub struct InstanceMissing(pub ComputeInstanceId);
35
36/// Error returned in response to a request to create a compute instance with an ID of an existing
37/// compute instance.
38#[derive(Error, Debug)]
39#[error("instance exists already: {0}")]
40pub struct InstanceExists(pub ComputeInstanceId);
41
42/// Error returned in response to a reference to an unknown compute collection.
43#[derive(Error, Debug)]
44#[error("No replicas found in cluster for target list.")]
45pub struct HydrationCheckBadTarget(pub Vec<ReplicaId>);
46
47/// Errors arising during compute collection frontiers lookup.
48#[derive(Error, Debug)]
49pub enum CollectionLookupError {
50    /// The specified compute instance does not exist.
51    #[error("instance does not exist: {0}")]
52    InstanceMissing(ComputeInstanceId),
53    /// The compute collection does not exist.
54    #[error("collection does not exist: {0}")]
55    CollectionMissing(GlobalId),
56}
57
58impl From<InstanceMissing> for CollectionLookupError {
59    fn from(error: InstanceMissing) -> Self {
60        Self::InstanceMissing(error.0)
61    }
62}
63
64impl From<CollectionMissing> for CollectionLookupError {
65    fn from(error: CollectionMissing) -> Self {
66        Self::CollectionMissing(error.0)
67    }
68}
69
70/// Errors arising during compute replica creation.
71#[derive(Error, Debug)]
72pub enum ReplicaCreationError {
73    /// The target compute instance does not exist.
74    #[error("instance does not exist: {0}")]
75    InstanceMissing(ComputeInstanceId),
76    /// A replica with the given ID already exists on the target instance.
77    #[error("replica exists already: {0}")]
78    ReplicaExists(ReplicaId),
79}
80
81impl From<InstanceMissing> for ReplicaCreationError {
82    fn from(error: InstanceMissing) -> Self {
83        Self::InstanceMissing(error.0)
84    }
85}
86
87/// Errors arising during compute replica removal.
88#[derive(Error, Debug)]
89pub enum ReplicaDropError {
90    /// The target compute instance does not exist.
91    #[error("instance does not exist: {0}")]
92    InstanceMissing(ComputeInstanceId),
93    /// The replica to be dropped does not exist on the target instance.
94    #[error("replica does not exist: {0}")]
95    ReplicaMissing(ReplicaId),
96}
97
98impl From<InstanceMissing> for ReplicaDropError {
99    fn from(error: InstanceMissing) -> Self {
100        Self::InstanceMissing(error.0)
101    }
102}
103
104/// Errors arising during dataflow creation.
105#[derive(Error, Debug)]
106pub enum DataflowCreationError {
107    /// The given instance does not exist.
108    #[error("instance does not exist: {0}")]
109    InstanceMissing(ComputeInstanceId),
110    /// One of the imported collections does not exist.
111    #[error("collection does not exist: {0}")]
112    CollectionMissing(GlobalId),
113    /// The targeted replica does not exist.
114    #[error("replica does not exist: {0}")]
115    ReplicaMissing(ReplicaId),
116    /// The dataflow definition has doesn't have an `as_of` set.
117    #[error("dataflow definition lacks an as_of value")]
118    MissingAsOf,
119    /// One of the imported collections has a read frontier greater than the dataflow `as_of`.
120    #[error("dataflow has an as_of not beyond the since of collection: {0}")]
121    SinceViolation(GlobalId),
122    /// We skip dataflow creation for empty `as_of`s, which would be a problem for a SUBSCRIBE,
123    /// because an initial response is expected.
124    #[error("subscribe dataflow has an empty as_of")]
125    EmptyAsOfForSubscribe,
126    /// We skip dataflow creation for empty `as_of`s, which would be a problem for a COPY TO,
127    /// because it should always have an external side effect.
128    #[error("copy to dataflow has an empty as_of")]
129    EmptyAsOfForCopyTo,
130}
131
132impl From<InstanceMissing> for DataflowCreationError {
133    fn from(error: InstanceMissing) -> Self {
134        Self::InstanceMissing(error.0)
135    }
136}
137
138impl From<CollectionMissing> for DataflowCreationError {
139    fn from(error: CollectionMissing) -> Self {
140        Self::CollectionMissing(error.0)
141    }
142}
143
144/// Errors arising during peek processing.
145#[derive(Error, Debug)]
146pub enum PeekError {
147    /// The instance that the peek was issued against does not exist.
148    #[error("instance does not exist: {0}")]
149    InstanceMissing(ComputeInstanceId),
150    /// The peek's target collection was not found.
151    #[error("collection does not exist: {0}")]
152    CollectionMissing(GlobalId),
153    /// The replica that the peek was issued against does not exist.
154    #[error("replica does not exist: {0}")]
155    ReplicaMissing(ReplicaId),
156    /// The caller-supplied read hold is for a different collection than the peek target.
157    #[error("read hold ID does not match peeked collection: {0}")]
158    ReadHoldIdMismatch(GlobalId),
159    /// The read hold that was passed in is for a later time than the peek's timestamp.
160    #[error("peek timestamp is not beyond the since of collection: {0}")]
161    SinceViolation(GlobalId),
162}
163
164impl From<InstanceMissing> for PeekError {
165    fn from(error: InstanceMissing) -> Self {
166        Self::InstanceMissing(error.0)
167    }
168}
169
170impl From<CollectionMissing> for PeekError {
171    fn from(error: CollectionMissing) -> Self {
172        Self::CollectionMissing(error.0)
173    }
174}
175
176/// Errors arising during collection updates.
177#[derive(Error, Debug)]
178pub enum CollectionUpdateError {
179    /// The target compute instance does not exist.
180    #[error("instance does not exist: {0}")]
181    InstanceMissing(ComputeInstanceId),
182    /// The collection to be updated does not exist.
183    #[error("collection does not exist: {0}")]
184    CollectionMissing(GlobalId),
185}
186
187impl From<InstanceMissing> for CollectionUpdateError {
188    fn from(error: InstanceMissing) -> Self {
189        Self::InstanceMissing(error.0)
190    }
191}
192
193impl From<CollectionMissing> for CollectionUpdateError {
194    fn from(error: CollectionMissing) -> Self {
195        Self::CollectionMissing(error.0)
196    }
197}
198
199/// Errors arising during collection read policy assignment.
200#[derive(Error, Debug)]
201pub enum ReadPolicyError {
202    /// The target compute instance does not exist.
203    #[error("instance does not exist: {0}")]
204    InstanceMissing(ComputeInstanceId),
205    /// The collection does not exist.
206    #[error("collection does not exist: {0}")]
207    CollectionMissing(GlobalId),
208    /// The collection is write-only and does not support read policies.
209    #[error("collection is write-only: {0}")]
210    WriteOnlyCollection(GlobalId),
211}
212
213impl From<InstanceMissing> for ReadPolicyError {
214    fn from(error: InstanceMissing) -> Self {
215        Self::InstanceMissing(error.0)
216    }
217}
218
219impl From<CollectionMissing> for ReadPolicyError {
220    fn from(error: CollectionMissing) -> Self {
221        Self::CollectionMissing(error.0)
222    }
223}
224
225/// Errors arising during orphan removal.
226#[derive(Error, Debug)]
227pub enum RemoveOrphansError {
228    /// An error occurred in the orchestrator while removing orphaned replicas.
229    #[error("orchestrator error: {0}")]
230    OrchestratorError(anyhow::Error),
231}
232
233impl From<anyhow::Error> for RemoveOrphansError {
234    fn from(error: anyhow::Error) -> Self {
235        Self::OrchestratorError(error)
236    }
237}