Skip to main content

mz_deploy/project/analysis/changeset/
datalog.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Datalog-style fixed-point computation of dirty objects, clusters, and schemas.
11//!
12//! Implements the propagation rules documented in the parent module's header.
13//! The evaluator is organized as:
14//!
15//! 1. immutable input facts (`BaseFacts`)
16//! 2. precomputed join indexes and helper relations (`FactIndexes`)
17//! 3. mutable derived relations (`DirtyState`)
18//! 4. rule-group evaluators that collect pending derivations before merging
19//!
20//! Each `derive_*` helper corresponds to one or more Datalog rules and is
21//! annotated with the exact rule it encodes.
22//!
23//! ## Algorithm
24//!
25//! 1. **Seed:** Initialize `dirty_stmts` from `changed_stmts` (objects whose
26//!    hashes differ between the old and new snapshots) and `dirty_schemas` from
27//!    `forced_dirty_schemas` (schemas the caller redeploys unconditionally).
28//! 2. **Build indexes:** Pre-compute reverse lookup maps and the current
29//!    `ClusterBoundary` relation from base facts for O(1) rule evaluation.
30//! 3. **Fixed-point loop:** In each iteration, apply all five rule groups in
31//!    a fixed order:
32//!    1. Cluster dirtiness (from changed statements only, within `ClusterBoundary`)
33//!    2. Statement dirtiness from dirty clusters
34//!    3. Dependency propagation (downstream of dirty statements, excluding
35//!       replacement MVs, whose dirtiness does not fan out to dependents)
36//!    4. Schema dirtiness (from dirty statements, excluding sinks)
37//!    5. Statement dirtiness from dirty schemas
38//! 4. **Termination:** The loop converges when no set (`dirty_stmts`,
39//!    `dirty_clusters`, `dirty_schemas`) grows in an iteration. Convergence
40//!    is guaranteed because all sets are monotonically non-decreasing and
41//!    bounded by the finite project universe.
42//!
43//! ## Rule Application Order
44//!
45//! The ordering within a single iteration matters for convergence speed but
46//! not correctness — the fixed-point semantics guarantee the same result
47//! regardless of order. The chosen order (clusters → stmts-from-clusters →
48//! dependencies → schemas → stmts-from-schemas) maximizes information
49//! propagated per iteration, typically converging in 2–3 rounds.
50//!
51//! `ClusterBoundary` is materialized as all clusters referenced by statements
52//! or indexes in the project.
53
54use super::base_facts::BaseFacts;
55use super::logging::{log_datalog_start, log_final_results, log_iteration};
56use crate::project::SchemaQualifier;
57use crate::project::ast::Cluster;
58use crate::project::ir::object_id::ObjectId;
59use crate::verbose;
60use owo_colors::{OwoColorize, Stream, Style};
61use std::collections::{BTreeMap, BTreeSet};
62
63/// Pre-computed indexes for efficient Datalog rule evaluation.
64pub(super) struct FactIndexes {
65    /// Object -> clusters used by the statement
66    stmt_to_clusters: BTreeMap<ObjectId, Vec<Cluster>>,
67    /// Object -> clusters used by indexes on that object
68    index_to_clusters: BTreeMap<ObjectId, Vec<Cluster>>,
69    /// Parent -> list of dependent children (reverse of depends_on)
70    dependents: BTreeMap<ObjectId, Vec<ObjectId>>,
71    /// Object -> schema qualifier it belongs to
72    object_to_schema: BTreeMap<ObjectId, SchemaQualifier>,
73    /// Schema qualifier -> objects in that schema
74    schema_to_objects: BTreeMap<SchemaQualifier, Vec<ObjectId>>,
75    /// Clusters eligible to become `DirtyCluster` in the current evaluation
76    cluster_boundary: BTreeSet<Cluster>,
77}
78
79impl FactIndexes {
80    pub(super) fn from_base_facts(facts: &BaseFacts) -> Self {
81        // stmt_to_clusters: group by object
82        let mut stmt_to_clusters: BTreeMap<ObjectId, Vec<Cluster>> = BTreeMap::new();
83        for (obj, cluster) in &facts.stmt_uses_cluster {
84            stmt_to_clusters
85                .entry(obj.clone())
86                .or_default()
87                .push(cluster.clone());
88        }
89
90        // index_to_clusters: group by object (ignoring index name)
91        let mut index_to_clusters: BTreeMap<ObjectId, Vec<Cluster>> = BTreeMap::new();
92        for (obj, _index_name, cluster) in &facts.index_uses_cluster {
93            index_to_clusters
94                .entry(obj.clone())
95                .or_default()
96                .push(cluster.clone());
97        }
98
99        // dependents: reverse the depends_on relation (parent -> children)
100        let mut dependents: BTreeMap<ObjectId, Vec<ObjectId>> = BTreeMap::new();
101        for (child, parent) in &facts.depends_on {
102            dependents
103                .entry(parent.clone())
104                .or_default()
105                .push(child.clone());
106        }
107
108        // object_to_schema / schema_to_objects: direct mappings
109        let mut object_to_schema = BTreeMap::new();
110        let mut schema_to_objects: BTreeMap<SchemaQualifier, Vec<ObjectId>> = BTreeMap::new();
111        for (obj, db, sch) in &facts.object_in_schema {
112            let qualifier = SchemaQualifier::new(db.clone(), sch.clone());
113            object_to_schema.insert(obj.clone(), qualifier.clone());
114            schema_to_objects
115                .entry(qualifier)
116                .or_default()
117                .push(obj.clone());
118        }
119
120        let cluster_boundary = stmt_to_clusters
121            .values()
122            .flat_map(|clusters| clusters.iter().cloned())
123            .chain(
124                index_to_clusters
125                    .values()
126                    .flat_map(|clusters| clusters.iter().cloned()),
127            )
128            .collect();
129
130        FactIndexes {
131            stmt_to_clusters,
132            index_to_clusters,
133            dependents,
134            object_to_schema,
135            schema_to_objects,
136            cluster_boundary,
137        }
138    }
139}
140
141/// Mutable working sets carried across fixed-point iterations.
142///
143/// Keeps the Datalog loop state explicit and grouped so each rule helper can
144/// update shared dirtiness sets without additional tuple plumbing.
145pub(super) struct DirtyState {
146    pub dirty_stmts: BTreeSet<ObjectId>,
147    pub dirty_clusters: BTreeSet<Cluster>,
148    pub dirty_schemas: BTreeSet<SchemaQualifier>,
149}
150
151impl DirtyState {
152    /// Seeds dirty state from objects whose hashes changed between snapshots,
153    /// plus any schemas forced dirty by the caller. Forced schemas let the
154    /// fixed-point treat an unchanged schema as if it had changed: Rule 6 turns
155    /// each into its objects, which then propagate to downstream dependents.
156    pub(super) fn new(
157        changed_stmts: &BTreeSet<ObjectId>,
158        forced_dirty_schemas: &BTreeSet<SchemaQualifier>,
159    ) -> Self {
160        Self {
161            dirty_stmts: changed_stmts.clone(),
162            dirty_clusters: BTreeSet::new(),
163            dirty_schemas: forced_dirty_schemas.clone(),
164        }
165    }
166
167    pub(super) fn sizes(&self) -> (usize, usize, usize) {
168        (
169            self.dirty_stmts.len(),
170            self.dirty_clusters.len(),
171            self.dirty_schemas.len(),
172        )
173    }
174}
175
176/// Newly derived facts from one rule group before they are merged into `DirtyState`.
177#[derive(Default)]
178struct PendingFacts {
179    dirty_stmts: BTreeSet<ObjectId>,
180    dirty_clusters: BTreeSet<Cluster>,
181    dirty_schemas: BTreeSet<SchemaQualifier>,
182}
183
184impl PendingFacts {
185    fn merge_into(self, state: &mut DirtyState) {
186        state.dirty_stmts.extend(self.dirty_stmts);
187        state.dirty_clusters.extend(self.dirty_clusters);
188        state.dirty_schemas.extend(self.dirty_schemas);
189    }
190}
191
192/// Fixed-point evaluator for the dirty propagation rules.
193struct Evaluator<'a> {
194    changed_stmts: &'a BTreeSet<ObjectId>,
195    base_facts: &'a BaseFacts,
196    forced_dirty_schemas: &'a BTreeSet<SchemaQualifier>,
197    indexes: FactIndexes,
198}
199
200impl<'a> Evaluator<'a> {
201    fn new(
202        changed_stmts: &'a BTreeSet<ObjectId>,
203        base_facts: &'a BaseFacts,
204        forced_dirty_schemas: &'a BTreeSet<SchemaQualifier>,
205    ) -> Self {
206        Self {
207            changed_stmts,
208            base_facts,
209            forced_dirty_schemas,
210            indexes: FactIndexes::from_base_facts(base_facts),
211        }
212    }
213
214    fn run(&self) -> DirtyState {
215        let mut state = DirtyState::new(self.changed_stmts, self.forced_dirty_schemas);
216
217        let mut iteration = 0;
218        loop {
219            iteration += 1;
220            let prev_sizes = state.sizes();
221            log_iteration(iteration, &state);
222
223            self.apply_rule_group(&mut state, Self::derive_cluster_dirtiness);
224            self.apply_rule_group(&mut state, Self::derive_stmt_dirtiness_from_clusters);
225            self.apply_rule_group(&mut state, Self::derive_stmt_dependency_dirtiness);
226            self.apply_rule_group(&mut state, Self::derive_schema_dirtiness);
227            self.apply_rule_group(&mut state, Self::derive_stmt_dirtiness_from_schemas);
228
229            // Fixed point reached when no sets grew
230            if state.sizes() == prev_sizes {
231                verbose!(
232                    "\n{} Fixed point reached after {} iteration(s)",
233                    "✓".if_supports_color(Stream::Stderr, |t| t.green()),
234                    iteration
235                        .to_string()
236                        .if_supports_color(Stream::Stderr, |t| t.bold())
237                );
238                break;
239            }
240        }
241
242        state
243    }
244
245    fn apply_rule_group(
246        &self,
247        state: &mut DirtyState,
248        derive: fn(&Self, &DirtyState, &mut PendingFacts),
249    ) {
250        let mut pending = PendingFacts::default();
251        derive(self, state, &mut pending);
252        pending.merge_into(state);
253    }
254
255    /// Applies cluster dirtiness rules that originate only from changed statements.
256    ///
257    /// This intentionally excludes sink propagation so sink changes do not force
258    /// unrelated cluster redeployments.
259    ///
260    /// ```datalog
261    /// DirtyCluster(C) :- ChangedStmt(O), StmtUsesCluster(O, C),
262    ///                    NOT IsSink(O), ClusterBoundary(C)
263    /// DirtyCluster(C) :- ChangedStmt(O), IndexUsesCluster(O, _, C),
264    ///                    NOT IsSink(O), ClusterBoundary(C)
265    /// ```
266    fn derive_cluster_dirtiness(&self, state: &DirtyState, pending: &mut PendingFacts) {
267        for obj in self.changed_stmts {
268            if self.base_facts.is_sink.contains(obj) {
269                let skip_style = Style::new().yellow().bold();
270                verbose!(
271                    "  ├─ {}: {} is a sink, not marking clusters dirty",
272                    "SKIP".if_supports_color(Stream::Stderr, |t| skip_style.style(t)),
273                    obj.to_string()
274                        .if_supports_color(Stream::Stderr, |t| t.cyan())
275                );
276                continue;
277            }
278
279            if let Some(clusters) = self.indexes.stmt_to_clusters.get(obj) {
280                for cluster in clusters {
281                    if self.indexes.cluster_boundary.contains(cluster)
282                        && !state.dirty_clusters.contains(cluster)
283                        && pending.dirty_clusters.insert(cluster.clone())
284                    {
285                        verbose!(
286                            "  ├─ {}: DirtyCluster({}) ← ChangedStmt({}) uses cluster",
287                            "Rule 1".if_supports_color(Stream::Stderr, |t| t.bold()),
288                            cluster.if_supports_color(Stream::Stderr, |t| t.magenta()),
289                            obj.to_string()
290                                .if_supports_color(Stream::Stderr, |t| t.cyan())
291                        );
292                    }
293                }
294            }
295
296            if let Some(clusters) = self.indexes.index_to_clusters.get(obj) {
297                for cluster in clusters {
298                    if self.indexes.cluster_boundary.contains(cluster)
299                        && !state.dirty_clusters.contains(cluster)
300                        && pending.dirty_clusters.insert(cluster.clone())
301                    {
302                        verbose!(
303                            "  ├─ {}: DirtyCluster({}) ← ChangedStmt({}) has index on cluster",
304                            "Rule 2".if_supports_color(Stream::Stderr, |t| t.bold()),
305                            cluster.if_supports_color(Stream::Stderr, |t| t.magenta()),
306                            obj.to_string()
307                                .if_supports_color(Stream::Stderr, |t| t.cyan())
308                        );
309                    }
310                }
311            }
312        }
313    }
314
315    /// Applies statement dirtiness induced by dirty clusters.
316    ///
317    /// ```datalog
318    /// DirtyStmt(O) :- StmtUsesCluster(O, C), DirtyCluster(C)
319    /// ```
320    fn derive_stmt_dirtiness_from_clusters(&self, state: &DirtyState, pending: &mut PendingFacts) {
321        for (obj, clusters) in &self.indexes.stmt_to_clusters {
322            for cluster in clusters {
323                if state.dirty_clusters.contains(cluster)
324                    && !state.dirty_stmts.contains(obj)
325                    && pending.dirty_stmts.insert(obj.clone())
326                {
327                    verbose!(
328                        "  ├─ {}: DirtyStmt({}) ← uses DirtyCluster({})",
329                        "Rule 3".if_supports_color(Stream::Stderr, |t| t.bold()),
330                        obj.to_string()
331                            .if_supports_color(Stream::Stderr, |t| t.cyan()),
332                        cluster.if_supports_color(Stream::Stderr, |t| t.magenta())
333                    );
334                    break;
335                }
336            }
337        }
338    }
339
340    /// Applies downstream dependency propagation for dirty statements.
341    ///
342    /// Replacement MVs are excluded as parents: a dirty replacement MV is
343    /// redeployed in place, so its dirtiness does not fan out to dependents in
344    /// other schemas. This is the only special property of replacement MVs.
345    ///
346    /// ```datalog
347    /// DirtyStmt(O) :- DependsOn(O, P), DirtyStmt(P), NOT IsReplacement(P)
348    /// ```
349    fn derive_stmt_dependency_dirtiness(&self, state: &DirtyState, pending: &mut PendingFacts) {
350        for dirty_obj in &state.dirty_stmts {
351            if self.base_facts.is_replacement.contains(dirty_obj) {
352                let skip_style = Style::new().yellow().bold();
353                verbose!(
354                    "  ├─ {}: {} is a replacement MV, not propagating to dependents",
355                    "SKIP".if_supports_color(Stream::Stderr, |t| skip_style.style(t)),
356                    dirty_obj
357                        .to_string()
358                        .if_supports_color(Stream::Stderr, |t| t.cyan())
359                );
360                continue;
361            }
362            if let Some(children) = self.indexes.dependents.get(dirty_obj) {
363                for child in children {
364                    if !state.dirty_stmts.contains(child)
365                        && pending.dirty_stmts.insert(child.clone())
366                    {
367                        verbose!(
368                            "  ├─ {}: DirtyStmt({}) ← depends on DirtyStmt({})",
369                            "Rule 4".if_supports_color(Stream::Stderr, |t| t.bold()),
370                            child
371                                .to_string()
372                                .if_supports_color(Stream::Stderr, |t| t.cyan()),
373                            dirty_obj
374                                .to_string()
375                                .if_supports_color(Stream::Stderr, |t| t.cyan())
376                        );
377                    }
378                }
379            }
380        }
381    }
382
383    /// Marks schemas dirty from dirty statements eligible for schema-level redeploy.
384    ///
385    /// Sinks are excluded: they write to external systems and are created after
386    /// the swap during apply, so they must not pull their schema into a
387    /// redeploy. Replacement MVs are ordinary here — a dirty replacement MV
388    /// dirties its schema like any other compute object.
389    ///
390    /// ```datalog
391    /// DirtySchema(Db, Sch) :- DirtyStmt(O), ObjectInSchema(O, Db, Sch),
392    ///                          NOT IsSink(O)
393    /// ```
394    fn derive_schema_dirtiness(&self, state: &DirtyState, pending: &mut PendingFacts) {
395        for obj in &state.dirty_stmts {
396            if self.base_facts.is_sink.contains(obj) {
397                let skip_style = Style::new().yellow().bold();
398                verbose!(
399                    "  ├─ {}: {} is a sink, not marking schema dirty",
400                    "SKIP".if_supports_color(Stream::Stderr, |t| skip_style.style(t)),
401                    obj.to_string()
402                        .if_supports_color(Stream::Stderr, |t| t.cyan())
403                );
404                continue;
405            }
406            if let Some(sq) = self.indexes.object_to_schema.get(obj)
407                && !state.dirty_schemas.contains(sq)
408                && pending.dirty_schemas.insert(sq.clone())
409            {
410                verbose!(
411                    "  ├─ {}: DirtySchema({}) ← DirtyStmt({}) in schema",
412                    "Rule 5".if_supports_color(Stream::Stderr, |t| t.bold()),
413                    format!("{}.{}", sq.database, sq.schema)
414                        .if_supports_color(Stream::Stderr, |t| t.blue()),
415                    obj.to_string()
416                        .if_supports_color(Stream::Stderr, |t| t.cyan())
417                );
418            }
419        }
420    }
421
422    /// Pulls statements into the dirty set when their schema is marked dirty.
423    ///
424    /// Every object in a dirty schema is dirty, replacement MVs included: a
425    /// stable schema redeploys atomically. The downstream-propagation exclusion
426    /// (Rule 4) is what keeps a dirty replacement MV from pulling in dependents
427    /// in *other* schemas.
428    ///
429    /// ```datalog
430    /// DirtyStmt(O) :- DirtySchema(Db, Sch), ObjectInSchema(O, Db, Sch)
431    /// ```
432    fn derive_stmt_dirtiness_from_schemas(&self, state: &DirtyState, pending: &mut PendingFacts) {
433        for dirty_schema in &state.dirty_schemas {
434            if let Some(objects) = self.indexes.schema_to_objects.get(dirty_schema) {
435                for obj in objects {
436                    if state.dirty_stmts.contains(obj) || !pending.dirty_stmts.insert(obj.clone()) {
437                        continue;
438                    }
439                    verbose!(
440                        "  ├─ {}: DirtyStmt({}) ← in DirtySchema({})",
441                        "Rule 6".if_supports_color(Stream::Stderr, |t| t.bold()),
442                        obj.to_string()
443                            .if_supports_color(Stream::Stderr, |t| t.cyan()),
444                        format!("{}.{}", dirty_schema.database, dirty_schema.schema)
445                            .if_supports_color(Stream::Stderr, |t| t.blue())
446                    );
447                }
448            }
449        }
450    }
451}
452
453/// Compute dirty objects, clusters, and schemas using fixed-point iteration.
454///
455/// Implements the Datalog rules defined at the top of this module.
456///
457/// `forced_dirty_schemas` seeds the fixed-point with schemas the caller forces
458/// dirty (e.g. `stage --redeploy-schema`); pass an empty set for pure
459/// change-driven dirtiness. Every object in a forced schema, and its downstream
460/// dependents, ends up in the dirty set.
461///
462/// **Important:** Sinks are special - they do NOT propagate dirtiness to clusters or schemas.
463/// Sinks write to external systems and are created after the swap during apply, so they
464/// shouldn't cause other objects to be redeployed.
465pub(super) fn compute_dirty_datalog(
466    changed_stmts: &BTreeSet<ObjectId>,
467    base_facts: &BaseFacts,
468    forced_dirty_schemas: &BTreeSet<SchemaQualifier>,
469) -> (
470    BTreeSet<ObjectId>,
471    BTreeSet<Cluster>,
472    BTreeSet<SchemaQualifier>,
473) {
474    log_datalog_start(changed_stmts, base_facts);
475    let evaluator = Evaluator::new(changed_stmts, base_facts, forced_dirty_schemas);
476    let state = evaluator.run();
477
478    log_final_results(&state);
479    (state.dirty_stmts, state.dirty_clusters, state.dirty_schemas)
480}