1use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use mz_adapter_types::dyncfgs::ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION;
18use mz_catalog::SYSTEM_CONN_ID;
19use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint};
20use mz_catalog::config::BuiltinItemMigrationConfig;
21use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
22use mz_catalog::durable::{
23 DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
24};
25use mz_catalog::memory::error::{Error, ErrorKind};
26use mz_catalog::memory::objects::CatalogItem;
27use mz_ore::collections::CollectionExt;
28use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
29use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
30use mz_persist_client::critical::SinceHandle;
31use mz_persist_client::read::ReadHandle;
32use mz_persist_client::schema::CaESchema;
33use mz_persist_client::write::WriteHandle;
34use mz_persist_client::{Diagnostics, PersistClient};
35use mz_persist_types::ShardId;
36use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
37use mz_repr::{CatalogItemId, GlobalId, Timestamp};
38use mz_sql::catalog::CatalogItem as _;
39use mz_storage_client::controller::StorageTxn;
40use mz_storage_types::StorageDiff;
41use mz_storage_types::sources::SourceData;
42use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
43use tracing::{debug, error, info, warn};
44
45use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
46use crate::catalog::state::LocalExpressionCache;
47use crate::catalog::{BuiltinTableUpdate, CatalogState};
48
/// The outcome of running the builtin item migration.
pub(crate) struct BuiltinItemMigrationResult {
    /// Builtin table updates produced by applying the migration's catalog updates to the
    /// in-memory catalog state.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// Catalog item IDs of the storage collections whose shard mapping in the catalog was
    /// replaced by the migration.
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Future that, when not in read-only mode, retracts the entries of no-longer-needed shards
    /// from the migration shard. Failures inside the future are logged, not returned.
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}
58
59pub(crate) async fn migrate_builtin_items(
70 state: &mut CatalogState,
71 txn: &mut Transaction<'_>,
72 local_expr_cache: &mut LocalExpressionCache,
73 migrated_builtins: Vec<CatalogItemId>,
74 BuiltinItemMigrationConfig {
75 persist_client,
76 read_only,
77 }: BuiltinItemMigrationConfig,
78) -> Result<BuiltinItemMigrationResult, Error> {
79 assert_eq!(
80 read_only,
81 txn.is_savepoint(),
82 "txn must be in savepoint mode when read_only is true, and in writable mode otherwise",
83 );
84
85 update_catalog_fingerprints(state, txn, &migrated_builtins)?;
86
87 let mut collections_to_migrate: Vec<_> = migrated_builtins
89 .into_iter()
90 .filter_map(|id| {
91 use CatalogItem::*;
92 match &state.get_entry(&id).item() {
93 Table(table) => Some(table.global_ids().into_element()),
94 Source(source) => Some(source.global_id()),
95 MaterializedView(mv) => Some(mv.global_id()),
96 ContinualTask(ct) => Some(ct.global_id()),
97 Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
98 | Connection(_) => None,
99 }
100 })
101 .collect();
102
103 if ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION.get(state.system_config().dyncfgs()) {
110 collections_to_migrate =
111 try_evolve_persist_schemas(state, txn, collections_to_migrate, &persist_client).await?;
112 } else {
113 info!("skipping builtin migration by schema evolution");
114 }
115
116 migrate_builtin_collections_incompatible(
120 state,
121 txn,
122 local_expr_cache,
123 persist_client,
124 collections_to_migrate,
125 read_only,
126 )
127 .await
128}
129
130fn update_catalog_fingerprints(
132 state: &CatalogState,
133 txn: &mut Transaction<'_>,
134 migrated_builtins: &[CatalogItemId],
135) -> Result<(), Error> {
136 let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
137 .map(|builtin| {
138 let id = state.resolve_builtin_object(builtin);
139 let fingerprint = builtin.fingerprint();
140 (id, fingerprint)
141 })
142 .collect();
143 let mut migrated_system_object_mappings = BTreeMap::new();
144 for item_id in migrated_builtins {
145 let fingerprint = id_fingerprint_map
146 .get(item_id)
147 .expect("missing fingerprint");
148 let entry = state.get_entry(item_id);
149 let schema_name = state
150 .get_schema(
151 &entry.name().qualifiers.database_spec,
152 &entry.name().qualifiers.schema_spec,
153 entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
154 )
155 .name
156 .schema
157 .as_str();
158 let global_id = state.get_entry(item_id).global_ids().into_element();
160
161 migrated_system_object_mappings.insert(
162 *item_id,
163 SystemObjectMapping {
164 description: SystemObjectDescription {
165 schema_name: schema_name.to_string(),
166 object_type: entry.item_type(),
167 object_name: entry.name().item.clone(),
168 },
169 unique_identifier: SystemObjectUniqueIdentifier {
170 catalog_id: *item_id,
171 global_id,
172 fingerprint: fingerprint.clone(),
173 },
174 },
175 );
176 }
177 txn.update_system_object_mappings(migrated_system_object_mappings)?;
178
179 Ok(())
180}
181
182async fn try_evolve_persist_schemas(
192 state: &CatalogState,
193 txn: &Transaction<'_>,
194 migrated_storage_collections: Vec<GlobalId>,
195 persist_client: &PersistClient,
196) -> Result<Vec<GlobalId>, Error> {
197 let collection_metadata = txn.get_collection_metadata();
198
199 let mut failed = Vec::new();
200 for id in migrated_storage_collections {
201 let Some(&shard_id) = collection_metadata.get(&id) else {
202 return Err(Error::new(ErrorKind::Internal(format!(
203 "builtin migration: missing metadata for builtin collection {id}"
204 ))));
205 };
206
207 let diagnostics = Diagnostics {
208 shard_name: id.to_string(),
209 handle_purpose: "migrate builtin schema".to_string(),
210 };
211 let Some((old_schema_id, old_schema, _)) = persist_client
212 .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
213 .await
214 .expect("invalid usage")
215 else {
216 info!(%id, "builtin schema evolution failed: missing latest schema");
220 failed.push(id);
221 continue;
222 };
223
224 let entry = state.get_entry_by_global_id(&id);
225 let Some(new_schema) = entry.desc_opt_latest() else {
226 return Err(Error::new(ErrorKind::Internal(format!(
227 "builtin migration: missing new schema for builtin collection {id}"
228 ))));
229 };
230
231 info!(%id, ?old_schema, ?new_schema, "attempting builtin schema evolution");
232
233 let result = persist_client
234 .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
235 shard_id,
236 old_schema_id,
237 &new_schema,
238 &UnitSchema,
239 diagnostics,
240 )
241 .await
242 .expect("invalid usage");
243
244 match result {
245 CaESchema::Ok(_) => {
246 info!("builtin schema evolution succeeded");
247 }
248 CaESchema::Incompatible => {
249 info!("builtin schema evolution failed: incompatible");
250 failed.push(id);
251 }
252 CaESchema::ExpectedMismatch { schema_id, .. } => {
253 return Err(Error::new(ErrorKind::Internal(format!(
254 "builtin migration: unexpected schema mismatch ({} != {})",
255 schema_id, old_schema_id,
256 ))));
257 }
258 }
259 }
260
261 Ok(failed)
262}
263
264async fn migrate_builtin_collections_incompatible(
311 state: &mut CatalogState,
312 txn: &mut Transaction<'_>,
313 local_expr_cache: &mut LocalExpressionCache,
314 persist_client: PersistClient,
315 migrated_storage_collections: Vec<GlobalId>,
316 read_only: bool,
317) -> Result<BuiltinItemMigrationResult, Error> {
318 let build_version = state.config.build_info.semver_version();
319
320 let migrated_storage_collections: Vec<_> = migrated_storage_collections
323 .into_iter()
324 .map(|gid| match gid {
325 GlobalId::System(raw) => raw,
326 _ => panic!("builtins must have system IDs"),
327 })
328 .collect();
329
330 let organization_id = state.config.environment_id.organization_id();
332 let shard_id = txn
333 .get_builtin_migration_shard()
334 .expect("builtin migration shard should exist for opened catalogs");
335 let diagnostics = Diagnostics {
336 shard_name: "builtin_migration".to_string(),
337 handle_purpose: format!(
338 "builtin table migration shard for org {organization_id:?} version {build_version:?}"
339 ),
340 };
341 let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
342 persist_client
343 .open_critical_since(
344 shard_id,
345 PersistClient::CONTROLLER_CRITICAL_SINCE,
348 diagnostics.clone(),
349 )
350 .await
351 .expect("invalid usage");
352 let (mut write_handle, mut read_handle): (
353 WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
354 ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
355 ) = persist_client
356 .open(
357 shard_id,
358 Arc::new(TableKeySchema),
359 Arc::new(ShardIdSchema),
360 diagnostics,
361 USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
362 )
363 .await
364 .expect("invalid usage");
365 const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
367 let res = write_handle
368 .compare_and_append(
369 EMPTY_UPDATES,
370 Antichain::from_elem(Timestamp::minimum()),
371 Antichain::from_elem(Timestamp::minimum().step_forward()),
372 )
373 .await
374 .expect("invalid usage");
375 if let Err(e) = res {
376 debug!("migration shard already initialized: {e:?}");
377 }
378
379 let upper = fetch_upper(&mut write_handle).await;
383 let as_of = upper.checked_sub(1).ok_or_else(|| {
385 Error::new(ErrorKind::Internal(format!(
386 "builtin migration failed, unexpected upper: {upper:?}"
387 )))
388 })?;
389 let since = read_handle.since();
390 assert!(
391 since.less_equal(&as_of),
392 "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
393 );
394 let as_of = Antichain::from_elem(as_of);
395 let snapshot = read_handle
396 .snapshot_and_fetch(as_of)
397 .await
398 .expect("we have advanced the as_of by the since");
399 soft_assert_or_log!(
400 snapshot.iter().all(|(_, _, diff)| *diff == 1),
401 "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
402 );
403 let mut global_id_shards: BTreeMap<_, _> = snapshot
404 .into_iter()
405 .filter_map(|(data, _ts, _diff)| {
406 if let (Ok(table_key), Ok(shard_id)) = data {
407 Some((table_key, shard_id))
408 } else {
409 warn!("skipping unreadable migration shard entry: {data:?}");
412 None
413 }
414 })
415 .collect();
416
417 let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
419 let mut migration_shards_to_finalize = BTreeSet::new();
420 let storage_collection_metadata = txn.get_collection_metadata();
421 for (table_key, shard_id) in global_id_shards.clone() {
422 if table_key.build_version > build_version {
423 if read_only {
424 halt!(
425 "saw build version {}, which is greater than current build version {}",
426 table_key.build_version,
427 build_version
428 );
429 } else {
430 warn!(
436 %table_key.build_version, %build_version,
437 "saw build version which is greater than current build version",
438 );
439 global_id_shards.remove(&table_key);
440 continue;
441 }
442 }
443
444 if !migrated_storage_collections.contains(&table_key.global_id)
445 || table_key.build_version < build_version
446 {
447 global_id_shards.remove(&table_key);
448 if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
449 == Some(&shard_id)
450 {
451 migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
452 } else {
453 migration_shards_to_finalize.insert((table_key, shard_id));
454 }
455 }
456 }
457
458 let mut global_id_shards: BTreeMap<_, _> = global_id_shards
460 .into_iter()
461 .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
462 .collect();
463 for global_id in migrated_storage_collections {
464 if !global_id_shards.contains_key(&global_id) {
465 let shard_id = ShardId::new();
466 global_id_shards.insert(global_id, shard_id);
467 let table_key = TableKey {
468 global_id,
469 build_version: build_version.clone(),
470 };
471 migrated_shard_updates.push(((table_key, shard_id), upper, 1));
472 }
473 }
474
475 let upper = if !migrated_shard_updates.is_empty() {
478 write_to_migration_shard(
479 migrated_shard_updates,
480 upper,
481 &mut write_handle,
482 &mut since_handle,
483 )
484 .await?
485 } else {
486 upper
487 };
488
489 let migrated_storage_collections_0dt = {
491 let txn: &mut dyn StorageTxn<Timestamp> = txn;
492 let storage_collection_metadata = txn.get_collection_metadata();
493 let global_id_shards: BTreeMap<_, _> = global_id_shards
494 .into_iter()
495 .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
496 .filter(|(global_id, shard_id)| {
497 storage_collection_metadata.get(global_id) != Some(shard_id)
498 })
499 .collect();
500 let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
501 let mut old_shard_ids: BTreeSet<_> = txn
502 .delete_collection_metadata(global_ids.clone())
503 .into_iter()
504 .map(|(_, shard_id)| shard_id)
505 .collect();
506 old_shard_ids.extend(
507 migration_shards_to_finalize
508 .iter()
509 .map(|(_, shard_id)| shard_id),
510 );
511 txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
512 Error::new(ErrorKind::Internal(format!(
513 "builtin migration failed: {e}"
514 )))
515 })?;
516 txn.insert_collection_metadata(global_id_shards)
517 .map_err(|e| {
518 Error::new(ErrorKind::Internal(format!(
519 "builtin migration failed: {e}"
520 )))
521 })?;
522 global_ids
523 };
524
525 let migrated_storage_collections_0dt = migrated_storage_collections_0dt
527 .into_iter()
528 .map(|gid| state.get_entry_by_global_id(&gid).id())
529 .collect();
530
531 let updates = txn.get_and_commit_op_updates();
532 let builtin_table_updates = state
533 .apply_updates_for_bootstrap(updates, local_expr_cache)
534 .await;
535
536 let cleanup_action = async move {
537 if !read_only {
538 let updates: Vec<_> = migration_shards_to_finalize
539 .into_iter()
540 .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
541 .collect();
542 if !updates.is_empty() {
543 let res =
547 write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
548 .await;
549 if let Err(e) = res {
550 error!("Unable to remove old entries from migration shard: {e:?}");
551 }
552 }
553 }
554 }
555 .boxed();
556
557 Ok(BuiltinItemMigrationResult {
558 builtin_table_updates,
559 migrated_storage_collections_0dt,
560 cleanup_action,
561 })
562}
563
564async fn fetch_upper(
565 write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
566) -> Timestamp {
567 write_handle
568 .fetch_recent_upper()
569 .await
570 .as_option()
571 .cloned()
572 .expect("we use a totally ordered time and never finalize the shard")
573}
574
575async fn write_to_migration_shard(
576 updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>,
577 upper: Timestamp,
578 write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
579 since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
580) -> Result<Timestamp, Error> {
581 let next_upper = upper.step_forward();
582 let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
584 let next_upper_antichain = Antichain::from_elem(next_upper);
585
586 if let Err(err) = write_handle
587 .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
588 .await
589 .expect("invalid usage")
590 {
591 return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
592 FenceError::migration(err),
593 ))));
594 }
595
596 let opaque = *since_handle.opaque();
601 let downgrade = since_handle
602 .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
603 .await;
604 match downgrade {
605 None => {}
606 Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
607 Some(Ok(updated)) => soft_assert_or_log!(
608 updated == downgrade_to,
609 "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
610 ),
611 }
612
613 Ok(next_upper)
614}
615
616mod persist_schema {
617 use std::num::ParseIntError;
618
619 use arrow::array::{StringArray, StringBuilder};
620 use bytes::{BufMut, Bytes};
621 use mz_persist_types::Codec;
622 use mz_persist_types::codec_impls::{
623 SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
624 };
625 use mz_persist_types::columnar::Schema;
626 use mz_persist_types::stats::NoneStats;
627
628 #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
629 pub(super) struct TableKey {
630 pub(super) global_id: u64,
631 pub(super) build_version: semver::Version,
632 }
633
634 impl std::fmt::Display for TableKey {
635 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
636 write!(f, "{}-{}", self.global_id, self.build_version)
637 }
638 }
639
640 impl std::str::FromStr for TableKey {
641 type Err = String;
642
643 fn from_str(s: &str) -> Result<Self, Self::Err> {
644 let parts: Vec<_> = s.splitn(2, '-').collect();
645 let &[global_id, build_version] = parts.as_slice() else {
646 return Err(format!("invalid TableKey '{s}'"));
647 };
648 let global_id = global_id
649 .parse()
650 .map_err(|e: ParseIntError| e.to_string())?;
651 let build_version = build_version
652 .parse()
653 .map_err(|e: semver::Error| e.to_string())?;
654 Ok(TableKey {
655 global_id,
656 build_version,
657 })
658 }
659 }
660
661 impl From<TableKey> for String {
662 fn from(table_key: TableKey) -> Self {
663 table_key.to_string()
664 }
665 }
666
667 impl TryFrom<String> for TableKey {
668 type Error = String;
669
670 fn try_from(s: String) -> Result<Self, Self::Error> {
671 s.parse()
672 }
673 }
674
675 impl Default for TableKey {
676 fn default() -> Self {
677 Self {
678 global_id: Default::default(),
679 build_version: semver::Version::new(0, 0, 0),
680 }
681 }
682 }
683
684 impl Codec for TableKey {
685 type Storage = ();
686 type Schema = TableKeySchema;
687 fn codec_name() -> String {
688 "TableKey".into()
689 }
690 fn encode<B: BufMut>(&self, buf: &mut B) {
691 buf.put(self.to_string().as_bytes())
692 }
693 fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
694 let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
695 table_key.parse()
696 }
697 fn encode_schema(_schema: &Self::Schema) -> Bytes {
698 Bytes::new()
699 }
700 fn decode_schema(buf: &Bytes) -> Self::Schema {
701 assert_eq!(*buf, Bytes::new());
702 TableKeySchema
703 }
704 }
705
706 impl SimpleColumnarData for TableKey {
707 type ArrowBuilder = StringBuilder;
708 type ArrowColumn = StringArray;
709
710 fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
711 builder.values_slice().len()
712 }
713
714 fn push(&self, builder: &mut Self::ArrowBuilder) {
715 builder.append_value(&self.to_string());
716 }
717 fn push_null(builder: &mut Self::ArrowBuilder) {
718 builder.append_null();
719 }
720 fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
721 *self = column.value(idx).parse().expect("should be valid TableKey");
722 }
723 }
724
725 #[derive(Debug, PartialEq)]
727 pub(super) struct TableKeySchema;
728
729 impl Schema<TableKey> for TableKeySchema {
730 type ArrowColumn = StringArray;
731 type Statistics = NoneStats;
732
733 type Decoder = SimpleColumnarDecoder<TableKey>;
734 type Encoder = SimpleColumnarEncoder<TableKey>;
735
736 fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
737 Ok(SimpleColumnarEncoder::default())
738 }
739
740 fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
741 Ok(SimpleColumnarDecoder::new(col))
742 }
743 }
744}