Skip to main content

mz_compute_client/controller/
error.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Errors returned by the compute controller.
11//!
12//! The guiding principle for error handling in the compute controller is that each public method
13//! should return an error type that defines exactly the error variants the method can return, and
14//! no additional ones. This precludes the usage of a single `ComputeControllerError` enum that
15//! simply includes the union of all error variants the compute controller can ever return (and
16//! possibly some internal ones that are never returned to external callers). Instead, compute
17//! controller methods return bespoke error types that serve as documentation for the failure modes
18//! of each method and make it easy for callers to ensure that all possible errors are handled.
19
20use mz_repr::GlobalId;
21use thiserror::Error;
22
23pub use mz_storage_types::errors::CollectionMissing;
24
25use crate::controller::{ComputeInstanceId, ReplicaId};
26
27/// The error returned by replica-targeted peeks and subscribes when the target replica
28/// disconnects.
29pub const ERROR_TARGET_REPLICA_FAILED: &str = "target replica failed or was dropped";
30
31/// Error returned in response to a reference to an unknown compute instance.
32#[derive(Error, Debug)]
33#[error("instance does not exist: {0}")]
34pub struct InstanceMissing(pub ComputeInstanceId);
35
36/// Error returned in response to a request to create a compute instance with an ID of an existing
37/// compute instance.
38#[derive(Error, Debug)]
39#[error("instance exists already: {0}")]
40pub struct InstanceExists(pub ComputeInstanceId);
41
42/// Error returned in response to a reference to an unknown compute collection.
43#[derive(Error, Debug)]
44#[error("No replicas found in cluster for target list.")]
45pub struct HydrationCheckBadTarget(pub Vec<ReplicaId>);
46
47/// Errors arising during compute collection frontiers lookup.
48#[derive(Error, Debug)]
49pub enum CollectionLookupError {
50    /// The specified compute instance does not exist.
51    #[error("instance does not exist: {0}")]
52    InstanceMissing(ComputeInstanceId),
53    /// The compute collection does not exist.
54    #[error("collection does not exist: {0}")]
55    CollectionMissing(GlobalId),
56}
57
58impl From<InstanceMissing> for CollectionLookupError {
59    fn from(error: InstanceMissing) -> Self {
60        Self::InstanceMissing(error.0)
61    }
62}
63
64impl From<CollectionMissing> for CollectionLookupError {
65    fn from(error: CollectionMissing) -> Self {
66        Self::CollectionMissing(error.0)
67    }
68}
69
70/// Errors arising during compute replica creation.
71#[derive(Error, Debug)]
72pub enum ReplicaCreationError {
73    /// The target compute instance does not exist.
74    #[error("instance does not exist: {0}")]
75    InstanceMissing(ComputeInstanceId),
76    /// A replica with the given ID already exists on the target instance.
77    #[error("replica exists already: {0}")]
78    ReplicaExists(ReplicaId),
79}
80
81impl From<InstanceMissing> for ReplicaCreationError {
82    fn from(error: InstanceMissing) -> Self {
83        Self::InstanceMissing(error.0)
84    }
85}
86
87/// Errors arising during compute replica removal.
88#[derive(Error, Debug)]
89pub enum ReplicaDropError {
90    /// The target compute instance does not exist.
91    #[error("instance does not exist: {0}")]
92    InstanceMissing(ComputeInstanceId),
93    /// The replica to be dropped does not exist on the target instance.
94    #[error("replica does not exist: {0}")]
95    ReplicaMissing(ReplicaId),
96}
97
98impl From<InstanceMissing> for ReplicaDropError {
99    fn from(error: InstanceMissing) -> Self {
100        Self::InstanceMissing(error.0)
101    }
102}
103
104/// Errors arising during dataflow creation.
105#[derive(Error, Debug)]
106pub enum DataflowCreationError {
107    /// The given instance does not exist.
108    #[error("instance does not exist: {0}")]
109    InstanceMissing(ComputeInstanceId),
110    /// One of the imported collections does not exist.
111    #[error("collection does not exist: {0}")]
112    CollectionMissing(GlobalId),
113    /// The targeted replica does not exist.
114    #[error("replica does not exist: {0}")]
115    ReplicaMissing(ReplicaId),
116    /// The dataflow definition has doesn't have an `as_of` set.
117    #[error("dataflow definition lacks an as_of value")]
118    MissingAsOf,
119    /// One of the imported collections has a read frontier greater than the dataflow `as_of`.
120    #[error("dataflow has an as_of not beyond the since of collection: {0}")]
121    SinceViolation(GlobalId),
122    /// We skip dataflow creation for empty `as_of`s, which would be a problem for a SUBSCRIBE,
123    /// because an initial response is expected.
124    #[error("subscribe dataflow has an empty as_of")]
125    EmptyAsOfForSubscribe,
126    /// We skip dataflow creation for empty `as_of`s, which would be a problem for a COPY TO,
127    /// because it should always have an external side effect.
128    #[error("copy to dataflow has an empty as_of")]
129    EmptyAsOfForCopyTo,
130}
131
132impl From<InstanceMissing> for DataflowCreationError {
133    fn from(error: InstanceMissing) -> Self {
134        Self::InstanceMissing(error.0)
135    }
136}
137
138impl From<CollectionMissing> for DataflowCreationError {
139    fn from(error: CollectionMissing) -> Self {
140        Self::CollectionMissing(error.0)
141    }
142}
143
144/// Errors arising during peek processing.
145#[derive(Error, Debug)]
146pub enum PeekError {
147    /// The instance that the peek was issued against does not exist.
148    #[error("instance does not exist: {0}")]
149    InstanceMissing(ComputeInstanceId),
150    /// The peek's target collection was not found.
151    #[error("collection does not exist: {0}")]
152    CollectionMissing(GlobalId),
153    /// The replica that the peek was issued against does not exist.
154    #[error("replica does not exist: {0}")]
155    ReplicaMissing(ReplicaId),
156    /// The read hold that was passed in is for a later time than the peek's timestamp.
157    #[error("peek timestamp is not beyond the since of collection: {0}")]
158    SinceViolation(GlobalId),
159}
160
161impl From<InstanceMissing> for PeekError {
162    fn from(error: InstanceMissing) -> Self {
163        Self::InstanceMissing(error.0)
164    }
165}
166
167impl From<CollectionMissing> for PeekError {
168    fn from(error: CollectionMissing) -> Self {
169        Self::CollectionMissing(error.0)
170    }
171}
172
173/// Errors arising during collection updates.
174#[derive(Error, Debug)]
175pub enum CollectionUpdateError {
176    /// The target compute instance does not exist.
177    #[error("instance does not exist: {0}")]
178    InstanceMissing(ComputeInstanceId),
179    /// The collection to be updated does not exist.
180    #[error("collection does not exist: {0}")]
181    CollectionMissing(GlobalId),
182}
183
184impl From<InstanceMissing> for CollectionUpdateError {
185    fn from(error: InstanceMissing) -> Self {
186        Self::InstanceMissing(error.0)
187    }
188}
189
190impl From<CollectionMissing> for CollectionUpdateError {
191    fn from(error: CollectionMissing) -> Self {
192        Self::CollectionMissing(error.0)
193    }
194}
195
196/// Errors arising during collection read policy assignment.
197#[derive(Error, Debug)]
198pub enum ReadPolicyError {
199    /// The target compute instance does not exist.
200    #[error("instance does not exist: {0}")]
201    InstanceMissing(ComputeInstanceId),
202    /// The collection does not exist.
203    #[error("collection does not exist: {0}")]
204    CollectionMissing(GlobalId),
205    /// The collection is write-only and does not support read policies.
206    #[error("collection is write-only: {0}")]
207    WriteOnlyCollection(GlobalId),
208}
209
210impl From<InstanceMissing> for ReadPolicyError {
211    fn from(error: InstanceMissing) -> Self {
212        Self::InstanceMissing(error.0)
213    }
214}
215
216impl From<CollectionMissing> for ReadPolicyError {
217    fn from(error: CollectionMissing) -> Self {
218        Self::CollectionMissing(error.0)
219    }
220}
221
222/// Errors arising during orphan removal.
223#[derive(Error, Debug)]
224pub enum RemoveOrphansError {
225    /// An error occurred in the orchestrator while removing orphaned replicas.
226    #[error("orchestrator error: {0}")]
227    OrchestratorError(anyhow::Error),
228}
229
230impl From<anyhow::Error> for RemoveOrphansError {
231    fn from(error: anyhow::Error) -> Self {
232        Self::OrchestratorError(error)
233    }
234}