1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Reduction execution planning and dataflow construction.

//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent here is that each creating a `ReducePlan` should capture
//! all of the decision making about what kind of dataflow do we need to render and what each
//! operator needs to do, and then actually rendering the plan can be a relatively simple application
//! of (as much as possible) straight line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows:
//!
//! 1. Accumulable:
//!    Accumulable reductions can be computed inline in a Differential update's `difference`
//!    field because they basically boil down to tracking counts of things. `sum()` is an
//!    example of an accumulable reduction, and when some element `x` is removed from the set
//!    of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//!    formally, accumulable reductions correspond to instances of commutative Abelian groups.
//! 2. Hierarchical:
//!    Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
//!    they are still commutative and associative, which lets us compute the reduction over subsets
//!    of the input, and then compute the reduction again on those results. For example:
//!    `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
//!    reductions this way, we can maintain the computation in sublinear time with respect to
//!    the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
//!    hierarchical reductions correspond to instances of semigroups, in that they are associative,
//!    but in order to benefit from being computed hierarchically, they need to have some reduction
//!    in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
//!    hierarchical evaluation.
//!
//!    When the input is append-only, or monotonic, reductions that would otherwise have to be computed
//!    hierarchically can instead be computed in-place, because we only need to keep the value that's
//!    better than the "best" (minimal or maximal for min and max) seen so far.
//! 3. Basic:
//!    Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
//!    hierarchical (most likely they are associative but don't involve any data reduction) and so for these
//!    we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
//! requested aggregations by type, render each type separately and then take those results and collate them
//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
//! return the output arrangement directly and avoid the extra collation arrangement.

use std::collections::BTreeMap;

use mz_expr::{
    permutation_for_arrangement, AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr,
};
use mz_ore::{assert_none, soft_assert_or_log};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use proptest::prelude::{any, Arbitrary, BoxedStrategy};
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};

use crate::plan::{bucketing_of_expected_group_size, AvailableCollections};

include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.reduce.rs"));

/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
    /// Accumulable functions can be subtracted from (are invertible), and associative.
    /// We can compute these results by moving some data to the diff field under arbitrary
    /// changes to inputs. Examples include sum or count.
    Accumulable,
    /// Hierarchical functions are associative, which means we can split up the work of
    /// computing them across subsets. Note that hierarchical reductions should also
    /// reduce the data in some way, as otherwise rendering them hierarchically is not
    /// worth it. Examples include min or max.
    Hierarchical,
    /// Basic, for lack of a better word, are functions that are neither accumulable
    /// nor hierarchical. Examples include jsonb_agg.
    Basic,
}

impl columnation::Columnation for ReductionType {
    type InnerRegion = columnation::CopyRegion<ReductionType>;
}

impl RustType<ProtoReductionType> for ReductionType {
    fn into_proto(&self) -> ProtoReductionType {
        use proto_reduction_type::Kind;
        ProtoReductionType {
            kind: Some(match self {
                ReductionType::Accumulable => Kind::Accumulable(()),
                ReductionType::Hierarchical => Kind::Hierarchical(()),
                ReductionType::Basic => Kind::Basic(()),
            }),
        }
    }

    fn from_proto(proto: ProtoReductionType) -> Result<Self, TryFromProtoError> {
        use proto_reduction_type::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("kind"))?;
        Ok(match kind {
            Kind::Accumulable(()) => ReductionType::Accumulable,
            Kind::Hierarchical(()) => ReductionType::Hierarchical,
            Kind::Basic(()) => ReductionType::Basic,
        })
    }
}

impl TryFrom<&ReducePlan> for ReductionType {
    type Error = ();

    fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
        match plan {
            ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
            ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
            ReducePlan::Basic(_) => Ok(ReductionType::Basic),
            _ => Err(()),
        }
    }
}

/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case its just a `Distinct` and otherwise
/// it's composed of a combination of accumulable, hierarchical and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// be as simple (and compiler verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
    /// Plan for not computing any aggregations, just determining the set of
    /// distinct keys.
    Distinct,
    /// Plan for computing only accumulable aggregations.
    Accumulable(AccumulablePlan),
    /// Plan for computing only hierarchical aggregations.
    Hierarchical(HierarchicalPlan),
    /// Plan for computing only basic aggregations.
    Basic(BasicPlan),
    /// Plan for computing a mix of different kinds of aggregations.
    /// We need to do extra work here to reassemble results back in the
    /// requested order.
    Collation(CollationPlan),
}

proptest::prop_compose! {
    /// `expected_group_size` is a u64, but instead of a uniform distribution,
    /// we want a logarithmic distribution so that we have an even distribution
    /// in the number of layers of buckets that a hierarchical plan would have.
    fn any_group_size()
        (bits in 0..u64::BITS)
        (integer in (((1_u64) << bits) - 1)
            ..(if bits == (u64::BITS - 1){ u64::MAX }
                else { (1_u64) << (bits + 1) - 1 }))
    -> u64 {
        integer
    }
}

/// To avoid stack overflow, this limits the arbitrarily-generated test
/// `ReducePlan`s to involve at most 8 aggregations.
///
/// To have better coverage of realistic expected group sizes, the
/// `expected group size` has a logarithmic distribution.
impl Arbitrary for ReducePlan {
    type Parameters = ();

    type Strategy = BoxedStrategy<Self>;

    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
        (
            proptest::collection::vec(any::<AggregateExpr>(), 0..8),
            any::<bool>(),
            any::<bool>(),
            any_group_size(),
            any::<bool>(),
        )
            .prop_map(
                |(
                    exprs,
                    monotonic,
                    any_expected_size,
                    expected_group_size,
                    mut fused_unnest_list,
                )| {
                    let expected_group_size = if any_expected_size {
                        Some(expected_group_size)
                    } else {
                        None
                    };
                    if !(exprs.len() == 1
                        && matches!(reduction_type(&exprs[0].func), ReductionType::Basic))
                    {
                        fused_unnest_list = false;
                    }
                    ReducePlan::create_from(
                        exprs,
                        monotonic,
                        expected_group_size,
                        fused_unnest_list,
                    )
                },
            )
            .boxed()
    }
}

impl RustType<ProtoReducePlan> for ReducePlan {
    fn into_proto(&self) -> ProtoReducePlan {
        use proto_reduce_plan::Kind::*;
        ProtoReducePlan {
            kind: Some(match self {
                ReducePlan::Distinct => Distinct(()),
                ReducePlan::Accumulable(plan) => Accumulable(plan.into_proto()),
                ReducePlan::Hierarchical(plan) => Hierarchical(plan.into_proto()),
                ReducePlan::Basic(plan) => Basic(plan.into_proto()),
                ReducePlan::Collation(plan) => Collation(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoReducePlan) -> Result<Self, TryFromProtoError> {
        use proto_reduce_plan::Kind::*;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoReducePlan::kind"))?;
        Ok(match kind {
            Distinct(()) => ReducePlan::Distinct,
            Accumulable(plan) => ReducePlan::Accumulable(plan.into_rust()?),
            Hierarchical(plan) => ReducePlan::Hierarchical(plan.into_rust()?),
            Basic(plan) => ReducePlan::Basic(plan.into_rust()?),
            Collation(plan) => ReducePlan::Collation(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values. We need
/// to apply a distinct operator to those before we
/// combine them with everything else.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
    /// All of the aggregations we were asked to compute, stored
    /// in order.
    pub full_aggrs: Vec<AggregateExpr>,
    /// All of the non-distinct accumulable aggregates.
    /// Each element represents:
    /// (index of the aggregation among accumulable aggregations,
    ///  index of the datum among inputs, aggregation expr)
    /// These will all be rendered together in one dataflow fragment.
    pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
    /// Same as above but for all of the `DISTINCT` accumulable aggregations.
    pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}

impl RustType<proto_accumulable_plan::ProtoAggr> for (usize, usize, AggregateExpr) {
    fn into_proto(&self) -> proto_accumulable_plan::ProtoAggr {
        proto_accumulable_plan::ProtoAggr {
            index_agg: self.0.into_proto(),
            index_inp: self.1.into_proto(),
            expr: Some(self.2.into_proto()),
        }
    }

    fn from_proto(proto: proto_accumulable_plan::ProtoAggr) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index_agg.into_rust()?,
            proto.index_inp.into_rust()?,
            proto.expr.into_rust_if_some("ProtoAggr::expr")?,
        ))
    }
}

impl RustType<ProtoAccumulablePlan> for AccumulablePlan {
    fn into_proto(&self) -> ProtoAccumulablePlan {
        ProtoAccumulablePlan {
            full_aggrs: self.full_aggrs.into_proto(),
            simple_aggrs: self.simple_aggrs.into_proto(),
            distinct_aggrs: self.distinct_aggrs.into_proto(),
        }
    }

    fn from_proto(proto: ProtoAccumulablePlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            full_aggrs: proto.full_aggrs.into_rust()?,
            simple_aggrs: proto.simple_aggrs.into_rust()?,
            distinct_aggrs: proto.distinct_aggrs.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
    /// Plan hierarchical aggregations under monotonic inputs.
    Monotonic(MonotonicPlan),
    /// Plan for hierarchical aggregations under non-monotonic inputs.
    Bucketed(BucketedPlan),
}

impl HierarchicalPlan {
    /// Upgrades from a bucketed plan to a monotonic plan, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        match self {
            HierarchicalPlan::Bucketed(bucketed) => {
                // TODO: ideally we would not have the `clone()` but ownership
                // seems fraught here as we are behind a `&mut self` reference.
                *self =
                    HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
            }
            HierarchicalPlan::Monotonic(monotonic) => {
                monotonic.must_consolidate = must_consolidate;
            }
        }
    }
}

impl RustType<ProtoHierarchicalPlan> for HierarchicalPlan {
    fn into_proto(&self) -> ProtoHierarchicalPlan {
        use proto_hierarchical_plan::Kind;
        ProtoHierarchicalPlan {
            kind: Some(match self {
                HierarchicalPlan::Monotonic(plan) => Kind::Monotonic(plan.into_proto()),
                HierarchicalPlan::Bucketed(plan) => Kind::Bucketed(plan.into_proto()),
            }),
        }
    }

    fn from_proto(proto: ProtoHierarchicalPlan) -> Result<Self, TryFromProtoError> {
        use proto_hierarchical_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoHierarchicalPlan::Kind"))?;
        Ok(match kind {
            Kind::Monotonic(plan) => HierarchicalPlan::Monotonic(plan.into_rust()?),
            Kind::Bucketed(plan) => HierarchicalPlan::Bucketed(plan.into_rust()?),
        })
    }
}

/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// True if the input is not physically monotonic, and the operator must perform
    /// consolidation to remove potential negations. The operator implementation is
    /// free to consolidate as late as possible while ensuring correctness, so it is
    /// not a requirement that the input be directly subjected to consolidation.
    /// More details in the monotonic one-shot `SELECT`s design doc.[^1]
    ///
    /// [^1] <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
    pub must_consolidate: bool,
}

impl RustType<ProtoMonotonicPlan> for MonotonicPlan {
    fn into_proto(&self) -> ProtoMonotonicPlan {
        ProtoMonotonicPlan {
            aggr_funcs: self.aggr_funcs.into_proto(),
            skips: self.skips.into_proto(),
            must_consolidate: self.must_consolidate.into_proto(),
        }
    }

    fn from_proto(proto: ProtoMonotonicPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            aggr_funcs: proto.aggr_funcs.into_rust()?,
            skips: proto.skips.into_rust()?,
            must_consolidate: proto.must_consolidate.into_rust()?,
        })
    }
}

/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
    /// All of the aggregations we were asked to compute.
    pub aggr_funcs: Vec<AggregateFunc>,
    /// Set of "skips" or calls to `nth()` an iterator needs to do over
    /// the input to extract the relevant datums.
    pub skips: Vec<usize>,
    /// The number of buckets in each layer of the reduction tree. Should
    /// be decreasing, and ideally, a power of two so that we can easily
    /// distribute values to buckets with `value.hashed() % buckets[layer]`.
    pub buckets: Vec<u64>,
}

impl BucketedPlan {
    /// Convert to a monotonic plan, indicate whether the operator must apply
    /// consolidation to its input.
    fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
        MonotonicPlan {
            aggr_funcs: self.aggr_funcs,
            skips: self.skips,
            must_consolidate,
        }
    }
}

impl RustType<ProtoBucketedPlan> for BucketedPlan {
    fn into_proto(&self) -> ProtoBucketedPlan {
        ProtoBucketedPlan {
            aggr_funcs: self.aggr_funcs.into_proto(),
            skips: self.skips.into_proto(),
            buckets: self.buckets.clone(),
        }
    }

    fn from_proto(proto: ProtoBucketedPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            aggr_funcs: proto.aggr_funcs.into_rust()?,
            skips: proto.skips.into_rust()?,
            buckets: proto.buckets,
        })
    }
}

/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple reduce aggregates
/// to combine and present results in the appropriate order. If we
/// were only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
    /// Plan for rendering a single basic aggregation.
    Single(SingleBasicPlan),
    /// Plan for rendering multiple basic aggregations.
    /// These need to then be collated together in an additional
    /// reduction. Each element represents the:
    /// `(index of the set of the input we are aggregating over,
    ///   the aggregation function)`
    Multiple(Vec<(usize, AggregateExpr)>),
}

/// Plan for rendering a single basic aggregation, with possibly fusing a `FlatMap UnnestList` with
/// this aggregation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
    /// The index in the set of inputs that we are aggregating over.
    pub index: usize,
    /// The aggregation that we should perform.
    pub expr: AggregateExpr,
    /// Whether we fused a `FlatMap UnnestList` with this aggregation.
    pub fused_unnest_list: bool,
}

impl RustType<proto_basic_plan::ProtoSimpleSingleBasicPlan> for (usize, AggregateExpr) {
    fn into_proto(&self) -> proto_basic_plan::ProtoSimpleSingleBasicPlan {
        proto_basic_plan::ProtoSimpleSingleBasicPlan {
            index: self.0.into_proto(),
            expr: Some(self.1.into_proto()),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSimpleSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok((
            proto.index.into_rust()?,
            proto
                .expr
                .into_rust_if_some("ProtoSimpleSingleBasicPlan::expr")?,
        ))
    }
}

impl RustType<proto_basic_plan::ProtoSingleBasicPlan> for SingleBasicPlan {
    fn into_proto(&self) -> proto_basic_plan::ProtoSingleBasicPlan {
        proto_basic_plan::ProtoSingleBasicPlan {
            index: self.index.into_proto(),
            expr: Some(self.expr.into_proto()),
            fused_unnest_list: self.fused_unnest_list.into_proto(),
        }
    }

    fn from_proto(
        proto: proto_basic_plan::ProtoSingleBasicPlan,
    ) -> Result<Self, TryFromProtoError> {
        Ok(SingleBasicPlan {
            index: proto.index.into_rust()?,
            expr: proto.expr.into_rust_if_some("ProtoSingleBasicPlan::expr")?,
            fused_unnest_list: proto.fused_unnest_list.into_rust()?,
        })
    }
}

impl RustType<ProtoBasicPlan> for BasicPlan {
    fn into_proto(&self) -> ProtoBasicPlan {
        use proto_basic_plan::*;

        ProtoBasicPlan {
            kind: Some(match self {
                BasicPlan::Single(plan) => Kind::Single(plan.into_proto()),
                BasicPlan::Multiple(aggrs) => Kind::Multiple(ProtoMultipleBasicPlan {
                    aggrs: aggrs.into_proto(),
                }),
            }),
        }
    }

    fn from_proto(proto: ProtoBasicPlan) -> Result<Self, TryFromProtoError> {
        use proto_basic_plan::Kind;
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoBasicPlan::kind"))?;

        Ok(match kind {
            Kind::Single(plan) => BasicPlan::Single(plan.into_rust()?),
            Kind::Multiple(x) => BasicPlan::Multiple(x.aggrs.into_rust()?),
        })
    }
}

/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct CollationPlan {
    /// Accumulable aggregation results to collate, if any.
    pub accumulable: Option<AccumulablePlan>,
    /// Hierarchical aggregation results to collate, if any.
    pub hierarchical: Option<HierarchicalPlan>,
    /// Basic aggregation results to collate, if any.
    pub basic: Option<BasicPlan>,
    /// When we get results back from each of the different
    /// aggregation types, they will be subsequences of
    /// the sequence aggregations in the original reduce expression.
    /// We keep a map from output position -> reduction type
    /// to easily merge results back into the requested order.
    pub aggregate_types: Vec<ReductionType>,
}

impl CollationPlan {
    /// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
    /// and sets consolidation requirements.
    pub fn as_monotonic(&mut self, must_consolidate: bool) {
        self.hierarchical
            .as_mut()
            .map(|plan| plan.as_monotonic(must_consolidate));
    }
}

impl RustType<ProtoCollationPlan> for CollationPlan {
    fn into_proto(&self) -> ProtoCollationPlan {
        ProtoCollationPlan {
            accumulable: self.accumulable.into_proto(),
            hierarchical: self.hierarchical.into_proto(),
            basic: self.basic.into_proto(),
            aggregate_types: self.aggregate_types.into_proto(),
        }
    }

    fn from_proto(proto: ProtoCollationPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            accumulable: proto.accumulable.into_rust()?,
            hierarchical: proto.hierarchical.into_rust()?,
            basic: proto.basic.into_rust()?,
            aggregate_types: proto.aggregate_types.into_rust()?,
        })
    }
}

impl ReducePlan {
    /// Generate a plan for computing the supplied aggregations.
    ///
    /// The resulting plan summarizes what the dataflow to be created
    /// and how the aggregations will be executed.
    pub fn create_from(
        aggregates: Vec<AggregateExpr>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        // If we don't have any aggregations we are just computing a distinct.
        if aggregates.is_empty() {
            return ReducePlan::Distinct;
        }

        // Otherwise, we need to group aggregations according to their
        // reduction type (accumulable, hierarchical, or basic)
        let mut reduction_types = BTreeMap::new();
        // We need to make sure that each list of aggregates by type forms
        // a subsequence of the overall sequence of aggregates.
        for index in 0..aggregates.len() {
            let typ = reduction_type(&aggregates[index].func);
            let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
            aggregates_list.push((index, aggregates[index].clone()));
        }

        // Convert each grouped list of reductions into a plan.
        let plan: Vec<_> = reduction_types
            .into_iter()
            .map(|(typ, aggregates_list)| {
                ReducePlan::create_inner(
                    typ,
                    aggregates_list,
                    monotonic,
                    expected_group_size,
                    fused_unnest_list,
                )
            })
            .collect();

        // If we only have a single type of aggregation present we can
        // render that directly
        if plan.len() == 1 {
            return plan[0].clone();
        }

        // Otherwise, we have to stitch reductions together.

        // First, lets sanity check that we don't have an impossible number
        // of reduction types.
        assert!(plan.len() <= 3);

        let mut collation: CollationPlan = Default::default();

        // Construct a mapping from output_position -> reduction that we can
        // use to reconstruct the output in the correct order.
        let aggregate_types = aggregates
            .iter()
            .map(|a| reduction_type(&a.func))
            .collect::<Vec<_>>();

        collation.aggregate_types = aggregate_types;

        for expr in plan.into_iter() {
            match expr {
                ReducePlan::Accumulable(e) => {
                    assert_none!(collation.accumulable);
                    collation.accumulable = Some(e);
                }
                ReducePlan::Hierarchical(e) => {
                    assert_none!(collation.hierarchical);
                    collation.hierarchical = Some(e);
                }
                ReducePlan::Basic(e) => {
                    assert_none!(collation.basic);
                    collation.basic = Some(e);
                }
                ReducePlan::Distinct | ReducePlan::Collation(_) => {
                    panic!("Inner reduce plan was unsupported type!")
                }
            }
        }

        ReducePlan::Collation(collation)
    }

    /// Generate a plan for computing the specified type of aggregations.
    ///
    /// This function assumes that all of the supplied aggregates are
    /// actually of the correct reduction type, and are a subsequence
    /// of the total list of requested aggregations.
    fn create_inner(
        typ: ReductionType,
        aggregates_list: Vec<(usize, AggregateExpr)>,
        monotonic: bool,
        expected_group_size: Option<u64>,
        fused_unnest_list: bool,
    ) -> Self {
        if fused_unnest_list {
            assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
        }
        assert!(
            aggregates_list.len() > 0,
            "error: tried to render a reduce dataflow with no aggregates"
        );
        match typ {
            ReductionType::Accumulable => {
                let mut simple_aggrs = vec![];
                let mut distinct_aggrs = vec![];
                let full_aggrs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr)
                    .collect();
                for (accumulable_index, (datum_index, aggr)) in
                    aggregates_list.into_iter().enumerate()
                {
                    // Accumulable aggregations need to do extra per-aggregate work
                    // for aggregations with the distinct bit set, so we'll separate
                    // those out now.
                    if aggr.distinct {
                        distinct_aggrs.push((accumulable_index, datum_index, aggr));
                    } else {
                        simple_aggrs.push((accumulable_index, datum_index, aggr));
                    };
                }
                ReducePlan::Accumulable(AccumulablePlan {
                    full_aggrs,
                    simple_aggrs,
                    distinct_aggrs,
                })
            }
            ReductionType::Hierarchical => {
                let aggr_funcs: Vec<_> = aggregates_list
                    .iter()
                    .cloned()
                    .map(|(_, aggr)| aggr.func)
                    .collect();
                let indexes: Vec<_> = aggregates_list
                    .into_iter()
                    .map(|(index, _)| index)
                    .collect();

                // We don't have random access over Rows so we can simplify the
                // task of grabbing the inputs we are aggregating over by
                // generating a list of "skips" an iterator over the Row needs
                // to do to get the desired indexes.
                let skips = convert_indexes_to_skips(indexes);
                if monotonic {
                    let monotonic = MonotonicPlan {
                        aggr_funcs,
                        skips,
                        must_consolidate: false,
                    };
                    ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
                } else {
                    let buckets = bucketing_of_expected_group_size(expected_group_size);
                    let bucketed = BucketedPlan {
                        aggr_funcs,
                        skips,
                        buckets,
                    };

                    ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
                }
            }
            ReductionType::Basic => {
                if aggregates_list.len() == 1 {
                    ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
                        index: aggregates_list[0].0,
                        expr: aggregates_list[0].1.clone(),
                        fused_unnest_list,
                    }))
                } else {
                    ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
                }
            }
        }
    }

    /// Reports all keys of produced arrangements.
    ///
    /// This is likely either an empty vector, for no arrangement,
    /// or a singleton vector containing the list of expressions
    /// that key a single arrangement.
    pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
        let key = (0..key_arity)
            .map(MirScalarExpr::Column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        AvailableCollections::new_arranged(vec![(key, permutation, thinning)], None)
    }

    /// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
    /// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
    /// one sitting on top of the reduction.
    ///
    /// Non-fusable parts include temporal predicates or any other parts that cannot be
    /// conservatively asserted to not increase the memory requirements of the output
    /// arrangement for the reduction. Either the fusable or non-fusable parts may end up
    /// being the identity MFP.
    pub fn extract_mfp_after(
        &self,
        mut mfp: MapFilterProject,
        key_arity: usize,
    ) -> (MapFilterProject, MapFilterProject, usize) {
        // Extract temporal predicates, as we cannot push them into `Reduce`.
        let temporal_mfp = mfp.extract_temporal();
        let non_temporal = mfp;
        mfp = temporal_mfp;

        // We ensure we do not attempt to project away the key, as we cannot accomplish
        // this. This is done by a simple analysis of the non-temporal part of `mfp` to
        // check if can be directly absorbed; if it can't, we then default to a general
        // strategy that unpacks the MFP to absorb only the filter and supporting map
        // parts, followed by a post-MFP step.
        let input_arity = non_temporal.input_arity;
        let key = Vec::from_iter(0..key_arity);
        let mut mfp_push;
        let output_arity;

        if non_temporal.projection.len() <= input_arity
            && non_temporal.projection.iter().all(|c| *c < input_arity)
            && non_temporal.projection.starts_with(&key)
        {
            // Special case: The key is preserved as a prefix and the projection is only
            // of output fields from the reduction. So we know that: (a) We can process the
            // fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
            // and (c) The output projection is at most as wide as the output that would be
            // produced by the reduction, so we are sure to never regress the memory
            // requirements of the output arrangement.
            // Note that this strategy may change the arity of the output arrangement.
            output_arity = non_temporal.projection.len();
            mfp_push = non_temporal;
        } else {
            // General strategy: Unpack MFP as MF followed by P' that removes all M
            // columns, then MP afterwards.
            // Note that this strategy does not result in any changes to the arity of
            // the output arrangement.
            let (m, f, p) = non_temporal.into_map_filter_project();
            mfp_push = MapFilterProject::new(input_arity)
                .map(m.clone())
                .filter(f)
                .project(0..input_arity);
            output_arity = input_arity;

            // We still need to perform the map and projection for the actual output.
            let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);

            // Compose the non-pushed MFP components.
            mfp = MapFilterProject::compose(mfp_left, mfp);
        }
        mfp_push.optimize();
        mfp.optimize();
        (mfp_push, mfp, output_arity)
    }
}

/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyValPlan {
    /// Extracts the columns used as the key.
    pub key_plan: mz_expr::SafeMfpPlan,
    /// Extracts the columns used to feed the aggregations.
    pub val_plan: mz_expr::SafeMfpPlan,
}

impl RustType<ProtoKeyValPlan> for KeyValPlan {
    fn into_proto(&self) -> ProtoKeyValPlan {
        ProtoKeyValPlan {
            key_plan: Some(self.key_plan.into_proto()),
            val_plan: Some(self.val_plan.into_proto()),
        }
    }

    fn from_proto(proto: ProtoKeyValPlan) -> Result<Self, TryFromProtoError> {
        Ok(Self {
            key_plan: proto
                .key_plan
                .into_rust_if_some("ProtoKeyValPlan::key_plan")?,
            val_plan: proto
                .val_plan
                .into_rust_if_some("ProtoKeyValPlan::val_plan")?,
        })
    }
}

impl KeyValPlan {
    /// Create a new [KeyValPlan] from aggregation arguments.
    pub fn new(
        input_arity: usize,
        group_key: &[MirScalarExpr],
        aggregates: &[AggregateExpr],
        input_permutation_and_new_arity: Option<(Vec<usize>, usize)>,
    ) -> Self {
        // Form an operator for evaluating key expressions.
        let mut key_mfp = MapFilterProject::new(input_arity)
            .map(group_key.iter().cloned())
            .project(input_arity..(input_arity + group_key.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
            key_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        // Form an operator for evaluating value expressions.
        let mut val_mfp = MapFilterProject::new(input_arity)
            .map(aggregates.iter().map(|a| a.expr.clone()))
            .project(input_arity..(input_arity + aggregates.len()));
        if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
            val_mfp.permute_fn(|c| input_permutation[c], new_arity);
        }

        key_mfp.optimize();
        let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
        val_mfp.optimize();
        let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();

        Self { key_plan, val_plan }
    }

    /// The arity of the key plan
    pub fn key_arity(&self) -> usize {
        self.key_plan.projection.len()
    }
}

/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a Row would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
    for i in 1..indexes.len() {
        soft_assert_or_log!(
            indexes[i - 1] < indexes[i],
            "convert_indexes_to_skip needs indexes to be strictly increasing. Received: {:?}",
            indexes,
        );
    }

    for i in (1..indexes.len()).rev() {
        indexes[i] -= indexes[i - 1];
        indexes[i] -= 1;
    }

    indexes
}

/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding ReductionMonoid implementation.
pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
    match func {
        AggregateFunc::SumInt16
        | AggregateFunc::SumInt32
        | AggregateFunc::SumInt64
        | AggregateFunc::SumUInt16
        | AggregateFunc::SumUInt32
        | AggregateFunc::SumUInt64
        | AggregateFunc::SumFloat32
        | AggregateFunc::SumFloat64
        | AggregateFunc::SumNumeric
        | AggregateFunc::Count
        | AggregateFunc::Any
        | AggregateFunc::All
        | AggregateFunc::Dummy => ReductionType::Accumulable,
        AggregateFunc::MaxNumeric
        | AggregateFunc::MaxInt16
        | AggregateFunc::MaxInt32
        | AggregateFunc::MaxInt64
        | AggregateFunc::MaxUInt16
        | AggregateFunc::MaxUInt32
        | AggregateFunc::MaxUInt64
        | AggregateFunc::MaxMzTimestamp
        | AggregateFunc::MaxFloat32
        | AggregateFunc::MaxFloat64
        | AggregateFunc::MaxBool
        | AggregateFunc::MaxString
        | AggregateFunc::MaxDate
        | AggregateFunc::MaxTimestamp
        | AggregateFunc::MaxTimestampTz
        | AggregateFunc::MaxInterval
        | AggregateFunc::MaxTime
        | AggregateFunc::MinNumeric
        | AggregateFunc::MinInt16
        | AggregateFunc::MinInt32
        | AggregateFunc::MinInt64
        | AggregateFunc::MinUInt16
        | AggregateFunc::MinUInt32
        | AggregateFunc::MinUInt64
        | AggregateFunc::MinMzTimestamp
        | AggregateFunc::MinInterval
        | AggregateFunc::MinFloat32
        | AggregateFunc::MinFloat64
        | AggregateFunc::MinBool
        | AggregateFunc::MinString
        | AggregateFunc::MinDate
        | AggregateFunc::MinTimestamp
        | AggregateFunc::MinTimestampTz
        | AggregateFunc::MinTime => ReductionType::Hierarchical,
        AggregateFunc::JsonbAgg { .. }
        | AggregateFunc::JsonbObjectAgg { .. }
        | AggregateFunc::MapAgg { .. }
        | AggregateFunc::ArrayConcat { .. }
        | AggregateFunc::ListConcat { .. }
        | AggregateFunc::StringAgg { .. }
        | AggregateFunc::RowNumber { .. }
        | AggregateFunc::Rank { .. }
        | AggregateFunc::DenseRank { .. }
        | AggregateFunc::LagLead { .. }
        | AggregateFunc::FirstValue { .. }
        | AggregateFunc::LastValue { .. }
        | AggregateFunc::WindowAggregate { .. }
        | AggregateFunc::FusedValueWindowFunc { .. }
        | AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
    }
}

#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    // This test causes stack overflows if not run with --release,
    // ignore by default.
    proptest! {
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
        fn reduce_plan_protobuf_roundtrip(expect in any::<ReducePlan>() ) {
            let actual = protobuf_roundtrip::<_, ProtoReducePlan>(&expect);
            assert_ok!(actual);
            assert_eq!(actual.unwrap(), expect);
        }
    }
}