// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::error::Error;
use std::fmt::{self, Debug};

use itertools::Itertools;
use mz_ore::assert_none;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::PartStats;
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{PersistLocation, ShardId};
use mz_repr::{Datum, GlobalId, RelationDesc, Row, SqlScalarType, Timestamp};
use mz_sql_parser::ast::UnresolvedItemName;
use mz_timely_util::antichain::AntichainExt;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use tracing::error;

use crate::errors::{CollectionMissing, DataflowError};
use crate::instances::StorageInstanceId;
use crate::sources::SourceData;

31/// Metadata required by a storage instance to read a storage collection
32#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
33pub struct CollectionMetadata {
34    /// The persist location where the shards are located.
35    pub persist_location: PersistLocation,
36    /// The persist shard containing the contents of this storage collection.
37    pub data_shard: ShardId,
38    /// The `RelationDesc` that describes the contents of the `data_shard`.
39    pub relation_desc: RelationDesc,
40    /// The shard id of the txn-wal shard, if `self.data_shard` is managed
41    /// by the txn-wal system, or None if it's not.
42    pub txns_shard: Option<ShardId>,
43}
44
45impl crate::AlterCompatible for CollectionMetadata {
46    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), self::AlterError> {
47        if self == other {
48            return Ok(());
49        }
50
51        let CollectionMetadata {
52            // persist locations may change (though not as a result of ALTER);
53            // we allow this because if this changes unexpectedly, we will
54            // notice in other ways.
55            persist_location: _,
56            data_shard,
57            relation_desc,
58            txns_shard,
59        } = self;
60
61        let compatibility_checks = [
62            (data_shard == &other.data_shard, "data_shard"),
63            (relation_desc == &other.relation_desc, "relation_desc"),
64            (txns_shard == &other.txns_shard, "txns_shard"),
65        ];
66
67        for (compatible, field) in compatibility_checks {
68            if !compatible {
69                tracing::warn!(
70                    "CollectionMetadata incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
71                    self,
72                    other
73                );
74
75                return Err(AlterError { id });
76            }
77        }
78
79        Ok(())
80    }
81}
82
83/// The subset of [`CollectionMetadata`] that must be durable stored.
84#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
85pub struct DurableCollectionMetadata {
86    pub data_shard: ShardId,
87}
88
89#[derive(Debug)]
90pub enum StorageError {
91    /// The source identifier was re-created after having been dropped,
92    /// or installed with a different description.
93    CollectionIdReused(GlobalId),
94    /// The sink identifier was re-created after having been dropped, or
95    /// installed with a different description.
96    SinkIdReused(GlobalId),
97    /// The source identifier is not present.
98    IdentifierMissing(GlobalId),
99    /// The provided identifier was invalid, maybe missing, wrong type, not registered, etc.
100    IdentifierInvalid(GlobalId),
101    /// The update contained in the appended batch was at a timestamp equal or beyond the batch's upper
102    UpdateBeyondUpper(GlobalId),
103    /// The read was at a timestamp before the collection's since
104    ReadBeforeSince(GlobalId),
105    /// The expected upper of one or more appends was different from the actual upper of the collection
106    InvalidUppers(Vec<InvalidUpper>),
107    /// The (client for) the requested cluster instance is missing.
108    IngestionInstanceMissing {
109        storage_instance_id: StorageInstanceId,
110        ingestion_id: GlobalId,
111    },
112    /// The (client for) the requested cluster instance is missing.
113    ExportInstanceMissing {
114        storage_instance_id: StorageInstanceId,
115        export_id: GlobalId,
116    },
117    /// Dataflow was not able to process a request
118    DataflowError(DataflowError),
119    /// Response to an invalid/unsupported `ALTER..` command.
120    InvalidAlter(AlterError),
121    /// The controller API was used in some invalid way. This usually indicates
122    /// a bug.
123    InvalidUsage(String),
124    /// The specified component is shutting down.
125    ShuttingDown(&'static str),
126    /// Collection metadata already exists for ID.
127    CollectionMetadataAlreadyExists(GlobalId),
128    /// Some other collection is already writing to this persist shard.
129    PersistShardAlreadyInUse(ShardId),
130    /// Raced with some other process while trying to evolve the schema of a Persist shard.
131    PersistSchemaEvolveRace {
132        global_id: GlobalId,
133        shard_id: ShardId,
134        schema_id: SchemaId,
135        relation_desc: RelationDesc,
136    },
137    /// We tried to evolve the schema of a Persist shard in an invalid way.
138    PersistInvalidSchemaEvolve {
139        global_id: GlobalId,
140        shard_id: ShardId,
141    },
142    /// Txn WAL shard already exists.
143    TxnWalShardAlreadyExists,
144    /// The item that a subsource refers to is unexpectedly missing from the
145    /// source.
146    MissingSubsourceReference {
147        ingestion_id: GlobalId,
148        reference: UnresolvedItemName,
149    },
150    /// We failed to determine the real-time-recency timestamp.
151    RtrTimeout(GlobalId),
152    /// The collection was dropped before we could ingest its external frontier.
153    RtrDropFailure(GlobalId),
154    /// A generic error that happens during operations of the storage controller.
155    // TODO(aljoscha): Get rid of this!
156    Generic(anyhow::Error),
157    /// We are in read-only mode and were asked to do a something that requires
158    /// writing.
159    ReadOnly,
160}
161
162impl Error for StorageError {
163    fn source(&self) -> Option<&(dyn Error + 'static)> {
164        match self {
165            Self::CollectionIdReused(_) => None,
166            Self::SinkIdReused(_) => None,
167            Self::IdentifierMissing(_) => None,
168            Self::IdentifierInvalid(_) => None,
169            Self::UpdateBeyondUpper(_) => None,
170            Self::ReadBeforeSince(_) => None,
171            Self::InvalidUppers(_) => None,
172            Self::IngestionInstanceMissing { .. } => None,
173            Self::ExportInstanceMissing { .. } => None,
174            Self::DataflowError(err) => Some(err),
175            Self::InvalidAlter { .. } => None,
176            Self::InvalidUsage(_) => None,
177            Self::ShuttingDown(_) => None,
178            Self::CollectionMetadataAlreadyExists(_) => None,
179            Self::PersistShardAlreadyInUse(_) => None,
180            Self::PersistSchemaEvolveRace { .. } => None,
181            Self::PersistInvalidSchemaEvolve { .. } => None,
182            Self::TxnWalShardAlreadyExists => None,
183            Self::MissingSubsourceReference { .. } => None,
184            Self::RtrTimeout(_) => None,
185            Self::RtrDropFailure(_) => None,
186            Self::Generic(err) => err.source(),
187            Self::ReadOnly => None,
188        }
189    }
190}
191
192impl fmt::Display for StorageError {
193    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
194        f.write_str("storage error: ")?;
195        match self {
196            Self::CollectionIdReused(id) => write!(
197                f,
198                "source identifier was re-created after having been dropped: {id}"
199            ),
200            Self::SinkIdReused(id) => write!(
201                f,
202                "sink identifier was re-created after having been dropped: {id}"
203            ),
204            Self::IdentifierMissing(id) => write!(f, "collection identifier is not present: {id}"),
205            Self::IdentifierInvalid(id) => write!(f, "collection identifier is invalid {id}"),
206            Self::UpdateBeyondUpper(id) => {
207                write!(
208                    f,
209                    "append batch for {id} contained update at or beyond its upper"
210                )
211            }
212            Self::ReadBeforeSince(id) => {
213                write!(f, "read for {id} was at a timestamp before its since")
214            }
215            Self::InvalidUppers(id) => {
216                write!(
217                    f,
218                    "expected upper was different from the actual upper for: {}",
219                    id.iter()
220                        .map(|InvalidUpper { id, current_upper }| {
221                            format!("(id: {}; actual upper: {})", id, current_upper.pretty())
222                        })
223                        .join(", ")
224                )
225            }
226            Self::IngestionInstanceMissing {
227                storage_instance_id,
228                ingestion_id,
229            } => write!(
230                f,
231                "instance {} missing for ingestion {}",
232                storage_instance_id, ingestion_id
233            ),
234            Self::ExportInstanceMissing {
235                storage_instance_id,
236                export_id,
237            } => write!(
238                f,
239                "instance {} missing for export {}",
240                storage_instance_id, export_id
241            ),
242            // N.B. For these errors, the underlying error is reported in `source()`, and it
243            // is the responsibility of the caller to print the chain of errors, when desired.
244            Self::DataflowError(_err) => write!(f, "dataflow failed to process request",),
245            Self::InvalidAlter(err) => std::fmt::Display::fmt(err, f),
246            Self::InvalidUsage(err) => write!(f, "invalid usage: {}", err),
247            Self::ShuttingDown(cmp) => write!(f, "{cmp} is shutting down"),
248            Self::CollectionMetadataAlreadyExists(key) => {
249                write!(f, "storage metadata for '{key}' already exists")
250            }
251            Self::PersistShardAlreadyInUse(shard) => {
252                write!(f, "persist shard already in use: {shard}")
253            }
254            Self::PersistSchemaEvolveRace {
255                global_id,
256                shard_id,
257                ..
258            } => {
259                write!(
260                    f,
261                    "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}"
262                )
263            }
264            Self::PersistInvalidSchemaEvolve {
265                global_id,
266                shard_id,
267            } => {
268                write!(
269                    f,
270                    "persist shard evolved in an invalid way: {global_id}, {shard_id}"
271                )
272            }
273            Self::TxnWalShardAlreadyExists => {
274                write!(f, "txn WAL already exists")
275            }
276            Self::MissingSubsourceReference {
277                ingestion_id,
278                reference,
279            } => write!(
280                f,
281                "ingestion {ingestion_id} unexpectedly missing reference to {}",
282                reference
283            ),
284            Self::RtrTimeout(_) => {
285                write!(
286                    f,
287                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
288                )
289            }
290            Self::RtrDropFailure(_) => write!(
291                f,
292                "real-time source dropped before ingesting the upstream system's visible frontier"
293            ),
294            Self::Generic(err) => std::fmt::Display::fmt(err, f),
295            Self::ReadOnly => write!(f, "cannot write in read-only mode"),
296        }
297    }
298}
299
300#[derive(Debug, Clone)]
301pub struct InvalidUpper {
302    pub id: GlobalId,
303    pub current_upper: Antichain<Timestamp>,
304}
305
306#[derive(Debug)]
307pub struct AlterError {
308    pub id: GlobalId,
309}
310
311impl Error for AlterError {}
312
313impl fmt::Display for AlterError {
314    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
315        write!(f, "{} cannot be altered in the requested way", self.id)
316    }
317}
318
319impl From<AlterError> for StorageError {
320    fn from(error: AlterError) -> Self {
321        Self::InvalidAlter(error)
322    }
323}
324
325impl From<DataflowError> for StorageError {
326    fn from(error: DataflowError) -> Self {
327        Self::DataflowError(error)
328    }
329}
330
331impl From<CollectionMissing> for StorageError {
332    fn from(err: CollectionMissing) -> Self {
333        Self::IdentifierMissing(err.0)
334    }
335}
336
/// A [`TxnsCodec`] implementation that encodes [`TxnsEntry`]s as
/// [`SourceData`]-wrapped [`Row`]s.
#[derive(Debug)]
pub struct TxnsCodecRow;

340impl TxnsCodecRow {
341    pub fn desc() -> RelationDesc {
342        RelationDesc::builder()
343            .with_column("shard_id", SqlScalarType::String.nullable(false))
344            .with_column("ts", SqlScalarType::UInt64.nullable(false))
345            .with_column("batch", SqlScalarType::Bytes.nullable(true))
346            .finish()
347    }
348}
349
350impl TxnsCodec for TxnsCodecRow {
351    type Key = SourceData;
352    type Val = ();
353
354    fn schemas() -> (
355        <Self::Key as mz_persist_types::Codec>::Schema,
356        <Self::Val as mz_persist_types::Codec>::Schema,
357    ) {
358        (Self::desc(), UnitSchema)
359    }
360
361    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
362        let row = match &e {
363            TxnsEntry::Register(data_id, ts) => Row::pack([
364                Datum::from(data_id.to_string().as_str()),
365                Datum::from(u64::from_le_bytes(*ts)),
366                Datum::Null,
367            ]),
368            TxnsEntry::Append(data_id, ts, batch) => Row::pack([
369                Datum::from(data_id.to_string().as_str()),
370                Datum::from(u64::from_le_bytes(*ts)),
371                Datum::from(batch.as_slice()),
372            ]),
373        };
374        (SourceData(Ok(row)), ())
375    }
376
377    fn decode(row: SourceData, _: ()) -> TxnsEntry {
378        let mut datums = row.0.as_ref().expect("valid entry").iter();
379        let data_id = datums.next().expect("valid entry").unwrap_str();
380        let data_id = data_id.parse::<ShardId>().expect("valid entry");
381        let ts = datums.next().expect("valid entry");
382        let ts = u64::to_le_bytes(ts.unwrap_uint64());
383        let batch = datums.next().expect("valid entry");
384        assert_none!(datums.next());
385        if batch.is_null() {
386            TxnsEntry::Register(data_id, ts)
387        } else {
388            TxnsEntry::Append(data_id, ts, batch.unwrap_bytes().to_vec())
389        }
390    }
391
392    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
393        let stats = stats
394            .key
395            .col("ok")?
396            .try_as_optional_struct()
397            .map_err(|err| error!("unexpected stats type for col 'ok': {}", err))
398            .ok()?;
399        let stats = stats
400            .some
401            .col("shard_id")?
402            .try_as_string()
403            .map_err(|err| error!("unexpected stats type for col 'shard_id': {}", err))
404            .ok()?;
405        let data_id_str = data_id.to_string();
406        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
407    }
408}