Skip to main content

mz_adapter/coord/
catalog_serving.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Special cases related to the "catalog serving" of Materialize.
//!
//! Every Materialize deployment has a pre-installed [`mz_catalog_server`] cluster, which
//! has several indexes to speed up common catalog queries. We also have a special
//! `mz_support` role, which can be used by support teams to diagnose a deployment.
//! For each of these use cases, we have some special restrictions we want to apply. The
//! logic around these restrictions is defined here.
//!
//!
//! [`mz_catalog_server`]: https://materialize.com/docs/sql/show-clusters/#mz_catalog_server-system-cluster
20
21use mz_expr::CollectionPlan;
22use mz_repr::GlobalId;
23use mz_repr::namespaces::is_system_schema;
24use mz_sql::catalog::SessionCatalog;
25use mz_sql::plan::{
26    ExplainPlanPlan, ExplainTimestampPlan, Explainee, ExplaineeStatement, Plan, SubscribeFrom,
27    SubscribePlan,
28};
29use smallvec::SmallVec;
30
31use crate::AdapterError;
32use crate::catalog::ConnCatalog;
33use crate::coord::TargetCluster;
34use crate::notice::AdapterNotice;
35use crate::session::Session;
36use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
37
38/// Checks whether or not we should automatically run a query on the `mz_catalog_server`
39/// cluster, as opposed to whatever the current default cluster is.
40pub fn auto_run_on_catalog_server<'a, 's, 'p>(
41    catalog: &'a ConnCatalog<'a>,
42    session: &'s Session,
43    plan: &'p Plan,
44) -> TargetCluster {
45    let inspect_subscribe = |plan: &SubscribePlan| {
46        (
47            plan.from.depends_on(),
48            match &plan.from {
49                SubscribeFrom::Id(_) => false,
50                SubscribeFrom::Query { expr, desc: _ } => expr.could_run_expensive_function(),
51            },
52        )
53    };
54
55    let (depends_on, could_run_expensive_function) = match plan {
56        Plan::Select(plan) => (
57            plan.source.depends_on(),
58            plan.source.could_run_expensive_function(),
59        ),
60        Plan::ShowColumns(plan) => (
61            plan.select_plan.source.depends_on(),
62            plan.select_plan.source.could_run_expensive_function(),
63        ),
64        Plan::Subscribe(plan) => inspect_subscribe(plan),
65        Plan::ExplainPlan(ExplainPlanPlan {
66            explainee: Explainee::Statement(ExplaineeStatement::Select { plan, .. }),
67            ..
68        }) => (
69            plan.source.depends_on(),
70            plan.source.could_run_expensive_function(),
71        ),
72        Plan::ExplainPlan(ExplainPlanPlan {
73            explainee: Explainee::Statement(ExplaineeStatement::Subscribe { plan, .. }),
74            ..
75        }) => inspect_subscribe(plan),
76        Plan::ExplainTimestamp(ExplainTimestampPlan { raw_plan, .. }) => (
77            raw_plan.depends_on(),
78            raw_plan.could_run_expensive_function(),
79        ),
80        Plan::CreateConnection(_)
81        | Plan::CreateDatabase(_)
82        | Plan::CreateSchema(_)
83        | Plan::CreateRole(_)
84        | Plan::CreateNetworkPolicy(_)
85        | Plan::CreateCluster(_)
86        | Plan::CreateClusterReplica(_)
87        | Plan::CreateContinualTask(_)
88        | Plan::CreateSource(_)
89        | Plan::CreateSources(_)
90        | Plan::CreateSecret(_)
91        | Plan::CreateSink(_)
92        | Plan::CreateTable(_)
93        | Plan::CreateView(_)
94        | Plan::CreateMaterializedView(_)
95        | Plan::CreateIndex(_)
96        | Plan::CreateType(_)
97        | Plan::Comment(_)
98        | Plan::DiscardTemp
99        | Plan::DiscardAll
100        | Plan::DropObjects(_)
101        | Plan::DropOwned(_)
102        | Plan::EmptyQuery
103        | Plan::ShowAllVariables
104        | Plan::ShowCreate(_)
105        | Plan::ShowVariable(_)
106        | Plan::InspectShard(_)
107        | Plan::SetVariable(_)
108        | Plan::ResetVariable(_)
109        | Plan::SetTransaction(_)
110        | Plan::StartTransaction(_)
111        | Plan::CommitTransaction(_)
112        | Plan::AbortTransaction(_)
113        | Plan::CopyFrom(_)
114        | Plan::CopyTo(_)
115        | Plan::ExplainPlan(ExplainPlanPlan {
116            explainee:
117                Explainee::Statement(
118                    // Explicitly list all enum variants, to avoid bugs when somebody
119                    // adds a new variant (e.g., for `Plan::Execute`).
120                    ExplaineeStatement::CreateView { .. }
121                    | ExplaineeStatement::CreateMaterializedView { .. }
122                    | ExplaineeStatement::CreateIndex { .. },
123                ),
124            ..
125        })
126        | Plan::ExplainPlan(ExplainPlanPlan { explainee: _, .. })
127        | Plan::ExplainPushdown(_)
128        | Plan::ExplainSinkSchema(_)
129        | Plan::Insert(_)
130        | Plan::AlterNetworkPolicy(_)
131        | Plan::AlterNoop(_)
132        | Plan::AlterClusterRename(_)
133        | Plan::AlterClusterSwap(_)
134        | Plan::AlterClusterReplicaRename(_)
135        | Plan::AlterCluster(_)
136        | Plan::AlterConnection(_)
137        | Plan::AlterSource(_)
138        | Plan::AlterSetCluster(_)
139        | Plan::AlterItemRename(_)
140        | Plan::AlterRetainHistory(_)
141        | Plan::AlterSourceTimestampInterval(_)
142        | Plan::AlterSchemaRename(_)
143        | Plan::AlterSchemaSwap(_)
144        | Plan::AlterSecret(_)
145        | Plan::AlterSink(_)
146        | Plan::AlterSystemSet(_)
147        | Plan::AlterSystemReset(_)
148        | Plan::AlterSystemResetAll(_)
149        | Plan::AlterRole(_)
150        | Plan::AlterOwner(_)
151        | Plan::AlterTableAddColumn(_)
152        | Plan::AlterMaterializedViewApplyReplacement(_)
153        | Plan::Declare(_)
154        | Plan::Fetch(_)
155        | Plan::Close(_)
156        | Plan::ReadThenWrite(_)
157        | Plan::Prepare(_)
158        | Plan::Execute(_)
159        | Plan::Deallocate(_)
160        | Plan::Raise(_)
161        | Plan::GrantRole(_)
162        | Plan::RevokeRole(_)
163        | Plan::GrantPrivileges(_)
164        | Plan::RevokePrivileges(_)
165        | Plan::AlterDefaultPrivileges(_)
166        | Plan::ReassignOwned(_)
167        | Plan::ValidateConnection(_)
168        | Plan::SideEffectingFunc(_) => return TargetCluster::Active,
169    };
170
171    // Bail if the user has disabled it via the SessionVar.
172    if !session.vars().auto_route_catalog_queries() {
173        return TargetCluster::Active;
174    }
175
176    // We can't switch what cluster we're using, if the user has specified a replica.
177    if session.vars().cluster_replica().is_some() {
178        return TargetCluster::Active;
179    }
180
181    // These dependencies are just existing dataflows that are referenced in the plan.
182    let mut depends_on = depends_on
183        .into_iter()
184        .map(|gid| catalog.resolve_item_id(&gid))
185        .peekable();
186    let has_dependencies = depends_on.peek().is_some();
187
188    // Make sure we only depend on the system catalog, and nothing we depend on is a
189    // per-replica object, that requires being run a specific replica.
190    let valid_dependencies = depends_on.all(|id| {
191        let entry = catalog.state().get_entry(&id);
192        let schema = entry.name().qualifiers.schema_spec;
193
194        let system_only = catalog.state().is_system_schema_specifier(schema);
195        let non_replica = catalog.state().introspection_dependencies(id).is_empty();
196
197        system_only && non_replica
198    });
199
200    if (has_dependencies && valid_dependencies)
201        || (!has_dependencies && !could_run_expensive_function)
202    {
203        let intros_cluster = catalog
204            .state()
205            .resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER);
206        tracing::debug!("Running on '{}' cluster", MZ_CATALOG_SERVER_CLUSTER.name);
207
208        // If we're running on a different cluster than the active one, notify the user.
209        if intros_cluster.name != session.vars().cluster() {
210            session.add_notice(AdapterNotice::AutoRunOnCatalogServerCluster);
211        }
212        TargetCluster::CatalogServer
213    } else {
214        TargetCluster::Active
215    }
216}
217
218/// Checks if we're currently running on the [`MZ_CATALOG_SERVER_CLUSTER`], and if so, do
219/// we depend on any objects that we're not allowed to query from the cluster.
220pub fn check_cluster_restrictions(
221    cluster: &str,
222    catalog: &impl SessionCatalog,
223    plan: &Plan,
224) -> Result<(), AdapterError> {
225    // We only impose restrictions if the current cluster is the catalog server cluster.
226    if cluster != MZ_CATALOG_SERVER_CLUSTER.name {
227        return Ok(());
228    }
229
230    // Only continue, and check restrictions, if a Plan would run some computation on the cluster.
231    //
232    // Note: We get the dependencies from the Plans themselves, because it's only after planning
233    // that we actually know what objects we'll need to reference.
234    //
235    // Note: Creating other objects like Materialized Views is prevented elsewhere. We define the
236    // 'mz_catalog_server' cluster to be "read-only", which restricts these actions.
237    let depends_on: Box<dyn Iterator<Item = GlobalId>> = match plan {
238        Plan::ReadThenWrite(plan) => Box::new(plan.selection.depends_on().into_iter()),
239        Plan::Subscribe(plan) => match plan.from {
240            SubscribeFrom::Id(id) => Box::new(std::iter::once(id)),
241            SubscribeFrom::Query { ref expr, .. } => Box::new(expr.depends_on().into_iter()),
242        },
243        Plan::Select(plan) => Box::new(plan.source.depends_on().into_iter()),
244        _ => return Ok(()),
245    };
246
247    // Collect any items that are not allowed to be run on the catalog server cluster.
248    let unallowed_dependents: SmallVec<[String; 2]> = depends_on
249        .filter_map(|id| {
250            let item = catalog.get_item_by_global_id(&id);
251            let full_name = catalog.resolve_full_name(item.name());
252
253            if !is_system_schema(&full_name.schema) {
254                Some(full_name.to_string())
255            } else {
256                None
257            }
258        })
259        .collect();
260
261    // If the query depends on unallowed items, error out.
262    if !unallowed_dependents.is_empty() {
263        Err(AdapterError::UnallowedOnCluster {
264            depends_on: unallowed_dependents,
265            cluster: MZ_CATALOG_SERVER_CLUSTER.name.to_string(),
266        })
267    } else {
268        Ok(())
269    }
270}