mz_adapter/coord/read_then_write.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
//! Coordinator-side support machinery for (frontend) read-then-write.
11
12use mz_catalog::memory::objects::CatalogItem;
13use mz_repr::CatalogItemId;
14use mz_sql::catalog::CatalogItemType;
15
16use crate::catalog::Catalog;
17use crate::error::AdapterError;
18
19/// Validates that all dependencies are valid for read-then-write operations.
20///
21/// Ensures all objects the selection depends on are valid for `ReadThenWrite` operations:
22///
23/// - They do not refer to any objects whose notion of time moves differently than that of
24/// user tables. This limitation is meant to ensure no writes occur between this read and the
25/// subsequent write.
26/// - They do not use mz_now(), whose time produced during read will differ from the write
27/// timestamp.
28pub(crate) fn validate_read_then_write_dependencies(
29 catalog: &Catalog,
30 id: &CatalogItemId,
31) -> Result<(), AdapterError> {
32 use CatalogItemType::*;
33 use mz_catalog::memory::objects;
34 let mut ids_to_check = Vec::new();
35 let valid = match catalog.try_get_entry(id) {
36 Some(entry) => {
37 if let CatalogItem::View(objects::View {
38 locally_optimized_expr: optimized_expr,
39 ..
40 })
41 | CatalogItem::MaterializedView(objects::MaterializedView {
42 locally_optimized_expr: optimized_expr,
43 ..
44 }) = entry.item()
45 {
46 if optimized_expr.contains_temporal() {
47 return Err(AdapterError::Unsupported(
48 "calls to mz_now in write statements",
49 ));
50 }
51 }
52 match entry.item().typ() {
53 typ @ (Func | View | MaterializedView) => {
54 ids_to_check.extend(entry.uses());
55 let valid_id = id.is_user() || matches!(typ, Func);
56 valid_id
57 }
58 Source | Secret | Connection => false,
59 // Cannot select from sinks or indexes.
60 Sink | Index => unreachable!(),
61 Table => {
62 if !id.is_user() {
63 // We can't read from non-user tables
64 false
65 } else {
66 // We can't read from tables that are source-exports
67 entry.source_export_details().is_none()
68 }
69 }
70 Type => true,
71 }
72 }
73 None => false,
74 };
75 if !valid {
76 let (object_name, object_type) = match catalog.try_get_entry(id) {
77 Some(entry) => {
78 let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
79 let object_type = match entry.item().typ() {
80 // We only need the disallowed types here; the allowed types are handled above.
81 Source => "source",
82 Secret => "secret",
83 Connection => "connection",
84 Table => {
85 if !id.is_user() {
86 "system table"
87 } else {
88 "source-export table"
89 }
90 }
91 View => "system view",
92 MaterializedView => "system materialized view",
93 _ => "invalid dependency",
94 };
95 (object_name, object_type.to_string())
96 }
97 None => (id.to_string(), "unknown".to_string()),
98 };
99 return Err(AdapterError::InvalidTableMutationSelection {
100 object_name,
101 object_type,
102 });
103 }
104 for id in ids_to_check {
105 validate_read_then_write_dependencies(catalog, &id)?;
106 }
107 Ok(())
108}