Skip to main content

mz_adapter/coord/
catalog_serving.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Special cases related to the "catalog serving" of Materialize
11//!
12//! Every Materialize deployment has a pre-installed [`mz_catalog_server`] cluster, which
13//! has several indexes to speed up common catalog queries. We also have a special
14//! `mz_support` role, which can be used by support teams to diagnose a deployment.
15//! For each of these use cases, we have some special restrictions we want to apply. The
16//! logic around these restrictions is defined here.
17//!
18//!
19//! [`mz_catalog_server`]: https://materialize.com/docs/sql/show-clusters/#mz_catalog_server-system-cluster
20
21use mz_expr::CollectionPlan;
22use mz_repr::GlobalId;
23use mz_repr::namespaces::is_system_schema;
24use mz_sql::catalog::SessionCatalog;
25use mz_sql::plan::{
26    ExplainPlanPlan, ExplainTimestampPlan, Explainee, ExplaineeStatement, Plan, SubscribeFrom,
27};
28use smallvec::SmallVec;
29
30use crate::AdapterError;
31use crate::catalog::ConnCatalog;
32use crate::coord::TargetCluster;
33use crate::notice::AdapterNotice;
34use crate::session::Session;
35use mz_catalog::builtin::MZ_CATALOG_SERVER_CLUSTER;
36
37/// Checks whether or not we should automatically run a query on the `mz_catalog_server`
38/// cluster, as opposed to whatever the current default cluster is.
39pub fn auto_run_on_catalog_server<'a, 's, 'p>(
40    catalog: &'a ConnCatalog<'a>,
41    session: &'s Session,
42    plan: &'p Plan,
43) -> TargetCluster {
44    let (depends_on, could_run_expensive_function) = match plan {
45        Plan::Select(plan) => (
46            plan.source.depends_on(),
47            plan.source.could_run_expensive_function(),
48        ),
49        Plan::ShowColumns(plan) => (
50            plan.select_plan.source.depends_on(),
51            plan.select_plan.source.could_run_expensive_function(),
52        ),
53        Plan::Subscribe(plan) => (
54            plan.from.depends_on(),
55            match &plan.from {
56                SubscribeFrom::Id(_) => false,
57                SubscribeFrom::Query { expr, desc: _ } => expr.could_run_expensive_function(),
58            },
59        ),
60        Plan::ExplainPlan(ExplainPlanPlan {
61            explainee: Explainee::Statement(ExplaineeStatement::Select { plan, .. }),
62            ..
63        }) => (
64            plan.source.depends_on(),
65            plan.source.could_run_expensive_function(),
66        ),
67        Plan::ExplainTimestamp(ExplainTimestampPlan { raw_plan, .. }) => (
68            raw_plan.depends_on(),
69            raw_plan.could_run_expensive_function(),
70        ),
71        Plan::CreateConnection(_)
72        | Plan::CreateDatabase(_)
73        | Plan::CreateSchema(_)
74        | Plan::CreateRole(_)
75        | Plan::CreateNetworkPolicy(_)
76        | Plan::CreateCluster(_)
77        | Plan::CreateClusterReplica(_)
78        | Plan::CreateContinualTask(_)
79        | Plan::CreateSource(_)
80        | Plan::CreateSources(_)
81        | Plan::CreateSecret(_)
82        | Plan::CreateSink(_)
83        | Plan::CreateTable(_)
84        | Plan::CreateView(_)
85        | Plan::CreateMaterializedView(_)
86        | Plan::CreateIndex(_)
87        | Plan::CreateType(_)
88        | Plan::Comment(_)
89        | Plan::DiscardTemp
90        | Plan::DiscardAll
91        | Plan::DropObjects(_)
92        | Plan::DropOwned(_)
93        | Plan::EmptyQuery
94        | Plan::ShowAllVariables
95        | Plan::ShowCreate(_)
96        | Plan::ShowVariable(_)
97        | Plan::InspectShard(_)
98        | Plan::SetVariable(_)
99        | Plan::ResetVariable(_)
100        | Plan::SetTransaction(_)
101        | Plan::StartTransaction(_)
102        | Plan::CommitTransaction(_)
103        | Plan::AbortTransaction(_)
104        | Plan::CopyFrom(_)
105        | Plan::CopyTo(_)
106        | Plan::ExplainPlan(_)
107        | Plan::ExplainPushdown(_)
108        | Plan::ExplainSinkSchema(_)
109        | Plan::Insert(_)
110        | Plan::AlterNetworkPolicy(_)
111        | Plan::AlterNoop(_)
112        | Plan::AlterClusterRename(_)
113        | Plan::AlterClusterSwap(_)
114        | Plan::AlterClusterReplicaRename(_)
115        | Plan::AlterCluster(_)
116        | Plan::AlterConnection(_)
117        | Plan::AlterSource(_)
118        | Plan::AlterSetCluster(_)
119        | Plan::AlterItemRename(_)
120        | Plan::AlterRetainHistory(_)
121        | Plan::AlterSourceTimestampInterval(_)
122        | Plan::AlterSchemaRename(_)
123        | Plan::AlterSchemaSwap(_)
124        | Plan::AlterSecret(_)
125        | Plan::AlterSink(_)
126        | Plan::AlterSystemSet(_)
127        | Plan::AlterSystemReset(_)
128        | Plan::AlterSystemResetAll(_)
129        | Plan::AlterRole(_)
130        | Plan::AlterOwner(_)
131        | Plan::AlterTableAddColumn(_)
132        | Plan::AlterMaterializedViewApplyReplacement(_)
133        | Plan::Declare(_)
134        | Plan::Fetch(_)
135        | Plan::Close(_)
136        | Plan::ReadThenWrite(_)
137        | Plan::Prepare(_)
138        | Plan::Execute(_)
139        | Plan::Deallocate(_)
140        | Plan::Raise(_)
141        | Plan::GrantRole(_)
142        | Plan::RevokeRole(_)
143        | Plan::GrantPrivileges(_)
144        | Plan::RevokePrivileges(_)
145        | Plan::AlterDefaultPrivileges(_)
146        | Plan::ReassignOwned(_)
147        | Plan::ValidateConnection(_)
148        | Plan::SideEffectingFunc(_) => return TargetCluster::Active,
149    };
150
151    // Bail if the user has disabled it via the SessionVar.
152    if !session.vars().auto_route_catalog_queries() {
153        return TargetCluster::Active;
154    }
155
156    // We can't switch what cluster we're using, if the user has specified a replica.
157    if session.vars().cluster_replica().is_some() {
158        return TargetCluster::Active;
159    }
160
161    // These dependencies are just existing dataflows that are referenced in the plan.
162    let mut depends_on = depends_on
163        .into_iter()
164        .map(|gid| catalog.resolve_item_id(&gid))
165        .peekable();
166    let has_dependencies = depends_on.peek().is_some();
167
168    // Make sure we only depend on the system catalog, and nothing we depend on is a
169    // per-replica object, that requires being run a specific replica.
170    let valid_dependencies = depends_on.all(|id| {
171        let entry = catalog.state().get_entry(&id);
172        let schema = entry.name().qualifiers.schema_spec;
173
174        let system_only = catalog.state().is_system_schema_specifier(schema);
175        let non_replica = catalog.state().introspection_dependencies(id).is_empty();
176
177        system_only && non_replica
178    });
179
180    if (has_dependencies && valid_dependencies)
181        || (!has_dependencies && !could_run_expensive_function)
182    {
183        let intros_cluster = catalog
184            .state()
185            .resolve_builtin_cluster(&MZ_CATALOG_SERVER_CLUSTER);
186        tracing::debug!("Running on '{}' cluster", MZ_CATALOG_SERVER_CLUSTER.name);
187
188        // If we're running on a different cluster than the active one, notify the user.
189        if intros_cluster.name != session.vars().cluster() {
190            session.add_notice(AdapterNotice::AutoRunOnCatalogServerCluster);
191        }
192        TargetCluster::CatalogServer
193    } else {
194        TargetCluster::Active
195    }
196}
197
198/// Checks if we're currently running on the [`MZ_CATALOG_SERVER_CLUSTER`], and if so, do
199/// we depend on any objects that we're not allowed to query from the cluster.
200pub fn check_cluster_restrictions(
201    cluster: &str,
202    catalog: &impl SessionCatalog,
203    plan: &Plan,
204) -> Result<(), AdapterError> {
205    // We only impose restrictions if the current cluster is the catalog server cluster.
206    if cluster != MZ_CATALOG_SERVER_CLUSTER.name {
207        return Ok(());
208    }
209
210    // Only continue, and check restrictions, if a Plan would run some computation on the cluster.
211    //
212    // Note: We get the dependencies from the Plans themselves, because it's only after planning
213    // that we actually know what objects we'll need to reference.
214    //
215    // Note: Creating other objects like Materialized Views is prevented elsewhere. We define the
216    // 'mz_catalog_server' cluster to be "read-only", which restricts these actions.
217    let depends_on: Box<dyn Iterator<Item = GlobalId>> = match plan {
218        Plan::ReadThenWrite(plan) => Box::new(plan.selection.depends_on().into_iter()),
219        Plan::Subscribe(plan) => match plan.from {
220            SubscribeFrom::Id(id) => Box::new(std::iter::once(id)),
221            SubscribeFrom::Query { ref expr, .. } => Box::new(expr.depends_on().into_iter()),
222        },
223        Plan::Select(plan) => Box::new(plan.source.depends_on().into_iter()),
224        _ => return Ok(()),
225    };
226
227    // Collect any items that are not allowed to be run on the catalog server cluster.
228    let unallowed_dependents: SmallVec<[String; 2]> = depends_on
229        .filter_map(|id| {
230            let item = catalog.get_item_by_global_id(&id);
231            let full_name = catalog.resolve_full_name(item.name());
232
233            if !is_system_schema(&full_name.schema) {
234                Some(full_name.to_string())
235            } else {
236                None
237            }
238        })
239        .collect();
240
241    // If the query depends on unallowed items, error out.
242    if !unallowed_dependents.is_empty() {
243        Err(AdapterError::UnallowedOnCluster {
244            depends_on: unallowed_dependents,
245            cluster: MZ_CATALOG_SERVER_CLUSTER.name.to_string(),
246        })
247    } else {
248        Ok(())
249    }
250}