1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Definition and helper structs for the [`NonNegative`] attribute.

use mz_expr::{Id, MirRelationExpr};

use crate::attribute::subtree_size::SubtreeSize;
use crate::attribute::{Attribute, DerivedAttributes, DerivedAttributesBuilder, Env};

/// Traverses a [`MirRelationExpr`] tree and figures out whether for subtree
/// the sum of all diffs up to a specific time for any record can be a
/// negative value.
///
/// The results for each subtree are accumulated in a bottom-up order
/// in [`NonNegative::results`].
///
/// This method is a conservative approximation and is known to miss not-hard
/// cases.
///
/// It assumes that all `Get`s to global ids correspond to collections without negative
/// multiplicities. Local let bindings' non-negativity results are stored in `env`.
#[derive(Default)]
#[allow(missing_debug_implementations)]
pub struct NonNegative {
    /// Environment of computed values for this attribute.
    env: Env<Self>,
    /// A vector of results for all nodes in the visited tree in
    /// post-visit order.
    pub results: Vec<bool>,
}

impl Attribute for NonNegative {
    type Value = bool;

    fn derive(&mut self, expr: &MirRelationExpr, deps: &DerivedAttributes) {
        use MirRelationExpr::*;
        let n = self.results.len();
        match expr {
            Constant { rows, .. } => {
                if let Ok(rows) = rows {
                    let has_negative_rows = rows.iter().any(|(_data, diff)| diff < &0);
                    self.results.push(!has_negative_rows);
                } else {
                    self.results.push(true); // constant errors are considered "non-negative"
                }
            }
            Get { id, .. } => match id {
                Id::Local(id) => match self.env.get(id) {
                    Some(value) => self.results.push(value.clone()),
                    None => self.results.push(false),
                },
                Id::Global(_) => {
                    self.results.push(true);
                }
            },
            Let {
                value: _, body: _, ..
            } => {
                let body = self.results[n - 1];
                self.results.push(body);
            }
            LetRec { .. } => {
                let body = self.results[n - 1];
                self.results.push(body);
            }
            Project { input: _, .. } => {
                let input = self.results[n - 1];
                self.results.push(input);
            }
            Map { input: _, .. } => {
                let input = self.results[n - 1];
                self.results.push(input);
            }
            FlatMap { input: _, .. } => {
                let input = self.results[n - 1];
                self.results.push(input);
            }
            Filter { input: _, .. } => {
                let input = self.results[n - 1];
                self.results.push(input);
            }
            Join { inputs, .. } => {
                let mut result = true;
                let mut offset = 1;
                for _ in 0..inputs.len() {
                    result &= &self.results[n - offset];
                    offset += &deps.get_results::<SubtreeSize>()[n - offset];
                }
                self.results.push(result);
            }
            Reduce { input: _, .. } => {
                let input = self.results[n - 1];
                self.results.push(input);
            }
            TopK { input: _, .. } => {
                let input = self.results[n - 1];
                self.results.push(input);
            }
            Negate { input: _ } => {
                // Can be refined if we have proper LetRec handling
                self.results.push(false);
            }
            Threshold { input: _ } => {
                self.results.push(true);
            }
            Union { base: _, inputs } => {
                let mut result = true;
                let mut offset = 1;
                for _ in 0..inputs.len() {
                    result &= &self.results[n - offset];
                    offset += &deps.get_results::<SubtreeSize>()[n - offset];
                }
                result &= &self.results[n - offset]; // include the base result
                self.results.push(result);
            }
            ArrangeBy { input: _, .. } => {
                let input = self.results[n - 1];
                self.results.push(input);
            }
        }
    }

    fn schedule_env_tasks(&mut self, expr: &MirRelationExpr) {
        self.env.schedule_tasks(expr);
    }

    fn handle_env_tasks(&mut self) {
        self.env.handle_tasks(&self.results);
    }

    fn add_dependencies(builder: &mut DerivedAttributesBuilder)
    where
        Self: Sized,
    {
        builder.require(SubtreeSize::default());
    }

    fn get_results(&self) -> &Vec<Self::Value> {
        &self.results
    }

    fn get_results_mut(&mut self) -> &mut Vec<Self::Value> {
        &mut self.results
    }

    fn take(self) -> Vec<Self::Value> {
        self.results
    }
}