1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441 1442 1443 1444 1445 1446 1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474 1475 1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552 1553 1554 1555 1556 1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683 1684 1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785 1786 1787 1788 1789 1790 1791 1792 1793 1794 1795 1796 1797 1798 1799 1800 1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816 1817 1818 1819 1820 1821 1822 1823 1824 1825 1826 1827 1828 1829 1830 1831 1832 1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845 1846 1847 1848 1849 1850 1851 1852 1853 1854 1855 1856 1857 1858 1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886 1887 1888 1889 1890 1891 1892 1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 1913 1914 1915 1916 1917 1918 1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947 1948 1949 1950 1951 1952 1953 1954 1955 1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986 1987 1988 1989 1990 1991
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Display;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Datum, Row};
use proptest::prelude::*;
use serde::{Deserialize, Serialize};
use crate::linear::proto_map_filter_project::ProtoPredicate;
use crate::visit::Visit;
use crate::{MirRelationExpr, MirScalarExpr};
include!(concat!(env!("OUT_DIR"), "/mz_expr.linear.rs"));
/// A compound operator that can be applied row-by-row.
///
/// This operator integrates the map, filter, and project operators.
/// It applies a sequences of map expressions, which are allowed to
/// refer to previous expressions, interleaved with predicates which
/// must be satisfied for an output to be produced. If all predicates
/// evaluate to `Datum::True` the data at the identified columns are
/// collected and produced as output in a packed `Row`.
///
/// This operator is a "builder" and its contents may contain expressions
/// that are not yet executable. For example, it may contain temporal
/// expressions in `self.expressions`, even though this is not something
/// we can directly evaluate. The plan creation methods will defensively
/// ensure that the right thing happens.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Hash, Ord, PartialOrd)]
pub struct MapFilterProject {
/// A sequence of expressions that should be appended to the row.
///
/// Many of these expressions may not be produced in the output,
/// and may only be present as common subexpressions.
pub expressions: Vec<MirScalarExpr>,
/// Expressions that must evaluate to `Datum::True` for the output
/// row to be produced.
///
/// Each entry is prepended with a column identifier indicating
/// the column *before* which the predicate should first be applied.
/// Most commonly this would be one plus the largest column identifier
/// in the predicate's support, but it could be larger to implement
/// guarded evaluation of predicates.
///
/// This list should be sorted by the first field.
pub predicates: Vec<(usize, MirScalarExpr)>,
/// A sequence of column identifiers whose data form the output row.
pub projection: Vec<usize>,
/// The expected number of input columns.
///
/// This is needed to ensure correct identification of newly formed
/// columns in the output.
pub input_arity: usize,
}
/// Use a custom [`Arbitrary`] implementation to cap the number of internal
/// [`MirScalarExpr`] referenced by the generated [`MapFilterProject`] instances.
impl Arbitrary for MapFilterProject {
type Parameters = ();
type Strategy = BoxedStrategy<Self>;
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
(
prop::collection::vec(any::<MirScalarExpr>(), 0..10),
prop::collection::vec((any::<usize>(), any::<MirScalarExpr>()), 0..10),
prop::collection::vec(any::<usize>(), 0..10),
usize::arbitrary(),
)
.prop_map(
|(expressions, predicates, projection, input_arity)| MapFilterProject {
expressions,
predicates,
projection,
input_arity,
},
)
.boxed()
}
}
impl RustType<ProtoMapFilterProject> for MapFilterProject {
fn into_proto(&self) -> ProtoMapFilterProject {
ProtoMapFilterProject {
expressions: self.expressions.into_proto(),
predicates: self.predicates.into_proto(),
projection: self.projection.into_proto(),
input_arity: self.input_arity.into_proto(),
}
}
fn from_proto(proto: ProtoMapFilterProject) -> Result<Self, TryFromProtoError> {
Ok(MapFilterProject {
expressions: proto.expressions.into_rust()?,
predicates: proto.predicates.into_rust()?,
projection: proto.projection.into_rust()?,
input_arity: proto.input_arity.into_rust()?,
})
}
}
impl RustType<ProtoPredicate> for (usize, MirScalarExpr) {
fn into_proto(&self) -> ProtoPredicate {
ProtoPredicate {
column_to_apply: self.0.into_proto(),
predicate: Some(self.1.into_proto()),
}
}
fn from_proto(proto: ProtoPredicate) -> Result<Self, TryFromProtoError> {
Ok((
proto.column_to_apply.into_rust()?,
proto
.predicate
.into_rust_if_some("ProtoPredicate::predicate")?,
))
}
}
impl Display for MapFilterProject {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "MapFilterProject(")?;
writeln!(f, " expressions:")?;
self.expressions
.iter()
.enumerate()
.try_for_each(|(i, e)| writeln!(f, " #{} <- {},", i + self.input_arity, e))?;
writeln!(f, " predicates:")?;
self.predicates
.iter()
.try_for_each(|(before, p)| writeln!(f, " <before: {}> {},", before, p))?;
writeln!(f, " projection: {:?}", self.projection)?;
writeln!(f, " input_arity: {}", self.input_arity)?;
writeln!(f, ")")
}
}
impl MapFilterProject {
/// Create a no-op operator for an input of a supplied arity.
pub fn new(input_arity: usize) -> Self {
Self {
expressions: Vec::new(),
predicates: Vec::new(),
projection: (0..input_arity).collect(),
input_arity,
}
}
/// Given two mfps, return an mfp that applies one
/// followed by the other.
/// Note that the arguments are in the opposite order
/// from how function composition is usually written in mathematics.
pub fn compose(before: Self, after: Self) -> Self {
let (m, f, p) = after.into_map_filter_project();
before.map(m).filter(f).project(p)
}
/// True if the operator describes the identity transformation.
pub fn is_identity(&self) -> bool {
self.expressions.is_empty()
&& self.predicates.is_empty()
&& self.projection.len() == self.input_arity
&& self.projection.iter().enumerate().all(|(i, p)| i == *p)
}
/// Retain only the indicated columns in the presented order.
pub fn project<I>(mut self, columns: I) -> Self
where
I: IntoIterator<Item = usize> + std::fmt::Debug,
{
self.projection = columns.into_iter().map(|c| self.projection[c]).collect();
self
}
/// Retain only rows satisfying these predicates.
///
/// This method introduces predicates as eagerly as they can be evaluated,
/// which may not be desired for predicates that may cause exceptions.
/// If fine manipulation is required, the predicates can be added manually.
pub fn filter<I>(mut self, predicates: I) -> Self
where
I: IntoIterator<Item = MirScalarExpr>,
{
for mut predicate in predicates {
// Correct column references.
predicate.permute(&self.projection[..]);
// Validate column references.
assert!(predicate
.support()
.into_iter()
.all(|c| c < self.input_arity + self.expressions.len()));
// Insert predicate as eagerly as it can be evaluated:
// just after the largest column in its support is formed.
let max_support = predicate
.support()
.into_iter()
.max()
.map(|c| c + 1)
.unwrap_or(0);
self.predicates.push((max_support, predicate))
}
// Stable sort predicates by position at which they take effect.
// We put literal errors at the end as a stop-gap to avoid erroring
// before we are able to evaluate any predicates that might prevent it.
self.predicates
.sort_by_key(|(position, predicate)| (predicate.is_literal_err(), *position));
self
}
/// Append the result of evaluating expressions to each row.
pub fn map<I>(mut self, expressions: I) -> Self
where
I: IntoIterator<Item = MirScalarExpr>,
{
for mut expression in expressions {
// Correct column references.
expression.permute(&self.projection[..]);
// Validate column references.
assert!(expression
.support()
.into_iter()
.all(|c| c < self.input_arity + self.expressions.len()));
// Introduce expression and produce as output.
self.expressions.push(expression);
self.projection
.push(self.input_arity + self.expressions.len() - 1);
}
self
}
/// Like [`MapFilterProject::as_map_filter_project`], but consumes `self` rather than cloning.
pub fn into_map_filter_project(self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
let predicates = self
.predicates
.into_iter()
.map(|(_pos, predicate)| predicate)
.collect();
(self.expressions, predicates, self.projection)
}
/// As the arguments to `Map`, `Filter`, and `Project` operators.
///
/// In principle, this operator can be implemented as a sequence of
/// more elemental operators, likely less efficiently.
pub fn as_map_filter_project(&self) -> (Vec<MirScalarExpr>, Vec<MirScalarExpr>, Vec<usize>) {
self.clone().into_map_filter_project()
}
/// Determines if a scalar expression must be equal to a literal datum.
pub fn literal_constraint(&self, expr: &MirScalarExpr) -> Option<Datum> {
for (_pos, predicate) in self.predicates.iter() {
if let MirScalarExpr::CallBinary {
func: crate::BinaryFunc::Eq,
expr1,
expr2,
} = predicate
{
if let Some(Ok(datum1)) = expr1.as_literal() {
if &**expr2 == expr {
return Some(datum1);
}
}
if let Some(Ok(datum2)) = expr2.as_literal() {
if &**expr1 == expr {
return Some(datum2);
}
}
}
}
None
}
/// Determines if a sequence of scalar expressions must be equal to a literal row.
///
/// This method returns `None` on an empty `exprs`, which might be surprising, but
/// seems to line up with its callers' expectations of that being a non-constraint.
/// The caller knows if `exprs` is empty, and can modify their behavior appropriately.
/// if they would rather have a literal empty row.
pub fn literal_constraints(&self, exprs: &[MirScalarExpr]) -> Option<Row> {
if exprs.is_empty() {
return None;
}
let mut row = Row::default();
let mut packer = row.packer();
for expr in exprs {
if let Some(literal) = self.literal_constraint(expr) {
packer.push(literal);
} else {
return None;
}
}
Some(row)
}
/// Extracts any MapFilterProject at the root of the expression.
///
/// The expression will be modified to extract any maps, filters, and
/// projections, which will be returned as `Self`. If there are no maps,
/// filters, or projections the method will return an identity operator.
///
/// The extracted expressions may contain temporal predicates, and one
/// should be careful to apply them blindly.
pub fn extract_from_expression(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
// TODO: This could become iterative rather than recursive if
// we were able to fuse MFP operators from below, rather than
// from above.
match expr {
MirRelationExpr::Map { input, scalars } => {
let (mfp, expr) = Self::extract_from_expression(input);
(mfp.map(scalars.iter().cloned()), expr)
}
MirRelationExpr::Filter { input, predicates } => {
let (mfp, expr) = Self::extract_from_expression(input);
(mfp.filter(predicates.iter().cloned()), expr)
}
MirRelationExpr::Project { input, outputs } => {
let (mfp, expr) = Self::extract_from_expression(input);
(mfp.project(outputs.iter().cloned()), expr)
}
x => (Self::new(x.arity()), x),
}
}
/// Extracts an error-free MapFilterProject at the root of the expression.
///
/// The expression will be modified to extract maps, filters, and projects
/// from the root of the expression, which will be returned as `Self`. The
/// extraction will halt if a Map or Filter containing a literal error is
/// reached. Otherwise, the method will return an identity operator.
///
/// This method is meant to be used during optimization, where it is
/// necessary to avoid moving around maps and filters with errors.
pub fn extract_non_errors_from_expr(expr: &MirRelationExpr) -> (Self, &MirRelationExpr) {
match expr {
MirRelationExpr::Map { input, scalars }
if scalars.iter().all(|s| !s.is_literal_err()) =>
{
let (mfp, expr) = Self::extract_non_errors_from_expr(input);
(mfp.map(scalars.iter().cloned()), expr)
}
MirRelationExpr::Filter { input, predicates }
if predicates.iter().all(|p| !p.is_literal_err()) =>
{
let (mfp, expr) = Self::extract_non_errors_from_expr(input);
(mfp.filter(predicates.iter().cloned()), expr)
}
MirRelationExpr::Project { input, outputs } => {
let (mfp, expr) = Self::extract_non_errors_from_expr(input);
(mfp.project(outputs.iter().cloned()), expr)
}
x => (Self::new(x.arity()), x),
}
}
/// Extracts an error-free MapFilterProject at the root of the expression.
///
/// Differs from [MapFilterProject::extract_non_errors_from_expr] by taking and returning a
/// mutable reference.
pub fn extract_non_errors_from_expr_ref_mut(
expr: &mut MirRelationExpr,
) -> (Self, &mut MirRelationExpr) {
// This is essentially the same code as `extract_non_errors_from_expr`, except the seemingly
// superfluous outer if, which works around a borrow-checker issue:
// https://github.com/rust-lang/rust/issues/54663
if matches!(expr, MirRelationExpr::Map { input: _, scalars } if scalars.iter().all(|s| !s.is_literal_err()))
|| matches!(expr, MirRelationExpr::Filter { input: _, predicates } if predicates.iter().all(|p| !p.is_literal_err()))
|| matches!(expr, MirRelationExpr::Project { .. })
{
match expr {
MirRelationExpr::Map { input, scalars }
if scalars.iter().all(|s| !s.is_literal_err()) =>
{
let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
(mfp.map(scalars.iter().cloned()), expr)
}
MirRelationExpr::Filter { input, predicates }
if predicates.iter().all(|p| !p.is_literal_err()) =>
{
let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
(mfp.filter(predicates.iter().cloned()), expr)
}
MirRelationExpr::Project { input, outputs } => {
let (mfp, expr) = Self::extract_non_errors_from_expr_ref_mut(input);
(mfp.project(outputs.iter().cloned()), expr)
}
_ => unreachable!(),
}
} else {
(Self::new(expr.arity()), expr)
}
}
/// Removes an error-free MapFilterProject from the root of the expression.
///
/// The expression will be modified to extract maps, filters, and projects
/// from the root of the expression, which will be returned as `Self`. The
/// extraction will halt if a Map or Filter containing a literal error is
/// reached. Otherwise, the method will return an
/// identity operator, and the expression will remain unchanged.
///
/// This method is meant to be used during optimization, where it is
/// necessary to avoid moving around maps and filters with errors.
pub fn extract_non_errors_from_expr_mut(expr: &mut MirRelationExpr) -> Self {
match expr {
MirRelationExpr::Map { input, scalars }
if scalars.iter().all(|s| !s.is_literal_err()) =>
{
let mfp =
Self::extract_non_errors_from_expr_mut(input).map(scalars.iter().cloned());
*expr = input.take_dangerous();
mfp
}
MirRelationExpr::Filter { input, predicates }
if predicates.iter().all(|p| !p.is_literal_err()) =>
{
let mfp = Self::extract_non_errors_from_expr_mut(input)
.filter(predicates.iter().cloned());
*expr = input.take_dangerous();
mfp
}
MirRelationExpr::Project { input, outputs } => {
let mfp =
Self::extract_non_errors_from_expr_mut(input).project(outputs.iter().cloned());
*expr = input.take_dangerous();
mfp
}
x => Self::new(x.arity()),
}
}
/// Extracts temporal predicates into their own `Self`.
///
/// Expressions that are used by the temporal predicates are exposed by `self.projection`,
/// though there could be justification for extracting them as well if they are otherwise
/// unused.
///
/// This separation is valuable when the execution cannot be fused into one operator.
pub fn extract_temporal(&mut self) -> Self {
// Optimize the expression, as it is only post-optimization that we can be certain
// that temporal expressions are restricted to filters. We could relax this in the
// future to be only `inline_expressions` and `remove_undemanded`, but optimization
// seems to be the best fit at the moment.
self.optimize();
// Assert that we no longer have temporal expressions to evaluate. This should only
// occur if the optimization above results with temporal expressions yielded in the
// output, which is out of spec for how the type is meant to be used.
assert!(!self.expressions.iter().any(|e| e.contains_temporal()));
// Extract temporal predicates from `self.predicates`.
let mut temporal_predicates = Vec::new();
self.predicates.retain(|(_position, predicate)| {
if predicate.contains_temporal() {
temporal_predicates.push(predicate.clone());
false
} else {
true
}
});
// Determine extended input columns used by temporal filters.
let mut support = BTreeSet::new();
for predicate in temporal_predicates.iter() {
support.extend(predicate.support());
}
// Discover the locations of these columns after `self.projection`.
let old_projection_len = self.projection.len();
let mut new_location = BTreeMap::new();
for original in support.iter() {
if let Some(position) = self.projection.iter().position(|x| x == original) {
new_location.insert(*original, position);
} else {
new_location.insert(*original, self.projection.len());
self.projection.push(*original);
}
}
// Permute references in extracted predicates to their new locations.
for predicate in temporal_predicates.iter_mut() {
predicate.permute_map(&new_location);
}
// Form a new `Self` containing the temporal predicates to return.
Self::new(self.projection.len())
.filter(temporal_predicates)
.project(0..old_projection_len)
}
/// Extracts common expressions from multiple `Self` into a result `Self`.
///
/// The argument `mfps` are mutated so that each are functionaly equivalent to their
/// corresponding input, when composed atop the resulting `Self`.
pub fn extract_common(mfps: &mut [&mut Self]) -> Self {
match mfps.len() {
0 => {
panic!("Cannot call method on empty arguments");
}
1 => {
let output_arity = mfps[0].projection.len();
std::mem::replace(mfps[0], MapFilterProject::new(output_arity))
}
_ => {
// Prepare a return `Self`.
let input_arity = mfps[0].input_arity;
let mut result_mfp = MapFilterProject::new(input_arity);
// Naive strategy:
// First, look for identical predicates and extract them.
// Then, look for unused columns and project them away.
// First, look for identical predicates and extract them.
// The trouble here is CSE, as predicates may not "look"
// identical despite being identical.
// unimplemented!()
// Then, look for unused columns and project them away.
let mut common_demand = BTreeSet::new();
for mfp in mfps.iter() {
common_demand.extend(mfp.demand());
}
// columns in `common_demand` must be retained, but others
// may be discarded.
let common_demand = (0..input_arity)
.filter(|x| common_demand.contains(x))
.collect::<Vec<_>>();
let remap = common_demand
.iter()
.cloned()
.enumerate()
.map(|(new, old)| (old, new))
.collect::<BTreeMap<_, _>>();
for mfp in mfps.iter_mut() {
mfp.permute(remap.clone(), common_demand.len());
}
result_mfp = result_mfp.project(common_demand);
// Return the resulting MFP.
result_mfp
}
}
}
/// Returns `self`, and leaves behind an identity operator that acts on its output.
pub fn take(&mut self) -> Self {
let mut identity = Self::new(self.projection.len());
std::mem::swap(self, &mut identity);
identity
}
/// Convert the `MapFilterProject` into a staged evaluation plan.
///
/// The main behavior is extract temporal predicates, which cannot be evaluated
/// using the standard machinery.
pub fn into_plan(self) -> Result<plan::MfpPlan, String> {
plan::MfpPlan::create_from(self)
}
}
impl MapFilterProject {
/// Partitions `self` into two instances, one of which can be eagerly applied.
///
/// The `available` argument indicates which input columns are available (keys)
/// and in which positions (values). This information may allow some maps and
/// filters to execute. The `input_arity` argument reports the total number of
/// input columns (which may include some not present in `available`)
///
/// This method partitions `self` in two parts, `(before, after)`, where `before`
/// can be applied on columns present as keys in `available`, and `after` must
/// await the introduction of the other input columns.
///
/// The `before` instance will *append* any columns that can be determined from
/// `available` but will project away any of these columns that are not needed by
/// `after`. Importantly, this means that `before` will leave intact *all* input
/// columns including those not referenced in `available`.
///
/// The `after` instance will presume all input columns are available, followed
/// by the appended columns of the `before` instance. It may be that some input
/// columns can be projected away in `before` if `after` does not need them, but
/// we leave that as something the caller can apply if needed (it is otherwise
/// complicated to negotiate which input columns `before` should retain).
///
/// To correctly reconstruct `self` from `before` and `after`, one must introduce
/// additional input columns, permute all input columns to their locations as
/// expected by `self`, follow this by new columns appended by `before`, and
/// remove all other columns that may be present.
///
/// # Example
///
/// ```rust
/// use mz_expr::{BinaryFunc, MapFilterProject, MirScalarExpr};
///
/// // imagine an action on columns (a, b, c, d).
/// let original = MapFilterProject::new(4).map(vec![
/// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::AddInt64),
/// MirScalarExpr::column(2).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64),
/// MirScalarExpr::column(3).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
/// ]).project(vec![6]);
///
/// // Imagine we start with columns (b, x, a, y, c).
/// //
/// // The `partition` method requires a map from *expected* input columns to *actual*
/// // input columns. In the example above, the columns a, b, and c exist, and are at
/// // locations 2, 0, and 4 respectively. We must construct a map to this effect.
/// let mut available_columns = std::collections::BTreeMap::new();
/// available_columns.insert(0, 2);
/// available_columns.insert(1, 0);
/// available_columns.insert(2, 4);
/// // Partition `original` using the available columns and current input arity.
/// // This informs `partition` which columns are available, where they can be found,
/// // and how many columns are not relevant but should be preserved.
/// let (before, after) = original.partition(available_columns, 5);
///
/// // `before` sees all five input columns, and should append `a + b + c`.
/// assert_eq!(before, MapFilterProject::new(5).map(vec![
/// MirScalarExpr::column(2).call_binary(MirScalarExpr::column(0), BinaryFunc::AddInt64),
/// MirScalarExpr::column(4).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
/// ]).project(vec![0, 1, 2, 3, 4, 6]));
///
/// // `after` expects to see `(a, b, c, d, a + b + c)`.
/// assert_eq!(after, MapFilterProject::new(5).map(vec![
/// MirScalarExpr::column(3).call_binary(MirScalarExpr::column(4), BinaryFunc::AddInt64)
/// ]).project(vec![5]));
///
/// // To reconstruct `self`, we must introduce the columns that are not present,
/// // and present them in the order intended by `self`. In this example, we must
/// // introduce column d and permute the columns so that they begin (a, b, c, d).
/// // The columns x and y must be projected away, and any columns introduced by
/// // `begin` must be retained in their current order.
///
/// // The `after` instance expects to be provided with all inputs, but it
/// // may not need all inputs. The `demand()` and `permute()` methods can
/// // optimize the representation.
/// ```
pub fn partition(self, available: BTreeMap<usize, usize>, input_arity: usize) -> (Self, Self) {
// Map expressions, filter predicates, and projections for `before` and `after`.
let mut before_expr = Vec::new();
let mut before_pred = Vec::new();
let mut before_proj = Vec::new();
let mut after_expr = Vec::new();
let mut after_pred = Vec::new();
let mut after_proj = Vec::new();
// Track which output columns must be preserved in the output of `before`.
let mut demanded = BTreeSet::new();
demanded.extend(0..self.input_arity);
demanded.extend(self.projection.iter());
// Determine which map expressions can be computed from the available subset.
// Some expressions may depend on other expressions, but by evaluating them
// in forward order we should accurately determine the available expressions.
let mut available_expr = vec![false; self.input_arity];
// Initialize available columns from `available`, which is then not used again.
for index in available.keys() {
available_expr[*index] = true;
}
for expr in self.expressions.into_iter() {
// We treat an expression as available if its supporting columns are available,
// and if it is not a literal (we want to avoid pushing down literals). This
// choice is ad-hoc, but the intent is that we partition the operators so
// that we can reduce the row representation size and total computation.
// Pushing down literals harms the former and does nothing for the latter.
// In the future, we'll want to have a harder think about this trade-off, as
// we are certainly making sub-optimal decisions by pushing down all available
// work.
// TODO(mcsherry): establish better principles about what work to push down.
let is_available =
expr.support().into_iter().all(|i| available_expr[i]) && !expr.is_literal();
if is_available {
before_expr.push(expr);
} else {
demanded.extend(expr.support());
after_expr.push(expr);
}
available_expr.push(is_available);
}
// Determine which predicates can be computed from the available subset.
for (_when, pred) in self.predicates.into_iter() {
let is_available = pred.support().into_iter().all(|i| available_expr[i]);
if is_available {
before_pred.push(pred);
} else {
demanded.extend(pred.support());
after_pred.push(pred);
}
}
// Map from prior output location to location in un-projected `before`.
// This map is used to correct references in `before` but it should be
// adjusted to reflect `before`s projection prior to use in `after`.
let mut before_map = available;
// Input columns include any additional undescribed columns that may
// not be captured by the `available` argument, so we must independently
// track the current number of columns (vs relying on `before_map.len()`).
let mut input_columns = input_arity;
for index in self.input_arity..available_expr.len() {
if available_expr[index] {
before_map.insert(index, input_columns);
input_columns += 1;
}
}
// Permute the column references in `before` expressions and predicates.
for expr in before_expr.iter_mut() {
expr.permute_map(&before_map);
}
for pred in before_pred.iter_mut() {
pred.permute_map(&before_map);
}
// Demand information determines `before`s output projection.
// Specifically, we produce all input columns in the output, as well as
// any columns that are available and demanded.
before_proj.extend(0..input_arity);
for index in self.input_arity..available_expr.len() {
// If an intermediate result is both available and demanded,
// we should produce it as output.
if available_expr[index] && demanded.contains(&index) {
// Use the new location of `index`.
before_proj.push(before_map[&index]);
}
}
// Map from prior output locations to location in post-`before` columns.
// This map is used to correct references in `after`.
// The presumption is that `after` will be presented with all input columns,
// followed by the output columns introduced by `before` in order.
let mut after_map = BTreeMap::new();
for index in 0..self.input_arity {
after_map.insert(index, index);
}
for index in self.input_arity..available_expr.len() {
// If an intermediate result is both available and demanded,
// it was produced as output.
if available_expr[index] && demanded.contains(&index) {
// We expect to find the output as far after `self.input_arity` as
// it was produced after `input_arity` in the output of `before`.
let location = self.input_arity
+ (before_proj
.iter()
.position(|x| x == &before_map[&index])
.unwrap()
- input_arity);
after_map.insert(index, location);
}
}
// We must now re-map the remaining non-demanded expressions, which are
// contiguous rather than potentially interspersed.
for index in self.input_arity..available_expr.len() {
if !available_expr[index] {
after_map.insert(index, after_map.len());
}
}
// Permute the column references in `after` expressions and predicates.
for expr in after_expr.iter_mut() {
expr.permute_map(&after_map);
}
for pred in after_pred.iter_mut() {
pred.permute_map(&after_map);
}
// Populate `after` projection with the new locations of `self.projection`.
for index in self.projection {
after_proj.push(after_map[&index]);
}
// Form and return the before and after MapFilterProject instances.
let before = Self::new(input_arity)
.map(before_expr)
.filter(before_pred)
.project(before_proj.clone());
let after = Self::new(self.input_arity + (before_proj.len() - input_arity))
.map(after_expr)
.filter(after_pred)
.project(after_proj);
(before, after)
}
/// Lists input columns whose values are used in outputs.
///
/// It is entirely appropriate to determine the demand of an instance
/// and then both apply a projection to the subject of the instance and
/// `self.permute` this instance.
pub fn demand(&self) -> BTreeSet<usize> {
let mut demanded = BTreeSet::new();
for (_index, pred) in self.predicates.iter() {
demanded.extend(pred.support());
}
demanded.extend(self.projection.iter().cloned());
for index in (0..self.expressions.len()).rev() {
if demanded.contains(&(self.input_arity + index)) {
demanded.extend(self.expressions[index].support());
}
}
demanded.retain(|col| col < &self.input_arity);
demanded
}
/// Update input column references, due to an input projection or permutation.
///
/// The `shuffle` argument remaps expected column identifiers to new locations,
/// with the expectation that `shuffle` describes all input columns, and so the
/// intermediate results will be able to start at position `shuffle.len()`.
///
/// The supplied `shuffle` might not list columns that are not "demanded" by the
/// instance, and so we should ensure that `self` is optimized to not reference
/// columns that are not demanded.
pub fn permute(&mut self, mut shuffle: BTreeMap<usize, usize>, new_input_arity: usize) {
let (mut map, mut filter, mut project) = self.as_map_filter_project();
for index in 0..map.len() {
// Intermediate columns are just shifted.
shuffle.insert(self.input_arity + index, new_input_arity + index);
}
for expr in map.iter_mut() {
expr.permute_map(&shuffle);
}
for pred in filter.iter_mut() {
pred.permute_map(&shuffle);
}
for proj in project.iter_mut() {
assert!(shuffle[proj] < new_input_arity + map.len());
*proj = shuffle[proj];
}
*self = Self::new(new_input_arity)
.map(map)
.filter(filter)
.project(project)
}
}
// Optimization routines.
impl MapFilterProject {
/// Optimize the internal expression evaluation order.
///
/// This method performs several optimizations that are meant to streamline
/// the execution of the `MapFilterProject` instance, but not to alter its
/// semantics. This includes extracting expressions that are used multiple
/// times, inlining those that are not, and removing expressions that are
/// unreferenced.
///
/// This method will inline all temporal expressions, and remove any columns
/// that are not demanded by the output, which should transform any temporal
/// filters to a state where the temporal expressions exist only in the list
/// of predicates.
///
/// # Example
///
/// This example demonstrates how the re-use of one expression, converting
/// column 1 from a string to an integer, can be extracted and the results
/// shared among the two uses. This example is used for each of the steps
/// along the optimization path.
///
/// ```rust
/// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
/// // Demonstrate extraction of common expressions (here: parsing strings).
/// let mut map_filter_project = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// ])
/// .project(vec![3,4,5,6]);
///
/// let mut expected_optimized = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
/// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// ])
/// .project(vec![3,4,6,7]);
///
/// // Optimize the expression.
/// map_filter_project.optimize();
///
/// assert_eq!(
/// map_filter_project,
/// expected_optimized,
/// );
/// ```
pub fn optimize(&mut self) {
// Track sizes and iterate as long as they decrease.
let mut prev_size = None;
let mut self_size = usize::max_value();
// Continue as long as strict improvements occur.
while prev_size.map(|p| self_size < p).unwrap_or(true) {
// Lock in current size.
prev_size = Some(self_size);
// We have an annoying pattern of mapping literals that already exist as columns (by filters).
// Try to identify this pattern, of a map that introduces an expression equated to a prior column,
// and then replace the mapped expression by a column reference.
//
// We think this is due to `LiteralLifting`, and we might investigate removing the introduciton in
// the first place. The tell-tale that we see when we fix is a diff that look likes
//
// - Project (#0, #2)
// - Filter (#1 = 1)
// - Map (1)
// - Get l0
// + Filter (#1 = 1)
// + Get l0
//
for (index, expr) in self.expressions.iter_mut().enumerate() {
// If `expr` matches a filter equating it to a column < index + input_arity, rewrite it
for (_, predicate) in self.predicates.iter() {
if let MirScalarExpr::CallBinary {
func: crate::BinaryFunc::Eq,
expr1,
expr2,
} = predicate
{
if let MirScalarExpr::Column(c) = &**expr1 {
if *c < index + self.input_arity && &**expr2 == expr {
*expr = MirScalarExpr::Column(*c);
}
}
if let MirScalarExpr::Column(c) = &**expr2 {
if *c < index + self.input_arity && &**expr1 == expr {
*expr = MirScalarExpr::Column(*c);
}
}
}
}
}
// Optimization memoizes individual `ScalarExpr` expressions that
// are sure to be evaluated, canonicalizes references to the first
// occurrence of each, inlines expressions that have a reference
// count of one, and then removes any expressions that are not
// referenced.
self.memoize_expressions();
self.predicates.sort();
self.predicates.dedup();
self.inline_expressions();
self.remove_undemanded();
// Re-build `self` from parts to restore evaluation order invariants.
let (map, filter, project) = self.as_map_filter_project();
*self = Self::new(self.input_arity)
.map(map)
.filter(filter)
.project(project);
self_size = self.size();
}
}
/// Total expression sizes across all expressions.
pub fn size(&self) -> usize {
self.expressions.iter().map(|e| e.size()).sum::<usize>()
+ self.predicates.iter().map(|(_, e)| e.size()).sum::<usize>()
}
/// Place each certainly evaluated expression in its own column.
///
/// This method places each non-trivial, certainly evaluated expression
/// in its own column, and deduplicates them so that all references to
/// the same expression reference the same column.
///
/// This transformation is restricted to expressions we are certain will
/// be evaluated, which does not include expressions in `if` statements.
///
/// # Example
///
/// This example demonstrates how memoization notices `MirScalarExpr`s
/// that are used multiple times, and ensures that each are extracted
/// into columns and then referenced by column. This pass does not try
/// to minimize the occurrences of column references, which will happen
/// in inlining.
///
/// ```rust
/// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
/// // Demonstrate extraction of common expressions (here: parsing strings).
/// let mut map_filter_project = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// ])
/// .project(vec![3,4,5,6]);
///
/// let mut expected_optimized = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
/// MirScalarExpr::column(7),
/// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
/// MirScalarExpr::column(10),
/// ])
/// .project(vec![3,4,8,11]);
///
/// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
/// map_filter_project.memoize_expressions();
///
/// assert_eq!(
/// map_filter_project,
/// expected_optimized,
/// );
/// ```
///
/// Expressions may not be memoized if they are not certain to be evaluated,
/// for example if they occur in conditional branches of a `MirScalarExpr::If`.
///
/// ```rust
/// use mz_expr::{MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
/// // Demonstrate extraction of unconditionally evaluated expressions, as well as
/// // the non-extraction of common expressions guarded by conditions.
/// let mut map_filter_project = MapFilterProject::new(2)
/// .map(vec![
/// MirScalarExpr::If {
/// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
/// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
/// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
/// },
/// MirScalarExpr::If {
/// cond: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt)),
/// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
/// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
/// },
/// ]);
///
/// let mut expected_optimized = MapFilterProject::new(2)
/// .map(vec![
/// MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::Lt),
/// MirScalarExpr::If {
/// cond: Box::new(MirScalarExpr::column(2)),
/// then: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
/// els: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
/// },
/// MirScalarExpr::column(3),
/// MirScalarExpr::If {
/// cond: Box::new(MirScalarExpr::column(2)),
/// then: Box::new(MirScalarExpr::column(1).call_binary(MirScalarExpr::column(0), BinaryFunc::DivInt64)),
/// els: Box::new(MirScalarExpr::column(0).call_binary(MirScalarExpr::column(1), BinaryFunc::DivInt64)),
/// },
/// MirScalarExpr::column(5),
/// ])
/// .project(vec![0,1,4,6]);
///
/// // Memoize expressions, ensuring uniqueness of each `MirScalarExpr`.
/// map_filter_project.memoize_expressions();
///
/// assert_eq!(
/// map_filter_project,
/// expected_optimized,
/// );
/// ```
pub fn memoize_expressions(&mut self) {
// Record the mapping from starting column references to new column
// references.
let mut remaps = BTreeMap::new();
for index in 0..self.input_arity {
remaps.insert(index, index);
}
let mut new_expressions = Vec::new();
// We follow the same order as for evaluation, to ensure that all
// column references exist in time for their evaluation. We could
// prioritize predicates, but we would need to be careful to chase
// down column references to expressions and memoize those as well.
let mut expression = 0;
for (support, predicate) in self.predicates.iter_mut() {
while self.input_arity + expression < *support {
self.expressions[expression].permute_map(&remaps);
memoize_expr(
&mut self.expressions[expression],
&mut new_expressions,
self.input_arity,
);
remaps.insert(
self.input_arity + expression,
self.input_arity + new_expressions.len(),
);
new_expressions.push(self.expressions[expression].clone());
expression += 1;
}
predicate.permute_map(&remaps);
memoize_expr(predicate, &mut new_expressions, self.input_arity);
}
while expression < self.expressions.len() {
self.expressions[expression].permute_map(&remaps);
memoize_expr(
&mut self.expressions[expression],
&mut new_expressions,
self.input_arity,
);
remaps.insert(
self.input_arity + expression,
self.input_arity + new_expressions.len(),
);
new_expressions.push(self.expressions[expression].clone());
expression += 1;
}
self.expressions = new_expressions;
for proj in self.projection.iter_mut() {
*proj = remaps[proj];
}
// Restore predicate order invariants.
for (pos, pred) in self.predicates.iter_mut() {
*pos = pred.support().into_iter().max().map(|x| x + 1).unwrap_or(0);
}
}
/// This method inlines expressions with a single use.
///
/// This method only inlines expressions; it does not delete expressions
/// that are no longer referenced. The `remove_undemanded()` method does
/// that, and should likely be used after this method.
///
/// Inlining replaces column references when the referred-to item is either
/// another column reference, or the only referrer of its referent. This
/// is most common after memoization has atomized all expressions to seek
/// out re-use: inlining re-assembles expressions that were not helpfully
/// shared with other expressions.
///
/// # Example
///
/// In this example, we see that with only a single reference to columns
/// 0 and 2, their parsing can each be inlined. Similarly, column references
/// can be cleaned up among expressions, and in the final projection.
///
/// Also notice the remaining expressions, which can be cleaned up in a later
/// pass (the `remove_undemanded` method).
///
/// ```rust
/// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
/// // Use the output from first `memoize_expression` example.
/// let mut map_filter_project = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
/// MirScalarExpr::column(7),
/// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(9), BinaryFunc::AddInt64),
/// MirScalarExpr::column(10),
/// ])
/// .project(vec![3,4,8,11]);
///
/// let mut expected_optimized = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
/// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// ])
/// .project(vec![3,4,8,11]);
///
/// // Inline expressions that are referenced only once.
/// map_filter_project.inline_expressions();
///
/// assert_eq!(
/// map_filter_project,
/// expected_optimized,
/// );
/// ```
pub fn inline_expressions(&mut self) {
// Local copy of input_arity to avoid borrowing `self` in closures.
let input_arity = self.input_arity;
// Reference counts track the number of places that a reference occurs.
let mut reference_count = vec![0; input_arity + self.expressions.len()];
// Increment reference counts for each use
for expr in self.expressions.iter() {
expr.visit_pre(|e| {
if let MirScalarExpr::Column(i) = e {
reference_count[*i] += 1;
}
});
}
for (_, pred) in self.predicates.iter() {
pred.visit_pre(|e| {
if let MirScalarExpr::Column(i) = e {
reference_count[*i] += 1;
}
});
}
for proj in self.projection.iter() {
reference_count[*proj] += 1;
}
// Determine which expressions should be inlined because they reference temporal expressions.
let mut is_temporal = vec![false; input_arity];
for expr in self.expressions.iter() {
// An express may contain a temporal expression, or reference a column containing such.
is_temporal.push(
expr.contains_temporal() || expr.support().into_iter().any(|col| is_temporal[col]),
);
}
// Inline only those columns that 1. are expressions not inputs, and
// 2a. are column references or literals or 2b. have a refcount of 1,
// or 2c. reference temporal expressions (which cannot be evaluated).
let mut should_inline = vec![false; reference_count.len()];
for i in (input_arity..reference_count.len()).rev() {
if let MirScalarExpr::Column(c) = self.expressions[i - input_arity] {
should_inline[i] = true;
// The reference count of the referenced column should be
// incremented with the number of references
// `self.expressions[i - input_arity]` has.
// Subtract 1 because `self.expressions[i - input_arity]` is
// itself a reference.
reference_count[c] += reference_count[i] - 1;
} else {
should_inline[i] = reference_count[i] == 1 || is_temporal[i];
}
}
// Inline expressions per `should_inline`.
self.perform_inlining(should_inline);
// We can only inline column references in `self.projection`, but we should.
for proj in self.projection.iter_mut() {
if *proj >= self.input_arity {
if let MirScalarExpr::Column(i) = self.expressions[*proj - self.input_arity] {
*proj = i;
}
}
}
}
/// Inlines those expressions that are indicated by should_inline.
/// See `inline_expressions` for usage.
pub fn perform_inlining(&mut self, should_inline: Vec<bool>) {
for index in 0..self.expressions.len() {
let (prior, expr) = self.expressions.split_at_mut(index);
#[allow(deprecated)]
expr[0].visit_mut_post_nolimit(&mut |e| {
if let MirScalarExpr::Column(i) = e {
if should_inline[*i] {
*e = prior[*i - self.input_arity].clone();
}
}
});
}
for (_index, pred) in self.predicates.iter_mut() {
let expressions = &self.expressions;
#[allow(deprecated)]
pred.visit_mut_post_nolimit(&mut |e| {
if let MirScalarExpr::Column(i) = e {
if should_inline[*i] {
*e = expressions[*i - self.input_arity].clone();
}
}
});
}
}
/// Removes unused expressions from `self.expressions`.
///
/// Expressions are "used" if they are relied upon by any output columns
/// or any predicates, even transitively. Any expressions that are not
/// relied upon in this way can be discarded.
///
/// # Example
///
/// ```rust
/// use mz_expr::{func, MapFilterProject, MirScalarExpr, UnaryFunc, BinaryFunc};
/// // Use the output from `inline_expression` example.
/// let mut map_filter_project = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(6), BinaryFunc::AddInt64),
/// MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// MirScalarExpr::column(6).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// ])
/// .project(vec![3,4,8,11]);
///
/// let mut expected_optimized = MapFilterProject::new(5)
/// .map(vec![
/// MirScalarExpr::column(1).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)),
/// MirScalarExpr::column(0).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)).call_binary(MirScalarExpr::column(5), BinaryFunc::AddInt64),
/// MirScalarExpr::column(5).call_binary(MirScalarExpr::column(2).call_unary(UnaryFunc::CastStringToInt64(func::CastStringToInt64)), BinaryFunc::AddInt64),
/// ])
/// .project(vec![3,4,6,7]);
///
/// // Remove undemanded expressions, streamlining the work done..
/// map_filter_project.remove_undemanded();
///
/// assert_eq!(
/// map_filter_project,
/// expected_optimized,
/// );
/// ```
pub fn remove_undemanded(&mut self) {
// Determine the demanded expressions to remove irrelevant ones.
let mut demand = BTreeSet::new();
for (_index, pred) in self.predicates.iter() {
demand.extend(pred.support());
}
// Start from the output columns as presumed demanded.
// If this is not the case, the caller should project some away.
demand.extend(self.projection.iter().cloned());
// Proceed in *reverse* order, as expressions may depend on other
// expressions that precede them.
for index in (0..self.expressions.len()).rev() {
if demand.contains(&(self.input_arity + index)) {
demand.extend(self.expressions[index].support());
}
}
// Maintain a map from initial column identifiers to locations
// once we have removed undemanded expressions.
let mut remap = BTreeMap::new();
// This map only needs to map elements of `demand` to a new location,
// but the logic is easier if we include all input columns (as the
// new position is then determined by the size of the map).
for index in 0..self.input_arity {
remap.insert(index, index);
}
// Retain demanded expressions, and record their new locations.
let mut new_expressions = Vec::new();
for (index, expr) in self.expressions.drain(..).enumerate() {
if demand.contains(&(index + self.input_arity)) {
remap.insert(index + self.input_arity, remap.len());
new_expressions.push(expr);
}
}
self.expressions = new_expressions;
// Update column identifiers; rebuild `Self` to re-establish any invariants.
// We mirror `self.permute(&remap)` but we specifically want to remap columns
// that are produced by `self.expressions` after the input columns.
let (expressions, predicates, projection) = self.as_map_filter_project();
*self = Self::new(self.input_arity)
.map(expressions.into_iter().map(|mut e| {
e.permute_map(&remap);
e
}))
.filter(predicates.into_iter().map(|mut p| {
p.permute_map(&remap);
p
}))
.project(projection.into_iter().map(|c| remap[&c]));
}
}
// TODO: move this elsewhere?
/// Recursively memoize parts of `expr`, storing those parts in `memoized_parts`.
///
/// A part of `expr` that is memoized is replaced by a reference to column
/// `(input_arity + pos)`, where `pos` is the position of the memoized part in
/// `memoized_parts`, and `input_arity` is the arity of the input that `expr`
/// refers to.
pub fn memoize_expr(
expr: &mut MirScalarExpr,
memoized_parts: &mut Vec<MirScalarExpr>,
input_arity: usize,
) {
#[allow(deprecated)]
expr.visit_mut_pre_post_nolimit(
&mut |e| {
// We should not eagerly memoize `if` branches that might not be taken.
// TODO: Memoize expressions in the intersection of `then` and `els`.
if let MirScalarExpr::If { cond, .. } = e {
return Some(vec![cond]);
}
None
},
&mut |e| {
match e {
MirScalarExpr::Literal(_, _) => {
// Literals do not need to be memoized.
}
MirScalarExpr::Column(col) => {
// Column references do not need to be memoized, but may need to be
// updated if they reference a column reference themselves.
if *col > input_arity {
if let MirScalarExpr::Column(col2) = memoized_parts[*col - input_arity] {
*col = col2;
}
}
}
_ => {
if let Some(position) = memoized_parts.iter().position(|e2| e2 == e) {
// Any complex expression that already exists as a prior column can
// be replaced by a reference to that column.
*e = MirScalarExpr::Column(input_arity + position);
} else {
// A complex expression that does not exist should be memoized, and
// replaced by a reference to the column.
memoized_parts.push(std::mem::replace(
e,
MirScalarExpr::Column(input_arity + memoized_parts.len()),
));
}
}
}
},
)
}
pub mod util {
use std::collections::BTreeMap;
use crate::MirScalarExpr;
/// Return the map associating columns in the logical,
/// unthinned representation of a collection to columns in the
/// thinned representation of the arrangement corresponding to `key`.
///
/// Returns the permutation and the thinning
/// expression that should be used to create the arrangement.
///
/// The permutations and thinning expressions generated here will be tracked in
/// `dataflow::plan::AvailableCollections`; see the
/// documentation there for more details.
pub fn permutation_for_arrangement(
key: &[MirScalarExpr],
unthinned_arity: usize,
) -> (BTreeMap<usize, usize>, Vec<usize>) {
let columns_in_key: BTreeMap<_, _> = key
.iter()
.enumerate()
.filter_map(|(i, key_col)| key_col.as_column().map(|c| (c, i)))
.collect();
let mut input_cursor = key.len();
let permutation = (0..unthinned_arity)
.map(|c| {
if let Some(c) = columns_in_key.get(&c) {
// Column is in key (and thus gone from the value
// of the thinned representation)
*c
} else {
// Column remains in value of the thinned representation
input_cursor += 1;
input_cursor - 1
}
})
.enumerate()
.collect();
let thinning = (0..unthinned_arity)
.filter(|c| !columns_in_key.contains_key(c))
.collect();
(permutation, thinning)
}
/// Given the permutations (see [`permutation_for_arrangement`] and
/// (`dataflow::plan::AvailableCollections`) corresponding to two
/// collections with the same key arity,
/// computes the permutation for the result of joining them.
pub fn join_permutations(
key_arity: usize,
stream_permutation: BTreeMap<usize, usize>,
thinned_stream_arity: usize,
lookup_permutation: BTreeMap<usize, usize>,
) -> BTreeMap<usize, usize> {
let stream_arity = stream_permutation
.keys()
.cloned()
.max()
.map(|i| i + 1)
.unwrap_or(0);
let lookup_arity = lookup_permutation
.keys()
.cloned()
.max()
.map(|i| i + 1)
.unwrap_or(0);
(0..stream_arity + lookup_arity)
.map(|i| {
let location = if i < stream_arity {
stream_permutation[&i]
} else {
let location_in_lookup = lookup_permutation[&(i - stream_arity)];
if location_in_lookup < key_arity {
location_in_lookup
} else {
location_in_lookup + thinned_stream_arity
}
};
(i, location)
})
.collect()
}
}
pub mod plan {
use std::collections::BTreeMap;
use std::iter;
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use mz_repr::{Datum, Diff, Row, RowArena};
use proptest::prelude::*;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use crate::{
func, BinaryFunc, EvalError, MapFilterProject, MirScalarExpr, ProtoMfpPlan,
ProtoSafeMfpPlan, UnaryFunc, UnmaterializableFunc,
};
/// A wrapper type which indicates it is safe to simply evaluate all expressions.
#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct SafeMfpPlan {
pub(crate) mfp: MapFilterProject,
}
impl RustType<ProtoSafeMfpPlan> for SafeMfpPlan {
fn into_proto(&self) -> ProtoSafeMfpPlan {
ProtoSafeMfpPlan {
mfp: Some(self.mfp.into_proto()),
}
}
fn from_proto(proto: ProtoSafeMfpPlan) -> Result<Self, TryFromProtoError> {
Ok(SafeMfpPlan {
mfp: proto.mfp.into_rust_if_some("ProtoSafeMfpPlan::mfp")?,
})
}
}
impl SafeMfpPlan {
pub fn permute(&mut self, map: BTreeMap<usize, usize>, new_arity: usize) {
self.mfp.permute(map, new_arity);
}
/// Evaluates the linear operator on a supplied list of datums.
///
/// The arguments are the initial datums associated with the row,
/// and an appropriately lifetimed arena for temporary allocations
/// needed by scalar evaluation.
///
/// An `Ok` result will either be `None` if any predicate did not
/// evaluate to `Datum::True`, or the values of the columns listed
/// by `self.projection` if all predicates passed. If an error
/// occurs in the evaluation it is returned as an `Err` variant.
/// As the evaluation exits early with failed predicates, it may
/// miss some errors that would occur later in evaluation.
///
/// The `row` is not cleared first, but emptied if the function
/// returns `Ok(Some(row)).
#[inline(always)]
pub fn evaluate_into<'a, 'row>(
&'a self,
datums: &mut Vec<Datum<'a>>,
arena: &'a RowArena,
row_buf: &'row mut Row,
) -> Result<Option<Row>, EvalError> {
let passed_predicates = self.evaluate_inner(datums, arena)?;
if !passed_predicates {
Ok(None)
} else {
row_buf
.packer()
.extend(self.mfp.projection.iter().map(|c| datums[*c]));
Ok(Some(row_buf.clone()))
}
}
/// A version of `evaluate` which produces an iterator over `Datum`
/// as output.
///
/// This version can be useful when one wants to capture the resulting
/// datums without packing and then unpacking a row.
#[inline(always)]
pub fn evaluate_iter<'b, 'a: 'b>(
&'a self,
datums: &'b mut Vec<Datum<'a>>,
arena: &'a RowArena,
) -> Result<Option<impl Iterator<Item = Datum<'a>> + 'b>, EvalError> {
let passed_predicates = self.evaluate_inner(datums, arena)?;
if !passed_predicates {
Ok(None)
} else {
Ok(Some(self.mfp.projection.iter().map(move |i| datums[*i])))
}
}
/// Populates `datums` with `self.expressions` and tests `self.predicates`.
///
/// This does not apply `self.projection`, which is up to the calling method.
pub fn evaluate_inner<'b, 'a: 'b>(
&'a self,
datums: &'b mut Vec<Datum<'a>>,
arena: &'a RowArena,
) -> Result<bool, EvalError> {
let mut expression = 0;
for (support, predicate) in self.mfp.predicates.iter() {
while self.mfp.input_arity + expression < *support {
datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
expression += 1;
}
if predicate.eval(&datums[..], arena)? != Datum::True {
return Ok(false);
}
}
while expression < self.mfp.expressions.len() {
datums.push(self.mfp.expressions[expression].eval(&datums[..], arena)?);
expression += 1;
}
Ok(true)
}
/// Returns true if evaluation could introduce an error on non-error inputs.
pub fn could_error(&self) -> bool {
self.mfp.predicates.iter().any(|(_pos, e)| e.could_error())
|| self.mfp.expressions.iter().any(|e| e.could_error())
}
/// Returns true when `Self` is the identity.
pub fn is_identity(&self) -> bool {
self.mfp.is_identity()
}
}
impl std::ops::Deref for SafeMfpPlan {
type Target = MapFilterProject;
fn deref(&self) -> &Self::Target {
&self.mfp
}
}
/// Predicates partitioned into temporal and non-temporal.
///
/// Temporal predicates require some recognition to determine their
/// structure, and it is best to do that once and re-use the results.
///
/// There are restrictions on the temporal predicates we currently support.
/// They must directly constrain `MzNow` from below or above,
/// by expressions that do not themselves contain `MzNow`.
/// Conjunctions of such constraints are also ok.
#[derive(Arbitrary, Clone, Debug, PartialEq)]
pub struct MfpPlan {
/// Normal predicates to evaluate on `&[Datum]` and expect `Ok(Datum::True)`.
pub(crate) mfp: SafeMfpPlan,
/// Expressions that when evaluated lower-bound `MzNow`.
#[proptest(strategy = "prop::collection::vec(any::<MirScalarExpr>(), 0..2)")]
pub(crate) lower_bounds: Vec<MirScalarExpr>,
/// Expressions that when evaluated upper-bound `MzNow`.
#[proptest(strategy = "prop::collection::vec(any::<MirScalarExpr>(), 0..2)")]
pub(crate) upper_bounds: Vec<MirScalarExpr>,
}
impl RustType<ProtoMfpPlan> for MfpPlan {
fn into_proto(&self) -> ProtoMfpPlan {
ProtoMfpPlan {
mfp: Some(self.mfp.into_proto()),
lower_bounds: self.lower_bounds.into_proto(),
upper_bounds: self.upper_bounds.into_proto(),
}
}
fn from_proto(proto: ProtoMfpPlan) -> Result<Self, TryFromProtoError> {
Ok(MfpPlan {
mfp: proto.mfp.into_rust_if_some("ProtoMfpPlan::mfp")?,
lower_bounds: proto.lower_bounds.into_rust()?,
upper_bounds: proto.upper_bounds.into_rust()?,
})
}
}
impl MfpPlan {
/// Partitions `predicates` into non-temporal, and lower and upper temporal bounds.
///
/// The first returned list is of predicates that do not contain `mz_now`.
/// The second and third returned lists contain expressions that, once evaluated, lower
/// and upper bound the validity interval of a record, respectively. These second two
/// lists are populated only by binary expressions of the form
/// ```ignore
/// mz_now cmp_op expr
/// ```
/// where `cmp_op` is a comparison operator and `expr` does not contain `mz_now`.
///
/// If any unsupported expression is found, for example one that uses `mz_now`
/// in an unsupported position, an error is returned.
pub fn create_from(mut mfp: MapFilterProject) -> Result<Self, String> {
let mut lower_bounds = Vec::new();
let mut upper_bounds = Vec::new();
let mut temporal = Vec::new();
// Optimize, to ensure that temporal predicates are move in to `mfp.predicates`.
mfp.optimize();
mfp.predicates.retain(|(_position, predicate)| {
if predicate.contains_temporal() {
temporal.push(predicate.clone());
false
} else {
true
}
});
for predicate in temporal.into_iter() {
// Supported temporal predicates are exclusively binary operators.
if let MirScalarExpr::CallBinary {
mut func,
mut expr1,
mut expr2,
} = predicate
{
// Attempt to put `LogicalTimestamp` in the first argument position.
if !expr1.contains_temporal()
&& *expr2
== MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow)
{
std::mem::swap(&mut expr1, &mut expr2);
func = match func {
BinaryFunc::Eq => BinaryFunc::Eq,
BinaryFunc::Lt => BinaryFunc::Gt,
BinaryFunc::Lte => BinaryFunc::Gte,
BinaryFunc::Gt => BinaryFunc::Lt,
BinaryFunc::Gte => BinaryFunc::Lte,
x => {
return Err(format!(
"Unsupported binary temporal operation: {:?}",
x
));
}
};
}
// Error if MLT is referenced in an unsupported position.
if expr2.contains_temporal()
|| *expr1
!= MirScalarExpr::CallUnmaterializable(UnmaterializableFunc::MzNow)
{
return Err(format!(
"Unsupported temporal predicate. Note: `mz_now()` must be directly compared to a mz_timestamp-castable expression. Expression found: {}",
MirScalarExpr::CallBinary { func, expr1, expr2 },
));
}
// LogicalTimestamp <OP> <EXPR2> for several supported operators.
match func {
BinaryFunc::Eq => {
lower_bounds.push(*expr2.clone());
upper_bounds.push(
expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
);
}
BinaryFunc::Lt => {
upper_bounds.push(*expr2.clone());
}
BinaryFunc::Lte => {
upper_bounds.push(
expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
);
}
BinaryFunc::Gt => {
lower_bounds.push(
expr2.call_unary(UnaryFunc::StepMzTimestamp(func::StepMzTimestamp)),
);
}
BinaryFunc::Gte => {
lower_bounds.push(*expr2.clone());
}
_ => {
return Err(format!(
"Unsupported binary temporal operation: {:?}",
func
));
}
}
} else {
return Err(format!(
"Unsupported temporal predicate. Note: `mz_now()` must be directly compared to a non-temporal expression of mz_timestamp-castable type. Expression found: {}",
predicate,
));
}
}
Ok(Self {
mfp: SafeMfpPlan { mfp },
lower_bounds,
upper_bounds,
})
}
/// Indicates if the planned `MapFilterProject` emits exactly its inputs as outputs.
pub fn is_identity(&self) -> bool {
self.mfp.mfp.is_identity()
&& self.lower_bounds.is_empty()
&& self.upper_bounds.is_empty()
}
/// Returns `self`, and leaves behind an identity operator that acts on its output.
pub fn take(&mut self) -> Self {
let mut identity = Self {
mfp: SafeMfpPlan {
mfp: MapFilterProject::new(self.mfp.projection.len()),
},
lower_bounds: Default::default(),
upper_bounds: Default::default(),
};
std::mem::swap(self, &mut identity);
identity
}
/// Attempt to convert self into a non-temporal MapFilterProject plan.
///
/// If that is not possible, the original instance is returned as an error.
#[allow(clippy::result_large_err)]
pub fn into_nontemporal(self) -> Result<SafeMfpPlan, Self> {
if self.lower_bounds.is_empty() && self.upper_bounds.is_empty() {
Ok(self.mfp)
} else {
Err(self)
}
}
/// Returns an iterator over mutable references to all non-temporal
/// scalar expressions in the plan.
///
/// The order of iteration is unspecified.
pub fn iter_nontemporal_exprs(&mut self) -> impl Iterator<Item = &mut MirScalarExpr> {
iter::empty()
.chain(self.mfp.mfp.predicates.iter_mut().map(|(_, expr)| expr))
.chain(&mut self.mfp.mfp.expressions)
.chain(&mut self.lower_bounds)
.chain(&mut self.upper_bounds)
}
/// Evaluate the predicates, temporal and non-, and return times and differences for `data`.
///
/// If `self` contains only non-temporal predicates, the result will either be `(time, diff)`,
/// or an evaluation error. If `self contains temporal predicates, the results can be times
/// that are greater than the input `time`, and may contain negated `diff` values.
///
/// The `row_builder` is not cleared first, but emptied if the function
/// returns an iterator with any `Ok(_)` element.
pub fn evaluate<'b, 'a: 'b, E: From<EvalError>, V: Fn(&mz_repr::Timestamp) -> bool>(
&'a self,
datums: &'b mut Vec<Datum<'a>>,
arena: &'a RowArena,
time: mz_repr::Timestamp,
diff: Diff,
valid_time: V,
row_builder: &mut Row,
) -> impl Iterator<Item = Result<(Row, mz_repr::Timestamp, Diff), (E, mz_repr::Timestamp, Diff)>>
{
match self.mfp.evaluate_inner(datums, arena) {
Err(e) => {
return Some(Err((e.into(), time, diff))).into_iter().chain(None);
}
Ok(true) => {}
Ok(false) => {
return None.into_iter().chain(None);
}
}
// Lower and upper bounds.
let mut lower_bound = time;
let mut upper_bound = None;
// Track whether we have seen a null in either bound, as this should
// prevent the record from being produced at any time.
let mut null_eval = false;
// Advance our lower bound to be at least the result of any lower bound
// expressions.
for l in self.lower_bounds.iter() {
match l.eval(datums, arena) {
Err(e) => {
return Some(Err((e.into(), time, diff)))
.into_iter()
.chain(None.into_iter());
}
Ok(Datum::MzTimestamp(d)) => {
lower_bound = lower_bound.max(d);
}
Ok(Datum::Null) => {
null_eval = true;
}
x => {
panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
}
}
}
// If the lower bound exceeds our `until` frontier, it should not appear in the output.
if !valid_time(&lower_bound) {
return None.into_iter().chain(None);
}
// If there are any upper bounds, determine the minimum upper bound.
for u in self.upper_bounds.iter() {
// We can cease as soon as the lower and upper bounds match,
// as the update will certainly not be produced in that case.
if upper_bound != Some(lower_bound) {
match u.eval(datums, arena) {
Err(e) => {
return Some(Err((e.into(), time, diff)))
.into_iter()
.chain(None.into_iter());
}
Ok(Datum::MzTimestamp(d)) => {
if let Some(upper) = upper_bound {
upper_bound = Some(upper.min(d));
} else {
upper_bound = Some(d);
};
// Force the upper bound to be at least the lower
// bound. The `is_some()` test should always be true
// due to the above block, but maintain it here in
// case that changes. It's hopefully optimized away.
if upper_bound.is_some() && upper_bound < Some(lower_bound) {
upper_bound = Some(lower_bound);
}
}
Ok(Datum::Null) => {
null_eval = true;
}
x => {
panic!("Non-mz_timestamp value in temporal predicate: {:?}", x);
}
}
}
}
// If the upper bound exceeds our `until` frontier, it should not appear in the output.
if let Some(upper) = &mut upper_bound {
if !valid_time(upper) {
upper_bound = None;
}
}
// Produce an output only if the upper bound exceeds the lower bound,
// and if we did not encounter a `null` in our evaluation.
if Some(lower_bound) != upper_bound && !null_eval {
row_builder
.packer()
.extend(self.mfp.mfp.projection.iter().map(|c| datums[*c]));
let upper_opt =
upper_bound.map(|upper_bound| Ok((row_builder.clone(), upper_bound, -diff)));
let lower = Some(Ok((row_builder.clone(), lower_bound, diff)));
lower.into_iter().chain(upper_opt)
} else {
None.into_iter().chain(None)
}
}
/// Returns true if evaluation could introduce an error on non-error inputs.
pub fn could_error(&self) -> bool {
self.mfp.could_error()
|| self.lower_bounds.iter().any(|e| e.could_error())
|| self.upper_bounds.iter().any(|e| e.could_error())
}
/// Indicates that `Self` ignores its input to the extent that it can be evaluated on `&[]`.
///
/// At the moment, this is only true if it projects away all columns and applies no filters,
/// but it could be extended to plans that produce literals independent of the input.
pub fn ignores_input(&self) -> bool {
self.lower_bounds.is_empty()
&& self.upper_bounds.is_empty()
&& self.mfp.mfp.projection.is_empty()
&& self.mfp.mfp.predicates.is_empty()
}
}
}
#[cfg(test)]
mod tests {
use mz_ore::assert_ok;
use mz_proto::protobuf_roundtrip;
use crate::linear::plan::*;
use super::*;
proptest! {
#![proptest_config(ProptestConfig::with_cases(32))]
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // too slow
fn mfp_plan_protobuf_roundtrip(expect in any::<MfpPlan>()) {
let actual = protobuf_roundtrip::<_, ProtoMfpPlan>(&expect);
assert_ok!(actual);
assert_eq!(actual.unwrap(), expect);
}
}
}