// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Reduction execution planning and dataflow construction.
//! We build `ReducePlan`s to manage the complexity of planning the generated dataflow for a
//! given reduce expression. The intent here is that creating a `ReducePlan` should capture
//! all of the decision making about what kind of dataflow we need to render and what each
//! operator needs to do, so that actually rendering the plan can be a relatively simple
//! application of (as much as possible) straight-line code.
//!
//! Materialize needs to be able to maintain reductions incrementally (roughly, using
//! time proportional to the number of changes in the input) and ideally, with a
//! memory footprint proportional to the number of reductions being computed. We have to employ
//! several tricks to achieve that, and these tricks constitute most of the complexity involved
//! with planning and rendering reduce expressions. There's some additional complexity involved
//! in handling aggregations with `DISTINCT` correctly so that we can efficiently suppress
//! duplicate updates.
//!
//! In order to optimize the performance of our rendered dataflow, we divide all aggregations
//! into three distinct types. Each type gets rendered separately, with its own specialized plan
//! and dataflow. The three types are as follows (a short illustrative sketch of the
//! first two appears at the end of this comment):
//!
//! 1. Accumulable:
//! Accumulable reductions can be computed inline in a Differential update's `difference`
//! field because they basically boil down to tracking counts of things. `sum()` is an
//! example of an accumulable reduction, and when some element `x` is removed from the set
//! of elements being summed, we can introduce `-x` to incrementally maintain the sum. More
//! formally, accumulable reductions correspond to instances of commutative Abelian groups.
//! 2. Hierarchical:
//! Hierarchical reductions don't have a meaningful negation like accumulable reductions do, but
//! they are still commutative and associative, which lets us compute the reduction over subsets
//! of the input, and then compute the reduction again on those results. For example:
//! `min[2, 5, 1, 10]` is the same as `min[ min[2, 5], min[1, 10]]`. When we compute hierarchical
//! reductions this way, we can maintain the computation in sublinear time with respect to
//! the overall input. `min` and `max` are two examples of hierarchical reductions. More formally,
//! hierarchical reductions correspond to instances of semigroups, in that they are associative,
//! but in order to benefit from being computed hierarchically, they need to have some reduction
//! in data size as well. A function like "concat-everything-to-a-string" wouldn't benefit from
//! hierarchical evaluation.
//!
//! When the input is append-only, or monotonic, reductions that would otherwise have to be computed
//! hierarchically can instead be computed in-place, because we only need to keep the
//! "best" value (minimal or maximal for min and max) seen so far.
//! 3. Basic:
//! Basic reductions are a bit like the Hufflepuffs of this trifecta. They are neither accumulable nor
//! hierarchical (most likely they are associative but don't involve any data reduction) and so for these
//! we can't do much more than just defer to Differential's reduce operator and eat a large maintenance cost.
//!
//! When we render these reductions we want to limit the number of arrangements we produce. When we build a
//! dataflow for a reduction containing multiple types of reductions, we have no choice but to divide up the
//! requested aggregations by type, render each type separately and then take those results and collate them
//! back in the requested output order. However, if we only need to perform aggregations of a single reduction
//! type, we can specialize and render the dataflow to compute those aggregations in the correct order, and
//! return the output arrangement directly and avoid the extra collation arrangement.
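//!
//! As a concrete (and heavily simplified) illustration of the first two reduction types
//! above, the following sketch shows how signed updates let us maintain an accumulable
//! reduction, and how bucketing lets us maintain a hierarchical one. This is a minimal
//! model of the idea, not the actual dataflow operators:
//!
//! ```
//! // Accumulable: each update is (value, diff); diff = +1 inserts the value and
//! // diff = -1 retracts it, so the sum is maintained in time O(changes).
//! let updates: [(i64, i64); 4] = [(2, 1), (5, 1), (3, 1), (5, -1)];
//! let sum: i64 = updates.iter().map(|(value, diff)| value * diff).sum();
//! assert_eq!(sum, 2 + 3);
//!
//! // Hierarchical: reduce small buckets first, then reduce the bucket results,
//! // so a single changed input only touches one bucket per layer.
//! let input = [2_u64, 5, 1, 10];
//! let buckets: Vec<u64> = input.chunks(2).map(|c| *c.iter().min().unwrap()).collect();
//! assert_eq!(buckets.iter().min(), input.iter().min());
//! ```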
use std::collections::BTreeMap;
use mz_expr::{
permutation_for_arrangement, AggregateExpr, AggregateFunc, MapFilterProject, MirScalarExpr,
};
use mz_ore::{assert_none, soft_assert_or_log};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use proptest::prelude::{any, Arbitrary, BoxedStrategy};
use proptest::strategy::Strategy;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use crate::plan::{bucketing_of_expected_group_size, AvailableCollections};
include!(concat!(env!("OUT_DIR"), "/mz_compute_types.plan.reduce.rs"));
/// This enum represents the three potential types of aggregations.
#[derive(Copy, Clone, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
pub enum ReductionType {
/// Accumulable functions can be subtracted from (are invertible) and are associative.
/// We can compute these results by moving some data to the diff field under arbitrary
/// changes to inputs. Examples include sum or count.
Accumulable,
/// Hierarchical functions are associative, which means we can split up the work of
/// computing them across subsets. Note that hierarchical reductions should also
/// reduce the data in some way, as otherwise rendering them hierarchically is not
/// worth it. Examples include min or max.
Hierarchical,
/// Basic, for lack of a better word, are functions that are neither accumulable
/// nor hierarchical. Examples include jsonb_agg.
Basic,
}
impl columnation::Columnation for ReductionType {
type InnerRegion = columnation::CopyRegion<ReductionType>;
}
impl RustType<ProtoReductionType> for ReductionType {
fn into_proto(&self) -> ProtoReductionType {
use proto_reduction_type::Kind;
ProtoReductionType {
kind: Some(match self {
ReductionType::Accumulable => Kind::Accumulable(()),
ReductionType::Hierarchical => Kind::Hierarchical(()),
ReductionType::Basic => Kind::Basic(()),
}),
}
}
fn from_proto(proto: ProtoReductionType) -> Result<Self, TryFromProtoError> {
use proto_reduction_type::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("kind"))?;
Ok(match kind {
Kind::Accumulable(()) => ReductionType::Accumulable,
Kind::Hierarchical(()) => ReductionType::Hierarchical,
Kind::Basic(()) => ReductionType::Basic,
})
}
}
impl TryFrom<&ReducePlan> for ReductionType {
type Error = ();
fn try_from(plan: &ReducePlan) -> Result<Self, Self::Error> {
match plan {
ReducePlan::Hierarchical(_) => Ok(ReductionType::Hierarchical),
ReducePlan::Accumulable(_) => Ok(ReductionType::Accumulable),
ReducePlan::Basic(_) => Ok(ReductionType::Basic),
_ => Err(()),
}
}
}
/// A `ReducePlan` provides a concise description for how we will
/// execute a given reduce expression.
///
/// The provided reduce expression can have no
/// aggregations, in which case it's just a `Distinct`; otherwise
/// it's composed of a combination of accumulable, hierarchical, and
/// basic aggregations.
///
/// We want to try to centralize as much decision making about the
/// shape / general computation of the rendered dataflow graph
/// in this plan, and then make actually rendering the graph
/// as simple (and compiler-verifiable) as possible.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum ReducePlan {
/// Plan for not computing any aggregations, just determining the set of
/// distinct keys.
Distinct,
/// Plan for computing only accumulable aggregations.
Accumulable(AccumulablePlan),
/// Plan for computing only hierarchical aggregations.
Hierarchical(HierarchicalPlan),
/// Plan for computing only basic aggregations.
Basic(BasicPlan),
/// Plan for computing a mix of different kinds of aggregations.
/// We need to do extra work here to reassemble results back in the
/// requested order.
Collation(CollationPlan),
}
proptest::prop_compose! {
/// `expected_group_size` is a u64, but instead of a uniform distribution,
/// we want a logarithmic distribution so that we have an even distribution
/// in the number of layers of buckets that a hierarchical plan would have.
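/// For example, after drawing `bits = 3`, `integer` is drawn uniformly from
/// `7..15`, so each power-of-two magnitude range is equally likely overall.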
fn any_group_size()
(bits in 0..u64::BITS)
(integer in (((1_u64) << bits) - 1)
..(if bits == (u64::BITS - 1) { u64::MAX }
// Parenthesize the shift: `-` binds tighter than `<<` in Rust, so without
// the parentheses this would compute `1 << bits` instead of `2^(bits + 1) - 1`.
else { ((1_u64) << (bits + 1)) - 1 }))
-> u64 {
integer
}
}
/// To avoid stack overflow, this limits the arbitrarily-generated test
/// `ReducePlan`s to involve at most 8 aggregations.
///
/// To have better coverage of realistic expected group sizes, the
/// `expected_group_size` has a logarithmic distribution.
impl Arbitrary for ReducePlan {
type Parameters = ();
type Strategy = BoxedStrategy<Self>;
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
proptest::collection::vec(any::<AggregateExpr>(), 0..8),
any::<bool>(),
any::<bool>(),
any_group_size(),
any::<bool>(),
)
.prop_map(
|(
exprs,
monotonic,
any_expected_size,
expected_group_size,
mut fused_unnest_list,
)| {
let expected_group_size = if any_expected_size {
Some(expected_group_size)
} else {
None
};
if !(exprs.len() == 1
&& matches!(reduction_type(&exprs[0].func), ReductionType::Basic))
{
fused_unnest_list = false;
}
ReducePlan::create_from(
exprs,
monotonic,
expected_group_size,
fused_unnest_list,
)
},
)
.boxed()
}
}
impl RustType<ProtoReducePlan> for ReducePlan {
fn into_proto(&self) -> ProtoReducePlan {
use proto_reduce_plan::Kind::*;
ProtoReducePlan {
kind: Some(match self {
ReducePlan::Distinct => Distinct(()),
ReducePlan::Accumulable(plan) => Accumulable(plan.into_proto()),
ReducePlan::Hierarchical(plan) => Hierarchical(plan.into_proto()),
ReducePlan::Basic(plan) => Basic(plan.into_proto()),
ReducePlan::Collation(plan) => Collation(plan.into_proto()),
}),
}
}
fn from_proto(proto: ProtoReducePlan) -> Result<Self, TryFromProtoError> {
use proto_reduce_plan::Kind::*;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoReducePlan::kind"))?;
Ok(match kind {
Distinct(()) => ReducePlan::Distinct,
Accumulable(plan) => ReducePlan::Accumulable(plan.into_rust()?),
Hierarchical(plan) => ReducePlan::Hierarchical(plan.into_rust()?),
Basic(plan) => ReducePlan::Basic(plan.into_rust()?),
Collation(plan) => ReducePlan::Collation(plan.into_rust()?),
})
}
}
/// Plan for computing a set of accumulable aggregations.
///
/// We fuse all of the accumulable aggregations together
/// and compute them with one dataflow fragment. We need to
/// be careful to separate out the aggregations that
/// apply only to the distinct set of values. We need
/// to apply a distinct operator to those before we
/// combine them with everything else.
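///
/// As an illustration (hypothetical query, not taken from this module), for
/// `SELECT sum(a), sum(DISTINCT b) FROM t`, the plan would roughly contain:
///
/// ```text
/// full_aggrs:     [sum(a), sum(distinct b)]
/// simple_aggrs:   [(0, 0, sum(a))]
/// distinct_aggrs: [(1, 1, sum(distinct b))]
/// ```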
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct AccumulablePlan {
/// All of the aggregations we were asked to compute, stored
/// in order.
pub full_aggrs: Vec<AggregateExpr>,
/// All of the non-distinct accumulable aggregates.
/// Each element represents:
/// (index of the aggregation among accumulable aggregations,
/// index of the datum among inputs, aggregation expr)
/// These will all be rendered together in one dataflow fragment.
pub simple_aggrs: Vec<(usize, usize, AggregateExpr)>,
/// Same as above but for all of the `DISTINCT` accumulable aggregations.
pub distinct_aggrs: Vec<(usize, usize, AggregateExpr)>,
}
impl RustType<proto_accumulable_plan::ProtoAggr> for (usize, usize, AggregateExpr) {
fn into_proto(&self) -> proto_accumulable_plan::ProtoAggr {
proto_accumulable_plan::ProtoAggr {
index_agg: self.0.into_proto(),
index_inp: self.1.into_proto(),
expr: Some(self.2.into_proto()),
}
}
fn from_proto(proto: proto_accumulable_plan::ProtoAggr) -> Result<Self, TryFromProtoError> {
Ok((
proto.index_agg.into_rust()?,
proto.index_inp.into_rust()?,
proto.expr.into_rust_if_some("ProtoAggr::expr")?,
))
}
}
impl RustType<ProtoAccumulablePlan> for AccumulablePlan {
fn into_proto(&self) -> ProtoAccumulablePlan {
ProtoAccumulablePlan {
full_aggrs: self.full_aggrs.into_proto(),
simple_aggrs: self.simple_aggrs.into_proto(),
distinct_aggrs: self.distinct_aggrs.into_proto(),
}
}
fn from_proto(proto: ProtoAccumulablePlan) -> Result<Self, TryFromProtoError> {
Ok(Self {
full_aggrs: proto.full_aggrs.into_rust()?,
simple_aggrs: proto.simple_aggrs.into_rust()?,
distinct_aggrs: proto.distinct_aggrs.into_rust()?,
})
}
}
/// Plan for computing a set of hierarchical aggregations.
///
/// In the append-only setting we can render them in-place
/// with monotonic plans, but otherwise, we need to render
/// them with a reduction tree that splits the inputs into
/// small, and then progressively larger, buckets.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum HierarchicalPlan {
/// Plan for hierarchical aggregations under monotonic inputs.
Monotonic(MonotonicPlan),
/// Plan for hierarchical aggregations under non-monotonic inputs.
Bucketed(BucketedPlan),
}
impl HierarchicalPlan {
/// Upgrades from a bucketed plan to a monotonic plan, if necessary,
/// and sets consolidation requirements.
pub fn as_monotonic(&mut self, must_consolidate: bool) {
match self {
HierarchicalPlan::Bucketed(bucketed) => {
// TODO: ideally we would not have the `clone()` but ownership
// seems fraught here as we are behind a `&mut self` reference.
*self =
HierarchicalPlan::Monotonic(bucketed.clone().into_monotonic(must_consolidate));
}
HierarchicalPlan::Monotonic(monotonic) => {
monotonic.must_consolidate = must_consolidate;
}
}
}
}
impl RustType<ProtoHierarchicalPlan> for HierarchicalPlan {
fn into_proto(&self) -> ProtoHierarchicalPlan {
use proto_hierarchical_plan::Kind;
ProtoHierarchicalPlan {
kind: Some(match self {
HierarchicalPlan::Monotonic(plan) => Kind::Monotonic(plan.into_proto()),
HierarchicalPlan::Bucketed(plan) => Kind::Bucketed(plan.into_proto()),
}),
}
}
fn from_proto(proto: ProtoHierarchicalPlan) -> Result<Self, TryFromProtoError> {
use proto_hierarchical_plan::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoHierarchicalPlan::Kind"))?;
Ok(match kind {
Kind::Monotonic(plan) => HierarchicalPlan::Monotonic(plan.into_rust()?),
Kind::Bucketed(plan) => HierarchicalPlan::Bucketed(plan.into_rust()?),
})
}
}
/// Plan for computing a set of hierarchical aggregations with a
/// monotonic input.
///
/// Here, the aggregations will be rendered in place. We don't
/// need to worry about retractions because the inputs are
/// append only, so we can change our computation to
/// only retain the "best" value in the diff field, instead
/// of holding onto all values.
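///
/// A minimal sketch of the append-only idea (not the actual operator): with no
/// retractions possible, each new value only needs to be folded into the current
/// "best", so no per-value history is retained.
///
/// ```
/// let appended = [5_i64, 2, 9, 3]; // an append-only stream of values
/// let mut best = i64::MAX;
/// for value in appended {
///     best = best.min(value); // O(1) per update, O(1) state per key
/// }
/// assert_eq!(best, 2);
/// ```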
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct MonotonicPlan {
/// All of the aggregations we were asked to compute.
pub aggr_funcs: Vec<AggregateFunc>,
/// Set of "skips" or calls to `nth()` an iterator needs to do over
/// the input to extract the relevant datums.
pub skips: Vec<usize>,
/// True if the input is not physically monotonic, and the operator must perform
/// consolidation to remove potential negations. The operator implementation is
/// free to consolidate as late as possible while ensuring correctness, so it is
/// not a requirement that the input be directly subjected to consolidation.
/// More details in the monotonic one-shot `SELECT`s design doc.[^1]
///
/// [^1]: <https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/20230421_stabilize_monotonic_select.md>
pub must_consolidate: bool,
}
impl RustType<ProtoMonotonicPlan> for MonotonicPlan {
fn into_proto(&self) -> ProtoMonotonicPlan {
ProtoMonotonicPlan {
aggr_funcs: self.aggr_funcs.into_proto(),
skips: self.skips.into_proto(),
must_consolidate: self.must_consolidate.into_proto(),
}
}
fn from_proto(proto: ProtoMonotonicPlan) -> Result<Self, TryFromProtoError> {
Ok(Self {
aggr_funcs: proto.aggr_funcs.into_rust()?,
skips: proto.skips.into_rust()?,
must_consolidate: proto.must_consolidate.into_rust()?,
})
}
}
/// Plan for computing a set of hierarchical aggregations
/// with non-monotonic inputs.
///
/// To perform hierarchical aggregations with stable runtimes
/// under updates we'll subdivide the group key into buckets, compute
/// the reduction in each of those subdivided buckets and then combine
/// the results into a coarser bucket (one that represents a larger
/// fraction of the original input) and redo the reduction in another
/// layer. Effectively, we'll construct a min / max heap out of a series
/// of reduce operators (each one is a separate layer).
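///
/// A minimal sketch of one layer of this scheme (using a plain modulus where the
/// real implementation hashes, and illustrative bucket counts):
///
/// ```
/// use std::collections::BTreeMap;
///
/// // One layer: group values into `buckets` groups and keep the min per group.
/// fn layer_min(values: Vec<u64>, buckets: u64) -> Vec<u64> {
///     let mut groups = BTreeMap::new();
///     for v in values {
///         let best: &mut u64 = groups.entry(v % buckets).or_insert(u64::MAX);
///         *best = (*best).min(v);
///     }
///     groups.into_values().collect()
/// }
///
/// // Progressively coarser layers, ending with one group: the global min.
/// let mut values = vec![42, 7, 19, 3, 88, 51];
/// for buckets in [4, 2, 1] {
///     values = layer_min(values, buckets);
/// }
/// assert_eq!(values, vec![3]);
/// ```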
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct BucketedPlan {
/// All of the aggregations we were asked to compute.
pub aggr_funcs: Vec<AggregateFunc>,
/// Set of "skips" or calls to `nth()` an iterator needs to do over
/// the input to extract the relevant datums.
pub skips: Vec<usize>,
/// The number of buckets in each layer of the reduction tree. Should
/// be decreasing, and ideally each a power of two, so that we can easily
/// distribute values to buckets with `value.hashed() % buckets[layer]`.
pub buckets: Vec<u64>,
}
impl BucketedPlan {
/// Convert to a monotonic plan, indicating whether the operator must apply
/// consolidation to its input.
fn into_monotonic(self, must_consolidate: bool) -> MonotonicPlan {
MonotonicPlan {
aggr_funcs: self.aggr_funcs,
skips: self.skips,
must_consolidate,
}
}
}
impl RustType<ProtoBucketedPlan> for BucketedPlan {
fn into_proto(&self) -> ProtoBucketedPlan {
ProtoBucketedPlan {
aggr_funcs: self.aggr_funcs.into_proto(),
skips: self.skips.into_proto(),
buckets: self.buckets.clone(),
}
}
fn from_proto(proto: ProtoBucketedPlan) -> Result<Self, TryFromProtoError> {
Ok(Self {
aggr_funcs: proto.aggr_funcs.into_rust()?,
skips: proto.skips.into_rust()?,
buckets: proto.buckets,
})
}
}
/// Plan for computing a set of basic aggregations.
///
/// There's much less complexity when rendering basic aggregations.
/// Each aggregation corresponds to one Differential reduce operator.
/// That's it. However, we still want to present one final arrangement
/// so basic aggregations present results with the same interface
/// (one arrangement containing a row with all results) that accumulable
/// and hierarchical aggregations do. To provide that, we render an
/// additional reduce operator whenever we have multiple reduce aggregates
/// to combine and present results in the appropriate order. If we
/// were only asked to compute a single aggregation, we can skip
/// that step and return the arrangement provided by computing the aggregation
/// directly.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub enum BasicPlan {
/// Plan for rendering a single basic aggregation.
Single(SingleBasicPlan),
/// Plan for rendering multiple basic aggregations.
/// These need to then be collated together in an additional
/// reduction. Each element represents:
/// `(index of the input datum we are aggregating over,
/// the aggregation expression)`
Multiple(Vec<(usize, AggregateExpr)>),
}
/// Plan for rendering a single basic aggregation, with possibly fusing a `FlatMap UnnestList` with
/// this aggregation.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SingleBasicPlan {
/// The index in the set of inputs that we are aggregating over.
pub index: usize,
/// The aggregation that we should perform.
pub expr: AggregateExpr,
/// Whether we fused a `FlatMap UnnestList` with this aggregation.
pub fused_unnest_list: bool,
}
impl RustType<proto_basic_plan::ProtoSimpleSingleBasicPlan> for (usize, AggregateExpr) {
fn into_proto(&self) -> proto_basic_plan::ProtoSimpleSingleBasicPlan {
proto_basic_plan::ProtoSimpleSingleBasicPlan {
index: self.0.into_proto(),
expr: Some(self.1.into_proto()),
}
}
fn from_proto(
proto: proto_basic_plan::ProtoSimpleSingleBasicPlan,
) -> Result<Self, TryFromProtoError> {
Ok((
proto.index.into_rust()?,
proto
.expr
.into_rust_if_some("ProtoSimpleSingleBasicPlan::expr")?,
))
}
}
impl RustType<proto_basic_plan::ProtoSingleBasicPlan> for SingleBasicPlan {
fn into_proto(&self) -> proto_basic_plan::ProtoSingleBasicPlan {
proto_basic_plan::ProtoSingleBasicPlan {
index: self.index.into_proto(),
expr: Some(self.expr.into_proto()),
fused_unnest_list: self.fused_unnest_list.into_proto(),
}
}
fn from_proto(
proto: proto_basic_plan::ProtoSingleBasicPlan,
) -> Result<Self, TryFromProtoError> {
Ok(SingleBasicPlan {
index: proto.index.into_rust()?,
expr: proto.expr.into_rust_if_some("ProtoSingleBasicPlan::expr")?,
fused_unnest_list: proto.fused_unnest_list.into_rust()?,
})
}
}
impl RustType<ProtoBasicPlan> for BasicPlan {
fn into_proto(&self) -> ProtoBasicPlan {
use proto_basic_plan::*;
ProtoBasicPlan {
kind: Some(match self {
BasicPlan::Single(plan) => Kind::Single(plan.into_proto()),
BasicPlan::Multiple(aggrs) => Kind::Multiple(ProtoMultipleBasicPlan {
aggrs: aggrs.into_proto(),
}),
}),
}
}
fn from_proto(proto: ProtoBasicPlan) -> Result<Self, TryFromProtoError> {
use proto_basic_plan::Kind;
let kind = proto
.kind
.ok_or_else(|| TryFromProtoError::missing_field("ProtoBasicPlan::kind"))?;
Ok(match kind {
Kind::Single(plan) => BasicPlan::Single(plan.into_rust()?),
Kind::Multiple(x) => BasicPlan::Multiple(x.aggrs.into_rust()?),
})
}
}
/// Plan for collating the results of computing multiple aggregation
/// types.
///
/// TODO: could we express this as a delta join?
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct CollationPlan {
/// Accumulable aggregation results to collate, if any.
pub accumulable: Option<AccumulablePlan>,
/// Hierarchical aggregation results to collate, if any.
pub hierarchical: Option<HierarchicalPlan>,
/// Basic aggregation results to collate, if any.
pub basic: Option<BasicPlan>,
/// When we get results back from each of the different
/// aggregation types, they will be subsequences of
/// the sequence of aggregations in the original reduce expression.
/// We keep a map from output position -> reduction type
/// to easily merge results back into the requested order.
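/// For example (an illustrative mix, not from any particular query): for
/// aggregations `[sum(a), min(b), jsonb_agg(c)]` we would store
/// `[Accumulable, Hierarchical, Basic]`.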
pub aggregate_types: Vec<ReductionType>,
}
impl CollationPlan {
/// Upgrades the hierarchical component of the collation plan to monotonic, if necessary,
/// and sets consolidation requirements.
pub fn as_monotonic(&mut self, must_consolidate: bool) {
self.hierarchical
.as_mut()
.map(|plan| plan.as_monotonic(must_consolidate));
}
}
impl RustType<ProtoCollationPlan> for CollationPlan {
fn into_proto(&self) -> ProtoCollationPlan {
ProtoCollationPlan {
accumulable: self.accumulable.into_proto(),
hierarchical: self.hierarchical.into_proto(),
basic: self.basic.into_proto(),
aggregate_types: self.aggregate_types.into_proto(),
}
}
fn from_proto(proto: ProtoCollationPlan) -> Result<Self, TryFromProtoError> {
Ok(Self {
accumulable: proto.accumulable.into_rust()?,
hierarchical: proto.hierarchical.into_rust()?,
basic: proto.basic.into_rust()?,
aggregate_types: proto.aggregate_types.into_rust()?,
})
}
}
impl ReducePlan {
/// Generate a plan for computing the supplied aggregations.
///
/// The resulting plan summarizes the dataflow to be created
/// and how the aggregations will be executed.
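///
/// For example (illustrative): `[sum(a), count(*)]` yields a single
/// `ReducePlan::Accumulable`, while the mixed `[sum(a), min(b)]` yields a
/// `ReducePlan::Collation` whose `aggregate_types` is
/// `[Accumulable, Hierarchical]`.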
pub fn create_from(
aggregates: Vec<AggregateExpr>,
monotonic: bool,
expected_group_size: Option<u64>,
fused_unnest_list: bool,
) -> Self {
// If we don't have any aggregations we are just computing a distinct.
if aggregates.is_empty() {
return ReducePlan::Distinct;
}
// Otherwise, we need to group aggregations according to their
// reduction type (accumulable, hierarchical, or basic).
let mut reduction_types = BTreeMap::new();
// We need to make sure that each list of aggregates by type forms
// a subsequence of the overall sequence of aggregates.
for index in 0..aggregates.len() {
let typ = reduction_type(&aggregates[index].func);
let aggregates_list = reduction_types.entry(typ).or_insert_with(Vec::new);
aggregates_list.push((index, aggregates[index].clone()));
}
// Convert each grouped list of reductions into a plan.
let plan: Vec<_> = reduction_types
.into_iter()
.map(|(typ, aggregates_list)| {
ReducePlan::create_inner(
typ,
aggregates_list,
monotonic,
expected_group_size,
fused_unnest_list,
)
})
.collect();
// If we only have a single type of aggregation present we can
// render that directly.
if plan.len() == 1 {
return plan[0].clone();
}
// Otherwise, we have to stitch reductions together.
// First, let's sanity-check that we don't have an impossible number
// of reduction types.
assert!(plan.len() <= 3);
let mut collation: CollationPlan = Default::default();
// Construct a mapping from output position -> reduction type that we can
// use to reconstruct the output in the correct order.
let aggregate_types = aggregates
.iter()
.map(|a| reduction_type(&a.func))
.collect::<Vec<_>>();
collation.aggregate_types = aggregate_types;
for expr in plan.into_iter() {
match expr {
ReducePlan::Accumulable(e) => {
assert_none!(collation.accumulable);
collation.accumulable = Some(e);
}
ReducePlan::Hierarchical(e) => {
assert_none!(collation.hierarchical);
collation.hierarchical = Some(e);
}
ReducePlan::Basic(e) => {
assert_none!(collation.basic);
collation.basic = Some(e);
}
ReducePlan::Distinct | ReducePlan::Collation(_) => {
panic!("Inner reduce plan was unsupported type!")
}
}
}
ReducePlan::Collation(collation)
}
/// Generate a plan for computing the specified type of aggregations.
///
/// This function assumes that all of the supplied aggregates are
/// actually of the correct reduction type, and are a subsequence
/// of the total list of requested aggregations.
fn create_inner(
typ: ReductionType,
aggregates_list: Vec<(usize, AggregateExpr)>,
monotonic: bool,
expected_group_size: Option<u64>,
fused_unnest_list: bool,
) -> Self {
if fused_unnest_list {
assert!(matches!(typ, ReductionType::Basic) && aggregates_list.len() == 1);
}
assert!(
!aggregates_list.is_empty(),
"error: tried to render a reduce dataflow with no aggregates"
);
match typ {
ReductionType::Accumulable => {
let mut simple_aggrs = vec![];
let mut distinct_aggrs = vec![];
let full_aggrs: Vec<_> = aggregates_list
.iter()
.cloned()
.map(|(_, aggr)| aggr)
.collect();
for (accumulable_index, (datum_index, aggr)) in
aggregates_list.into_iter().enumerate()
{
// Accumulable aggregations need to do extra per-aggregate work
// for aggregations with the distinct bit set, so we'll separate
// those out now.
if aggr.distinct {
distinct_aggrs.push((accumulable_index, datum_index, aggr));
} else {
simple_aggrs.push((accumulable_index, datum_index, aggr));
};
}
ReducePlan::Accumulable(AccumulablePlan {
full_aggrs,
simple_aggrs,
distinct_aggrs,
})
}
ReductionType::Hierarchical => {
let aggr_funcs: Vec<_> = aggregates_list
.iter()
.cloned()
.map(|(_, aggr)| aggr.func)
.collect();
let indexes: Vec<_> = aggregates_list
.into_iter()
.map(|(index, _)| index)
.collect();
// We don't have random access over Rows so we can simplify the
// task of grabbing the inputs we are aggregating over by
// generating a list of "skips" an iterator over the Row needs
// to do to get the desired indexes.
let skips = convert_indexes_to_skips(indexes);
if monotonic {
let monotonic = MonotonicPlan {
aggr_funcs,
skips,
must_consolidate: false,
};
ReducePlan::Hierarchical(HierarchicalPlan::Monotonic(monotonic))
} else {
let buckets = bucketing_of_expected_group_size(expected_group_size);
let bucketed = BucketedPlan {
aggr_funcs,
skips,
buckets,
};
ReducePlan::Hierarchical(HierarchicalPlan::Bucketed(bucketed))
}
}
ReductionType::Basic => {
if aggregates_list.len() == 1 {
ReducePlan::Basic(BasicPlan::Single(SingleBasicPlan {
index: aggregates_list[0].0,
expr: aggregates_list[0].1.clone(),
fused_unnest_list,
}))
} else {
ReducePlan::Basic(BasicPlan::Multiple(aggregates_list))
}
}
}
}
/// Reports all keys of produced arrangements.
///
/// This is likely either an empty vector, for no arrangement,
/// or a singleton vector containing the list of expressions
/// that key a single arrangement.
pub fn keys(&self, key_arity: usize, arity: usize) -> AvailableCollections {
let key = (0..key_arity)
.map(MirScalarExpr::Column)
.collect::<Vec<_>>();
let (permutation, thinning) = permutation_for_arrangement(&key, arity);
AvailableCollections::new_arranged(vec![(key, permutation, thinning)], None)
}
/// Extracts a fusable MFP for the reduction from the given `mfp` along with a residual
/// non-fusable MFP and potentially revised output arity. The provided `mfp` must be the
/// one sitting on top of the reduction.
///
/// Non-fusable parts include temporal predicates or any other parts that cannot be
/// conservatively asserted to not increase the memory requirements of the output
/// arrangement for the reduction. Either the fusable or non-fusable parts may end up
/// being the identity MFP.
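///
/// For example (illustrative): with `key_arity = 2` and a non-temporal `mfp` over
/// input arity 4 whose projection is `[0, 1, 3]`, the key survives as a prefix and
/// no mapped columns remain, so the entire MFP is fused and the output arity
/// shrinks to 3.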
pub fn extract_mfp_after(
&self,
mut mfp: MapFilterProject,
key_arity: usize,
) -> (MapFilterProject, MapFilterProject, usize) {
// Extract temporal predicates, as we cannot push them into `Reduce`.
let temporal_mfp = mfp.extract_temporal();
let non_temporal = mfp;
mfp = temporal_mfp;
// We ensure we do not attempt to project away the key, as we cannot accomplish
// this. This is done by a simple analysis of the non-temporal part of `mfp` to
// check if it can be directly absorbed; if it can't, we then default to a general
// strategy that unpacks the MFP to absorb only the filter and supporting map
// parts, followed by a post-MFP step.
let input_arity = non_temporal.input_arity;
let key = Vec::from_iter(0..key_arity);
let mut mfp_push;
let output_arity;
if non_temporal.projection.len() <= input_arity
&& non_temporal.projection.iter().all(|c| *c < input_arity)
&& non_temporal.projection.starts_with(&key)
{
// Special case: The key is preserved as a prefix and the projection is only
// of output fields from the reduction. So we know that: (a) We can process the
// fused MFP per-key; (b) The MFP application gets rid of all mapped columns;
// and (c) The output projection is at most as wide as the output that would be
// produced by the reduction, so we are sure to never regress the memory
// requirements of the output arrangement.
// Note that this strategy may change the arity of the output arrangement.
output_arity = non_temporal.projection.len();
mfp_push = non_temporal;
} else {
// General strategy: Unpack MFP as MF followed by P' that removes all M
// columns, then MP afterwards.
// Note that this strategy does not result in any changes to the arity of
// the output arrangement.
let (m, f, p) = non_temporal.into_map_filter_project();
mfp_push = MapFilterProject::new(input_arity)
.map(m.clone())
.filter(f)
.project(0..input_arity);
output_arity = input_arity;
// We still need to perform the map and projection for the actual output.
let mfp_left = MapFilterProject::new(input_arity).map(m).project(p);
// Compose the non-pushed MFP components.
mfp = MapFilterProject::compose(mfp_left, mfp);
}
mfp_push.optimize();
mfp.optimize();
(mfp_push, mfp, output_arity)
}
}
/// Plan for extracting keys and values in preparation for a reduction.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct KeyValPlan {
/// Extracts the columns used as the key.
pub key_plan: mz_expr::SafeMfpPlan,
/// Extracts the columns used to feed the aggregations.
pub val_plan: mz_expr::SafeMfpPlan,
}
impl RustType<ProtoKeyValPlan> for KeyValPlan {
fn into_proto(&self) -> ProtoKeyValPlan {
ProtoKeyValPlan {
key_plan: Some(self.key_plan.into_proto()),
val_plan: Some(self.val_plan.into_proto()),
}
}
fn from_proto(proto: ProtoKeyValPlan) -> Result<Self, TryFromProtoError> {
Ok(Self {
key_plan: proto
.key_plan
.into_rust_if_some("ProtoKeyValPlan::key_plan")?,
val_plan: proto
.val_plan
.into_rust_if_some("ProtoKeyValPlan::val_plan")?,
})
}
}
impl KeyValPlan {
/// Create a new [KeyValPlan] from aggregation arguments.
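///
/// For example (illustrative): with `input_arity = 3`, `group_key = [#0]`, and a
/// single aggregate over `#2`, the key plan evaluates `#0` and the val plan
/// evaluates `#2`, each projecting away the original input columns.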
pub fn new(
input_arity: usize,
group_key: &[MirScalarExpr],
aggregates: &[AggregateExpr],
input_permutation_and_new_arity: Option<(BTreeMap<usize, usize>, usize)>,
) -> Self {
// Form an operator for evaluating key expressions.
let mut key_mfp = MapFilterProject::new(input_arity)
.map(group_key.iter().cloned())
.project(input_arity..(input_arity + group_key.len()));
if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity.clone() {
key_mfp.permute(input_permutation, new_arity);
}
// Form an operator for evaluating value expressions.
let mut val_mfp = MapFilterProject::new(input_arity)
.map(aggregates.iter().map(|a| a.expr.clone()))
.project(input_arity..(input_arity + aggregates.len()));
if let Some((input_permutation, new_arity)) = input_permutation_and_new_arity {
val_mfp.permute(input_permutation, new_arity);
}
key_mfp.optimize();
let key_plan = key_mfp.into_plan().unwrap().into_nontemporal().unwrap();
val_mfp.optimize();
let val_plan = val_mfp.into_plan().unwrap().into_nontemporal().unwrap();
Self { key_plan, val_plan }
}
/// The arity of the key plan.
pub fn key_arity(&self) -> usize {
self.key_plan.projection.len()
}
}
/// Transforms a vector containing indexes of needed columns into one containing
/// the "skips" an iterator over a Row would need to perform to see those values.
///
/// This function requires that all of the elements in `indexes` are strictly
/// increasing.
///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::convert_indexes_to_skips;
/// assert_eq!(convert_indexes_to_skips(vec![3, 6, 10, 15]), [3, 2, 3, 4])
/// ```
pub fn convert_indexes_to_skips(mut indexes: Vec<usize>) -> Vec<usize> {
for i in 1..indexes.len() {
soft_assert_or_log!(
indexes[i - 1] < indexes[i],
"convert_indexes_to_skip needs indexes to be strictly increasing. Received: {:?}",
indexes,
);
}
for i in (1..indexes.len()).rev() {
indexes[i] -= indexes[i - 1];
indexes[i] -= 1;
}
indexes
}
/// Determines whether a function can be accumulated in an update's "difference" field,
/// and whether it can be subjected to recursive (hierarchical) aggregation.
///
/// Accumulable aggregations will be packed into differential dataflow's "difference" field,
/// which can be accumulated in-place using the addition operation on the type. Aggregations
/// that indicate they are accumulable will still need to provide an action that takes their
/// data and introduces it as a difference, and the post-processing when the accumulated value
/// is presented as data.
///
/// Hierarchical aggregations will be subjected to repeated aggregation on initially small but
/// increasingly large subsets of each key. This has the intended property that no invocation
/// is on a significantly large set of values (and so, no incremental update needs to reform
/// significant input data). Hierarchical aggregates can be rendered more efficiently if the
/// input stream is append-only as then we only need to retain the "currently winning" value.
/// Every hierarchical aggregate needs to supply a corresponding `ReductionMonoid` implementation.
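///
/// # Examples
///
/// ```
/// use mz_compute_types::plan::reduce::{reduction_type, ReductionType};
/// use mz_expr::AggregateFunc;
/// assert_eq!(reduction_type(&AggregateFunc::Count), ReductionType::Accumulable);
/// assert_eq!(reduction_type(&AggregateFunc::MaxInt64), ReductionType::Hierarchical);
/// ```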
pub fn reduction_type(func: &AggregateFunc) -> ReductionType {
match func {
AggregateFunc::SumInt16
| AggregateFunc::SumInt32
| AggregateFunc::SumInt64
| AggregateFunc::SumUInt16
| AggregateFunc::SumUInt32
| AggregateFunc::SumUInt64
| AggregateFunc::SumFloat32
| AggregateFunc::SumFloat64
| AggregateFunc::SumNumeric
| AggregateFunc::Count
| AggregateFunc::Any
| AggregateFunc::All
| AggregateFunc::Dummy => ReductionType::Accumulable,
AggregateFunc::MaxNumeric
| AggregateFunc::MaxInt16
| AggregateFunc::MaxInt32
| AggregateFunc::MaxInt64
| AggregateFunc::MaxUInt16
| AggregateFunc::MaxUInt32
| AggregateFunc::MaxUInt64
| AggregateFunc::MaxMzTimestamp
| AggregateFunc::MaxFloat32
| AggregateFunc::MaxFloat64
| AggregateFunc::MaxBool
| AggregateFunc::MaxString
| AggregateFunc::MaxDate
| AggregateFunc::MaxTimestamp
| AggregateFunc::MaxTimestampTz
| AggregateFunc::MaxInterval
| AggregateFunc::MaxTime
| AggregateFunc::MinNumeric
| AggregateFunc::MinInt16
| AggregateFunc::MinInt32
| AggregateFunc::MinInt64
| AggregateFunc::MinUInt16
| AggregateFunc::MinUInt32
| AggregateFunc::MinUInt64
| AggregateFunc::MinMzTimestamp
| AggregateFunc::MinInterval
| AggregateFunc::MinFloat32
| AggregateFunc::MinFloat64
| AggregateFunc::MinBool
| AggregateFunc::MinString
| AggregateFunc::MinDate
| AggregateFunc::MinTimestamp
| AggregateFunc::MinTimestampTz
| AggregateFunc::MinTime => ReductionType::Hierarchical,
AggregateFunc::JsonbAgg { .. }
| AggregateFunc::JsonbObjectAgg { .. }
| AggregateFunc::MapAgg { .. }
| AggregateFunc::ArrayConcat { .. }
| AggregateFunc::ListConcat { .. }
| AggregateFunc::StringAgg { .. }
| AggregateFunc::RowNumber { .. }
| AggregateFunc::Rank { .. }
| AggregateFunc::DenseRank { .. }
| AggregateFunc::LagLead { .. }
| AggregateFunc::FirstValue { .. }
| AggregateFunc::LastValue { .. }
| AggregateFunc::WindowAggregate { .. }
| AggregateFunc::FusedValueWindowFunc { .. }
| AggregateFunc::FusedWindowAggregate { .. } => ReductionType::Basic,
}
}
#[cfg(test)]
mod tests {
use mz_ore::assert_ok;
use mz_proto::protobuf_roundtrip;
use proptest::prelude::*;
use super::*;
// This test causes stack overflows if not run with --release;
// ignore it by default.
proptest! {
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `decContextDefault` on OS `linux`
fn reduce_plan_protobuf_roundtrip(expect in any::<ReducePlan>()) {
let actual = protobuf_roundtrip::<_, ProtoReducePlan>(&expect);
assert_ok!(actual);
assert_eq!(actual.unwrap(), expect);
}
}
}