1mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
23use mz_audit_log::{
24 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, ObjectType, VersionedEvent,
25};
26use mz_auth::hash::scram256_hash;
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::{
29 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
30 Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
31};
32use mz_catalog::config::StateConfig;
33use mz_catalog::durable::objects::{
34 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
35};
36use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
37use mz_catalog::expr_cache::{
38 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
39};
40use mz_catalog::memory::error::{Error, ErrorKind};
41use mz_catalog::memory::objects::{
42 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
43};
44use mz_controller::clusters::ReplicaLogging;
45use mz_controller_types::ClusterId;
46use mz_ore::cast::usize_to_u64;
47use mz_ore::collections::HashSet;
48use mz_ore::now::{SYSTEM_TIME, to_datetime};
49use mz_ore::{instrument, soft_assert_no_log};
50use mz_repr::adt::mz_acl_item::PrivilegeMap;
51use mz_repr::namespaces::is_unstable_schema;
52use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
53use mz_sql::catalog::{
54 BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
55};
56use mz_sql::func::OP_IMPLS;
57use mz_sql::names::CommentObjectId;
58use mz_sql::rbac;
59use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
60use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
61use mz_storage_client::controller::{StorageMetadata, StorageTxn};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{Instrument, info, warn};
64use uuid::Uuid;
65
66use crate::AdapterError;
68use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{
71 BuiltinTableUpdate, Catalog, CatalogPlans, CatalogState, Config, is_reserved_name,
72};
73
74pub struct InitializeStateResult {
75 pub state: CatalogState,
77 pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
79 pub new_builtin_collections: BTreeSet<GlobalId>,
81 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
83 pub last_seen_version: String,
85 pub expr_cache_handle: Option<ExpressionCacheHandle>,
87 pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
89 pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
91}
92
93pub struct OpenCatalogResult {
94 pub catalog: Catalog,
96 pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
98 pub new_builtin_collections: BTreeSet<GlobalId>,
100 pub builtin_table_updates: Vec<BuiltinTableUpdate>,
102 pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
104 pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
106}
107
108impl Catalog {
109 pub async fn initialize_state<'a>(
113 config: StateConfig,
114 storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
115 ) -> Result<InitializeStateResult, AdapterError> {
116 for builtin_role in BUILTIN_ROLES {
117 assert!(
118 is_reserved_name(builtin_role.name),
119 "builtin role {builtin_role:?} must start with one of the following prefixes {}",
120 BUILTIN_PREFIXES.join(", ")
121 );
122 }
123 for builtin_cluster in BUILTIN_CLUSTERS {
124 assert!(
125 is_reserved_name(builtin_cluster.name),
126 "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
127 BUILTIN_PREFIXES.join(", ")
128 );
129 }
130
131 let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
132 if config.all_features {
133 system_configuration.enable_all_feature_flags_by_default();
134 }
135
136 let mut state = CatalogState {
137 database_by_name: imbl::OrdMap::new(),
138 database_by_id: imbl::OrdMap::new(),
139 entry_by_id: imbl::OrdMap::new(),
140 entry_by_global_id: imbl::OrdMap::new(),
141 ambient_schemas_by_name: imbl::OrdMap::new(),
142 ambient_schemas_by_id: imbl::OrdMap::new(),
143 clusters_by_name: imbl::OrdMap::new(),
144 clusters_by_id: imbl::OrdMap::new(),
145 roles_by_name: imbl::OrdMap::new(),
146 roles_by_id: imbl::OrdMap::new(),
147 network_policies_by_id: imbl::OrdMap::new(),
148 role_auth_by_id: imbl::OrdMap::new(),
149 network_policies_by_name: imbl::OrdMap::new(),
150 system_configuration: Arc::new(system_configuration),
151 default_privileges: Arc::new(DefaultPrivileges::default()),
152 system_privileges: Arc::new(PrivilegeMap::default()),
153 comments: Arc::new(CommentsMap::default()),
154 source_references: imbl::OrdMap::new(),
155 storage_metadata: Arc::new(StorageMetadata::default()),
156 temporary_schemas: imbl::OrdMap::new(),
157 mock_authentication_nonce: Default::default(),
158 config: mz_sql::catalog::CatalogConfig {
159 start_time: to_datetime((config.now)()),
160 start_instant: Instant::now(),
161 nonce: rand::random(),
162 environment_id: config.environment_id,
163 session_id: Uuid::new_v4(),
164 build_info: config.build_info,
165 now: config.now.clone(),
166 connection_context: config.connection_context,
167 builtins_cfg: BuiltinsConfig {
168 include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
172 &config.system_parameter_defaults,
173 config.remote_system_parameters.as_ref(),
174 &ENABLE_CONTINUAL_TASK_BUILTINS,
175 ),
176 },
177 helm_chart_version: config.helm_chart_version,
178 },
179 cluster_replica_sizes: config.cluster_replica_sizes,
180 availability_zones: config.availability_zones,
181 egress_addresses: config.egress_addresses,
182 aws_principal_context: config.aws_principal_context,
183 aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
184 http_host_name: config.http_host_name,
185 license_key: config.license_key,
186 };
187
188 let deploy_generation = storage.get_deployment_generation().await?;
189
190 let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
191 assert!(!updates.is_empty(), "initial catalog snapshot is missing");
192 let mut txn = storage.transaction().await?;
193
194 let new_builtin_collections = {
196 migrate::durable_migrate(
197 &mut txn,
198 state.config.environment_id.organization_id(),
199 config.boot_ts,
200 )?;
201 if let Some(remote_system_parameters) = config.remote_system_parameters {
204 for (name, value) in remote_system_parameters {
205 txn.upsert_system_config(&name, value)?;
206 }
207 txn.set_system_config_synced_once()?;
208 }
209 let new_builtin_collections =
211 add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
212 let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
213 system_cluster: config.builtin_system_cluster_config,
214 catalog_server_cluster: config.builtin_catalog_server_cluster_config,
215 probe_cluster: config.builtin_probe_cluster_config,
216 support_cluster: config.builtin_support_cluster_config,
217 analytics_cluster: config.builtin_analytics_cluster_config,
218 };
219 add_new_remove_old_builtin_clusters_migration(
220 &mut txn,
221 &builtin_bootstrap_cluster_config_map,
222 config.boot_ts,
223 )?;
224 add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
225 add_new_remove_old_builtin_cluster_replicas_migration(
226 &mut txn,
227 &builtin_bootstrap_cluster_config_map,
228 config.boot_ts,
229 )?;
230 add_new_remove_old_builtin_roles_migration(&mut txn)?;
231 remove_invalid_config_param_role_defaults_migration(&mut txn)?;
232 remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;
233
234 new_builtin_collections
235 };
236
237 let op_updates = txn.get_and_commit_op_updates();
238 updates.extend(op_updates);
239
240 let mut builtin_table_updates = Vec::new();
241
242 {
244 for (name, value) in config.system_parameter_defaults {
247 match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
248 Ok(_) => (),
249 Err(Error {
250 kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
251 }) => {
252 warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
253 }
254 Err(e) => return Err(e.into()),
255 };
256 }
257 state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
258 }
259
260 let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
263 differential_dataflow::consolidation::consolidate_updates(&mut updates);
264 soft_assert_no_log!(
265 updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
266 "consolidated updates should be positive during startup: {updates:?}"
267 );
268
269 let mut pre_item_updates = Vec::new();
270 let mut system_item_updates = Vec::new();
271 let mut item_updates = Vec::new();
272 let mut post_item_updates = Vec::new();
273 let mut audit_log_updates = Vec::new();
274 for (kind, ts, diff) in updates {
275 match kind {
276 BootstrapStateUpdateKind::Role(_)
277 | BootstrapStateUpdateKind::RoleAuth(_)
278 | BootstrapStateUpdateKind::Database(_)
279 | BootstrapStateUpdateKind::Schema(_)
280 | BootstrapStateUpdateKind::DefaultPrivilege(_)
281 | BootstrapStateUpdateKind::SystemPrivilege(_)
282 | BootstrapStateUpdateKind::SystemConfiguration(_)
283 | BootstrapStateUpdateKind::Cluster(_)
284 | BootstrapStateUpdateKind::NetworkPolicy(_)
285 | BootstrapStateUpdateKind::ClusterReplica(_) => {
286 pre_item_updates.push(StateUpdate {
287 kind: kind.into(),
288 ts,
289 diff: diff.try_into().expect("valid diff"),
290 })
291 }
292 BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
293 | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
294 system_item_updates.push(StateUpdate {
295 kind: kind.into(),
296 ts,
297 diff: diff.try_into().expect("valid diff"),
298 })
299 }
300 BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
301 kind: kind.into(),
302 ts,
303 diff: diff.try_into().expect("valid diff"),
304 }),
305 BootstrapStateUpdateKind::Comment(_)
306 | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
307 | BootstrapStateUpdateKind::SourceReferences(_)
308 | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
309 post_item_updates.push((kind, ts, diff));
310 }
311 BootstrapStateUpdateKind::AuditLog(_) => {
312 audit_log_updates.push(StateUpdate {
313 kind: kind.into(),
314 ts,
315 diff: diff.try_into().expect("valid diff"),
316 });
317 }
318 }
319 }
320
321 let (builtin_table_update, _catalog_updates) = state
322 .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
323 .await;
324 builtin_table_updates.extend(builtin_table_update);
325
326 {
330 if let Some(password) = config.external_login_password_mz_system {
331 let role_auth = RoleAuth {
332 role_id: MZ_SYSTEM_ROLE_ID,
333 password_hash: Some(
336 scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
337 .map_err(|_| {
338 AdapterError::Internal("Failed to hash mz_system password.".to_owned())
339 })?,
340 ),
341 updated_at: SYSTEM_TIME(),
342 };
343 state
344 .role_auth_by_id
345 .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
346 let builtin_table_update = state.generate_builtin_table_update(
347 mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
348 mz_catalog::memory::objects::StateDiff::Addition,
349 );
350 builtin_table_updates.extend(builtin_table_update);
351 }
352 }
353
354 let expr_cache_start = Instant::now();
355 info!("startup: coordinator init: catalog open: expr cache open beginning");
356 let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
359 let expr_cache_enabled = config
360 .enable_expression_cache_override
361 .unwrap_or(enable_expr_cache_dyncfg);
362 let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
363 info!(
364 ?config.enable_expression_cache_override,
365 ?enable_expr_cache_dyncfg,
366 "using expression cache for startup"
367 );
368 let current_ids = txn
369 .get_items()
370 .flat_map(|item| {
371 let gid = item.global_id.clone();
372 let gids: Vec<_> = item.extra_versions.values().cloned().collect();
373 std::iter::once(gid).chain(gids.into_iter())
374 })
375 .chain(
376 txn.get_system_object_mappings()
377 .map(|som| som.unique_identifier.global_id),
378 )
379 .collect();
380 let dyncfgs = config.persist_client.dyncfgs().clone();
381 let build_version = if config.build_info.is_dev() {
382 config
385 .build_info
386 .semver_version_build()
387 .expect("build ID is not available on your platform!")
388 } else {
389 config.build_info.semver_version()
390 };
391 let expr_cache_config = ExpressionCacheConfig {
392 build_version,
393 shard_id: txn
394 .get_expression_cache_shard()
395 .expect("expression cache shard should exist for opened catalogs"),
396 persist: config.persist_client,
397 current_ids,
398 remove_prior_versions: !config.read_only,
399 compact_shard: config.read_only,
400 dyncfgs,
401 };
402 let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
403 ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
404 (
405 Some(expr_cache_handle),
406 cached_local_exprs,
407 cached_global_exprs,
408 )
409 } else {
410 (None, BTreeMap::new(), BTreeMap::new())
411 };
412 let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
413 info!(
414 "startup: coordinator init: catalog open: expr cache open complete in {:?}",
415 expr_cache_start.elapsed()
416 );
417
418 let (builtin_table_update, _catalog_updates) = state
424 .apply_updates(system_item_updates, &mut local_expr_cache)
425 .await;
426 builtin_table_updates.extend(builtin_table_update);
427
428 let last_seen_version =
429 get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());
430
431 let mz_authentication_mock_nonce =
432 txn.get_authentication_mock_nonce().ok_or_else(|| {
433 Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
434 })?;
435
436 state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);
437
438 let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
440 let migrate_result = migrate::migrate(
441 &mut state,
442 &mut txn,
443 &mut local_expr_cache,
444 item_updates,
445 config.now,
446 config.boot_ts,
447 )
448 .await
449 .map_err(|e| {
450 Error::new(ErrorKind::FailedCatalogMigration {
451 last_seen_version: last_seen_version.clone(),
452 this_version: config.build_info.version,
453 cause: e.to_string(),
454 })
455 })?;
456 if !migrate_result.post_item_updates.is_empty() {
457 post_item_updates.extend(migrate_result.post_item_updates);
460 if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
462 for (_, ts, _) in &mut post_item_updates {
463 *ts = max_ts;
464 }
465 }
466 differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
467 }
468
469 (
470 migrate_result.builtin_table_updates,
471 migrate_result.catalog_updates,
472 )
473 } else {
474 state
475 .apply_updates(item_updates, &mut local_expr_cache)
476 .await
477 };
478 builtin_table_updates.extend(builtin_table_update);
479
480 let post_item_updates = post_item_updates
481 .into_iter()
482 .map(|(kind, ts, diff)| StateUpdate {
483 kind: kind.into(),
484 ts,
485 diff: diff.try_into().expect("valid diff"),
486 })
487 .collect();
488 let (builtin_table_update, _catalog_updates) = state
489 .apply_updates(post_item_updates, &mut local_expr_cache)
490 .await;
491 builtin_table_updates.extend(builtin_table_update);
492
493 for audit_log_update in audit_log_updates {
497 builtin_table_updates.extend(
498 state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
499 );
500 }
501
502 let schema_migration_result = builtin_schema_migration::run(
504 config.build_info,
505 deploy_generation,
506 &mut txn,
507 config.builtin_item_migration_config,
508 )
509 .await?;
510
511 let state_updates = txn.get_and_commit_op_updates();
512
513 let (table_updates, _catalog_updates) = state
519 .apply_updates(state_updates, &mut local_expr_cache)
520 .await;
521 builtin_table_updates.extend(table_updates);
522 let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);
523
524 set_migration_version(&mut txn, config.build_info.semver_version())?;
526
527 txn.commit(config.boot_ts).await?;
528
529 schema_migration_result.cleanup_action.await;
531
532 Ok(InitializeStateResult {
533 state,
534 migrated_storage_collections_0dt: schema_migration_result.replaced_items,
535 new_builtin_collections: new_builtin_collections.into_iter().collect(),
536 builtin_table_updates,
537 last_seen_version,
538 expr_cache_handle,
539 cached_global_exprs,
540 uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
541 })
542 }
543
544 #[instrument(name = "catalog::open")]
555 pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
556 async move {
557 let mut storage = config.storage;
558
559 let InitializeStateResult {
560 state,
561 migrated_storage_collections_0dt,
562 new_builtin_collections,
563 mut builtin_table_updates,
564 last_seen_version: _,
565 expr_cache_handle,
566 cached_global_exprs,
567 uncached_local_exprs,
568 } =
569 Self::initialize_state(config.state, &mut storage)
573 .instrument(tracing::info_span!("catalog::initialize_state"))
574 .boxed()
575 .await?;
576
577 let catalog = Catalog {
578 state,
579 plans: CatalogPlans::default(),
580 expr_cache_handle,
581 transient_revision: 1,
582 storage: Arc::new(tokio::sync::Mutex::new(storage)),
583 };
584
585 for (op, func) in OP_IMPLS.iter() {
588 match func {
589 mz_sql::func::Func::Scalar(impls) => {
590 for imp in impls {
591 builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
592 catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
593 ));
594 }
595 }
596 _ => unreachable!("all operators must be scalar functions"),
597 }
598 }
599
600 for ip in &catalog.state.egress_addresses {
601 builtin_table_updates.push(
602 catalog
603 .state
604 .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
605 );
606 }
607
608 if !catalog.state.license_key.id.is_empty() {
609 builtin_table_updates.push(
610 catalog.state.resolve_builtin_table_update(
611 catalog
612 .state
613 .pack_license_key_update(&catalog.state.license_key)?,
614 ),
615 );
616 }
617
618 catalog.storage().await.mark_bootstrap_complete().await;
619
620 Ok(OpenCatalogResult {
621 catalog,
622 migrated_storage_collections_0dt,
623 new_builtin_collections,
624 builtin_table_updates,
625 cached_global_exprs,
626 uncached_local_exprs,
627 })
628 }
629 .instrument(tracing::info_span!("catalog::open"))
630 .boxed()
631 }
632
633 async fn initialize_storage_state(
640 &mut self,
641 storage_collections: &Arc<
642 dyn StorageCollections<Timestamp = mz_repr::Timestamp> + Send + Sync,
643 >,
644 ) -> Result<(), mz_catalog::durable::CatalogError> {
645 let collections = self
646 .entries()
647 .filter(|entry| entry.item().is_storage_collection())
648 .flat_map(|entry| entry.global_ids())
649 .collect();
650
651 let mut state = self.state.clone();
654
655 let mut storage = self.storage().await;
656 let shard_id = storage.shard_id();
657 let mut txn = storage.transaction().await?;
658
659 let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
662 let global_id = self.get_entry(&item_id).latest_global_id();
663 match txn.get_collection_metadata().get(&global_id) {
664 None => {
665 txn.insert_collection_metadata([(global_id, shard_id)].into())
666 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
667 }
668 Some(id) => assert_eq!(*id, shard_id),
669 }
670
671 storage_collections
672 .initialize_state(&mut txn, collections)
673 .await
674 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
675
676 let updates = txn.get_and_commit_op_updates();
677 let (builtin_updates, catalog_updates) = state
678 .apply_updates(updates, &mut LocalExpressionCache::Closed)
679 .await;
680 assert!(
681 builtin_updates.is_empty(),
682 "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
683 );
684 assert!(
685 catalog_updates.is_empty(),
686 "storage is not allowed to generate catalog changes that would change the catalog or controller state"
687 );
688 let commit_ts = txn.upper();
689 txn.commit(commit_ts).await?;
690 drop(storage);
691
692 self.state = state;
694 Ok(())
695 }
696
697 pub async fn initialize_controller(
700 &mut self,
701 config: mz_controller::ControllerConfig,
702 envd_epoch: core::num::NonZeroI64,
703 read_only: bool,
704 ) -> Result<mz_controller::Controller<mz_repr::Timestamp>, mz_catalog::durable::CatalogError>
705 {
706 let controller_start = Instant::now();
707 info!("startup: controller init: beginning");
708
709 let controller = {
710 let mut storage = self.storage().await;
711 let mut tx = storage.transaction().await?;
712 mz_controller::prepare_initialization(&mut tx)
713 .map_err(mz_catalog::durable::DurableCatalogError::from)?;
714 let updates = tx.get_and_commit_op_updates();
715 assert!(
716 updates.is_empty(),
717 "initializing controller should not produce updates: {updates:?}"
718 );
719 let commit_ts = tx.upper();
720 tx.commit(commit_ts).await?;
721
722 let read_only_tx = storage.transaction().await?;
723
724 mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
725 };
726
727 self.initialize_storage_state(&controller.storage_collections)
728 .await?;
729
730 info!(
731 "startup: controller init: complete in {:?}",
732 controller_start.elapsed()
733 );
734
735 Ok(controller)
736 }
737
738 pub async fn expire(self) {
740 if let Some(storage) = Arc::into_inner(self.storage) {
743 let storage = storage.into_inner();
744 storage.expire().await;
745 }
746 }
747}
748
749impl CatalogState {
750 fn set_system_configuration_default(
752 &mut self,
753 name: &str,
754 value: VarInput,
755 ) -> Result<(), Error> {
756 Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
757 }
758}
759
760fn add_new_remove_old_builtin_items_migration(
764 builtins_cfg: &BuiltinsConfig,
765 txn: &mut mz_catalog::durable::Transaction<'_>,
766) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
767 let mut new_builtin_mappings = Vec::new();
768 let mut builtin_descs = HashSet::new();
770
771 let mut builtins = Vec::new();
774 for builtin in BUILTINS::iter(builtins_cfg) {
775 let desc = SystemObjectDescription {
776 schema_name: builtin.schema().to_string(),
777 object_type: builtin.catalog_item_type(),
778 object_name: builtin.name().to_string(),
779 };
780 if !builtin_descs.insert(desc.clone()) {
782 panic!(
783 "duplicate builtin description: {:?}, {:?}",
784 SystemObjectDescription {
785 schema_name: builtin.schema().to_string(),
786 object_type: builtin.catalog_item_type(),
787 object_name: builtin.name().to_string(),
788 },
789 builtin
790 );
791 }
792 builtins.push((desc, builtin));
793 }
794
795 let mut system_object_mappings: BTreeMap<_, _> = txn
796 .get_system_object_mappings()
797 .map(|system_object_mapping| {
798 (
799 system_object_mapping.description.clone(),
800 system_object_mapping,
801 )
802 })
803 .collect();
804
805 let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
806 builtins.into_iter().partition_map(|(desc, builtin)| {
807 let fingerprint = match builtin.runtime_alterable() {
808 false => builtin.fingerprint(),
809 true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
810 };
811 match system_object_mappings.remove(&desc) {
812 Some(system_object_mapping) => {
813 Either::Left((builtin, system_object_mapping, fingerprint))
814 }
815 None => Either::Right((builtin, fingerprint)),
816 }
817 });
818 let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
819 let new_builtins: Vec<_> = new_builtins
820 .into_iter()
821 .zip_eq(new_builtin_ids.clone())
822 .collect();
823
824 for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
826 new_builtin_mappings.push(SystemObjectMapping {
827 description: SystemObjectDescription {
828 schema_name: builtin.schema().to_string(),
829 object_type: builtin.catalog_item_type(),
830 object_name: builtin.name().to_string(),
831 },
832 unique_identifier: SystemObjectUniqueIdentifier {
833 catalog_id,
834 global_id,
835 fingerprint,
836 },
837 });
838
839 let handled_runtime_alterable = match builtin {
845 Builtin::Connection(c) if c.runtime_alterable => {
846 let mut acl_items = vec![rbac::owner_privilege(
847 mz_sql::catalog::ObjectType::Connection,
848 c.owner_id.clone(),
849 )];
850 acl_items.extend_from_slice(c.access);
851 let versions = BTreeMap::new();
853
854 txn.insert_item(
855 catalog_id,
856 c.oid,
857 global_id,
858 mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
859 c.name,
860 c.sql.into(),
861 *c.owner_id,
862 acl_items,
863 versions,
864 )?;
865 true
866 }
867 _ => false,
868 };
869 assert_eq!(
870 builtin.runtime_alterable(),
871 handled_runtime_alterable,
872 "runtime alterable object was not handled by migration",
873 );
874 }
875 txn.set_system_object_mappings(new_builtin_mappings)?;
876
877 let builtins_with_catalog_ids = existing_builtins
879 .iter()
880 .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
881 .chain(
882 new_builtins
883 .into_iter()
884 .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
885 );
886
887 for (builtin, id) in builtins_with_catalog_ids {
888 let (comment_id, desc, comments) = match builtin {
889 Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
890 Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
891 Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
892 Builtin::MaterializedView(mv) => (
893 CommentObjectId::MaterializedView(id),
894 &mv.desc,
895 &mv.column_comments,
896 ),
897 Builtin::Log(_)
898 | Builtin::Type(_)
899 | Builtin::Func(_)
900 | Builtin::ContinualTask(_)
901 | Builtin::Index(_)
902 | Builtin::Connection(_) => continue,
903 };
904 txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;
905
906 let mut comments = comments.clone();
907 for (col_idx, name) in desc.iter_names().enumerate() {
908 if let Some(comment) = comments.remove(name.as_str()) {
909 txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
911 }
912 }
913 assert!(
914 comments.is_empty(),
915 "builtin object contains dangling comments that don't correspond to columns {comments:?}"
916 );
917 }
918
919 let mut deleted_system_objects = BTreeSet::new();
922 let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
923 let mut deleted_comments = BTreeSet::new();
924 for (desc, mapping) in system_object_mappings {
925 deleted_system_objects.insert(mapping.description);
926 if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
927 deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
928 }
929
930 let id = mapping.unique_identifier.catalog_id;
931 let comment_id = match desc.object_type {
932 CatalogItemType::Table => CommentObjectId::Table(id),
933 CatalogItemType::Source => CommentObjectId::Source(id),
934 CatalogItemType::View => CommentObjectId::View(id),
935 CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(id),
936 CatalogItemType::Sink
937 | CatalogItemType::Index
938 | CatalogItemType::Type
939 | CatalogItemType::Func
940 | CatalogItemType::Secret
941 | CatalogItemType::Connection
942 | CatalogItemType::ContinualTask => continue,
943 };
944 deleted_comments.insert(comment_id);
945 }
946 let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
952 assert!(
956 deleted_system_objects
957 .iter()
958 .filter(|object| object.object_type != CatalogItemType::Index)
960 .all(
961 |deleted_object| is_unstable_schema(&deleted_object.schema_name)
962 || delete_exceptions.contains(deleted_object)
963 ),
964 "only objects in unstable schemas can be deleted, deleted objects: {:?}",
965 deleted_system_objects
966 );
967 txn.drop_comments(&deleted_comments)?;
968 txn.remove_items(&deleted_runtime_alterable_system_ids)?;
969 txn.remove_system_object_mappings(deleted_system_objects)?;
970
971 let new_builtin_collections = new_builtin_ids
973 .into_iter()
974 .map(|(_catalog_id, global_id)| global_id)
975 .collect();
976
977 Ok(new_builtin_collections)
978}
979
980fn add_new_remove_old_builtin_clusters_migration(
981 txn: &mut mz_catalog::durable::Transaction<'_>,
982 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
983 boot_ts: Timestamp,
984) -> Result<(), mz_catalog::durable::CatalogError> {
985 let mut durable_clusters: BTreeMap<_, _> = txn
986 .get_clusters()
987 .filter(|cluster| cluster.id.is_system())
988 .map(|cluster| (cluster.name.to_string(), cluster))
989 .collect();
990
991 for builtin_cluster in BUILTIN_CLUSTERS {
993 if durable_clusters.remove(builtin_cluster.name).is_none() {
994 let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;
995
996 let cluster_id = txn.insert_system_cluster(
997 builtin_cluster.name,
998 vec![],
999 builtin_cluster.privileges.to_vec(),
1000 builtin_cluster.owner_id.to_owned(),
1001 mz_catalog::durable::ClusterConfig {
1002 variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
1003 size: cluster_config.size,
1004 availability_zones: vec![],
1005 replication_factor: cluster_config.replication_factor,
1006 logging: default_logging_config(),
1007 optimizer_feature_overrides: Default::default(),
1008 schedule: Default::default(),
1009 }),
1010 workload_class: None,
1011 },
1012 &HashSet::new(),
1013 )?;
1014
1015 let audit_id = txn.allocate_audit_log_id()?;
1016 txn.insert_audit_log_event(VersionedEvent::new(
1017 audit_id,
1018 EventType::Create,
1019 ObjectType::Cluster,
1020 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1021 id: cluster_id.to_string(),
1022 name: builtin_cluster.name.to_string(),
1023 }),
1024 None,
1025 boot_ts.into(),
1026 ));
1027 }
1028 }
1029
1030 let old_clusters = durable_clusters
1032 .values()
1033 .map(|cluster| cluster.id)
1034 .collect();
1035 txn.remove_clusters(&old_clusters)?;
1036
1037 for (_name, cluster) in &durable_clusters {
1038 let audit_id = txn.allocate_audit_log_id()?;
1039 txn.insert_audit_log_event(VersionedEvent::new(
1040 audit_id,
1041 EventType::Drop,
1042 ObjectType::Cluster,
1043 EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
1044 id: cluster.id.to_string(),
1045 name: cluster.name.clone(),
1046 }),
1047 None,
1048 boot_ts.into(),
1049 ));
1050 }
1051
1052 Ok(())
1053}
1054
1055fn add_new_remove_old_builtin_introspection_source_migration(
1056 txn: &mut mz_catalog::durable::Transaction<'_>,
1057) -> Result<(), AdapterError> {
1058 let mut new_indexes = Vec::new();
1059 let mut removed_indexes = BTreeSet::new();
1060 for cluster in txn.get_clusters() {
1061 let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);
1062
1063 let mut new_logs = Vec::new();
1064
1065 for log in BUILTINS::logs() {
1066 if introspection_source_index_ids.remove(log.name).is_none() {
1067 new_logs.push(log);
1068 }
1069 }
1070
1071 for log in new_logs {
1072 let (item_id, gid) =
1073 Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
1074 new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
1075 }
1076
1077 removed_indexes.extend(
1080 introspection_source_index_ids
1081 .into_keys()
1082 .map(|name| (cluster.id, name.to_string())),
1083 );
1084 }
1085 txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
1086 txn.remove_introspection_source_indexes(removed_indexes)?;
1087 Ok(())
1088}
1089
1090fn add_new_remove_old_builtin_roles_migration(
1091 txn: &mut mz_catalog::durable::Transaction<'_>,
1092) -> Result<(), mz_catalog::durable::CatalogError> {
1093 let mut durable_roles: BTreeMap<_, _> = txn
1094 .get_roles()
1095 .filter(|role| role.id.is_system() || role.id.is_predefined())
1096 .map(|role| (role.name.to_string(), role))
1097 .collect();
1098
1099 for builtin_role in BUILTIN_ROLES {
1101 if durable_roles.remove(builtin_role.name).is_none() {
1102 txn.insert_builtin_role(
1103 builtin_role.id,
1104 builtin_role.name.to_string(),
1105 builtin_role.attributes.clone(),
1106 RoleMembership::new(),
1107 RoleVars::default(),
1108 builtin_role.oid,
1109 )?;
1110 }
1111 }
1112
1113 let old_roles = durable_roles.values().map(|role| role.id).collect();
1115 txn.remove_roles(&old_roles)?;
1116
1117 Ok(())
1118}
1119
1120fn add_new_remove_old_builtin_cluster_replicas_migration(
1121 txn: &mut Transaction<'_>,
1122 builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
1123 boot_ts: Timestamp,
1124) -> Result<(), AdapterError> {
1125 let cluster_lookup: BTreeMap<_, _> = txn
1126 .get_clusters()
1127 .map(|cluster| (cluster.name.clone(), cluster.clone()))
1128 .collect();
1129
1130 let cluster_id_to_name: BTreeMap<ClusterId, String> = cluster_lookup
1131 .values()
1132 .map(|cluster| (cluster.id, cluster.name.clone()))
1133 .collect();
1134
1135 let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
1136 .get_cluster_replicas()
1137 .filter(|replica| replica.replica_id.is_system())
1138 .fold(BTreeMap::new(), |mut acc, replica| {
1139 acc.entry(replica.cluster_id)
1140 .or_insert_with(BTreeMap::new)
1141 .insert(replica.name.to_string(), replica);
1142 acc
1143 });
1144
1145 for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
1147 let cluster = cluster_lookup
1148 .get(builtin_replica.cluster_name)
1149 .expect("builtin cluster replica references non-existent cluster");
1150 let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
1152 let replica_names = durable_replicas
1153 .get_mut(&cluster.id)
1154 .unwrap_or(&mut empty_map);
1155
1156 let builtin_cluster_bootstrap_config =
1157 builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
1158 if replica_names.remove(builtin_replica.name).is_none()
1159 && builtin_cluster_bootstrap_config.replication_factor > 0
1163 {
1164 let replica_size = match cluster.config.variant {
1165 ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
1166 ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size.clone(),
1167 };
1168
1169 let config = builtin_cluster_replica_config(replica_size.clone());
1170 let replica_id = txn.insert_cluster_replica(
1171 cluster.id,
1172 builtin_replica.name,
1173 config,
1174 MZ_SYSTEM_ROLE_ID,
1175 )?;
1176
1177 let audit_id = txn.allocate_audit_log_id()?;
1178 txn.insert_audit_log_event(VersionedEvent::new(
1179 audit_id,
1180 EventType::Create,
1181 ObjectType::ClusterReplica,
1182 EventDetails::CreateClusterReplicaV4(mz_audit_log::CreateClusterReplicaV4 {
1183 cluster_id: cluster.id.to_string(),
1184 cluster_name: cluster.name.clone(),
1185 replica_id: Some(replica_id.to_string()),
1186 replica_name: builtin_replica.name.to_string(),
1187 logical_size: replica_size,
1188 billed_as: None,
1189 internal: false,
1190 reason: CreateOrDropClusterReplicaReasonV1::System,
1191 scheduling_policies: None,
1192 }),
1193 None,
1194 boot_ts.into(),
1195 ));
1196 }
1197 }
1198
1199 let old_replicas: Vec<_> = durable_replicas
1201 .values()
1202 .flat_map(|replicas| replicas.values())
1203 .collect();
1204 let old_replica_ids = old_replicas.iter().map(|r| r.replica_id).collect();
1205 txn.remove_cluster_replicas(&old_replica_ids)?;
1206
1207 for replica in &old_replicas {
1208 let cluster_name = cluster_id_to_name
1209 .get(&replica.cluster_id)
1210 .cloned()
1211 .unwrap_or_else(|| "<unknown>".to_string());
1212
1213 let audit_id = txn.allocate_audit_log_id()?;
1214 txn.insert_audit_log_event(VersionedEvent::new(
1215 audit_id,
1216 EventType::Drop,
1217 ObjectType::ClusterReplica,
1218 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1219 cluster_id: replica.cluster_id.to_string(),
1220 cluster_name,
1221 replica_id: Some(replica.replica_id.to_string()),
1222 replica_name: replica.name.clone(),
1223 reason: CreateOrDropClusterReplicaReasonV1::System,
1224 scheduling_policies: None,
1225 }),
1226 None,
1227 boot_ts.into(),
1228 ));
1229 }
1230
1231 Ok(())
1232}
1233
1234fn remove_invalid_config_param_role_defaults_migration(
1241 txn: &mut Transaction<'_>,
1242) -> Result<(), AdapterError> {
1243 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1244
1245 let roles_to_migrate: BTreeMap<_, _> = txn
1246 .get_roles()
1247 .filter_map(|mut role| {
1248 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1253
1254 let mut invalid_roles_vars = BTreeMap::new();
1256 for (name, value) in &role.vars.map {
1257 let Ok(session_var) = session_vars.inspect(name) else {
1259 invalid_roles_vars.insert(name.clone(), value.clone());
1260 continue;
1261 };
1262 if session_var.check(value.borrow()).is_err() {
1263 invalid_roles_vars.insert(name.clone(), value.clone());
1264 }
1265 }
1266
1267 if invalid_roles_vars.is_empty() {
1269 return None;
1270 }
1271
1272 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1273
1274 for (name, _value) in invalid_roles_vars {
1276 role.vars.map.remove(&name);
1277 }
1278 Some(role)
1279 })
1280 .map(|role| (role.id, role))
1281 .collect();
1282
1283 txn.update_roles_without_auth(roles_to_migrate)?;
1284
1285 Ok(())
1286}
1287
1288fn remove_pending_cluster_replicas_migration(
1291 tx: &mut Transaction,
1292 boot_ts: mz_repr::Timestamp,
1293) -> Result<(), anyhow::Error> {
1294 let cluster_names: BTreeMap<_, _> = tx.get_clusters().map(|c| (c.id, c.name)).collect();
1296
1297 let occurred_at = boot_ts.into();
1298
1299 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1300 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1301 replica.config.location
1302 {
1303 let cluster_name = cluster_names
1304 .get(&replica.cluster_id)
1305 .cloned()
1306 .unwrap_or_else(|| "<unknown>".to_string());
1307
1308 info!(
1309 "removing pending cluster replica '{}' from cluster '{}'",
1310 replica.name, cluster_name,
1311 );
1312
1313 tx.remove_cluster_replica(replica.replica_id)?;
1314
1315 let audit_id = tx.allocate_audit_log_id()?;
1319 tx.insert_audit_log_event(VersionedEvent::new(
1320 audit_id,
1321 EventType::Drop,
1322 ObjectType::ClusterReplica,
1323 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1324 cluster_id: replica.cluster_id.to_string(),
1325 cluster_name,
1326 replica_id: Some(replica.replica_id.to_string()),
1327 replica_name: replica.name,
1328 reason: CreateOrDropClusterReplicaReasonV1::System,
1329 scheduling_policies: None,
1330 }),
1331 None,
1332 occurred_at,
1333 ));
1334 }
1335 }
1336 Ok(())
1337}
1338
1339pub(crate) fn builtin_cluster_replica_config(
1340 replica_size: String,
1341) -> mz_catalog::durable::ReplicaConfig {
1342 mz_catalog::durable::ReplicaConfig {
1343 location: mz_catalog::durable::ReplicaLocation::Managed {
1344 availability_zone: None,
1345 billed_as: None,
1346 pending: false,
1347 internal: false,
1348 size: replica_size,
1349 },
1350 logging: default_logging_config(),
1351 }
1352}
1353
1354fn default_logging_config() -> ReplicaLogging {
1355 ReplicaLogging {
1356 log_logging: false,
1357 interval: Some(Duration::from_secs(1)),
1358 }
1359}
1360
1361#[derive(Debug)]
1362pub struct BuiltinBootstrapClusterConfigMap {
1363 pub system_cluster: BootstrapBuiltinClusterConfig,
1365 pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
1367 pub probe_cluster: BootstrapBuiltinClusterConfig,
1369 pub support_cluster: BootstrapBuiltinClusterConfig,
1371 pub analytics_cluster: BootstrapBuiltinClusterConfig,
1373}
1374
1375impl BuiltinBootstrapClusterConfigMap {
1376 fn get_config(
1378 &self,
1379 cluster_name: &str,
1380 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1381 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1382 &self.system_cluster
1383 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1384 &self.catalog_server_cluster
1385 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1386 &self.probe_cluster
1387 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1388 &self.support_cluster
1389 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1390 &self.analytics_cluster
1391 } else {
1392 return Err(mz_catalog::durable::CatalogError::Catalog(
1393 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1394 ));
1395 };
1396 Ok(cluster_config.clone())
1397 }
1398}
1399
1400pub(crate) fn into_consolidatable_updates_startup(
1417 updates: Vec<StateUpdate>,
1418 ts: Timestamp,
1419) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1420 updates
1421 .into_iter()
1422 .map(|StateUpdate { kind, ts: _, diff }| {
1423 let kind: BootstrapStateUpdateKind = kind
1424 .try_into()
1425 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1426 (kind, ts, Diff::from(diff))
1427 })
1428 .collect()
1429}
1430
1431fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1432 defaults: &BTreeMap<String, String>,
1433 remote: Option<&BTreeMap<String, String>>,
1434 cfg: &mz_dyncfg::Config<T>,
1435) -> T::ConfigType {
1436 let mut val = T::into_config_type(cfg.default().clone());
1437 let get_fn = |map: &BTreeMap<String, String>| {
1438 let val = map.get(cfg.name())?;
1439 match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1440 Ok(x) => Some(x),
1441 Err(err) => {
1442 tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1443 None
1444 }
1445 }
1446 };
1447 if let Some(x) = get_fn(defaults) {
1448 val = x;
1449 }
1450 if let Some(x) = remote.and_then(get_fn) {
1451 val = x;
1452 }
1453 val
1454}