mz_storage_types/
controller.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
10use std::error::Error;
11use std::fmt::{self, Debug, Display};
12
13use itertools::Itertools;
14use mz_ore::assert_none;
15use mz_persist_types::codec_impls::UnitSchema;
16use mz_persist_types::schema::SchemaId;
17use mz_persist_types::stats::PartStats;
18use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
19use mz_persist_types::{PersistLocation, ShardId};
20use mz_repr::{Datum, GlobalId, RelationDesc, Row, SqlScalarType};
21use mz_sql_parser::ast::UnresolvedItemName;
22use mz_timely_util::antichain::AntichainExt;
23use serde::{Deserialize, Serialize};
24use timely::progress::Antichain;
25use tracing::error;
26
27use crate::errors::DataflowError;
28use crate::instances::StorageInstanceId;
29use crate::sources::SourceData;
30
/// Metadata required by a storage instance to read a storage collection.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectionMetadata {
    /// The persist location where the shards are located.
    pub persist_location: PersistLocation,
    /// The persist shard containing the contents of this storage collection.
    pub data_shard: ShardId,
    /// The `RelationDesc` that describes the contents of the `data_shard`.
    pub relation_desc: RelationDesc,
    /// The shard id of the txn-wal shard, if `self.data_shard` is managed
    /// by the txn-wal system, or `None` if it's not.
    pub txns_shard: Option<ShardId>,
}
44
45impl crate::AlterCompatible for CollectionMetadata {
46    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), self::AlterError> {
47        if self == other {
48            return Ok(());
49        }
50
51        let CollectionMetadata {
52            // persist locations may change (though not as a result of ALTER);
53            // we allow this because if this changes unexpectedly, we will
54            // notice in other ways.
55            persist_location: _,
56            data_shard,
57            relation_desc,
58            txns_shard,
59        } = self;
60
61        let compatibility_checks = [
62            (data_shard == &other.data_shard, "data_shard"),
63            (relation_desc == &other.relation_desc, "relation_desc"),
64            (txns_shard == &other.txns_shard, "txns_shard"),
65        ];
66
67        for (compatible, field) in compatibility_checks {
68            if !compatible {
69                tracing::warn!(
70                    "CollectionMetadata incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
71                    self,
72                    other
73                );
74
75                return Err(AlterError { id });
76            }
77        }
78
79        Ok(())
80    }
81}
82
/// The subset of [`CollectionMetadata`] that must be durably stored.
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
pub struct DurableCollectionMetadata {
    /// The persist shard containing the contents of this storage collection.
    pub data_shard: ShardId,
}
88
/// Errors arising from operations of the storage controller.
#[derive(Debug)]
pub enum StorageError<T> {
    /// The source identifier was re-created after having been dropped,
    /// or installed with a different description.
    CollectionIdReused(GlobalId),
    /// The sink identifier was re-created after having been dropped, or
    /// installed with a different description.
    SinkIdReused(GlobalId),
    /// The source identifier is not present.
    IdentifierMissing(GlobalId),
    /// The provided identifier was invalid, maybe missing, wrong type, not registered, etc.
    IdentifierInvalid(GlobalId),
    /// The update contained in the appended batch was at a timestamp equal or beyond the batch's upper
    UpdateBeyondUpper(GlobalId),
    /// The read was at a timestamp before the collection's since
    ReadBeforeSince(GlobalId),
    /// The expected upper of one or more appends was different from the actual upper of the collection
    InvalidUppers(Vec<InvalidUpper<T>>),
    /// The (client for) the requested cluster instance is missing.
    IngestionInstanceMissing {
        /// The instance that should host the ingestion.
        storage_instance_id: StorageInstanceId,
        /// The ingestion whose instance is missing.
        ingestion_id: GlobalId,
    },
    /// The (client for) the requested cluster instance is missing.
    ExportInstanceMissing {
        /// The instance that should host the export.
        storage_instance_id: StorageInstanceId,
        /// The export whose instance is missing.
        export_id: GlobalId,
    },
    /// Dataflow was not able to process a request
    DataflowError(DataflowError),
    /// Response to an invalid/unsupported `ALTER..` command.
    InvalidAlter(AlterError),
    /// The controller API was used in some invalid way. This usually indicates
    /// a bug.
    InvalidUsage(String),
    /// The specified resource was exhausted, and is not currently accepting more requests.
    ResourceExhausted(&'static str),
    /// The specified component is shutting down.
    ShuttingDown(&'static str),
    /// Collection metadata already exists for ID.
    CollectionMetadataAlreadyExists(GlobalId),
    /// Some other collection is already writing to this persist shard.
    PersistShardAlreadyInUse(ShardId),
    /// Raced with some other process while trying to evolve the schema of a Persist shard.
    PersistSchemaEvolveRace {
        /// The collection whose schema evolution raced.
        global_id: GlobalId,
        /// The persist shard backing the collection.
        shard_id: ShardId,
        /// The schema id we raced against.
        schema_id: SchemaId,
        /// The desired new schema.
        relation_desc: RelationDesc,
    },
    /// We tried to evolve the schema of a Persist shard in an invalid way.
    PersistInvalidSchemaEvolve {
        /// The collection whose schema evolution was invalid.
        global_id: GlobalId,
        /// The persist shard backing the collection.
        shard_id: ShardId,
    },
    /// Txn WAL shard already exists.
    TxnWalShardAlreadyExists,
    /// The item that a subsource refers to is unexpectedly missing from the
    /// source.
    MissingSubsourceReference {
        /// The ingestion missing the reference.
        ingestion_id: GlobalId,
        /// The external item the subsource refers to.
        reference: UnresolvedItemName,
    },
    /// We failed to determine the real-time-recency timestamp.
    RtrTimeout(GlobalId),
    /// The collection was dropped before we could ingest its external frontier.
    RtrDropFailure(GlobalId),
    /// A generic error that happens during operations of the storage controller.
    // TODO(aljoscha): Get rid of this!
    Generic(anyhow::Error),
    /// We are in read-only mode and were asked to do something that requires
    /// writing.
    ReadOnly,
}
163
164impl<T: Debug + Display + 'static> Error for StorageError<T> {
165    fn source(&self) -> Option<&(dyn Error + 'static)> {
166        match self {
167            Self::CollectionIdReused(_) => None,
168            Self::SinkIdReused(_) => None,
169            Self::IdentifierMissing(_) => None,
170            Self::IdentifierInvalid(_) => None,
171            Self::UpdateBeyondUpper(_) => None,
172            Self::ReadBeforeSince(_) => None,
173            Self::InvalidUppers(_) => None,
174            Self::IngestionInstanceMissing { .. } => None,
175            Self::ExportInstanceMissing { .. } => None,
176            Self::DataflowError(err) => Some(err),
177            Self::InvalidAlter { .. } => None,
178            Self::InvalidUsage(_) => None,
179            Self::ResourceExhausted(_) => None,
180            Self::ShuttingDown(_) => None,
181            Self::CollectionMetadataAlreadyExists(_) => None,
182            Self::PersistShardAlreadyInUse(_) => None,
183            Self::PersistSchemaEvolveRace { .. } => None,
184            Self::PersistInvalidSchemaEvolve { .. } => None,
185            Self::TxnWalShardAlreadyExists => None,
186            Self::MissingSubsourceReference { .. } => None,
187            Self::RtrTimeout(_) => None,
188            Self::RtrDropFailure(_) => None,
189            Self::Generic(err) => err.source(),
190            Self::ReadOnly => None,
191        }
192    }
193}
194
195impl<T: fmt::Display + 'static> fmt::Display for StorageError<T> {
196    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
197        f.write_str("storage error: ")?;
198        match self {
199            Self::CollectionIdReused(id) => write!(
200                f,
201                "source identifier was re-created after having been dropped: {id}"
202            ),
203            Self::SinkIdReused(id) => write!(
204                f,
205                "sink identifier was re-created after having been dropped: {id}"
206            ),
207            Self::IdentifierMissing(id) => write!(f, "collection identifier is not present: {id}"),
208            Self::IdentifierInvalid(id) => write!(f, "collection identifier is invalid {id}"),
209            Self::UpdateBeyondUpper(id) => {
210                write!(
211                    f,
212                    "append batch for {id} contained update at or beyond its upper"
213                )
214            }
215            Self::ReadBeforeSince(id) => {
216                write!(f, "read for {id} was at a timestamp before its since")
217            }
218            Self::InvalidUppers(id) => {
219                write!(
220                    f,
221                    "expected upper was different from the actual upper for: {}",
222                    id.iter()
223                        .map(|InvalidUpper { id, current_upper }| {
224                            format!("(id: {}; actual upper: {})", id, current_upper.pretty())
225                        })
226                        .join(", ")
227                )
228            }
229            Self::IngestionInstanceMissing {
230                storage_instance_id,
231                ingestion_id,
232            } => write!(
233                f,
234                "instance {} missing for ingestion {}",
235                storage_instance_id, ingestion_id
236            ),
237            Self::ExportInstanceMissing {
238                storage_instance_id,
239                export_id,
240            } => write!(
241                f,
242                "instance {} missing for export {}",
243                storage_instance_id, export_id
244            ),
245            // N.B. For these errors, the underlying error is reported in `source()`, and it
246            // is the responsibility of the caller to print the chain of errors, when desired.
247            Self::DataflowError(_err) => write!(f, "dataflow failed to process request",),
248            Self::InvalidAlter(err) => std::fmt::Display::fmt(err, f),
249            Self::InvalidUsage(err) => write!(f, "invalid usage: {}", err),
250            Self::ResourceExhausted(rsc) => write!(f, "{rsc} is exhausted"),
251            Self::ShuttingDown(cmp) => write!(f, "{cmp} is shutting down"),
252            Self::CollectionMetadataAlreadyExists(key) => {
253                write!(f, "storage metadata for '{key}' already exists")
254            }
255            Self::PersistShardAlreadyInUse(shard) => {
256                write!(f, "persist shard already in use: {shard}")
257            }
258            Self::PersistSchemaEvolveRace {
259                global_id,
260                shard_id,
261                ..
262            } => {
263                write!(
264                    f,
265                    "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}"
266                )
267            }
268            Self::PersistInvalidSchemaEvolve {
269                global_id,
270                shard_id,
271            } => {
272                write!(
273                    f,
274                    "persist shard evolved in an invalid way: {global_id}, {shard_id}"
275                )
276            }
277            Self::TxnWalShardAlreadyExists => {
278                write!(f, "txn WAL already exists")
279            }
280            Self::MissingSubsourceReference {
281                ingestion_id,
282                reference,
283            } => write!(
284                f,
285                "ingestion {ingestion_id} unexpectedly missing reference to {}",
286                reference
287            ),
288            Self::RtrTimeout(_) => {
289                write!(
290                    f,
291                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
292                )
293            }
294            Self::RtrDropFailure(_) => write!(
295                f,
296                "real-time source dropped before ingesting the upstream system's visible frontier"
297            ),
298            Self::Generic(err) => std::fmt::Display::fmt(err, f),
299            Self::ReadOnly => write!(f, "cannot write in read-only mode"),
300        }
301    }
302}
303
/// Details about an append whose expected upper did not match the
/// collection's actual upper; carried by [`StorageError::InvalidUppers`].
#[derive(Debug, Clone)]
pub struct InvalidUpper<T> {
    /// The collection whose append failed.
    pub id: GlobalId,
    /// The collection's actual upper at the time of the failed append.
    pub current_upper: Antichain<T>,
}
309
/// Error reported when an item cannot be altered in the requested way; see
/// [`crate::AlterCompatible`].
#[derive(Debug)]
pub struct AlterError {
    /// The ID of the item that could not be altered.
    pub id: GlobalId,
}

impl Error for AlterError {}

impl fmt::Display for AlterError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{} cannot be altered in the requested way", self.id)
    }
}
322
323impl<T> From<AlterError> for StorageError<T> {
324    fn from(error: AlterError) -> Self {
325        Self::InvalidAlter(error)
326    }
327}
328
329impl<T> From<DataflowError> for StorageError<T> {
330    fn from(error: DataflowError) -> Self {
331        Self::DataflowError(error)
332    }
333}
334
335#[derive(Debug)]
336pub struct TxnsCodecRow;
337
338impl TxnsCodecRow {
339    pub fn desc() -> RelationDesc {
340        RelationDesc::builder()
341            .with_column("shard_id", SqlScalarType::String.nullable(false))
342            .with_column("ts", SqlScalarType::UInt64.nullable(false))
343            .with_column("batch", SqlScalarType::Bytes.nullable(true))
344            .finish()
345    }
346}
347
impl TxnsCodec for TxnsCodecRow {
    type Key = SourceData;
    type Val = ();

    fn schemas() -> (
        <Self::Key as mz_persist_types::Codec>::Schema,
        <Self::Val as mz_persist_types::Codec>::Schema,
    ) {
        (Self::desc(), UnitSchema)
    }

    /// Encodes a [`TxnsEntry`] as a three-column row (see [`TxnsCodecRow::desc`]).
    ///
    /// `Register` is distinguished from `Append` by a NULL `batch` column;
    /// `decode` relies on this.
    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
        let row = match &e {
            TxnsEntry::Register(data_id, ts) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                // The entry stores the timestamp as little-endian bytes.
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::Null,
            ]),
            TxnsEntry::Append(data_id, ts, batch) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::from(batch.as_slice()),
            ]),
        };
        (SourceData(Ok(row)), ())
    }

    /// Inverse of `encode`. A malformed row means the shard's contents are
    /// corrupt, so every step panics via `expect` rather than limping along.
    fn decode(row: SourceData, _: ()) -> TxnsEntry {
        let mut datums = row.0.as_ref().expect("valid entry").iter();
        let data_id = datums.next().expect("valid entry").unwrap_str();
        let data_id = data_id.parse::<ShardId>().expect("valid entry");
        let ts = datums.next().expect("valid entry");
        // Convert back to the little-endian byte form used by `TxnsEntry`.
        let ts = u64::to_le_bytes(ts.unwrap_uint64());
        let batch = datums.next().expect("valid entry");
        // Exactly three columns; anything more is corruption.
        assert_none!(datums.next());
        // NULL batch column marks a `Register` entry (see `encode`).
        if batch.is_null() {
            TxnsEntry::Register(data_id, ts)
        } else {
            TxnsEntry::Append(data_id, ts, batch.unwrap_bytes().to_vec())
        }
    }

    /// Uses part stats on the `shard_id` column to decide whether a part can
    /// contain entries for `data_id`. Returns `None` when the stats are
    /// missing or have an unexpected type (logging an error so we notice);
    /// presumably the caller then falls back to fetching the part — confirm
    /// against the `TxnsCodec` trait docs.
    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
        let stats = stats
            .key
            .col("key")?
            .try_as_optional_struct()
            .map_err(|err| error!("unexpected stats type for col 'key': {}", err))
            .ok()?;
        let stats = stats
            .some
            .col("shard_id")?
            .try_as_string()
            .map_err(|err| error!("unexpected stats type for col 'shard_id': {}", err))
            .ok()?;
        // Fetch only if `data_id` falls within the column's [lower, upper]
        // string bounds for this part.
        let data_id_str = data_id.to_string();
        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
    }
}