1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Propagates expression equivalence from leaves to root, and back down again.
//!
//! Expression equivalences are `MirScalarExpr` replacements by simpler expressions.
//! These equivalences derive from
//!   Filters:  predicates must evaluate to `Datum::True`.
//!   Maps:     new columns equal the expressions that define them.
//!   Joins:    equated columns must be equal.
//!   Others:   lots of other predicates we might learn (range constraints on aggregates; non-negativity)
//!
//! From leaf to root the equivalences are *enforced*, and communicate that the expression will not produce rows that do not satisfy the equivalence.
//! From root to leaf the equivalences are *advised*, and communicate that the expression may discard any outputs that do not satisfy the equivalence.
//!
//! Importantly, in descent the operator *may not* assume any equivalence filtering will be applied to its results.
//! It cannot therefore produce rows it would otherwise not, even rows that do not satisfy the equivalence.
//! Operators *may* introduce filtering in descent, and they must do so to take further advantage of the equivalences.
//!
//! The subtlety is due to the expressions themselves causing the equivalences, and allowing more rows may invalidate equivalences.
//! For example, we might learn that `Column(7)` equals `Literal(3)`, but must refrain from introducing that substitution in descent,
//! because it is possible that the equivalence derives from restrictions in the expression we are visiting. Were we certain that the
//! equivalence was independent of the expression (e.g. through a more nuanced expression traversal) we could imaging relaxing this.

use std::collections::BTreeMap;

use mz_expr::{Id, MirRelationExpr, MirScalarExpr};
use mz_repr::Datum;

use crate::analysis::equivalences::{EquivalenceClasses, Equivalences, ExpressionReducer};
use crate::analysis::{Arity, DerivedView, RelationType};

use crate::{TransformCtx, TransformError};

/// Pulls up and pushes down predicate information represented as equivalences
#[derive(Debug, Default)]
pub struct EquivalencePropagation;

impl crate::Transform for EquivalencePropagation {
    #[mz_ore::instrument(
        target = "optimizer"
        level = "trace",
        fields(path.segment = "equivalence_propagation")
    )]
    fn transform(
        &self,
        relation: &mut MirRelationExpr,
        ctx: &mut TransformCtx,
    ) -> Result<(), TransformError> {
        // Perform bottom-up equivalence class analysis.
        use crate::analysis::DerivedBuilder;
        let mut builder = DerivedBuilder::new(ctx.features);
        builder.require(Equivalences);
        let derived = builder.visit(relation);
        let derived = derived.as_view();

        let mut get_equivalences = BTreeMap::default();
        self.apply(
            relation,
            derived,
            EquivalenceClasses::default(),
            &mut get_equivalences,
        );

        mz_repr::explain::trace_plan(&*relation);
        Ok(())
    }
}

impl EquivalencePropagation {
    /// Provides the opportunity to mutate `relation` in response to equivalences enforced by others.
    ///
    /// Provides the opportunity to mutate `relation` in response to equivalences enforced by their children,
    /// as presented in `derived`, and equivalences enforced of their output (by their ancestors), as presented
    /// in `outer_equivalences` and `get_equivalences`.
    ///
    /// The mutations should never invalidate an equivalence the operator has been reported as providing, as that
    /// information may have already been acted upon by others.
    ///
    /// The `expr_index` argument must equal `expr`s position in post-order, so that it can be used as a reference
    /// into `derived`. The argument can be used with the `SubtreeSize` analysis to determine the range of values
    /// associated with `expr`.
    ///
    /// After the call, `get_equivalences` will be populated with certainly equivalences that will be certainly
    /// enforced for all uses of each identifier. The information can be harvested and propagated to the definitions
    /// of those identifiers.
    pub fn apply(
        &self,
        expr: &mut MirRelationExpr,
        derived: DerivedView,
        mut outer_equivalences: EquivalenceClasses,
        get_equivalences: &mut BTreeMap<Id, EquivalenceClasses>,
    ) {
        // TODO: The top-down traversal can be coded as a worklist, with arguments tupled and enqueued.
        // This has the potential to do a lot more cloning (of `outer_equivalences`), and some care is needed
        // for `get_equivalences` which would be scoped to the whole method rather than tupled and enqueued.

        let expr_type = derived
            .value::<RelationType>()
            .expect("RelationType required");
        let expr_equivalences = derived
            .value::<Equivalences>()
            .expect("Equivalences required");

        // `None` analysis values indicate collections that can be pruned.
        let expr_equivalences = if let Some(e) = expr_equivalences {
            e
        } else {
            expr.take_safely();
            return;
        };

        // Optimize `outer_equivalences` in the context of `expr_type`.
        // If it ends up unsatisfiable, we can replace `expr` with an empty constant of the same relation type.
        let reducer = expr_equivalences.reducer();
        for class in outer_equivalences.classes.iter_mut() {
            for expr in class.iter_mut() {
                reducer.reduce_expr(expr);
            }
        }

        outer_equivalences.minimize(expr_type);
        if outer_equivalences.unsatisfiable() {
            expr.take_safely();
            return;
        }

        match expr {
            MirRelationExpr::Constant { rows, typ: _ } => {
                if let Ok(rows) = rows {
                    let mut datum_vec = mz_repr::DatumVec::new();
                    // Delete any rows that violate the equivalences.
                    // Do not delete rows that produce errors, as they are semantically important.
                    rows.retain(|(row, _count)| {
                        let temp_storage = mz_repr::RowArena::new();
                        let datums = datum_vec.borrow_with(row);
                        outer_equivalences.classes.iter().all(|class| {
                            // Any subset of `Ok` results must equate, or we can drop the row.
                            let mut oks = class
                                .iter()
                                .filter_map(|e| e.eval(&datums[..], &temp_storage).ok());
                            if let Some(e1) = oks.next() {
                                oks.all(|e2| e1 == e2)
                            } else {
                                true
                            }
                        })
                    });
                }
            }
            MirRelationExpr::Get { id, .. } => {
                // Install and intersect with other equivalences from other `Get` sites.
                // These will be read out by the corresponding `Let` binding's `value`.
                if let Some(equivs) = get_equivalences.get_mut(id) {
                    *equivs = equivs.union(&outer_equivalences);
                } else {
                    get_equivalences.insert(*id, outer_equivalences);
                }
            }
            MirRelationExpr::Let { id, .. } => {
                let id = *id;
                // Traverse `body` first to assemble equivalences to push to `value`.
                // Descend without a key for `id`, treating the absence as the identity for union.
                // `Get` nodes with identifier `id` will populate the equivalence classes with the intersection of their guarantees.
                let mut children_rev = expr.children_mut().rev().zip(derived.children_rev());

                let body = children_rev.next().unwrap();
                let value = children_rev.next().unwrap();

                self.apply(body.0, body.1, outer_equivalences.clone(), get_equivalences);

                // We expect to find `id` in `get_equivalences`, as otherwise the binding is
                // not referenced and can be removed.
                if let Some(equivalences) = get_equivalences.get(&Id::Local(id)) {
                    self.apply(value.0, value.1, equivalences.clone(), get_equivalences);
                }
            }
            MirRelationExpr::LetRec { .. } => {
                let mut child_iter = expr.children_mut().rev().zip(derived.children_rev());
                // Continue in `body` with the outer equivalences.
                let (body, view) = child_iter.next().unwrap();
                self.apply(body, view, outer_equivalences, get_equivalences);
                // Continue recursively, but without the outer equivalences supplied to `body`.
                for (child, view) in child_iter {
                    self.apply(child, view, EquivalenceClasses::default(), get_equivalences);
                }
            }
            MirRelationExpr::Project { input, outputs } => {
                // Transform `outer_equivalences` to one relevant for `input`.
                outer_equivalences.permute(outputs);
                self.apply(
                    input,
                    derived.last_child(),
                    outer_equivalences,
                    get_equivalences,
                );
            }
            MirRelationExpr::Map { input, scalars } => {
                // Optimize `scalars` with respect to input equivalences.
                let input_equivalences = derived
                    .last_child()
                    .value::<Equivalences>()
                    .expect("Equivalences required");

                if let Some(input_equivalences) = input_equivalences {
                    let reducer = input_equivalences.reducer();
                    for expr in scalars.iter_mut() {
                        reducer.reduce_expr(expr);
                    }
                    let input_arity = *derived
                        .last_child()
                        .value::<Arity>()
                        .expect("Arity required");
                    outer_equivalences.project(0..input_arity);
                    self.apply(
                        input,
                        derived.last_child(),
                        outer_equivalences,
                        get_equivalences,
                    );
                }
            }
            MirRelationExpr::FlatMap { input, exprs, .. } => {
                // Transform `exprs` by guarantees from `input` *and* from `outer`???
                let input_equivalences = derived
                    .last_child()
                    .value::<Equivalences>()
                    .expect("Equivalences required");

                if let Some(input_equivalences) = input_equivalences {
                    let reducer = input_equivalences.reducer();
                    for expr in exprs.iter_mut() {
                        reducer.reduce_expr(expr);
                    }
                    let input_arity = *derived
                        .last_child()
                        .value::<Arity>()
                        .expect("Arity required");
                    outer_equivalences.project(0..input_arity);
                    self.apply(
                        input,
                        derived.last_child(),
                        outer_equivalences,
                        get_equivalences,
                    );
                }
            }
            MirRelationExpr::Filter { input, predicates } => {
                // Transform `predicates` by guarantees from `input` *and* from `outer`???
                // If we reduce based on `input` guarantees, we won't be able to push those
                // constraints down into input, which may be fine but is worth considering.
                let input_equivalences = derived
                    .last_child()
                    .value::<Equivalences>()
                    .expect("Equivalences required");
                if let Some(input_equivalences) = input_equivalences {
                    let input_types = derived
                        .last_child()
                        .value::<RelationType>()
                        .expect("RelationType required");
                    let reducer = input_equivalences.reducer();
                    for expr in predicates.iter_mut() {
                        reducer.reduce_expr(expr);
                    }
                    // Incorporate `predicates` into `outer_equivalences`.
                    let mut class = predicates.clone();
                    class.push(MirScalarExpr::literal_ok(
                        Datum::True,
                        mz_repr::ScalarType::Bool,
                    ));
                    outer_equivalences.classes.push(class);
                    outer_equivalences.minimize(input_types);
                    self.apply(
                        input,
                        derived.last_child(),
                        outer_equivalences,
                        get_equivalences,
                    );
                }
            }

            MirRelationExpr::Join {
                inputs,
                equivalences,
                ..
            } => {
                // Certain equivalences are ensured by each of the inputs.
                // Other equivalences are imposed by parents of the expression.
                // We must not weaken the properties provided by the expression to its parents,
                // meaning we can optimize `equivalences` with respect to input guarantees,
                // but not with respect to `outer_equivalences`.

                // Each child can be presented with the integration of `join_equivalences`, `outer_equivalences`,
                // and each input equivalence *other than* their own, projected onto the input's columns.

                // Enumerate locations to find each child's analysis outputs.
                let mut children: Vec<_> = derived.children_rev().collect::<Vec<_>>();
                children.reverse();

                // Assemble the appended input types, for use in expression minimization.
                // Do not use `expr_types`, which may reflect nullability that does not hold for the inputs.
                let mut input_types = Some(
                    children
                        .iter()
                        .flat_map(|c| {
                            c.value::<RelationType>()
                                .expect("RelationType required")
                                .as_ref()
                                .unwrap()
                                .iter()
                                .cloned()
                        })
                        .collect::<Vec<_>>(),
                );

                // For each child, assemble its equivalences using join-relative column numbers.
                // Don't do anything with the children yet, as we'll want to revisit each with
                // this information at hand.
                let mut columns = 0;
                let mut input_equivalences = Vec::with_capacity(children.len());
                for child in children.iter() {
                    let child_arity = child.value::<Arity>().expect("Arity required");
                    let equivalences = child
                        .value::<Equivalences>()
                        .expect("Equivalences required")
                        .clone();

                    if let Some(mut equivalences) = equivalences {
                        let permutation = (columns..(columns + child_arity)).collect::<Vec<_>>();
                        equivalences.permute(&permutation);
                        equivalences.minimize(&input_types);
                        input_equivalences.push(equivalences);
                    }
                    columns += child_arity;
                }

                // Form the equivalences we will use to replace `equivalences`.
                let mut join_equivalences = EquivalenceClasses::default();
                join_equivalences
                    .classes
                    .extend(equivalences.iter().cloned());
                // // Optionally, introduce `outer_equivalences` into `equivalences`.
                // // This is not required, but it could be very helpful. To be seen.
                // join_equivalences
                //     .classes
                //     .extend(outer_equivalences.classes.clone());

                // Reduce join equivalences by the input equivalences.
                for input_equivs in input_equivalences.iter() {
                    let reducer = input_equivs.reducer();
                    for class in join_equivalences.classes.iter_mut() {
                        for expr in class.iter_mut() {
                            // Semijoin elimination currently fails if you do more advanced simplification than
                            // literal substitution.
                            let old = expr.clone();
                            reducer.reduce_expr(expr);
                            expr.reduce(input_types.as_ref().unwrap());
                            if !expr.is_literal() {
                                expr.clone_from(&old);
                            }
                        }
                    }
                }
                // Remove nullability information, as it has already been incorporated from input equivalences,
                // and if it was reduced out relative to input equivalences we don't want to re-introduce it.
                if let Some(input_types) = input_types.as_mut() {
                    for col in input_types.iter_mut() {
                        col.nullable = true;
                    }
                }
                join_equivalences.minimize(&input_types);

                // Revisit each child, determining the information to present to it, and recurring.
                let mut columns = 0;
                for ((index, child), expr) in
                    children.into_iter().enumerate().zip(inputs.iter_mut())
                {
                    let child_arity = child.value::<Arity>().expect("Arity required");

                    let mut push_equivalences = join_equivalences.clone();
                    push_equivalences
                        .classes
                        .extend(outer_equivalences.classes.clone());
                    for (other, input_equivs) in input_equivalences.iter().enumerate() {
                        if index != other {
                            push_equivalences
                                .classes
                                .extend(input_equivs.classes.clone());
                        }
                    }
                    push_equivalences.project(columns..(columns + child_arity));
                    self.apply(expr, child, push_equivalences, get_equivalences);

                    columns += child_arity;
                }

                equivalences.clone_from(&join_equivalences.classes);
            }
            MirRelationExpr::Reduce {
                input,
                group_key,
                aggregates,
                ..
            } => {
                // TODO: MIN, MAX, ANY, ALL aggregates pass through all certain properties of their columns.
                // This may involve projection and permutation, to reposition the information appropriately.
                // TODO: Non-null constraints likely push down into the support of the aggregate expressions.

                // Apply any equivalences about the input to key and aggregate expressions.
                let input_equivalences = derived
                    .last_child()
                    .value::<Equivalences>()
                    .expect("Equivalences required");
                if let Some(input_equivalences) = input_equivalences {
                    let input_type = derived
                        .last_child()
                        .value::<RelationType>()
                        .expect("RelationType required");
                    let reducer = input_equivalences.reducer();
                    for key in group_key.iter_mut() {
                        // Semijoin elimination currently fails if you do more advanced simplification than
                        // literal substitution.
                        let old_key = key.clone();
                        reducer.reduce_expr(key);
                        key.reduce(input_type.as_ref().unwrap());
                        if !key.is_literal() {
                            key.clone_from(&old_key);
                        }
                    }
                    for aggr in aggregates.iter_mut() {
                        reducer.reduce_expr(&mut aggr.expr);
                        // A count expression over a non-null expression can discard the expression.
                        if aggr.func == mz_expr::AggregateFunc::Count && !aggr.distinct {
                            let mut probe = aggr.expr.clone().call_is_null();
                            reducer.reduce_expr(&mut probe);
                            if probe.is_literal_false() {
                                aggr.expr = MirScalarExpr::literal_true();
                            }
                        }
                    }
                }

                // To transform `outer_equivalences` to one about `input`, we will "pretend" to pre-pend all of
                // the input columns, introduce equivalences about the evaluation of `group_key` on them
                // and the key columns themselves, and then project onto these "input" columns.
                let input_arity = *derived
                    .last_child()
                    .value::<Arity>()
                    .expect("Arity required");
                let output_arity = *derived.value::<Arity>().expect("Arity required");

                // Permute `outer_equivalences` to reference columns `input_arity` later.
                let permutation = (input_arity..(input_arity + output_arity)).collect::<Vec<_>>();
                outer_equivalences.permute(&permutation[..]);
                for (index, group) in group_key.iter().enumerate() {
                    outer_equivalences.classes.push(vec![
                        MirScalarExpr::Column(input_arity + index),
                        group.clone(),
                    ]);
                }
                outer_equivalences.project(0..input_arity);
                self.apply(
                    input,
                    derived.last_child(),
                    outer_equivalences,
                    get_equivalences,
                );
            }
            MirRelationExpr::TopK {
                input, group_key, ..
            } => {
                // TODO: Update `limit` expressions, but only if we update `group_key` at the same time.
                //       It is important to both or neither, to ensure that `limit` only references columns in `group_key`.

                // Discard equivalences among non-key columns, as it is not correct that `input` may drop rows
                // that violate constraints among non-key columns without affecting the result.
                outer_equivalences.project(0..group_key.len());
                self.apply(
                    input,
                    derived.last_child(),
                    outer_equivalences,
                    get_equivalences,
                );
            }
            MirRelationExpr::Negate { input } => {
                self.apply(
                    input,
                    derived.last_child(),
                    outer_equivalences,
                    get_equivalences,
                );
            }
            MirRelationExpr::Threshold { input } => {
                self.apply(
                    input,
                    derived.last_child(),
                    outer_equivalences,
                    get_equivalences,
                );
            }
            MirRelationExpr::Union { .. } => {
                for (child, derived) in expr.children_mut().rev().zip(derived.children_rev()) {
                    self.apply(child, derived, outer_equivalences.clone(), get_equivalences);
                }
            }
            MirRelationExpr::ArrangeBy { input, .. } => {
                // TODO: Option to alter arrangement keys, though .. terrifying.
                self.apply(
                    input,
                    derived.last_child(),
                    outer_equivalences,
                    get_equivalences,
                );
            }
        }
    }
}