1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Analysis to identify monotonic collections, especially TopK inputs.
use expr::{GlobalId, Id, LocalId};
use expr::{MirRelationExpr, RECURSION_LIMIT};
use ore::stack::{CheckedRecursion, RecursionGuard};
use std::collections::HashSet;

/// A struct that holds a recursive function that determines if a
/// relation is monotonic, and applies any optimizations along the way.
#[derive(Debug)]
pub struct MonotonicFlag {
    recursion_guard: RecursionGuard,
}

impl Default for MonotonicFlag {
    fn default() -> MonotonicFlag {
        MonotonicFlag {
            recursion_guard: RecursionGuard::with_limit(RECURSION_LIMIT),
        }
    }
}

impl CheckedRecursion for MonotonicFlag {
    fn recursion_guard(&self) -> &RecursionGuard {
        &self.recursion_guard
    }
}

impl MonotonicFlag {
    /// Determines if a relation is monotonic, and applies any optimizations along the way.
    pub fn apply(
        &self,
        expr: &mut MirRelationExpr,
        sources: &HashSet<GlobalId>,
        locals: &mut HashSet<LocalId>,
    ) -> Result<bool, crate::TransformError> {
        self.checked_recur(|_| {
            let is_monotonic = match expr {
                MirRelationExpr::Get { id, .. } => match id {
                    Id::Global(id) => sources.contains(id),
                    Id::Local(id) => locals.contains(id),
                    _ => false,
                },
                MirRelationExpr::Project { input, .. } => self.apply(input, sources, locals)?,
                MirRelationExpr::Filter { input, predicates } => {
                    let is_monotonic = self.apply(input, sources, locals)?;
                    // Non-temporal predicates can introduce non-monotonicity, as they
                    // can result in the future removal of records.
                    // TODO: this could be improved to only restrict if upper bounds
                    // are present, as temporal lower bounds only delay introduction.
                    is_monotonic && !predicates.iter().any(|p| p.contains_temporal())
                }
                MirRelationExpr::Map { input, .. } => self.apply(input, sources, locals)?,
                MirRelationExpr::TopK {
                    input, monotonic, ..
                } => {
                    *monotonic = self.apply(input, sources, locals)?;
                    false
                }
                MirRelationExpr::Reduce {
                    input,
                    aggregates,
                    monotonic,
                    ..
                } => {
                    *monotonic = self.apply(input, sources, locals)?;
                    // Reduce is monotonic iff its input is and it is a "distinct",
                    // with no aggregate values; otherwise it may need to retract.
                    *monotonic && aggregates.is_empty()
                }
                MirRelationExpr::Union { base, inputs } => {
                    let mut monotonic = self.apply(base, sources, locals)?;
                    for input in inputs.iter_mut() {
                        let monotonic_i = self.apply(input, sources, locals)?;
                        monotonic = monotonic && monotonic_i;
                    }
                    monotonic
                }
                MirRelationExpr::ArrangeBy { input, .. }
                | MirRelationExpr::DeclareKeys { input, .. } => {
                    self.apply(input, sources, locals)?
                }
                MirRelationExpr::FlatMap { input, func, .. } => {
                    let is_monotonic = self.apply(input, sources, locals)?;
                    is_monotonic && func.preserves_monotonicity()
                }
                MirRelationExpr::Join { inputs, .. } => {
                    // If all inputs to the join are monotonic then so is the join.
                    let mut monotonic = true;
                    for input in inputs.iter_mut() {
                        let monotonic_i = self.apply(input, sources, locals)?;
                        monotonic = monotonic && monotonic_i;
                    }
                    monotonic
                }
                MirRelationExpr::Constant { rows: Ok(rows), .. } => {
                    rows.iter().all(|(_, diff)| diff > &0)
                }
                MirRelationExpr::Threshold { input } => self.apply(input, sources, locals)?,
                MirRelationExpr::Let { id, value, body } => {
                    let prior = locals.remove(id);
                    if self.apply(value, sources, locals)? {
                        locals.insert(*id);
                    }
                    let result = self.apply(body, sources, locals)?;
                    if prior {
                        locals.insert(*id);
                    } else {
                        locals.remove(id);
                    }
                    result
                }
                // The default behavior.
                // TODO: check that this is the behavior we want.
                MirRelationExpr::Negate { .. } | MirRelationExpr::Constant { rows: Err(_), .. } => {
                    expr.try_visit_mut_children(|e| self.apply(e, sources, locals).map(|_| ()))?;
                    false
                }
            };
            Ok(is_monotonic)
        })
    }
}