mz_deploy/project/analysis/changeset/base_facts.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Base fact extraction from a planned project.
11//!
12//! Translates the project's object graph into relational facts consumed by
13//! the Datalog fixed-point computation in [`super::datalog`].
14//!
15//! ## Base Facts
16//!
17//! Each base fact corresponds to a Datalog relation used by the propagation
18//! rules in [`super::datalog`]:
19//!
20//! | Relation | Source | Meaning |
21//! |----------|--------|---------|
22//! | `ObjectInSchema(obj, db, sch)` | Project hierarchy | Object `obj` lives in `db.sch` |
23//! | `DependsOn(child, parent)` | `project.dependency_graph` | `child` references `parent` in its query |
24//! | `StmtUsesCluster(obj, cluster)` | `IN CLUSTER` clause on main CREATE | Object's statement runs on `cluster` |
25//! | `IndexUsesCluster(obj, idx, cluster)` | `IN CLUSTER` clause on CREATE INDEX | Index `idx` on `obj` runs on `cluster` |
26//! | `ClusterBoundary(cluster)` | Evaluator-derived from cluster usage facts | The set of clusters eligible to become `DirtyCluster` |
27//! | `IsSink(obj)` | `Statement::CreateSink` | Object writes to an external system |
28//! | `IsReplacement(obj)` | Schema is in `project.replacement_schemas` | Object uses in-place replacement protocol |
29//!
30//! `ClusterBoundary` is derived as the set of all clusters referenced by
31//! statements or indexes in the project.
32
33use crate::project::analysis::deps::extract_dependencies;
34use crate::project::ast::Cluster;
35use crate::project::ast::Statement;
36use crate::project::ir::graph::Project;
37use crate::project::ir::object_id::ObjectId;
38use crate::verbose;
39use owo_colors::{OwoColorize, Stream, Style};
40use std::collections::BTreeSet;
41
42/// Base facts extracted from the project for Datalog computation.
43///
44/// Each field corresponds to a stored extensional relation. Helper relations
45/// such as `ClusterBoundary` are materialized from these facts in the
46/// evaluator layer.
47#[derive(Debug)]
48pub(super) struct BaseFacts {
49 /// `ObjectInSchema(object, database, schema)` -- every object and the
50 /// schema it belongs to. Used by schema-level propagation rules.
51 pub object_in_schema: Vec<(ObjectId, String, String)>,
52
53 /// `DependsOn(child, parent)` -- child depends on parent. Derived from
54 /// the project's dependency graph so dirtiness propagates downstream.
55 pub depends_on: Vec<(ObjectId, ObjectId)>,
56
57 /// `StmtUsesCluster(object, cluster_name)` -- the cluster referenced in
58 /// the object's CREATE statement. A dirty cluster dirties the object.
59 pub stmt_uses_cluster: Vec<(ObjectId, Cluster)>,
60
61 /// `IndexUsesCluster(object, index_name, cluster_name)` -- clusters used
62 /// by indexes on an object. Index clusters dirty only the cluster set,
63 /// **not** the parent object itself.
64 pub index_uses_cluster: Vec<(ObjectId, String, Cluster)>,
65
66 /// `IsSink(object)` -- objects that are sinks. Sinks do not propagate
67 /// dirtiness to clusters or schemas because they write to external systems
68 /// and are created after the swap.
69 pub is_sink: BTreeSet<ObjectId>,
70
71 /// `IsReplacement(object)` -- objects in replacement schemas. Changed
72 /// replacement MVs use the in-place replacement protocol so dirtiness
73 /// should not propagate through them to downstream objects.
74 pub is_replacement: BTreeSet<ObjectId>,
75}
76
77/// Extract all base facts from the project for Datalog computation.
78pub(super) fn extract_base_facts(project: &Project) -> BaseFacts {
79 let header_style = Style::new().cyan().bold();
80 verbose!(
81 "{} {}",
82 "▶".if_supports_color(Stream::Stderr, |t| t.cyan()),
83 "Extracting base facts from project..."
84 .if_supports_color(Stream::Stderr, |t| header_style.style(t))
85 );
86 let mut object_in_schema = Vec::new();
87 let mut depends_on = Vec::new();
88 let mut stmt_uses_cluster = Vec::new();
89 let mut index_uses_cluster = Vec::new();
90 let mut is_sink = BTreeSet::new();
91 let mut is_replacement = BTreeSet::new();
92
93 // Extract facts from each object in the project
94 for db in &project.databases {
95 for schema in &db.schemas {
96 // Check if this schema is a replacement schema
97 let is_replacement_schema = project
98 .replacement_schemas
99 .iter()
100 .any(|sq| sq.database == db.name && sq.schema == schema.name);
101
102 for obj in &schema.objects {
103 let obj_id = obj.id.clone();
104
105 // ObjectInSchema fact
106 object_in_schema.push((obj_id.clone(), db.name.clone(), schema.name.clone()));
107
108 // IsSink fact - sinks should not propagate dirtiness to clusters/schemas
109 if matches!(obj.typed_object.stmt, Statement::CreateSink(_)) {
110 verbose!(
111 " ├─ {}: {}",
112 "IsSink".if_supports_color(Stream::Stderr, |t| t.yellow()),
113 obj_id
114 .to_string()
115 .if_supports_color(Stream::Stderr, |t| t.cyan())
116 );
117 is_sink.insert(obj_id.clone());
118 }
119
120 // IsReplacement fact - replacement MVs should not propagate dirtiness to schemas
121 if is_replacement_schema {
122 verbose!(
123 " ├─ {}: {}",
124 "IsReplacement".if_supports_color(Stream::Stderr, |t| t.yellow()),
125 obj_id
126 .to_string()
127 .if_supports_color(Stream::Stderr, |t| t.cyan())
128 );
129 is_replacement.insert(obj_id.clone());
130 }
131
132 // DependsOn facts from dependency graph
133 if let Some(deps) = project.dependency_graph.get(&obj_id) {
134 for parent in deps {
135 depends_on.push((obj_id.clone(), parent.clone()));
136 }
137 }
138
139 // Extract cluster usage from statement
140 let (_, clusters) =
141 extract_dependencies(&obj.typed_object.stmt, &db.name, &schema.name);
142
143 // StmtUsesCluster facts
144 for cluster in clusters {
145 stmt_uses_cluster.push((obj_id.clone(), cluster));
146 }
147
148 // IndexUsesCluster facts - extract from indexes
149 for index in &obj.typed_object.indexes {
150 // Extract cluster directly from CreateIndexStatement
151 if let Some(cluster_name) = &index.in_cluster {
152 let index_name = index
153 .name
154 .as_ref()
155 .map(|n| n.to_string())
156 .unwrap_or_else(|| "unnamed_index".to_string());
157
158 // Convert cluster name to string
159 index_uses_cluster.push((
160 obj_id.clone(),
161 index_name,
162 Cluster::new(cluster_name.to_string()),
163 ));
164 }
165 }
166 }
167 }
168 }
169
170 verbose!(
171 " └─ Base facts: {} objects, {} dependencies, {} stmt→cluster, {} index→cluster, {} sinks, {} replacements",
172 object_in_schema
173 .len()
174 .to_string()
175 .if_supports_color(Stream::Stderr, |t| t.bold()),
176 depends_on
177 .len()
178 .to_string()
179 .if_supports_color(Stream::Stderr, |t| t.bold()),
180 stmt_uses_cluster
181 .len()
182 .to_string()
183 .if_supports_color(Stream::Stderr, |t| t.bold()),
184 index_uses_cluster
185 .len()
186 .to_string()
187 .if_supports_color(Stream::Stderr, |t| t.bold()),
188 is_sink
189 .len()
190 .to_string()
191 .if_supports_color(Stream::Stderr, |t| t.bold()),
192 is_replacement
193 .len()
194 .to_string()
195 .if_supports_color(Stream::Stderr, |t| t.bold())
196 );
197
198 BaseFacts {
199 object_in_schema,
200 depends_on,
201 stmt_uses_cluster,
202 index_uses_cluster,
203 is_sink,
204 is_replacement,
205 }
206}