1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Threshold planning logic.
//!
//! The threshold operator produces only rows with a positive cardinality, for example required to
//! provide SQL except and intersect semantics.
//!
//! We build a plan ([ThresholdPlan]) encapsulating all decisions and requirements on the specific
//! threshold implementation. The idea is to decouple the logic deciding which plan to select from
//! the actual implementation of each variant available.
//!
//! Currently, we provide two variants:
//! * The [BasicThresholdPlan] maintains all its outputs as an arrangement. It is beneficial if the
//! threshold is the final operation, or a downstream operators expects arranged inputs.
//! * The [RetractionsThresholdPlan] maintains retractions, i.e. rows that are not in the output. It
//! is beneficial to use this operator if the number of retractions is expected to be small, and
//! if a potential downstream operator does not expect its input to be arranged.
use std::collections::HashMap;
use expr::{permutation_for_arrangement, MirScalarExpr};
use serde::{Deserialize, Serialize};
use super::AvailableCollections;
/// A plan describing how to compute a threshold operation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum ThresholdPlan {
/// Basic threshold maintains all positive inputs.
Basic(BasicThresholdPlan),
/// Retractions threshold maintains all negative inputs.
Retractions(RetractionsThresholdPlan),
}
impl ThresholdPlan {
/// Reports all keys of produced arrangements.
///
/// This is likely either an empty vector, for no arrangement,
/// or a singleton vector containing the list of expressions
/// that key a single arrangement.
pub fn keys(&self) -> AvailableCollections {
match self {
ThresholdPlan::Basic(plan) => {
AvailableCollections::new_arranged(vec![plan.ensure_arrangement.clone()])
}
ThresholdPlan::Retractions(_plan) => AvailableCollections::new_raw(),
}
}
}
/// A plan to maintain all inputs with positive counts.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BasicThresholdPlan {
/// Description of how the input has been arranged, and how to arrange the output
pub ensure_arrangement: (Vec<MirScalarExpr>, HashMap<usize, usize>, Vec<usize>),
}
/// A plan to maintain all inputs with negative counts, which are subtracted from the output
/// in order to maintain an equivalent collection compared to [BasicThresholdPlan].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RetractionsThresholdPlan {
/// Description of how the input has been arranged
pub ensure_arrangement: (Vec<MirScalarExpr>, HashMap<usize, usize>, Vec<usize>),
}
impl ThresholdPlan {
/// Construct the plan from the number of columns (`arity`). `maintain_retractions` allows to
/// switch between an implementation that maintains rows with negative counts (`true`), or
/// rows with positive counts (`false`).
///
/// Also returns the arrangement and thinning required for the input.
pub fn create_from(
arity: usize,
maintain_retractions: bool,
) -> (
Self,
(Vec<MirScalarExpr>, HashMap<usize, usize>, Vec<usize>),
) {
// Arrange the input by all columns in order.
let mut all_columns = Vec::new();
for column in 0..arity {
all_columns.push(expr::MirScalarExpr::Column(column));
}
let (permutation, thinning) = permutation_for_arrangement(&all_columns, arity);
let ensure_arrangement = (all_columns, permutation, thinning);
let plan = if maintain_retractions {
ThresholdPlan::Retractions(RetractionsThresholdPlan {
ensure_arrangement: ensure_arrangement.clone(),
})
} else {
ThresholdPlan::Basic(BasicThresholdPlan {
ensure_arrangement: ensure_arrangement.clone(),
})
};
(plan, ensure_arrangement)
}
}