1use std::collections::{BTreeMap, BTreeSet};
13use std::sync::Arc;
14
15use futures::FutureExt;
16use futures::future::BoxFuture;
17use mz_adapter_types::dyncfgs::ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION;
18use mz_catalog::SYSTEM_CONN_ID;
19use mz_catalog::builtin::{BUILTINS, BuiltinTable, Fingerprint};
20use mz_catalog::config::BuiltinItemMigrationConfig;
21use mz_catalog::durable::objects::SystemObjectUniqueIdentifier;
22use mz_catalog::durable::{
23    DurableCatalogError, FenceError, SystemObjectDescription, SystemObjectMapping, Transaction,
24};
25use mz_catalog::memory::error::{Error, ErrorKind};
26use mz_catalog::memory::objects::CatalogItem;
27use mz_ore::collections::CollectionExt;
28use mz_ore::{halt, soft_assert_or_log, soft_panic_or_log};
29use mz_persist_client::cfg::USE_CRITICAL_SINCE_CATALOG;
30use mz_persist_client::critical::SinceHandle;
31use mz_persist_client::read::ReadHandle;
32use mz_persist_client::schema::CaESchema;
33use mz_persist_client::write::WriteHandle;
34use mz_persist_client::{Diagnostics, PersistClient};
35use mz_persist_types::ShardId;
36use mz_persist_types::codec_impls::{ShardIdSchema, UnitSchema};
37use mz_repr::{CatalogItemId, GlobalId, Timestamp};
38use mz_sql::catalog::CatalogItem as _;
39use mz_storage_client::controller::StorageTxn;
40use mz_storage_types::StorageDiff;
41use mz_storage_types::sources::SourceData;
42use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
43use tracing::{debug, error, info};
44
45use crate::catalog::open::builtin_item_migration::persist_schema::{TableKey, TableKeySchema};
46use crate::catalog::state::LocalExpressionCache;
47use crate::catalog::{BuiltinTableUpdate, CatalogState};
48
/// The outcome of a builtin item migration.
pub(crate) struct BuiltinItemMigrationResult {
    /// Updates to apply to the builtin tables as a result of the migration.
    pub(crate) builtin_table_updates: Vec<BuiltinTableUpdate<&'static BuiltinTable>>,
    /// Catalog IDs of the builtin storage collections whose backing shards
    /// were replaced during the migration.
    pub(crate) migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Deferred cleanup work to run once the migration has been made durable.
    pub(crate) cleanup_action: BoxFuture<'static, ()>,
}
58
/// Migrates builtin items whose definitions have changed.
///
/// Records the new fingerprints for all `migrated_builtins` in `txn`, then
/// tries to migrate the storage-backed collections in place via persist schema
/// evolution (when the dyncfg enables it), and finally runs the
/// shard-swapping migration for any collections whose schemas could not be
/// evolved compatibly.
pub(crate) async fn migrate_builtin_items(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    migrated_builtins: Vec<CatalogItemId>,
    BuiltinItemMigrationConfig {
        persist_client,
        read_only,
    }: BuiltinItemMigrationConfig,
) -> Result<BuiltinItemMigrationResult, Error> {
    // In read-only mode the catalog transaction must be a savepoint so that
    // nothing is made durable; in writable mode it must not be.
    assert_eq!(
        read_only,
        txn.is_savepoint(),
        "txn must be in savepoint mode when read_only is true, and in writable mode otherwise",
    );

    update_catalog_fingerprints(state, txn, &migrated_builtins)?;

    // Narrow the migrated items down to those backed by a storage collection;
    // items like views, indexes, types, etc. have no persisted data to migrate.
    let mut collections_to_migrate: Vec<_> = migrated_builtins
        .into_iter()
        .filter_map(|id| {
            use CatalogItem::*;
            match &state.get_entry(&id).item() {
                Table(table) => Some(table.global_ids().into_element()),
                Source(source) => Some(source.global_id()),
                MaterializedView(mv) => Some(mv.global_id()),
                ContinualTask(ct) => Some(ct.global_id()),
                Log(_) | Sink(_) | View(_) | Index(_) | Type(_) | Func(_) | Secret(_)
                | Connection(_) => None,
            }
        })
        .collect();

    if ENABLE_BUILTIN_MIGRATION_SCHEMA_EVOLUTION.get(state.system_config().dyncfgs()) {
        // Schema evolution returns the subset of collections it could NOT
        // migrate in place; only those fall through to the shard swap below.
        collections_to_migrate =
            try_evolve_persist_schemas(state, txn, collections_to_migrate, &persist_client).await?;
    } else {
        info!("skipping builtin migration by schema evolution");
    }

    migrate_builtin_collections_incompatible(
        state,
        txn,
        local_expr_cache,
        persist_client,
        collections_to_migrate,
        read_only,
    )
    .await
}
129
130fn update_catalog_fingerprints(
132    state: &CatalogState,
133    txn: &mut Transaction<'_>,
134    migrated_builtins: &[CatalogItemId],
135) -> Result<(), Error> {
136    let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
137        .map(|builtin| {
138            let id = state.resolve_builtin_object(builtin);
139            let fingerprint = builtin.fingerprint();
140            (id, fingerprint)
141        })
142        .collect();
143    let mut migrated_system_object_mappings = BTreeMap::new();
144    for item_id in migrated_builtins {
145        let fingerprint = id_fingerprint_map
146            .get(item_id)
147            .expect("missing fingerprint");
148        let entry = state.get_entry(item_id);
149        let schema_name = state
150            .get_schema(
151                &entry.name().qualifiers.database_spec,
152                &entry.name().qualifiers.schema_spec,
153                entry.conn_id().unwrap_or(&SYSTEM_CONN_ID),
154            )
155            .name
156            .schema
157            .as_str();
158        let global_id = state.get_entry(item_id).global_ids().into_element();
160
161        migrated_system_object_mappings.insert(
162            *item_id,
163            SystemObjectMapping {
164                description: SystemObjectDescription {
165                    schema_name: schema_name.to_string(),
166                    object_type: entry.item_type(),
167                    object_name: entry.name().item.clone(),
168                },
169                unique_identifier: SystemObjectUniqueIdentifier {
170                    catalog_id: *item_id,
171                    global_id,
172                    fingerprint: fingerprint.clone(),
173                },
174            },
175        );
176    }
177    txn.update_system_object_mappings(migrated_system_object_mappings)?;
178
179    Ok(())
180}
181
/// Attempts to migrate the given builtin collections in place by evolving each
/// shard's persisted schema to the item's new in-memory schema.
///
/// Returns the subset of `migrated_storage_collections` that could *not* be
/// handled this way — either the shard has no latest schema registered or the
/// new schema is incompatible — and therefore still needs a full shard
/// migration. Missing collection metadata or a missing new schema is an
/// internal error; a schema-ID mismatch during compare-and-evolve is also
/// surfaced as an error rather than retried.
async fn try_evolve_persist_schemas(
    state: &CatalogState,
    txn: &Transaction<'_>,
    migrated_storage_collections: Vec<GlobalId>,
    persist_client: &PersistClient,
) -> Result<Vec<GlobalId>, Error> {
    let collection_metadata = txn.get_collection_metadata();

    let mut failed = Vec::new();
    for id in migrated_storage_collections {
        let Some(&shard_id) = collection_metadata.get(&id) else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing metadata for builtin collection {id}"
            ))));
        };

        let diagnostics = Diagnostics {
            shard_name: id.to_string(),
            handle_purpose: "migrate builtin schema".to_string(),
        };
        // Without a registered latest schema we cannot evolve in place; fall
        // back to the full migration path for this collection.
        let Some((old_schema_id, old_schema, _)) = persist_client
            .latest_schema::<SourceData, (), Timestamp, StorageDiff>(shard_id, diagnostics.clone())
            .await
            .expect("invalid usage")
        else {
            info!(%id, "builtin schema evolution failed: missing latest schema");
            failed.push(id);
            continue;
        };

        let entry = state.get_entry_by_global_id(&id);
        let Some(new_schema) = entry.desc_opt_latest() else {
            return Err(Error::new(ErrorKind::Internal(format!(
                "builtin migration: missing new schema for builtin collection {id}"
            ))));
        };

        info!(%id, ?old_schema, ?new_schema, "attempting builtin schema evolution");

        let result = persist_client
            .compare_and_evolve_schema::<SourceData, (), Timestamp, StorageDiff>(
                shard_id,
                old_schema_id,
                &new_schema,
                &UnitSchema,
                diagnostics,
            )
            .await
            .expect("invalid usage");

        match result {
            CaESchema::Ok(_) => {
                info!("builtin schema evolution succeeded");
            }
            CaESchema::Incompatible => {
                // The new schema cannot be evolved from the old one; this
                // collection needs the shard-swapping migration instead.
                info!("builtin schema evolution failed: incompatible");
                failed.push(id);
            }
            CaESchema::ExpectedMismatch { schema_id, .. } => {
                // We just read the latest schema ID above, so a mismatch is
                // unexpected here.
                return Err(Error::new(ErrorKind::Internal(format!(
                    "builtin migration: unexpected schema mismatch ({} != {})",
                    schema_id, old_schema_id,
                ))));
            }
        }
    }

    Ok(failed)
}
263
/// Migrates builtin collections whose schemas could not be evolved in place by
/// allocating a fresh persist shard for each and swapping the storage
/// collection metadata over to it.
///
/// Coordination across concurrent/older processes happens through a dedicated
/// "builtin migration" shard that maps `(global_id, build_version)` to the
/// shard allocated for that migration. A build version newer than ours in the
/// shard causes a `halt!`; stale entries are retracted or queued for shard
/// finalization.
async fn migrate_builtin_collections_incompatible(
    state: &mut CatalogState,
    txn: &mut Transaction<'_>,
    local_expr_cache: &mut LocalExpressionCache,
    persist_client: PersistClient,
    migrated_storage_collections: Vec<GlobalId>,
    read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
    let build_version = state.config.build_info.semver_version();

    // The migration shard keys on the raw `u64` of the system global ID.
    let migrated_storage_collections: Vec<_> = migrated_storage_collections
        .into_iter()
        .map(|gid| match gid {
            GlobalId::System(raw) => raw,
            _ => panic!("builtins must have system IDs"),
        })
        .collect();

    let organization_id = state.config.environment_id.organization_id();
    let shard_id = txn
        .get_builtin_migration_shard()
        .expect("builtin migration shard should exist for opened catalogs");
    let diagnostics = Diagnostics {
        shard_name: "builtin_migration".to_string(),
        handle_purpose: format!(
            "builtin table migration shard for org {organization_id:?} version {build_version:?}"
        ),
    };
    let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64> =
        persist_client
            .open_critical_since(
                shard_id,
                PersistClient::CONTROLLER_CRITICAL_SINCE,
                diagnostics.clone(),
            )
            .await
            .expect("invalid usage");
    let (mut write_handle, mut read_handle): (
        WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
        ReadHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    ) = persist_client
        .open(
            shard_id,
            Arc::new(TableKeySchema),
            Arc::new(ShardIdSchema),
            diagnostics,
            USE_CRITICAL_SINCE_CATALOG.get(persist_client.dyncfgs()),
        )
        .await
        .expect("invalid usage");
    // Initialize the shard's upper past the minimum timestamp if this is the
    // first time anyone has used it; a failed compare-and-append just means
    // it was already initialized.
    const EMPTY_UPDATES: &[((TableKey, ShardId), Timestamp, StorageDiff)] = &[];
    let res = write_handle
        .compare_and_append(
            EMPTY_UPDATES,
            Antichain::from_elem(Timestamp::minimum()),
            Antichain::from_elem(Timestamp::minimum().step_forward()),
        )
        .await
        .expect("invalid usage");
    if let Err(e) = res {
        debug!("migration shard already initialized: {e:?}");
    }

    // Read the current contents of the migration shard at `upper - 1`.
    let upper = fetch_upper(&mut write_handle).await;
    let as_of = upper.checked_sub(1).ok_or_else(|| {
        Error::new(ErrorKind::Internal(format!(
            "builtin migration failed, unexpected upper: {upper:?}"
        )))
    })?;
    let since = read_handle.since();
    assert!(
        since.less_equal(&as_of),
        "since={since:?}, as_of={as_of:?}; since must be less than or equal to as_of"
    );
    let as_of = Antichain::from_elem(as_of);
    let snapshot = read_handle
        .snapshot_and_fetch(as_of)
        .await
        .expect("we have advanced the as_of by the since");
    soft_assert_or_log!(
        snapshot.iter().all(|(_, _, diff)| *diff == 1),
        "snapshot_and_fetch guarantees a consolidated result: {snapshot:?}"
    );
    let mut global_id_shards: BTreeMap<_, _> = snapshot
        .into_iter()
        .map(|((key, value), _ts, _diff)| {
            let table_key = key.expect("persist decoding error");
            let shard_id = value.expect("persist decoding error");
            (table_key, shard_id)
        })
        .collect();

    // Prune entries that no longer apply: halt if a newer build has taken
    // over; retract entries whose shard is currently in use by storage (they
    // represent a completed migration) and queue the rest for finalization.
    let mut migrated_shard_updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)> = Vec::new();
    let mut migration_shards_to_finalize = BTreeSet::new();
    let storage_collection_metadata = txn.get_collection_metadata();
    for (table_key, shard_id) in global_id_shards.clone() {
        if table_key.build_version > build_version {
            halt!(
                "saw build version {}, which is greater than current build version {}",
                table_key.build_version,
                build_version
            );
        }

        if !migrated_storage_collections.contains(&table_key.global_id)
            || table_key.build_version < build_version
        {
            global_id_shards.remove(&table_key);
            if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
                == Some(&shard_id)
            {
                migrated_shard_updates.push(((table_key, shard_id.clone()), upper, -1));
            } else {
                migration_shards_to_finalize.insert((table_key, shard_id));
            }
        }
    }

    // Allocate a brand-new shard for every migrated collection that doesn't
    // already have an entry for this build version.
    let mut global_id_shards: BTreeMap<_, _> = global_id_shards
        .into_iter()
        .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
        .collect();
    for global_id in migrated_storage_collections {
        if !global_id_shards.contains_key(&global_id) {
            let shard_id = ShardId::new();
            global_id_shards.insert(global_id, shard_id);
            let table_key = TableKey {
                global_id,
                build_version: build_version.clone(),
            };
            migrated_shard_updates.push(((table_key, shard_id), upper, 1));
        }
    }

    // Durably record the retractions/additions in the migration shard before
    // touching the catalog's collection metadata.
    let upper = if !migrated_shard_updates.is_empty() {
        write_to_migration_shard(
            migrated_shard_updates,
            upper,
            &mut write_handle,
            &mut since_handle,
        )
        .await?
    } else {
        upper
    };

    // Swap the storage collection metadata over to the new shards: delete the
    // old mappings, mark the old shards (and any queued-for-finalize shards)
    // as unfinalized, then insert the new mappings.
    let migrated_storage_collections_0dt = {
        let txn: &mut dyn StorageTxn<Timestamp> = txn;
        let storage_collection_metadata = txn.get_collection_metadata();
        let global_id_shards: BTreeMap<_, _> = global_id_shards
            .into_iter()
            .map(|(global_id, shard_id)| (GlobalId::System(global_id), shard_id))
            .filter(|(global_id, shard_id)| {
                // Skip collections that already point at the target shard.
                storage_collection_metadata.get(global_id) != Some(shard_id)
            })
            .collect();
        let global_ids: BTreeSet<_> = global_id_shards.keys().cloned().collect();
        let mut old_shard_ids: BTreeSet<_> = txn
            .delete_collection_metadata(global_ids.clone())
            .into_iter()
            .map(|(_, shard_id)| shard_id)
            .collect();
        old_shard_ids.extend(
            migration_shards_to_finalize
                .iter()
                .map(|(_, shard_id)| shard_id),
        );
        txn.insert_unfinalized_shards(old_shard_ids).map_err(|e| {
            Error::new(ErrorKind::Internal(format!(
                "builtin migration failed: {e}"
            )))
        })?;
        txn.insert_collection_metadata(global_id_shards)
            .map_err(|e| {
                Error::new(ErrorKind::Internal(format!(
                    "builtin migration failed: {e}"
                )))
            })?;
        global_ids
    };

    // Map the migrated global IDs back to their catalog item IDs.
    let migrated_storage_collections_0dt = migrated_storage_collections_0dt
        .into_iter()
        .map(|gid| state.get_entry_by_global_id(&gid).id())
        .collect();

    let updates = txn.get_and_commit_op_updates();
    let builtin_table_updates = state
        .apply_updates_for_bootstrap(updates, local_expr_cache)
        .await;

    // Deferred cleanup: once the migration is durable (and we're not
    // read-only), retract the entries for shards queued for finalization.
    // Failure here is logged, not fatal.
    let cleanup_action = async move {
        if !read_only {
            let updates: Vec<_> = migration_shards_to_finalize
                .into_iter()
                .map(|(table_key, shard_id)| ((table_key, shard_id), upper, -1))
                .collect();
            if !updates.is_empty() {
                let res =
                    write_to_migration_shard(updates, upper, &mut write_handle, &mut since_handle)
                        .await;
                if let Err(e) = res {
                    error!("Unable to remove old entries from migration shard: {e:?}");
                }
            }
        }
    }
    .boxed();

    Ok(BuiltinItemMigrationResult {
        builtin_table_updates,
        migrated_storage_collections_0dt,
        cleanup_action,
    })
}
544
545async fn fetch_upper(
546    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
547) -> Timestamp {
548    write_handle
549        .fetch_recent_upper()
550        .await
551        .as_option()
552        .cloned()
553        .expect("we use a totally ordered time and never finalize the shard")
554}
555
/// Appends `updates` to the migration shard at time `upper`, advancing the
/// shard's upper to `upper + 1`, and returns the new upper.
///
/// A failed compare-and-append means another process wrote the shard
/// concurrently; it is surfaced as a migration fence error. On success, the
/// critical since is (best-effort) downgraded to just below the new upper.
async fn write_to_migration_shard(
    updates: Vec<((TableKey, ShardId), Timestamp, StorageDiff)>,
    upper: Timestamp,
    write_handle: &mut WriteHandle<TableKey, ShardId, Timestamp, StorageDiff>,
    since_handle: &mut SinceHandle<TableKey, ShardId, Timestamp, StorageDiff, i64>,
) -> Result<Timestamp, Error> {
    let next_upper = upper.step_forward();
    // Target since: one tick behind the new upper (i.e. back at `upper`).
    let downgrade_to = Antichain::from_elem(next_upper.saturating_sub(1));
    let next_upper_antichain = Antichain::from_elem(next_upper);

    if let Err(err) = write_handle
        .compare_and_append(updates, Antichain::from_elem(upper), next_upper_antichain)
        .await
        .expect("invalid usage")
    {
        // Someone else advanced the upper first: treat as a fence.
        return Err(Error::new(ErrorKind::Durable(DurableCatalogError::Fence(
            FenceError::migration(err),
        ))));
    }

    // Best-effort since downgrade; failures are logged, never fatal.
    let opaque = *since_handle.opaque();
    let downgrade = since_handle
        .maybe_compare_and_downgrade_since(&opaque, (&opaque, &downgrade_to))
        .await;
    match downgrade {
        None => {}
        Some(Err(e)) => soft_panic_or_log!("found opaque value {e}, but expected {opaque}"),
        Some(Ok(updated)) => soft_assert_or_log!(
            updated == downgrade_to,
            "updated bound ({updated:?}) should match expected ({downgrade_to:?})"
        ),
    }

    Ok(next_upper)
}
596
597mod persist_schema {
598    use std::num::ParseIntError;
599
600    use arrow::array::{StringArray, StringBuilder};
601    use bytes::{BufMut, Bytes};
602    use mz_persist_types::Codec;
603    use mz_persist_types::codec_impls::{
604        SimpleColumnarData, SimpleColumnarDecoder, SimpleColumnarEncoder,
605    };
606    use mz_persist_types::columnar::Schema;
607    use mz_persist_types::stats::NoneStats;
608
609    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
610    pub(super) struct TableKey {
611        pub(super) global_id: u64,
612        pub(super) build_version: semver::Version,
613    }
614
615    impl std::fmt::Display for TableKey {
616        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
617            write!(f, "{}-{}", self.global_id, self.build_version)
618        }
619    }
620
621    impl std::str::FromStr for TableKey {
622        type Err = String;
623
624        fn from_str(s: &str) -> Result<Self, Self::Err> {
625            let parts: Vec<_> = s.splitn(2, '-').collect();
626            let &[global_id, build_version] = parts.as_slice() else {
627                return Err(format!("invalid TableKey '{s}'"));
628            };
629            let global_id = global_id
630                .parse()
631                .map_err(|e: ParseIntError| e.to_string())?;
632            let build_version = build_version
633                .parse()
634                .map_err(|e: semver::Error| e.to_string())?;
635            Ok(TableKey {
636                global_id,
637                build_version,
638            })
639        }
640    }
641
642    impl From<TableKey> for String {
643        fn from(table_key: TableKey) -> Self {
644            table_key.to_string()
645        }
646    }
647
648    impl TryFrom<String> for TableKey {
649        type Error = String;
650
651        fn try_from(s: String) -> Result<Self, Self::Error> {
652            s.parse()
653        }
654    }
655
656    impl Default for TableKey {
657        fn default() -> Self {
658            Self {
659                global_id: Default::default(),
660                build_version: semver::Version::new(0, 0, 0),
661            }
662        }
663    }
664
665    impl Codec for TableKey {
666        type Storage = ();
667        type Schema = TableKeySchema;
668        fn codec_name() -> String {
669            "TableKey".into()
670        }
671        fn encode<B: BufMut>(&self, buf: &mut B) {
672            buf.put(self.to_string().as_bytes())
673        }
674        fn decode<'a>(buf: &'a [u8], _schema: &TableKeySchema) -> Result<Self, String> {
675            let table_key = String::from_utf8(buf.to_owned()).map_err(|err| err.to_string())?;
676            table_key.parse()
677        }
678        fn encode_schema(_schema: &Self::Schema) -> Bytes {
679            Bytes::new()
680        }
681        fn decode_schema(buf: &Bytes) -> Self::Schema {
682            assert_eq!(*buf, Bytes::new());
683            TableKeySchema
684        }
685    }
686
687    impl SimpleColumnarData for TableKey {
688        type ArrowBuilder = StringBuilder;
689        type ArrowColumn = StringArray;
690
691        fn goodbytes(builder: &Self::ArrowBuilder) -> usize {
692            builder.values_slice().len()
693        }
694
695        fn push(&self, builder: &mut Self::ArrowBuilder) {
696            builder.append_value(&self.to_string());
697        }
698        fn push_null(builder: &mut Self::ArrowBuilder) {
699            builder.append_null();
700        }
701        fn read(&mut self, idx: usize, column: &Self::ArrowColumn) {
702            *self = column.value(idx).parse().expect("should be valid TableKey");
703        }
704    }
705
706    #[derive(Debug, PartialEq)]
708    pub(super) struct TableKeySchema;
709
710    impl Schema<TableKey> for TableKeySchema {
711        type ArrowColumn = StringArray;
712        type Statistics = NoneStats;
713
714        type Decoder = SimpleColumnarDecoder<TableKey>;
715        type Encoder = SimpleColumnarEncoder<TableKey>;
716
717        fn encoder(&self) -> Result<Self::Encoder, anyhow::Error> {
718            Ok(SimpleColumnarEncoder::default())
719        }
720
721        fn decoder(&self, col: Self::ArrowColumn) -> Result<Self::Decoder, anyhow::Error> {
722            Ok(SimpleColumnarDecoder::new(col))
723        }
724    }
725}