1use std::error::Error;
11use std::fmt::{self, Debug, Display};
12use std::str::FromStr;
13
14use itertools::Itertools;
15use mz_ore::assert_none;
16use mz_ore::url::SensitiveUrl;
17use mz_persist_types::codec_impls::UnitSchema;
18use mz_persist_types::schema::SchemaId;
19use mz_persist_types::stats::PartStats;
20use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
21use mz_persist_types::{PersistLocation, ShardId};
22use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
23use mz_repr::{Datum, GlobalId, RelationDesc, Row, ScalarType};
24use mz_sql_parser::ast::UnresolvedItemName;
25use mz_timely_util::antichain::AntichainExt;
26use proptest_derive::Arbitrary;
27use serde::{Deserialize, Serialize};
28use timely::progress::Antichain;
29use tracing::error;
30
31use crate::errors::DataflowError;
32use crate::instances::StorageInstanceId;
33use crate::sources::SourceData;
34
// Pulls in code generated at build time into `OUT_DIR`. This is where the `Proto*` types used
// below (e.g. `ProtoCollectionMetadata`, `ProtoDurableCollectionMetadata`) come from, since they
// are not imported elsewhere — presumably protobuf bindings; confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/mz_storage_types.controller.rs"));
36
#[derive(Arbitrary, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
/// Persist-related metadata for a storage collection: its persist location, the shards it
/// uses, and the relation description of its data.
pub struct CollectionMetadata {
    /// The persist location (blob and consensus URIs) used for this collection's shards.
    pub persist_location: PersistLocation,
    /// The collection's remap shard, if it has one.
    pub remap_shard: Option<ShardId>,
    /// The persist shard holding this collection's data.
    pub data_shard: ShardId,
    /// The relation description (schema) of the collection's data.
    pub relation_desc: RelationDesc,
    /// The txn-wal shard associated with this collection, if any.
    /// NOTE(review): presumably set only when writes to `data_shard` go through txn-wal — confirm.
    pub txns_shard: Option<ShardId>,
}
52
53impl crate::AlterCompatible for CollectionMetadata {
54 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), self::AlterError> {
55 if self == other {
56 return Ok(());
57 }
58
59 let CollectionMetadata {
60 persist_location: _,
64 remap_shard,
65 data_shard,
66 relation_desc,
67 txns_shard,
68 } = self;
69
70 let compatibility_checks = [
71 (remap_shard == &other.remap_shard, "remap_shard"),
72 (data_shard == &other.data_shard, "data_shard"),
73 (relation_desc == &other.relation_desc, "relation_desc"),
74 (txns_shard == &other.txns_shard, "txns_shard"),
75 ];
76
77 for (compatible, field) in compatibility_checks {
78 if !compatible {
79 tracing::warn!(
80 "CollectionMetadata incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
81 self,
82 other
83 );
84
85 return Err(AlterError { id });
86 }
87 }
88
89 Ok(())
90 }
91}
92
93impl RustType<ProtoCollectionMetadata> for CollectionMetadata {
94 fn into_proto(&self) -> ProtoCollectionMetadata {
95 ProtoCollectionMetadata {
96 blob_uri: self.persist_location.blob_uri.to_string_unredacted(),
97 consensus_uri: self.persist_location.consensus_uri.to_string_unredacted(),
98 data_shard: self.data_shard.to_string(),
99 remap_shard: self.remap_shard.map(|s| s.to_string()),
100 relation_desc: Some(self.relation_desc.into_proto()),
101 txns_shard: self.txns_shard.map(|x| x.to_string()),
102 }
103 }
104
105 fn from_proto(value: ProtoCollectionMetadata) -> Result<Self, TryFromProtoError> {
106 Ok(CollectionMetadata {
107 persist_location: PersistLocation {
108 blob_uri: SensitiveUrl::from_str(&value.blob_uri)?,
109 consensus_uri: SensitiveUrl::from_str(&value.consensus_uri)?,
110 },
111 remap_shard: value
112 .remap_shard
113 .map(|s| s.parse().map_err(TryFromProtoError::InvalidShardId))
114 .transpose()?,
115 data_shard: value
116 .data_shard
117 .parse()
118 .map_err(TryFromProtoError::InvalidShardId)?,
119 relation_desc: value
120 .relation_desc
121 .into_rust_if_some("ProtoCollectionMetadata::relation_desc")?,
122 txns_shard: value
123 .txns_shard
124 .map(|s| s.parse().map_err(TryFromProtoError::InvalidShardId))
125 .transpose()?,
126 })
127 }
128}
129
#[derive(Arbitrary, Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
/// Collection metadata reduced to just the data shard.
/// NOTE(review): presumably the portion of `CollectionMetadata` that is recorded durably (per
/// the name) — confirm against callers.
pub struct DurableCollectionMetadata {
    /// The persist shard holding the collection's data.
    pub data_shard: ShardId,
}
135
136impl RustType<ProtoDurableCollectionMetadata> for DurableCollectionMetadata {
137 fn into_proto(&self) -> ProtoDurableCollectionMetadata {
138 ProtoDurableCollectionMetadata {
139 data_shard: self.data_shard.to_string(),
140 }
141 }
142
143 fn from_proto(value: ProtoDurableCollectionMetadata) -> Result<Self, TryFromProtoError> {
144 Ok(DurableCollectionMetadata {
145 data_shard: value
146 .data_shard
147 .parse()
148 .map_err(TryFromProtoError::InvalidShardId)?,
149 })
150 }
151}
152
/// Errors that can arise from storage operations.
///
/// `T` is the element type of the frontiers reported by [`StorageError::InvalidUppers`]
/// (presumably the timestamp type — confirm against callers).
#[derive(Debug)]
pub enum StorageError<T> {
    /// A collection identifier was re-created after having been dropped.
    CollectionIdReused(GlobalId),
    /// A sink identifier was re-created after having been dropped.
    SinkIdReused(GlobalId),
    /// The collection identifier is not present.
    IdentifierMissing(GlobalId),
    /// The collection identifier is invalid.
    IdentifierInvalid(GlobalId),
    /// An append batch for the collection contained an update at or beyond its upper.
    UpdateBeyondUpper(GlobalId),
    /// A read for the collection was at a timestamp before its since.
    ReadBeforeSince(GlobalId),
    /// The expected upper differed from the actual upper for the listed collections; each
    /// entry carries the actual upper.
    InvalidUppers(Vec<InvalidUpper<T>>),
    /// The storage instance for an ingestion is missing.
    IngestionInstanceMissing {
        /// The missing instance.
        storage_instance_id: StorageInstanceId,
        /// The ingestion that referenced it.
        ingestion_id: GlobalId,
    },
    /// The storage instance for an export is missing.
    ExportInstanceMissing {
        /// The missing instance.
        storage_instance_id: StorageInstanceId,
        /// The export that referenced it.
        export_id: GlobalId,
    },
    /// A dataflow failed to process the request; the wrapped error is exposed via `source`.
    DataflowError(DataflowError),
    /// A requested alteration is not allowed; see [`AlterError`].
    InvalidAlter(AlterError),
    /// The API was used incorrectly; the string describes the misuse.
    InvalidUsage(String),
    /// The named resource is exhausted.
    ResourceExhausted(&'static str),
    /// The named component is shutting down.
    ShuttingDown(&'static str),
    /// Storage metadata for the given identifier already exists.
    CollectionMetadataAlreadyExists(GlobalId),
    /// The persist shard is already in use.
    PersistShardAlreadyInUse(ShardId),
    /// Persist raced when trying to evolve the schema of a shard.
    PersistSchemaEvolveRace {
        /// The collection whose shard was being evolved.
        global_id: GlobalId,
        /// The shard being evolved.
        shard_id: ShardId,
        /// NOTE(review): presumably the schema id involved in the race — confirm.
        schema_id: SchemaId,
        /// NOTE(review): presumably the relation description being evolved to — confirm.
        relation_desc: RelationDesc,
    },
    /// The persist shard was evolved in an invalid way.
    PersistInvalidSchemaEvolve {
        /// The collection whose shard was evolved.
        global_id: GlobalId,
        /// The shard that was evolved.
        shard_id: ShardId,
    },
    /// The txn-wal shard already exists.
    TxnWalShardAlreadyExists,
    /// An ingestion unexpectedly lacks a reference to a subsource.
    MissingSubsourceReference {
        /// The ingestion missing the reference.
        ingestion_id: GlobalId,
        /// The name of the missing reference.
        reference: UnresolvedItemName,
    },
    /// Timed out before ingesting the source's visible frontier when a real-time-recency
    /// query was issued.
    RtrTimeout(GlobalId),
    /// A real-time source was dropped before ingesting the upstream system's visible frontier.
    RtrDropFailure(GlobalId),
    /// Any other error. `Display` delegates to the wrapped error, and `source` yields the
    /// wrapped error's own source.
    Generic(anyhow::Error),
    /// A write was attempted in read-only mode.
    ReadOnly,
}
227
228impl<T: Debug + Display + 'static> Error for StorageError<T> {
229 fn source(&self) -> Option<&(dyn Error + 'static)> {
230 match self {
231 Self::CollectionIdReused(_) => None,
232 Self::SinkIdReused(_) => None,
233 Self::IdentifierMissing(_) => None,
234 Self::IdentifierInvalid(_) => None,
235 Self::UpdateBeyondUpper(_) => None,
236 Self::ReadBeforeSince(_) => None,
237 Self::InvalidUppers(_) => None,
238 Self::IngestionInstanceMissing { .. } => None,
239 Self::ExportInstanceMissing { .. } => None,
240 Self::DataflowError(err) => Some(err),
241 Self::InvalidAlter { .. } => None,
242 Self::InvalidUsage(_) => None,
243 Self::ResourceExhausted(_) => None,
244 Self::ShuttingDown(_) => None,
245 Self::CollectionMetadataAlreadyExists(_) => None,
246 Self::PersistShardAlreadyInUse(_) => None,
247 Self::PersistSchemaEvolveRace { .. } => None,
248 Self::PersistInvalidSchemaEvolve { .. } => None,
249 Self::TxnWalShardAlreadyExists => None,
250 Self::MissingSubsourceReference { .. } => None,
251 Self::RtrTimeout(_) => None,
252 Self::RtrDropFailure(_) => None,
253 Self::Generic(err) => err.source(),
254 Self::ReadOnly => None,
255 }
256 }
257}
258
259impl<T: fmt::Display + 'static> fmt::Display for StorageError<T> {
260 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
261 f.write_str("storage error: ")?;
262 match self {
263 Self::CollectionIdReused(id) => write!(
264 f,
265 "source identifier was re-created after having been dropped: {id}"
266 ),
267 Self::SinkIdReused(id) => write!(
268 f,
269 "sink identifier was re-created after having been dropped: {id}"
270 ),
271 Self::IdentifierMissing(id) => write!(f, "collection identifier is not present: {id}"),
272 Self::IdentifierInvalid(id) => write!(f, "collection identifier is invalid {id}"),
273 Self::UpdateBeyondUpper(id) => {
274 write!(
275 f,
276 "append batch for {id} contained update at or beyond its upper"
277 )
278 }
279 Self::ReadBeforeSince(id) => {
280 write!(f, "read for {id} was at a timestamp before its since")
281 }
282 Self::InvalidUppers(id) => {
283 write!(
284 f,
285 "expected upper was different from the actual upper for: {}",
286 id.iter()
287 .map(|InvalidUpper { id, current_upper }| {
288 format!("(id: {}; actual upper: {})", id, current_upper.pretty())
289 })
290 .join(", ")
291 )
292 }
293 Self::IngestionInstanceMissing {
294 storage_instance_id,
295 ingestion_id,
296 } => write!(
297 f,
298 "instance {} missing for ingestion {}",
299 storage_instance_id, ingestion_id
300 ),
301 Self::ExportInstanceMissing {
302 storage_instance_id,
303 export_id,
304 } => write!(
305 f,
306 "instance {} missing for export {}",
307 storage_instance_id, export_id
308 ),
309 Self::DataflowError(_err) => write!(f, "dataflow failed to process request",),
312 Self::InvalidAlter(err) => std::fmt::Display::fmt(err, f),
313 Self::InvalidUsage(err) => write!(f, "invalid usage: {}", err),
314 Self::ResourceExhausted(rsc) => write!(f, "{rsc} is exhausted"),
315 Self::ShuttingDown(cmp) => write!(f, "{cmp} is shutting down"),
316 Self::CollectionMetadataAlreadyExists(key) => {
317 write!(f, "storage metadata for '{key}' already exists")
318 }
319 Self::PersistShardAlreadyInUse(shard) => {
320 write!(f, "persist shard already in use: {shard}")
321 }
322 Self::PersistSchemaEvolveRace {
323 global_id,
324 shard_id,
325 ..
326 } => {
327 write!(
328 f,
329 "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}"
330 )
331 }
332 Self::PersistInvalidSchemaEvolve {
333 global_id,
334 shard_id,
335 } => {
336 write!(
337 f,
338 "persist shard evolved in an invalid way: {global_id}, {shard_id}"
339 )
340 }
341 Self::TxnWalShardAlreadyExists => {
342 write!(f, "txn WAL already exists")
343 }
344 Self::MissingSubsourceReference {
345 ingestion_id,
346 reference,
347 } => write!(
348 f,
349 "ingestion {ingestion_id} unexpectedly missing reference to {}",
350 reference
351 ),
352 Self::RtrTimeout(_) => {
353 write!(
354 f,
355 "timed out before ingesting the source's visible frontier when real-time-recency query issued"
356 )
357 }
358 Self::RtrDropFailure(_) => write!(
359 f,
360 "real-time source dropped before ingesting the upstream system's visible frontier"
361 ),
362 Self::Generic(err) => std::fmt::Display::fmt(err, f),
363 Self::ReadOnly => write!(f, "cannot write in read-only mode"),
364 }
365 }
366}
367
/// Reports that a collection's actual upper differed from the expected one.
#[derive(Debug, Clone)]
pub struct InvalidUpper<T> {
    /// The collection whose upper did not match.
    pub id: GlobalId,
    /// The collection's actual (current) upper.
    pub current_upper: Antichain<T>,
}
373
/// Error returned when an object cannot be altered in the requested way.
#[derive(Debug)]
pub struct AlterError {
    /// The identifier of the object that could not be altered.
    pub id: GlobalId,
}
378
// No wrapped cause: the default `Error::source` (which returns `None`) is used.
impl Error for AlterError {}
380
381impl fmt::Display for AlterError {
382 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
383 write!(f, "{} cannot be altered in the requested way", self.id)
384 }
385}
386
387impl<T> From<AlterError> for StorageError<T> {
388 fn from(error: AlterError) -> Self {
389 Self::InvalidAlter(error)
390 }
391}
392
393impl<T> From<DataflowError> for StorageError<T> {
394 fn from(error: DataflowError) -> Self {
395 Self::DataflowError(error)
396 }
397}
398
/// A [`TxnsCodec`] that stores [`TxnsEntry`] values as [`SourceData`] rows (see
/// `TxnsCodecRow::desc` for the column layout).
#[derive(Debug)]
pub struct TxnsCodecRow;
401
402impl TxnsCodecRow {
403 pub fn desc() -> RelationDesc {
404 RelationDesc::builder()
405 .with_column("shard_id", ScalarType::String.nullable(false))
406 .with_column("ts", ScalarType::UInt64.nullable(false))
407 .with_column("batch", ScalarType::Bytes.nullable(true))
408 .finish()
409 }
410}
411
412impl TxnsCodec for TxnsCodecRow {
413 type Key = SourceData;
414 type Val = ();
415
416 fn schemas() -> (
417 <Self::Key as mz_persist_types::Codec>::Schema,
418 <Self::Val as mz_persist_types::Codec>::Schema,
419 ) {
420 (Self::desc(), UnitSchema)
421 }
422
423 fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
424 let row = match &e {
425 TxnsEntry::Register(data_id, ts) => Row::pack([
426 Datum::from(data_id.to_string().as_str()),
427 Datum::from(u64::from_le_bytes(*ts)),
428 Datum::Null,
429 ]),
430 TxnsEntry::Append(data_id, ts, batch) => Row::pack([
431 Datum::from(data_id.to_string().as_str()),
432 Datum::from(u64::from_le_bytes(*ts)),
433 Datum::from(batch.as_slice()),
434 ]),
435 };
436 (SourceData(Ok(row)), ())
437 }
438
439 fn decode(row: SourceData, _: ()) -> TxnsEntry {
440 let mut datums = row.0.as_ref().expect("valid entry").iter();
441 let data_id = datums.next().expect("valid entry").unwrap_str();
442 let data_id = data_id.parse::<ShardId>().expect("valid entry");
443 let ts = datums.next().expect("valid entry");
444 let ts = u64::to_le_bytes(ts.unwrap_uint64());
445 let batch = datums.next().expect("valid entry");
446 assert_none!(datums.next());
447 if batch.is_null() {
448 TxnsEntry::Register(data_id, ts)
449 } else {
450 TxnsEntry::Append(data_id, ts, batch.unwrap_bytes().to_vec())
451 }
452 }
453
454 fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
455 let stats = stats
456 .key
457 .col("key")?
458 .try_as_optional_struct()
459 .map_err(|err| error!("unexpected stats type for col 'key': {}", err))
460 .ok()?;
461 let stats = stats
462 .some
463 .col("shard_id")?
464 .try_as_string()
465 .map_err(|err| error!("unexpected stats type for col 'shard_id': {}", err))
466 .ok()?;
467 let data_id_str = data_id.to_string();
468 Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
469 }
470}