Skip to main content

mz_deploy/project/analysis/changeset/
base_facts.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Base fact extraction from a planned project.
11//!
12//! Translates the project's object graph into relational facts consumed by
13//! the Datalog fixed-point computation in [`super::datalog`].
14//!
15//! ## Base Facts
16//!
17//! Each base fact corresponds to a Datalog relation used by the propagation
18//! rules in [`super::datalog`]:
19//!
20//! | Relation | Source | Meaning |
21//! |----------|--------|---------|
22//! | `ObjectInSchema(obj, db, sch)` | Project hierarchy | Object `obj` lives in `db.sch` |
23//! | `DependsOn(child, parent)` | `project.dependency_graph` | `child` references `parent` in its query |
24//! | `StmtUsesCluster(obj, cluster)` | `IN CLUSTER` clause on main CREATE | Object's statement runs on `cluster` |
25//! | `IndexUsesCluster(obj, idx, cluster)` | `IN CLUSTER` clause on CREATE INDEX | Index `idx` on `obj` runs on `cluster` |
26//! | `ClusterBoundary(cluster)` | Evaluator-derived from cluster usage facts | The set of clusters eligible to become `DirtyCluster` |
27//! | `IsSink(obj)` | `Statement::CreateSink` | Object writes to an external system |
28//! | `IsReplacement(obj)` | Schema is in `project.replacement_schemas` | Object uses in-place replacement protocol |
29//!
30//! `ClusterBoundary` is derived as the set of all clusters referenced by
31//! statements or indexes in the project.
32
33use crate::project::analysis::deps::extract_dependencies;
34use crate::project::ast::Cluster;
35use crate::project::ast::Statement;
36use crate::project::ir::graph::Project;
37use crate::project::ir::object_id::ObjectId;
38use crate::verbose;
39use owo_colors::{OwoColorize, Stream, Style};
40use std::collections::BTreeSet;
41
42/// Base facts extracted from the project for Datalog computation.
43///
44/// Each field corresponds to a stored extensional relation. Helper relations
45/// such as `ClusterBoundary` are materialized from these facts in the
46/// evaluator layer.
47#[derive(Debug)]
48pub(super) struct BaseFacts {
49    /// `ObjectInSchema(object, database, schema)` -- every object and the
50    /// schema it belongs to.  Used by schema-level propagation rules.
51    pub object_in_schema: Vec<(ObjectId, String, String)>,
52
53    /// `DependsOn(child, parent)` -- child depends on parent.  Derived from
54    /// the project's dependency graph so dirtiness propagates downstream.
55    pub depends_on: Vec<(ObjectId, ObjectId)>,
56
57    /// `StmtUsesCluster(object, cluster_name)` -- the cluster referenced in
58    /// the object's CREATE statement.  A dirty cluster dirties the object.
59    pub stmt_uses_cluster: Vec<(ObjectId, Cluster)>,
60
61    /// `IndexUsesCluster(object, index_name, cluster_name)` -- clusters used
62    /// by indexes on an object.  Index clusters dirty only the cluster set,
63    /// **not** the parent object itself.
64    pub index_uses_cluster: Vec<(ObjectId, String, Cluster)>,
65
66    /// `IsSink(object)` -- objects that are sinks.  Sinks do not propagate
67    /// dirtiness to clusters or schemas because they write to external systems
68    /// and are created after the swap.
69    pub is_sink: BTreeSet<ObjectId>,
70
71    /// `IsReplacement(object)` -- objects in replacement schemas.  Changed
72    /// replacement MVs use the in-place replacement protocol so dirtiness
73    /// should not propagate through them to downstream objects.
74    pub is_replacement: BTreeSet<ObjectId>,
75}
76
77/// Extract all base facts from the project for Datalog computation.
78pub(super) fn extract_base_facts(project: &Project) -> BaseFacts {
79    let header_style = Style::new().cyan().bold();
80    verbose!(
81        "{} {}",
82        "▶".if_supports_color(Stream::Stderr, |t| t.cyan()),
83        "Extracting base facts from project..."
84            .if_supports_color(Stream::Stderr, |t| header_style.style(t))
85    );
86    let mut object_in_schema = Vec::new();
87    let mut depends_on = Vec::new();
88    let mut stmt_uses_cluster = Vec::new();
89    let mut index_uses_cluster = Vec::new();
90    let mut is_sink = BTreeSet::new();
91    let mut is_replacement = BTreeSet::new();
92
93    // Extract facts from each object in the project
94    for db in &project.databases {
95        for schema in &db.schemas {
96            // Check if this schema is a replacement schema
97            let is_replacement_schema = project
98                .replacement_schemas
99                .iter()
100                .any(|sq| sq.database == db.name && sq.schema == schema.name);
101
102            for obj in &schema.objects {
103                let obj_id = obj.id.clone();
104
105                // ObjectInSchema fact
106                object_in_schema.push((obj_id.clone(), db.name.clone(), schema.name.clone()));
107
108                // IsSink fact - sinks should not propagate dirtiness to clusters/schemas
109                if matches!(obj.typed_object.stmt, Statement::CreateSink(_)) {
110                    verbose!(
111                        "  ├─ {}: {}",
112                        "IsSink".if_supports_color(Stream::Stderr, |t| t.yellow()),
113                        obj_id
114                            .to_string()
115                            .if_supports_color(Stream::Stderr, |t| t.cyan())
116                    );
117                    is_sink.insert(obj_id.clone());
118                }
119
120                // IsReplacement fact - replacement MVs should not propagate dirtiness to schemas
121                if is_replacement_schema {
122                    verbose!(
123                        "  ├─ {}: {}",
124                        "IsReplacement".if_supports_color(Stream::Stderr, |t| t.yellow()),
125                        obj_id
126                            .to_string()
127                            .if_supports_color(Stream::Stderr, |t| t.cyan())
128                    );
129                    is_replacement.insert(obj_id.clone());
130                }
131
132                // DependsOn facts from dependency graph
133                if let Some(deps) = project.dependency_graph.get(&obj_id) {
134                    for parent in deps {
135                        depends_on.push((obj_id.clone(), parent.clone()));
136                    }
137                }
138
139                // Extract cluster usage from statement
140                let (_, clusters) =
141                    extract_dependencies(&obj.typed_object.stmt, &db.name, &schema.name);
142
143                // StmtUsesCluster facts
144                for cluster in clusters {
145                    stmt_uses_cluster.push((obj_id.clone(), cluster));
146                }
147
148                // IndexUsesCluster facts - extract from indexes
149                for index in &obj.typed_object.indexes {
150                    // Extract cluster directly from CreateIndexStatement
151                    if let Some(cluster_name) = &index.in_cluster {
152                        let index_name = index
153                            .name
154                            .as_ref()
155                            .map(|n| n.to_string())
156                            .unwrap_or_else(|| "unnamed_index".to_string());
157
158                        // Convert cluster name to string
159                        index_uses_cluster.push((
160                            obj_id.clone(),
161                            index_name,
162                            Cluster::new(cluster_name.to_string()),
163                        ));
164                    }
165                }
166            }
167        }
168    }
169
170    verbose!(
171        "  └─ Base facts: {} objects, {} dependencies, {} stmt→cluster, {} index→cluster, {} sinks, {} replacements",
172        object_in_schema
173            .len()
174            .to_string()
175            .if_supports_color(Stream::Stderr, |t| t.bold()),
176        depends_on
177            .len()
178            .to_string()
179            .if_supports_color(Stream::Stderr, |t| t.bold()),
180        stmt_uses_cluster
181            .len()
182            .to_string()
183            .if_supports_color(Stream::Stderr, |t| t.bold()),
184        index_uses_cluster
185            .len()
186            .to_string()
187            .if_supports_color(Stream::Stderr, |t| t.bold()),
188        is_sink
189            .len()
190            .to_string()
191            .if_supports_color(Stream::Stderr, |t| t.bold()),
192        is_replacement
193            .len()
194            .to_string()
195            .if_supports_color(Stream::Stderr, |t| t.bold())
196    );
197
198    BaseFacts {
199        object_in_schema,
200        depends_on,
201        stmt_uses_cluster,
202        index_uses_cluster,
203        is_sink,
204        is_replacement,
205    }
206}