1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Analysis to identify monotonic collections, especially TopK inputs.
use std::collections::BTreeSet;

use itertools::zip_eq;
use mz_expr::visit::VisitChildren;
use mz_expr::{Id, LetRecLimit, LocalId, MirRelationExpr, RECURSION_LIMIT};
use mz_ore::cast::CastFrom;
use mz_ore::soft_panic_or_log;
use mz_ore::stack::{CheckedRecursion, RecursionGuard};
use mz_repr::GlobalId;

/// A struct that holds a recursive function that determines if a
/// relation is monotonic, and applies any optimizations along the way.
#[derive(Debug)]
pub struct MonotonicFlag {
    recursion_guard: RecursionGuard,
}

impl Default for MonotonicFlag {
    fn default() -> MonotonicFlag {
        MonotonicFlag {
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }
}

impl CheckedRecursion for MonotonicFlag {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

impl MonotonicFlag {
    /// Determines if a relation is monotonic, and applies any optimizations along the way.
    /// mon_ids should be the ids of monotonic sources and indexes involved in expr.
    pub fn apply(
        &self,
        expr: &mut MirRelationExpr,
        mon_ids: &BTreeSet<GlobalId>,
        locals: &mut BTreeSet<LocalId>,
    ) -> Result<bool, crate::RecursionLimitError> {
        self.checked_recur(|_| {
            let is_monotonic = match expr {
                MirRelationExpr::Get { id, .. } => match id {
                    Id::Global(id) => mon_ids.contains(id),
                    Id::Local(id) => locals.contains(id),
                },
                MirRelationExpr::Project { input, .. } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::Filter { input, predicates } => {
                    let is_monotonic = self.apply(input, mon_ids, locals)?;
                    // Non-temporal predicates can introduce non-monotonicity, as they
                    // can result in the future removal of records.
                    // TODO: this could be improved to only restrict if upper bounds
                    // are present, as temporal lower bounds only delay introduction.
                    is_monotonic && !predicates.iter().any(|p| p.contains_temporal())
                }
                MirRelationExpr::Map { input, .. } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::TopK {
                    input, monotonic, ..
                } => {
                    *monotonic = self.apply(input, mon_ids, locals)?;
                    false
                }
                MirRelationExpr::Reduce {
                    input,
                    aggregates,
                    monotonic,
                    ..
                } => {
                    *monotonic = self.apply(input, mon_ids, locals)?;
                    // Reduce is monotonic iff its input is and it is a "distinct",
                    // with no aggregate values; otherwise it may need to retract.
                    *monotonic && aggregates.is_empty()
                }
                MirRelationExpr::Union { base, inputs } => {
                    let mut monotonic = self.apply(base, mon_ids, locals)?;
                    for input in inputs.iter_mut() {
                        let monotonic_i = self.apply(input, mon_ids, locals)?;
                        monotonic = monotonic && monotonic_i;
                    }
                    monotonic
                }
                MirRelationExpr::ArrangeBy { input, .. } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::FlatMap { input, func, .. } => {
                    let is_monotonic = self.apply(input, mon_ids, locals)?;
                    is_monotonic && func.preserves_monotonicity()
                }
                MirRelationExpr::Join { inputs, .. } => {
                    // If all inputs to the join are monotonic then so is the join.
                    let mut monotonic = true;
                    for input in inputs.iter_mut() {
                        let monotonic_i = self.apply(input, mon_ids, locals)?;
                        monotonic = monotonic && monotonic_i;
                    }
                    monotonic
                }
                MirRelationExpr::Constant { rows: Ok(rows), .. } => {
                    rows.iter().all(|(_, diff)| diff > &0)
                }
                MirRelationExpr::Threshold { input } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::Let { id, value, body } => {
                    let prior = locals.remove(id);
                    if self.apply(value, mon_ids, locals)? {
                        locals.insert(*id);
                    }
                    let result = self.apply(body, mon_ids, locals)?;
                    if prior {
                        locals.insert(*id);
                    } else {
                        locals.remove(id);
                    }
                    result
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits,
                    body,
                } => {
                    // Optimistically assume that all bindings were monotonic
                    // (it is safe to do this as we initialize each binding to
                    // the empty collection, which is trivially monotonic).
                    for id in ids.iter() {
                        let inserted = locals.insert(id.clone());
                        assert!(inserted, "Shadowing of identifier: {id:?}");
                    }

                    // The following is equivalent to a dataflow analysis on the
                    // following lattice:
                    // - The bottom element is `true` / contained in `locals`.
                    // - The top element is `false` / not present in `locals`.
                    // - The join operator is boolean `AND`.
                    // - The meet operator is boolean `OR` (but it's not used).
                    //
                    // Sequentially AND locals[id] with the result of descending
                    // into a clone of values[id]. Repeat until one of the
                    // following conditions is met:
                    //
                    // 1. The locals entries have stabilized at a fixpoint.
                    // 2. No fixpoint was found after `max_iterations`. If this
                    //    is the case we reset the locals entries for all
                    //    recursive CTEs to `false` by removing them from
                    //    `locals` (pessimistic approximation).
                    // 3. We reach the user-specified recursion limit of any of
                    //    the bindings. In this case, we also give up similarly
                    //    to (2), because we don't want to complicate things
                    //    with handling different limits per binding.
                    let min_max_iter = LetRecLimit::min_max_iter(limits);
                    let max_iterations = 100;
                    let mut curr_iteration = 0;
                    loop {
                        // Check for conditions (2) and (3).
                        if curr_iteration >= max_iterations
                            || min_max_iter
                                .map(|min_max_iter| curr_iteration >= min_max_iter)
                                .unwrap_or(false)
                        {
                            if curr_iteration > u64::cast_from(ids.len()) {
                                soft_panic_or_log!(
                                    "LetRec loop in MonotonicFlag has not converged in |{}|",
                                    ids.len()
                                );
                            }

                            for id in ids.iter() {
                                locals.remove(id);
                            }
                            break;
                        }

                        // Check for condition (1).
                        let mut change = false;
                        for (id, mut value) in zip_eq(ids.iter(), values.iter().cloned()) {
                            if !self.apply(&mut value, mon_ids, locals)? {
                                change |= locals.remove(id); // set only if `id` is still present
                            }
                        }
                        if !change {
                            break;
                        }

                        curr_iteration += 1;
                    }

                    // Descend into the values with the locals.
                    for value in values.iter_mut() {
                        self.apply(value, mon_ids, locals)?;
                    }

                    let is_monotonic = self.apply(body, mon_ids, locals)?;

                    // Remove shadowed bindings. This is good hygiene, as
                    // otherwise with nested LetRec blocks the `loop { ... }`
                    // above will carry inner LetRec IDs across outer LetRec
                    // iterations. As a consequence, the "no shadowing"
                    // assertion at the beginning of this block will fail at the
                    // inner LetRec for the second outer LetRec iteration.
                    for id in ids.iter() {
                        locals.remove(id);
                    }

                    is_monotonic
                }
                // The default behavior.
                // TODO: check that this is the behavior we want.
                MirRelationExpr::Negate { .. } | MirRelationExpr::Constant { rows: Err(_), .. } => {
                    expr.try_visit_mut_children(|e| self.apply(e, mon_ids, locals).map(|_| ()))?;
                    false
                }
            };
            Ok(is_monotonic)
        })
    }
}