1use std::error::Error;
11use std::fmt::{self, Debug};
12
13use itertools::Itertools;
14use mz_ore::assert_none;
15use mz_persist_types::codec_impls::UnitSchema;
16use mz_persist_types::schema::SchemaId;
17use mz_persist_types::stats::PartStats;
18use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
19use mz_persist_types::{PersistLocation, ShardId};
20use mz_repr::{Datum, GlobalId, RelationDesc, Row, SqlScalarType, Timestamp};
21use mz_sql_parser::ast::UnresolvedItemName;
22use mz_timely_util::antichain::AntichainExt;
23use serde::{Deserialize, Serialize};
24use timely::progress::Antichain;
25use tracing::error;
26
27use crate::errors::{CollectionMissing, DataflowError};
28use crate::instances::StorageInstanceId;
29use crate::sources::SourceData;
30
/// Metadata describing where and how a storage collection's data is stored.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CollectionMetadata {
    /// The persist location at which this collection's shard lives.
    pub persist_location: PersistLocation,
    /// The persist shard containing this collection's data.
    pub data_shard: ShardId,
    /// The description of the relation stored in `data_shard`.
    pub relation_desc: RelationDesc,
    /// The txns shard that manages writes to `data_shard`, or `None` if
    /// this collection's writes are not managed through a txns shard.
    pub txns_shard: Option<ShardId>,
}
44
45impl crate::AlterCompatible for CollectionMetadata {
46 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), self::AlterError> {
47 if self == other {
48 return Ok(());
49 }
50
51 let CollectionMetadata {
52 persist_location: _,
56 data_shard,
57 relation_desc,
58 txns_shard,
59 } = self;
60
61 let compatibility_checks = [
62 (data_shard == &other.data_shard, "data_shard"),
63 (relation_desc == &other.relation_desc, "relation_desc"),
64 (txns_shard == &other.txns_shard, "txns_shard"),
65 ];
66
67 for (compatible, field) in compatibility_checks {
68 if !compatible {
69 tracing::warn!(
70 "CollectionMetadata incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
71 self,
72 other
73 );
74
75 return Err(AlterError { id });
76 }
77 }
78
79 Ok(())
80 }
81}
82
/// A collection's durable metadata: the persist shard that holds its data.
// NOTE(review): presumably the durably-recorded subset of
// [`CollectionMetadata`] — confirm against where this is written.
#[derive(Clone, Debug, PartialEq, PartialOrd, Ord, Eq, Serialize, Deserialize)]
pub struct DurableCollectionMetadata {
    /// The persist shard containing the collection's data.
    pub data_shard: ShardId,
}
88
/// Errors arising from storage-layer operations on collections, sinks, and
/// persist shards. See the `Display` impl below for the user-facing message
/// of each variant.
#[derive(Debug)]
pub enum StorageError {
    /// The source identifier was re-created after having been dropped.
    CollectionIdReused(GlobalId),
    /// The sink identifier was re-created after having been dropped.
    SinkIdReused(GlobalId),
    /// The named collection identifier is not present.
    IdentifierMissing(GlobalId),
    /// The named collection identifier is invalid.
    IdentifierInvalid(GlobalId),
    /// An append batch contained an update at or beyond the collection's upper.
    UpdateBeyondUpper(GlobalId),
    /// A read was requested at a timestamp before the collection's since.
    ReadBeforeSince(GlobalId),
    /// The expected uppers differed from the actual uppers for these collections.
    InvalidUppers(Vec<InvalidUpper>),
    /// The storage instance that should host an ingestion is missing.
    IngestionInstanceMissing {
        storage_instance_id: StorageInstanceId,
        ingestion_id: GlobalId,
    },
    /// The storage instance that should host an export is missing.
    ExportInstanceMissing {
        storage_instance_id: StorageInstanceId,
        export_id: GlobalId,
    },
    /// A dataflow failed to process a request.
    DataflowError(DataflowError),
    /// A requested alteration was incompatible; see [`AlterError`].
    InvalidAlter(AlterError),
    /// The described API usage was invalid.
    InvalidUsage(String),
    /// The named component is shutting down.
    ShuttingDown(&'static str),
    /// Storage metadata for this collection already exists.
    CollectionMetadataAlreadyExists(GlobalId),
    /// The persist shard is already in use.
    PersistShardAlreadyInUse(ShardId),
    /// Persist raced while trying to evolve the schema of a shard.
    PersistSchemaEvolveRace {
        global_id: GlobalId,
        shard_id: ShardId,
        schema_id: SchemaId,
        relation_desc: RelationDesc,
    },
    /// A persist shard's schema evolved in an invalid way.
    PersistInvalidSchemaEvolve {
        global_id: GlobalId,
        shard_id: ShardId,
    },
    /// The txn WAL shard already exists.
    TxnWalShardAlreadyExists,
    /// An ingestion is unexpectedly missing a reference to a subsource.
    MissingSubsourceReference {
        ingestion_id: GlobalId,
        reference: UnresolvedItemName,
    },
    /// A real-time-recency query timed out before the source ingested its
    /// visible frontier.
    RtrTimeout(GlobalId),
    /// A real-time source was dropped before ingesting the upstream system's
    /// visible frontier.
    RtrDropFailure(GlobalId),
    /// A generic, otherwise uncategorized error.
    Generic(anyhow::Error),
    /// A write was attempted while in read-only mode.
    ReadOnly,
}
161
162impl Error for StorageError {
163 fn source(&self) -> Option<&(dyn Error + 'static)> {
164 match self {
165 Self::CollectionIdReused(_) => None,
166 Self::SinkIdReused(_) => None,
167 Self::IdentifierMissing(_) => None,
168 Self::IdentifierInvalid(_) => None,
169 Self::UpdateBeyondUpper(_) => None,
170 Self::ReadBeforeSince(_) => None,
171 Self::InvalidUppers(_) => None,
172 Self::IngestionInstanceMissing { .. } => None,
173 Self::ExportInstanceMissing { .. } => None,
174 Self::DataflowError(err) => Some(err),
175 Self::InvalidAlter { .. } => None,
176 Self::InvalidUsage(_) => None,
177 Self::ShuttingDown(_) => None,
178 Self::CollectionMetadataAlreadyExists(_) => None,
179 Self::PersistShardAlreadyInUse(_) => None,
180 Self::PersistSchemaEvolveRace { .. } => None,
181 Self::PersistInvalidSchemaEvolve { .. } => None,
182 Self::TxnWalShardAlreadyExists => None,
183 Self::MissingSubsourceReference { .. } => None,
184 Self::RtrTimeout(_) => None,
185 Self::RtrDropFailure(_) => None,
186 Self::Generic(err) => err.source(),
187 Self::ReadOnly => None,
188 }
189 }
190}
191
impl fmt::Display for StorageError {
    // Renders the user-facing message for each variant. Every message is
    // prefixed with "storage error: "; these strings are user-visible, so
    // change them deliberately.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("storage error: ")?;
        match self {
            Self::CollectionIdReused(id) => write!(
                f,
                "source identifier was re-created after having been dropped: {id}"
            ),
            Self::SinkIdReused(id) => write!(
                f,
                "sink identifier was re-created after having been dropped: {id}"
            ),
            Self::IdentifierMissing(id) => write!(f, "collection identifier is not present: {id}"),
            Self::IdentifierInvalid(id) => write!(f, "collection identifier is invalid {id}"),
            Self::UpdateBeyondUpper(id) => {
                write!(
                    f,
                    "append batch for {id} contained update at or beyond its upper"
                )
            }
            Self::ReadBeforeSince(id) => {
                write!(f, "read for {id} was at a timestamp before its since")
            }
            Self::InvalidUppers(id) => {
                // Render each offending collection as "(id: ...; actual upper: ...)",
                // comma-separated.
                write!(
                    f,
                    "expected upper was different from the actual upper for: {}",
                    id.iter()
                        .map(|InvalidUpper { id, current_upper }| {
                            format!("(id: {}; actual upper: {})", id, current_upper.pretty())
                        })
                        .join(", ")
                )
            }
            Self::IngestionInstanceMissing {
                storage_instance_id,
                ingestion_id,
            } => write!(
                f,
                "instance {} missing for ingestion {}",
                storage_instance_id, ingestion_id
            ),
            Self::ExportInstanceMissing {
                storage_instance_id,
                export_id,
            } => write!(
                f,
                "instance {} missing for export {}",
                storage_instance_id, export_id
            ),
            // The inner error is reachable via `Error::source`; it is not
            // repeated in the message here.
            Self::DataflowError(_err) => write!(f, "dataflow failed to process request",),
            // Delegate entirely to `AlterError`'s own `Display`.
            Self::InvalidAlter(err) => std::fmt::Display::fmt(err, f),
            Self::InvalidUsage(err) => write!(f, "invalid usage: {}", err),
            Self::ShuttingDown(cmp) => write!(f, "{cmp} is shutting down"),
            Self::CollectionMetadataAlreadyExists(key) => {
                write!(f, "storage metadata for '{key}' already exists")
            }
            Self::PersistShardAlreadyInUse(shard) => {
                write!(f, "persist shard already in use: {shard}")
            }
            Self::PersistSchemaEvolveRace {
                global_id,
                shard_id,
                ..
            } => {
                write!(
                    f,
                    "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}"
                )
            }
            Self::PersistInvalidSchemaEvolve {
                global_id,
                shard_id,
            } => {
                write!(
                    f,
                    "persist shard evolved in an invalid way: {global_id}, {shard_id}"
                )
            }
            Self::TxnWalShardAlreadyExists => {
                write!(f, "txn WAL already exists")
            }
            Self::MissingSubsourceReference {
                ingestion_id,
                reference,
            } => write!(
                f,
                "ingestion {ingestion_id} unexpectedly missing reference to {}",
                reference
            ),
            Self::RtrTimeout(_) => {
                write!(
                    f,
                    "timed out before ingesting the source's visible frontier when real-time-recency query issued"
                )
            }
            Self::RtrDropFailure(_) => write!(
                f,
                "real-time source dropped before ingesting the upstream system's visible frontier"
            ),
            // Delegate entirely to the wrapped `anyhow::Error`'s `Display`.
            Self::Generic(err) => std::fmt::Display::fmt(err, f),
            Self::ReadOnly => write!(f, "cannot write in read-only mode"),
        }
    }
}
299
/// Details for [`StorageError::InvalidUppers`]: a collection whose actual
/// upper differed from the expected one.
#[derive(Debug, Clone)]
pub struct InvalidUpper {
    /// The collection with the unexpected upper.
    pub id: GlobalId,
    /// The collection's actual upper frontier.
    pub current_upper: Antichain<Timestamp>,
}
305
/// Error returned when a collection cannot be altered in the requested way.
#[derive(Debug)]
pub struct AlterError {
    /// The collection that could not be altered.
    pub id: GlobalId,
}
310
// `AlterError` wraps no underlying error, so the default `Error` impl suffices.
impl Error for AlterError {}
312
313impl fmt::Display for AlterError {
314 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
315 write!(f, "{} cannot be altered in the requested way", self.id)
316 }
317}
318
319impl From<AlterError> for StorageError {
320 fn from(error: AlterError) -> Self {
321 Self::InvalidAlter(error)
322 }
323}
324
325impl From<DataflowError> for StorageError {
326 fn from(error: DataflowError) -> Self {
327 Self::DataflowError(error)
328 }
329}
330
331impl From<CollectionMissing> for StorageError {
332 fn from(err: CollectionMissing) -> Self {
333 Self::IdentifierMissing(err.0)
334 }
335}
336
/// A [`TxnsCodec`] implementation that encodes [`TxnsEntry`] values as
/// [`Row`]-based [`SourceData`].
#[derive(Debug)]
pub struct TxnsCodecRow;
339
340impl TxnsCodecRow {
341 pub fn desc() -> RelationDesc {
342 RelationDesc::builder()
343 .with_column("shard_id", SqlScalarType::String.nullable(false))
344 .with_column("ts", SqlScalarType::UInt64.nullable(false))
345 .with_column("batch", SqlScalarType::Bytes.nullable(true))
346 .finish()
347 }
348}
349
impl TxnsCodec for TxnsCodecRow {
    // Entries are keyed by the full row; the value carries no data.
    type Key = SourceData;
    type Val = ();

    // The key schema is the three-column relation from `TxnsCodecRow::desc`;
    // the unit value uses `UnitSchema`.
    fn schemas() -> (
        <Self::Key as mz_persist_types::Codec>::Schema,
        <Self::Val as mz_persist_types::Codec>::Schema,
    ) {
        (Self::desc(), UnitSchema)
    }

    // Encodes a `TxnsEntry` as a (shard_id, ts, batch) row:
    // - `Register` entries store a NULL `batch` column;
    // - `Append` entries store the batch bytes.
    // The timestamp is stored as the u64 decoded from its little-endian
    // byte representation; `decode` below reverses this exactly.
    fn encode(e: TxnsEntry) -> (Self::Key, Self::Val) {
        let row = match &e {
            TxnsEntry::Register(data_id, ts) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::Null,
            ]),
            TxnsEntry::Append(data_id, ts, batch) => Row::pack([
                Datum::from(data_id.to_string().as_str()),
                Datum::from(u64::from_le_bytes(*ts)),
                Datum::from(batch.as_slice()),
            ]),
        };
        (SourceData(Ok(row)), ())
    }

    // Inverse of `encode`. Panics (via `expect`/`assert_none!`) on any row
    // that is not a valid three-datum encoding, since entries should only
    // ever have been written by `encode`. A NULL `batch` column
    // distinguishes `Register` from `Append`.
    fn decode(row: SourceData, _: ()) -> TxnsEntry {
        let mut datums = row.0.as_ref().expect("valid entry").iter();
        let data_id = datums.next().expect("valid entry").unwrap_str();
        let data_id = data_id.parse::<ShardId>().expect("valid entry");
        let ts = datums.next().expect("valid entry");
        let ts = u64::to_le_bytes(ts.unwrap_uint64());
        let batch = datums.next().expect("valid entry");
        assert_none!(datums.next());
        if batch.is_null() {
            TxnsEntry::Register(data_id, ts)
        } else {
            TxnsEntry::Append(data_id, ts, batch.unwrap_bytes().to_vec())
        }
    }

    // Uses part stats on the `shard_id` column to decide whether a part may
    // contain entries for `data_id`: fetch only if the id's string form lies
    // within the column's [lower, upper] bounds. Returns `None` (leaving the
    // decision to the caller) when the expected stats are absent or, after
    // logging an error, when they have an unexpected type.
    fn should_fetch_part(data_id: &ShardId, stats: &PartStats) -> Option<bool> {
        // `SourceData` keys are stats'd under an "ok" struct column.
        let stats = stats
            .key
            .col("ok")?
            .try_as_optional_struct()
            .map_err(|err| error!("unexpected stats type for col 'ok': {}", err))
            .ok()?;
        let stats = stats
            .some
            .col("shard_id")?
            .try_as_string()
            .map_err(|err| error!("unexpected stats type for col 'shard_id': {}", err))
            .ok()?;
        let data_id_str = data_id.to_string();
        Some(stats.lower <= data_id_str && stats.upper >= data_id_str)
    }
}
408}