1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Analysis to identify monotonic collections, especially TopK inputs.
use std::collections::BTreeSet;
use itertools::zip_eq;
use mz_expr::visit::VisitChildren;
use mz_expr::{Id, LetRecLimit, LocalId, MirRelationExpr, RECURSION_LIMIT};
use mz_ore::cast::CastFrom;
use mz_ore::soft_panic_or_log;
use mz_ore::stack::{CheckedRecursion, RecursionGuard};
use mz_repr::GlobalId;
/// Analysis pass that decides whether a relation expression is monotonic.
///
/// The recursive traversal lives in [`MonotonicFlag::apply`], which also
/// writes its findings into the `monotonic` flags of the `TopK` and `Reduce`
/// operators it visits.
#[derive(Debug)]
pub struct MonotonicFlag {
    /// Guard returned by the `CheckedRecursion` implementation for this type.
    recursion_guard: RecursionGuard,
}
impl Default for MonotonicFlag {
    /// Builds a `MonotonicFlag` whose recursion guard is created with
    /// `RECURSION_LIMIT` as its limit.
    fn default() -> Self {
        let recursion_guard = RecursionGuard::with_limit(RECURSION_LIMIT);
        Self { recursion_guard }
    }
}
impl CheckedRecursion for MonotonicFlag {
    /// Hands out a shared reference to the guard stored in this instance.
    fn recursion_guard(&self) -> &RecursionGuard {
        let Self { recursion_guard } = self;
        recursion_guard
    }
}
impl MonotonicFlag {
    /// Determines if a relation is monotonic, and applies any optimizations along the way.
    ///
    /// The whole subtree rooted at `expr` is visited. For every `TopK` and
    /// `Reduce` that is encountered, the operator's `monotonic` field is
    /// overwritten with the monotonicity computed for its input.
    ///
    /// * `mon_ids` should be the ids of monotonic sources and indexes involved
    ///   in `expr`. A `Get` of a global id is monotonic iff the id is in this set.
    /// * `locals` is the set of local ids currently considered monotonic. A
    ///   `Get` of a local id is monotonic iff the id is in this set. `Let` and
    ///   `LetRec` add their bindings while descending and undo those changes
    ///   before returning successfully.
    ///
    /// Returns `Ok(true)` iff `expr` was determined to be monotonic.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `checked_recur` or by a recursive call
    /// on a sub-expression. When an error is propagated, `locals` may be left
    /// with bindings from a partially processed `Let` / `LetRec`.
    ///
    /// # Panics
    ///
    /// Panics if a `LetRec` binds an id that is already present in `locals`.
    pub fn apply(
        &self,
        expr: &mut MirRelationExpr,
        mon_ids: &BTreeSet<GlobalId>,
        locals: &mut BTreeSet<LocalId>,
    ) -> Result<bool, crate::RecursionLimitError> {
        self.checked_recur(|_| {
            let is_monotonic = match expr {
                MirRelationExpr::Get { id, .. } => match id {
                    Id::Global(id) => mon_ids.contains(id),
                    Id::Local(id) => locals.contains(id),
                },
                MirRelationExpr::Project { input, .. } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::Filter { input, predicates } => {
                    let is_monotonic = self.apply(input, mon_ids, locals)?;
                    // Temporal predicates can introduce non-monotonicity, as they
                    // can result in the future removal of records.
                    // TODO: this could be improved to only restrict if upper bounds
                    // are present, as temporal lower bounds only delay introduction.
                    is_monotonic && !predicates.iter().any(|p| p.contains_temporal())
                }
                MirRelationExpr::Map { input, .. } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::TopK {
                    input, monotonic, ..
                } => {
                    // Record the input's monotonicity on the operator; the
                    // output of the TopK itself is reported as non-monotonic.
                    *monotonic = self.apply(input, mon_ids, locals)?;
                    false
                }
                MirRelationExpr::Reduce {
                    input,
                    aggregates,
                    monotonic,
                    ..
                } => {
                    *monotonic = self.apply(input, mon_ids, locals)?;
                    // Reduce is monotonic iff its input is and it is a "distinct",
                    // with no aggregate values; otherwise it may need to retract.
                    *monotonic && aggregates.is_empty()
                }
                MirRelationExpr::Union { base, inputs } => {
                    // Every branch is visited (no short-circuiting) so that
                    // flags inside all of them get set.
                    let mut monotonic = self.apply(base, mon_ids, locals)?;
                    for input in inputs.iter_mut() {
                        let monotonic_i = self.apply(input, mon_ids, locals)?;
                        monotonic = monotonic && monotonic_i;
                    }
                    monotonic
                }
                MirRelationExpr::ArrangeBy { input, .. } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::FlatMap { input, func, .. } => {
                    let is_monotonic = self.apply(input, mon_ids, locals)?;
                    is_monotonic && func.preserves_monotonicity()
                }
                MirRelationExpr::Join { inputs, .. } => {
                    // If all inputs to the join are monotonic then so is the join.
                    let mut monotonic = true;
                    for input in inputs.iter_mut() {
                        let monotonic_i = self.apply(input, mon_ids, locals)?;
                        monotonic = monotonic && monotonic_i;
                    }
                    monotonic
                }
                MirRelationExpr::Constant { rows: Ok(rows), .. } => {
                    // A constant is monotonic iff every row has a positive diff.
                    rows.iter().all(|(_, diff)| diff > &0)
                }
                MirRelationExpr::Threshold { input } => self.apply(input, mon_ids, locals)?,
                MirRelationExpr::Let { id, value, body } => {
                    // Remember whether `id` was already marked monotonic so
                    // that state can be restored once the body has been visited.
                    let prior = locals.remove(id);
                    if self.apply(value, mon_ids, locals)? {
                        locals.insert(*id);
                    }
                    let result = self.apply(body, mon_ids, locals)?;
                    if prior {
                        locals.insert(*id);
                    } else {
                        locals.remove(id);
                    }
                    result
                }
                MirRelationExpr::LetRec {
                    ids,
                    values,
                    limits,
                    body,
                } => {
                    // Optimistically assume that all bindings were monotonic
                    // (it is safe to do this as we initialize each binding to
                    // the empty collection, which is trivially monotonic).
                    for id in ids.iter() {
                        // `LocalId` is `Copy`, so dereference instead of cloning.
                        let inserted = locals.insert(*id);
                        assert!(inserted, "Shadowing of identifier: {id:?}");
                    }

                    // The following is equivalent to a dataflow analysis on the
                    // following lattice:
                    // - The bottom element is `true` / contained in `locals`.
                    // - The top element is `false` / not present in `locals`.
                    // - The join operator is boolean `AND`.
                    // - The meet operator is boolean `OR` (but it's not used).
                    //
                    // Sequentially AND locals[id] with the result of descending
                    // into a clone of values[id]. Repeat until one of the
                    // following conditions is met:
                    //
                    // 1. The locals entries have stabilized at a fixpoint.
                    // 2. No fixpoint was found after `max_iterations`. If this
                    //    is the case we reset the locals entries for all
                    //    recursive CTEs to `false` by removing them from
                    //    `locals` (pessimistic approximation).
                    // 3. We reach the user-specified recursion limit of any of
                    //    the bindings. In this case, we also give up similarly
                    //    to (2), because we don't want to complicate things
                    //    with handling different limits per binding.
                    let min_max_iter = LetRecLimit::min_max_iter(limits);
                    let max_iterations = 100;
                    let mut curr_iteration = 0;
                    loop {
                        // Check for conditions (2) and (3).
                        if curr_iteration >= max_iterations
                            || min_max_iter
                                .map(|min_max_iter| curr_iteration >= min_max_iter)
                                .unwrap_or(false)
                        {
                            if curr_iteration > u64::cast_from(ids.len()) {
                                soft_panic_or_log!(
                                    "LetRec loop in MonotonicFlag has not converged in |{}|",
                                    ids.len()
                                );
                            }

                            for id in ids.iter() {
                                locals.remove(id);
                            }
                            break;
                        }

                        // Check for condition (1).
                        let mut change = false;
                        // Each value is analyzed on a clone so that flags set
                        // under a not-yet-stable `locals` are discarded.
                        for (id, mut value) in zip_eq(ids.iter(), values.iter().cloned()) {
                            if !self.apply(&mut value, mon_ids, locals)? {
                                change |= locals.remove(id); // set only if `id` is still present
                            }
                        }
                        if !change {
                            break;
                        }

                        curr_iteration += 1;
                    }

                    // Descend into the values with the locals.
                    for value in values.iter_mut() {
                        self.apply(value, mon_ids, locals)?;
                    }

                    let is_monotonic = self.apply(body, mon_ids, locals)?;

                    // Remove shadowed bindings. This is good hygiene, as
                    // otherwise with nested LetRec blocks the `loop { ... }`
                    // above will carry inner LetRec IDs across outer LetRec
                    // iterations. As a consequence, the "no shadowing"
                    // assertion at the beginning of this block will fail at the
                    // inner LetRec for the second outer LetRec iteration.
                    for id in ids.iter() {
                        locals.remove(id);
                    }

                    is_monotonic
                }
                // The default behavior.
                // TODO: check that this is the behavior we want.
                MirRelationExpr::Negate { .. } | MirRelationExpr::Constant { rows: Err(_), .. } => {
                    // Still visit the children so that flags nested below get
                    // set, but report the expression itself as non-monotonic.
                    expr.try_visit_mut_children(|e| self.apply(e, mon_ids, locals).map(|_| ()))?;
                    false
                }
            };
            Ok(is_monotonic)
        })
    }
}