1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use proptest::prelude::any;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use mz_expr::{MapFilterProject, MirScalarExpr};
use mz_proto::{ProtoType, RustType, TryFromProtoError};
use mz_repr::{Datum, RelationType};
include!(concat!(env!("OUT_DIR"), "/mz_storage.client.transforms.rs"));
// TODO: change contract to ensure that the operator is always applied to
// streams of rows
/// In-place restrictions that can be made to rows.
///
/// These fields indicate *optional* information that may applied to
/// streams of rows. Any row that does not satisfy all predicates may
/// be discarded, and any column not listed in the projection may be
/// replaced by a default value.
///
/// The intended order of operations is that the predicates are first
/// applied, and columns not in projection can then be overwritten with
/// default values. This allows the projection to avoid capturing columns
/// used by the predicates but not otherwise required.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct LinearOperator {
/// Rows that do not pass all predicates may be discarded.
#[proptest(strategy = "proptest::collection::vec(any::<MirScalarExpr>(), 0..2)")]
pub predicates: Vec<MirScalarExpr>,
/// Columns not present in `projection` may be replaced with
/// default values.
pub projection: Vec<usize>,
}
impl RustType<ProtoLinearOperator> for LinearOperator {
fn into_proto(&self) -> ProtoLinearOperator {
ProtoLinearOperator {
predicates: self.predicates.into_proto(),
projection: self.projection.into_proto(),
}
}
fn from_proto(proto: ProtoLinearOperator) -> Result<Self, TryFromProtoError> {
Ok(LinearOperator {
predicates: proto.predicates.into_rust()?,
projection: proto.projection.into_rust()?,
})
}
}
impl LinearOperator {
/// Reports whether this linear operator is trivial when applied to an
/// input of the specified arity.
pub fn is_trivial(&self, arity: usize) -> bool {
self.predicates.is_empty() && self.projection.iter().copied().eq(0..arity)
}
/// Converts linear operators to MapFilterProject instances.
///
/// This method produces a `MapFilterProject` instance that first applies any predicates,
/// and then introduces `Datum::Dummy` literals in columns that are not demanded.
/// The `RelationType` is required so that we can fill in the correct type of `Datum::Dummy`.
pub fn to_mfp(self, typ: &RelationType) -> MapFilterProject {
let LinearOperator {
predicates,
projection,
} = self;
let arity = typ.arity();
let mut dummies = Vec::new();
let mut demand_projection = Vec::new();
for (column, typ) in typ.column_types.iter().enumerate() {
if projection.contains(&column) {
demand_projection.push(column);
} else {
demand_projection.push(arity + dummies.len());
dummies.push(MirScalarExpr::literal_ok(
Datum::Dummy,
typ.scalar_type.clone(),
));
}
}
// First filter, then introduce and reposition `Datum::Dummy` values.
MapFilterProject::new(arity)
.filter(predicates)
.map(dummies)
.project(demand_projection)
}
}