mz_storage_types/
controller.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::error::Error;
11use std::fmt::{self, Debug, Display};
12use std::str::FromStr;
13
14use itertools::Itertools;
15use mz_ore::assert_none;
16use mz_ore::url::SensitiveUrl;
17use mz_persist_types::codec_impls::UnitSchema;
18use mz_persist_types::schema::SchemaId;
19use mz_persist_types::stats::PartStats;
20use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
21use mz_persist_types::{PersistLocation, ShardId};
22use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
23use mz_repr::{Datum, GlobalId, RelationDesc, Row, ScalarType};
24use mz_sql_parser::ast::UnresolvedItemName;
25use mz_timely_util::antichain::AntichainExt;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use timely::progress::Antichain;
29use tracing::error;
30
31use crate::errors::DataflowError;
32use crate::instances::StorageInstanceId;
33use crate::sources::SourceData;
34
35include!(concat!(env!("OUT_DIR"), "/mz_storage_types.controller.rs"));
36
/// Metadata required by a storage instance to read a storage collection.
///
/// All fields except `persist_location` participate in `ALTER`
/// compatibility checks (see the `AlterCompatible` impl for this type).
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectionMetadata {
    /// The persist location where the shards are located.
    pub persist_location: PersistLocation,
    /// The persist shard id of the remap collection used to reclock this collection.
    pub remap_shard: Option<ShardId>,
    /// The persist shard containing the contents of this storage collection.
    pub data_shard: ShardId,
    /// The `RelationDesc` that describes the contents of the `data_shard`.
    pub relation_desc: RelationDesc,
    /// The shard id of the txn-wal shard, if `self.data_shard` is managed
    /// by the txn-wal system, or None if it's not.
    pub txns_shard: Option<ShardId>,
}
52
53impl crate::AlterCompatible for CollectionMetadata {
54    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), self::AlterError> {
55        if self == other {
56            return Ok(());
57        }
58
59        let CollectionMetadata {
60            // persist locations may change (though not as a result of ALTER);
61            // we allow this because if this changes unexpectedly, we will
62            // notice in other ways.
63            persist_location: _,
64            remap_shard,
65            data_shard,
66            relation_desc,
67            txns_shard,
68        } = self;
69
70        let compatibility_checks = [
71            (remap_shard == &other.remap_shard, "remap_shard"),
72            (data_shard == &other.data_shard, "data_shard"),
73            (relation_desc == &other.relation_desc, "relation_desc"),
74            (txns_shard == &other.txns_shard, "txns_shard"),
75        ];
76
77        for (compatible, field) in compatibility_checks {
78            if !compatible {
79                tracing::warn!(
80                    "CollectionMetadata incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
81                    self,
82                    other
83                );
84
85                return Err(AlterError { id });
86            }
87        }
88
89        Ok(())
90    }
91}
92
93impl RustType<ProtoCollectionMetadata> for CollectionMetadata {
94    fn into_proto(&self) -> ProtoCollectionMetadata {
95        ProtoCollectionMetadata {
96            blob_uri: self.persist_location.blob_uri.to_string_unredacted(),
97            consensus_uri: self.persist_location.consensus_uri.to_string_unredacted(),
98            data_shard: self.data_shard.to_string(),
99            remap_shard: self.remap_shard.map(|s| s.to_string()),
100            relation_desc: Some(self.relation_desc.into_proto()),
101            txns_shard: self.txns_shard.map(|x| x.to_string()),
102        }
103    }
104
105    fn from_proto(value: ProtoCollectionMetadata) -> Result<Self, TryFromProtoError> {
106        Ok(CollectionMetadata {
107            persist_location: PersistLocation {
108                blob_uri: SensitiveUrl::from_str(&value.blob_uri)?,
109                consensus_uri: SensitiveUrl::from_str(&value.consensus_uri)?,
110            },
111            remap_shard: value
112                .remap_shard
113                .map(|s| s.parse().map_err(TryFromProtoError::InvalidShardId))
114                .transpose()?,
115            data_shard: value
116                .data_shard
117                .parse()
118                .map_err(TryFromProtoError::InvalidShardId)?,
119            relation_desc: value
120                .relation_desc
121                .into_rust_if_some("ProtoCollectionMetadata::relation_desc")?,
122            txns_shard: value
123                .txns_shard
124                .map(|s| s.parse().map_err(TryFromProtoError::InvalidShardId))
125                .transpose()?,
126        })
127    }
128}
129
/// The subset of [`CollectionMetadata`] that must be durably stored.
#[derive(Arbitrary, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
pub struct DurableCollectionMetadata {
    /// The persist shard containing the contents of this storage collection.
    pub data_shard: ShardId,
}
135
136impl RustType<ProtoDurableCollectionMetadata> for DurableCollectionMetadata {
137    fn into_proto(&self) -> ProtoDurableCollectionMetadata {
138        ProtoDurableCollectionMetadata {
139            data_shard: self.data_shard.to_string(),
140        }
141    }
142
143    fn from_proto(value: ProtoDurableCollectionMetadata) -> Result<Self, TryFromProtoError> {
144        Ok(DurableCollectionMetadata {
145            data_shard: value
146                .data_shard
147                .parse()
148                .map_err(TryFromProtoError::InvalidShardId)?,
149        })
150    }
151}
152
/// Errors arising from storage-controller operations.
///
/// `T` is the timestamp type used in collection frontiers (see
/// [`InvalidUpper`]).
#[derive(Debug)]
pub enum StorageError<T> {
    /// The source identifier was re-created after having been dropped,
    /// or installed with a different description.
    CollectionIdReused(GlobalId),
    /// The sink identifier was re-created after having been dropped, or
    /// installed with a different description.
    SinkIdReused(GlobalId),
    /// The source identifier is not present.
    IdentifierMissing(GlobalId),
    /// The provided identifier was invalid, maybe missing, wrong type, not registered, etc.
    IdentifierInvalid(GlobalId),
    /// The update contained in the appended batch was at a timestamp equal or beyond the batch's upper
    UpdateBeyondUpper(GlobalId),
    /// The read was at a timestamp before the collection's since
    ReadBeforeSince(GlobalId),
    /// The expected upper of one or more appends was different from the actual upper of the collection
    InvalidUppers(Vec<InvalidUpper<T>>),
    /// The (client for) the requested cluster instance is missing.
    IngestionInstanceMissing {
        storage_instance_id: StorageInstanceId,
        ingestion_id: GlobalId,
    },
    /// The (client for) the requested cluster instance is missing.
    ExportInstanceMissing {
        storage_instance_id: StorageInstanceId,
        export_id: GlobalId,
    },
    /// Dataflow was not able to process a request
    DataflowError(DataflowError),
    /// Response to an invalid/unsupported `ALTER..` command.
    InvalidAlter(AlterError),
    /// The controller API was used in some invalid way. This usually indicates
    /// a bug.
    InvalidUsage(String),
    /// The specified resource was exhausted, and is not currently accepting more requests.
    ResourceExhausted(&'static str),
    /// The specified component is shutting down.
    ShuttingDown(&'static str),
    /// Collection metadata already exists for ID.
    CollectionMetadataAlreadyExists(GlobalId),
    /// Some other collection is already writing to this persist shard.
    PersistShardAlreadyInUse(ShardId),
    /// Raced with some other process while trying to evolve the schema of a Persist shard.
    PersistSchemaEvolveRace {
        global_id: GlobalId,
        shard_id: ShardId,
        schema_id: SchemaId,
        relation_desc: RelationDesc,
    },
    /// We tried to evolve the schema of a Persist shard in an invalid way.
    PersistInvalidSchemaEvolve {
        global_id: GlobalId,
        shard_id: ShardId,
    },
    /// Txn WAL shard already exists.
    TxnWalShardAlreadyExists,
    /// The item that a subsource refers to is unexpectedly missing from the
    /// source.
    MissingSubsourceReference {
        ingestion_id: GlobalId,
        reference: UnresolvedItemName,
    },
    /// We failed to determine the real-time-recency timestamp.
    RtrTimeout(GlobalId),
    /// The collection was dropped before we could ingest its external frontier.
    RtrDropFailure(GlobalId),
    /// A generic error that happens during operations of the storage controller.
    // TODO(aljoscha): Get rid of this!
    Generic(anyhow::Error),
    /// We are in read-only mode and were asked to do something that requires
    /// writing.
    ReadOnly,
}
227
228impl<T: Debug + Display + 'static> Error for StorageError<T> {
229    fn source(&self) -> Option<&(dyn Error + 'static)> {
230        match self {
231            Self::CollectionIdReused(_) => None,
232            Self::SinkIdReused(_) => None,
233            Self::IdentifierMissing(_) => None,
234            Self::IdentifierInvalid(_) => None,
235            Self::UpdateBeyondUpper(_) => None,
236            Self::ReadBeforeSince(_) => None,
237            Self::InvalidUppers(_) => None,
238            Self::IngestionInstanceMissing { .. } => None,
239            Self::ExportInstanceMissing { .. } => None,
240            Self::DataflowError(err) => Some(err),
241            Self::InvalidAlter { .. } => None,
242            Self::InvalidUsage(_) => None,
243            Self::ResourceExhausted(_) => None,
244            Self::ShuttingDown(_) => None,
245            Self::CollectionMetadataAlreadyExists(_) => None,
246            Self::PersistShardAlreadyInUse(_) => None,
247            Self::PersistSchemaEvolveRace { .. } => None,
248            Self::PersistInvalidSchemaEvolve { .. } => None,
249            Self::TxnWalShardAlreadyExists => None,
250            Self::MissingSubsourceReference { .. } => None,
251            Self::RtrTimeout(_) => None,
252            Self::RtrDropFailure(_) => None,
253            Self::Generic(err) => err.source(),
254            Self::ReadOnly => None,
255        }
256    }
257}
258
impl<T: fmt::Display + 'static> fmt::Display for StorageError<T> {
    /// Renders a human-readable description, always prefixed with
    /// `"storage error: "`. Variants wrapping another error either defer to
    /// the inner error's `Display` (`InvalidAlter`, `Generic`) or rely on
    /// `Error::source` for the cause (`DataflowError`).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("storage error: ")?;
        match self {
            Self::CollectionIdReused(id) => write!(
                f,
                "source identifier was re-created after having been dropped: {id}"
            ),
            Self::SinkIdReused(id) => write!(
                f,
                "sink identifier was re-created after having been dropped: {id}"
            ),
            Self::IdentifierMissing(id) => write!(f, "collection identifier is not present: {id}"),
            Self::IdentifierInvalid(id) => write!(f, "collection identifier is invalid {id}"),
            Self::UpdateBeyondUpper(id) => {
                write!(
                    f,
                    "append batch for {id} contained update at or beyond its upper"
                )
            }
            Self::ReadBeforeSince(id) => {
                write!(f, "read for {id} was at a timestamp before its since")
            }
            Self::InvalidUppers(id) => {
                // List every offending collection with its actual upper.
                write!(
                    f,
                    "expected upper was different from the actual upper for: {}",
                    id.iter()
                        .map(|InvalidUpper { id, current_upper }| {
                            format!("(id: {}; actual upper: {})", id, current_upper.pretty())
                        })
                        .join(", ")
                )
            }
            Self::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id,
            } => write!(
                f,
                "instance {} missing for ingestion {}",
                storage_instance_id, ingestion_id
            ),
            Self::ExportInstanceMissing {
                storage_instance_id,
                export_id,
            } => write!(
                f,
                "instance {} missing for export {}",
                storage_instance_id, export_id
            ),
            // N.B. For these errors, the underlying error is reported in `source()`, and it
            // is the responsibility of the caller to print the chain of errors, when desired.
            Self::DataflowError(_err) => write!(f, "dataflow failed to process request",),
            Self::InvalidAlter(err) => std::fmt::Display::fmt(err, f),
            Self::InvalidUsage(err) => write!(f, "invalid usage: {}", err),
            Self::ResourceExhausted(rsc) => write!(f, "{rsc} is exhausted"),
            Self::ShuttingDown(cmp) => write!(f, "{cmp} is shutting down"),
            Self::CollectionMetadataAlreadyExists(key) => {
                write!(f, "storage metadata for '{key}' already exists")
            }
            Self::PersistShardAlreadyInUse(shard) => {
                write!(f, "persist shard already in use: {shard}")
            }
            Self::PersistSchemaEvolveRace {
                global_id,
                shard_id,
                ..
            } => {
                write!(
                    f,
                    "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}"
                )
            }
            Self::PersistInvalidSchemaEvolve {
                global_id,
                shard_id,
            } => {
                write!(
                    f,
                    "persist shard evolved in an invalid way: {global_id}, {shard_id}"
                )
            }
            Self::TxnWalShardAlreadyExists => {
                write!(f, "txn WAL already exists")
            }
            Self::MissingSubsourceReference {
                ingestion_id,
                reference,
            } => write!(
                f,
                "ingestion {ingestion_id} unexpectedly missing reference to {}",
                reference
            ),
            Self::RtrTimeout(_) => {
                write!(
                    f,
                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
                )
            }
            Self::RtrDropFailure(_) => write!(
                f,
                "real-time source dropped before ingesting the upstream system's visible frontier"
            ),
            Self::Generic(err) => std::fmt::Display::fmt(err, f),
            Self::ReadOnly => write!(f, "cannot write in read-only mode"),
        }
    }
}
367
/// Details for [`StorageError::InvalidUppers`]: a collection whose actual
/// upper differed from the upper the caller expected.
#[derive(Debug, Clone)]
pub struct InvalidUpper<T> {
    /// The collection whose upper did not match the expectation.
    pub id: GlobalId,
    /// The collection's actual upper at the time of the failed operation.
    pub current_upper: Antichain<T>,
}
373
/// Error returned in response to an invalid/unsupported `ALTER..` command
/// (see [`StorageError::InvalidAlter`]).
#[derive(Debug)]
pub struct AlterError {
    /// The ID of the collection that could not be altered as requested.
    pub id: GlobalId,
}
378
379impl Error for AlterError {}
380
381impl fmt::Display for AlterError {
382    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383        write!(f, "{} cannot be altered in the requested way", self.id)
384    }
385}
386
387impl<T> From<AlterError> for StorageError<T> {
388    fn from(error: AlterError) -> Self {
389        Self::InvalidAlter(error)
390    }
391}
392
393impl<T> From<DataflowError> for StorageError<T> {
394    fn from(error: DataflowError) -> Self {
395        Self::DataflowError(error)
396    }
397}
398
/// A [`TxnsCodec`] that represents txn-wal entries as [`SourceData`] rows.
#[derive(Debug)]
pub struct TxnsCodecRow;

impl TxnsCodecRow {
    /// The schema of the rows produced by this codec's `encode`:
    /// `shard_id` (non-null string), `ts` (non-null uint64), and `batch`
    /// (nullable bytes). `batch` is nullable because `Register` entries
    /// carry no batch. Column order must stay in sync with `encode`/`decode`.
    pub fn desc() -> RelationDesc {
        RelationDesc::builder()
            .with_column("shard_id", ScalarType::String.nullable(false))
            .with_column("ts", ScalarType::UInt64.nullable(false))
            .with_column("batch", ScalarType::Bytes.nullable(true))
            .finish()
    }
}
411
impl TxnsCodec for TxnsCodecRow {
    // Entries are stored entirely in the key; the value is unit.
    type Key = SourceData;
    type Val = ();

    /// The key/value schemas: the row schema from [`TxnsCodecRow::desc`]
    /// for keys, the unit schema for values.
    fn schemas() -> (
        <Self::Key as mz_persist_types::Codec>::Schema,
        <Self::Val as mz_persist_types::Codec>::Schema,
    ) {
        (Self::desc(), UnitSchema)
    }

    /// Packs a [`TxnsEntry`] into a `(shard_id, ts, batch)` row.
    ///
    /// `Register` entries get a NULL `batch`; `Append` entries carry their
    /// batch bytes. `ts` is decoded from the entry's little-endian byte
    /// representation into a `u64` datum.
    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
        let row = match &e {
            TxnsEntry::Register(data_id, ts) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::Null,
            ]),
            TxnsEntry::Append(data_id, ts, batch) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::from(batch.as_slice()),
            ]),
        };
        (SourceData(Ok(row)), ())
    }

    /// Inverse of `encode`.
    ///
    /// Panics (via `expect`/`assert_none!`) if the row does not have
    /// exactly the `(shard_id, ts, batch)` shape produced by `encode` —
    /// e.g. if the `SourceData` holds an error instead of a row.
    fn decode(row: SourceData, _: ()) -> TxnsEntry {
        let mut datums = row.0.as_ref().expect("valid entry").iter();
        let data_id = datums.next().expect("valid entry").unwrap_str();
        let data_id = data_id.parse::<ShardId>().expect("valid entry");
        let ts = datums.next().expect("valid entry");
        // Re-encode the timestamp into the little-endian bytes `TxnsEntry`
        // stores, mirroring `encode`.
        let ts = u64::to_le_bytes(ts.unwrap_uint64());
        let batch = datums.next().expect("valid entry");
        assert_none!(datums.next());
        // A NULL batch column distinguishes `Register` from `Append`.
        if batch.is_null() {
            TxnsEntry::Register(data_id, ts)
        } else {
            TxnsEntry::Append(data_id, ts, batch.unwrap_bytes().to_vec())
        }
    }

    /// Decides from part stats whether a part may contain entries for
    /// `data_id`: returns `Some(false)` only when the `shard_id` column's
    /// `[lower, upper]` string-stats range excludes `data_id`.
    ///
    /// Returns `None` when the expected stats are missing or mistyped
    /// (the latter is logged via `error!`), leaving the decision to the
    /// caller — presumably "fetch"; TODO confirm against the caller.
    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
        let stats = stats
            .key
            .col("key")?
            .try_as_optional_struct()
            .map_err(|err| error!("unexpected stats type for col 'key': {}", err))
            .ok()?;
        let stats = stats
            .some
            .col("shard_id")?
            .try_as_string()
            .map_err(|err| error!("unexpected stats type for col 'shard_id': {}", err))
            .ok()?;
        let data_id_str = data_id.to_string();
        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
    }
}