1mod builtin_schema_migration;
13
14use std::collections::{BTreeMap, BTreeSet};
15use std::num::NonZeroU32;
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use futures::future::{BoxFuture, FutureExt};
20use itertools::{Either, Itertools};
21use mz_adapter_types::bootstrap_builtin_cluster_config::BootstrapBuiltinClusterConfig;
22use mz_adapter_types::dyncfgs::{ENABLE_CONTINUAL_TASK_BUILTINS, ENABLE_EXPRESSION_CACHE};
23use mz_audit_log::{
24 CreateOrDropClusterReplicaReasonV1, EventDetails, EventType, ObjectType, VersionedEvent,
25};
26use mz_auth::hash::scram256_hash;
27use mz_catalog::SYSTEM_CONN_ID;
28use mz_catalog::builtin::{
29 BUILTIN_CLUSTER_REPLICAS, BUILTIN_CLUSTERS, BUILTIN_PREFIXES, BUILTIN_ROLES, BUILTINS, Builtin,
30 Fingerprint, MZ_CATALOG_RAW, RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL,
31};
32use mz_catalog::config::StateConfig;
33use mz_catalog::durable::objects::{
34 SystemObjectDescription, SystemObjectMapping, SystemObjectUniqueIdentifier,
35};
36use mz_catalog::durable::{ClusterReplica, ClusterVariant, ClusterVariantManaged, Transaction};
37use mz_catalog::expr_cache::{
38 ExpressionCacheConfig, ExpressionCacheHandle, GlobalExpressions, LocalExpressions,
39};
40use mz_catalog::memory::error::{Error, ErrorKind};
41use mz_catalog::memory::objects::{
42 BootstrapStateUpdateKind, CommentsMap, DefaultPrivileges, RoleAuth, StateUpdate,
43};
44use mz_controller::clusters::ReplicaLogging;
45use mz_controller_types::ClusterId;
46use mz_ore::cast::usize_to_u64;
47use mz_ore::collections::HashSet;
48use mz_ore::now::{SYSTEM_TIME, to_datetime};
49use mz_ore::{instrument, soft_assert_no_log};
50use mz_repr::adt::mz_acl_item::PrivilegeMap;
51use mz_repr::namespaces::is_unstable_schema;
52use mz_repr::{CatalogItemId, Diff, GlobalId, Timestamp};
53use mz_sql::catalog::{
54 BuiltinsConfig, CatalogError as SqlCatalogError, CatalogItemType, RoleMembership, RoleVars,
55};
56use mz_sql::func::OP_IMPLS;
57use mz_sql::names::CommentObjectId;
58use mz_sql::rbac;
59use mz_sql::session::user::{MZ_SYSTEM_ROLE_ID, SYSTEM_USER};
60use mz_sql::session::vars::{SessionVars, SystemVars, VarError, VarInput};
61use mz_storage_client::controller::{StorageMetadata, StorageTxn};
62use mz_storage_client::storage_collections::StorageCollections;
63use tracing::{Instrument, info, warn};
64use uuid::Uuid;
65
66use crate::AdapterError;
68use crate::catalog::migrate::{self, get_migration_version, set_migration_version};
69use crate::catalog::state::LocalExpressionCache;
70use crate::catalog::{BuiltinTableUpdate, Catalog, CatalogState, Config, is_reserved_name};
71
/// The result of [`Catalog::initialize_state`]: the fully-initialized in-memory
/// catalog state plus bookkeeping produced during bootstrap.
pub struct InitializeStateResult {
    /// The initialized in-memory catalog state.
    pub state: CatalogState,
    /// Items whose storage collections were replaced by the builtin schema
    /// migration (taken from the migration's `replaced_items`); the `0dt`
    /// suffix suggests these matter for zero-downtime upgrades — TODO confirm.
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections newly allocated during this boot.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Accumulated (and resolved) builtin table updates generated while
    /// applying catalog state during startup.
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// The catalog migration version found in the durable catalog before this
    /// boot, or `"new"` if none was recorded.
    pub last_seen_version: String,
    /// Handle to the expression cache, if the cache was enabled.
    pub expr_cache_handle: Option<ExpressionCacheHandle>,
    /// Global expressions read back from the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions that were not present in the cache and were computed
    /// during startup (from `LocalExpressionCache::into_uncached_exprs`).
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
90
/// The result of [`Catalog::open`]: an opened catalog plus the bootstrap
/// bookkeeping forwarded from [`Catalog::initialize_state`].
pub struct OpenCatalogResult {
    /// The opened catalog.
    pub catalog: Catalog,
    /// Items whose storage collections were replaced by the builtin schema
    /// migration; forwarded from `InitializeStateResult`.
    pub migrated_storage_collections_0dt: BTreeSet<CatalogItemId>,
    /// Global IDs of builtin collections newly allocated during this boot.
    pub new_builtin_collections: BTreeSet<GlobalId>,
    /// Builtin table updates generated during startup (including operator,
    /// egress-IP, and license-key updates added by `open` itself).
    pub builtin_table_updates: Vec<BuiltinTableUpdate>,
    /// Global expressions read back from the expression cache.
    pub cached_global_exprs: BTreeMap<GlobalId, GlobalExpressions>,
    /// Local expressions computed during startup that were not in the cache.
    pub uncached_local_exprs: BTreeMap<GlobalId, LocalExpressions>,
}
105
106impl Catalog {
    /// Initializes a [`CatalogState`] from the durable catalog in `storage`,
    /// running builtin-object reconciliation and catalog migrations along the
    /// way, inside a single durable transaction that is committed at the end.
    ///
    /// NOTE(review): the original doc comments appear to have been stripped
    /// from this file; descriptions here are reconstructed from the code.
    pub async fn initialize_state<'a>(
        config: StateConfig,
        storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
    ) -> Result<InitializeStateResult, AdapterError> {
        // Sanity-check that every builtin role and cluster uses a reserved
        // name prefix so they can never collide with user-created objects.
        for builtin_role in BUILTIN_ROLES {
            assert!(
                is_reserved_name(builtin_role.name),
                "builtin role {builtin_role:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }
        for builtin_cluster in BUILTIN_CLUSTERS {
            assert!(
                is_reserved_name(builtin_cluster.name),
                "builtin cluster {builtin_cluster:?} must start with one of the following prefixes {}",
                BUILTIN_PREFIXES.join(", ")
            );
        }

        let mut system_configuration = SystemVars::new().set_unsafe(config.unsafe_mode);
        if config.all_features {
            system_configuration.enable_all_feature_flags_by_default();
        }

        // Start from an empty in-memory state; it is populated below by
        // applying the durable catalog's updates in dependency order.
        let mut state = CatalogState {
            database_by_name: imbl::OrdMap::new(),
            database_by_id: imbl::OrdMap::new(),
            entry_by_id: imbl::OrdMap::new(),
            entry_by_global_id: imbl::OrdMap::new(),
            notices_by_dep_id: imbl::OrdMap::new(),
            ambient_schemas_by_name: imbl::OrdMap::new(),
            ambient_schemas_by_id: imbl::OrdMap::new(),
            clusters_by_name: imbl::OrdMap::new(),
            clusters_by_id: imbl::OrdMap::new(),
            roles_by_name: imbl::OrdMap::new(),
            roles_by_id: imbl::OrdMap::new(),
            network_policies_by_id: imbl::OrdMap::new(),
            role_auth_by_id: imbl::OrdMap::new(),
            network_policies_by_name: imbl::OrdMap::new(),
            system_configuration: Arc::new(system_configuration),
            default_privileges: Arc::new(DefaultPrivileges::default()),
            system_privileges: Arc::new(PrivilegeMap::default()),
            comments: Arc::new(CommentsMap::default()),
            source_references: imbl::OrdMap::new(),
            storage_metadata: Arc::new(StorageMetadata::default()),
            temporary_schemas: imbl::OrdMap::new(),
            mock_authentication_nonce: Default::default(),
            config: mz_sql::catalog::CatalogConfig {
                start_time: to_datetime((config.now)()),
                start_instant: Instant::now(),
                nonce: rand::random(),
                environment_id: config.environment_id,
                session_id: Uuid::new_v4(),
                build_info: config.build_info,
                now: config.now.clone(),
                connection_context: config.connection_context,
                builtins_cfg: BuiltinsConfig {
                    // Whether continual-task builtins exist is decided from the
                    // dyncfg defaults plus any remote (LaunchDarkly-style)
                    // overrides, before the remote params are applied below.
                    include_continual_tasks: get_dyncfg_val_from_defaults_and_remote(
                        &config.system_parameter_defaults,
                        config.remote_system_parameters.as_ref(),
                        &ENABLE_CONTINUAL_TASK_BUILTINS,
                    ),
                },
                helm_chart_version: config.helm_chart_version,
            },
            cluster_replica_sizes: config.cluster_replica_sizes,
            availability_zones: config.availability_zones,
            egress_addresses: config.egress_addresses,
            aws_principal_context: config.aws_principal_context,
            aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
            http_host_name: config.http_host_name,
            license_key: config.license_key,
        };

        let deploy_generation = storage.get_deployment_generation().await?;

        // Pull the current durable snapshot; a freshly-initialized catalog
        // must still contain at least the bootstrap contents.
        let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
        assert!(!updates.is_empty(), "initial catalog snapshot is missing");
        let mut txn = storage.transaction().await?;

        // Run all durable migrations and builtin-object reconciliation inside
        // the single transaction; only the newly-allocated builtin collection
        // ids escape this block.
        let new_builtin_collections = {
            migrate::durable_migrate(
                &mut txn,
                state.config.environment_id.organization_id(),
                config.boot_ts,
            )?;
            // Remote (externally-synced) system parameters override the
            // durable values; mark the sync as having happened at least once.
            if let Some(remote_system_parameters) = config.remote_system_parameters {
                for (name, value) in remote_system_parameters {
                    txn.upsert_system_config(&name, value)?;
                }
                txn.set_system_config_synced_once()?;
            }
            let new_builtin_collections =
                add_new_remove_old_builtin_items_migration(&state.config().builtins_cfg, &mut txn)?;
            let builtin_bootstrap_cluster_config_map = BuiltinBootstrapClusterConfigMap {
                system_cluster: config.builtin_system_cluster_config,
                catalog_server_cluster: config.builtin_catalog_server_cluster_config,
                probe_cluster: config.builtin_probe_cluster_config,
                support_cluster: config.builtin_support_cluster_config,
                analytics_cluster: config.builtin_analytics_cluster_config,
            };
            add_new_remove_old_builtin_clusters_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_introspection_source_migration(&mut txn)?;
            add_new_remove_old_builtin_cluster_replicas_migration(
                &mut txn,
                &builtin_bootstrap_cluster_config_map,
                config.boot_ts,
            )?;
            add_new_remove_old_builtin_roles_migration(&mut txn)?;
            remove_invalid_config_param_role_defaults_migration(&mut txn)?;
            remove_pending_cluster_replicas_migration(&mut txn, config.boot_ts)?;

            new_builtin_collections
        };

        // Fold the updates produced by the migrations above into the snapshot
        // updates; note the transaction itself is not yet durably committed.
        let op_updates = txn.get_and_commit_op_updates();
        updates.extend(op_updates);

        let mut builtin_table_updates = Vec::new();

        {
            // Seed system parameter defaults. Unknown parameters are only
            // warned about so that removed parameters don't brick startup.
            for (name, value) in config.system_parameter_defaults {
                match state.set_system_configuration_default(&name, VarInput::Flat(&value)) {
                    Ok(_) => (),
                    Err(Error {
                        kind: ErrorKind::VarError(VarError::UnknownParameter(name)),
                    }) => {
                        warn!(%name, "cannot load unknown system parameter from catalog storage to set default parameter");
                    }
                    Err(e) => return Err(e.into()),
                };
            }
            state.create_temporary_schema(&SYSTEM_CONN_ID, MZ_SYSTEM_ROLE_ID)?;
        }

        // Consolidate the combined snapshot + migration updates; after
        // consolidation every surviving update should be a pure addition.
        let mut updates = into_consolidatable_updates_startup(updates, config.boot_ts);
        differential_dataflow::consolidation::consolidate_updates(&mut updates);
        soft_assert_no_log!(
            updates.iter().all(|(_, _, diff)| *diff == Diff::ONE),
            "consolidated updates should be positive during startup: {updates:?}"
        );

        // Partition updates into phases that must be applied in order:
        // pre-item (roles, schemas, clusters, ...), system items, user items,
        // post-item (comments, storage metadata, ...), and audit log.
        let mut pre_item_updates = Vec::new();
        let mut system_item_updates = Vec::new();
        let mut item_updates = Vec::new();
        let mut post_item_updates = Vec::new();
        let mut audit_log_updates = Vec::new();
        for (kind, ts, diff) in updates {
            match kind {
                BootstrapStateUpdateKind::Role(_)
                | BootstrapStateUpdateKind::RoleAuth(_)
                | BootstrapStateUpdateKind::Database(_)
                | BootstrapStateUpdateKind::Schema(_)
                | BootstrapStateUpdateKind::DefaultPrivilege(_)
                | BootstrapStateUpdateKind::SystemPrivilege(_)
                | BootstrapStateUpdateKind::SystemConfiguration(_)
                | BootstrapStateUpdateKind::Cluster(_)
                | BootstrapStateUpdateKind::NetworkPolicy(_)
                | BootstrapStateUpdateKind::ClusterReplica(_) => {
                    pre_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
                | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
                    system_item_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    })
                }
                BootstrapStateUpdateKind::Item(_) => item_updates.push(StateUpdate {
                    kind: kind.into(),
                    ts,
                    diff: diff.try_into().expect("valid diff"),
                }),
                BootstrapStateUpdateKind::Comment(_)
                | BootstrapStateUpdateKind::StorageCollectionMetadata(_)
                | BootstrapStateUpdateKind::SourceReferences(_)
                | BootstrapStateUpdateKind::UnfinalizedShard(_) => {
                    // Kept as raw triples: migrations below may add more
                    // post-item updates before they are converted/applied.
                    post_item_updates.push((kind, ts, diff));
                }
                BootstrapStateUpdateKind::AuditLog(_) => {
                    audit_log_updates.push(StateUpdate {
                        kind: kind.into(),
                        ts,
                        diff: diff.try_into().expect("valid diff"),
                    });
                }
            }
        }

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(pre_item_updates, &mut LocalExpressionCache::Closed)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        {
            // Optionally install an externally-provided login password for
            // `mz_system` (in-memory only; not written through `txn` here).
            if let Some(password) = config.external_login_password_mz_system {
                let role_auth = RoleAuth {
                    role_id: MZ_SYSTEM_ROLE_ID,
                    password_hash: Some(
                        // 600k SCRAM iterations; hashing failure is surfaced
                        // as an internal error rather than a panic.
                        scram256_hash(&password, &NonZeroU32::new(600_000).expect("known valid"))
                            .map_err(|_| {
                                AdapterError::Internal("Failed to hash mz_system password.".to_owned())
                            })?,
                    ),
                    updated_at: SYSTEM_TIME(),
                };
                state
                    .role_auth_by_id
                    .insert(MZ_SYSTEM_ROLE_ID, role_auth.clone());
                let builtin_table_update = state.generate_builtin_table_update(
                    mz_catalog::memory::objects::StateUpdateKind::RoleAuth(role_auth.into()),
                    mz_catalog::memory::objects::StateDiff::Addition,
                );
                builtin_table_updates.extend(builtin_table_update);
            }
        }

        // Open the expression cache (if enabled) so that item parsing below
        // can reuse previously-optimized local/global expressions.
        let expr_cache_start = Instant::now();
        info!("startup: coordinator init: catalog open: expr cache open beginning");
        let enable_expr_cache_dyncfg = ENABLE_EXPRESSION_CACHE.get(state.system_config().dyncfgs());
        let expr_cache_enabled = config
            .enable_expression_cache_override
            .unwrap_or(enable_expr_cache_dyncfg);
        let (expr_cache_handle, cached_local_exprs, cached_global_exprs) = if expr_cache_enabled {
            info!(
                ?config.enable_expression_cache_override,
                ?enable_expr_cache_dyncfg,
                "using expression cache for startup"
            );
            // Collect every live GlobalId (items, their extra versions, and
            // system objects) so the cache can evict entries for dropped ids.
            let current_ids = txn
                .get_items()
                .flat_map(|item| {
                    let gid = item.global_id.clone();
                    let gids: Vec<_> = item.extra_versions.values().cloned().collect();
                    std::iter::once(gid).chain(gids.into_iter())
                })
                .chain(
                    txn.get_system_object_mappings()
                        .map(|som| som.unique_identifier.global_id),
                )
                .collect();
            let dyncfgs = config.persist_client.dyncfgs().clone();
            // Dev builds key the cache on the full build version (including
            // build metadata) so differing dev builds don't share entries.
            let build_version = if config.build_info.is_dev() {
                config
                    .build_info
                    .semver_version_build()
                    .expect("build ID is not available on your platform!")
            } else {
                config.build_info.semver_version()
            };
            let expr_cache_config = ExpressionCacheConfig {
                build_version,
                shard_id: txn
                    .get_expression_cache_shard()
                    .expect("expression cache shard should exist for opened catalogs"),
                persist: config.persist_client,
                current_ids,
                // Read-only instances must not mutate the shard; writable
                // ones clean up entries from prior versions.
                remove_prior_versions: !config.read_only,
                compact_shard: config.read_only,
                dyncfgs,
            };
            let (expr_cache_handle, cached_local_exprs, cached_global_exprs) =
                ExpressionCacheHandle::spawn_expression_cache(expr_cache_config).await;
            (
                Some(expr_cache_handle),
                cached_local_exprs,
                cached_global_exprs,
            )
        } else {
            (None, BTreeMap::new(), BTreeMap::new())
        };
        let mut local_expr_cache = LocalExpressionCache::new(cached_local_exprs);
        info!(
            "startup: coordinator init: catalog open: expr cache open complete in {:?}",
            expr_cache_start.elapsed()
        );

        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(system_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // The migration version last written to the durable catalog, or
        // "new" for a freshly-initialized catalog.
        let last_seen_version =
            get_migration_version(&txn).map_or_else(|| "new".into(), |v| v.to_string());

        let mz_authentication_mock_nonce =
            txn.get_authentication_mock_nonce().ok_or_else(|| {
                Error::new(ErrorKind::SettingError("authentication nonce".to_string()))
            })?;

        state.mock_authentication_nonce = Some(mz_authentication_mock_nonce);

        // Either run item migrations (which apply `item_updates` internally)
        // or apply the item updates directly when migrations are skipped.
        let (builtin_table_update, _catalog_updates) = if !config.skip_migrations {
            let migrate_result = migrate::migrate(
                &mut state,
                &mut txn,
                &mut local_expr_cache,
                item_updates,
                config.now,
                config.boot_ts,
            )
            .await
            .map_err(|e| {
                Error::new(ErrorKind::FailedCatalogMigration {
                    last_seen_version: last_seen_version.clone(),
                    this_version: config.build_info.version,
                    cause: e.to_string(),
                })
            })?;
            if !migrate_result.post_item_updates.is_empty() {
                post_item_updates.extend(migrate_result.post_item_updates);
                // Align all post-item updates to the max timestamp so that
                // retractions and additions consolidate with each other.
                if let Some(max_ts) = post_item_updates.iter().map(|(_, ts, _)| ts).max().cloned() {
                    for (_, ts, _) in &mut post_item_updates {
                        *ts = max_ts;
                    }
                }
                differential_dataflow::consolidation::consolidate_updates(&mut post_item_updates);
            }

            (
                migrate_result.builtin_table_updates,
                migrate_result.catalog_updates,
            )
        } else {
            state
                .apply_updates(item_updates, &mut local_expr_cache)
                .await
        };
        builtin_table_updates.extend(builtin_table_update);

        let post_item_updates = post_item_updates
            .into_iter()
            .map(|(kind, ts, diff)| StateUpdate {
                kind: kind.into(),
                ts,
                diff: diff.try_into().expect("valid diff"),
            })
            .collect();
        let (builtin_table_update, _catalog_updates) = state
            .apply_updates(post_item_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(builtin_table_update);

        // Audit log events only feed builtin tables; they are not applied to
        // the in-memory catalog state.
        for audit_log_update in audit_log_updates {
            builtin_table_updates.extend(
                state.generate_builtin_table_update(audit_log_update.kind, audit_log_update.diff),
            );
        }

        // Reconcile builtin item schemas (e.g. replace items whose definition
        // changed across versions); its cleanup action runs after commit.
        let schema_migration_result = builtin_schema_migration::run(
            config.build_info,
            deploy_generation,
            &mut txn,
            config.builtin_item_migration_config,
        )
        .await?;

        let state_updates = txn.get_and_commit_op_updates();

        let (table_updates, _catalog_updates) = state
            .apply_updates(state_updates, &mut local_expr_cache)
            .await;
        builtin_table_updates.extend(table_updates);
        let builtin_table_updates = state.resolve_builtin_table_updates(builtin_table_updates);

        // Record that we have migrated up to the current build version, then
        // durably commit everything accumulated in the transaction.
        set_migration_version(&mut txn, config.build_info.semver_version())?;

        txn.commit(config.boot_ts).await?;

        schema_migration_result.cleanup_action.await;

        Ok(InitializeStateResult {
            state,
            migrated_storage_collections_0dt: schema_migration_result.replaced_items,
            new_builtin_collections: new_builtin_collections.into_iter().collect(),
            builtin_table_updates,
            last_seen_version,
            expr_cache_handle,
            cached_global_exprs,
            uncached_local_exprs: local_expr_cache.into_uncached_exprs(),
        })
    }
542
    /// Opens the catalog: initializes in-memory state from durable storage,
    /// wraps it in a [`Catalog`], and appends the startup-only builtin table
    /// updates (operator implementations, egress IPs, license key).
    ///
    /// Returned as a boxed future (rather than `async fn`), presumably to
    /// keep the future `'static` and reduce type size — TODO confirm.
    #[instrument(name = "catalog::open")]
    pub fn open(config: Config<'_>) -> BoxFuture<'static, Result<OpenCatalogResult, AdapterError>> {
        async move {
            let mut storage = config.storage;

            let InitializeStateResult {
                state,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                mut builtin_table_updates,
                last_seen_version: _,
                expr_cache_handle,
                cached_global_exprs,
                uncached_local_exprs,
            } =
                Self::initialize_state(config.state, &mut storage)
                    .instrument(tracing::info_span!("catalog::initialize_state"))
                    .boxed()
                    .await?;

            let catalog = Catalog {
                state,
                expr_cache_handle,
                transient_revision: 1,
                storage: Arc::new(tokio::sync::Mutex::new(storage)),
            };

            // Populate the builtin table describing every scalar operator
            // implementation; non-scalar operators are a programming error.
            for (op, func) in OP_IMPLS.iter() {
                match func {
                    mz_sql::func::Func::Scalar(impls) => {
                        for imp in impls {
                            builtin_table_updates.push(catalog.state.resolve_builtin_table_update(
                                catalog.state.pack_op_update(op, imp.details(), Diff::ONE),
                            ));
                        }
                    }
                    _ => unreachable!("all operators must be scalar functions"),
                }
            }

            // Advertise configured egress addresses via a builtin table.
            for ip in &catalog.state.egress_addresses {
                builtin_table_updates.push(
                    catalog
                        .state
                        .resolve_builtin_table_update(catalog.state.pack_egress_ip_update(ip)?),
                );
            }

            // Only publish license key info when one is actually configured.
            if !catalog.state.license_key.id.is_empty() {
                builtin_table_updates.push(
                    catalog.state.resolve_builtin_table_update(
                        catalog
                            .state
                            .pack_license_key_update(&catalog.state.license_key)?,
                    ),
                );
            }

            catalog.storage().await.mark_bootstrap_complete().await;

            Ok(OpenCatalogResult {
                catalog,
                migrated_storage_collections_0dt,
                new_builtin_collections,
                builtin_table_updates,
                cached_global_exprs,
                uncached_local_exprs,
            })
        }
        .instrument(tracing::info_span!("catalog::open"))
        .boxed()
    }
630
    /// Initializes the storage controller's durable state: records the catalog
    /// shard's metadata for `MZ_CATALOG_RAW` and lets `storage_collections`
    /// initialize itself for all storage-backed catalog collections.
    ///
    /// Works on a clone of the in-memory state and only installs it back into
    /// `self` after the transaction commits successfully.
    async fn initialize_storage_state(
        &mut self,
        storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
    ) -> Result<(), mz_catalog::durable::CatalogError> {
        // Every GlobalId backed by a storage collection.
        let collections = self
            .entries()
            .filter(|entry| entry.item().is_storage_collection())
            .flat_map(|entry| entry.global_ids())
            .collect();

        let mut state = self.state.clone();

        let mut storage = self.storage().await;
        let shard_id = storage.shard_id();
        let mut txn = storage.transaction().await?;

        // Ensure the catalog's own shard is registered in the collection
        // metadata; if it is already registered it must match exactly.
        let item_id = self.resolve_builtin_storage_collection(&MZ_CATALOG_RAW);
        let global_id = self.get_entry(&item_id).latest_global_id();
        match txn.get_collection_metadata().get(&global_id) {
            None => {
                txn.insert_collection_metadata([(global_id, shard_id)].into())
                    .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            }
            Some(id) => assert_eq!(*id, shard_id),
        }

        storage_collections
            .initialize_state(&mut txn, collections)
            .await
            .map_err(mz_catalog::durable::DurableCatalogError::from)?;

        // Storage initialization may only touch storage metadata; any builtin
        // table or catalog/controller change here is a bug.
        let updates = txn.get_and_commit_op_updates();
        let (builtin_updates, catalog_updates) = state
            .apply_updates(updates, &mut LocalExpressionCache::Closed)
            .await;
        assert!(
            builtin_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would cause changes to builtin tables"
        );
        assert!(
            catalog_updates.is_empty(),
            "storage is not allowed to generate catalog changes that would change the catalog or controller state"
        );
        let commit_ts = txn.upper();
        txn.commit(commit_ts).await?;
        drop(storage);

        // Commit succeeded; adopt the updated in-memory state.
        self.state = state;
        Ok(())
    }
692
    /// Creates and initializes the cluster/storage controller, preparing any
    /// durable state it needs and then initializing storage state for all
    /// storage-backed catalog collections.
    pub async fn initialize_controller(
        &mut self,
        config: mz_controller::ControllerConfig,
        envd_epoch: core::num::NonZeroI64,
        read_only: bool,
    ) -> Result<mz_controller::Controller, mz_catalog::durable::CatalogError> {
        let controller_start = Instant::now();
        info!("startup: controller init: beginning");

        let controller = {
            let mut storage = self.storage().await;
            // Prepare controller-owned durable state; this must not produce
            // any catalog updates.
            let mut tx = storage.transaction().await?;
            mz_controller::prepare_initialization(&mut tx)
                .map_err(mz_catalog::durable::DurableCatalogError::from)?;
            let updates = tx.get_and_commit_op_updates();
            assert!(
                updates.is_empty(),
                "initializing controller should not produce updates: {updates:?}"
            );
            let commit_ts = tx.upper();
            tx.commit(commit_ts).await?;

            // A second, read-only transaction hands the prepared state to the
            // controller constructor.
            let read_only_tx = storage.transaction().await?;

            mz_controller::Controller::new(config, envd_epoch, read_only, &read_only_tx).await
        };

        self.initialize_storage_state(&controller.storage_collections)
            .await?;

        info!(
            "startup: controller init: complete in {:?}",
            controller_start.elapsed()
        );

        Ok(controller)
    }
732
733 pub async fn expire(self) {
735 if let Some(storage) = Arc::into_inner(self.storage) {
738 let storage = storage.into_inner();
739 storage.expire().await;
740 }
741 }
742}
743
744impl CatalogState {
745 fn set_system_configuration_default(
747 &mut self,
748 name: &str,
749 value: VarInput,
750 ) -> Result<(), Error> {
751 Ok(Arc::make_mut(&mut self.system_configuration).set_default(name, value)?)
752 }
753}
754
/// Reconciles the set of builtin items in the durable catalog with the set of
/// builtins compiled into this binary: allocates ids and mappings for new
/// builtins, refreshes builtin column comments, and removes mappings (and
/// comments) for builtins that no longer exist.
///
/// Returns the [`GlobalId`]s allocated for the newly-added builtins.
fn add_new_remove_old_builtin_items_migration(
    builtins_cfg: &BuiltinsConfig,
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<Vec<GlobalId>, mz_catalog::durable::CatalogError> {
    let mut new_builtin_mappings = Vec::new();
    // Used only to detect duplicate (schema, type, name) descriptions, which
    // would be a programming error in the builtin definitions.
    let mut builtin_descs = HashSet::new();

    // Pair every compiled-in builtin with its durable description key.
    let mut builtins = Vec::new();
    for builtin in BUILTINS::iter(builtins_cfg) {
        let desc = SystemObjectDescription {
            schema_name: builtin.schema().to_string(),
            object_type: builtin.catalog_item_type(),
            object_name: builtin.name().to_string(),
        };
        if !builtin_descs.insert(desc.clone()) {
            panic!(
                "duplicate builtin description: {:?}, {:?}",
                SystemObjectDescription {
                    schema_name: builtin.schema().to_string(),
                    object_type: builtin.catalog_item_type(),
                    object_name: builtin.name().to_string(),
                },
                builtin
            );
        }
        builtins.push((desc, builtin));
    }

    // Mappings currently stored in the durable catalog, keyed by description.
    // Entries are removed as they are matched; whatever remains at the end is
    // a builtin that no longer exists in this binary.
    let mut system_object_mappings: BTreeMap<_, _> = txn
        .get_system_object_mappings()
        .map(|system_object_mapping| {
            (
                system_object_mapping.description.clone(),
                system_object_mapping,
            )
        })
        .collect();

    // Split builtins into those that already have a durable mapping and those
    // that are new in this binary. Runtime-alterable builtins use a sentinel
    // fingerprint since their definitions may be changed at runtime.
    let (existing_builtins, new_builtins): (Vec<_>, Vec<_>) =
        builtins.into_iter().partition_map(|(desc, builtin)| {
            let fingerprint = match builtin.runtime_alterable() {
                false => builtin.fingerprint(),
                true => RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL.into(),
            };
            match system_object_mappings.remove(&desc) {
                Some(system_object_mapping) => {
                    Either::Left((builtin, system_object_mapping, fingerprint))
                }
                None => Either::Right((builtin, fingerprint)),
            }
        });
    let new_builtin_ids = txn.allocate_system_item_ids(usize_to_u64(new_builtins.len()))?;
    let new_builtins: Vec<_> = new_builtins
        .into_iter()
        .zip_eq(new_builtin_ids.clone())
        .collect();

    // Record a mapping for every new builtin; runtime-alterable connections
    // additionally get a real durable item (their SQL lives in the catalog).
    for ((builtin, fingerprint), (catalog_id, global_id)) in new_builtins.iter().cloned() {
        new_builtin_mappings.push(SystemObjectMapping {
            description: SystemObjectDescription {
                schema_name: builtin.schema().to_string(),
                object_type: builtin.catalog_item_type(),
                object_name: builtin.name().to_string(),
            },
            unique_identifier: SystemObjectUniqueIdentifier {
                catalog_id,
                global_id,
                fingerprint,
            },
        });

        let handled_runtime_alterable = match builtin {
            Builtin::Connection(c) if c.runtime_alterable => {
                let mut acl_items = vec![rbac::owner_privilege(
                    mz_sql::catalog::ObjectType::Connection,
                    c.owner_id.clone(),
                )];
                acl_items.extend_from_slice(c.access);
                let versions = BTreeMap::new();

                txn.insert_item(
                    catalog_id,
                    c.oid,
                    global_id,
                    mz_catalog::durable::initialize::resolve_system_schema(c.schema).id,
                    c.name,
                    c.sql.into(),
                    *c.owner_id,
                    acl_items,
                    versions,
                )?;
                true
            }
            _ => false,
        };
        // Guard: any new runtime-alterable builtin kind must be explicitly
        // handled above, otherwise it would silently lack a durable item.
        assert_eq!(
            builtin.runtime_alterable(),
            handled_runtime_alterable,
            "runtime alterable object was not handled by migration",
        );
    }
    txn.set_system_object_mappings(new_builtin_mappings)?;

    // All live builtins (existing + new) with their catalog ids, for comment
    // reconciliation below.
    let builtins_with_catalog_ids = existing_builtins
        .iter()
        .map(|(b, m, _)| (*b, m.unique_identifier.catalog_id))
        .chain(
            new_builtins
                .into_iter()
                .map(|((b, _), (catalog_id, _))| (b, catalog_id)),
        );

    // Refresh column comments for relation-like builtins: drop whatever is
    // stored and re-insert from the compiled-in definitions.
    for (builtin, id) in builtins_with_catalog_ids {
        let (comment_id, desc, comments) = match builtin {
            Builtin::Source(s) => (CommentObjectId::Source(id), &s.desc, &s.column_comments),
            Builtin::View(v) => (CommentObjectId::View(id), &v.desc, &v.column_comments),
            Builtin::Table(t) => (CommentObjectId::Table(id), &t.desc, &t.column_comments),
            Builtin::MaterializedView(mv) => (
                CommentObjectId::MaterializedView(id),
                &mv.desc,
                &mv.column_comments,
            ),
            Builtin::Log(_)
            | Builtin::Type(_)
            | Builtin::Func(_)
            | Builtin::ContinualTask(_)
            | Builtin::Index(_)
            | Builtin::Connection(_) => continue,
        };
        txn.drop_comments(&BTreeSet::from_iter([comment_id]))?;

        let mut comments = comments.clone();
        for (col_idx, name) in desc.iter_names().enumerate() {
            if let Some(comment) = comments.remove(name.as_str()) {
                // Comment positions are 1-based column indexes.
                txn.update_comment(comment_id, Some(col_idx + 1), Some(comment.to_owned()))?;
            }
        }
        assert!(
            comments.is_empty(),
            "builtin object contains dangling comments that don't correspond to columns {comments:?}"
        );
    }

    // Whatever is left in `system_object_mappings` was not matched by any
    // compiled-in builtin and must be removed.
    let mut deleted_system_objects = BTreeSet::new();
    let mut deleted_runtime_alterable_system_ids = BTreeSet::new();
    let mut deleted_comments = BTreeSet::new();
    for (desc, mapping) in system_object_mappings {
        deleted_system_objects.insert(mapping.description);
        // Runtime-alterable builtins also have a durable item to remove.
        if mapping.unique_identifier.fingerprint == RUNTIME_ALTERABLE_FINGERPRINT_SENTINEL {
            deleted_runtime_alterable_system_ids.insert(mapping.unique_identifier.catalog_id);
        }

        let id = mapping.unique_identifier.catalog_id;
        let comment_id = match desc.object_type {
            CatalogItemType::Table => CommentObjectId::Table(id),
            CatalogItemType::Source => CommentObjectId::Source(id),
            CatalogItemType::View => CommentObjectId::View(id),
            CatalogItemType::MaterializedView => CommentObjectId::MaterializedView(id),
            CatalogItemType::Sink
            | CatalogItemType::Index
            | CatalogItemType::Type
            | CatalogItemType::Func
            | CatalogItemType::Secret
            | CatalogItemType::Connection
            | CatalogItemType::ContinualTask => continue,
        };
        deleted_comments.insert(comment_id);
    }
    // Escape hatch for intentionally-deleted stable-schema objects; currently
    // empty.
    let delete_exceptions: HashSet<SystemObjectDescription> = [].into();
    // Safety net: deleting a builtin from a stable schema would break users,
    // so only unstable-schema objects (and indexes) may be removed.
    assert!(
        deleted_system_objects
            .iter()
            .filter(|object| object.object_type != CatalogItemType::Index)
            .all(
                |deleted_object| is_unstable_schema(&deleted_object.schema_name)
                    || delete_exceptions.contains(deleted_object)
            ),
        "only objects in unstable schemas can be deleted, deleted objects: {:?}",
        deleted_system_objects
    );
    txn.drop_comments(&deleted_comments)?;
    txn.remove_items(&deleted_runtime_alterable_system_ids)?;
    txn.remove_system_object_mappings(deleted_system_objects)?;

    let new_builtin_collections = new_builtin_ids
        .into_iter()
        .map(|(_catalog_id, global_id)| global_id)
        .collect();

    Ok(new_builtin_collections)
}
974
/// Reconciles durable system clusters with the compiled-in builtin cluster
/// list: creates missing builtin clusters (with their bootstrap size and
/// replication factor) and drops system clusters that are no longer builtins,
/// emitting audit log events for both directions.
fn add_new_remove_old_builtin_clusters_migration(
    txn: &mut mz_catalog::durable::Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
    boot_ts: Timestamp,
) -> Result<(), mz_catalog::durable::CatalogError> {
    // System clusters currently in the durable catalog, keyed by name.
    // Matched entries are removed; the remainder is obsolete and dropped.
    let mut durable_clusters: BTreeMap<_, _> = txn
        .get_clusters()
        .filter(|cluster| cluster.id.is_system())
        .map(|cluster| (cluster.name.to_string(), cluster))
        .collect();

    for builtin_cluster in BUILTIN_CLUSTERS {
        if durable_clusters.remove(builtin_cluster.name).is_none() {
            // Missing builtin cluster: create it as a managed cluster using
            // the bootstrap config for its name.
            let cluster_config = builtin_cluster_config_map.get_config(builtin_cluster.name)?;

            let cluster_id = txn.insert_system_cluster(
                builtin_cluster.name,
                vec![],
                builtin_cluster.privileges.to_vec(),
                builtin_cluster.owner_id.to_owned(),
                mz_catalog::durable::ClusterConfig {
                    variant: mz_catalog::durable::ClusterVariant::Managed(ClusterVariantManaged {
                        size: cluster_config.size,
                        availability_zones: vec![],
                        replication_factor: cluster_config.replication_factor,
                        logging: default_logging_config(),
                        optimizer_feature_overrides: Default::default(),
                        schedule: Default::default(),
                    }),
                    workload_class: None,
                },
                &HashSet::new(),
            )?;

            let audit_id = txn.allocate_audit_log_id()?;
            txn.insert_audit_log_event(VersionedEvent::new(
                audit_id,
                EventType::Create,
                ObjectType::Cluster,
                EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                    id: cluster_id.to_string(),
                    name: builtin_cluster.name.to_string(),
                }),
                None,
                boot_ts.into(),
            ));
        }
    }

    // Any system cluster not matched above no longer exists as a builtin:
    // remove it and record a Drop audit event.
    let old_clusters = durable_clusters
        .values()
        .map(|cluster| cluster.id)
        .collect();
    txn.remove_clusters(&old_clusters)?;

    for (_name, cluster) in &durable_clusters {
        let audit_id = txn.allocate_audit_log_id()?;
        txn.insert_audit_log_event(VersionedEvent::new(
            audit_id,
            EventType::Drop,
            ObjectType::Cluster,
            EventDetails::IdNameV1(mz_audit_log::IdNameV1 {
                id: cluster.id.to_string(),
                name: cluster.name.clone(),
            }),
            None,
            boot_ts.into(),
        ));
    }

    Ok(())
}
1049
/// For every cluster, reconciles its introspection source indexes with the
/// compiled-in builtin log list: allocates ids for logs missing an index and
/// removes indexes whose log no longer exists.
fn add_new_remove_old_builtin_introspection_source_migration(
    txn: &mut mz_catalog::durable::Transaction<'_>,
) -> Result<(), AdapterError> {
    let mut new_indexes = Vec::new();
    let mut removed_indexes = BTreeSet::new();
    for cluster in txn.get_clusters() {
        // Existing indexes by log name; matched entries are removed, leaving
        // the obsolete ones behind.
        let mut introspection_source_index_ids = txn.get_introspection_source_indexes(cluster.id);

        let mut new_logs = Vec::new();

        for log in BUILTINS::logs() {
            if introspection_source_index_ids.remove(log.name).is_none() {
                new_logs.push(log);
            }
        }

        for log in new_logs {
            // Introspection source index ids are derived deterministically
            // from cluster id and log variant rather than allocated.
            let (item_id, gid) =
                Transaction::allocate_introspection_source_index_id(&cluster.id, log.variant);
            new_indexes.push((cluster.id, log.name.to_string(), item_id, gid));
        }

        // Whatever was not matched corresponds to a removed builtin log.
        removed_indexes.extend(
            introspection_source_index_ids
                .into_keys()
                .map(|name| (cluster.id, name.to_string())),
        );
    }
    txn.insert_introspection_source_indexes(new_indexes, &HashSet::new())?;
    txn.remove_introspection_source_indexes(removed_indexes)?;
    Ok(())
}
1084
1085fn add_new_remove_old_builtin_roles_migration(
1086 txn: &mut mz_catalog::durable::Transaction<'_>,
1087) -> Result<(), mz_catalog::durable::CatalogError> {
1088 let mut durable_roles: BTreeMap<_, _> = txn
1089 .get_roles()
1090 .filter(|role| role.id.is_system() || role.id.is_predefined())
1091 .map(|role| (role.name.to_string(), role))
1092 .collect();
1093
1094 for builtin_role in BUILTIN_ROLES {
1096 if durable_roles.remove(builtin_role.name).is_none() {
1097 txn.insert_builtin_role(
1098 builtin_role.id,
1099 builtin_role.name.to_string(),
1100 builtin_role.attributes.clone(),
1101 RoleMembership::new(),
1102 RoleVars::default(),
1103 builtin_role.oid,
1104 )?;
1105 }
1106 }
1107
1108 let old_roles = durable_roles.values().map(|role| role.id).collect();
1110 txn.remove_roles(&old_roles)?;
1111
1112 Ok(())
1113}
1114
/// Reconciles the durable catalog's system cluster replicas with
/// `BUILTIN_CLUSTER_REPLICAS`: missing builtin replicas are created (with a
/// `Create` audit event at `boot_ts`) and system replicas that no longer
/// correspond to a builtin are dropped (with a `Drop` audit event).
fn add_new_remove_old_builtin_cluster_replicas_migration(
    txn: &mut Transaction<'_>,
    builtin_cluster_config_map: &BuiltinBootstrapClusterConfigMap,
    boot_ts: Timestamp,
) -> Result<(), AdapterError> {
    // All durable clusters, keyed by name (builtin replicas reference their
    // cluster by name).
    let cluster_lookup: BTreeMap<_, _> = txn
        .get_clusters()
        .map(|cluster| (cluster.name.clone(), cluster.clone()))
        .collect();

    // Reverse index used to recover a cluster name when emitting drop audit
    // events below.
    let cluster_id_to_name: BTreeMap<ClusterId, String> = cluster_lookup
        .values()
        .map(|cluster| (cluster.id, cluster.name.clone()))
        .collect();

    // Existing system replicas, grouped as cluster id -> replica name ->
    // replica.
    let mut durable_replicas: BTreeMap<ClusterId, BTreeMap<String, ClusterReplica>> = txn
        .get_cluster_replicas()
        .filter(|replica| replica.replica_id.is_system())
        .fold(BTreeMap::new(), |mut acc, replica| {
            acc.entry(replica.cluster_id)
                .or_insert_with(BTreeMap::new)
                .insert(replica.name.to_string(), replica);
            acc
        });

    for builtin_replica in BUILTIN_CLUSTER_REPLICAS {
        let cluster = cluster_lookup
            .get(builtin_replica.cluster_name)
            .expect("builtin cluster replica references non-existent cluster");
        // Scratch map so clusters with no recorded system replicas share the
        // same code path; removals from it are simply discarded.
        let mut empty_map: BTreeMap<String, ClusterReplica> = BTreeMap::new();
        let replica_names = durable_replicas
            .get_mut(&cluster.id)
            .unwrap_or(&mut empty_map);

        let builtin_cluster_bootstrap_config =
            builtin_cluster_config_map.get_config(builtin_replica.cluster_name)?;
        // A matching durable replica is kept (and removed from
        // consideration). Only create the replica when it is missing AND the
        // bootstrap config actually asks for replicas.
        if replica_names.remove(builtin_replica.name).is_none()
            && builtin_cluster_bootstrap_config.replication_factor > 0
        {
            // Managed clusters dictate the replica size; unmanaged clusters
            // fall back to the bootstrap-configured size.
            let replica_size = match cluster.config.variant {
                ClusterVariant::Managed(ClusterVariantManaged { ref size, .. }) => size.clone(),
                ClusterVariant::Unmanaged => builtin_cluster_bootstrap_config.size.clone(),
            };

            let config = builtin_cluster_replica_config(replica_size.clone());
            let replica_id = txn.insert_cluster_replica(
                cluster.id,
                builtin_replica.name,
                config,
                MZ_SYSTEM_ROLE_ID,
            )?;

            let audit_id = txn.allocate_audit_log_id()?;
            txn.insert_audit_log_event(VersionedEvent::new(
                audit_id,
                EventType::Create,
                ObjectType::ClusterReplica,
                EventDetails::CreateClusterReplicaV4(mz_audit_log::CreateClusterReplicaV4 {
                    cluster_id: cluster.id.to_string(),
                    cluster_name: cluster.name.clone(),
                    replica_id: Some(replica_id.to_string()),
                    replica_name: builtin_replica.name.to_string(),
                    logical_size: replica_size,
                    billed_as: None,
                    internal: false,
                    reason: CreateOrDropClusterReplicaReasonV1::System,
                    scheduling_policies: None,
                }),
                None,
                boot_ts.into(),
            ));
        }
    }

    // Whatever replicas were not claimed by a builtin above are stale: drop
    // them and record a drop audit event for each.
    let old_replicas: Vec<_> = durable_replicas
        .values()
        .flat_map(|replicas| replicas.values())
        .collect();
    let old_replica_ids = old_replicas.iter().map(|r| r.replica_id).collect();
    txn.remove_cluster_replicas(&old_replica_ids)?;

    for replica in &old_replicas {
        // The owning cluster may itself have been dropped; fall back to a
        // placeholder name for the audit event in that case.
        let cluster_name = cluster_id_to_name
            .get(&replica.cluster_id)
            .cloned()
            .unwrap_or_else(|| "<unknown>".to_string());

        let audit_id = txn.allocate_audit_log_id()?;
        txn.insert_audit_log_event(VersionedEvent::new(
            audit_id,
            EventType::Drop,
            ObjectType::ClusterReplica,
            EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
                cluster_id: replica.cluster_id.to_string(),
                cluster_name,
                replica_id: Some(replica.replica_id.to_string()),
                replica_name: replica.name.clone(),
                reason: CreateOrDropClusterReplicaReasonV1::System,
                scheduling_policies: None,
            }),
            None,
            boot_ts.into(),
        ));
    }

    Ok(())
}
1228
1229fn remove_invalid_config_param_role_defaults_migration(
1236 txn: &mut Transaction<'_>,
1237) -> Result<(), AdapterError> {
1238 static BUILD_INFO: mz_build_info::BuildInfo = mz_build_info::build_info!();
1239
1240 let roles_to_migrate: BTreeMap<_, _> = txn
1241 .get_roles()
1242 .filter_map(|mut role| {
1243 let session_vars = SessionVars::new_unchecked(&BUILD_INFO, SYSTEM_USER.clone(), None);
1248
1249 let mut invalid_roles_vars = BTreeMap::new();
1251 for (name, value) in &role.vars.map {
1252 let Ok(session_var) = session_vars.inspect(name) else {
1254 invalid_roles_vars.insert(name.clone(), value.clone());
1255 continue;
1256 };
1257 if session_var.check(value.borrow()).is_err() {
1258 invalid_roles_vars.insert(name.clone(), value.clone());
1259 }
1260 }
1261
1262 if invalid_roles_vars.is_empty() {
1264 return None;
1265 }
1266
1267 tracing::warn!(?role, ?invalid_roles_vars, "removing invalid role vars");
1268
1269 for (name, _value) in invalid_roles_vars {
1271 role.vars.map.remove(&name);
1272 }
1273 Some(role)
1274 })
1275 .map(|role| (role.id, role))
1276 .collect();
1277
1278 txn.update_roles_without_auth(roles_to_migrate)?;
1279
1280 Ok(())
1281}
1282
1283fn remove_pending_cluster_replicas_migration(
1286 tx: &mut Transaction,
1287 boot_ts: mz_repr::Timestamp,
1288) -> Result<(), anyhow::Error> {
1289 let cluster_names: BTreeMap<_, _> = tx.get_clusters().map(|c| (c.id, c.name)).collect();
1291
1292 let occurred_at = boot_ts.into();
1293
1294 for replica in tx.get_cluster_replicas().collect::<Vec<_>>() {
1295 if let mz_catalog::durable::ReplicaLocation::Managed { pending: true, .. } =
1296 replica.config.location
1297 {
1298 let cluster_name = cluster_names
1299 .get(&replica.cluster_id)
1300 .cloned()
1301 .unwrap_or_else(|| "<unknown>".to_string());
1302
1303 info!(
1304 "removing pending cluster replica '{}' from cluster '{}'",
1305 replica.name, cluster_name,
1306 );
1307
1308 tx.remove_cluster_replica(replica.replica_id)?;
1309
1310 let audit_id = tx.allocate_audit_log_id()?;
1314 tx.insert_audit_log_event(VersionedEvent::new(
1315 audit_id,
1316 EventType::Drop,
1317 ObjectType::ClusterReplica,
1318 EventDetails::DropClusterReplicaV3(mz_audit_log::DropClusterReplicaV3 {
1319 cluster_id: replica.cluster_id.to_string(),
1320 cluster_name,
1321 replica_id: Some(replica.replica_id.to_string()),
1322 replica_name: replica.name,
1323 reason: CreateOrDropClusterReplicaReasonV1::System,
1324 scheduling_policies: None,
1325 }),
1326 None,
1327 occurred_at,
1328 ));
1329 }
1330 }
1331 Ok(())
1332}
1333
1334pub(crate) fn builtin_cluster_replica_config(
1335 replica_size: String,
1336) -> mz_catalog::durable::ReplicaConfig {
1337 mz_catalog::durable::ReplicaConfig {
1338 location: mz_catalog::durable::ReplicaLocation::Managed {
1339 availability_zone: None,
1340 billed_as: None,
1341 pending: false,
1342 internal: false,
1343 size: replica_size,
1344 },
1345 logging: default_logging_config(),
1346 }
1347}
1348
1349fn default_logging_config() -> ReplicaLogging {
1350 ReplicaLogging {
1351 log_logging: false,
1352 interval: Some(Duration::from_secs(1)),
1353 }
1354}
1355
/// Bootstrap configuration (size, replication factor, …) for each builtin
/// cluster, resolved from the system's bootstrap arguments.
#[derive(Debug)]
pub struct BuiltinBootstrapClusterConfigMap {
    /// Bootstrap config for the `mz_system` cluster.
    pub system_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `mz_catalog_server` cluster.
    pub catalog_server_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `mz_probe` cluster.
    pub probe_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `mz_support` cluster.
    pub support_cluster: BootstrapBuiltinClusterConfig,
    /// Bootstrap config for the `mz_analytics` cluster.
    pub analytics_cluster: BootstrapBuiltinClusterConfig,
}
1369
1370impl BuiltinBootstrapClusterConfigMap {
1371 fn get_config(
1373 &self,
1374 cluster_name: &str,
1375 ) -> Result<BootstrapBuiltinClusterConfig, mz_catalog::durable::CatalogError> {
1376 let cluster_config = if cluster_name == mz_catalog::builtin::MZ_SYSTEM_CLUSTER.name {
1377 &self.system_cluster
1378 } else if cluster_name == mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER.name {
1379 &self.catalog_server_cluster
1380 } else if cluster_name == mz_catalog::builtin::MZ_PROBE_CLUSTER.name {
1381 &self.probe_cluster
1382 } else if cluster_name == mz_catalog::builtin::MZ_SUPPORT_CLUSTER.name {
1383 &self.support_cluster
1384 } else if cluster_name == mz_catalog::builtin::MZ_ANALYTICS_CLUSTER.name {
1385 &self.analytics_cluster
1386 } else {
1387 return Err(mz_catalog::durable::CatalogError::Catalog(
1388 SqlCatalogError::UnexpectedBuiltinCluster(cluster_name.to_owned()),
1389 ));
1390 };
1391 Ok(cluster_config.clone())
1392 }
1393}
1394
1395pub(crate) fn into_consolidatable_updates_startup(
1412 updates: Vec<StateUpdate>,
1413 ts: Timestamp,
1414) -> Vec<(BootstrapStateUpdateKind, Timestamp, Diff)> {
1415 updates
1416 .into_iter()
1417 .map(|StateUpdate { kind, ts: _, diff }| {
1418 let kind: BootstrapStateUpdateKind = kind
1419 .try_into()
1420 .unwrap_or_else(|e| panic!("temporary items do not exist during bootstrap: {e:?}"));
1421 (kind, ts, Diff::from(diff))
1422 })
1423 .collect()
1424}
1425
1426fn get_dyncfg_val_from_defaults_and_remote<T: mz_dyncfg::ConfigDefault>(
1427 defaults: &BTreeMap<String, String>,
1428 remote: Option<&BTreeMap<String, String>>,
1429 cfg: &mz_dyncfg::Config<T>,
1430) -> T::ConfigType {
1431 let mut val = T::into_config_type(cfg.default().clone());
1432 let get_fn = |map: &BTreeMap<String, String>| {
1433 let val = map.get(cfg.name())?;
1434 match <T::ConfigType as mz_dyncfg::ConfigType>::parse(val) {
1435 Ok(x) => Some(x),
1436 Err(err) => {
1437 tracing::warn!("could not parse {} value [{}]: {}", cfg.name(), val, err);
1438 None
1439 }
1440 }
1441 };
1442 if let Some(x) = get_fn(defaults) {
1443 val = x;
1444 }
1445 if let Some(x) = remote.and_then(get_fn) {
1446 val = x;
1447 }
1448 val
1449}