mz_deploy/project/analysis/changeset/datalog.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Datalog-style fixed-point computation of dirty objects, clusters, and schemas.
11//!
12//! Implements the propagation rules documented in the parent module's header.
13//! The evaluator is organized as:
14//!
15//! 1. immutable input facts (`BaseFacts`)
16//! 2. precomputed join indexes and helper relations (`FactIndexes`)
17//! 3. mutable derived relations (`DirtyState`)
18//! 4. rule-group evaluators that collect pending derivations before merging
19//!
20//! Each `derive_*` helper corresponds to one or more Datalog rules and is
21//! annotated with the exact rule it encodes.
22//!
23//! ## Algorithm
24//!
25//! 1. **Seed:** Initialize `dirty_stmts` from `changed_stmts` (objects whose
26//! hashes differ between the old and new snapshots) and `dirty_schemas` from
27//! `forced_dirty_schemas` (schemas the caller redeploys unconditionally).
28//! 2. **Build indexes:** Pre-compute reverse lookup maps and the current
29//! `ClusterBoundary` relation from base facts for O(1) rule evaluation.
30//! 3. **Fixed-point loop:** In each iteration, apply all five rule groups in
31//! a fixed order:
32//! 1. Cluster dirtiness (from changed statements only, within `ClusterBoundary`)
33//! 2. Statement dirtiness from dirty clusters
34//! 3. Dependency propagation (downstream of dirty statements, excluding
35//! replacement MVs, whose dirtiness does not fan out to dependents)
36//! 4. Schema dirtiness (from dirty statements, excluding sinks)
37//! 5. Statement dirtiness from dirty schemas
38//! 4. **Termination:** The loop converges when no set (`dirty_stmts`,
39//! `dirty_clusters`, `dirty_schemas`) grows in an iteration. Convergence
40//! is guaranteed because all sets are monotonically non-decreasing and
41//! bounded by the finite project universe.
42//!
43//! ## Rule Application Order
44//!
45//! The ordering within a single iteration matters for convergence speed but
46//! not correctness — the fixed-point semantics guarantee the same result
47//! regardless of order. The chosen order (clusters → stmts-from-clusters →
48//! dependencies → schemas → stmts-from-schemas) maximizes information
49//! propagated per iteration, typically converging in 2–3 rounds.
50//!
51//! `ClusterBoundary` is materialized as all clusters referenced by statements
52//! or indexes in the project.
53
54use super::base_facts::BaseFacts;
55use super::logging::{log_datalog_start, log_final_results, log_iteration};
56use crate::project::SchemaQualifier;
57use crate::project::ast::Cluster;
58use crate::project::ir::object_id::ObjectId;
59use crate::verbose;
60use owo_colors::{OwoColorize, Stream, Style};
61use std::collections::{BTreeMap, BTreeSet};
62
63/// Pre-computed indexes for efficient Datalog rule evaluation.
64pub(super) struct FactIndexes {
65 /// Object -> clusters used by the statement
66 stmt_to_clusters: BTreeMap<ObjectId, Vec<Cluster>>,
67 /// Object -> clusters used by indexes on that object
68 index_to_clusters: BTreeMap<ObjectId, Vec<Cluster>>,
69 /// Parent -> list of dependent children (reverse of depends_on)
70 dependents: BTreeMap<ObjectId, Vec<ObjectId>>,
71 /// Object -> schema qualifier it belongs to
72 object_to_schema: BTreeMap<ObjectId, SchemaQualifier>,
73 /// Schema qualifier -> objects in that schema
74 schema_to_objects: BTreeMap<SchemaQualifier, Vec<ObjectId>>,
75 /// Clusters eligible to become `DirtyCluster` in the current evaluation
76 cluster_boundary: BTreeSet<Cluster>,
77}
78
79impl FactIndexes {
80 pub(super) fn from_base_facts(facts: &BaseFacts) -> Self {
81 // stmt_to_clusters: group by object
82 let mut stmt_to_clusters: BTreeMap<ObjectId, Vec<Cluster>> = BTreeMap::new();
83 for (obj, cluster) in &facts.stmt_uses_cluster {
84 stmt_to_clusters
85 .entry(obj.clone())
86 .or_default()
87 .push(cluster.clone());
88 }
89
90 // index_to_clusters: group by object (ignoring index name)
91 let mut index_to_clusters: BTreeMap<ObjectId, Vec<Cluster>> = BTreeMap::new();
92 for (obj, _index_name, cluster) in &facts.index_uses_cluster {
93 index_to_clusters
94 .entry(obj.clone())
95 .or_default()
96 .push(cluster.clone());
97 }
98
99 // dependents: reverse the depends_on relation (parent -> children)
100 let mut dependents: BTreeMap<ObjectId, Vec<ObjectId>> = BTreeMap::new();
101 for (child, parent) in &facts.depends_on {
102 dependents
103 .entry(parent.clone())
104 .or_default()
105 .push(child.clone());
106 }
107
108 // object_to_schema / schema_to_objects: direct mappings
109 let mut object_to_schema = BTreeMap::new();
110 let mut schema_to_objects: BTreeMap<SchemaQualifier, Vec<ObjectId>> = BTreeMap::new();
111 for (obj, db, sch) in &facts.object_in_schema {
112 let qualifier = SchemaQualifier::new(db.clone(), sch.clone());
113 object_to_schema.insert(obj.clone(), qualifier.clone());
114 schema_to_objects
115 .entry(qualifier)
116 .or_default()
117 .push(obj.clone());
118 }
119
120 let cluster_boundary = stmt_to_clusters
121 .values()
122 .flat_map(|clusters| clusters.iter().cloned())
123 .chain(
124 index_to_clusters
125 .values()
126 .flat_map(|clusters| clusters.iter().cloned()),
127 )
128 .collect();
129
130 FactIndexes {
131 stmt_to_clusters,
132 index_to_clusters,
133 dependents,
134 object_to_schema,
135 schema_to_objects,
136 cluster_boundary,
137 }
138 }
139}
140
141/// Mutable working sets carried across fixed-point iterations.
142///
143/// Keeps the Datalog loop state explicit and grouped so each rule helper can
144/// update shared dirtiness sets without additional tuple plumbing.
145pub(super) struct DirtyState {
146 pub dirty_stmts: BTreeSet<ObjectId>,
147 pub dirty_clusters: BTreeSet<Cluster>,
148 pub dirty_schemas: BTreeSet<SchemaQualifier>,
149}
150
151impl DirtyState {
152 /// Seeds dirty state from objects whose hashes changed between snapshots,
153 /// plus any schemas forced dirty by the caller. Forced schemas let the
154 /// fixed-point treat an unchanged schema as if it had changed: Rule 6 turns
155 /// each into its objects, which then propagate to downstream dependents.
156 pub(super) fn new(
157 changed_stmts: &BTreeSet<ObjectId>,
158 forced_dirty_schemas: &BTreeSet<SchemaQualifier>,
159 ) -> Self {
160 Self {
161 dirty_stmts: changed_stmts.clone(),
162 dirty_clusters: BTreeSet::new(),
163 dirty_schemas: forced_dirty_schemas.clone(),
164 }
165 }
166
167 pub(super) fn sizes(&self) -> (usize, usize, usize) {
168 (
169 self.dirty_stmts.len(),
170 self.dirty_clusters.len(),
171 self.dirty_schemas.len(),
172 )
173 }
174}
175
176/// Newly derived facts from one rule group before they are merged into `DirtyState`.
177#[derive(Default)]
178struct PendingFacts {
179 dirty_stmts: BTreeSet<ObjectId>,
180 dirty_clusters: BTreeSet<Cluster>,
181 dirty_schemas: BTreeSet<SchemaQualifier>,
182}
183
184impl PendingFacts {
185 fn merge_into(self, state: &mut DirtyState) {
186 state.dirty_stmts.extend(self.dirty_stmts);
187 state.dirty_clusters.extend(self.dirty_clusters);
188 state.dirty_schemas.extend(self.dirty_schemas);
189 }
190}
191
192/// Fixed-point evaluator for the dirty propagation rules.
193struct Evaluator<'a> {
194 changed_stmts: &'a BTreeSet<ObjectId>,
195 base_facts: &'a BaseFacts,
196 forced_dirty_schemas: &'a BTreeSet<SchemaQualifier>,
197 indexes: FactIndexes,
198}
199
200impl<'a> Evaluator<'a> {
201 fn new(
202 changed_stmts: &'a BTreeSet<ObjectId>,
203 base_facts: &'a BaseFacts,
204 forced_dirty_schemas: &'a BTreeSet<SchemaQualifier>,
205 ) -> Self {
206 Self {
207 changed_stmts,
208 base_facts,
209 forced_dirty_schemas,
210 indexes: FactIndexes::from_base_facts(base_facts),
211 }
212 }
213
214 fn run(&self) -> DirtyState {
215 let mut state = DirtyState::new(self.changed_stmts, self.forced_dirty_schemas);
216
217 let mut iteration = 0;
218 loop {
219 iteration += 1;
220 let prev_sizes = state.sizes();
221 log_iteration(iteration, &state);
222
223 self.apply_rule_group(&mut state, Self::derive_cluster_dirtiness);
224 self.apply_rule_group(&mut state, Self::derive_stmt_dirtiness_from_clusters);
225 self.apply_rule_group(&mut state, Self::derive_stmt_dependency_dirtiness);
226 self.apply_rule_group(&mut state, Self::derive_schema_dirtiness);
227 self.apply_rule_group(&mut state, Self::derive_stmt_dirtiness_from_schemas);
228
229 // Fixed point reached when no sets grew
230 if state.sizes() == prev_sizes {
231 verbose!(
232 "\n{} Fixed point reached after {} iteration(s)",
233 "✓".if_supports_color(Stream::Stderr, |t| t.green()),
234 iteration
235 .to_string()
236 .if_supports_color(Stream::Stderr, |t| t.bold())
237 );
238 break;
239 }
240 }
241
242 state
243 }
244
245 fn apply_rule_group(
246 &self,
247 state: &mut DirtyState,
248 derive: fn(&Self, &DirtyState, &mut PendingFacts),
249 ) {
250 let mut pending = PendingFacts::default();
251 derive(self, state, &mut pending);
252 pending.merge_into(state);
253 }
254
255 /// Applies cluster dirtiness rules that originate only from changed statements.
256 ///
257 /// This intentionally excludes sink propagation so sink changes do not force
258 /// unrelated cluster redeployments.
259 ///
260 /// ```datalog
261 /// DirtyCluster(C) :- ChangedStmt(O), StmtUsesCluster(O, C),
262 /// NOT IsSink(O), ClusterBoundary(C)
263 /// DirtyCluster(C) :- ChangedStmt(O), IndexUsesCluster(O, _, C),
264 /// NOT IsSink(O), ClusterBoundary(C)
265 /// ```
266 fn derive_cluster_dirtiness(&self, state: &DirtyState, pending: &mut PendingFacts) {
267 for obj in self.changed_stmts {
268 if self.base_facts.is_sink.contains(obj) {
269 let skip_style = Style::new().yellow().bold();
270 verbose!(
271 " ├─ {}: {} is a sink, not marking clusters dirty",
272 "SKIP".if_supports_color(Stream::Stderr, |t| skip_style.style(t)),
273 obj.to_string()
274 .if_supports_color(Stream::Stderr, |t| t.cyan())
275 );
276 continue;
277 }
278
279 if let Some(clusters) = self.indexes.stmt_to_clusters.get(obj) {
280 for cluster in clusters {
281 if self.indexes.cluster_boundary.contains(cluster)
282 && !state.dirty_clusters.contains(cluster)
283 && pending.dirty_clusters.insert(cluster.clone())
284 {
285 verbose!(
286 " ├─ {}: DirtyCluster({}) ← ChangedStmt({}) uses cluster",
287 "Rule 1".if_supports_color(Stream::Stderr, |t| t.bold()),
288 cluster.if_supports_color(Stream::Stderr, |t| t.magenta()),
289 obj.to_string()
290 .if_supports_color(Stream::Stderr, |t| t.cyan())
291 );
292 }
293 }
294 }
295
296 if let Some(clusters) = self.indexes.index_to_clusters.get(obj) {
297 for cluster in clusters {
298 if self.indexes.cluster_boundary.contains(cluster)
299 && !state.dirty_clusters.contains(cluster)
300 && pending.dirty_clusters.insert(cluster.clone())
301 {
302 verbose!(
303 " ├─ {}: DirtyCluster({}) ← ChangedStmt({}) has index on cluster",
304 "Rule 2".if_supports_color(Stream::Stderr, |t| t.bold()),
305 cluster.if_supports_color(Stream::Stderr, |t| t.magenta()),
306 obj.to_string()
307 .if_supports_color(Stream::Stderr, |t| t.cyan())
308 );
309 }
310 }
311 }
312 }
313 }
314
315 /// Applies statement dirtiness induced by dirty clusters.
316 ///
317 /// ```datalog
318 /// DirtyStmt(O) :- StmtUsesCluster(O, C), DirtyCluster(C)
319 /// ```
320 fn derive_stmt_dirtiness_from_clusters(&self, state: &DirtyState, pending: &mut PendingFacts) {
321 for (obj, clusters) in &self.indexes.stmt_to_clusters {
322 for cluster in clusters {
323 if state.dirty_clusters.contains(cluster)
324 && !state.dirty_stmts.contains(obj)
325 && pending.dirty_stmts.insert(obj.clone())
326 {
327 verbose!(
328 " ├─ {}: DirtyStmt({}) ← uses DirtyCluster({})",
329 "Rule 3".if_supports_color(Stream::Stderr, |t| t.bold()),
330 obj.to_string()
331 .if_supports_color(Stream::Stderr, |t| t.cyan()),
332 cluster.if_supports_color(Stream::Stderr, |t| t.magenta())
333 );
334 break;
335 }
336 }
337 }
338 }
339
340 /// Applies downstream dependency propagation for dirty statements.
341 ///
342 /// Replacement MVs are excluded as parents: a dirty replacement MV is
343 /// redeployed in place, so its dirtiness does not fan out to dependents in
344 /// other schemas. This is the only special property of replacement MVs.
345 ///
346 /// ```datalog
347 /// DirtyStmt(O) :- DependsOn(O, P), DirtyStmt(P), NOT IsReplacement(P)
348 /// ```
349 fn derive_stmt_dependency_dirtiness(&self, state: &DirtyState, pending: &mut PendingFacts) {
350 for dirty_obj in &state.dirty_stmts {
351 if self.base_facts.is_replacement.contains(dirty_obj) {
352 let skip_style = Style::new().yellow().bold();
353 verbose!(
354 " ├─ {}: {} is a replacement MV, not propagating to dependents",
355 "SKIP".if_supports_color(Stream::Stderr, |t| skip_style.style(t)),
356 dirty_obj
357 .to_string()
358 .if_supports_color(Stream::Stderr, |t| t.cyan())
359 );
360 continue;
361 }
362 if let Some(children) = self.indexes.dependents.get(dirty_obj) {
363 for child in children {
364 if !state.dirty_stmts.contains(child)
365 && pending.dirty_stmts.insert(child.clone())
366 {
367 verbose!(
368 " ├─ {}: DirtyStmt({}) ← depends on DirtyStmt({})",
369 "Rule 4".if_supports_color(Stream::Stderr, |t| t.bold()),
370 child
371 .to_string()
372 .if_supports_color(Stream::Stderr, |t| t.cyan()),
373 dirty_obj
374 .to_string()
375 .if_supports_color(Stream::Stderr, |t| t.cyan())
376 );
377 }
378 }
379 }
380 }
381 }
382
383 /// Marks schemas dirty from dirty statements eligible for schema-level redeploy.
384 ///
385 /// Sinks are excluded: they write to external systems and are created after
386 /// the swap during apply, so they must not pull their schema into a
387 /// redeploy. Replacement MVs are ordinary here — a dirty replacement MV
388 /// dirties its schema like any other compute object.
389 ///
390 /// ```datalog
391 /// DirtySchema(Db, Sch) :- DirtyStmt(O), ObjectInSchema(O, Db, Sch),
392 /// NOT IsSink(O)
393 /// ```
394 fn derive_schema_dirtiness(&self, state: &DirtyState, pending: &mut PendingFacts) {
395 for obj in &state.dirty_stmts {
396 if self.base_facts.is_sink.contains(obj) {
397 let skip_style = Style::new().yellow().bold();
398 verbose!(
399 " ├─ {}: {} is a sink, not marking schema dirty",
400 "SKIP".if_supports_color(Stream::Stderr, |t| skip_style.style(t)),
401 obj.to_string()
402 .if_supports_color(Stream::Stderr, |t| t.cyan())
403 );
404 continue;
405 }
406 if let Some(sq) = self.indexes.object_to_schema.get(obj)
407 && !state.dirty_schemas.contains(sq)
408 && pending.dirty_schemas.insert(sq.clone())
409 {
410 verbose!(
411 " ├─ {}: DirtySchema({}) ← DirtyStmt({}) in schema",
412 "Rule 5".if_supports_color(Stream::Stderr, |t| t.bold()),
413 format!("{}.{}", sq.database, sq.schema)
414 .if_supports_color(Stream::Stderr, |t| t.blue()),
415 obj.to_string()
416 .if_supports_color(Stream::Stderr, |t| t.cyan())
417 );
418 }
419 }
420 }
421
422 /// Pulls statements into the dirty set when their schema is marked dirty.
423 ///
424 /// Every object in a dirty schema is dirty, replacement MVs included: a
425 /// stable schema redeploys atomically. The downstream-propagation exclusion
426 /// (Rule 4) is what keeps a dirty replacement MV from pulling in dependents
427 /// in *other* schemas.
428 ///
429 /// ```datalog
430 /// DirtyStmt(O) :- DirtySchema(Db, Sch), ObjectInSchema(O, Db, Sch)
431 /// ```
432 fn derive_stmt_dirtiness_from_schemas(&self, state: &DirtyState, pending: &mut PendingFacts) {
433 for dirty_schema in &state.dirty_schemas {
434 if let Some(objects) = self.indexes.schema_to_objects.get(dirty_schema) {
435 for obj in objects {
436 if state.dirty_stmts.contains(obj) || !pending.dirty_stmts.insert(obj.clone()) {
437 continue;
438 }
439 verbose!(
440 " ├─ {}: DirtyStmt({}) ← in DirtySchema({})",
441 "Rule 6".if_supports_color(Stream::Stderr, |t| t.bold()),
442 obj.to_string()
443 .if_supports_color(Stream::Stderr, |t| t.cyan()),
444 format!("{}.{}", dirty_schema.database, dirty_schema.schema)
445 .if_supports_color(Stream::Stderr, |t| t.blue())
446 );
447 }
448 }
449 }
450 }
451}
452
453/// Compute dirty objects, clusters, and schemas using fixed-point iteration.
454///
455/// Implements the Datalog rules defined at the top of this module.
456///
457/// `forced_dirty_schemas` seeds the fixed-point with schemas the caller forces
458/// dirty (e.g. `stage --redeploy-schema`); pass an empty set for pure
459/// change-driven dirtiness. Every object in a forced schema, and its downstream
460/// dependents, ends up in the dirty set.
461///
462/// **Important:** Sinks are special - they do NOT propagate dirtiness to clusters or schemas.
463/// Sinks write to external systems and are created after the swap during apply, so they
464/// shouldn't cause other objects to be redeployed.
465pub(super) fn compute_dirty_datalog(
466 changed_stmts: &BTreeSet<ObjectId>,
467 base_facts: &BaseFacts,
468 forced_dirty_schemas: &BTreeSet<SchemaQualifier>,
469) -> (
470 BTreeSet<ObjectId>,
471 BTreeSet<Cluster>,
472 BTreeSet<SchemaQualifier>,
473) {
474 log_datalog_start(changed_stmts, base_facts);
475 let evaluator = Evaluator::new(changed_stmts, base_facts, forced_dirty_schemas);
476 let state = evaluator.run();
477
478 log_final_results(&state);
479 (state.dirty_stmts, state.dirty_clusters, state.dirty_schemas)
480}