1use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use mz_catalog::SYSTEM_CONN_ID;
18use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint};
19use mz_catalog::config::BuiltinItemMigrationConfig;
20use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
21use mz_catalog::durable::{
22 DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
23};
24use mz_catalog::memory::error::{Error, ErrorKind};
25use mz_catalog::memory::objects::CatalogItem;
26use mz_ore::collections::CollectionExt;
27use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
28use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
29use mz_persist_client::critical::SinceHandle;
30use mz_persist_client::read::ReadHandle;
31use mz_persist_client::write::WriteHandle;
32use mz_persist_client::{Diagnostics, PersistClient};
33use mz_persist_types::ShardId;
34use mz_persist_types::codec_impls::ShardIdSchema;
35use mz_repr::{CatalogItemId, GlobalId, Timestamp};
36use mz_sql::catalog::CatalogItem as _;
37use mz_storage_client::controller::StorageTxn;
38use mz_storage_types::StorageDiff;
39use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
40use tracing::{debug, error};
41
42use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
43use crate::catalog::state::LocalExpressionCache;
44use crate::catalog::{BuiltinTableUpdate, CatalogState};
45
/// The outcome of migrating builtin items during catalog open.
pub(crate) struct BuiltinItemMigrationResult {
    /// Builtin table updates produced by applying the migration's catalog
    /// updates to the in-memory state.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// The catalog item IDs of builtin storage collections that were migrated
    /// to new storage shards.
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// A deferred cleanup step; it only performs work when the catalog was not
    /// opened in read-only mode (it removes old entries from the migration shard).
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}
55
56pub(crate) async fn migrate_builtin_items(
58 state: &mut CatalogState,
59 txn: &mut Transaction<'_>,
60 local_expr_cache: &mut LocalExpressionCache,
61 migrated_builtins: Vec<CatalogItemId>,
62 BuiltinItemMigrationConfig {
63 persist_client,
64 read_only,
65 }: BuiltinItemMigrationConfig,
66) -> Result<BuiltinItemMigrationResult, Error> {
67 migrate_builtin_items_0dt(
68 state,
69 txn,
70 local_expr_cache,
71 persist_client,
72 migrated_builtins,
73 read_only,
74 )
75 .await
76}
77
/// Migrate builtin storage collections by assigning them new storage shards,
/// coordinating across versions/processes via a dedicated persist "migration
/// shard" keyed by `(global_id, build_version)`.
///
/// As implemented below, the steps are:
///   1. Record the new fingerprints for all migrated builtins in `txn`.
///   2. Open the migration shard (initializing it on first use).
///   3. Snapshot the shard's current `(TableKey -> ShardId)` mappings.
///   4. Retract stale entries and mint fresh shards for migrated collections.
///   5. Write the new mappings to the shard and swap the storage collection
///      metadata in `txn`, retiring old shards as unfinalized.
///   6. Return a cleanup future that retracts finalized entries (writable mode
///      only).
async fn migrate_builtin_items_0dt(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    persist_client: PersistClient,
    migrated_builtins: Vec<CatalogItemId>,
    read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
    assert_eq!(
        read_only,
        txn.is_savepoint(),
        "txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
    );

    let build_version = state.config.build_info.semver_version();

    // Step 1: update the system object mappings so each migrated builtin's
    // durable record carries its new fingerprint.
    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
        .map(|builtin| {
            let id = state.resolve_builtin_object(builtin);
            let fingerprint = builtin.fingerprint();
            (id, fingerprint)
        })
        .collect();
    let mut migrated_system_object_mappings = BTreeMap::new();
    for item_id in &migrated_builtins {
        let fingerprint = id_fingerprint_map
            .get(item_id)
            .expect("missing fingerprint");
        let entry = state.get_entry(item_id);
        let schema_name = state
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
            )
            .name
            .schema
            .as_str();
        // `into_element` panics unless there is exactly one global ID;
        // presumably migrated builtins are single-collection items — the
        // Table branch below asserts the same invariant explicitly.
        let global_id = state.get_entry(item_id).global_ids().into_element();

        migrated_system_object_mappings.insert(
            *item_id,
            SystemObjectMapping {
                description: SystemObjectDescription {
                    schema_name: schema_name.to_string(),
                    object_type: entry.item_type(),
                    object_name: entry.name().item.clone(),
                },
                unique_identifier: SystemObjectUniqueIdentifier {
                    catalog_id: *item_id,
                    global_id,
                    fingerprint: fingerprint.clone(),
                },
            },
        );
    }
    txn.update_system_object_mappings(migrated_system_object_mappings)?;

    // Step 2: open handles on the migration shard.
    let organization_id = state.config.environment_id.organization_id();
    let shard_id = txn
        .get_builtin_migration_shard()
        .expect("builtin migration shard should exist for opened catalogs");
    let diagnostics = Diagnostics {
        shard_name: "builtin_migration".to_string(),
        handle_purpose: format!(
            "builtin table migration shard for org {organization_id:?} version {build_version:?}"
        ),
    };
    let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
        persist_client
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                diagnostics.clone(),
            )
            .await
            .expect("invalid usage");
    let (mut write_handle, mut read_handle): (
        WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
        ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    ) = persist_client
        .open(
            shard_id,
            Arc::new(TableKeySchema),
            Arc::new(ShardIdSchema),
            diagnostics,
            USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid usage");
    // Initialize the shard by advancing its upper past the minimum timestamp.
    // If another process already did this, the compare-and-append fails
    // benignly and we just log it.
    const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
    let res = write_handle
        .compare_and_append(
            EMPTY_UPDATES,
            Antichain::from_elem(Timestamp::minimum()),
            Antichain::from_elem(Timestamp::minimum().step_forward()),
        )
        .await
        .expect("invalid usage");
    if let Err(e) = res {
        debug!("migration shard already initialized: {e:?}");
    }

    // Collect the raw system global IDs of migrated items that are backed by
    // storage collections; non-storage items (views, indexes, etc.) are
    // filtered out.
    let migrated_storage_collections: BTreeSet<_> = migrated_builtins
        .into_iter()
        .filter_map(|item_id| {
            let gid = match state.get_entry(&item_id).item() {
                CatalogItem::Table(table) => {
                    let mut ids: Vec<_> = table.global_ids().collect();
                    assert_eq!(ids.len(), 1, "{ids:?}");
                    ids.pop().expect("checked length")
                }
                CatalogItem::Source(source) => source.global_id(),
                CatalogItem::MaterializedView(mv) => mv.global_id(),
                CatalogItem::ContinualTask(ct) => ct.global_id(),
                CatalogItem::Log(_)
                | CatalogItem::Sink(_)
                | CatalogItem::View(_)
                | CatalogItem::Index(_)
                | CatalogItem::Type(_)
                | CatalogItem::Func(_)
                | CatalogItem::Secret(_)
                | CatalogItem::Connection(_) => return None,
            };
            let GlobalId::System(raw_gid) = gid else {
                unreachable!(
                    "builtin objects must have system ID, found: {item_id:?} with {gid:?}"
                );
            };
            Some(raw_gid)
        })
        .collect();

    // Step 3: snapshot the shard contents at `upper - 1` (the most recent
    // readable time). An upper of 0 means the initialization CaA above did not
    // take effect, which is unexpected.
    let upper = fetch_upper(&mut write_handle).await;
    let as_of = upper.checked_sub(1).ok_or_else(|| {
        Error::new(ErrorKind::Internal(format!(
            "builtin migration failed, unexpected upper: {upper:?}"
        )))
    })?;
    let since = read_handle.since();
    assert!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    let as_of = Antichain::from_elem(as_of);
    let snapshot = read_handle
        .snapshot_and_fetch(as_of)
        .await
        .expect("we have advanced the as_of by the since");
    soft_assert_or_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
    );
    let mut global_id_shards: BTreeMap<_, _> = snapshot
        .into_iter()
        .map(|((key, value), _ts, _diff)| {
            let table_key = key.expect("persist decoding error");
            let shard_id = value.expect("persist decoding error");
            (table_key, shard_id)
        })
        .collect();

    // Step 4a: prune entries that no longer apply. Entries written by a newer
    // build mean we've been fenced out; entries for non-migrated collections
    // or from older builds are either retracted (if they are the live shard in
    // the storage metadata) or queued for finalization.
    let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
    let storage_collection_metadata = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        txn.get_collection_metadata()
    };
    for (table_key, shard_id) in global_id_shards.clone() {
        if table_key.build_version > build_version {
            halt!(
                "saw build version {}, which is greater than current build version {}",
                table_key.build_version,
                build_version
            );
        }

        if !migrated_storage_collections.contains(&table_key.global_id)
            || table_key.build_version < build_version
        {
            global_id_shards.remove(&table_key);
            if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
                == Some(&shard_id)
            {
                // The shard is still the active one for this collection;
                // retract its migration-shard entry at `upper`.
                migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
            } else {
                // The shard was replaced; remember it so it can be finalized
                // and its entry retracted by the cleanup action.
                migration_shards_to_finalize.insert((table_key, shard_id));
            }
        }
    }

    // Step 4b: allocate a fresh shard for every migrated collection that does
    // not already have an entry for the current build version.
    let mut global_id_shards: BTreeMap<_, _> = global_id_shards
        .into_iter()
        .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
        .collect();
    for global_id in migrated_storage_collections {
        if !global_id_shards.contains_key(&global_id) {
            let shard_id = ShardId::new();
            global_id_shards.insert(global_id, shard_id);
            let table_key = TableKey {
                global_id,
                build_version: build_version.clone(),
            };
            migrated_shard_updates.push(((table_key, shard_id), upper, 1));
        }
    }

    // Step 5a: durably record the retractions/additions in the migration
    // shard before touching the catalog's storage metadata.
    let upper = if !migrated_shard_updates.is_empty() {
        write_to_migration_shard(
            migrated_shard_updates,
            upper,
            &mut write_handle,
            &mut since_handle,
        )
        .await?
    } else {
        upper
    };

    // Step 5b: swap the storage collection metadata in the catalog txn: point
    // each migrated collection at its new shard and retire the old shards as
    // unfinalized so they can be cleaned up later.
    let migrated_storage_collections_0dt = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        let storage_collection_metadata = txn.get_collection_metadata();
        // Only touch collections whose recorded shard actually differs.
        let global_id_shards: BTreeMap<_, _> = global_id_shards
            .into_iter()
            .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
            .filter(|(global_id, shard_id)| {
                storage_collection_metadata.get(global_id) != Some(shard_id)
            })
            .collect();
        let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
        let mut old_shard_ids: BTreeSet<_> = txn
            .delete_collection_metadata(global_ids.clone())
            .into_iter()
            .map(|(_, shard_id)| shard_id)
            .collect();
        old_shard_ids.extend(
            migration_shards_to_finalize
                .iter()
                .map(|(_, shard_id)| shard_id),
        );
        txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
            Error::new(ErrorKind::Internal(format!(
                "builtin migration failed: {e}"
            )))
        })?;
        txn.insert_collection_metadata(global_id_shards)
            .map_err(|e| {
                Error::new(ErrorKind::Internal(format!(
                    "builtin migration failed: {e}"
                )))
            })?;
        global_ids
    };

    // Translate the migrated global IDs back into catalog item IDs for the
    // result.
    let migrated_storage_collections_0dt = migrated_storage_collections_0dt
        .into_iter()
        .map(|gid| state.get_entry_by_global_id(&gid).id())
        .collect();

    let updates = txn.get_and_commit_op_updates();
    let builtin_table_updates = state
        .apply_updates_for_bootstrap(updates, local_expr_cache)
        .await;

    // Step 6: the cleanup action retracts the entries queued for finalization.
    // It is a no-op in read-only mode, and failures are logged rather than
    // surfaced since the migration itself has already succeeded.
    let cleanup_action = async move {
        if !read_only {
            let updates: Vec<_> = migration_shards_to_finalize
                .into_iter()
                .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
                .collect();
            if !updates.is_empty() {
                let res =
                    write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
                        .await;
                if let Err(e) = res {
                    error!("Unable to remove old entries from migration shard: {e:?}");
                }
            }
        }
    }
    .boxed();

    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        migrated_storage_collections_0dt,
        cleanup_action,
    })
}
440
441async fn fetch_upper(
442 write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
443) -> Timestamp {
444 write_handle
445 .fetch_recent_upper()
446 .await
447 .as_option()
448 .cloned()
449 .expect("we use a totally ordered time and never finalize the shard")
450}
451
/// Append `updates` to the migration shard at time `upper`, then downgrade the
/// shard's critical since to trail the new upper by one.
///
/// Returns the new upper on success. Returns a fence error if another process
/// concurrently advanced the upper (i.e. we lost the compare-and-append race).
async fn write_to_migration_shard(
    updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>,
    upper: Timestamp,
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
) -> Result<Timestamp, Error> {
    let next_upper = upper.step_forward();
    // Keep the since one tick behind the upper so a snapshot at `upper - 1`
    // stays readable.
    let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
    let next_upper_antichain = Antichain::from_elem(next_upper);

    if let Err(err) = write_handle
        .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
        .await
        .expect("invalid usage")
    {
        // Upper mismatch: someone else wrote first. Surface as a fence error.
        return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
            FenceError::migration(err),
        ))));
    }

    // Best-effort since downgrade; the write above already succeeded, so
    // failures here are logged (soft-asserted) rather than returned.
    let opaque = *since_handle.opaque();
    let downgrade = since_handle
        .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
        .await;
    match downgrade {
        // `None`: the handle declined to downgrade at this time.
        None => {}
        Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
        Some(Ok(updated)) => soft_assert_or_log!(
            updated == downgrade_to,
            "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
        ),
    }

    Ok(next_upper)
}
492
493mod persist_schema {
494 use std::num::ParseIntError;
495
496 use arrow::array::{StringArray, StringBuilder};
497 use bytes::{BufMut, Bytes};
498 use mz_persist_types::Codec;
499 use mz_persist_types::codec_impls::{
500 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
501 };
502 use mz_persist_types::columnar::Schema;
503 use mz_persist_types::stats::NoneStats;
504
505 #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
506 pub(super) struct TableKey {
507 pub(super) global_id: u64,
508 pub(super) build_version: semver::Version,
509 }
510
511 impl std::fmt::Display for TableKey {
512 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
513 write!(f, "{}-{}", self.global_id, self.build_version)
514 }
515 }
516
517 impl std::str::FromStr for TableKey {
518 type Err = String;
519
520 fn from_str(s: &str) -> Result<Self, Self::Err> {
521 let parts: Vec<_> = s.splitn(2, '-').collect();
522 let &[global_id, build_version] = parts.as_slice() else {
523 return Err(format!("invalid TableKey '{s}'"));
524 };
525 let global_id = global_id
526 .parse()
527 .map_err(|e: ParseIntError| e.to_string())?;
528 let build_version = build_version
529 .parse()
530 .map_err(|e: semver::Error| e.to_string())?;
531 Ok(TableKey {
532 global_id,
533 build_version,
534 })
535 }
536 }
537
538 impl From<TableKey> for String {
539 fn from(table_key: TableKey) -> Self {
540 table_key.to_string()
541 }
542 }
543
544 impl TryFrom<String> for TableKey {
545 type Error = String;
546
547 fn try_from(s: String) -> Result<Self, Self::Error> {
548 s.parse()
549 }
550 }
551
552 impl Default for TableKey {
553 fn default() -> Self {
554 Self {
555 global_id: Default::default(),
556 build_version: semver::Version::new(0, 0, 0),
557 }
558 }
559 }
560
561 impl Codec for TableKey {
562 type Storage = ();
563 type Schema = TableKeySchema;
564 fn codec_name() -> String {
565 "TableKey".into()
566 }
567 fn encode<B: BufMut>(&self, buf: &mut B) {
568 buf.put(self.to_string().as_bytes())
569 }
570 fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
571 let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
572 table_key.parse()
573 }
574 fn encode_schema(_schema: &Self::Schema) -> Bytes {
575 Bytes::new()
576 }
577 fn decode_schema(buf: &Bytes) -> Self::Schema {
578 assert_eq!(*buf, Bytes::new());
579 TableKeySchema
580 }
581 }
582
583 impl SimpleColumnarData for TableKey {
584 type ArrowBuilder = StringBuilder;
585 type ArrowColumn = StringArray;
586
587 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
588 builder.values_slice().len()
589 }
590
591 fn push(&self, builder: &mut Self::ArrowBuilder) {
592 builder.append_value(&self.to_string());
593 }
594 fn push_null(builder: &mut Self::ArrowBuilder) {
595 builder.append_null();
596 }
597 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
598 *self = column.value(idx).parse().expect("should be valid TableKey");
599 }
600 }
601
602 #[derive(Debug, PartialEq)]
604 pub(super) struct TableKeySchema;
605
606 impl Schema<TableKey> for TableKeySchema {
607 type ArrowColumn = StringArray;
608 type Statistics = NoneStats;
609
610 type Decoder = SimpleColumnarDecoder<TableKey>;
611 type Encoder = SimpleColumnarEncoder<TableKey>;
612
613 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
614 Ok(SimpleColumnarEncoder::default())
615 }
616
617 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
618 Ok(SimpleColumnarDecoder::new(col))
619 }
620 }
621}