use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use futures::FutureExt;
use futures::future::BoxFuture;
use mz_adapter_types::dyncfgs::ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION;
use mz_catalog::SYSTEM_CONN_ID;
use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint};
use mz_catalog::config::BuiltinItemMigrationConfig;
use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
use mz_catalog::durable::{
    DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
};
use mz_catalog::memory::error::{Error, ErrorKind};
use mz_catalog::memory::objects::CatalogItem;
use mz_ore::collections::CollectionExt;
use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
use mz_persist_client::critical::SinceHandle;
use mz_persist_client::read::ReadHandle;
use mz_persist_client::schema::CaESchema;
use mz_persist_client::write::WriteHandle;
use mz_persist_client::{Diagnostics, PersistClient};
use mz_persist_types::ShardId;
use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
use mz_repr::{CatalogItemId, GlobalId, Timestamp};
use mz_sql::catalog::CatalogItem as _;
use mz_storage_client::controller::StorageTxn;
use mz_storage_types::StorageDiff;
use mz_storage_types::sources::SourceData;
use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
use tracing::{debug, error, info, warn};

use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
use crate::catalog::state::LocalExpressionCache;
use crate::catalog::{BuiltinTableUpdate, CatalogState};

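/// The result of a builtin item migration.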
pub(crate) struct BuiltinItemMigrationResult {
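    /// Builtin table updates generated by the migration, to be applied by the caller.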
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
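    /// The catalog IDs of builtin storage collections that were migrated onto new persist
    /// shards.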
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
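    /// A deferred cleanup action to run once the migration has been made durable.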
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}

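/// Migrates the given builtin items.
///
/// Updates the durably stored fingerprints of all migrated builtins, then tries to migrate each
/// storage-backed builtin in place via persist schema evolution, falling back to a full
/// migration onto new shards for any collection whose schema change is not backwards
/// compatible.
///
/// `txn` must be in savepoint mode when `read_only` is true, and in writable mode otherwise.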
pub(crate) async fn migrate_builtin_items(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    migrated_builtins: Vec<CatalogItemId>,
    BuiltinItemMigrationConfig {
        persist_client,
        read_only,
    }: BuiltinItemMigrationConfig,
) -> Result<BuiltinItemMigrationResult, Error> {
    assert_eq!(
        read_only,
        txn.is_savepoint(),
        "txn must be in savepoint mode when read_only is true, and in writable mode otherwise",
    );

    update_catalog_fingerprints(state, txn, &migrated_builtins)?;

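    // Filter down to the builtins that are backed by a storage collection.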
    let mut collections_to_migrate: Vec<_> = migrated_builtins
        .into_iter()
        .filter_map(|id| {
            use CatalogItem::*;
            match &state.get_entry(&id).item() {
                Table(table) => Some(table.global_ids().into_element()),
                Source(source) => Some(source.global_id()),
                MaterializedView(mv) => Some(mv.global_id()),
                ContinualTask(ct) => Some(ct.global_id()),
                Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
                | Connection(_) => None,
            }
        })
        .collect();

    if ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION.get(state.system_config().dyncfgs()) {
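        // First try to migrate collections in place by evolving their persist schemas.
        // Whatever can't be evolved compatibly is returned and migrated the hard way below.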
        collections_to_migrate =
            try_evolve_persist_schemas(state, txn, collections_to_migrate, &persist_client).await?;
    } else {
        info!("builtin migration by schema evolution is disabled; skipping");
    }

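    // Migrate the remaining collections by pointing them at brand new, empty shards.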
    migrate_builtin_collections_incompatible(
        state,
        txn,
        local_expr_cache,
        persist_client,
        collections_to_migrate,
        read_only,
    )
    .await
}

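/// Updates the durably stored fingerprints of all `migrated_builtins` to their current values.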
fn update_catalog_fingerprints(
    state: &CatalogState,
    txn: &mut Transaction<'_>,
    migrated_builtins: &[CatalogItemId],
) -> Result<(), Error> {
    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
        .map(|builtin| {
            let id = state.resolve_builtin_object(builtin);
            let fingerprint = builtin.fingerprint();
            (id, fingerprint)
        })
        .collect();
    let mut migrated_system_object_mappings = BTreeMap::new();
    for item_id in migrated_builtins {
        let fingerprint = id_fingerprint_map
            .get(item_id)
            .expect("missing fingerprint");
        let entry = state.get_entry(item_id);
        let schema_name = state
            .get_schema(
                &entry.name().qualifiers.database_spec,
                &entry.name().qualifiers.schema_spec,
                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
            )
            .name
            .schema
            .as_str();
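        // Builtin objects are expected to have exactly one global ID.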
        let global_id = state.get_entry(item_id).global_ids().into_element();

        migrated_system_object_mappings.insert(
            *item_id,
            SystemObjectMapping {
                description: SystemObjectDescription {
                    schema_name: schema_name.to_string(),
                    object_type: entry.item_type(),
                    object_name: entry.name().item.clone(),
                },
                unique_identifier: SystemObjectUniqueIdentifier {
                    catalog_id: *item_id,
                    global_id,
                    fingerprint: fingerprint.clone(),
                },
            },
        );
    }
    txn.update_system_object_mappings(migrated_system_object_mappings)?;

    Ok(())
}

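/// Attempts to migrate the `migrated_storage_collections` in place by evolving their persist
/// schemas, which avoids having to allocate new shards.
///
/// Returns the subset of collections whose schemas could not be evolved compatibly and which
/// therefore still need a full migration.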
async fn try_evolve_persist_schemas(
    state: &CatalogState,
    txn: &Transaction<'_>,
    migrated_storage_collections: Vec<GlobalId>,
    persist_client: &PersistClient,
) -> Result<Vec<GlobalId>, Error> {
    let collection_metadata = txn.get_collection_metadata();

    let mut failed = Vec::new();
    for id in migrated_storage_collections {
        let Some(&shard_id) = collection_metadata.get(&id) else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing metadata for builtin collection {id}"
            ))));
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: "migrate builtin schema".to_string(),
        };
        let Some((old_schema_id, old_schema, _)) = persist_client
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("invalid usage")
        else {
            info!(%id, "builtin schema evolution failed: missing latest schema");
            failed.push(id);
            continue;
        };

        let entry = state.get_entry_by_global_id(&id);
        let Some(new_schema) = entry.desc_opt_latest() else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing new schema for builtin collection {id}"
            ))));
        };

        info!(%id, ?old_schema, ?new_schema, "attempting builtin schema evolution");

        let result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                shard_id,
                old_schema_id,
                &new_schema,
                &UnitSchema,
                diagnostics,
            )
            .await
            .expect("invalid usage");

        match result {
            CaESchema::Ok(_) => {
                info!("builtin schema evolution succeeded");
            }
            CaESchema::Incompatible => {
                info!("builtin schema evolution failed: incompatible");
                failed.push(id);
            }
            CaESchema::ExpectedMismatch { schema_id, .. } => {
                return Err(Error::new(ErrorKind::Internal(format!(
                    "builtin migration: unexpected schema mismatch ({} != {})",
                    schema_id, old_schema_id,
                ))));
            }
        }
    }

    Ok(failed)
}

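/// Migrates builtin storage collections that cannot be evolved in place by pointing them at
/// brand new, empty persist shards.
///
/// The new shard assignments are durably recorded in a dedicated migration shard, keyed by
/// `(global ID, build version)`, before they are swapped into the storage metadata. This makes
/// the migration idempotent across restarts and usable during zero downtime upgrades: a
/// read-only environment can stage new shards in the migration shard, and the writable
/// environment that later commits the upgrade at the same build version will reuse them. Stale
/// entries from other build versions are retracted along the way, and replaced shards are
/// marked as unfinalized so they can eventually be finalized.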
async fn migrate_builtin_collections_incompatible(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    persist_client: PersistClient,
    migrated_storage_collections: Vec<GlobalId>,
    read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
    let build_version = state.config.build_info.semver_version();

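    // The migration shard is keyed by raw system IDs, so strip the `GlobalId` wrapper.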
    let migrated_storage_collections: Vec<_> = migrated_storage_collections
        .into_iter()
        .map(|gid| match gid {
            GlobalId::System(raw) => raw,
            _ => panic!("builtins must have system IDs"),
        })
        .collect();

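    // Open a critical since handle and read/write handles on the migration shard.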
    let organization_id = state.config.environment_id.organization_id();
    let shard_id = txn
        .get_builtin_migration_shard()
        .expect("builtin migration shard should exist for opened catalogs");
    let diagnostics = Diagnostics {
        shard_name: "builtin_migration".to_string(),
        handle_purpose: format!(
            "builtin table migration shard for org {organization_id:?} version {build_version:?}"
        ),
    };
    let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
        persist_client
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                diagnostics.clone(),
            )
            .await
            .expect("invalid usage");
    let (mut write_handle, mut read_handle): (
        WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
        ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    ) = persist_client
        .open(
            shard_id,
            Arc::new(TableKeySchema),
            Arc::new(ShardIdSchema),
            diagnostics.clone(),
            USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid usage");
    const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
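    // Initialize the shard, if it hasn't been already, by appending an empty batch at the
    // minimum timestamp. A failure here means some other process has already initialized it.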
    let res = write_handle
        .compare_and_append(
            EMPTY_UPDATES,
            Antichain::from_elem(Timestamp::minimum()),
            Antichain::from_elem(Timestamp::minimum().step_forward()),
        )
        .await
        .expect("invalid usage");
    if let Err(e) = res {
        debug!("migration shard already initialized: {e:?}");
    }

    let upper = fetch_upper(&mut write_handle).await;
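    // Read the shard's current contents at the largest readable timestamp, `upper - 1`.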
    let as_of = upper.checked_sub(1).ok_or_else(|| {
        Error::new(ErrorKind::Internal(format!(
            "builtin migration failed, unexpected upper: {upper:?}"
        )))
    })?;
    let since = read_handle.since();
    assert!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    let as_of = Antichain::from_elem(as_of);
    let snapshot = read_handle
        .snapshot_and_fetch(as_of)
        .await
        .expect("we have advanced the as_of by the since");
    soft_assert_or_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
    );
    let mut global_id_shards: BTreeMap<_, _> = snapshot
        .into_iter()
        .filter_map(|(data, _ts, _diff)| {
            if let (Ok(table_key), Ok(shard_id)) = data {
                Some((table_key, shard_id))
            } else {
                warn!("skipping unreadable migration shard entry: {data:?}");
                None
            }
        })
        .collect();

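    // Remove shard entries that belong to other build versions or to collections that are no
    // longer being migrated.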
    let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
    let storage_collection_metadata = txn.get_collection_metadata();
    for (table_key, shard_id) in global_id_shards.clone() {
        if table_key.build_version > build_version {
            if read_only {
                halt!(
                    "saw build version {}, which is greater than current build version {}",
                    table_key.build_version,
                    build_version
                );
            } else {
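                // A greater build version indicates an in-progress upgrade (or a rolled back
                // one). Ignore the entry here, but leave it in the shard for its owner.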
                warn!(
                    %table_key.build_version, %build_version,
                    "saw build version which is greater than current build version",
                );
                global_id_shards.remove(&table_key);
                continue;
            }
        }

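        // Entries from previous build versions, or for collections that are no longer being
        // migrated, are stale. If the entry's shard is the collection's active shard we only
        // retract the entry; otherwise the shard itself is orphaned and must be finalized.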
        if !migrated_storage_collections.contains(&table_key.global_id)
            || table_key.build_version < build_version
        {
            global_id_shards.remove(&table_key);
            if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
                == Some(&shard_id)
            {
                migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
            } else {
                migration_shards_to_finalize.insert((table_key, shard_id));
            }
        }
    }

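    // Allocate a brand new shard for any migrated collection that doesn't already have one
    // recorded for the current build version.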
    let mut global_id_shards: BTreeMap<_, _> = global_id_shards
        .into_iter()
        .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
        .collect();
    for global_id in migrated_storage_collections {
        if !global_id_shards.contains_key(&global_id) {
            let shard_id = ShardId::new();
            global_id_shards.insert(global_id, shard_id);
            let table_key = TableKey {
                global_id,
                build_version: build_version.clone(),
            };
            migrated_shard_updates.push(((table_key, shard_id), upper, 1));
        }
    }

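    // Durably record the new shard assignments in the migration shard.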
    let upper = if !migrated_shard_updates.is_empty() {
        write_to_migration_shard(
            migrated_shard_updates,
            upper,
            &mut write_handle,
            &mut since_handle,
        )
        .await?
    } else {
        upper
    };

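    // Swap the new shards into the storage metadata and mark the replaced shards, along with
    // any orphaned migration shards, as unfinalized so they're eventually finalized.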
    let migrated_storage_collections_0dt = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        let storage_collection_metadata = txn.get_collection_metadata();
        let global_id_shards: BTreeMap<_, _> = global_id_shards
            .into_iter()
            .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
            .filter(|(global_id, shard_id)| {
                storage_collection_metadata.get(global_id) != Some(shard_id)
            })
            .collect();
        let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
        let mut old_shard_ids: BTreeSet<_> = txn
            .delete_collection_metadata(global_ids.clone())
            .into_iter()
            .map(|(_, shard_id)| shard_id)
            .collect();
        old_shard_ids.extend(
            migration_shards_to_finalize
                .iter()
                .map(|(_, shard_id)| shard_id),
        );
        txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
            Error::new(ErrorKind::Internal(format!(
                "builtin migration failed: {e}"
            )))
        })?;
        txn.insert_collection_metadata(global_id_shards)
            .map_err(|e| {
                Error::new(ErrorKind::Internal(format!(
                    "builtin migration failed: {e}"
                )))
            })?;
        global_ids
    };

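    // Map the migrated global IDs back to their catalog item IDs.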
    let migrated_storage_collections_0dt = migrated_storage_collections_0dt
        .into_iter()
        .map(|gid| state.get_entry_by_global_id(&gid).id())
        .collect();

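    // Apply the catalog updates produced by the transaction.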
    let updates = txn.get_and_commit_op_updates();
    let builtin_table_updates = state
        .apply_updates_for_bootstrap(updates, local_expr_cache)
        .await;

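    // Cleanup is deferred to a separate action so the caller can run it only after the
    // migration has been committed durably.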
    let cleanup_action = async move {
        if !read_only {
            let updates: Vec<_> = migration_shards_to_finalize
                .into_iter()
                .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
                .collect();
            if !updates.is_empty() {
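                // Failure to clean up old entries is not fatal; they'll be retried by a
                // future migration.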
                let res =
                    write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
                        .await;
                if let Err(e) = res {
                    error!("Unable to remove old entries from migration shard: {e:?}");
                }
            }
            persist_client
                .upgrade_version::<TableKey, ShardId, Timestamp, StorageDiff>(shard_id, diagnostics)
                .await
                .expect("valid usage");
        }
    }
    .boxed();

    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        migrated_storage_collections_0dt,
        cleanup_action,
    })
}

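/// Returns the current upper of the migration shard.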
async fn fetch_upper(
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
) -> Timestamp {
    write_handle
        .fetch_recent_upper()
        .await
        .as_option()
        .cloned()
        .expect("we use a totally ordered time and never finalize the shard")
}

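/// Appends `updates` to the migration shard at `upper`, downgrades its since, and returns the
/// shard's new upper.
///
/// An upper mismatch is surfaced as a fence error, since it means some other environment has
/// concurrently written to the migration shard.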
async fn write_to_migration_shard(
    updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>,
    upper: Timestamp,
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
) -> Result<Timestamp, Error> {
    let next_upper = upper.step_forward();
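    // Lag the since one tick behind the upper so that the contents remain readable at
    // `upper - 1`.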
    let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
    let next_upper_antichain = Antichain::from_elem(next_upper);

    if let Err(err) = write_handle
        .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
        .await
        .expect("invalid usage")
    {
        return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
            FenceError::migration(err),
        ))));
    }

    let opaque = *since_handle.opaque();
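    // We don't use the opaque value for coordination, so we pass the current value back,
    // leaving it unchanged.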
    let downgrade = since_handle
        .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
        .await;
    match downgrade {
        None => {}
        Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
        Some(Ok(updated)) => soft_assert_or_log!(
            updated == downgrade_to,
            "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
        ),
    }

    Ok(next_upper)
}

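/// Persist `Codec` and `Schema` implementations for the builtin migration shard's key
/// (`TableKey`) and value (`ShardId`) types.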
mod persist_schema {
    use std::num::ParseIntError;

    use arrow::array::{StringArray, StringBuilder};
    use bytes::{BufMut, Bytes};
    use mz_persist_types::Codec;
    use mz_persist_types::codec_impls::{
        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
    };
    use mz_persist_types::columnar::Schema;
    use mz_persist_types::stats::NoneStats;

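    /// The key of the builtin migration shard: a raw system global ID paired with the build
    /// version that performed the migration, rendered as e.g. `123-0.118.0`.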
    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
    pub(super) struct TableKey {
        pub(super) global_id: u64,
        pub(super) build_version: semver::Version,
    }

    impl std::fmt::Display for TableKey {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            write!(f, "{}-{}", self.global_id, self.build_version)
        }
    }

    impl std::str::FromStr for TableKey {
        type Err = String;

        fn from_str(s: &str) -> Result<Self, Self::Err> {
            let parts: Vec<_> = s.splitn(2, '-').collect();
            let &[global_id, build_version] = parts.as_slice() else {
                return Err(format!("invalid TableKey '{s}'"));
            };
            let global_id = global_id
                .parse()
                .map_err(|e: ParseIntError| e.to_string())?;
            let build_version = build_version
                .parse()
                .map_err(|e: semver::Error| e.to_string())?;
            Ok(TableKey {
                global_id,
                build_version,
            })
        }
    }

    impl From<TableKey> for String {
        fn from(table_key: TableKey) -> Self {
            table_key.to_string()
        }
    }

    impl TryFrom<String> for TableKey {
        type Error = String;

        fn try_from(s: String) -> Result<Self, Self::Error> {
            s.parse()
        }
    }

    impl Default for TableKey {
        fn default() -> Self {
            Self {
                global_id: Default::default(),
                build_version: semver::Version::new(0, 0, 0),
            }
        }
    }

    impl Codec for TableKey {
        type Storage = ();
        type Schema = TableKeySchema;
        fn codec_name() -> String {
            "TableKey".into()
        }
        fn encode<B: BufMut>(&self, buf: &mut B) {
            buf.put(self.to_string().as_bytes())
        }
        fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
            let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
            table_key.parse()
        }
        fn encode_schema(_schema: &Self::Schema) -> Bytes {
            Bytes::new()
        }
        fn decode_schema(buf: &Bytes) -> Self::Schema {
            assert_eq!(*buf, Bytes::new());
            TableKeySchema
        }
    }

    impl SimpleColumnarData for TableKey {
        type ArrowBuilder = StringBuilder;
        type ArrowColumn = StringArray;

        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
            builder.values_slice().len()
        }

        fn push(&self, builder: &mut Self::ArrowBuilder) {
            builder.append_value(&self.to_string());
        }
        fn push_null(builder: &mut Self::ArrowBuilder) {
            builder.append_null();
        }
        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
            *self = column.value(idx).parse().expect("should be valid TableKey");
        }
    }

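    /// A persist [`Schema`] implementation for [`TableKey`].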
    #[derive(Debug, PartialEq)]
    pub(super) struct TableKeySchema;

    impl Schema<TableKey> for TableKeySchema {
        type ArrowColumn = StringArray;
        type Statistics = NoneStats;

        type Decoder = SimpleColumnarDecoder<TableKey>;
        type Encoder = SimpleColumnarEncoder<TableKey>;

        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
            Ok(SimpleColumnarEncoder::default())
        }

        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
            Ok(SimpleColumnarDecoder::new(col))
        }
    }
}