Skip to main content

mz_adapter/coord/
read_then_write.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Coordinator-side support machinery for (frontend) read-then-write.
11
12use mz_catalog::memory::objects::CatalogItem;
13use mz_repr::CatalogItemId;
14use mz_sql::catalog::CatalogItemType;
15
16use crate::catalog::Catalog;
17use crate::error::AdapterError;
18
19/// Validates that all dependencies are valid for read-then-write operations.
20///
21/// Ensures all objects the selection depends on are valid for `ReadThenWrite` operations:
22///
23/// - They do not refer to any objects whose notion of time moves differently than that of
24///   user tables. This limitation is meant to ensure no writes occur between this read and the
25///   subsequent write.
26/// - They do not use mz_now(), whose time produced during read will differ from the write
27///   timestamp.
28pub(crate) fn validate_read_then_write_dependencies(
29    catalog: &Catalog,
30    id: &CatalogItemId,
31) -> Result<(), AdapterError> {
32    use CatalogItemType::*;
33    use mz_catalog::memory::objects;
34    let mut ids_to_check = Vec::new();
35    let valid = match catalog.try_get_entry(id) {
36        Some(entry) => {
37            if let CatalogItem::View(objects::View {
38                locally_optimized_expr: optimized_expr,
39                ..
40            })
41            | CatalogItem::MaterializedView(objects::MaterializedView {
42                locally_optimized_expr: optimized_expr,
43                ..
44            }) = entry.item()
45            {
46                if optimized_expr.contains_temporal() {
47                    return Err(AdapterError::Unsupported(
48                        "calls to mz_now in write statements",
49                    ));
50                }
51            }
52            match entry.item().typ() {
53                typ @ (Func | View | MaterializedView) => {
54                    ids_to_check.extend(entry.uses());
55                    let valid_id = id.is_user() || matches!(typ, Func);
56                    valid_id
57                }
58                Source | Secret | Connection => false,
59                // Cannot select from sinks or indexes.
60                Sink | Index => unreachable!(),
61                Table => {
62                    if !id.is_user() {
63                        // We can't read from non-user tables
64                        false
65                    } else {
66                        // We can't read from tables that are source-exports
67                        entry.source_export_details().is_none()
68                    }
69                }
70                Type => true,
71            }
72        }
73        None => false,
74    };
75    if !valid {
76        let (object_name, object_type) = match catalog.try_get_entry(id) {
77            Some(entry) => {
78                let object_name = catalog.resolve_full_name(entry.name(), None).to_string();
79                let object_type = match entry.item().typ() {
80                    // We only need the disallowed types here; the allowed types are handled above.
81                    Source => "source",
82                    Secret => "secret",
83                    Connection => "connection",
84                    Table => {
85                        if !id.is_user() {
86                            "system table"
87                        } else {
88                            "source-export table"
89                        }
90                    }
91                    View => "system view",
92                    MaterializedView => "system materialized view",
93                    _ => "invalid dependency",
94                };
95                (object_name, object_type.to_string())
96            }
97            None => (id.to_string(), "unknown".to_string()),
98        };
99        return Err(AdapterError::InvalidTableMutationSelection {
100            object_name,
101            object_type,
102        });
103    }
104    for id in ids_to_check {
105        validate_read_then_write_dependencies(catalog, &id)?;
106    }
107    Ok(())
108}