use std::error::Error;
use std::fmt::{self, Debug, Display};

use itertools::Itertools;
use mz_ore::assert_none;
use mz_persist_types::codec_impls::UnitSchema;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::PartStats;
use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
use mz_persist_types::{PersistLocation, ShardId};
use mz_repr::{Datum, GlobalId, RelationDesc, Row, SqlScalarType};
use mz_sql_parser::ast::UnresolvedItemName;
use mz_timely_util::antichain::AntichainExt;
use serde::{Deserialize, Serialize};
use timely::progress::Antichain;
use tracing::error;

use crate::errors::DataflowError;
use crate::instances::StorageInstanceId;
use crate::sources::SourceData;
/// Metadata that describes where and how a storage collection's data is
/// stored in persist.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectionMetadata {
    /// The persist location where the collection's shards live.
    pub persist_location: PersistLocation,
    /// The persist shard containing the collection's data.
    pub data_shard: ShardId,
    /// A description of the relation stored in `data_shard`.
    pub relation_desc: RelationDesc,
    /// The txn-wal shard that manages writes to `data_shard`, if any.
    pub txns_shard: Option<ShardId>,
}

impl crate::AlterCompatible for CollectionMetadata {
    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), self::AlterError> {
        if self == other {
            return Ok(());
        }

        let CollectionMetadata {
            persist_location: _,
            data_shard,
            relation_desc,
            txns_shard,
        } = self;

        let compatibility_checks = [
            (data_shard == &other.data_shard, "data_shard"),
            (relation_desc == &other.relation_desc, "relation_desc"),
            (txns_shard == &other.txns_shard, "txns_shard"),
        ];

        for (compatible, field) in compatibility_checks {
            if !compatible {
                tracing::warn!(
                    "CollectionMetadata incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
                    self,
                    other
                );

                return Err(AlterError { id });
            }
        }

        Ok(())
    }
}

/// The durable subset of a collection's metadata: the persist shard that
/// stores its data.
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
pub struct DurableCollectionMetadata {
    /// The persist shard containing the collection's data.
    pub data_shard: ShardId,
}

/// Errors arising in the storage layer and its controller.
#[derive(Debug)]
pub enum StorageError<T> {
    /// The source identifier was re-created after having been dropped.
    CollectionIdReused(GlobalId),
    /// The sink identifier was re-created after having been dropped.
    SinkIdReused(GlobalId),
    /// The named collection does not exist.
    IdentifierMissing(GlobalId),
    /// The named collection identifier is invalid.
    IdentifierInvalid(GlobalId),
    /// An appended batch contained an update at or beyond its upper.
    UpdateBeyondUpper(GlobalId),
    /// A read was requested at a timestamp before the collection's since.
    ReadBeforeSince(GlobalId),
    /// The expected uppers differed from the actual uppers.
    InvalidUppers(Vec<InvalidUpper<T>>),
    /// The storage instance hosting an ingestion is missing.
    IngestionInstanceMissing {
        storage_instance_id: StorageInstanceId,
        ingestion_id: GlobalId,
    },
    /// The storage instance hosting an export is missing.
    ExportInstanceMissing {
        storage_instance_id: StorageInstanceId,
        export_id: GlobalId,
    },
    /// A dataflow failed to process the request.
    DataflowError(DataflowError),
    /// The collection cannot be altered in the requested way.
    InvalidAlter(AlterError),
    /// The request was invalid.
    InvalidUsage(String),
    /// The named resource is exhausted.
    ResourceExhausted(&'static str),
    /// The named component is shutting down.
    ShuttingDown(&'static str),
    /// Storage metadata for the collection already exists.
    CollectionMetadataAlreadyExists(GlobalId),
    /// The persist shard is already in use by another collection.
    PersistShardAlreadyInUse(ShardId),
    /// Persist raced when trying to evolve the schema of a shard.
    PersistSchemaEvolveRace {
        global_id: GlobalId,
        shard_id: ShardId,
        schema_id: SchemaId,
        relation_desc: RelationDesc,
    },
    /// The persist shard's schema evolved in an invalid way.
    PersistInvalidSchemaEvolve {
        global_id: GlobalId,
        shard_id: ShardId,
    },
    /// The txn-wal shard already exists.
    TxnWalShardAlreadyExists,
    /// An ingestion is unexpectedly missing a subsource reference.
    MissingSubsourceReference {
        ingestion_id: GlobalId,
        reference: UnresolvedItemName,
    },
    /// A real-time-recency query timed out before the source's visible
    /// frontier was ingested.
    RtrTimeout(GlobalId),
    /// The real-time-recency source was dropped before the upstream system's
    /// visible frontier was ingested.
    RtrDropFailure(GlobalId),
    /// A generic error.
    Generic(anyhow::Error),
    /// Writes are rejected because the controller is in read-only mode.
    ReadOnly,
}

impl<T: Debug + Display + 'static> Error for StorageError<T> {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::CollectionIdReused(_) => None,
            Self::SinkIdReused(_) => None,
            Self::IdentifierMissing(_) => None,
            Self::IdentifierInvalid(_) => None,
            Self::UpdateBeyondUpper(_) => None,
            Self::ReadBeforeSince(_) => None,
            Self::InvalidUppers(_) => None,
            Self::IngestionInstanceMissing { .. } => None,
            Self::ExportInstanceMissing { .. } => None,
            Self::DataflowError(err) => Some(err),
            Self::InvalidAlter { .. } => None,
            Self::InvalidUsage(_) => None,
            Self::ResourceExhausted(_) => None,
            Self::ShuttingDown(_) => None,
            Self::CollectionMetadataAlreadyExists(_) => None,
            Self::PersistShardAlreadyInUse(_) => None,
            Self::PersistSchemaEvolveRace { .. } => None,
            Self::PersistInvalidSchemaEvolve { .. } => None,
            Self::TxnWalShardAlreadyExists => None,
            Self::MissingSubsourceReference { .. } => None,
            Self::RtrTimeout(_) => None,
            Self::RtrDropFailure(_) => None,
            Self::Generic(err) => err.source(),
            Self::ReadOnly => None,
        }
    }
}

impl<T: fmt::Display + 'static> fmt::Display for StorageError<T> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("storage error: ")?;
        match self {
            Self::CollectionIdReused(id) => write!(
                f,
                "source identifier was re-created after having been dropped: {id}"
            ),
            Self::SinkIdReused(id) => write!(
                f,
                "sink identifier was re-created after having been dropped: {id}"
            ),
            Self::IdentifierMissing(id) => write!(f, "collection identifier is not present: {id}"),
            Self::IdentifierInvalid(id) => write!(f, "collection identifier is invalid: {id}"),
            Self::UpdateBeyondUpper(id) => {
                write!(
                    f,
                    "append batch for {id} contained update at or beyond its upper"
                )
            }
            Self::ReadBeforeSince(id) => {
                write!(f, "read for {id} was at a timestamp before its since")
            }
            Self::InvalidUppers(uppers) => {
                write!(
                    f,
                    "expected upper was different from the actual upper for: {}",
                    uppers
                        .iter()
                        .map(|InvalidUpper { id, current_upper }| {
                            format!("(id: {}; actual upper: {})", id, current_upper.pretty())
                        })
                        .join(", ")
                )
            }
            Self::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id,
            } => write!(
                f,
                "instance {} missing for ingestion {}",
                storage_instance_id, ingestion_id
            ),
            Self::ExportInstanceMissing {
                storage_instance_id,
                export_id,
            } => write!(
                f,
                "instance {} missing for export {}",
                storage_instance_id, export_id
            ),
            Self::DataflowError(_err) => write!(f, "dataflow failed to process request"),
            Self::InvalidAlter(err) => std::fmt::Display::fmt(err, f),
            Self::InvalidUsage(err) => write!(f, "invalid usage: {}", err),
            Self::ResourceExhausted(rsc) => write!(f, "{rsc} is exhausted"),
            Self::ShuttingDown(cmp) => write!(f, "{cmp} is shutting down"),
            Self::CollectionMetadataAlreadyExists(key) => {
                write!(f, "storage metadata for '{key}' already exists")
            }
            Self::PersistShardAlreadyInUse(shard) => {
                write!(f, "persist shard already in use: {shard}")
            }
            Self::PersistSchemaEvolveRace {
                global_id,
                shard_id,
                ..
            } => {
                write!(
                    f,
                    "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}"
                )
            }
            Self::PersistInvalidSchemaEvolve {
                global_id,
                shard_id,
            } => {
                write!(
                    f,
                    "persist shard evolved in an invalid way: {global_id}, {shard_id}"
                )
            }
            Self::TxnWalShardAlreadyExists => {
                write!(f, "txn WAL already exists")
            }
            Self::MissingSubsourceReference {
                ingestion_id,
                reference,
            } => write!(
                f,
                "ingestion {ingestion_id} unexpectedly missing reference to {}",
                reference
            ),
            Self::RtrTimeout(_) => {
                write!(
                    f,
                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
                )
            }
            Self::RtrDropFailure(_) => write!(
                f,
                "real-time source dropped before ingesting the upstream system's visible frontier"
            ),
            Self::Generic(err) => std::fmt::Display::fmt(err, f),
            Self::ReadOnly => write!(f, "cannot write in read-only mode"),
        }
    }
}

/// An expected-versus-actual upper mismatch for a collection.
#[derive(Debug, Clone)]
pub struct InvalidUpper<T> {
    /// The collection whose upper was invalid.
    pub id: GlobalId,
    /// The collection's actual upper.
    pub current_upper: Antichain<T>,
}

/// An error reporting that a collection cannot be altered in the requested
/// way.
#[derive(Debug)]
pub struct AlterError {
    /// The collection that could not be altered.
    pub id: GlobalId,
}

impl Error for AlterError {}

impl fmt::Display for AlterError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{} cannot be altered in the requested way", self.id)
    }
}

impl<T> From<AlterError> for StorageError<T> {
    fn from(error: AlterError) -> Self {
        Self::InvalidAlter(error)
    }
}

impl<T> From<DataflowError> for StorageError<T> {
    fn from(error: DataflowError) -> Self {
        Self::DataflowError(error)
    }
}

/// A [`TxnsCodec`] implementation that encodes [`TxnsEntry`] values as
/// [`SourceData`] rows.
#[derive(Debug)]
pub struct TxnsCodecRow;

impl TxnsCodecRow {
    /// The schema of the key rows written to the txn-wal shard.
    pub fn desc() -> RelationDesc {
        RelationDesc::builder()
            .with_column("shard_id", SqlScalarType::String.nullable(false))
            .with_column("ts", SqlScalarType::UInt64.nullable(false))
            .with_column("batch", SqlScalarType::Bytes.nullable(true))
            .finish()
    }
}

impl TxnsCodec for TxnsCodecRow {
    type Key = SourceData;
    type Val = ();

    fn schemas() -> (
        <Self::Key as mz_persist_types::Codec>::Schema,
        <Self::Val as mz_persist_types::Codec>::Schema,
    ) {
        (Self::desc(), UnitSchema)
    }

    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
        let row = match &e {
            TxnsEntry::Register(data_id, ts) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::Null,
            ]),
            TxnsEntry::Append(data_id, ts, batch) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::from(batch.as_slice()),
            ]),
        };
        (SourceData(Ok(row)), ())
    }

    fn decode(row: SourceData, _: ()) -> TxnsEntry {
        let mut datums = row.0.as_ref().expect("valid entry").iter();
        let data_id = datums.next().expect("valid entry").unwrap_str();
        let data_id = data_id.parse::<ShardId>().expect("valid entry");
        let ts = datums.next().expect("valid entry");
        let ts = u64::to_le_bytes(ts.unwrap_uint64());
        let batch = datums.next().expect("valid entry");
        assert_none!(datums.next());
        if batch.is_null() {
            TxnsEntry::Register(data_id, ts)
        } else {
            TxnsEntry::Append(data_id, ts, batch.unwrap_bytes().to_vec())
        }
    }

    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
        let stats = stats
            .key
            .col("key")?
            .try_as_optional_struct()
            .map_err(|err| error!("unexpected stats type for col 'key': {}", err))
            .ok()?;
        let stats = stats
            .some
            .col("shard_id")?
            .try_as_string()
            .map_err(|err| error!("unexpected stats type for col 'shard_id': {}", err))
            .ok()?;
        let data_id_str = data_id.to_string();
        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
    }
}
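
// A minimal illustrative test module added for clarity: a sketch of how
// `TxnsCodecRow` round-trips `TxnsEntry` values and how `StorageError`
// renders. It is not part of the original module and assumes `ShardId::new()`
// is available for generating fresh shard ids.
#[cfg(test)]
mod tests {
    use super::*;

    // Round-trip a `Register` and an `Append` entry through the codec.
    #[test]
    fn txns_codec_row_roundtrip() {
        let data_id = ShardId::new();
        let ts = 42u64.to_le_bytes();

        // A `Register` entry encodes a null `batch` column.
        let (key, val) = TxnsCodecRow::encode(TxnsEntry::Register(data_id.clone(), ts));
        match TxnsCodecRow::decode(key, val) {
            TxnsEntry::Register(decoded_id, decoded_ts) => {
                assert_eq!(decoded_id, data_id);
                assert_eq!(decoded_ts, ts);
            }
            _ => panic!("expected a Register entry"),
        }

        // An `Append` entry carries the batch bytes through unchanged.
        let batch = vec![1u8, 2, 3];
        let (key, val) =
            TxnsCodecRow::encode(TxnsEntry::Append(data_id.clone(), ts, batch.clone()));
        match TxnsCodecRow::decode(key, val) {
            TxnsEntry::Append(decoded_id, decoded_ts, decoded_batch) => {
                assert_eq!(decoded_id, data_id);
                assert_eq!(decoded_ts, ts);
                assert_eq!(decoded_batch, batch);
            }
            _ => panic!("expected an Append entry"),
        }
    }

    // Every `StorageError` message is prefixed with "storage error: ".
    #[test]
    fn storage_error_display_prefix() {
        let err = StorageError::<u64>::ReadOnly;
        assert_eq!(
            err.to_string(),
            "storage error: cannot write in read-only mode"
        );
    }
}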